1 /**
2  * Represents connection to the PostgreSQL server
3  *
 * Most functions correspond to those described in the PostgreSQL documentation:
5  * $(HTTPS https://www.postgresql.org/docs/current/static/libpq.html)
6  */
7 module dpq2.connection;
8 
9 import dpq2.query;
10 import dpq2.args: QueryParams;
11 import dpq2.result;
12 import dpq2.exception;
13 
14 import derelict.pq.pq;
15 import std.conv: to;
16 import std..string: toStringz, fromStringz;
17 import std.exception: enforce, enforceEx;
18 import std.range;
19 import std.stdio: File;
20 import std.socket;
21 import core.exception;
22 import core.time: Duration;
23 
24 /*
 * Bugs: on Unix, a connection is not thread safe.
26  *
27  * On Unix, forking a process with open libpq connections can lead
28  * to unpredictable results because the parent and child processes share
29  * the same sockets and operating system resources. For this reason,
30  * such usage is not recommended, though doing an exec from the child
31  * process to load a new executable is safe.
32 
33 
34 
35 int PQisthreadsafe();
36 Returns 1 if the libpq is thread-safe and 0 if it is not.
37 */
38 
/// Empty tag type: passing an instance as the first constructor argument
/// selects the Connection overload that starts the connection via PQconnectStart
struct ConnectionStart {}
41 
/**
 * Connection to a PostgreSQL server
 *
 * Object wrapper around a libpq `PGconn` handle. Most methods forward
 * to the libpq function of the corresponding name and report failures
 * by throwing `ConnectionException`.
 */
