1 /**
2  * Represents connection to the PostgreSQL server
3  *
4  * Most functions is correspond to those in the documentation of Postgres:
5  * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
6  */
7 module dpq2.connection;
8 
9 import dpq2.query;
10 import dpq2.args: QueryParams;
11 import dpq2.result;
12 import dpq2.exception;
13 
14 import derelict.pq.pq;
15 import std.conv: to;
16 import std.string: toStringz, fromStringz;
17 import std.exception: enforce;
18 import std.range;
19 import std.stdio: File;
20 import std.socket;
21 import core.exception;
22 import core.time: Duration;
23 
24 /*
25  * Bugs: On Unix connection is not thread safe.
26  *
27  * On Unix, forking a process with open libpq connections can lead
28  * to unpredictable results because the parent and child processes share
29  * the same sockets and operating system resources. For this reason,
30  * such usage is not recommended, though doing an exec from the child
31  * process to load a new executable is safe.
32 
33 
34 
35 int PQisthreadsafe();
36 Returns 1 if the libpq is thread-safe and 0 if it is not.
37 */
38 
39 /// dumb flag for Connection ctor parametrization
40 struct ConnectionStart {};
41 
42 /// Connection
43 class Connection
44 {
45     package PGconn* conn;
46 
47     invariant
48     {
49         assert(conn !is null);
50     }
51 
52     /// Makes a new connection to the database server
53     this(string connString)
54     {
55         conn = PQconnectdb(toStringz(connString));
56 
57         enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
58 
59         if(status != CONNECTION_OK)
60             throw new ConnectionException(this, __FILE__, __LINE__);
61     }
62 
63 	/// Starts creation of a connection to the database server in a nonblocking manner
64     this(ConnectionStart, string connString)
65     {
66         conn = PQconnectStart(toStringz(connString));
67 
68         enforce!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
69 
70         if( status == CONNECTION_BAD )
71             throw new ConnectionException(this, __FILE__, __LINE__);
72     }
73 
74     ~this()
75     {
76         PQfinish( conn );
77     }
78 
79     mixin Queries;
80 
81     /// Returns the blocking status of the database connection
82     bool isNonBlocking()
83     {
84         return PQisnonblocking(conn) == 1;
85     }
86 
87     /// Sets the nonblocking status of the connection
88     private void setNonBlocking(bool state)
89     {
90         if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
91             throw new ConnectionException(this, __FILE__, __LINE__);
92     }
93 
94     /// Begin reset the communication channel to the server, in a nonblocking manner
95     ///
96     /// Useful only for non-blocking operations.
97     void resetStart()
98     {
99         if(PQresetStart(conn) == 0)
100             throw new ConnectionException(this, __FILE__, __LINE__);
101     }
102 
103     /// Useful only for non-blocking operations.
104     PostgresPollingStatusType poll() nothrow
105     {
106         assert(conn);
107 
108         return PQconnectPoll(conn);
109     }
110 
111     /// Useful only for non-blocking operations.
112     PostgresPollingStatusType resetPoll() nothrow
113     {
114         assert(conn);
115 
116         return PQresetPoll(conn);
117     }
118 
119     /// Returns the status of the connection
120     ConnStatusType status() nothrow
121     {
122         return PQstatus(conn);
123     }
124 
125     /**
126         Returns the current in-transaction status of the server.
127         The status can be:
128             * PQTRANS_IDLE    - currently idle
129             * PQTRANS_ACTIVE  - a command is in progress (reported only when a query has been sent to the server and not yet completed)
130             * PQTRANS_INTRANS - idle, in a valid transaction block
131             * PQTRANS_INERROR - idle, in a failed transaction block
132             * PQTRANS_UNKNOWN - reported if the connection is bad
133      */
134     PGTransactionStatusType transactionStatus() nothrow
135     {
136         return PQtransactionStatus(conn);
137     }
138 
139     /// If input is available from the server, consume it
140     ///
141     /// Useful only for non-blocking operations.
142     void consumeInput()
143     {
144         assert(conn);
145 
146         const size_t r = PQconsumeInput( conn );
147         if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
148     }
149 
150     package bool flush()
151     {
152         assert(conn);
153 
154         auto r = PQflush(conn);
155         if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
156         return r == 0;
157     }
158 
159     /// Obtains the file descriptor number of the connection socket to the server
160     int posixSocket()
161     {
162         int r = PQsocket(conn);
163 
164         if(r == -1)
165             throw new ConnectionException(this, __FILE__, __LINE__);
166 
167         return r;
168     }
169 
170     /// Obtains duplicate file descriptor number of the connection socket to the server
171     socket_t posixSocketDuplicate()
172     {
173         version(Windows)
174         {
175             assert(false, "FIXME: implement socket duplication");
176         }
177         else // Posix OS
178         {
179             import core.sys.posix.unistd: dup;
180 
181             return cast(socket_t) dup(cast(socket_t) posixSocket);
182         }
183     }
184 
185     /// Obtains std.socket.Socket of the connection to the server
186     ///
187     /// Due to a limitation of Socket actually for the Socket creation
188     /// duplicate of internal posix socket will be used.
189     Socket socket()
190     {
191         return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
192     }
193 
194     /// Returns the error message most recently generated by an operation on the connection
195     string errorMessage() const nothrow
196     {
197         return PQerrorMessage(conn).to!string;
198     }
199 
200     /**
201      * Sets or examines the current notice processor
202      *
203      * Returns the previous notice receiver or processor function pointer, and sets the new value.
204      * If you supply a null function pointer, no action is taken, but the current pointer is returned.
205      */
206     PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
207     {
208         assert(conn);
209 
210         return PQsetNoticeProcessor(conn, proc, arg);
211     }
212 
213     /// Get next result after sending a non-blocking commands. Can return null.
214     ///
215     /// Useful only for non-blocking operations.
216     immutable(Result) getResult()
217     {
218         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
219         auto r = cast(immutable) PQgetResult(conn);
220 
221         if(r)
222         {
223             auto container = new immutable ResultContainer(r);
224             return new immutable Result(container);
225         }
226 
227         return null;
228     }
229 
230     /// Get result after PQexec* functions or throw exception if pull is empty
231     package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
232     {
233         if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);
234 
235         return new immutable ResultContainer(r);
236     }
237 
238     /// Select single-row mode for the currently-executing query
239     bool setSingleRowMode()
240     {
241         return PQsetSingleRowMode(conn) == 1;
242     }
243 
244     /**
245      Try to cancel query
246 
247      If the cancellation is effective, the current command will
248      terminate early and return an error result or exception. If the
249      cancellation will fails (say, because the server was already done
250      processing the command) there will be no visible result at all.
251     */
252     void cancel()
253     {
254         auto c = new Cancellation(this);
255         c.doCancel;
256     }
257 
258     ///
259     bool isBusy() nothrow
260     {
261         assert(conn);
262 
263         return PQisBusy(conn) == 1;
264     }
265 
266     ///
267     string parameterStatus(string paramName)
268     {
269         assert(conn);
270 
271         auto res = PQparameterStatus(conn, toStringz(paramName));
272 
273         if(res is null)
274             throw new ConnectionException(this, __FILE__, __LINE__);
275 
276         return to!string(fromStringz(res));
277     }
278 
279     ///
280     string escapeLiteral(string msg)
281     {
282         assert(conn);
283 
284         auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);
285 
286         if(buf is null)
287             throw new ConnectionException(this, __FILE__, __LINE__);
288 
289         string res = buf.fromStringz.to!string;
290 
291         PQfreemem(buf);
292 
293         return res;
294     }
295 
296     ///
297     string escapeIdentifier(string msg)
298     {
299         assert(conn);
300 
301         auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);
302 
303         if(buf is null)
304             throw new ConnectionException(this, __FILE__, __LINE__);
305 
306         string res = buf.fromStringz.to!string;
307 
308         PQfreemem(buf);
309 
310         return res;
311     }
312 
313     ///
314     string dbName() const nothrow
315     {
316         assert(conn);
317 
318         return PQdb(conn).fromStringz.to!string;
319     }
320 
321     ///
322     string host() const nothrow
323     {
324         assert(conn);
325 
326         return PQhost(conn).fromStringz.to!string;
327     }
328 
329     ///
330     int protocolVersion() const nothrow
331     {
332         assert(conn);
333 
334         return PQprotocolVersion(conn);
335     }
336 
337     ///
338     int serverVersion() const nothrow
339     {
340         assert(conn);
341 
342         return PQserverVersion(conn);
343     }
344 
345     ///
346     void trace(ref File stream)
347     {
348         PQtrace(conn, stream.getFP);
349     }
350 
351     ///
352     void untrace()
353     {
354         PQuntrace(conn);
355     }
356 
357     ///
358     void setClientEncoding(string encoding)
359     {
360         if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
361             throw new ConnectionException(this, __FILE__, __LINE__);
362     }
363 }
364 
365 /// Check connection options in the provided connection string
366 ///
367 /// Throws exception if connection string isn't passes check.
368 void connStringCheck(string connString)
369 {
370     char* errmsg = null;
371     PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);
372 
373     if(r is null)
374     {
375         enforce!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
376     }
377     else
378     {
379         PQconninfoFree(r);
380     }
381 
382     if(errmsg !is null)
383     {
384         string s = errmsg.fromStringz.to!string;
385         PQfreemem(cast(void*) errmsg);
386 
387         throw new ConnectionException(s, __FILE__, __LINE__);
388     }
389 }
390 
391 unittest
392 {
393     connStringCheck("dbname=postgres user=postgres");
394 
395     {
396         bool raised = false;
397 
398         try
399             connStringCheck("wrong conninfo string");
400         catch(ConnectionException e)
401             raised = true;
402 
403         assert(raised);
404     }
405 }
406 
407 /// Represents query cancellation process
408 class Cancellation
409 {
410     private PGcancel* cancel;
411 
412     ///
413     this(Connection c)
414     {
415         cancel = PQgetCancel(c.conn);
416 
417         if(cancel is null)
418             throw new ConnectionException(c, __FILE__, __LINE__);
419     }
420 
421     ///
422     ~this()
423     {
424         PQfreeCancel(cancel);
425     }
426 
427     /**
428      Requests that the server abandon processing of the current command
429 
430      Throws exception if cancel request was not successfully dispatched.
431 
432      Successful dispatch is no guarantee that the request will have any
433      effect, however. If the cancellation is effective, the current
434      command will terminate early and return an error result
435      (exception). If the cancellation fails (say, because the server
436      was already done processing the command), then there will be no
437      visible result at all.
438     */
439     void doCancel()
440     {
441         char[256] errbuf;
442         auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);
443 
444         if(res != 1)
445             throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
446     }
447 }
448 
449 ///
450 class CancellationException : Dpq2Exception
451 {
452     this(string msg, string file = __FILE__, size_t line = __LINE__)
453     {
454         super(msg, file, line);
455     }
456 }
457 
458 /// Connection exception
459 class ConnectionException : Dpq2Exception
460 {
461     this(in Connection c, string file = __FILE__, size_t line = __LINE__)
462     {
463         super(c.errorMessage(), file, line);
464     }
465 
466     this(string msg, string file = __FILE__, size_t line = __LINE__)
467     {
468         super(msg, file, line);
469     }
470 }
471 
472 version (integration_tests)
473 void _integration_test( string connParam )
474 {
475     assert( PQlibVersion() >= 9_0100 );
476 
477     {
478         debug import std.experimental.logger;
479 
480         auto c = new Connection(connParam);
481         auto dbname = c.dbName();
482         auto pver = c.protocolVersion();
483         auto sver = c.serverVersion();
484 
485         debug
486         {
487             trace("DB name: ", dbname);
488             trace("Protocol version: ", pver);
489             trace("Server version: ", sver);
490         }
491 
492         destroy(c);
493     }
494 
495     {
496         bool exceptionFlag = false;
497 
498         try
499             auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
500         catch(ConnectionException e)
501         {
502             exceptionFlag = true;
503             assert(e.msg.length > 40); // error message check
504         }
505         finally
506             assert(exceptionFlag);
507     }
508 
509     {
510         auto c = new Connection(connParam);
511 
512         assert(c.escapeLiteral("abc'def") == "'abc''def'");
513         assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");
514 
515         c.setClientEncoding("WIN866");
516         assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
517     }
518 
519     {
520         auto c = new Connection(connParam);
521 
522         assert(c.transactionStatus == PQTRANS_IDLE);
523 
524         c.exec("BEGIN");
525         assert(c.transactionStatus == PQTRANS_INTRANS);
526 
527         try c.exec("DISCARD ALL");
528         catch (Exception) {}
529         assert(c.transactionStatus == PQTRANS_INERROR);
530 
531         c.exec("ROLLBACK");
532         assert(c.transactionStatus == PQTRANS_IDLE);
533     }
534 }