1 module dpq2.connection; 2 3 import dpq2; 4 5 import std.conv: to; 6 import std.string: toStringz, fromStringz; 7 import std.exception: enforce, enforceEx; 8 import std.range; 9 import std.stdio: File; 10 import std.socket; 11 import core.exception; 12 import core.time: Duration; 13 14 /* 15 * Bugs: On Unix connection is not thread safe. 16 * 17 * On Unix, forking a process with open libpq connections can lead 18 * to unpredictable results because the parent and child processes share 19 * the same sockets and operating system resources. For this reason, 20 * such usage is not recommended, though doing an exec from the child 21 * process to load a new executable is safe. 22 23 24 25 int PQisthreadsafe(); 26 Returns 1 if the libpq is thread-safe and 0 if it is not. 27 */ 28 29 /// dumb flag for Connection ctor parametrization 30 struct ConnectionStart {}; 31 32 /// Connection 33 class Connection 34 { 35 //string connString; /// Database connection parameters 36 package PGconn* conn; 37 38 invariant 39 { 40 assert(conn !is null); 41 } 42 43 this(string connString) 44 { 45 conn = PQconnectdb(toStringz(connString)); 46 47 enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 48 49 if(status != CONNECTION_OK) 50 throw new ConnectionException(this, __FILE__, __LINE__); 51 } 52 53 /// Connect to DB in a nonblocking manner 54 this(ConnectionStart, string connString) 55 { 56 conn = PQconnectStart(toStringz(connString)); 57 58 enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 59 60 if( status == CONNECTION_BAD ) 61 throw new ConnectionException(this, __FILE__, __LINE__); 62 } 63 64 ~this() 65 { 66 PQfinish( conn ); 67 } 68 69 mixin Queries; 70 71 @property bool isNonBlocking() 72 { 73 return PQisnonblocking(conn) == 1; 74 } 75 76 private void setNonBlocking( bool state ) 77 { 78 if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 ) 79 throw new ConnectionException(this, __FILE__, __LINE__); 80 } 81 82 void resetStart() 83 { 84 if(PQresetStart(conn) == 0) 85 throw new ConnectionException(this, __FILE__, __LINE__); 86 } 87 88 PostgresPollingStatusType poll() nothrow 89 { 90 assert(conn); 91 92 return PQconnectPoll(conn); 93 } 94 95 PostgresPollingStatusType resetPoll() nothrow 96 { 97 assert(conn); 98 99 return PQresetPoll(conn); 100 } 101 102 ConnStatusType status() nothrow 103 { 104 return PQstatus(conn); 105 } 106 107 void consumeInput() 108 { 109 assert(conn); 110 111 const size_t r = PQconsumeInput( conn ); 112 if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__); 113 } 114 115 package bool flush() 116 { 117 assert(conn); 118 119 auto r = PQflush(conn); 120 if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__); 121 return r == 0; 122 } 123 124 int posixSocket() 125 { 126 int r = PQsocket(conn); 127 128 if(r == -1) 129 throw new ConnectionException(this, __FILE__, __LINE__); 130 131 return r; 132 } 133 134 Socket socket() 135 { 136 version(Posix) 137 { 138 import core.sys.posix.unistd: dup; 139 140 socket_t s = cast(socket_t) dup(cast(socket_t) posixSocket); 141 return new Socket(s, AddressFamily.UNSPEC); 142 } 143 else version(Windows) 144 { 145 assert(false, "FIXME: implement socket duplication"); 146 } 147 } 148 149 string errorMessage() const nothrow 150 { 151 return PQerrorMessage(conn).to!string; 152 } 153 154 /** 155 * returns the previous notice receiver or processor function pointer, and sets the new value. 156 * If you supply a null function pointer, no action is taken, but the current pointer is returned. 157 */ 158 PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow 159 { 160 assert(conn); 161 162 return PQsetNoticeProcessor(conn, proc, arg); 163 } 164 165 /// Get for the next result from a sendQuery. Can return null. 166 immutable(Result) getResult() 167 { 168 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 169 auto r = cast(immutable) PQgetResult(conn); 170 171 if(r) 172 { 173 auto container = new immutable ResultContainer(r); 174 return new immutable Result(container); 175 } 176 177 return null; 178 } 179 180 /// Get result from PQexec* functions or throw error if pull is empty 181 package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const 182 { 183 if(r is null) throw new ConnectionException(this, __FILE__, __LINE__); 184 185 return new immutable ResultContainer(r); 186 } 187 188 bool setSingleRowMode() 189 { 190 return PQsetSingleRowMode(conn) == 1; 191 } 192 193 /** 194 If the cancellation is effective, the current command will 195 terminate early and return an error result (exception). If the 196 cancellation fails (say, because the server was already done 197 processing the command), then there will be no visible result at 198 all. 199 */ 200 void cancel() 201 { 202 auto c = new Cancellation(this); 203 c.doCancel; 204 } 205 206 bool isBusy() nothrow 207 { 208 assert(conn); 209 210 return PQisBusy(conn) == 1; 211 } 212 213 string parameterStatus(string paramName) 214 { 215 assert(conn); 216 217 auto res = PQparameterStatus(conn, toStringz(paramName)); 218 219 if(res is null) 220 throw new ConnectionException(this, __FILE__, __LINE__); 221 222 return to!string(fromStringz(res)); 223 } 224 225 string escapeLiteral(string msg) 226 { 227 assert(conn); 228 229 auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length); 230 231 if(buf is null) 232 throw new ConnectionException(this, __FILE__, __LINE__); 233 234 string res = buf.fromStringz.to!string; 235 236 PQfreemem(buf); 237 238 return res; 239 } 240 241 string escapeIdentifier(string msg) 242 { 243 assert(conn); 244 245 auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length); 246 247 if(buf is null) 248 throw new ConnectionException(this, __FILE__, __LINE__); 249 250 string res = buf.fromStringz.to!string; 251 252 PQfreemem(buf); 253 254 return res; 255 } 256 257 string host() const nothrow 258 { 259 assert(conn); 260 261 return PQhost(conn).fromStringz.to!string; 262 } 263 264 void trace(ref File stream) 265 { 266 PQtrace(conn, stream.getFP); 267 } 268 269 void untrace() 270 { 271 PQuntrace(conn); 272 } 273 274 void setClientEncoding(string encoding) 275 { 276 if(PQsetClientEncoding(conn, encoding.toStringz) != 0) 277 throw new ConnectionException(this, __FILE__, __LINE__); 278 } 279 } 280 281 void connStringCheck(string connString) 282 { 283 char* errmsg = null; 284 PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg); 285 286 if(r is null) 287 { 288 enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data"); 289 } 290 else 291 { 292 PQconninfoFree(r); 293 } 294 295 if(errmsg !is null) 296 { 297 string s = errmsg.fromStringz.to!string; 298 PQfreemem(cast(void*) errmsg); 299 300 throw new ConnectionException(s, __FILE__, __LINE__); 301 } 302 } 303 304 unittest 305 { 306 connStringCheck("dbname=postgres user=postgres"); 307 308 { 309 bool raised = false; 310 311 try 312 connStringCheck("wrong conninfo string"); 313 catch(ConnectionException e) 314 raised = true; 315 316 assert(raised); 317 } 318 } 319 320 /// Doing canceling queries in progress 321 class Cancellation 322 { 323 private PGcancel* cancel; 324 325 this(Connection c) 326 { 327 cancel = PQgetCancel(c.conn); 328 329 if(cancel is null) 330 throw new ConnectionException(c, __FILE__, __LINE__); 331 } 332 333 ~this() 334 { 335 PQfreeCancel(cancel); 336 } 337 338 /** 339 Successful dispatch is no guarantee that the request will have any 340 effect, however. If the cancellation is effective, the current 341 command will terminate early and return an error result 342 (exception). If the cancellation fails (say, because the server 343 was already done processing the command), then there will be no 344 visible result at all. 345 */ 346 void doCancel() 347 { 348 char[256] errbuf; 349 auto res = PQcancel(cancel, errbuf.ptr, errbuf.length); 350 351 if(res != 1) 352 throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__); 353 } 354 } 355 356 class CancellationException : Dpq2Exception 357 { 358 this(string msg, string file, size_t line) 359 { 360 super(msg, file, line); 361 } 362 } 363 364 /// Connection exception 365 class ConnectionException : Dpq2Exception 366 { 367 this(in Connection c, string file, size_t line) 368 { 369 super(c.errorMessage(), file, line); 370 } 371 372 this(string msg, string file, size_t line) 373 { 374 super(msg, file, line); 375 } 376 } 377 378 void _integration_test( string connParam ) 379 { 380 assert( PQlibVersion() >= 9_0100 ); 381 382 { 383 auto c = new Connection(connParam); 384 385 destroy(c); 386 } 387 388 { 389 bool exceptionFlag = false; 390 391 try 392 auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!"); 393 catch(ConnectionException e) 394 { 395 exceptionFlag = true; 396 assert(e.msg.length > 40); // error message check 397 } 398 finally 399 assert(exceptionFlag); 400 } 401 402 { 403 auto c = new Connection(connParam); 404 405 assert(c.escapeLiteral("abc'def") == "'abc''def'"); 406 assert(c.escapeIdentifier("abc'def") == "\"abc'def\""); 407 408 c.setClientEncoding("WIN866"); 409 assert(c.exec("show client_encoding")[0][0].as!string == "WIN866"); 410 } 411 }