1 /**
2  * Represents connection to the PostgreSQL server
3  *
 * Most functions correspond to those described in the libpq documentation of Postgres:
5  * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
6  */
7 module dpq2.connection;
8 
9 import dpq2.query;
10 import dpq2.args: QueryParams;
11 import dpq2.result;
12 import dpq2.exception;
13 
14 import derelict.pq.pq;
15 import std.conv: to;
16 import std.string: toStringz, fromStringz;
17 import std.exception: enforce;
18 import std.range;
19 import std.stdio: File;
20 import std.socket;
21 import core.exception;
22 import core.time: Duration;
23 
24 /*
 * Bugs: On Unix, a connection is not thread-safe.
26  *
27  * On Unix, forking a process with open libpq connections can lead
28  * to unpredictable results because the parent and child processes share
29  * the same sockets and operating system resources. For this reason,
30  * such usage is not recommended, though doing an exec from the child
31  * process to load a new executable is safe.
32 
33 
34 
35 int PQisthreadsafe();
36 Returns 1 if the libpq is thread-safe and 0 if it is not.
37 */
38 
/// Empty tag type: pass an instance as the first Connection constructor
/// argument to select the overloads that start the connection via
/// PQconnectStart / PQconnectStartParams
struct ConnectionStart {}
41 
version(DerelictPQ_Static)
{
    /// Connection for static version of libpq
    class Connection
    {
        /// Forwards every argument to the matching private `ctor` overload
        /// supplied by the ConnectionMethods mixin
        this(Args...)(Args ctorArgs)
        {
            ctor(ctorArgs);
        }

        mixin ConnectionMethods;
    }
}
else
{
    /// Connection for dynamic version of libpq
    package class Connection
    {
        import dpq2.dynloader: DynamicLoader;

        /// Loader object associated with this connection
        package immutable DynamicLoader dynamicLoader;

        /// Stores the loader, then forwards the remaining arguments to the
        /// matching private `ctor` overload supplied by the ConnectionMethods mixin
        this(Args...)(immutable DynamicLoader dl, Args ctorArgs)
        {
            // Must be assigned before ctor(): the class invariant may assert
            // that the loader is non-null as soon as a public method is called
            dynamicLoader = dl;

            ctor(ctorArgs);
        }

        mixin ConnectionMethods;
    }
}
76 
/// Members shared by the static and the dynamic flavour of Connection
private mixin template ConnectionMethods()
{
    /// Raw libpq connection handle; released with PQfinish in the destructor
    package PGconn* conn;

    invariant
    {
        version(DerelictPQ_Dynamic)
            assert(dynamicLoader !is null);

        assert(conn !is null);
    }

    /// Makes a new connection to the database server
    private void ctor(string connString)
    {
        conn = PQconnectdb(toStringz(connString));
        checkCreatedConnection();
    }

    /// ditto
    private void ctor(in string[string] keyValueParams)
    {
        // Arrays are null-terminated, so taking the first element address is always valid
        auto a = keyValueParams.keyValToPQparamsArrays;

        conn = PQconnectdbParams(&a.keys[0], &a.vals[0], 0);
        checkCreatedConnection();
    }

    /// Starts creation of a connection to the database server in a nonblocking manner
    private void ctor(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));
        checkCreatedConnection();
    }

    /// ditto
    private void ctor(ConnectionStart, in string[string] keyValueParams)
    {
        // Arrays are null-terminated, so taking the first element address is always valid
        auto a = keyValueParams.keyValToPQparamsArrays;

        conn = PQconnectStartParams(&a.keys[0], &a.vals[0], 0);
        checkCreatedConnection();
    }

    /// Validates the freshly created connection handle
    ///
    /// Throws: OutOfMemoryError if libpq returned a null handle,
    /// ConnectionException if the connection status is CONNECTION_BAD
    private void checkCreatedConnection()
    {
        enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Closes the connection and releases the libpq handle
    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns the blocking status of the database connection
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Sets the nonblocking status of the connection
    ///
    /// Throws: ConnectionException if libpq reports failure
    private void setNonBlocking(bool state)
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Begin reset the communication channel to the server, in a nonblocking manner
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if the reset could not be started
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Polls the state of a connection started in a nonblocking manner
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Polls the state of a reset started by resetStart
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the status of the connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /**
        Returns the current in-transaction status of the server.
        The status can be:
            * PQTRANS_IDLE    - currently idle
            * PQTRANS_ACTIVE  - a command is in progress (reported only when a query has been sent to the server and not yet completed)
            * PQTRANS_INTRANS - idle, in a valid transaction block
            * PQTRANS_INERROR - idle, in a failed transaction block
            * PQTRANS_UNKNOWN - reported if the connection is bad
     */
    PGTransactionStatusType transactionStatus() nothrow
    {
        return PQtransactionStatus(conn);
    }

    /// If input is available from the server, consume it
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if libpq reports anything but success
    void consumeInput()
    {
        assert(conn);

        // PQconsumeInput returns a C int, keep it in its native type
        const int r = PQconsumeInput( conn );
        if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Attempts to flush queued output data to the server
    ///
    /// Returns: true if PQflush returned 0, false for any other non-error result
    ///
    /// Throws: ConnectionException if PQflush returned -1
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Obtains the file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if no valid descriptor is available
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /// Obtains duplicate file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if the descriptor is unavailable or cannot be duplicated
    socket_t posixSocketDuplicate()
    {
        version(Windows)
        {
            assert(false, "FIXME: implement socket duplication");
        }
        else // Posix OS
        {
            import core.sys.posix.unistd: dup;

            const int duplicate = dup(cast(socket_t) posixSocket);

            // dup() signals failure with -1; never hand that out as a socket
            if(duplicate == -1)
                throw new ConnectionException("Unable to duplicate connection socket", __FILE__, __LINE__);

            return cast(socket_t) duplicate;
        }
    }

    /// Obtains std.socket.Socket of the connection to the server
    ///
    /// Due to a limitation of Socket actually for the Socket creation
    /// duplicate of internal posix socket will be used.
    Socket socket()
    {
        return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
    }

    /// Returns the error message most recently generated by an operation on the connection
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * Sets or examines the current notice processor
     *
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Get next result after sending a non-blocking commands. Can return null.
    ///
    /// Useful only for non-blocking operations.
    immutable(Result) getResult()
    {
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r)
        {
            // Go through createResultContainer so that both the static and
            // the dynamic build pick the right ResultContainer constructor
            auto container = createResultContainer(r);
            return new immutable Result(container);
        }

        return null;
    }

    /// Wraps a result obtained from PQexec* functions into a ResultContainer
    ///
    /// Throws: ConnectionException if the result pointer is null
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        version(DerelictPQ_Static)
            return new immutable ResultContainer(r);
        else
            return new immutable ResultContainer(dynamicLoader, r);
    }

    /// Select single-row mode for the currently-executing query
    ///
    /// Returns: true if libpq reported success
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     Try to cancel query

     If the cancellation is effective, the current command will
     terminate early and return an error result or exception. If the
     cancellation fails (say, because the server was already done
     processing the command) there will be no visible result at all.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // Release the libpq cancel object right away instead of waiting for the GC
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// Returns true if libpq reports the connection as busy
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /// Looks up a current parameter setting of the server
    ///
    /// Throws: ConnectionException if libpq returns no value for paramName
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /// Escapes a string for use as an SQL literal
    ///
    /// Throws: ConnectionException if libpq fails to escape the string
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // Free the libpq buffer even if copying into a D string throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Escapes a string for use as an SQL identifier
    ///
    /// Throws: ConnectionException if libpq fails to escape the string
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // Free the libpq buffer even if copying into a D string throws
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns the database name of the connection
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns the server host name of the connection
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns the frontend/backend protocol version reported by libpq
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns the server version as an integer reported by libpq
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /// Enables libpq tracing of the client/server communication into the given stream
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Disables tracing started by trace
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Sets the client encoding
    ///
    /// Throws: ConnectionException if libpq reports failure
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}
422 
/// Converts an associative array of connection parameters into two parallel
/// arrays of C strings (keys and values).
///
/// Both arrays get one extra trailing element which is left null, so each
/// array is null-terminated.
private auto keyValToPQparamsArrays(in string[string] keyValueParams)
{
    static struct PQparamsArrays
    {
        immutable(char)*[] keys;
        immutable(char)*[] vals;
    }

    const total = keyValueParams.length;

    PQparamsArrays result;

    // One slot more than needed: the last element stays null as terminator
    result.keys.length = total + 1;
    result.vals.length = total + 1;

    size_t filled = 0;

    foreach(name, value; keyValueParams)
    {
        result.keys[filled] = name.toStringz;
        result.vals[filled] = value.toStringz;

        ++filled;
    }

    assert(filled == total);

    return result;
}
448 
/// Checks the connection options in the provided connection string
///
/// Throws: ConnectionException carrying the libpq error text if the
/// connection string does not pass the check, OutOfMemoryError if libpq
/// returned neither parsed options nor an error message.
void connStringCheck(string connString)
{
    char* parseError = null;
    PQconninfoOption* options = PQconninfoParse(connString.toStringz, &parseError);

    if(options !is null)
        PQconninfoFree(options);
    else
        enforce!OutOfMemoryError(parseError, "Unable to allocate libpq conninfo data");

    // No error message reported: nothing more to do
    if(parseError is null)
        return;

    // Copy the message into GC memory before releasing the libpq buffer
    string message = parseError.fromStringz.to!string;
    PQfreemem(cast(void*) parseError);

    throw new ConnectionException(message, __FILE__, __LINE__);
}
474 
unittest
{
    // A well-formed connection string must pass without throwing
    connStringCheck("dbname=postgres user=postgres");

    // A malformed connection string must be rejected with ConnectionException
    bool rejected = false;

    try
    {
        connStringCheck("wrong conninfo string");
    }
    catch(ConnectionException)
    {
        rejected = true;
    }

    assert(rejected);
}
490 
/// Represents query cancellation process
class Cancellation
{
    /// libpq cancel object obtained from the connection; freed in the destructor
    private PGcancel* cancel;

