module dpq2.connection;

import dpq2.query;
import dpq2.args: QueryParams;
import dpq2.result;
import dpq2.exception;

import derelict.pq.pq;
import std.conv: to;
import std.string: toStringz, fromStringz;
import std.exception: enforce, enforceEx;
import std.range;
import std.stdio: File;
import std.socket;
import core.exception;
import core.time: Duration;

/*
 * Bugs: On Unix a connection is not thread safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.
 *
 * int PQisthreadsafe();
 * Returns 1 if libpq is thread-safe and 0 if it is not.
 */

/// Dummy tag type that selects the nonblocking Connection constructor
struct ConnectionStart {}

/// Wrapper around a libpq connection handle (PGconn*)
class Connection
{
    /// Underlying libpq handle; released by the destructor via PQfinish
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /**
     * Connects to the DB in a blocking manner (PQconnectdb).
     *
     * Throws: OutOfMemoryError if libpq returned no connection object,
     * ConnectionException if the resulting status is not CONNECTION_OK.
     */
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Connects to the DB in a nonblocking manner (PQconnectStart).
     *
     * Throws: OutOfMemoryError if libpq returned no connection object,
     * ConnectionException if the resulting status is CONNECTION_BAD.
     */
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns true if PQisnonblocking reports 1 for this connection
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Switches the nonblocking state; throws ConnectionException if PQsetnonblocking returns -1
    private void setNonBlocking( bool state )
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Starts a connection reset (PQresetStart); throws ConnectionException if it returns 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns the value of PQconnectPoll for this connection
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns the value of PQresetPoll for this connection
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the value of PQstatus for this connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /// Calls PQconsumeInput; throws ConnectionException unless it returns 1
    void consumeInput()
    {
        assert(conn);

        // The result type is inferred from the binding instead of being widened to size_t
        const r = PQconsumeInput( conn );
        if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Calls PQflush.
     *
     * Returns: true if PQflush returned 0, false for any other non-error value.
     * Throws: ConnectionException if PQflush returned -1.
     */
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Returns the descriptor reported by PQsocket; throws ConnectionException if it is -1
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /**
     * Returns a std.socket.Socket built on a duplicate of the connection's
     * descriptor, so the returned object does not share its handle value
     * with libpq.
     *
     * Throws: ConnectionException if the descriptor is unavailable or
     * cannot be duplicated.
     */
    Socket socket()
    {
        version(Windows)
        {
            assert(false, "FIXME: implement socket duplication");
        }
        else // Posix
        {
            import core.sys.posix.unistd: dup;

            const int duplicated = dup(posixSocket());

            // dup() reports failure with -1; never wrap an invalid handle in a Socket
            if(duplicated == -1)
                throw new ConnectionException("Unable to duplicate connection socket", __FILE__, __LINE__);

            return new Socket(cast(socket_t) duplicated, AddressFamily.UNSPEC);
        }
    }

    /// Returns a copy of the text reported by PQerrorMessage
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).fromStringz.to!string;
    }

    /**
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Gets the next result of a previously sent query. Can return null.
    immutable(Result) getResult()
    {
        // libpq guarantees that a result is not changed until it is destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r)
        {
            auto container = new immutable ResultContainer(r);
            return new immutable Result(container);
        }

        return null;
    }

    /// Wraps a result returned by the PQexec* functions; throws ConnectionException if it is null
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Returns true if PQsetSingleRowMode returned 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     Requests cancellation of the command currently being processed.

     If the cancellation is effective, the current command will
     terminate early and return an error result (exception). If the
     cancellation fails (say, because the server was already done
     processing the command), then there will be no visible result at
     all.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // Release the PGcancel object right away instead of waiting for the GC
        scope(exit) destroy(c);

        c.doCancel();
    }

    /// Returns true if PQisBusy returned 1
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /// Returns a copy of the value from PQparameterStatus; throws ConnectionException if it is null
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /// Returns msg escaped by PQescapeLiteral; throws ConnectionException if libpq returns null
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The buffer belongs to libpq: free it even if the copy below throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns msg escaped by PQescapeIdentifier; throws ConnectionException if libpq returns null
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The buffer belongs to libpq: free it even if the copy below throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns a copy of the value reported by PQdb
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns a copy of the value reported by PQhost
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns the value reported by PQprotocolVersion
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns the value reported by PQserverVersion
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /// Passes the C FILE* of the given stream to PQtrace
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Calls PQuntrace for this connection
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Sets the client encoding; throws ConnectionException if PQsetClientEncoding returns non-zero
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}

/**
 * Checks a connection string with PQconninfoParse.
 *
 * Throws: ConnectionException carrying the libpq message if the parser
 * reported one, OutOfMemoryError if parsing failed without a message.
 */
void connStringCheck(string connString)
{
    char* errmsg = null;
    PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);

    if(r is null)
    {
        // No option array and no message is treated as an allocation failure
        enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
    }
    else
    {
        PQconninfoFree(r);
    }

    if(errmsg !is null)
    {
        // Copy the message before handing the buffer back to libpq
        string s = errmsg.fromStringz.to!string;
        PQfreemem(cast(void*) errmsg);

        throw new ConnectionException(s, __FILE__, __LINE__);
    }
}

unittest
{
    connStringCheck("dbname=postgres user=postgres");

    {
        bool raised = false;

        try
            connStringCheck("wrong conninfo string");
        catch(ConnectionException e)
            raised = true;

        assert(raised);
    }
}

/// Cancels queries that are in progress
class Cancellation
{
    /// libpq cancel object; released by the destructor via PQfreeCancel
    private PGcancel* cancel;

    /// Obtains a cancel object for the connection; throws ConnectionException if PQgetCancel returns null
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Dispatches the cancel request.

     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.

     Throws: CancellationException with the text from the error buffer
     if PQcancel does not return 1.
    */
    void doCancel()
    {
        char[256] errbuf;
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}

/// Exception thrown when a cancel request could not be dispatched
class CancellationException : Dpq2Exception
{
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Uses the current error message of the given connection as the exception message
    this(in Connection c, string file, size_t line)
    {
        super(c.errorMessage(), file, line);
    }

    /// Uses the given text as the exception message
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Integration test; connParam is the connection string of the database to test against
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    {
        debug import std.experimental.logger;

        auto c = new Connection(connParam);
        auto dbname = c.dbName();
        auto pver = c.protocolVersion();
        auto sver = c.serverVersion();

        debug
        {
            trace("DB name: ", dbname);
            trace("Protocol version: ", pver);
            trace("Server version: ", sver);
        }

        destroy(c);
    }

    {
        bool exceptionFlag = false;

        try
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
            assert(exceptionFlag);
    }

    {
        auto c = new Connection(connParam);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}