module dpq2.connection;

import dpq2;
import dpq2.exception;

import std.conv: to;
import std.string: toStringz, fromStringz;
import std.exception: enforce, enforceEx;
import std.range;
import std.stdio: File;
import std.socket;
import core.exception;
import core.time: Duration;

/*
 * Bugs: On Unix the connection is not thread safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.
 *
 * int PQisthreadsafe();
 * Returns 1 if the libpq is thread-safe and 0 if it is not.
 */

/// Dummy tag type that selects the nonblocking Connection constructor
struct ConnectionStart {}

/// Wrapper around a libpq connection handle (PGconn*)
class Connection
{
    //string connString; /// Database connection parameters

    /// Underlying libpq handle; must never be null (see the invariant below)
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /**
     * Connects with PQconnectdb.
     *
     * Params:
     *  connString = libpq connection string
     *
     * Throws: OutOfMemoryError if libpq returned a null handle,
     * ConnectionException if the resulting status is not CONNECTION_OK.
     */
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Connect to DB in a nonblocking manner (PQconnectStart).
     *
     * Params:
     *  connString = libpq connection string
     *
     * Throws: OutOfMemoryError if libpq returned a null handle,
     * ConnectionException if the status is CONNECTION_BAD right after the start.
     */
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status == CONNECTION_BAD)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Hands the handle back to libpq via PQfinish
    ~this()
    {
        PQfinish(conn);
    }

    mixin Queries;

    /// Returns: true if PQisnonblocking reports 1 for this connection
    @property bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Calls PQsetnonblocking with 1 or 0; throws ConnectionException if it returns -1
    private void setNonBlocking(bool state)
    {
        const int arg = state ? 1 : 0;

        if(PQsetnonblocking(conn, arg) == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQresetStart; throws ConnectionException if it returns 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns: the value of PQconnectPoll for this connection
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns: the value of PQresetPoll for this connection
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns: the value of PQstatus for this connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /// Calls PQconsumeInput; throws ConnectionException unless it returns 1
    void consumeInput()
    {
        assert(conn);

        if(PQconsumeInput(conn) != 1)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Calls PQflush.
     *
     * Returns: true if PQflush returned 0, false for any other non-error value.
     * Throws: ConnectionException if PQflush returned -1.
     */
    package bool flush()
    {
        assert(conn);

        const r = PQflush(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r == 0;
    }

    /**
     * Returns: the descriptor reported by PQsocket.
     * Throws: ConnectionException if PQsocket returned -1.
     */
    int posixSocket()
    {
        const int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /**
     * Returns: a std.socket.Socket built around a dup()'ed copy of the
     * connection descriptor, so the returned object works on its own
     * descriptor rather than on the one held by libpq.
     *
     * Throws: ConnectionException if the descriptor is unavailable or
     * cannot be duplicated.
     */
    Socket socket()
    {
        version(Posix)
        {
            import core.sys.posix.unistd: dup;

            const int duplicate = dup(posixSocket());

            // dup() reports failure with -1; never wrap an invalid handle
            if(duplicate == -1)
                throw new ConnectionException("Unable to duplicate the connection socket", __FILE__, __LINE__);

            return new Socket(cast(socket_t) duplicate, AddressFamily.UNSPEC);
        }
        else version(Windows)
        {
            assert(false, "FIXME: implement socket duplication");
        }
    }

    /// Returns: a copy of the text reported by PQerrorMessage
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Gets the next result from a sendQuery. Can return null.
    immutable(Result) getResult()
    {
        // libpq guarantees that the result is not changed until it is destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r is null)
            return null;

        auto container = new immutable ResultContainer(r);

        return new immutable Result(container);
    }

    /// Wraps a result of the PQexec* functions; throws ConnectionException if the result pointer is null
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Returns: true if PQsetSingleRowMode returned 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     If the cancellation is effective, the current command will
     terminate early and return an error result (exception). If the
     cancellation fails (say, because the server was already done
     processing the command), then there will be no visible result at
     all.

     Throws: ConnectionException if the cancel object cannot be obtained,
     CancellationException if the request cannot be dispatched.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // Release the PGcancel structure right away instead of waiting for the GC
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// Returns: true if PQisBusy reports 1
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /**
     * Looks up a parameter with PQparameterStatus.
     *
     * Returns: a copy of the reported value.
     * Throws: ConnectionException if libpq returned null for paramName.
     */
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /**
     * Escapes msg with PQescapeLiteral.
     *
     * Returns: a GC-owned copy of the escaped text.
     * Throws: ConnectionException if libpq returned null.
     */
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The buffer belongs to libpq: free it even if the copy below fails
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /**
     * Escapes msg with PQescapeIdentifier.
     *
     * Returns: a GC-owned copy of the escaped text.
     * Throws: ConnectionException if libpq returned null.
     */
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The buffer belongs to libpq: free it even if the copy below fails
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns: a copy of the text reported by PQhost
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Passes the C stream behind the given File to PQtrace
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Calls PQuntrace for this connection
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Calls PQsetClientEncoding; throws ConnectionException if it returns a non-zero value
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}

/**
 * Validates a connection string with PQconninfoParse without connecting.
 *
 * Throws: OutOfMemoryError if parsing failed and libpq supplied no error
 * message, ConnectionException carrying the libpq message otherwise.
 */
void connStringCheck(string connString)
{
    char* errmsg = null;
    PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);

    if(r !is null)
    {
        PQconninfoFree(r);
    }
    else
    {
        // A null result without a message is treated as an allocation failure
        enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
    }

    if(errmsg !is null)
    {
        // The message belongs to libpq: free it even if the copy below fails
        scope(exit) PQfreemem(cast(void*) errmsg);

        string s = errmsg.fromStringz.to!string;

        throw new ConnectionException(s, __FILE__, __LINE__);
    }
}

unittest
{
    connStringCheck("dbname=postgres user=postgres");

    {
        bool raised = false;

        try
            connStringCheck("wrong conninfo string");
        catch(ConnectionException e)
            raised = true;

        assert(raised);
    }
}

/// Cancels queries in progress
class Cancellation
{
    /// Handle obtained from PQgetCancel, released in the destructor
    private PGcancel* cancel;

    /// Obtains the cancel handle; throws ConnectionException if PQgetCancel returned null
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    /// Releases the cancel handle with PQfreeCancel
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.

     Throws: CancellationException carrying the text PQcancel wrote
     into the error buffer, if PQcancel did not return 1.
    */
    void doCancel()
    {
        char[256] errbuf;
        const res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}

/// Thrown when a cancel request could not be dispatched
class CancellationException : Dpq2Exception
{
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Uses the current error message of the given connection as the exception text
    this(in Connection c, string file, size_t line)
    {
        super(c.errorMessage(), file, line);
    }

    /// Uses the supplied text as the exception message
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Integration checks that need a reachable server described by connParam
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    // Plain connect followed by explicit destruction
    {
        auto c = new Connection(connParam);

        destroy(c);
    }

    // A malformed connection string must be rejected with a meaningful message
    {
        bool exceptionFlag = false;

        try
        {
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        }
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
        {
            assert(exceptionFlag);
        }
    }

    // Escaping helpers and client encoding switch
    {
        auto c = new Connection(connParam);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}