    /// Obtains a cancel object for the given connection
    ///
    /// Throws: ConnectionException if libpq does not provide one
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        // The exception argument is lazy: it is built only when the check fails
        enforce(cancel !is null, new ConnectionException(c, __FILE__, __LINE__));
    }

    /// Frees the libpq cancel object
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Requests that the server abandon processing of the current command

     Throws exception if cancel request was not successfully dispatched.

     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.
    */
    void doCancel()
    {
        // Receives the error text from PQcancel when dispatch fails
        char[256] errbuf;

        if(PQcancel(cancel, errbuf.ptr, errbuf.length) == 1)
            return;

        throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}
532 
/// Thrown by Cancellation.doCancel when the cancel request could not be dispatched
class CancellationException : Dpq2Exception
{
    /// Creates the exception with the given message and source location
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
541 
/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Creates the exception using the most recent error message of connection `c`
    this(in Connection c, string file = __FILE__, size_t line = __LINE__)
    {
        super(c.errorMessage(), file, line);
    }

    /// Creates the exception with an explicit message
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
555 
/// Test helper: creates a Connection from the given constructor arguments,
/// going through a freshly created DynamicLoader unless libpq is linked statically
version (integration_tests)
Connection createTestConn(T...)(T params)
{
    version(DerelictPQ_Static)
    {
        return new Connection(params);
    }
    else
    {
        import dpq2.dynloader: DynamicLoader;

        auto loader = new immutable DynamicLoader;

        return loader.createConnection(params);
    }
}
571 
/// Integration test of the connection API; connParam is the connection
/// string used to reach the test server
version (integration_tests)
void _integration_test( string connParam )
{
    // Basic connection information getters
    {
        debug import std.experimental.logger;

        auto c = createTestConn(connParam);

        assert( PQlibVersion() >= 9_0100 );

        auto dbname = c.dbName();
        auto pver = c.protocolVersion();
        auto sver = c.serverVersion();

        debug
        {
            trace("DB name: ", dbname);
            trace("Protocol version: ", pver);
            trace("Server version: ", sver);
        }

        destroy(c);
    }

    // Starting a connection with a malformed connection string must throw
    {
        bool exceptionFlag = false;

        try
            auto c = createTestConn(ConnectionStart(), "!!!some incorrect connection string!!!");
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
            assert(exceptionFlag);
    }

    // Escaping helpers and client encoding switch
    {
        auto c = createTestConn(connParam);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }

    // Transaction status transitions: idle -> in transaction -> failed -> idle
    {
        auto c = createTestConn(connParam);

        assert(c.transactionStatus == PQTRANS_IDLE);

        c.exec("BEGIN");
        assert(c.transactionStatus == PQTRANS_INTRANS);

        // This statement is expected to fail here; the following assert
        // relies on the transaction ending up in the failed state
        try c.exec("DISCARD ALL");
        catch (Exception) {}
        assert(c.transactionStatus == PQTRANS_INERROR);

        c.exec("ROLLBACK");
        assert(c.transactionStatus == PQTRANS_IDLE);
    }

    // Key/value parameter constructors must throw for unreachable settings
    {
        import std.exception: assertThrown;

        string[string] kv;
        kv["host"] = "wrong-host";
        kv["dbname"] = "wrong-db-name";

        assertThrown!ConnectionException(createTestConn(kv));
        assertThrown!ConnectionException(createTestConn(ConnectionStart(), kv));
    }
}