/**
 * Represents a connection to the PostgreSQL server
 *
 * Most functions correspond to those in the documentation of Postgres:
 * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
 */
module dpq2.connection;

import dpq2.query;
import dpq2.args: QueryParams;
import dpq2.result;
import dpq2.exception;

import derelict.pq.pq;
import std.conv: to;
import std.string: toStringz, fromStringz;
import std.exception: enforce;
import std.range;
import std.stdio: File;
import std.socket;
import core.exception;
import core.time: Duration;

/*
 * Bugs: On Unix a connection is not thread safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.
 *
 * int PQisthreadsafe();
 * Returns 1 if the libpq is thread-safe and 0 if it is not.
 */

/// Dummy flag type used to select the non-blocking Connection constructor
struct ConnectionStart {}

/// Connection to the PostgreSQL server (wraps a libpq PGconn handle)
class Connection
{
    /// Underlying libpq connection handle; released by PQfinish in the destructor
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /// Makes a new connection to the database server
    ///
    /// Throws: OutOfMemoryError if libpq returned no connection object,
    /// ConnectionException if the resulting status is not CONNECTION_OK.
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Starts creation of a connection to the database server in a nonblocking manner
    ///
    /// Throws: OutOfMemoryError if libpq returned no connection object,
    /// ConnectionException if the status is CONNECTION_BAD right after the start.
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns true if PQisnonblocking reports the connection as non-blocking
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Sets the nonblocking status of the connection
    ///
    /// Throws: ConnectionException if PQsetnonblocking returns -1.
    private void setNonBlocking(bool state)
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Begins resetting the communication channel to the server, in a nonblocking manner
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if PQresetStart returns 0.
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns the result of PQconnectPoll for this connection
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns the result of PQresetPoll for this connection
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the status of the connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /**
        Returns the current in-transaction status of the server.
        The status can be:
            * PQTRANS_IDLE    - currently idle
            * PQTRANS_ACTIVE  - a command is in progress (reported only when a query has been sent to the server and not yet completed)
            * PQTRANS_INTRANS - idle, in a valid transaction block
            * PQTRANS_INERROR - idle, in a failed transaction block
            * PQTRANS_UNKNOWN - reported if the connection is bad
     */
    PGTransactionStatusType transactionStatus() nothrow
    {
        return PQtransactionStatus(conn);
    }

    /// If input is available from the server, consume it
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if PQconsumeInput does not return 1.
    void consumeInput()
    {
        assert(conn);

        // PQconsumeInput returns a C int, keep it in the matching type
        const int r = PQconsumeInput( conn );
        if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQflush on the connection
    ///
    /// Returns: true if PQflush returned 0, false for any other non-error value.
    ///
    /// Throws: ConnectionException if PQflush returns -1.
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Obtains the file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if PQsocket returns -1.
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /// Obtains duplicate file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if the descriptor can not be obtained
    /// or can not be duplicated.
    socket_t posixSocketDuplicate()
    {
        version(Windows)
        {
            assert(false, "FIXME: implement socket duplication");
        }
        else // Posix OS
        {
            import core.stdc.errno: errno;
            import core.sys.posix.unistd: dup;

            const int duplicated = dup(posixSocket);

            // dup() reports failure with -1; never hand that value out as a socket
            if(duplicated == -1)
            {
                const int err = errno;

                throw new ConnectionException(
                    "Unable to duplicate connection socket, errno = "~err.to!string,
                    __FILE__, __LINE__
                );
            }

            return cast(socket_t) duplicated;
        }
    }

    /// Obtains std.socket.Socket of the connection to the server
    ///
    /// Due to a limitation of Socket, a duplicate of the internal
    /// posix socket is actually used for the Socket creation.
    Socket socket()
    {
        return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
    }

    /// Returns the error message most recently generated by an operation on the connection
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * Sets or examines the current notice processor
     *
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Gets the next result after sending non-blocking commands. Can return null.
    ///
    /// Useful only for non-blocking operations.
    immutable(Result) getResult()
    {
        // libpq guarantees that the result will not be changed until it is destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r)
        {
            auto container = new immutable ResultContainer(r);
            return new immutable Result(container);
        }

        return null;
    }

    /// Wraps a result pointer obtained from the PQexec* functions into a ResultContainer
    ///
    /// Throws: ConnectionException if the supplied pointer is null.
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Select single-row mode for the currently-executing query
    ///
    /// Returns: true if PQsetSingleRowMode returned 1.
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     Try to cancel query

     If the cancellation is effective, the current command will
     terminate early and return an error result or exception. If the
     cancellation fails (say, because the server was already done
     processing the command) there will be no visible result at all.

     Throws: ConnectionException if the cancel object can not be created,
     CancellationException if the cancel request was not dispatched.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // Release the PGcancel object right away instead of waiting for the GC
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// Returns true if PQisBusy returned 1 for this connection
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /// Looks up a current parameter setting of the server by name (PQparameterStatus)
    ///
    /// Throws: ConnectionException if libpq returns null for the parameter.
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /// Escapes a string for use as an SQL literal (PQescapeLiteral)
    ///
    /// The returned value includes the surrounding single quotes.
    ///
    /// Throws: ConnectionException if libpq fails to escape the string.
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The buffer is owned by libpq: free it even if the copy below fails
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Escapes a string for use as an SQL identifier (PQescapeIdentifier)
    ///
    /// The returned value includes the surrounding double quotes.
    ///
    /// Throws: ConnectionException if libpq fails to escape the string.
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The buffer is owned by libpq: free it even if the copy below fails
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns the database name of the connection (PQdb)
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns the server host name of the connection (PQhost)
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns the value reported by PQprotocolVersion
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns the value reported by PQserverVersion
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /// Passes the C FILE handle of the given stream to PQtrace
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Calls PQuntrace for this connection
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Sets the client encoding (PQsetClientEncoding)
    ///
    /// Throws: ConnectionException if libpq returns a non-zero value.
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}

/// Check connection options in the provided connection string
///
/// Throws an exception if the connection string does not pass the check.
void connStringCheck(string connString)
{
    char* errmsg = null;
    PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);

