1 module dpq2.connection;
2 
3 import dpq2.query;
4 import dpq2.args: QueryParams;
5 import dpq2.result;
6 import dpq2.exception;
7 
8 import derelict.pq.pq;
9 import std.conv: to;
10 import std..string: toStringz, fromStringz;
11 import std.exception: enforce, enforceEx;
12 import std.range;
13 import std.stdio: File;
14 import std.socket;
15 import core.exception;
16 import core.time: Duration;
17 
18 /*
19  * Bugs: On Unix connection is not thread safe.
20  *
21  * On Unix, forking a process with open libpq connections can lead
22  * to unpredictable results because the parent and child processes share
23  * the same sockets and operating system resources. For this reason,
24  * such usage is not recommended, though doing an exec from the child
25  * process to load a new executable is safe.
26 
27 
28 
29 int PQisthreadsafe();
30 Returns 1 if the libpq is thread-safe and 0 if it is not.
31 */
32 
33 /// dumb flag for Connection ctor parametrization
34 struct ConnectionStart {};
35 
36 /// Connection
37 class Connection
38 {
39     package PGconn* conn;
40 
41     invariant
42     {
43         assert(conn !is null);
44     }
45 
46     this(string connString)
47     {
48         conn = PQconnectdb(toStringz(connString));
49 
50         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
51 
52         if(status != CONNECTION_OK)
53             throw new ConnectionException(this, __FILE__, __LINE__);
54     }
55 
56 	/// Connect to DB in a nonblocking manner
57     this(ConnectionStart, string connString)
58     {
59         conn = PQconnectStart(toStringz(connString));
60 
61         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
62 
63         if( status == CONNECTION_BAD )
64             throw new ConnectionException(this, __FILE__, __LINE__);
65     }
66 
67     ~this()
68     {
69         PQfinish( conn );
70     }
71 
72     mixin Queries;
73 
74     bool isNonBlocking()
75     {
76         return PQisnonblocking(conn) == 1;
77     }
78 
79     private void setNonBlocking( bool state )
80     {
81         if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
82             throw new ConnectionException(this, __FILE__, __LINE__);
83     }
84 
85     void resetStart()
86     {
87         if(PQresetStart(conn) == 0)
88             throw new ConnectionException(this, __FILE__, __LINE__);
89     }
90 
91     PostgresPollingStatusType poll() nothrow
92     {
93         assert(conn);
94 
95         return PQconnectPoll(conn);
96     }
97 
98     PostgresPollingStatusType resetPoll() nothrow
99     {
100         assert(conn);
101 
102         return PQresetPoll(conn);
103     }
104 
105     ConnStatusType status() nothrow
106     {
107         return PQstatus(conn);
108     }
109 
110     void consumeInput()
111     {
112         assert(conn);
113 
114         const size_t r = PQconsumeInput( conn );
115         if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
116     }
117 
118     package bool flush()
119     {
120         assert(conn);
121 
122         auto r = PQflush(conn);
123         if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
124         return r == 0;
125     }
126 
127     int posixSocket()
128     {
129         int r = PQsocket(conn);
130 
131         if(r == -1)
132             throw new ConnectionException(this, __FILE__, __LINE__);
133 
134         return r;
135     }
136 
137     socket_t posixSocketDuplicate()
138     {
139         version(Windows)
140         {
141             static assert(false, "FIXME: implement socket duplication");
142         }
143         else // Posix OS
144         {
145             import core.sys.posix.unistd: dup;
146 
147             return cast(socket_t) dup(cast(socket_t) posixSocket);
148         }
149     }
150 
151     Socket socket()
152     {
153         return new Socket(posixSocketDuplicate, AddressFamily.UNSPEC);
154     }
155 
156     string errorMessage() const nothrow
157     {
158         return PQerrorMessage(conn).to!string;
159     }
160 
161     /**
162      * returns the previous notice receiver or processor function pointer, and sets the new value.
163      * If you supply a null function pointer, no action is taken, but the current pointer is returned.
164      */
165     PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
166     {
167         assert(conn);
168 
169         return PQsetNoticeProcessor(conn, proc, arg);
170     }
171 
172     /// Get for the next result from a sendQuery. Can return null.
173     immutable(Result) getResult()
174     {
175         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
176         auto r = cast(immutable) PQgetResult(conn);
177 
178         if(r)
179         {
180             auto container = new immutable ResultContainer(r);
181             return new immutable Result(container);
182         }
183 
184         return null;
185     }
186 
187     /// Get result from PQexec* functions or throw error if pull is empty
188     package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
189     {
190         if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);
191 
192         return new immutable ResultContainer(r);
193     }
194 
195     bool setSingleRowMode()
196     {
197         return PQsetSingleRowMode(conn) == 1;
198     }
199 
200     /**
201      If the cancellation is effective, the current command will
202      terminate early and return an error result (exception). If the
203      cancellation fails (say, because the server was already done
204      processing the command), then there will be no visible result at
205      all.
206     */
207     void cancel()
208     {
209         auto c = new Cancellation(this);
210         c.doCancel;
211     }
212 
213     bool isBusy() nothrow
214     {
215         assert(conn);
216 
217         return PQisBusy(conn) == 1;
218     }
219 
220     string parameterStatus(string paramName)
221     {
222         assert(conn);
223 
224         auto res = PQparameterStatus(conn, toStringz(paramName));
225 
226         if(res is null)
227             throw new ConnectionException(this, __FILE__, __LINE__);
228 
229         return to!string(fromStringz(res));
230     }
231 
232     string escapeLiteral(string msg)
233     {
234         assert(conn);
235 
236         auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);
237 
238         if(buf is null)
239             throw new ConnectionException(this, __FILE__, __LINE__);
240 
241         string res = buf.fromStringz.to!string;
242 
243         PQfreemem(buf);
244 
245         return res;
246     }
247 
248     string escapeIdentifier(string msg)
249     {
250         assert(conn);
251 
252         auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);
253 
254         if(buf is null)
255             throw new ConnectionException(this, __FILE__, __LINE__);
256 
257         string res = buf.fromStringz.to!string;
258 
259         PQfreemem(buf);
260 
261         return res;
262     }
263 
264     string dbName() const nothrow
265     {
266         assert(conn);
267 
268         return PQdb(conn).fromStringz.to!string;
269     }
270 
271     string host() const nothrow
272     {
273         assert(conn);
274 
275         return PQhost(conn).fromStringz.to!string;
276     }
277 
278     int protocolVersion() const nothrow
279     {
280         assert(conn);
281 
282         return PQprotocolVersion(conn);
283     }
284 
285     int serverVersion() const nothrow
286     {
287         assert(conn);
288 
289         return PQserverVersion(conn);
290     }
291 
292     void trace(ref File stream)
293     {
294         PQtrace(conn, stream.getFP);
295     }
296 
297     void untrace()
298     {
299         PQuntrace(conn);
300     }
301 
302     void setClientEncoding(string encoding)
303     {
304         if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
305             throw new ConnectionException(this, __FILE__, __LINE__);
306     }
307 }
308 
309 void connStringCheck(string connString)
310 {
311     char* errmsg = null;
312     PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);
313 
314     if(r is null)
315     {
316         enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
317     }
318     else
319     {
320         PQconninfoFree(r);
321     }
322 
323     if(errmsg !is null)
324     {
325         string s = errmsg.fromStringz.to!string;
326         PQfreemem(cast(void*) errmsg);
327 
328         throw new ConnectionException(s, __FILE__, __LINE__);
329     }
330 }
331 
332 unittest
333 {
334     connStringCheck("dbname=postgres user=postgres");
335 
336     {
337         bool raised = false;
338 
339         try
340             connStringCheck("wrong conninfo string");
341         catch(ConnectionException e)
342             raised = true;
343 
344         assert(raised);
345     }
346 }
347 
348 /// Doing canceling queries in progress
349 class Cancellation
350 {
351     private PGcancel* cancel;
352 
353     this(Connection c)
354     {
355         cancel = PQgetCancel(c.conn);
356 
357         if(cancel is null)
358             throw new ConnectionException(c, __FILE__, __LINE__);
359     }
360 
361     ~this()
362     {
363         PQfreeCancel(cancel);
364     }
365 
366     /**
367      Successful dispatch is no guarantee that the request will have any
368      effect, however. If the cancellation is effective, the current
369      command will terminate early and return an error result
370      (exception). If the cancellation fails (say, because the server
371      was already done processing the command), then there will be no
372      visible result at all.
373     */
374     void doCancel()
375     {
376         char[256] errbuf;
377         auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);
378 
379         if(res != 1)
380             throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
381     }
382 }
383 
384 class CancellationException : Dpq2Exception
385 {
386     this(string msg, string file, size_t line)
387     {
388         super(msg, file, line);
389     }
390 }
391 
392 /// Connection exception
393 class ConnectionException : Dpq2Exception
394 {
395     this(in Connection c, string file, size_t line)
396     {
397         super(c.errorMessage(), file, line);
398     }
399 
400     this(string msg, string file, size_t line)
401     {
402         super(msg, file, line);
403     }
404 }
405 
406 void _integration_test( string connParam )
407 {
408     assert( PQlibVersion() >= 9_0100 );
409 
410     {
411         debug import std.experimental.logger;
412 
413         auto c = new Connection(connParam);
414         auto dbname = c.dbName();
415         auto pver = c.protocolVersion();
416         auto sver = c.serverVersion();
417 
418         debug
419         {
420             trace("DB name: ", dbname);
421             trace("Protocol version: ", pver);
422             trace("Server version: ", sver);
423         }
424 
425         destroy(c);
426     }
427 
428     {
429         bool exceptionFlag = false;
430 
431         try
432             auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
433         catch(ConnectionException e)
434         {
435             exceptionFlag = true;
436             assert(e.msg.length > 40); // error message check
437         }
438         finally
439             assert(exceptionFlag);
440     }
441 
442     {
443         auto c = new Connection(connParam);
444 
445         assert(c.escapeLiteral("abc'def") == "'abc''def'");
446         assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");
447 
448         c.setClientEncoding("WIN866");
449         assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
450     }
451 }