1 /** 2 * Represents connection to the PostgreSQL server 3 * 4 * Most functions is correspond to those in the documentation of Postgres: 5 * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html) 6 */ 7 module dpq2.connection; 8 9 import dpq2.query; 10 import dpq2.args: QueryParams; 11 import dpq2.result; 12 import dpq2.exception; 13 14 import derelict.pq.pq; 15 import std.conv: to; 16 import std.string: toStringz, fromStringz; 17 import std.exception: enforce; 18 import std.range; 19 import std.stdio: File; 20 import std.socket; 21 import core.exception; 22 import core.time: Duration; 23 24 /* 25 * Bugs: On Unix connection is not thread safe. 26 * 27 * On Unix, forking a process with open libpq connections can lead 28 * to unpredictable results because the parent and child processes share 29 * the same sockets and operating system resources. For this reason, 30 * such usage is not recommended, though doing an exec from the child 31 * process to load a new executable is safe. 32 33 34 35 int PQisthreadsafe(); 36 Returns 1 if the libpq is thread-safe and 0 if it is not. 37 */ 38 39 /// dumb flag for Connection ctor parametrization 40 struct ConnectionStart {}; 41 42 version(DerelictPQ_Static) 43 { 44 /// Connection for static version of libpq 45 class Connection 46 { 47 /// 48 this(T...)(T args) 49 { 50 ctor(args); 51 } 52 53 mixin ConnectionMethods; 54 } 55 } 56 else 57 { 58 /// Connection for dynamic version of libpq 59 package class Connection 60 { 61 import dpq2.dynloader: DynamicLoader; 62 63 package immutable DynamicLoader dynamicLoader; 64 65 /// 66 this(T...)(immutable DynamicLoader dl, T args) 67 { 68 dynamicLoader = dl; 69 70 ctor(args); 71 } 72 73 mixin ConnectionMethods; 74 } 75 } 76 77 private mixin template ConnectionMethods() 78 { 79 package PGconn* conn; 80 81 invariant 82 { 83 version(DerelictPQ_Dynamic) 84 assert(dynamicLoader !is null); 85 86 assert(conn !is null); 87 } 88 89 /// Makes a new connection to the database server 90 private void ctor(string connString) 91 { 92 conn = PQconnectdb(toStringz(connString)); 93 checkCreatedConnection(); 94 } 95 96 /// ditto 97 private void ctor(in string[string] keyValueParams) 98 { 99 auto a = keyValueParams.keyValToPQparamsArrays; 100 101 conn = PQconnectdbParams(&a.keys[0], &a.vals[0], 0); 102 checkCreatedConnection(); 103 } 104 105 /// Starts creation of a connection to the database server in a nonblocking manner 106 private void ctor(ConnectionStart, string connString) 107 { 108 conn = PQconnectStart(toStringz(connString)); 109 checkCreatedConnection(); 110 } 111 112 /// ditto 113 private void ctor(ConnectionStart, in string[string] keyValueParams) 114 { 115 auto a = keyValueParams.keyValToPQparamsArrays; 116 117 conn = PQconnectStartParams(&a.keys[0], &a.vals[0], 0); 118 checkCreatedConnection(); 119 } 120 121 private void checkCreatedConnection() 122 { 123 enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 124 125 if( status == CONNECTION_BAD ) 126 throw new ConnectionException(this, __FILE__, __LINE__); 127 } 128 129 ~this() 130 { 131 PQfinish( conn ); 132 } 133 134 mixin Queries; 135 136 /// Returns the blocking status of the database connection 137 bool isNonBlocking() 138 { 139 return PQisnonblocking(conn) == 1; 140 } 141 142 /// Sets the nonblocking status of the connection 143 private void setNonBlocking(bool state) 144 { 145 if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 ) 146 throw new ConnectionException(this, __FILE__, __LINE__); 147 } 148 149 /// Begin reset the communication channel to the server, in a nonblocking manner 150 /// 151 /// Useful only for non-blocking operations. 152 void resetStart() 153 { 154 if(PQresetStart(conn) == 0) 155 throw new ConnectionException(this, __FILE__, __LINE__); 156 } 157 158 /// Useful only for non-blocking operations. 159 PostgresPollingStatusType poll() nothrow 160 { 161 assert(conn); 162 163 return PQconnectPoll(conn); 164 } 165 166 /// Useful only for non-blocking operations. 167 PostgresPollingStatusType resetPoll() nothrow 168 { 169 assert(conn); 170 171 return PQresetPoll(conn); 172 } 173 174 /// Returns the status of the connection 175 ConnStatusType status() nothrow 176 { 177 return PQstatus(conn); 178 } 179 180 /** 181 Returns the current in-transaction status of the server. 182 The status can be: 183 * PQTRANS_IDLE - currently idle 184 * PQTRANS_ACTIVE - a command is in progress (reported only when a query has been sent to the server and not yet completed) 185 * PQTRANS_INTRANS - idle, in a valid transaction block 186 * PQTRANS_INERROR - idle, in a failed transaction block 187 * PQTRANS_UNKNOWN - reported if the connection is bad 188 */ 189 PGTransactionStatusType transactionStatus() nothrow 190 { 191 return PQtransactionStatus(conn); 192 } 193 194 /// If input is available from the server, consume it 195 /// 196 /// Useful only for non-blocking operations. 197 void consumeInput() 198 { 199 assert(conn); 200 201 const size_t r = PQconsumeInput( conn ); 202 if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__); 203 } 204 205 package bool flush() 206 { 207 assert(conn); 208 209 auto r = PQflush(conn); 210 if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__); 211 return r == 0; 212 } 213 214 /// Obtains the file descriptor number of the connection socket to the server 215 int posixSocket() 216 { 217 int r = PQsocket(conn); 218 219 if(r == -1) 220 throw new ConnectionException(this, __FILE__, __LINE__); 221 222 return r; 223 } 224 225 /// Obtains duplicate file descriptor number of the connection socket to the server 226 socket_t posixSocketDuplicate() 227 { 228 version(Windows) 229 { 230 assert(false, "FIXME: implement socket duplication"); 231 } 232 else // Posix OS 233 { 234 import core.sys.posix.unistd: dup; 235 236 return cast(socket_t) dup(cast(socket_t) posixSocket); 237 } 238 } 239 240 /// Obtains std.socket.Socket of the connection to the server 241 /// 242 /// Due to a limitation of Socket actually for the Socket creation 243 /// duplicate of internal posix socket will be used. 244 Socket socket() 245 { 246 return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC); 247 } 248 249 /// Returns the error message most recently generated by an operation on the connection 250 string errorMessage() const nothrow 251 { 252 return PQerrorMessage(conn).to!string; 253 } 254 255 /** 256 * Sets or examines the current notice processor 257 * 258 * Returns the previous notice receiver or processor function pointer, and sets the new value. 259 * If you supply a null function pointer, no action is taken, but the current pointer is returned. 260 */ 261 PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow 262 { 263 assert(conn); 264 265 return PQsetNoticeProcessor(conn, proc, arg); 266 } 267 268 /// Get next result after sending a non-blocking commands. Can return null. 269 /// 270 /// Useful only for non-blocking operations. 271 immutable(Result) getResult() 272 { 273 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 274 auto r = cast(immutable) PQgetResult(conn); 275 276 if(r) 277 { 278 auto container = new immutable ResultContainer(dynamicLoader, r); 279 return new immutable Result(container); 280 } 281 282 return null; 283 } 284 285 /// Get result after PQexec* functions or throw exception if pull is empty 286 package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const 287 { 288 if(r is null) throw new ConnectionException(this, __FILE__, __LINE__); 289 290 version(DerelictPQ_Static) 291 return new immutable ResultContainer(r); 292 else 293 return new immutable ResultContainer(dynamicLoader, r); 294 } 295 296 /// Select single-row mode for the currently-executing query 297 bool setSingleRowMode() 298 { 299 return PQsetSingleRowMode(conn) == 1; 300 } 301 302 /** 303 Try to cancel query 304 305 If the cancellation is effective, the current command will 306 terminate early and return an error result or exception. If the 307 cancellation will fails (say, because the server was already done 308 processing the command) there will be no visible result at all. 309 */ 310 void cancel() 311 { 312 auto c = new Cancellation(this); 313 c.doCancel; 314 } 315 316 /// 317 bool isBusy() nothrow 318 { 319 assert(conn); 320 321 return PQisBusy(conn) == 1; 322 } 323 324 /// 325 string parameterStatus(string paramName) 326 { 327 assert(conn); 328 329 auto res = PQparameterStatus(conn, toStringz(paramName)); 330 331 if(res is null) 332 throw new ConnectionException(this, __FILE__, __LINE__); 333 334 return to!string(fromStringz(res)); 335 } 336 337 /// 338 string escapeLiteral(string msg) 339 { 340 assert(conn); 341 342 auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length); 343 344 if(buf is null) 345 throw new ConnectionException(this, __FILE__, __LINE__); 346 347 string res = buf.fromStringz.to!string; 348 349 PQfreemem(buf); 350 351 return res; 352 } 353 354 /// 355 string escapeIdentifier(string msg) 356 { 357 assert(conn); 358 359 auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length); 360 361 if(buf is null) 362 throw new ConnectionException(this, __FILE__, __LINE__); 363 364 string res = buf.fromStringz.to!string; 365 366 PQfreemem(buf); 367 368 return res; 369 } 370 371 /// 372 string dbName() const nothrow 373 { 374 assert(conn); 375 376 return PQdb(conn).fromStringz.to!string; 377 } 378 379 /// 380 string host() const nothrow 381 { 382 assert(conn); 383 384 return PQhost(conn).fromStringz.to!string; 385 } 386 387 /// 388 int protocolVersion() const nothrow 389 { 390 assert(conn); 391 392 return PQprotocolVersion(conn); 393 } 394 395 /// 396 int serverVersion() const nothrow 397 { 398 assert(conn); 399 400 return PQserverVersion(conn); 401 } 402 403 /// 404 void trace(ref File stream) 405 { 406 PQtrace(conn, stream.getFP); 407 } 408 409 /// 410 void untrace() 411 { 412 PQuntrace(conn); 413 } 414 415 /// 416 void setClientEncoding(string encoding) 417 { 418 if(PQsetClientEncoding(conn, encoding.toStringz) != 0) 419 throw new ConnectionException(this, __FILE__, __LINE__); 420 } 421 } 422 423 private auto keyValToPQparamsArrays(in string[string] keyValueParams) 424 { 425 static struct PQparamsArrays 426 { 427 immutable(char)*[] keys; 428 immutable(char)*[] vals; 429 } 430 431 PQparamsArrays a; 432 a.keys.length = keyValueParams.length + 1; 433 a.vals.length = keyValueParams.length + 1; 434 435 size_t i; 436 foreach(e; keyValueParams.byKeyValue) 437 { 438 a.keys[i] = e.key.toStringz; 439 a.vals[i] = e.value.toStringz; 440 441 i++; 442 } 443 444 assert(i == keyValueParams.length); 445 446 return a; 447 } 448 449 /// Check connection options in the provided connection string 450 /// 451 /// Throws exception if connection string isn't passes check. 452 void connStringCheck(string connString) 453 { 454 char* errmsg = null; 455 PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg); 456 457 if(r is null) 458 { 459 enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data"); 460 } 461 else 462 { 463 PQconninfoFree(r); 464 } 465 466 if(errmsg !is null) 467 { 468 string s = errmsg.fromStringz.to!string; 469 PQfreemem(cast(void*) errmsg); 470 471 throw new ConnectionException(s, __FILE__, __LINE__); 472 } 473 } 474 475 unittest 476 { 477 connStringCheck("dbname=postgres user=postgres"); 478 479 { 480 bool raised = false; 481 482 try 483 connStringCheck("wrong conninfo string"); 484 catch(ConnectionException e) 485 raised = true; 486 487 assert(raised); 488 } 489 } 490 491 /// Represents query cancellation process 492 class Cancellation 493 { 494 private PGcancel* cancel; 495 496 /// 497 this(Connection c) 498 { 499 cancel = PQgetCancel(c.conn); 500 501 if(cancel is null) 502 throw new ConnectionException(c, __FILE__, __LINE__); 503 } 504 505 /// 506 ~this() 507 { 508 PQfreeCancel(cancel); 509 } 510 511 /** 512 Requests that the server abandon processing of the current command 513 514 Throws exception if cancel request was not successfully dispatched. 515 516 Successful dispatch is no guarantee that the request will have any 517 effect, however. If the cancellation is effective, the current 518 command will terminate early and return an error result 519 (exception). If the cancellation fails (say, because the server 520 was already done processing the command), then there will be no 521 visible result at all. 522 */ 523 void doCancel() 524 { 525 char[256] errbuf; 526 auto res = PQcancel(cancel, errbuf.ptr, errbuf.length); 527 528 if(res != 1) 529 throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__); 530 } 531 } 532 533 /// 534 class CancellationException : Dpq2Exception 535 { 536 this(string msg, string file = __FILE__, size_t line = __LINE__) 537 { 538 super(msg, file, line); 539 } 540 } 541 542 /// Connection exception 543 class ConnectionException : Dpq2Exception 544 { 545 this(in Connection c, string file = __FILE__, size_t line = __LINE__) 546 { 547 super(c.errorMessage(), file, line); 548 } 549 550 this(string msg, string file = __FILE__, size_t line = __LINE__) 551 { 552 super(msg, file, line); 553 } 554 } 555 556 version (integration_tests) 557 Connection createTestConn(T...)(T params) 558 { 559 version(DerelictPQ_Static) 560 auto c = new Connection(params); 561 else 562 { 563 import dpq2.dynloader: DynamicLoader; 564 565 auto dynamicLoader = new immutable DynamicLoader; 566 auto c = dynamicLoader.createConnection(params); 567 } 568 569 return c; 570 } 571 572 version (integration_tests) 573 void _integration_test( string connParam ) 574 { 575 { 576 debug import std.experimental.logger; 577 578 auto c = createTestConn(connParam); 579 580 assert( PQlibVersion() >= 9_0100 ); 581 582 auto dbname = c.dbName(); 583 auto pver = c.protocolVersion(); 584 auto sver = c.serverVersion(); 585 586 debug 587 { 588 trace("DB name: ", dbname); 589 trace("Protocol version: ", pver); 590 trace("Server version: ", sver); 591 } 592 593 destroy(c); 594 } 595 596 { 597 bool exceptionFlag = false; 598 599 try 600 auto c = createTestConn(ConnectionStart(), "!!!some incorrect connection string!!!"); 601 catch(ConnectionException e) 602 { 603 exceptionFlag = true; 604 assert(e.msg.length > 40); // error message check 605 } 606 finally 607 assert(exceptionFlag); 608 } 609 610 { 611 auto c = createTestConn(connParam); 612 613 assert(c.escapeLiteral("abc'def") == "'abc''def'"); 614 assert(c.escapeIdentifier("abc'def") == "\"abc'def\""); 615 616 c.setClientEncoding("WIN866"); 617 assert(c.exec("show client_encoding")[0][0].as!string == "WIN866"); 618 } 619 620 { 621 auto c = createTestConn(connParam); 622 623 assert(c.transactionStatus == PQTRANS_IDLE); 624 625 c.exec("BEGIN"); 626 assert(c.transactionStatus == PQTRANS_INTRANS); 627 628 try c.exec("DISCARD ALL"); 629 catch (Exception) {} 630 assert(c.transactionStatus == PQTRANS_INERROR); 631 632 c.exec("ROLLBACK"); 633 assert(c.transactionStatus == PQTRANS_IDLE); 634 } 635 636 { 637 import std.exception: assertThrown; 638 639 string[string] kv; 640 kv["host"] = "wrong-host"; 641 kv["dbname"] = "wrong-db-name"; 642 643 assertThrown!ConnectionException(createTestConn(kv)); 644 assertThrown!ConnectionException(createTestConn(ConnectionStart(), kv)); 645 } 646 }