1 module dpq2.connection;
2 
3 import dpq2.query;
4 import dpq2.args: QueryParams;
5 import dpq2.result;
6 import dpq2.exception;
7 
8 import derelict.pq.pq;
9 import std.conv: to;
10 import std.string: toStringz, fromStringz;
11 import std.exception: enforce, enforceEx;
12 import std.range;
13 import std.stdio: File;
14 import std.socket;
15 import core.exception;
16 import core.time: Duration;
17 
/*
 * Bugs: on Unix, a connection is not thread safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.
 *
 * See also the libpq function
 *
 *     int PQisthreadsafe();
 *
 * which returns 1 if libpq is thread-safe and 0 if it is not.
 */
32 
/**
 * Empty tag type used only for constructor overload selection:
 * passing a ConnectionStart value as the first argument picks the
 * Connection constructor that takes (ConnectionStart, string).
 */
struct ConnectionStart {}
35 
/**
 * A single libpq connection.
 *
 * Holds the PGconn handle produced by one of the constructors and hands
 * it to PQfinish in the destructor. Query-related methods are brought in
 * by the Queries mixin.
 */
class Connection
{
    /// Underlying libpq connection handle.
    package PGconn* conn;

    invariant
    {
        assert(conn !is null);
    }

    /**
     * Opens a connection with PQconnectdb.
     *
     * Params:
     *   connString = connection string passed to libpq
     *
     * Throws:
     *   OutOfMemoryError if libpq returned a null handle;
     *   ConnectionException if the resulting status is not CONNECTION_OK.
     */
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Connect to DB in a nonblocking manner (PQconnectStart).
     *
     * Params:
     *   connString = connection string passed to libpq
     *
     * Throws:
     *   OutOfMemoryError if libpq returned a null handle;
     *   ConnectionException if the status is already CONNECTION_BAD.
     */
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Hands the handle back to libpq via PQfinish.
    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns: true if PQisnonblocking reports 1 for this connection.
    bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /**
     * Switches the nonblocking state through PQsetnonblocking.
     *
     * Throws: ConnectionException if libpq returns -1.
     */
    private void setNonBlocking( bool state )
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Begins a nonblocking reset of the connection (PQresetStart).
     *
     * Throws: ConnectionException if libpq returns 0.
     */
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns: the value of PQconnectPoll for this connection.
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns: the value of PQresetPoll for this connection.
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns: the value of PQstatus for this connection.
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /**
     * Calls PQconsumeInput on this connection.
     *
     * Throws: ConnectionException if libpq returns anything other than 1.
     */
    void consumeInput()
    {
        assert(conn);

        // PQconsumeInput returns an int status; compare it directly instead
        // of widening it into an unsigned size_t first.
        if( PQconsumeInput( conn ) != 1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Calls PQflush on this connection.
     *
     * Returns: true if PQflush returned 0, false for any other
     *          non-error value.
     * Throws: ConnectionException if PQflush returns -1.
     */
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /**
     * Returns: the descriptor reported by PQsocket.
     * Throws: ConnectionException if PQsocket returns -1.
     */
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /**
     * Wraps a duplicate of the connection's descriptor in a std.socket.Socket.
     *
     * The descriptor is duplicated with dup() so the returned Socket does
     * not wrap the very descriptor that libpq itself is using.
     *
     * Throws: ConnectionException if PQsocket reports no descriptor or
     *         if dup() fails.
     *
     * Bugs: not implemented on Windows (asserts).
     */
    Socket socket()
    {
        version(Windows)
        {
            assert(false, "FIXME: implement socket duplication");
        }
        else // Posix
        {
            import core.sys.posix.unistd: dup;

            immutable int duplicated = dup(posixSocket);

            // dup() signals failure with -1; never build a Socket around it.
            if(duplicated == -1)
                throw new ConnectionException("Unable to duplicate libpq socket descriptor", __FILE__, __LINE__);

            return new Socket(cast(socket_t) duplicated, AddressFamily.UNSPEC);
        }
    }

    /// Returns: a D string copy of PQerrorMessage for this connection.
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Get for the next result from a sendQuery. Can return null.
    immutable(Result) getResult()
    {
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r)
        {
            auto container = new immutable ResultContainer(r);
            return new immutable Result(container);
        }

        return null;
    }

    /**
     * Get result from PQexec* functions or throw error if pull is empty
     *
     * Params:
     *   r = raw libpq result pointer
     *
     * Throws: ConnectionException if r is null.
     */
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Returns: true if PQsetSingleRowMode returned 1.
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     If the cancellation is effective, the current command will
     terminate early and return an error result (exception). If the
     cancellation fails (say, because the server was already done
     processing the command), then there will be no visible result at
     all.

     Throws: ConnectionException if the cancel object cannot be created,
             CancellationException if the request cannot be dispatched.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // Release the cancel object right away, even if doCancel throws,
        // instead of leaving it to a later GC finalization.
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// Returns: true if PQisBusy returned 1.
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /**
     * Looks up a parameter through PQparameterStatus.
     *
     * Params:
     *   paramName = name of the parameter to query
     *
     * Returns: a D string copy of the reported value.
     * Throws: ConnectionException if libpq returns null.
     */
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /**
     * Escapes a string with PQescapeLiteral.
     *
     * Params:
     *   msg = text to escape
     *
     * Returns: a D string copy of the escaped text; the libpq buffer is
     *          released with PQfreemem before returning.
     * Throws: ConnectionException if libpq returns null.
     */
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        string res = buf.fromStringz.to!string;

        PQfreemem(buf);

        return res;
    }

