1 /**
2  * Represents connection to the PostgreSQL server
3  *
 * Most functions correspond to those in the Postgres documentation:
5  * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
6  */
7 module dpq2.connection;
8 
9 import dpq2.query;
10 import dpq2.args: QueryParams;
11 import dpq2.result;
12 import dpq2.exception;
13 
14 import derelict.pq.pq;
15 import std.conv: to;
16 import std.string: toStringz, fromStringz;
17 import std.exception: enforce;
18 import std.range;
19 import std.stdio: File;
20 import std.socket;
21 import core.exception;
22 import core.time: Duration;
23 
24 /*
25  * Bugs: On Unix connection is not thread safe.
26  *
27  * On Unix, forking a process with open libpq connections can lead
28  * to unpredictable results because the parent and child processes share
29  * the same sockets and operating system resources. For this reason,
30  * such usage is not recommended, though doing an exec from the child
31  * process to load a new executable is safe.
32 
33 
34 
35 int PQisthreadsafe();
36 Returns 1 if the libpq is thread-safe and 0 if it is not.
37 */
38 
/// Empty tag type: passing a value of it as the first constructor argument
/// selects the Connection overloads that take ConnectionStart
struct ConnectionStart {}
41 
/// Connection
///
/// Both build variants receive their members from the ConnectionMethods
/// mixin template. With DerelictPQ_Static the class is public; otherwise it
/// is package-visible and additionally holds a dpq2.dynloader ReferenceCounter.
version(DerelictPQ_Static)
class Connection
{
    mixin ConnectionMethods;
}
else
package class Connection
{
    import dpq2.dynloader: ReferenceCounter;

    // Set by the constructors and handed to __custom_dtor in the destructor
    // when DerelictPQ_Dynamic is defined
    private immutable ReferenceCounter dynLoaderRefCnt;

    mixin ConnectionMethods;
}
57 
58 private mixin template ConnectionMethods()
59 {
60     package PGconn* conn;
61 
62     invariant
63     {
64         assert(conn !is null);
65     }
66 
67     /// Makes a new connection to the database server
68     this(string connString)
69     {
70         conn = PQconnectdb(toStringz(connString));
71         version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);
72         checkCreatedConnection();
73     }
74 
75     /// ditto
76     this(in string[string] keyValueParams)
77     {
78         auto a = keyValueParams.keyValToPQparamsArrays;
79 
80         conn = PQconnectdbParams(&a.keys[0], &a.vals[0], 0);
81         version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);
82         checkCreatedConnection();
83     }
84 
85 	/// Starts creation of a connection to the database server in a nonblocking manner
86     this(ConnectionStart, string connString)
87     {
88         conn = PQconnectStart(toStringz(connString));
89         version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);
90         checkCreatedConnection();
91     }
92 
93 	/// ditto
94     this(ConnectionStart, in string[string] keyValueParams)
95     {
96         auto a = keyValueParams.keyValToPQparamsArrays;
97 
98         conn = PQconnectStartParams(&a.keys[0], &a.vals[0], 0);
99         version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);
100         checkCreatedConnection();
101     }
102 
103     private void checkCreatedConnection()
104     {
105         enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
106 
107         if( status == CONNECTION_BAD )
108             throw new ConnectionException(this, __FILE__, __LINE__);
109     }
110 
    // Passes the connection handle to PQfinish first; in DerelictPQ_Dynamic
    // builds the reference counter's __custom_dtor is invoked afterwards.
    ~this()
    {
        PQfinish( conn );

        version(DerelictPQ_Dynamic) dynLoaderRefCnt.__custom_dtor();
    }
