1 /**
2  * Represents connection to the PostgreSQL server
3  *
 * Most functions correspond to those in the documentation of Postgres:
5  * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
6  */
7 module dpq2.connection;
8 
9 import dpq2.query;
10 import dpq2.args: QueryParams;
11 import dpq2.result;
12 import dpq2.exception;
13 
14 import derelict.pq.pq;
15 import std.conv: to;
16 import std.string: toStringz, fromStringz;
17 import std.exception: enforce;
18 import std.range;
19 import std.stdio: File;
20 import std.socket;
21 import core.exception;
22 import core.time: Duration;
23 
24 /*
25  * Bugs: On Unix connection is not thread safe.
26  *
27  * On Unix, forking a process with open libpq connections can lead
28  * to unpredictable results because the parent and child processes share
29  * the same sockets and operating system resources. For this reason,
30  * such usage is not recommended, though doing an exec from the child
31  * process to load a new executable is safe.
32 
33 
34 
35 int PQisthreadsafe();
36 Returns 1 if the libpq is thread-safe and 0 if it is not.
37 */
38 
/// Empty tag type: passing it as the first constructor argument of
/// Connection selects the overloads that call PQconnectStart* instead
/// of PQconnectdb*
struct ConnectionStart
{
}
41 
/**
 * Connection
 *
 * Wraps a libpq connection handle (PGconn*). The handle is obtained by one
 * of the constructors and released with PQfinish in the destructor.
 */
