module dpq2.connection;

import dpq2;
import dpq2.exception;

import std.conv: to;
import std.string: toStringz, fromStringz;
import std.exception: enforce, enforceEx;
import std.range;
import std.stdio: File;
import std.socket;
import core.exception;
import core.time: Duration;

/*
 * Bugs: On Unix connection is not thread safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.
 *
 * int PQisthreadsafe();
 * Returns 1 if the libpq is thread-safe and 0 if it is not.
 */

/// Dummy tag type used to select the non-blocking Connection constructor
struct ConnectionStart {}

/// Wrapper around a libpq PGconn handle
class Connection
{
    /// Underlying libpq connection handle; released by the destructor
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /**
     * Connects with PQconnectdb.
     *
     * Throws: OutOfMemoryError if libpq returned a null handle,
     * ConnectionException if the resulting status is not CONNECTION_OK.
     */
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Connect to DB in a nonblocking manner (PQconnectStart).
     *
     * Throws: OutOfMemoryError if libpq returned a null handle,
     * ConnectionException if the status is already CONNECTION_BAD.
     */
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Hands the handle back to libpq via PQfinish
    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// True when PQisnonblocking reports 1 for this connection
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Switches the non-blocking flag; throws ConnectionException when PQsetnonblocking returns -1
    private void setNonBlocking( bool state )
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQresetStart; throws ConnectionException when it returns 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns the value of PQconnectPoll for this connection
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns the value of PQresetPoll for this connection
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the value of PQstatus for this connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /// Calls PQconsumeInput; throws ConnectionException unless it returns 1
    void consumeInput()
    {
        assert(conn);

        if( PQconsumeInput( conn ) != 1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Calls PQflush.
     *
     * Returns: true when PQflush returned 0, false for any other
     * non-error value.
     * Throws: ConnectionException when PQflush returned -1.
     */
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Returns the descriptor reported by PQsocket; throws ConnectionException when it is -1
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /**
     * Returns a std.socket.Socket built on a duplicate (dup) of the
     * connection descriptor, so the returned object does not share the
     * descriptor number held by libpq.
     *
     * Throws: ConnectionException if the descriptor cannot be obtained
     * or duplicated.
     */
    Socket socket()
    {
        version(Windows)
        {
            assert(false, "FIXME: implement socket duplication");
        }
        else // Posix
        {
            import core.sys.posix.unistd: dup;

            const int duplicate = dup(posixSocket());

            // dup() signals failure with -1; never wrap that in a Socket
            if(duplicate == -1)
                throw new ConnectionException("Unable to duplicate connection socket", __FILE__, __LINE__);

            return new Socket(cast(socket_t) duplicate, AddressFamily.UNSPEC);
        }
    }

    /// Returns a copy of the text reported by PQerrorMessage
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Gets the next result via PQgetResult. Returns null when PQgetResult yields no result.
    immutable(Result) getResult()
    {
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r is null)
            return null;

        auto container = new immutable ResultContainer(r);
        return new immutable Result(container);
    }

    /// Wraps a result of PQexec* functions; throws ConnectionException if the result pointer is null
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// True when PQsetSingleRowMode returned 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     If the cancellation is effective, the current command will
     terminate early and return an error result (exception). If the
     cancellation fails (say, because the server was already done
     processing the command), then there will be no visible result at
     all.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // release the PGcancel object right away instead of waiting for a GC finalizer
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// True when PQisBusy returned 1
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /// Returns a copy of the value reported by PQparameterStatus; throws ConnectionException when it is null
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /// Escapes msg with PQescapeLiteral; throws ConnectionException when libpq returns null
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // the buffer belongs to libpq: free it even if the copy below throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Escapes msg with PQescapeIdentifier; throws ConnectionException when libpq returns null
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // the buffer belongs to libpq: free it even if the copy below throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns a copy of the string reported by PQdb
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns a copy of the string reported by PQhost
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns the value of PQprotocolVersion
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns the value of PQserverVersion
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /// Passes the FILE* of stream to PQtrace
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Calls PQuntrace for this connection
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Calls PQsetClientEncoding; throws ConnectionException on a non-zero return
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}

/**
 * Validates a connection string with PQconninfoParse.
 *
 * Throws: ConnectionException carrying the libpq error text when the
 * parser reported one, OutOfMemoryError when parsing failed without
 * any error text.
 */
void connStringCheck(string connString)
{
    char* errmsg = null;
    PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);

    if(r is null)
    {
        enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
    }
    else
    {
        PQconninfoFree(r);
    }

    if(errmsg !is null)
    {
        // the message buffer belongs to libpq: free it even if the copy below throws
        scope(exit) PQfreemem(cast(void*) errmsg);

        string s = errmsg.fromStringz.to!string;

        throw new ConnectionException(s, __FILE__, __LINE__);
    }
}

unittest
{
    connStringCheck("dbname=postgres user=postgres");

    {
        bool raised = false;

        try
            connStringCheck("wrong conninfo string");
        catch(ConnectionException e)
            raised = true;

        assert(raised);
    }
}

/// Doing canceling queries in progress
class Cancellation
{
    /// Handle obtained from PQgetCancel; released by the destructor
    private PGcancel* cancel;

    /// Obtains a cancel handle for c; throws ConnectionException when PQgetCancel returns null
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    /// Releases the handle with PQfreeCancel
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.

     Throws: CancellationException with the text PQcancel wrote into
     the error buffer when PQcancel does not return 1.
    */
    void doCancel()
    {
        char[256] errbuf;
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}

/// Thrown by Cancellation.doCancel when the cancel request could not be dispatched
class CancellationException : Dpq2Exception
{
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Uses the current error message of connection c as the exception text
    this(in Connection c, string file, size_t line)
    {
        super(c.errorMessage(), file, line);
    }

    /// Uses msg as the exception text
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}

/// Integration checks; connParam is the connection string passed to the Connection constructor
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    {
        debug import std.experimental.logger;

        auto c = new Connection(connParam);
        auto dbname = c.dbName();
        auto pver = c.protocolVersion();
        auto sver = c.serverVersion();

        debug
        {
            trace("DB name: ", dbname);
            trace("Protocol version: ", pver);
            trace("Server version: ", sver);
        }

        destroy(c);
    }

    {
        bool exceptionFlag = false;

        try
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
            assert(exceptionFlag);
    }

    {
        auto c = new Connection(connParam);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}