1 module dpq2.connection; 2 3 @trusted: 4 5 import dpq2; 6 7 import std.conv: to; 8 import std..string: toStringz, fromStringz; 9 import std.exception: enforceEx; 10 import std.range; 11 import std.stdio: File; 12 import std.socket; 13 import core.exception; 14 15 /* 16 * Bugs: On Unix connection is not thread safe. 17 * 18 * On Unix, forking a process with open libpq connections can lead 19 * to unpredictable results because the parent and child processes share 20 * the same sockets and operating system resources. For this reason, 21 * such usage is not recommended, though doing an exec from the child 22 * process to load a new executable is safe. 23 24 25 26 int PQisthreadsafe(); 27 Returns 1 if the libpq is thread-safe and 0 if it is not. 28 */ 29 30 /// dumb flag for Connection ctor parametrization 31 struct ConnectionStart {}; 32 33 /// Connection 34 class Connection 35 { 36 //string connString; /// Database connection parameters 37 package PGconn* conn; 38 39 invariant 40 { 41 assert(conn !is null); 42 } 43 44 this(string connString) 45 { 46 conn = PQconnectdb(toStringz(connString)); 47 48 enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 49 50 if(status != CONNECTION_OK) 51 throw new ConnectionException(this, __FILE__, __LINE__); 52 } 53 54 /// Connect to DB in a nonblocking manner 55 this(ConnectionStart, string connString) 56 { 57 conn = PQconnectStart(cast(char*) toStringz(connString)); // TODO: wrong DerelictPQ args 58 59 enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 60 61 if( status == CONNECTION_BAD ) 62 throw new ConnectionException(this, __FILE__, __LINE__); 63 } 64 65 ~this() 66 { 67 PQfinish( conn ); 68 } 69 70 mixin Queries; 71 72 @property bool isNonBlocking() 73 { 74 return PQisnonblocking(conn) == 1; 75 } 76 77 private void setNonBlocking( bool state ) 78 { 79 if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 ) 80 throw new ConnectionException(this, __FILE__, __LINE__); 81 } 82 83 void resetStart() 84 { 85 if(PQresetStart(conn) == 0) 86 throw new ConnectionException(this, __FILE__, __LINE__); 87 } 88 89 PostgresPollingStatusType poll() nothrow 90 { 91 assert(conn); 92 93 return PQconnectPoll(conn); 94 } 95 96 PostgresPollingStatusType resetPoll() nothrow 97 { 98 assert(conn); 99 100 return PQresetPoll(conn); 101 } 102 103 ConnStatusType status() nothrow 104 { 105 return PQstatus(conn); 106 } 107 108 void consumeInput() 109 { 110 assert(conn); 111 112 const size_t r = PQconsumeInput( conn ); 113 if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__); 114 } 115 116 package bool flush() 117 { 118 assert(conn); 119 120 auto r = PQflush(conn); 121 if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__); 122 return r == 0; 123 } 124 125 int posixSocket() 126 { 127 int r = PQsocket(conn); 128 129 if(r == -1) 130 throw new ConnectionException(this, __FILE__, __LINE__); 131 132 return r; 133 } 134 135 Socket socket() 136 { 137 import core.sys.posix.unistd: dup; 138 139 socket_t s = cast(socket_t) dup(cast(socket_t) posixSocket); 140 return new Socket(s, AddressFamily.UNSPEC); 141 } 142 143 string errorMessage() const nothrow 144 { 145 return to!string(PQerrorMessage(cast(PGconn*) conn)); //TODO: need report to derelict pq 146 } 147 148 /** 149 * returns the previous notice receiver or processor function pointer, and sets the new value. 150 * If you supply a null function pointer, no action is taken, but the current pointer is returned. 151 */ 152 PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow 153 { 154 assert(conn); 155 156 return PQsetNoticeProcessor(conn, proc, arg); 157 } 158 159 /// Get for the next result from a sendQuery. Can return null. 160 immutable(Result) getResult() 161 { 162 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 163 auto r = cast(immutable) PQgetResult(conn); 164 165 if(r) 166 { 167 auto container = new immutable ResultContainer(r); 168 return new immutable Result(container); 169 } 170 171 return null; 172 } 173 174 /// Get result from PQexec* functions or throw error if pull is empty 175 package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const 176 { 177 if(r is null) throw new ConnectionException(this, __FILE__, __LINE__); 178 179 return new immutable ResultContainer(r); 180 } 181 182 bool setSingleRowMode() 183 { 184 return PQsetSingleRowMode(conn) == 1; 185 } 186 187 void cancel() 188 { 189 auto c = new Cancellation(this); 190 c.doCancel; 191 } 192 193 bool isBusy() nothrow 194 { 195 assert(conn); 196 197 return PQisBusy(conn) == 1; 198 } 199 200 string parameterStatus(string paramName) 201 { 202 assert(conn); 203 204 auto res = PQparameterStatus(conn, cast(char*) toStringz(paramName)); //TODO: need report to derelict pq 205 206 if(res is null) 207 throw new ConnectionException(this, __FILE__, __LINE__); 208 209 return to!string(fromStringz(res)); 210 } 211 212 string escapeLiteral(string msg) 213 { 214 assert(conn); 215 216 auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length); 217 218 if(buf is null) 219 throw new ConnectionException(this, __FILE__, __LINE__); 220 221 string res = buf.fromStringz.to!string; 222 223 PQfreemem(buf); 224 225 return res; 226 } 227 228 string escapeIdentifier(string msg) 229 { 230 assert(conn); 231 232 auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length); 233 234 if(buf is null) 235 throw new ConnectionException(this, __FILE__, __LINE__); 236 237 string res = buf.fromStringz.to!string; 238 239 PQfreemem(buf); 240 241 return res; 242 } 243 244 string host() const nothrow 245 { 246 assert(conn); 247 248 return to!string(PQhost(cast(PGconn*) conn).fromStringz); //TODO: need report to derelict pq 249 } 250 251 void trace(ref File stream) 252 { 253 PQtrace(conn, stream.getFP); 254 } 255 256 void untrace() 257 { 258 PQuntrace(conn); 259 } 260 261 void setClientEncoding(string encoding) 262 { 263 if(PQsetClientEncoding(conn, cast(char*) encoding.toStringz) != 0) //TODO: need report to derelict pq 264 throw new ConnectionException(this, __FILE__, __LINE__); 265 } 266 } 267 268 void connStringCheck(string connString) 269 { 270 char* errmsg = null; 271 PQconninfoOption* r = PQconninfoParse(cast(char*) connString.toStringz, &errmsg); //TODO: need report to derelict pq 272 273 if(r is null) 274 { 275 enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data"); 276 } 277 else 278 { 279 PQconninfoFree(r); 280 } 281 282 if(errmsg !is null) 283 { 284 string s = errmsg.fromStringz.to!string; 285 PQfreemem(cast(void*) errmsg); 286 287 throw new ConnectionException(s, __FILE__, __LINE__); 288 } 289 } 290 291 unittest 292 { 293 connStringCheck("dbname=postgres user=postgres"); 294 295 { 296 bool raised = false; 297 298 try 299 connStringCheck("wrong conninfo string"); 300 catch(ConnectionException e) 301 raised = true; 302 303 assert(raised); 304 } 305 } 306 307 /// Doing canceling queries in progress 308 class Cancellation 309 { 310 private PGcancel* cancel; 311 312 this(Connection c) 313 { 314 cancel = PQgetCancel(c.conn); 315 316 if(cancel is null) 317 throw new ConnectionException(c, __FILE__, __LINE__); 318 } 319 320 ~this() 321 { 322 PQfreeCancel(cancel); 323 } 324 325 void doCancel() 326 { 327 char[256] errbuf; 328 auto res = PQcancel(cancel, errbuf.ptr, errbuf.length); 329 330 if(res != 1) 331 throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__); 332 } 333 } 334 335 class CancellationException : Dpq2Exception 336 { 337 this(string msg, string file, size_t line) 338 { 339 super(msg, file, line); 340 } 341 } 342 343 /// Connection exception 344 class ConnectionException : Dpq2Exception 345 { 346 this(in Connection c, string file, size_t line) 347 { 348 super(c.errorMessage(), file, line); 349 } 350 351 this(string msg, string file, size_t line) 352 { 353 super(msg, file, line); 354 } 355 } 356 357 void _integration_test( string connParam ) 358 { 359 assert( PQlibVersion() >= 9_0100 ); 360 361 { 362 auto c = new Connection(connParam); 363 364 destroy(c); 365 } 366 367 { 368 bool exceptionFlag = false; 369 370 try 371 auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!"); 372 catch(ConnectionException e) 373 { 374 exceptionFlag = true; 375 assert(e.msg.length > 40); // error message check 376 } 377 finally 378 assert(exceptionFlag); 379 } 380 381 { 382 auto c = new Connection(connParam); 383 384 assert(c.escapeLiteral("abc'def") == "'abc''def'"); 385 assert(c.escapeIdentifier("abc'def") == "\"abc'def\""); 386 387 c.setClientEncoding("WIN866"); 388 assert(c.exec("show client_encoding")[0][0].as!string == "WIN866"); 389 } 390 }