1 /** 2 * Represents connection to the PostgreSQL server 3 * 4 * Most functions is correspond to those in the documentation of Postgres: 5 * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html) 6 */ 7 module dpq2.connection; 8 9 import dpq2.query; 10 import dpq2.args: QueryParams; 11 import dpq2.result; 12 import dpq2.exception; 13 14 import derelict.pq.pq; 15 import std.conv: to; 16 import std.string: toStringz, fromStringz; 17 import std.exception: enforce; 18 import std.range; 19 import std.stdio: File; 20 import std.socket; 21 import core.exception; 22 import core.time: Duration; 23 24 /* 25 * Bugs: On Unix connection is not thread safe. 26 * 27 * On Unix, forking a process with open libpq connections can lead 28 * to unpredictable results because the parent and child processes share 29 * the same sockets and operating system resources. For this reason, 30 * such usage is not recommended, though doing an exec from the child 31 * process to load a new executable is safe. 32 33 34 35 int PQisthreadsafe(); 36 Returns 1 if the libpq is thread-safe and 0 if it is not. 37 */ 38 39 /// dumb flag for Connection ctor parametrization 40 struct ConnectionStart {}; 41 42 /// Connection 43 version(DerelictPQ_Static) 44 class Connection 45 { 46 mixin ConnectionMethods; 47 } 48 else 49 package class Connection 50 { 51 import dpq2.dynloader: ReferenceCounter; 52 53 private immutable ReferenceCounter dynLoaderRefCnt; 54 55 mixin ConnectionMethods; 56 } 57 58 private mixin template ConnectionMethods() 59 { 60 package PGconn* conn; 61 62 invariant 63 { 64 assert(conn !is null); 65 } 66 67 /// Makes a new connection to the database server 68 this(string connString) 69 { 70 conn = PQconnectdb(toStringz(connString)); 71 version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true); 72 checkCreatedConnection(); 73 } 74 75 /// ditto 76 this(in string[string] keyValueParams) 77 { 78 auto a = keyValueParams.keyValToPQparamsArrays; 79 80 conn = PQconnectdbParams(&a.keys[0], &a.vals[0], 0); 81 version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true); 82 checkCreatedConnection(); 83 } 84 85 /// Starts creation of a connection to the database server in a nonblocking manner 86 this(ConnectionStart, string connString) 87 { 88 conn = PQconnectStart(toStringz(connString)); 89 version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true); 90 checkCreatedConnection(); 91 } 92 93 /// ditto 94 this(ConnectionStart, in string[string] keyValueParams) 95 { 96 auto a = keyValueParams.keyValToPQparamsArrays; 97 98 conn = PQconnectStartParams(&a.keys[0], &a.vals[0], 0); 99 version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true); 100 checkCreatedConnection(); 101 } 102 103 private void checkCreatedConnection() 104 { 105 enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 106 107 if( status == CONNECTION_BAD ) 108 throw new ConnectionException(this, __FILE__, __LINE__); 109 } 110 111 ~this() 112 { 113 PQfinish( conn ); 114 115 version(DerelictPQ_Dynamic) dynLoaderRefCnt.__custom_dtor(); 116 } 117 118 mixin Queries; 119 120 /// Returns the blocking status of the database connection 121 bool isNonBlocking() 122 { 123 return PQisnonblocking(conn) == 1; 124 } 125 126 /// Sets the nonblocking status of the connection 127 private void setNonBlocking(bool state) 128 { 129 if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 ) 130 throw new ConnectionException(this, __FILE__, __LINE__); 131 } 132 133 /// Begin reset the communication channel to the server, in a nonblocking manner 134 /// 135 /// Useful only for non-blocking operations. 136 void resetStart() 137 { 138 if(PQresetStart(conn) == 0) 139 throw new ConnectionException(this, __FILE__, __LINE__); 140 } 141 142 /// Useful only for non-blocking operations. 143 PostgresPollingStatusType poll() nothrow 144 { 145 assert(conn); 146 147 return PQconnectPoll(conn); 148 } 149 150 /// Useful only for non-blocking operations. 151 PostgresPollingStatusType resetPoll() nothrow 152 { 153 assert(conn); 154 155 return PQresetPoll(conn); 156 } 157 158 /// Returns the status of the connection 159 ConnStatusType status() nothrow 160 { 161 return PQstatus(conn); 162 } 163 164 /** 165 Returns the current in-transaction status of the server. 166 The status can be: 167 * PQTRANS_IDLE - currently idle 168 * PQTRANS_ACTIVE - a command is in progress (reported only when a query has been sent to the server and not yet completed) 169 * PQTRANS_INTRANS - idle, in a valid transaction block 170 * PQTRANS_INERROR - idle, in a failed transaction block 171 * PQTRANS_UNKNOWN - reported if the connection is bad 172 */ 173 PGTransactionStatusType transactionStatus() nothrow 174 { 175 return PQtransactionStatus(conn); 176 } 177 178 /// If input is available from the server, consume it 179 /// 180 /// Useful only for non-blocking operations. 181 void consumeInput() 182 { 183 assert(conn); 184 185 const size_t r = PQconsumeInput( conn ); 186 if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__); 187 } 188 189 package bool flush() 190 { 191 assert(conn); 192 193 auto r = PQflush(conn); 194 if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__); 195 return r == 0; 196 } 197 198 /// Obtains the file descriptor number of the connection socket to the server 199 int posixSocket() 200 { 201 int r = PQsocket(conn); 202 203 if(r == -1) 204 throw new ConnectionException(this, __FILE__, __LINE__); 205 206 return r; 207 } 208 209 /// Obtains duplicate file descriptor number of the connection socket to the server 210 socket_t posixSocketDuplicate() 211 { 212 version(Windows) 213 { 214 assert(false, "FIXME: implement socket duplication"); 215 } 216 else // Posix OS 217 { 218 import core.sys.posix.unistd: dup; 219 220 return cast(socket_t) dup(cast(socket_t) posixSocket); 221 } 222 } 223 224 /// Obtains std.socket.Socket of the connection to the server 225 /// 226 /// Due to a limitation of Socket actually for the Socket creation 227 /// duplicate of internal posix socket will be used. 228 Socket socket() 229 { 230 return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC); 231 } 232 233 /// Returns the error message most recently generated by an operation on the connection 234 string errorMessage() const nothrow 235 { 236 return PQerrorMessage(conn).to!string; 237 } 238 239 /** 240 * Sets or examines the current notice processor 241 * 242 * Returns the previous notice receiver or processor function pointer, and sets the new value. 243 * If you supply a null function pointer, no action is taken, but the current pointer is returned. 244 */ 245 PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow 246 { 247 assert(conn); 248 249 return PQsetNoticeProcessor(conn, proc, arg); 250 } 251 252 /// Get next result after sending a non-blocking commands. Can return null. 253 /// 254 /// Useful only for non-blocking operations. 255 immutable(Result) getResult() 256 { 257 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 258 auto r = cast(immutable) PQgetResult(conn); 259 260 if(r) 261 { 262 auto container = new immutable ResultContainer(r); 263 return new immutable Result(container); 264 } 265 266 return null; 267 } 268 269 /// Get result after PQexec* functions or throw exception if pull is empty 270 package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const 271 { 272 if(r is null) throw new ConnectionException(this, __FILE__, __LINE__); 273 274 return new immutable ResultContainer(r); 275 } 276 277 /// Select single-row mode for the currently-executing query 278 bool setSingleRowMode() 279 { 280 return PQsetSingleRowMode(conn) == 1; 281 } 282 283 /** 284 Try to cancel query 285 286 If the cancellation is effective, the current command will 287 terminate early and return an error result or exception. If the 288 cancellation will fails (say, because the server was already done 289 processing the command) there will be no visible result at all. 290 */ 291 void cancel() 292 { 293 auto c = new Cancellation(this); 294 c.doCancel; 295 } 296 297 /// 298 bool isBusy() nothrow 299 { 300 assert(conn); 301 302 return PQisBusy(conn) == 1; 303 } 304 305 /// 306 string parameterStatus(string paramName) 307 { 308 assert(conn); 309 310 auto res = PQparameterStatus(conn, toStringz(paramName)); 311 312 if(res is null) 313 throw new ConnectionException(this, __FILE__, __LINE__); 314 315 return to!string(fromStringz(res)); 316 } 317 318 /// 319 string escapeLiteral(string msg) 320 { 321 assert(conn); 322 323 auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length); 324 325 if(buf is null) 326 throw new ConnectionException(this, __FILE__, __LINE__); 327 328 string res = buf.fromStringz.to!string; 329 330 PQfreemem(buf); 331 332 return res; 333 } 334 335 /// 336 string escapeIdentifier(string msg) 337 { 338 assert(conn); 339 340 auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length); 341 342 if(buf is null) 343 throw new ConnectionException(this, __FILE__, __LINE__); 344 345 string res = buf.fromStringz.to!string; 346 347 PQfreemem(buf); 348 349 return res; 350 } 351 352 /// 353 string dbName() const nothrow 354 { 355 assert(conn); 356 357 return PQdb(conn).fromStringz.to!string; 358 } 359 360 /// 361 string host() const nothrow 362 { 363 assert(conn); 364 365 return PQhost(conn).fromStringz.to!string; 366 } 367 368 /// 369 int protocolVersion() const nothrow 370 { 371 assert(conn); 372 373 return PQprotocolVersion(conn); 374 } 375 376 /// 377 int serverVersion() const nothrow 378 { 379 assert(conn); 380 381 return PQserverVersion(conn); 382 } 383 384 /// 385 void trace(ref File stream) 386 { 387 PQtrace(conn, stream.getFP); 388 } 389 390 /// 391 void untrace() 392 { 393 PQuntrace(conn); 394 } 395 396 /// 397 void setClientEncoding(string encoding) 398 { 399 if(PQsetClientEncoding(conn, encoding.toStringz) != 0) 400 throw new ConnectionException(this, __FILE__, __LINE__); 401 } 402 } 403 404 private auto keyValToPQparamsArrays(in string[string] keyValueParams) 405 { 406 static struct PQparamsArrays 407 { 408 immutable(char)*[] keys; 409 immutable(char)*[] vals; 410 } 411 412 PQparamsArrays a; 413 a.keys.length = keyValueParams.length + 1; 414 a.vals.length = keyValueParams.length + 1; 415 416 size_t i; 417 foreach(e; keyValueParams.byKeyValue) 418 { 419 a.keys[i] = e.key.toStringz; 420 a.vals[i] = e.value.toStringz; 421 422 i++; 423 } 424 425 assert(i == keyValueParams.length); 426 427 return a; 428 } 429 430 /// Check connection options in the provided connection string 431 /// 432 /// Throws exception if connection string isn't passes check. 433 version(DerelictPQ_Static) 434 void connStringCheck(string connString) 435 { 436 _connStringCheck(connString); 437 } 438 439 /// ditto 440 package void _connStringCheck(string connString) 441 { 442 char* errmsg = null; 443 PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg); 444 445 if(r is null) 446 { 447 enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data"); 448 } 449 else 450 { 451 PQconninfoFree(r); 452 } 453 454 if(errmsg !is null) 455 { 456 string s = errmsg.fromStringz.to!string; 457 PQfreemem(cast(void*) errmsg); 458 459 throw new ConnectionException(s, __FILE__, __LINE__); 460 } 461 } 462 463 unittest 464 { 465 _connStringCheck("dbname=postgres user=postgres"); 466 467 { 468 bool raised = false; 469 470 try 471 connStringCheck("wrong conninfo string"); 472 catch(ConnectionException e) 473 raised = true; 474 475 assert(raised); 476 } 477 } 478 479 /// Represents query cancellation process 480 class Cancellation 481 { 482 version(DerelictPQ_Dynamic) 483 { 484 import dpq2.dynloader: ReferenceCounter; 485 private immutable ReferenceCounter dynLoaderRefCnt; 486 } 487 488 private PGcancel* cancel; 489 490 /// 491 this(Connection c) 492 { 493 version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true); 494 495 cancel = PQgetCancel(c.conn); 496 497 if(cancel is null) 498 throw new ConnectionException(c, __FILE__, __LINE__); 499 } 500 501 /// 502 ~this() 503 { 504 PQfreeCancel(cancel); 505 506 version(DerelictPQ_Dynamic) dynLoaderRefCnt.__custom_dtor(); 507 } 508 509 /** 510 Requests that the server abandon processing of the current command 511 512 Throws exception if cancel request was not successfully dispatched. 513 514 Successful dispatch is no guarantee that the request will have any 515 effect, however. If the cancellation is effective, the current 516 command will terminate early and return an error result 517 (exception). If the cancellation fails (say, because the server 518 was already done processing the command), then there will be no 519 visible result at all. 520 */ 521 void doCancel() 522 { 523 char[256] errbuf; 524 auto res = PQcancel(cancel, errbuf.ptr, errbuf.length); 525 526 if(res != 1) 527 throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__); 528 } 529 } 530 531 /// 532 class CancellationException : Dpq2Exception 533 { 534 this(string msg, string file = __FILE__, size_t line = __LINE__) 535 { 536 super(msg, file, line); 537 } 538 } 539 540 /// Connection exception 541 class ConnectionException : Dpq2Exception 542 { 543 this(in Connection c, string file = __FILE__, size_t line = __LINE__) 544 { 545 super(c.errorMessage(), file, line); 546 } 547 548 this(string msg, string file = __FILE__, size_t line = __LINE__) 549 { 550 super(msg, file, line); 551 } 552 } 553 554 version (integration_tests) 555 Connection createTestConn(T...)(T params) 556 { 557 version(DerelictPQ_Static) 558 auto c = new Connection(params); 559 else 560 { 561 import dpq2.dynloader: connFactory; 562 563 Connection c = connFactory.createConnection(params); 564 } 565 566 return c; 567 } 568 569 version (integration_tests) 570 void _integration_test( string connParam ) 571 { 572 { 573 debug import std.experimental.logger; 574 575 auto c = createTestConn(connParam); 576 577 assert( PQlibVersion() >= 9_0100 ); 578 579 auto dbname = c.dbName(); 580 auto pver = c.protocolVersion(); 581 auto sver = c.serverVersion(); 582 583 debug 584 { 585 trace("DB name: ", dbname); 586 trace("Protocol version: ", pver); 587 trace("Server version: ", sver); 588 } 589 590 destroy(c); 591 } 592 593 { 594 bool exceptionFlag = false; 595 596 try 597 auto c = createTestConn(ConnectionStart(), "!!!some incorrect connection string!!!"); 598 catch(ConnectionException e) 599 { 600 exceptionFlag = true; 601 assert(e.msg.length > 40); // error message check 602 } 603 finally 604 assert(exceptionFlag); 605 } 606 607 { 608 auto c = createTestConn(connParam); 609 610 assert(c.escapeLiteral("abc'def") == "'abc''def'"); 611 assert(c.escapeIdentifier("abc'def") == "\"abc'def\""); 612 613 c.setClientEncoding("WIN866"); 614 assert(c.exec("show client_encoding")[0][0].as!string == "WIN866"); 615 } 616 617 { 618 auto c = createTestConn(connParam); 619 620 assert(c.transactionStatus == PQTRANS_IDLE); 621 622 c.exec("BEGIN"); 623 assert(c.transactionStatus == PQTRANS_INTRANS); 624 625 try c.exec("DISCARD ALL"); 626 catch (Exception) {} 627 assert(c.transactionStatus == PQTRANS_INERROR); 628 629 c.exec("ROLLBACK"); 630 assert(c.transactionStatus == PQTRANS_IDLE); 631 } 632 633 { 634 import std.exception: assertThrown; 635 636 string[string] kv; 637 kv["host"] = "wrong-host"; 638 kv["dbname"] = "wrong-db-name"; 639 640 assertThrown!ConnectionException(createTestConn(kv)); 641 assertThrown!ConnectionException(createTestConn(ConnectionStart(), kv)); 642 } 643 }