/**
 * Represents connection to the PostgreSQL server
 *
 * Most functions correspond to those in the documentation of Postgres:
 * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
 */
module dpq2.connection;

import dpq2.query;
import dpq2.args: QueryParams;
import dpq2.result;
import dpq2.exception;

import derelict.pq.pq;
import std.conv: to;
import std.string: toStringz, fromStringz;
import std.exception: enforce, enforceEx;
import std.range;
import std.stdio: File;
import std.socket;
import core.exception;
import core.time: Duration;

/*
 * Bugs: On Unix connection is not thread safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.



        int PQisthreadsafe();
        Returns 1 if libpq is thread-safe and 0 if it is not.
*/

/// Dumb flag type used only to select the non-blocking Connection constructor
struct ConnectionStart {}

/// Connection to a PostgreSQL server; a thin wrapper around a libpq PGconn handle
class Connection
{
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /// Makes a new connection to the database server
    ///
    /// Params:
    ///     connString = connection string handed to PQconnectdb
    ///
    /// Throws: OutOfMemoryError if libpq returned no connection handle,
    /// ConnectionException if the resulting status is not CONNECTION_OK.
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Starts creation of a connection to the database server in a nonblocking manner
    ///
    /// Params:
    ///     connString = connection string handed to PQconnectStart
    ///
    /// Throws: OutOfMemoryError if libpq returned no connection handle,
    /// ConnectionException if the status is already CONNECTION_BAD.
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Hands the connection handle back to libpq via PQfinish
    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns the blocking status of the database connection
    ///
    /// Returns: true if PQisnonblocking reported 1
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Sets the nonblocking status of the connection
    ///
    /// Throws: ConnectionException if PQsetnonblocking returned -1
    private void setNonBlocking(bool state)
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Begin reset the communication channel to the server, in a nonblocking manner
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if PQresetStart returned 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Polls the connection via PQconnectPoll and returns its polling status
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Polls a reset in progress via PQresetPoll and returns its polling status
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the status of the connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /// If input is available from the server, consume it
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if PQconsumeInput did not return 1
    void consumeInput()
    {
        assert(conn);

        // PQconsumeInput yields an int; compare it directly instead of widening to size_t
        if( PQconsumeInput( conn ) != 1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQflush on the connection
    ///
    /// Returns: true if PQflush returned 0, false for any other non-error value
    ///
    /// Throws: ConnectionException if PQflush returned -1
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Obtains the file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if PQsocket returned -1
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /// Obtains duplicate file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if the connection has no socket or if
    /// the descriptor could not be duplicated.
    socket_t posixSocketDuplicate()
    {
        version(Windows)
        {
            static assert(false, "FIXME: implement socket duplication");
        }
        else // Posix OS
        {
            import core.stdc.errno: errno;
            import core.stdc.string: strerror;
            import core.sys.posix.unistd: dup;

            const int duplicated = dup(posixSocket);

            if(duplicated == -1)
            {
                // read errno before anything else gets a chance to overwrite it
                const int errorCode = errno;

                // dup() failure is not a libpq error, so describe it explicitly
                // rather than reporting the connection's unrelated last message
                throw new ConnectionException(
                    "Unable to duplicate connection socket: " ~ strerror(errorCode).fromStringz.to!string,
                    __FILE__, __LINE__
                );
            }

            return cast(socket_t) duplicated;
        }
    }

    /// Obtains std.socket.Socket of the connection to the server
    ///
    /// Due to a limitation of Socket, a duplicate of the internal posix
    /// socket is used to create it.
    Socket socket()
    {
        return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
    }

    /// Returns the error message most recently generated by an operation on the connection
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * Sets or examines the current notice processor
     *
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Get next result after sending a non-blocking command. Can return null.
    ///
    /// Useful only for non-blocking operations.
    immutable(Result) getResult()
    {
        // libpq guarantees that the result is not changed until it is destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r)
        {
            auto container = new immutable ResultContainer(r);
            return new immutable Result(container);
        }

        return null;
    }

    /// Wraps a result obtained from PQexec* functions
    ///
    /// Throws: ConnectionException if the passed result pointer is null
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Select single-row mode for the currently-executing query
    ///
    /// Returns: true if PQsetSingleRowMode returned 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     Try to cancel query

     If the cancellation is effective, the current command will
     terminate early and return an error result or exception. If the
     cancellation fails (say, because the server was already done
     processing the command) there will be no visible result at all.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // release the cancel handle right away, also when doCancel throws,
        // instead of keeping it until a GC finalizer eventually runs
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// Returns true if PQisBusy reported 1 for this connection
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /// Looks up a parameter setting via PQparameterStatus
    ///
    /// Throws: ConnectionException if libpq returned null for paramName
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /// Escapes a string with PQescapeLiteral and returns a GC-owned copy
    ///
    /// Throws: ConnectionException if libpq returned null
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // the buffer belongs to libpq: free it even if the copy below throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Escapes a string with PQescapeIdentifier and returns a GC-owned copy
    ///
    /// Throws: ConnectionException if libpq returned null
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // the buffer belongs to libpq: free it even if the copy below throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns the value reported by PQdb as a D string
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns the value reported by PQhost as a D string
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns the value reported by PQprotocolVersion
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns the value reported by PQserverVersion
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /// Passes the C stream behind the given File to PQtrace
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Calls PQuntrace on the connection
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Sets the client encoding
    ///
    /// Throws: ConnectionException if PQsetClientEncoding returned non-zero
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}

/// Check connection
/// options in the provided connection string
///
/// Throws: ConnectionException if the connection string does not pass the
/// check, OutOfMemoryError if parsing failed without any error message.
void connStringCheck(string connString)
{
    char* parseError = null;
    auto options = PQconninfoParse(toStringz(connString), &parseError);

    if(options !is null)
    {
        // only validation is wanted, so the parsed options are dropped at once
        PQconninfoFree(options);
    }
    else
    {
        // no options and no message either
        enforceEx!OutOfMemoryError(parseError, "Unable to allocate libpq conninfo data");
    }

    if(parseError is null)
        return;

    // copy the text before handing the buffer back to libpq
    string description = to!string(fromStringz(parseError));
    PQfreemem(cast(void*) parseError);

    throw new ConnectionException(description, __FILE__, __LINE__);
}

unittest
{
    // a well-formed string must pass silently
    connStringCheck("dbname=postgres user=postgres");

    // a malformed string must be rejected with ConnectionException
    bool rejected = false;

    try
    {
        connStringCheck("wrong conninfo string");
    }
    catch(ConnectionException e)
    {
        rejected = true;
    }

    assert(rejected);
}

/// Represents query cancellation process
class Cancellation
{
    private PGcancel* cancel;

    /// Obtains a cancel handle for the given connection via PQgetCancel
    ///
    /// Throws: ConnectionException if libpq returned null
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel !is null)
            return;

        throw new ConnectionException(c, __FILE__, __LINE__);
    }