    if(r is null)
    {
        // Neither options nor an error message were returned
        enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
    }
    else
    {
        PQconninfoFree(r);
    }

    if(errmsg !is null)
    {
        // Copy the message before releasing the libpq-owned buffer
        string s = errmsg.fromStringz.to!string;
        PQfreemem(cast(void*) errmsg);

        throw new ConnectionException(s, __FILE__, __LINE__);
    }
}

unittest
{
    connStringCheck("dbname=postgres user=postgres");

    {
        bool raised = false;

        try
            connStringCheck("wrong conninfo string");
        catch(ConnectionException e)
            raised = true;

        assert(raised);
    }
}

/// Represents query cancellation process
class Cancellation
{
    /// libpq cancel object; released by PQfreeCancel in the destructor
    private PGcancel* cancel;

    /// Creates a cancel object for the given connection (PQgetCancel)
    ///
    /// Throws: ConnectionException if libpq returns null.
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    /// Frees the cancel object
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Requests that the server abandon processing of the current command

     Throws exception if cancel request was not successfully dispatched.

     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.
    */
    void doCancel()
    {
        char[256] errbuf;
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}

/// Thrown when a cancel request could not be dispatched
class CancellationException : Dpq2Exception
{
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}

/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Uses the most recent error message of the given connection as the exception message
    this(in Connection c, string file = __FILE__, size_t line = __LINE__)
    {
        super(c.errorMessage(), file, line);
    }

    /// Uses the supplied text as the exception message
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}

version (integration_tests)
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    {
        debug import std.experimental.logger;

        auto c = new Connection(connParam);
        auto dbname = c.dbName();
        auto pver = c.protocolVersion();
        auto sver = c.serverVersion();

        debug
        {
            trace("DB name: ", dbname);
            trace("Protocol version: ", pver);
            trace("Server version: ", sver);
        }

        destroy(c);
    }

    {
        bool exceptionFlag = false;

        try
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
            assert(exceptionFlag);
    }

    {
        auto c = new Connection(connParam);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }

    {
        auto c = new Connection(connParam);

        assert(c.transactionStatus == PQTRANS_IDLE);

        c.exec("BEGIN");
        assert(c.transactionStatus == PQTRANS_INTRANS);

        try c.exec("DISCARD ALL");
        catch (Exception) {}
        assert(c.transactionStatus == PQTRANS_INERROR);

        c.exec("ROLLBACK");
        assert(c.transactionStatus == PQTRANS_IDLE);
    }
}