class Connection
{
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /// Makes a new connection to the database server
    ///
    /// Throws: ConnectionException if the connection status is not
    /// CONNECTION_OK after PQconnectdb returns
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Starts creation of a connection to the database server in a nonblocking manner
    ///
    /// Throws: ConnectionException if the connection status is
    /// CONNECTION_BAD right after PQconnectStart returns
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Releases the libpq connection handle
    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns the blocking status of the database connection
    ///
    /// Returns: true if PQisnonblocking reports 1
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Sets the nonblocking status of the connection
    ///
    /// Throws: ConnectionException if PQsetnonblocking returns -1
    private void setNonBlocking(bool state)
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Begin reset the communication channel to the server, in a nonblocking manner
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if PQresetStart returns 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns the value of PQconnectPoll for this connection
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns the value of PQresetPoll for this connection
    ///
    /// Useful only for non-blocking operations.
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the status of the connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /// If input is available from the server, consume it
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Throws: ConnectionException if PQconsumeInput does not return 1
    void consumeInput()
    {
        assert(conn);

        if( PQconsumeInput( conn ) != 1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQflush on the connection
    ///
    /// Returns: true if PQflush returned 0, false for any other
    /// non-error value
    ///
    /// Throws: ConnectionException if PQflush returns -1
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Obtains the file descriptor number of the connection socket to the server
    ///
    /// Throws: ConnectionException if PQsocket returns -1
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /// Obtains duplicate file descriptor number of the connection socket to the server
    ///
    /// Not implemented on Windows: calling it there fails at run time.
    ///
    /// Throws: ConnectionException if the descriptor cannot be obtained
    /// or cannot be duplicated
    socket_t posixSocketDuplicate()
    {
        version(Windows)
        {
            // Run-time failure instead of `static assert`, so that the module
            // still compiles on Windows for users who never call this method.
            assert(false, "FIXME: implement socket duplication");
        }
        else // Posix OS
        {
            import core.sys.posix.unistd: dup;

            immutable int duplicate = dup(posixSocket());

            // dup() signals failure with -1; do not hand an invalid handle to the caller
            if(duplicate == -1)
                throw new ConnectionException("Unable to duplicate connection socket", __FILE__, __LINE__);

            return cast(socket_t) duplicate;
        }
    }

    /// Obtains std.socket.Socket of the connection to the server
    ///
    /// Due to a limitation of Socket, a duplicate of the internal posix
    /// socket is used to create it: every call produces a new duplicate
    /// descriptor wrapped by the returned Socket.
    Socket socket()
    {
        return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
    }

    /// Returns the error message most recently generated by an operation on the connection
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * Sets or examines the current notice processor
     *
     * Returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Gets the next result after sending non-blocking commands. Can return null.
    ///
    /// Useful only for non-blocking operations.
    ///
    /// Returns: the wrapped result, or null if PQgetResult returned null
    immutable(Result) getResult()
    {
        // libpq guarantees that a result is not modified until it is destroyed,
        // which is what makes the cast to immutable acceptable here
        auto r = cast(immutable) PQgetResult(conn);

        if(r)
        {
            auto container = new immutable ResultContainer(r);
            return new immutable Result(container);
        }

        return null;
    }

    /// Wraps a result returned by one of the PQexec* functions into a ResultContainer
    ///
    /// Throws: ConnectionException if `r` is null
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Select single-row mode for the currently-executing query
    ///
    /// Returns: true if PQsetSingleRowMode returned 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     Try to cancel query

     If the cancellation is effective, the current command will
     terminate early and return an error result or exception. If the
     cancellation fails (say, because the server was already done
     processing the command) there will be no visible result at all.

     Throws: ConnectionException if the cancellation object cannot be
     obtained, CancellationException if the request was not dispatched.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // Release the underlying PGcancel right away instead of waiting
        // for the garbage collector to finalize the helper object
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// Returns: true if PQisBusy reports 1 for this connection
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /// Looks up a parameter setting by name via PQparameterStatus
    ///
    /// Throws: ConnectionException if PQparameterStatus returns null
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /// Escapes a string with PQescapeLiteral
    ///
    /// Returns: a GC-owned copy of the escaped text
    ///
    /// Throws: ConnectionException if PQescapeLiteral returns null
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // the libpq buffer is released on every exit path, after the copy is made
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Escapes a string with PQescapeIdentifier
    ///
    /// Returns: a GC-owned copy of the escaped text
    ///
    /// Throws: ConnectionException if PQescapeIdentifier returns null
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // the libpq buffer is released on every exit path, after the copy is made
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns the value of PQdb as a D string
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns the value of PQhost as a D string
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns the value of PQprotocolVersion
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns the value of PQserverVersion
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /// Enables PQtrace on this connection, writing to the C stream behind `stream`
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Disables tracing started by trace()
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Sets the client encoding
    ///
    /// Throws: ConnectionException if PQsetClientEncoding returns non-zero
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}
350 
/// Validates the options in the provided connection string using PQconninfoParse
///
/// Throws: ConnectionException carrying the libpq error message if the
/// connection string does not pass the check.
void connStringCheck(string connString)
{
    char* parseError = null;
    PQconninfoOption* options = PQconninfoParse(toStringz(connString), &parseError);

    if(options !is null)
        PQconninfoFree(options);
    else
        // no option list: the check fails here when there is no error message either
        enforceEx!OutOfMemoryError(parseError, "Unable to allocate libpq conninfo data");

    if(parseError is null)
        return;

    // copy the message into a D string before handing the buffer back to libpq
    string message = to!string(fromStringz(parseError));
    PQfreemem(cast(void*) parseError);

    throw new ConnectionException(message, __FILE__, __LINE__);
}
376 
unittest
{
    // a well-formed conninfo string is accepted without throwing
    connStringCheck("dbname=postgres user=postgres");

    // a malformed one must be rejected with ConnectionException
    bool rejected = false;

    try
    {
        connStringCheck("wrong conninfo string");
    }
    catch(ConnectionException)
    {
        rejected = true;
    }

    assert(rejected);
}
392 
/// Represents query cancellation process
///
/// Holds a libpq PGcancel object obtained from a Connection; the object is
/// released by the destructor.
class Cancellation
{
    private PGcancel* cancel;

    /// Obtains a cancellation object for the connection `c`
    ///
    /// Throws: ConnectionException if PQgetCancel returns null
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    /// Frees the PGcancel object
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Requests that the server abandon processing of the current command

     Throws CancellationException if the cancel request was not
     successfully dispatched.

     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.
    */
    void doCancel()
    {
        // char.init is 0xFF in D, so an untouched buffer would carry no NUL
        // terminator; zero it so that fromStringz below is always bounded.
        char[256] errbuf = '\0';
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}
434 
/// Thrown when a cancel request was not successfully dispatched
class CancellationException : Dpq2Exception
{
    /// Params:
    ///   msg  = error description
    ///   file = source file reported by the exception (defaults to the caller's file)
    ///   line = source line reported by the exception (defaults to the caller's line)
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
443 
/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Builds the exception from the most recent error message of connection `c`
    ///
    /// Params:
    ///   c    = connection whose errorMessage() becomes the exception message
    ///   file = source file reported by the exception (defaults to the caller's file)
    ///   line = source line reported by the exception (defaults to the caller's line)
    this(in Connection c, string file = __FILE__, size_t line = __LINE__)
    {
        super(c.errorMessage(), file, line);
    }

    /// Builds the exception from an explicit message
    ///
    /// Params:
    ///   msg  = error description
    ///   file = source file reported by the exception (defaults to the caller's file)
    ///   line = source line reported by the exception (defaults to the caller's line)
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
457 
/// Integration test of Connection; needs a reachable server described by `connParam`
version (integration_tests)
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    // Basic getters on a live connection
    {
        debug import std.experimental.logger;

        auto c = new Connection(connParam);
        scope(exit) destroy(c); // close the connection even if a getter throws

        auto dbname = c.dbName();
        auto pver = c.protocolVersion();
        auto sver = c.serverVersion();

        debug
        {
            trace("DB name: ", dbname);
            trace("Protocol version: ", pver);
            trace("Server version: ", sver);
        }
    }

    // A malformed connection string must be rejected with ConnectionException
    {
        bool exceptionFlag = false;

        try
        {
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        }
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }

        // Checked after the try/catch rather than in `finally`, so that an
        // unexpected exception type propagates instead of being masked by
        // this assertion.
        assert(exceptionFlag);
    }

    // Escaping helpers and client encoding
    {
        auto c = new Connection(connParam);
        scope(exit) destroy(c); // do not leave the connection open until GC finalization

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}