    /**
     * Escapes a string with PQescapeIdentifier.
     *
     * Params:
     *   msg = text to escape
     *
     * Returns: a D string copy of the escaped text; the libpq buffer is
     *          released with PQfreemem before returning.
     * Throws: ConnectionException if libpq returns null.
     */
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        string res = buf.fromStringz.to!string;

        PQfreemem(buf);

        return res;
    }

    /// Returns: a D string copy of PQdb for this connection.
    string dbName() const nothrow
    {
        assert(conn);

        return PQdb(conn).fromStringz.to!string;
    }

    /// Returns: a D string copy of PQhost for this connection.
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Returns: the value of PQprotocolVersion for this connection.
    int protocolVersion() const nothrow
    {
        assert(conn);

        return PQprotocolVersion(conn);
    }

    /// Returns: the value of PQserverVersion for this connection.
    int serverVersion() const nothrow
    {
        assert(conn);

        return PQserverVersion(conn);
    }

    /**
     * Enables libpq tracing (PQtrace) into the given file.
     *
     * Params:
     *   stream = file whose C FILE* (stream.getFP) is handed to libpq
     */
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Disables tracing through PQuntrace.
    void untrace()
    {
        PQuntrace(conn);
    }

    /**
     * Sets the client encoding through PQsetClientEncoding.
     *
     * Params:
     *   encoding = encoding name passed to libpq
     *
     * Throws: ConnectionException if libpq returns a non-zero value.
     */
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}
304 
/**
 * Validates a connection string with PQconninfoParse.
 *
 * Params:
 *   connString = connection string to check
 *
 * Throws:
 *   OutOfMemoryError if libpq returned neither an option array nor an
 *   error message;
 *   ConnectionException carrying libpq's error message if one was set.
 */
void connStringCheck(string connString)
{
    char* parseError = null;
    PQconninfoOption* options = PQconninfoParse(connString.toStringz, &parseError);

    if(options !is null)
        PQconninfoFree(options); // only validating, the parsed options are not needed
    else
        enforceEx!OutOfMemoryError(parseError, "Unable to allocate libpq conninfo data");

    // no message means the string was accepted
    if(parseError is null)
        return;

    // copy the message into GC memory before releasing libpq's buffer
    string s = to!string(fromStringz(parseError));
    PQfreemem(cast(void*) parseError);

    throw new ConnectionException(s, __FILE__, __LINE__);
}
327 
unittest
{
    // a well-formed conninfo string is expected to pass without throwing
    connStringCheck("dbname=postgres user=postgres");

    // a malformed one is expected to raise ConnectionException
    bool raised = false;

    try
    {
        connStringCheck("wrong conninfo string");
    }
    catch(ConnectionException e)
    {
        raised = true;
    }

    assert(raised);
}
343 
/**
 * Doing canceling queries in progress
 *
 * Owns a PGcancel object obtained from PQgetCancel and releases it with
 * PQfreeCancel in the destructor.
 */
class Cancellation
{
    /// libpq cancel object for the connection given to the constructor
    private PGcancel* cancel;

    /**
     * Params:
     *   c = connection whose running command may later be cancelled
     *
     * Throws: ConnectionException if PQgetCancel returns null.
     */
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    /// Releases the cancel object through PQfreeCancel.
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.

     Throws: CancellationException carrying the text of the error buffer
             if PQcancel returns anything other than 1.
    */
    void doCancel()
    {
        // D default-initializes char to 0xFF, not to zero. Zero-fill the
        // buffer so that fromStringz always finds a terminator inside it,
        // even if PQcancel fails without writing a message.
        char[256] errbuf = '\0';
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}
379 
/// Thrown by Cancellation.doCancel when the cancel request is not dispatched.
class CancellationException : Dpq2Exception
{
    /**
     * Params:
     *   msg  = error text
     *   file = source file reported by the exception (defaults to the call site)
     *   line = source line reported by the exception (defaults to the call site)
     */
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
387 
/// Connection exception
class ConnectionException : Dpq2Exception
{
    /**
     * Builds the exception from the connection's current error message.
     *
     * Params:
     *   c    = connection whose errorMessage() becomes the exception text
     *   file = source file reported by the exception (defaults to the call site)
     *   line = source line reported by the exception (defaults to the call site)
     */
    this(in Connection c, string file = __FILE__, size_t line = __LINE__)
    {
        super(c.errorMessage(), file, line);
    }

    /**
     * Builds the exception from an explicit message.
     *
     * Params:
     *   msg  = error text
     *   file = source file reported by the exception (defaults to the call site)
     *   line = source line reported by the exception (defaults to the call site)
     */
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}
401 
/**
 * Integration test that needs a reachable server.
 *
 * Checks the libpq version, reads basic connection properties, verifies
 * that a malformed connection string raises ConnectionException, and
 * exercises the escaping helpers and the client-encoding setter.
 *
 * Params:
 *   connParam = connection string used to open the test connections
 */
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    {
        debug import std.experimental.logger;

        auto c = new Connection(connParam);

        // close the connection even if one of the calls below throws
        scope(exit) destroy(c);

        auto dbname = c.dbName();
        auto pver = c.protocolVersion();
        auto sver = c.serverVersion();

        debug
        {
            trace("DB name: ", dbname);
            trace("Protocol version: ", pver);
            trace("Server version: ", sver);
        }
    }

    {
        bool exceptionFlag = false;

        try
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
            assert(exceptionFlag);
    }

    {
        auto c = new Connection(connParam);

        // release this connection deterministically, as the first block does
        scope(exit) destroy(c);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}