    /// Releases the cancel handle via PQfreeCancel
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Requests that the server abandon processing of the current command

     Throws exception if cancel request was not successfully dispatched.

     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.
    */
    void doCancel()
    {
        char[256] reason;
        const bool dispatched = PQcancel(cancel, reason.ptr, reason.length) == 1;

        if(dispatched)
            return;

        // the buffer handed to PQcancel is used as the exception text
        throw new CancellationException(reason.ptr.fromStringz.to!string, __FILE__, __LINE__);
    }
}

/// Thrown by Cancellation.doCancel when PQcancel does not report success
class CancellationException : Dpq2Exception
{
    /// Forwards message and source location to Dpq2Exception
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Uses the connection's most recent error message as the exception text
    this(in Connection c, string file, size_t line)
    {
        super(c.errorMessage(), file, line);
    }

    /// Uses the explicitly supplied message as the exception text
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Integration checks; compiled only with version integration_tests and
/// run against the server described by connParam
version (integration_tests)
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    // basic connection properties
    {
        debug import std.experimental.logger;

        auto connection = new Connection(connParam);
        auto name = connection.dbName();
        auto protocol = connection.protocolVersion();
        auto server = connection.serverVersion();

        debug
        {
            trace("DB name: ", name);
            trace("Protocol version: ", protocol);
            trace("Server version: ", server);
        }

        destroy(connection);
    }

    // an invalid connection string must raise ConnectionException
    {
        bool exceptionFlag = false;

        try
        {
            auto connection = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        }
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
        {
            assert(exceptionFlag);
        }
    }

    // escaping helpers and client encoding
    {
        auto connection = new Connection(connParam);

        assert(connection.escapeLiteral("abc'def") == "'abc''def'");
        assert(connection.escapeIdentifier("abc'def") == "\"abc'def\"");

        connection.setClientEncoding("WIN866");
        assert(connection.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}