class Connection
{
    /// Underlying libpq connection handle
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /// Makes a new connection to the database server
    ///
    /// Params:
    ///     connString = connection string passed to PQconnectdb
    ///
    /// Throws: ConnectionException if the resulting status is CONNECTION_BAD
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));
        checkCreatedConnection();
    }

    /// ditto
    this(in string[string] keyValueParams)
    {
        auto a = keyValueParams.keyValToPQparamsArrays;

        // Both arrays always hold at least the terminating null entry
        conn = PQconnectdbParams(a.keys.ptr, a.vals.ptr, 0);
        checkCreatedConnection();
    }

    /// Starts creation of a connection to the database server in a nonblocking manner
    ///
    /// Params:
    ///     connString = connection string passed to PQconnectStart
    ///
    /// Throws: ConnectionException if the resulting status is CONNECTION_BAD
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));
        checkCreatedConnection();
    }

    /// ditto
    this(ConnectionStart, in string[string] keyValueParams)
    {
        auto a = keyValueParams.keyValToPQparamsArrays;

        // Both arrays always hold at least the terminating null entry
        conn = PQconnectStartParams(a.keys.ptr, a.vals.ptr, 0);
        checkCreatedConnection();
    }

    /// Common post-construction check: fails on a null handle or on
    /// CONNECTION_BAD status
    private void checkCreatedConnection()
    {
        enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Closes the connection by calling PQfinish
    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns the blocking status of the database connection
    ///
    /// Returns: true if PQisnonblocking reports 1
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Sets the nonblocking status of the connection
    ///
    /// Throws: ConnectionException if PQsetnonblocking returns -1
    private void setNonBlocking(bool state)
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Begin reset the communication channel to the server, in a nonblocking manner
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if PQresetStart returns 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Polls the connection started in a nonblocking manner (PQconnectPoll)
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Polls the reset started by resetStart (PQresetPoll)
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the status of the connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /**
        Returns the current in-transaction status of the server.
        The status can be:
            * PQTRANS_IDLE    - currently idle
            * PQTRANS_ACTIVE  - a command is in progress (reported only when a query has been sent to the server and not yet completed)
            * PQTRANS_INTRANS - idle, in a valid transaction block
            * PQTRANS_INERROR - idle, in a failed transaction block
            * PQTRANS_UNKNOWN - reported if the connection is bad
     */
    PGTransactionStatusType transactionStatus() nothrow
    {
        return PQtransactionStatus(conn);
    }

    /// If input is available from the server, consume it
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if PQconsumeInput returns anything but 1
    void consumeInput()
    {
        assert(conn);

        if( PQconsumeInput( conn ) != 1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQflush on the connection
    ///
    /// Returns: true if PQflush returned 0, false for any other
    /// non-error value
    ///
    /// Throws: ConnectionException if PQflush returns -1
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Obtains the file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if PQsocket returns -1
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /// Obtains duplicate file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if the descriptor can not be obtained
    /// or duplicated
    socket_t posixSocketDuplicate()
    {
        version(Windows)
        {
            assert(false, "FIXME: implement socket duplication");
        }
        else // Posix OS
        {
            import core.stdc.errno: errno;
            import core.stdc.string: strerror;
            import core.sys.posix.unistd: dup;

            const int duplicate = dup(posixSocket);

            if(duplicate == -1)
            {
                // Save errno right away, before anything else can overwrite it.
                // The failure comes from the OS, not libpq, so errorMessage()
                // would not describe it.
                const int err = errno;

                throw new ConnectionException(
                    "Unable to duplicate connection socket: " ~ strerror(err).fromStringz.idup,
                    __FILE__, __LINE__
                );
            }

            return cast(socket_t) duplicate;
        }
    }

    /// Obtains std.socket.Socket of the connection to the server
    ///
    /// Due to a limitation of Socket actually for the Socket creation
    /// duplicate of internal posix socket will be used.
    Socket socket()
    {
        return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
    }

    /// Returns the error message most recently generated by an operation on the connection
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * Sets or examines the current notice processor
     *
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Get next result after sending a non-blocking commands. Can return null.
    ///
    /// Useful only for non-blocking operations.
    immutable(Result) getResult()
    {
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r is null)
            return null;

        auto container = new immutable ResultContainer(r);
        return new immutable Result(container);
    }

    /// Wraps a result pointer obtained from PQexec* functions into a container
    ///
    /// Throws: ConnectionException if the pointer is null
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Select single-row mode for the currently-executing query
    ///
    /// Returns: true if PQsetSingleRowMode reports 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     Try to cancel query

     If the cancellation is effective, the current command will
     terminate early and return an error result or exception. If the
     cancellation fails (say, because the server was already done
     processing the command) there will be no visible result at all.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // Release the underlying PGcancel object right away instead of
        // waiting for the garbage collector to finalize the helper
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// Returns: true if PQisBusy reports 1
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /// Looks up a parameter setting via PQparameterStatus
    ///
    /// Throws: ConnectionException if PQparameterStatus returns null
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /// Escapes a string with PQescapeLiteral
    ///
    /// Throws: ConnectionException if PQescapeLiteral returns null
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The buffer is owned by libpq: free it on every exit path
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Escapes a string with PQescapeIdentifier
    ///
    /// Throws: ConnectionException if PQescapeIdentifier returns null
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The buffer is owned by libpq: free it on every exit path
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns a copy of the string reported by PQdb
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns a copy of the string reported by PQhost
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns the value reported by PQprotocolVersion
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns the value reported by PQserverVersion
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /// Enables PQtrace output into the C stream behind the given File
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Disables tracing started by trace (PQuntrace)
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Sets the client encoding
    ///
    /// Throws: ConnectionException if PQsetClientEncoding returns non-zero
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}
382 
/// Converts an associative array of connection parameters into two
/// parallel arrays of C strings, each one element longer than the input
/// so that the last element stays null
private auto keyValToPQparamsArrays(in string[string] keyValueParams)
{
    static struct PQparamsArrays
    {
        immutable(char)*[] keys;
        immutable(char)*[] vals;
    }

    const total = keyValueParams.length;

    PQparamsArrays result;
    result.keys.length = total + 1; // extra slot remains null
    result.vals.length = total + 1;

    size_t slot = 0;
    foreach(pair; keyValueParams.byKeyValue)
    {
        result.keys[slot] = toStringz(pair.key);
        result.vals[slot] = toStringz(pair.value);
        ++slot;
    }

    assert(slot == total);

    return result;
}
408 
/// Check connection options in the provided connection string
///
/// Throws: ConnectionException carrying the message produced by
/// PQconninfoParse if the connection string does not pass the check.
void connStringCheck(string connString)
{
    char* errmsg = null;
    PQconninfoOption* parsed = PQconninfoParse(connString.toStringz, &errmsg);

    if(parsed !is null)
        PQconninfoFree(parsed);
    else
        enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");

    if(errmsg is null)
        return;

    // Copy the message into GC memory before handing the buffer back to libpq
    string description = errmsg.fromStringz.to!string;
    PQfreemem(cast(void*) errmsg);

    throw new ConnectionException(description, __FILE__, __LINE__);
}
434 
unittest
{
    // A well-formed conninfo string is accepted without an exception
    connStringCheck("dbname=postgres user=postgres");

    // A malformed conninfo string is rejected with ConnectionException
    bool rejected = false;

    try
    {
        connStringCheck("wrong conninfo string");
    }
    catch(ConnectionException)
    {
        rejected = true;
    }

    assert(rejected);
}
450 
/// Represents query cancellation process
///
/// Holds a PGcancel object obtained with PQgetCancel; the object is
/// released with PQfreeCancel in the destructor.
class Cancellation
{
    private PGcancel* cancel;

    /// Creates a cancellation object for the given connection
    ///
    /// Throws: ConnectionException if PQgetCancel returns null
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    /// Releases the PGcancel object
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Requests that the server abandon processing of the current command

     Throws exception if cancel request was not successfully dispatched.

     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.
    */
    void doCancel()
    {
        // D default-initializes char to 0xFF, not 0. Zero the buffer so that
        // fromStringz always finds a terminator inside it, even if PQcancel
        // does not write a message.
        char[256] errbuf = '\0';
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}
492 
/// Exception thrown by Cancellation.doCancel when the cancel request
/// was not dispatched
class CancellationException : Dpq2Exception
{
    /// Forwards message and source location to Dpq2Exception
    this(
        string msg,
        string file = __FILE__,
        size_t line = __LINE__
    )
    {
        super(msg, file, line);
    }
}
501 
/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Uses the current error message of the given connection as
    /// the exception message
    this(
        in Connection c,
        string file = __FILE__,
        size_t line = __LINE__
    )
    {
        super(c.errorMessage, file, line);
    }

    /// Uses an explicitly supplied message
    this(
        string msg,
        string file = __FILE__,
        size_t line = __LINE__
    )
    {
        super(msg, file, line);
    }
}
515 
/// Checks that need a reachable server; compiled only when the
/// integration_tests version is set
version (integration_tests)
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    // Informational getters of an opened connection
    {
        debug import std.experimental.logger;

        auto connection = new Connection(connParam);
        auto databaseName = connection.dbName();
        auto protocol = connection.protocolVersion();
        auto server = connection.serverVersion();

        debug
        {
            trace("DB name: ", databaseName);
            trace("Protocol version: ", protocol);
            trace("Server version: ", server);
        }

        destroy(connection);
    }

    // A malformed connection string must raise ConnectionException
    {
        bool caught = false;

        try
        {
            auto unused = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        }
        catch(ConnectionException ex)
        {
            caught = true;
            assert(ex.msg.length > 40); // error message check
        }
        finally
        {
            assert(caught);
        }
    }

    // Escaping helpers and client encoding
    {
        auto connection = new Connection(connParam);

        assert(connection.escapeLiteral("abc'def") == "'abc''def'");
        assert(connection.escapeIdentifier("abc'def") == "\"abc'def\"");

        connection.setClientEncoding("WIN866");
        assert(connection.exec("show client_encoding")[0][0].as!string == "WIN866");
    }

    // Transaction status transitions
    {
        auto connection = new Connection(connParam);

        assert(connection.transactionStatus == PQTRANS_IDLE);

        connection.exec("BEGIN");
        assert(connection.transactionStatus == PQTRANS_INTRANS);

        // The statement is expected to fail; the failure itself is ignored
        try
        {
            connection.exec("DISCARD ALL");
        }
        catch (Exception)
        {
        }
        assert(connection.transactionStatus == PQTRANS_INERROR);

        connection.exec("ROLLBACK");
        assert(connection.transactionStatus == PQTRANS_IDLE);
    }

    // Key/value constructors must fail for wrong parameters
    {
        import std.exception: assertThrown;

        string[string] wrongParams = [
            "host": "wrong-host",
            "dbname": "wrong-db-name"
        ];

        assertThrown!ConnectionException(new Connection(wrongParams));
        assertThrown!ConnectionException(new Connection(ConnectionStart(), wrongParams));
    }
}