117 
118     mixin Queries;
119 
120     /// Returns the blocking status of the database connection
121     bool isNonBlocking()
122     {
123         return PQisnonblocking(conn) == 1;
124     }
125 
126     /// Sets the nonblocking status of the connection
127     private void setNonBlocking(bool state)
128     {
129         if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
130             throw new ConnectionException(this, __FILE__, __LINE__);
131     }
132 
133     /// Begin reset the communication channel to the server, in a nonblocking manner
134     ///
135     /// Useful only for non-blocking operations.
136     void resetStart()
137     {
138         if(PQresetStart(conn) == 0)
139             throw new ConnectionException(this, __FILE__, __LINE__);
140     }
141 
142     /// Useful only for non-blocking operations.
143     PostgresPollingStatusType poll() nothrow
144     {
145         assert(conn);
146 
147         return PQconnectPoll(conn);
148     }
149 
150     /// Useful only for non-blocking operations.
151     PostgresPollingStatusType resetPoll() nothrow
152     {
153         assert(conn);
154 
155         return PQresetPoll(conn);
156     }
157 
158     /// Returns the status of the connection
159     ConnStatusType status() nothrow
160     {
161         return PQstatus(conn);
162     }
163 
164     /**
165         Returns the current in-transaction status of the server.
166         The status can be:
167             * PQTRANS_IDLE    - currently idle
168             * PQTRANS_ACTIVE  - a command is in progress (reported only when a query has been sent to the server and not yet completed)
169             * PQTRANS_INTRANS - idle, in a valid transaction block
170             * PQTRANS_INERROR - idle, in a failed transaction block
171             * PQTRANS_UNKNOWN - reported if the connection is bad
172      */
173     PGTransactionStatusType transactionStatus() nothrow
174     {
175         return PQtransactionStatus(conn);
176     }
177 
178     /// If input is available from the server, consume it
179     ///
180     /// Useful only for non-blocking operations.
181     void consumeInput()
182     {
183         assert(conn);
184 
185         const size_t r = PQconsumeInput( conn );
186         if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
187     }
188 
189     package bool flush()
190     {
191         assert(conn);
192 
193         auto r = PQflush(conn);
194         if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
195         return r == 0;
196     }
197 
198     /// Obtains the file descriptor number of the connection socket to the server
199     int posixSocket()
200     {
201         int r = PQsocket(conn);
202 
203         if(r == -1)
204             throw new ConnectionException(this, __FILE__, __LINE__);
205 
206         return r;
207     }
208 
209     /// Obtains duplicate file descriptor number of the connection socket to the server
210     socket_t posixSocketDuplicate()
211     {
212         version(Windows)
213         {
214             assert(false, "FIXME: implement socket duplication");
215         }
216         else // Posix OS
217         {
218             import core.sys.posix.unistd: dup;
219 
220             return cast(socket_t) dup(cast(socket_t) posixSocket);
221         }
222     }
223 
224     /// Obtains std.socket.Socket of the connection to the server
225     ///
226     /// Due to a limitation of Socket actually for the Socket creation
227     /// duplicate of internal posix socket will be used.
228     Socket socket()
229     {
230         return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
231     }
232 
233     /// Returns the error message most recently generated by an operation on the connection
234     string errorMessage() const nothrow
235     {
236         return PQerrorMessage(conn).to!string;
237     }
238 
239     /**
240      * Sets or examines the current notice processor
241      *
242      * Returns the previous notice receiver or processor function pointer, and sets the new value.
243      * If you supply a null function pointer, no action is taken, but the current pointer is returned.
244      */
245     PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
246     {
247         assert(conn);
248 
249         return PQsetNoticeProcessor(conn, proc, arg);
250     }
251 
252     /// Get next result after sending a non-blocking commands. Can return null.
253     ///
254     /// Useful only for non-blocking operations.
255     immutable(Result) getResult()
256     {
257         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
258         auto r = cast(immutable) PQgetResult(conn);
259 
260         if(r)
261         {
262             auto container = new immutable ResultContainer(r);
263             return new immutable Result(container);
264         }
265 
266         return null;
267     }
268 
269     /// Get result after PQexec* functions or throw exception if pull is empty
270     package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
271     {
272         if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);
273 
274         return new immutable ResultContainer(r);
275     }
276 
277     /// Select single-row mode for the currently-executing query
278     bool setSingleRowMode()
279     {
280         return PQsetSingleRowMode(conn) == 1;
281     }
282 
283     /**
284      Try to cancel query
285 
286      If the cancellation is effective, the current command will
287      terminate early and return an error result or exception. If the
288      cancellation will fails (say, because the server was already done
289      processing the command) there will be no visible result at all.
290     */
291     void cancel()
292     {
293         auto c = new Cancellation(this);
294         c.doCancel;
295     }
296 
297     ///
298     bool isBusy() nothrow
299     {
300         assert(conn);
301 
302         return PQisBusy(conn) == 1;
303     }
304 
305     ///
306     string parameterStatus(string paramName)
307     {
308         assert(conn);
309 
310         auto res = PQparameterStatus(conn, toStringz(paramName));
311 
312         if(res is null)
313             throw new ConnectionException(this, __FILE__, __LINE__);
314 
315         return to!string(fromStringz(res));
316     }
317 
318     ///
319     string escapeLiteral(string msg)
320     {
321         assert(conn);
322 
323         auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);
324 
325         if(buf is null)
326             throw new ConnectionException(this, __FILE__, __LINE__);
327 
328         string res = buf.fromStringz.to!string;
329 
330         PQfreemem(buf);
331 
332         return res;
333     }
334 
335     ///
336     string escapeIdentifier(string msg)
337     {
338         assert(conn);
339 
340         auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);
341 
342         if(buf is null)
343             throw new ConnectionException(this, __FILE__, __LINE__);
344 
345         string res = buf.fromStringz.to!string;
346 
347         PQfreemem(buf);
348 
349         return res;
350     }
351 
352     ///
353     string dbName() const nothrow
354     {
355         assert(conn);
356 
357         return PQdb(conn).fromStringz.to!string;
358     }
359 
360     ///
361     string host() const nothrow
362     {
363         assert(conn);
364 
365         return PQhost(conn).fromStringz.to!string;
366     }
367 
368     ///
369     int protocolVersion() const nothrow
370     {
371         assert(conn);
372 
373         return PQprotocolVersion(conn);
374     }
375 
376     ///
377     int serverVersion() const nothrow
378     {
379         assert(conn);
380 
381         return PQserverVersion(conn);
382     }
383 
384     ///
385     void trace(ref File stream)
386     {
387         PQtrace(conn, stream.getFP);
388     }
389 
390     ///
391     void untrace()
392     {
393         PQuntrace(conn);
394     }
395 
396     ///
397     void setClientEncoding(string encoding)
398     {
399         if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
400             throw new ConnectionException(this, __FILE__, __LINE__);
401     }
402 }
403 
// Converts an associative array of connection parameters into two parallel
// arrays of zero-terminated C strings (keys and values).
//
// Each array gets one slot more than there are parameters; that last slot
// is never written and therefore stays null.
private auto keyValToPQparamsArrays(in string[string] keyValueParams)
{
    static struct PQparamsArrays
    {
        immutable(char)*[] keys;
        immutable(char)*[] vals;
    }

    const size_t total = keyValueParams.length;

    PQparamsArrays result;
    result.keys = new immutable(char)*[](total + 1);
    result.vals = new immutable(char)*[](total + 1);

    size_t filled = 0;

    foreach(name, value; keyValueParams)
    {
        result.keys[filled] = name.toStringz;
        result.vals[filled] = value.toStringz;
        ++filled;
    }

    assert(filled == total);

    return result;
}
429 
/// Checks the connection options in the provided connection string
///
/// Throws an exception if the connection string does not pass the check.
version(DerelictPQ_Static)
void connStringCheck(string connString)
{
    connString._connStringCheck;
}
438 
/// ditto
package void _connStringCheck(string connString)
{
    char* errmsg = null;
    PQconninfoOption* parsed = PQconninfoParse(connString.toStringz, &errmsg);

    if(parsed !is null)
        PQconninfoFree(parsed);
    else
        // No options and no error text either: reported as OutOfMemoryError
        enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");

    if(errmsg is null)
        return;

    // Copy the text before handing the buffer back to libpq
    string description = errmsg.fromStringz.to!string;
    PQfreemem(cast(void*) errmsg);

    throw new ConnectionException(description, __FILE__, __LINE__);
}
462 
// connStringCheck is declared only for DerelictPQ_Static builds, so this
// test calls _connStringCheck, which is declared in every configuration.
unittest
{
    import std.exception: assertThrown;

    // A well-formed string passes without throwing
    _connStringCheck("dbname=postgres user=postgres");

    // A malformed string must be rejected with ConnectionException
    assertThrown!ConnectionException(_connStringCheck("wrong conninfo string"));
}
478 
/// Represents query cancellation process
///
/// Holds the PGcancel handle obtained from a Connection and frees it in the destructor.
class Cancellation
{
    version(DerelictPQ_Dynamic)
    {
        import dpq2.dynloader: ReferenceCounter;
        private immutable ReferenceCounter dynLoaderRefCnt;
    }

