1 /** 2 * Represents connection to the PostgreSQL server 3 * 4 * Most functions is correspond to those in the documentation of Postgres: 5 * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html) 6 */ 7 module dpq2.connection; 8 9 import dpq2.query; 10 import dpq2.args: QueryParams; 11 import dpq2.result; 12 import dpq2.exception; 13 14 import derelict.pq.pq; 15 import std.conv: to; 16 import std.string: toStringz, fromStringz; 17 import std.exception: enforce; 18 import std.range; 19 import std.stdio: File; 20 import std.socket; 21 import core.exception; 22 import core.time: Duration; 23 24 /* 25 * Bugs: On Unix connection is not thread safe. 26 * 27 * On Unix, forking a process with open libpq connections can lead 28 * to unpredictable results because the parent and child processes share 29 * the same sockets and operating system resources. For this reason, 30 * such usage is not recommended, though doing an exec from the child 31 * process to load a new executable is safe. 32 33 34 35 int PQisthreadsafe(); 36 Returns 1 if the libpq is thread-safe and 0 if it is not. 37 */ 38 39 /// dumb flag for Connection ctor parametrization 40 struct ConnectionStart {}; 41 42 /// Connection 43 class Connection 44 { 45 package PGconn* conn; 46 47 invariant 48 { 49 assert(conn !is null); 50 } 51 52 /// Makes a new connection to the database server 53 this(string connString) 54 { 55 conn = PQconnectdb(toStringz(connString)); 56 checkCreatedConnection(); 57 } 58 59 /// ditto 60 this(in string[string] keyValueParams) 61 { 62 auto a = keyValueParams.keyValToPQparamsArrays; 63 64 conn = PQconnectdbParams(&a.keys[0], &a.vals[0], 0); 65 checkCreatedConnection(); 66 } 67 68 /// Starts creation of a connection to the database server in a nonblocking manner 69 this(ConnectionStart, string connString) 70 { 71 conn = PQconnectStart(toStringz(connString)); 72 checkCreatedConnection(); 73 } 74 75 /// ditto 76 this(ConnectionStart, in string[string] keyValueParams) 77 { 78 auto a = keyValueParams.keyValToPQparamsArrays; 79 80 conn = PQconnectStartParams(&a.keys[0], &a.vals[0], 0); 81 checkCreatedConnection(); 82 } 83 84 private void checkCreatedConnection() 85 { 86 enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 87 88 if( status == CONNECTION_BAD ) 89 throw new ConnectionException(this, __FILE__, __LINE__); 90 } 91 92 ~this() 93 { 94 PQfinish( conn ); 95 } 96 97 mixin Queries; 98 99 /// Returns the blocking status of the database connection 100 bool isNonBlocking() 101 { 102 return PQisnonblocking(conn) == 1; 103 } 104 105 /// Sets the nonblocking status of the connection 106 private void setNonBlocking(bool state) 107 { 108 if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 ) 109 throw new ConnectionException(this, __FILE__, __LINE__); 110 } 111 112 /// Begin reset the communication channel to the server, in a nonblocking manner 113 /// 114 /// Useful only for non-blocking operations. 115 void resetStart() 116 { 117 if(PQresetStart(conn) == 0) 118 throw new ConnectionException(this, __FILE__, __LINE__); 119 } 120 121 /// Useful only for non-blocking operations. 122 PostgresPollingStatusType poll() nothrow 123 { 124 assert(conn); 125 126 return PQconnectPoll(conn); 127 } 128 129 /// Useful only for non-blocking operations. 130 PostgresPollingStatusType resetPoll() nothrow 131 { 132 assert(conn); 133 134 return PQresetPoll(conn); 135 } 136 137 /// Returns the status of the connection 138 ConnStatusType status() nothrow 139 { 140 return PQstatus(conn); 141 } 142 143 /** 144 Returns the current in-transaction status of the server. 145 The status can be: 146 * PQTRANS_IDLE - currently idle 147 * PQTRANS_ACTIVE - a command is in progress (reported only when a query has been sent to the server and not yet completed) 148 * PQTRANS_INTRANS - idle, in a valid transaction block 149 * PQTRANS_INERROR - idle, in a failed transaction block 150 * PQTRANS_UNKNOWN - reported if the connection is bad 151 */ 152 PGTransactionStatusType transactionStatus() nothrow 153 { 154 return PQtransactionStatus(conn); 155 } 156 157 /// If input is available from the server, consume it 158 /// 159 /// Useful only for non-blocking operations. 160 void consumeInput() 161 { 162 assert(conn); 163 164 const size_t r = PQconsumeInput( conn ); 165 if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__); 166 } 167 168 package bool flush() 169 { 170 assert(conn); 171 172 auto r = PQflush(conn); 173 if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__); 174 return r == 0; 175 } 176 177 /// Obtains the file descriptor number of the connection socket to the server 178 int posixSocket() 179 { 180 int r = PQsocket(conn); 181 182 if(r == -1) 183 throw new ConnectionException(this, __FILE__, __LINE__); 184 185 return r; 186 } 187 188 /// Obtains duplicate file descriptor number of the connection socket to the server 189 socket_t posixSocketDuplicate() 190 { 191 version(Windows) 192 { 193 assert(false, "FIXME: implement socket duplication"); 194 } 195 else // Posix OS 196 { 197 import core.sys.posix.unistd: dup; 198 199 return cast(socket_t) dup(cast(socket_t) posixSocket); 200 } 201 } 202 203 /// Obtains std.socket.Socket of the connection to the server 204 /// 205 /// Due to a limitation of Socket actually for the Socket creation 206 /// duplicate of internal posix socket will be used. 207 Socket socket() 208 { 209 return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC); 210 } 211 212 /// Returns the error message most recently generated by an operation on the connection 213 string errorMessage() const nothrow 214 { 215 return PQerrorMessage(conn).to!string; 216 } 217 218 /** 219 * Sets or examines the current notice processor 220 * 221 * Returns the previous notice receiver or processor function pointer, and sets the new value. 222 * If you supply a null function pointer, no action is taken, but the current pointer is returned. 223 */ 224 PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow 225 { 226 assert(conn); 227 228 return PQsetNoticeProcessor(conn, proc, arg); 229 } 230 231 /// Get next result after sending a non-blocking commands. Can return null. 232 /// 233 /// Useful only for non-blocking operations. 234 immutable(Result) getResult() 235 { 236 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 237 auto r = cast(immutable) PQgetResult(conn); 238 239 if(r) 240 { 241 auto container = new immutable ResultContainer(r); 242 return new immutable Result(container); 243 } 244 245 return null; 246 } 247 248 /// Get result after PQexec* functions or throw exception if pull is empty 249 package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const 250 { 251 if(r is null) throw new ConnectionException(this, __FILE__, __LINE__); 252 253 return new immutable ResultContainer(r); 254 } 255 256 /// Select single-row mode for the currently-executing query 257 bool setSingleRowMode() 258 { 259 return PQsetSingleRowMode(conn) == 1; 260 } 261 262 /** 263 Try to cancel query 264 265 If the cancellation is effective, the current command will 266 terminate early and return an error result or exception. If the 267 cancellation will fails (say, because the server was already done 268 processing the command) there will be no visible result at all. 269 */ 270 void cancel() 271 { 272 auto c = new Cancellation(this); 273 c.doCancel; 274 } 275 276 /// 277 bool isBusy() nothrow 278 { 279 assert(conn); 280 281 return PQisBusy(conn) == 1; 282 } 283 284 /// 285 string parameterStatus(string paramName) 286 { 287 assert(conn); 288 289 auto res = PQparameterStatus(conn, toStringz(paramName)); 290 291 if(res is null) 292 throw new ConnectionException(this, __FILE__, __LINE__); 293 294 return to!string(fromStringz(res)); 295 } 296 297 /// 298 string escapeLiteral(string msg) 299 { 300 assert(conn); 301 302 auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length); 303 304 if(buf is null) 305 throw new ConnectionException(this, __FILE__, __LINE__); 306 307 string res = buf.fromStringz.to!string; 308 309 PQfreemem(buf); 310 311 return res; 312 } 313 314 /// 315 string escapeIdentifier(string msg) 316 { 317 assert(conn); 318 319 auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length); 320 321 if(buf is null) 322 throw new ConnectionException(this, __FILE__, __LINE__); 323 324 string res = buf.fromStringz.to!string; 325 326 PQfreemem(buf); 327 328 return res; 329 } 330 331 /// 332 string dbName() const nothrow 333 { 334 assert(conn); 335 336 return PQdb(conn).fromStringz.to!string; 337 } 338 339 /// 340 string host() const nothrow 341 { 342 assert(conn); 343 344 return PQhost(conn).fromStringz.to!string; 345 } 346 347 /// 348 int protocolVersion() const nothrow 349 { 350 assert(conn); 351 352 return PQprotocolVersion(conn); 353 } 354 355 /// 356 int serverVersion() const nothrow 357 { 358 assert(conn); 359 360 return PQserverVersion(conn); 361 } 362 363 /// 364 void trace(ref File stream) 365 { 366 PQtrace(conn, stream.getFP); 367 } 368 369 /// 370 void untrace() 371 { 372 PQuntrace(conn); 373 } 374 375 /// 376 void setClientEncoding(string encoding) 377 { 378 if(PQsetClientEncoding(conn, encoding.toStringz) != 0) 379 throw new ConnectionException(this, __FILE__, __LINE__); 380 } 381 } 382 383 private auto keyValToPQparamsArrays(in string[string] keyValueParams) 384 { 385 static struct PQparamsArrays 386 { 387 immutable(char)*[] keys; 388 immutable(char)*[] vals; 389 } 390 391 PQparamsArrays a; 392 a.keys.length = keyValueParams.length + 1; 393 a.vals.length = keyValueParams.length + 1; 394 395 size_t i; 396 foreach(e; keyValueParams.byKeyValue) 397 { 398 a.keys[i] = e.key.toStringz; 399 a.vals[i] = e.value.toStringz; 400 401 i++; 402 } 403 404 assert(i == keyValueParams.length); 405 406 return a; 407 } 408 409 /// Check connection options in the provided connection string 410 /// 411 /// Throws exception if connection string isn't passes check. 412 void connStringCheck(string connString) 413 { 414 char* errmsg = null; 415 PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg); 416 417 if(r is null) 418 { 419 enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data"); 420 } 421 else 422 { 423 PQconninfoFree(r); 424 } 425 426 if(errmsg !is null) 427 { 428 string s = errmsg.fromStringz.to!string; 429 PQfreemem(cast(void*) errmsg); 430 431 throw new ConnectionException(s, __FILE__, __LINE__); 432 } 433 } 434 435 unittest 436 { 437 connStringCheck("dbname=postgres user=postgres"); 438 439 { 440 bool raised = false; 441 442 try 443 connStringCheck("wrong conninfo string"); 444 catch(ConnectionException e) 445 raised = true; 446 447 assert(raised); 448 } 449 } 450 451 /// Represents query cancellation process 452 class Cancellation 453 { 454 private PGcancel* cancel; 455 456 /// 457 this(Connection c) 458 { 459 cancel = PQgetCancel(c.conn); 460 461 if(cancel is null) 462 throw new ConnectionException(c, __FILE__, __LINE__); 463 } 464 465 /// 466 ~this() 467 { 468 PQfreeCancel(cancel); 469 } 470 471 /** 472 Requests that the server abandon processing of the current command 473 474 Throws exception if cancel request was not successfully dispatched. 475 476 Successful dispatch is no guarantee that the request will have any 477 effect, however. If the cancellation is effective, the current 478 command will terminate early and return an error result 479 (exception). If the cancellation fails (say, because the server 480 was already done processing the command), then there will be no 481 visible result at all. 482 */ 483 void doCancel() 484 { 485 char[256] errbuf; 486 auto res = PQcancel(cancel, errbuf.ptr, errbuf.length); 487 488 if(res != 1) 489 throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__); 490 } 491 } 492 493 /// 494 class CancellationException : Dpq2Exception 495 { 496 this(string msg, string file = __FILE__, size_t line = __LINE__) 497 { 498 super(msg, file, line); 499 } 500 } 501 502 /// Connection exception 503 class ConnectionException : Dpq2Exception 504 { 505 this(in Connection c, string file = __FILE__, size_t line = __LINE__) 506 { 507 super(c.errorMessage(), file, line); 508 } 509 510 this(string msg, string file = __FILE__, size_t line = __LINE__) 511 { 512 super(msg, file, line); 513 } 514 } 515 516 version (integration_tests) 517 void _integration_test( string connParam ) 518 { 519 assert( PQlibVersion() >= 9_0100 ); 520 521 { 522 debug import std.experimental.logger; 523 524 auto c = new Connection(connParam); 525 auto dbname = c.dbName(); 526 auto pver = c.protocolVersion(); 527 auto sver = c.serverVersion(); 528 529 debug 530 { 531 trace("DB name: ", dbname); 532 trace("Protocol version: ", pver); 533 trace("Server version: ", sver); 534 } 535 536 destroy(c); 537 } 538 539 { 540 bool exceptionFlag = false; 541 542 try 543 auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!"); 544 catch(ConnectionException e) 545 { 546 exceptionFlag = true; 547 assert(e.msg.length > 40); // error message check 548 } 549 finally 550 assert(exceptionFlag); 551 } 552 553 { 554 auto c = new Connection(connParam); 555 556 assert(c.escapeLiteral("abc'def") == "'abc''def'"); 557 assert(c.escapeIdentifier("abc'def") == "\"abc'def\""); 558 559 c.setClientEncoding("WIN866"); 560 assert(c.exec("show client_encoding")[0][0].as!string == "WIN866"); 561 } 562 563 { 564 auto c = new Connection(connParam); 565 566 assert(c.transactionStatus == PQTRANS_IDLE); 567 568 c.exec("BEGIN"); 569 assert(c.transactionStatus == PQTRANS_INTRANS); 570 571 try c.exec("DISCARD ALL"); 572 catch (Exception) {} 573 assert(c.transactionStatus == PQTRANS_INERROR); 574 575 c.exec("ROLLBACK"); 576 assert(c.transactionStatus == PQTRANS_IDLE); 577 } 578 579 { 580 import std.exception: assertThrown; 581 582 string[string] kv; 583 kv["host"] = "wrong-host"; 584 kv["dbname"] = "wrong-db-name"; 585 586 assertThrown!ConnectionException(new Connection(kv)); 587 assertThrown!ConnectionException(new Connection(ConnectionStart(), kv)); 588 } 589 }