module dpq2.connection;

import dpq2.query;
import dpq2.args: QueryParams;
import dpq2.result;
import dpq2.exception;

import derelict.pq.pq;
import std.conv: to;
import std.string: toStringz, fromStringz;
import std.exception: enforce, enforceEx;
import std.range;
import std.stdio: File;
import std.socket;
import core.exception;
import core.time: Duration;

/*
 * Bugs: On Unix connection is not thread safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.
 *
 * int PQisthreadsafe();
 * Returns 1 if the libpq is thread-safe and 0 if it is not.
 */

/// Dummy tag type used to select the non-blocking Connection constructor
struct ConnectionStart {}

/// Wrapper around a libpq connection handle (PGconn*)
class Connection
{
    /// Underlying libpq handle; released with PQfinish in the destructor
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /**
     * Connects via PQconnectdb.
     *
     * Throws: OutOfMemoryError if libpq returns a null handle,
     * ConnectionException if the status is not CONNECTION_OK.
     */
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Connect to DB in a nonblocking manner (via PQconnectStart).
     *
     * Throws: OutOfMemoryError if libpq returns a null handle,
     * ConnectionException if the status is CONNECTION_BAD.
     */
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns true if PQisnonblocking reports 1 for this connection
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Passes the requested state to PQsetnonblocking; throws ConnectionException if it returns -1
    private void setNonBlocking( bool state )
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQresetStart; throws ConnectionException if it returns 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns the result of PQconnectPoll for this connection
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns the result of PQresetPoll for this connection
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the result of PQstatus for this connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /// Calls PQconsumeInput; throws ConnectionException unless it returns 1
    void consumeInput()
    {
        assert(conn);

        // PQconsumeInput returns a plain int status code, so keep it as int
        const int r = PQconsumeInput( conn );
        if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Calls PQflush.
     *
     * Returns: true if PQflush returned 0.
     * Throws: ConnectionException if PQflush returned -1.
     */
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Returns the descriptor reported by PQsocket; throws ConnectionException if it is -1
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /**
     * Returns a duplicate (via dup) of the descriptor returned by posixSocket.
     *
     * Throws: ConnectionException if the descriptor is unavailable or dup fails.
     */
    socket_t posixSocketDuplicate()
    {
        version(Windows)
        {
            static assert(false, "FIXME: implement socket duplication");
        }
        else // Posix OS
        {
            import core.sys.posix.unistd: dup;

            const int duplicated = dup(posixSocket);

            // dup reports failure with -1; do not hand an invalid handle to the caller
            if(duplicated == -1)
                throw new ConnectionException("Unable to duplicate connection socket", __FILE__, __LINE__);

            return cast(socket_t) duplicated;
        }
    }

    /// Wraps a duplicate of the connection descriptor into a std.socket.Socket
    Socket socket()
    {
        return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
    }

    /// Returns the text reported by PQerrorMessage for this connection
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Get the next result from a sendQuery. Can return null.
    immutable(Result) getResult()
    {
        // libpq guarantees that the result will not change until it is destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r)
        {
            auto container = new immutable ResultContainer(r);
            return new immutable Result(container);
        }

        return null;
    }

    /// Wraps a result pointer from PQexec* functions; throws ConnectionException if the pointer is null
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Returns true if PQsetSingleRowMode returned 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     If the cancellation is effective, the current command will
     terminate early and return an error result (exception). If the
     cancellation fails (say, because the server was already done
     processing the command), then there will be no visible result at
     all.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // release the PGcancel handle right away instead of waiting for the GC
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// Returns true if PQisBusy returned 1
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /// Returns the value reported by PQparameterStatus; throws ConnectionException if it is null
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /// Returns msg escaped by PQescapeLiteral; throws ConnectionException if libpq returns null
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // free the libpq buffer even if the conversion below throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns msg escaped by PQescapeIdentifier; throws ConnectionException if libpq returns null
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // free the libpq buffer even if the conversion below throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns the value reported by PQdb
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns the value reported by PQhost
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns the value reported by PQprotocolVersion
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns the value reported by PQserverVersion
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /// Passes the C FILE* of the given stream to PQtrace
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Calls PQuntrace for this connection
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Calls PQsetClientEncoding; throws ConnectionException on a non-zero return
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}

/**
 * Validates a connection string with PQconninfoParse.
 *
 * Throws: ConnectionException carrying the libpq message if parsing reported an error,
 * OutOfMemoryError if parsing failed without any message.
 */
void connStringCheck(string connString)
{
    char* errmsg = null;
    PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);

    if(r is null)
    {
        enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
    }
    else
    {
        PQconninfoFree(r);
    }

    if(errmsg !is null)
    {
        // free the libpq message buffer even if the conversion below throws
        scope(exit) PQfreemem(cast(void*) errmsg);

        string s = errmsg.fromStringz.to!string;

        throw new ConnectionException(s, __FILE__, __LINE__);
    }
}

unittest
{
    connStringCheck("dbname=postgres user=postgres");

    {
        bool raised = false;

        try
            connStringCheck("wrong conninfo string");
        catch(ConnectionException e)
            raised = true;

        assert(raised);
    }
}

/// Cancels queries in progress
class Cancellation
{
    /// Handle obtained from PQgetCancel; released with PQfreeCancel in the destructor
    private PGcancel* cancel;

    /// Obtains a cancel handle for the connection; throws ConnectionException if PQgetCancel returns null
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.

     Throws: CancellationException with the text from the error buffer
     if PQcancel does not return 1.
    */
    void doCancel()
    {
        char[256] errbuf;
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}

/// Cancellation exception
class CancellationException : Dpq2Exception
{
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Uses the current error message of the given connection as the exception text
    this(in Connection c, string file, size_t line)
    {
        super(c.errorMessage(), file, line);
    }

    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Integration checks that need a reachable server described by connParam
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    {
        debug import std.experimental.logger;

        auto c = new Connection(connParam);
        auto dbname = c.dbName();
        auto pver = c.protocolVersion();
        auto sver = c.serverVersion();

        debug
        {
            trace("DB name: ", dbname);
            trace("Protocol version: ", pver);
            trace("Server version: ", sver);
        }

        destroy(c);
    }

    {
        bool exceptionFlag = false;

        try
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
            assert(exceptionFlag);
    }

    {
        auto c = new Connection(connParam);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}