    private PGcancel* cancel;

    /// Obtains the cancel handle of connection c through PQgetCancel
    ///
    /// Throws: ConnectionException if PQgetCancel returns null
    this(Connection c)
    {
        version(DerelictPQ_Dynamic) dynLoaderRefCnt = ReferenceCounter(true);

        auto handle = PQgetCancel(c.conn);

        if(handle is null)
            throw new ConnectionException(c, __FILE__, __LINE__);

        cancel = handle;
    }

    /// Frees the cancel handle with PQfreeCancel
    ~this()
    {
        PQfreeCancel(cancel);

        version(DerelictPQ_Dynamic) dynLoaderRefCnt.__custom_dtor();
    }

    /**
     Requests that the server abandon processing of the current command

     Throws CancellationException if the cancel request was not
     successfully dispatched; the exception message is the text PQcancel
     wrote into its error buffer.

     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.
    */
    void doCancel()
    {
        char[256] errbuf;

        const bool dispatched = PQcancel(cancel, errbuf.ptr, errbuf.length) == 1;

        if(dispatched)
            return;

        throw new CancellationException(errbuf.ptr.fromStringz.to!string, __FILE__, __LINE__);
    }
}
530 
/// Thrown by Cancellation.doCancel when a cancel request could not be dispatched
class CancellationException : Dpq2Exception
{
    /// Forwards message and source location unchanged to Dpq2Exception
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
539 
/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Uses the current errorMessage of connection c as the exception message
    this(in Connection c, string file = __FILE__, size_t line = __LINE__)
    {
        this(c.errorMessage, file, line);
    }

    /// Forwards message and source location unchanged to Dpq2Exception
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
553 
// Test helper: builds a Connection from params, either directly
// (DerelictPQ_Static) or through dpq2.dynloader's connFactory.
version (integration_tests)
Connection createTestConn(T...)(T params)
{
    version(DerelictPQ_Static)
    {
        return new Connection(params);
    }
    else
    {
        import dpq2.dynloader: connFactory;

        return connFactory.createConnection(params);
    }
}
568 
// Integration test of Connection; connParam is the connection string used
// for every connection that is expected to succeed.
version (integration_tests)
void _integration_test( string connParam )
{
    // Basic connection information accessors
    {
        debug import std.experimental.logger;

        auto c = createTestConn(connParam);

        assert( PQlibVersion() >= 9_0100 );

        auto dbname = c.dbName();
        auto pver = c.protocolVersion();
        auto sver = c.serverVersion();

        debug
        {
            trace("DB name: ", dbname);
            trace("Protocol version: ", pver);
            trace("Server version: ", sver);
        }

        destroy(c);
    }

    // A malformed connection string must raise ConnectionException
    {
        bool exceptionFlag = false;

        try
            auto c = createTestConn(ConnectionStart(), "!!!some incorrect connection string!!!");
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
            assert(exceptionFlag);
    }

    // Escaping helpers and client encoding
    {
        auto c = createTestConn(connParam);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }

    // Transaction status transitions: idle -> in transaction -> failed -> idle
    {
        auto c = createTestConn(connParam);

        assert(c.transactionStatus == PQTRANS_IDLE);

        c.exec("BEGIN");
        assert(c.transactionStatus == PQTRANS_INTRANS);

        // This statement is expected to fail inside the open transaction;
        // the exception is swallowed on purpose, only the resulting state matters
        try c.exec("DISCARD ALL");
        catch (Exception) {}
        assert(c.transactionStatus == PQTRANS_INERROR);

        c.exec("ROLLBACK");
        assert(c.transactionStatus == PQTRANS_IDLE);
    }

    // Key/value constructors must raise ConnectionException for unusable parameters
    {
        import std.exception: assertThrown;

        string[string] kv;
        kv["host"] = "wrong-host";
        kv["dbname"] = "wrong-db-name";

        assertThrown!ConnectionException(createTestConn(kv));
        assertThrown!ConnectionException(createTestConn(ConnectionStart(), kv));
    }
}