1 module dpq2.connection;
2 
3 import dpq2;
4 import dpq2.exception;
5 
6 import std.conv: to;
7 import std..string: toStringz, fromStringz;
8 import std.exception: enforce, enforceEx;
9 import std.range;
10 import std.stdio: File;
11 import std.socket;
12 import core.exception;
13 import core.time: Duration;
14 
15 /*
16  * Bugs: On Unix connection is not thread safe.
17  * 
18  * On Unix, forking a process with open libpq connections can lead
19  * to unpredictable results because the parent and child processes share
20  * the same sockets and operating system resources. For this reason,
21  * such usage is not recommended, though doing an exec from the child
22  * process to load a new executable is safe.
23 
24 
25 
26 int PQisthreadsafe();
27 Returns 1 if the libpq is thread-safe and 0 if it is not.
28 */
29 
30 /// dumb flag for Connection ctor parametrization
31 struct ConnectionStart {};
32 
33 /// Connection
34 class Connection
35 {
36     package PGconn* conn;
37 
38     invariant
39     {
40         assert(conn !is null);
41     }
42 
43     this(string connString)
44     {
45         conn = PQconnectdb(toStringz(connString));
46 
47         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
48 
49         if(status != CONNECTION_OK)
50             throw new ConnectionException(this, __FILE__, __LINE__);
51     }
52 
53 	/// Connect to DB in a nonblocking manner
54     this(ConnectionStart, string connString)
55     {
56         conn = PQconnectStart(toStringz(connString));
57 
58         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
59 
60         if( status == CONNECTION_BAD )
61             throw new ConnectionException(this, __FILE__, __LINE__);
62     }
63 
64     ~this()
65     {
66         PQfinish( conn );
67     }
68 
69     mixin Queries;
70 
71     bool isNonBlocking()
72     {
73         return PQisnonblocking(conn) == 1;
74     }
75 
76     private void setNonBlocking( bool state )
77     {
78         if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
79             throw new ConnectionException(this, __FILE__, __LINE__);
80     }
81 
82     void resetStart()
83     {
84         if(PQresetStart(conn) == 0)
85             throw new ConnectionException(this, __FILE__, __LINE__);
86     }
87 
88     PostgresPollingStatusType poll() nothrow
89     {
90         assert(conn);
91 
92         return PQconnectPoll(conn);
93     }
94 
95     PostgresPollingStatusType resetPoll() nothrow
96     {
97         assert(conn);
98 
99         return PQresetPoll(conn);
100     }
101 
102     ConnStatusType status() nothrow
103     {
104         return PQstatus(conn);
105     }
106 
107     void consumeInput()
108     {
109         assert(conn);
110 
111         const size_t r = PQconsumeInput( conn );
112         if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
113     }
114     
115     package bool flush()
116     {
117         assert(conn);
118 
119         auto r = PQflush(conn);
120         if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
121         return r == 0;
122     }
123 
124     int posixSocket()
125     {
126         int r = PQsocket(conn);
127 
128         if(r == -1)
129             throw new ConnectionException(this, __FILE__, __LINE__);
130 
131         return r;
132     }
133 
134     Socket socket()
135     {
136         version(Windows)
137         {
138             assert(false, "FIXME: implement socket duplication");
139         }
140         else // Posix
141         {
142             import core.sys.posix.unistd: dup;
143 
144             socket_t s = cast(socket_t) dup(cast(socket_t) posixSocket);
145             return new Socket(s, AddressFamily.UNSPEC);
146         }
147     }
148 
149     string errorMessage() const nothrow
150     {
151         return PQerrorMessage(conn).to!string;
152     }
153 
154     /**
155      * returns the previous notice receiver or processor function pointer, and sets the new value.
156      * If you supply a null function pointer, no action is taken, but the current pointer is returned.
157      */
158     PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
159     {
160         assert(conn);
161 
162         return PQsetNoticeProcessor(conn, proc, arg);
163     }
164 
165     /// Get for the next result from a sendQuery. Can return null.
166     immutable(Result) getResult()
167     {
168         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
169         auto r = cast(immutable) PQgetResult(conn);
170 
171         if(r)
172         {
173             auto container = new immutable ResultContainer(r);
174             return new immutable Result(container);
175         }
176 
177         return null;
178     }
179 
180     /// Get result from PQexec* functions or throw error if pull is empty
181     package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
182     {
183         if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);
184 
185         return new immutable ResultContainer(r);
186     }
187 
188     bool setSingleRowMode()
189     {
190         return PQsetSingleRowMode(conn) == 1;
191     }
192 
193     /**
194      If the cancellation is effective, the current command will
195      terminate early and return an error result (exception). If the
196      cancellation fails (say, because the server was already done
197      processing the command), then there will be no visible result at
198      all.
199     */
200     void cancel()
201     {
202         auto c = new Cancellation(this);
203         c.doCancel;
204     }
205 
206     bool isBusy() nothrow
207     {
208         assert(conn);
209 
210         return PQisBusy(conn) == 1;
211     }
212 
213     string parameterStatus(string paramName)
214     {
215         assert(conn);
216 
217         auto res = PQparameterStatus(conn, toStringz(paramName));
218 
219         if(res is null)
220             throw new ConnectionException(this, __FILE__, __LINE__);
221 
222         return to!string(fromStringz(res));
223     }
224 
225     string escapeLiteral(string msg)
226     {
227         assert(conn);
228 
229         auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);
230 
231         if(buf is null)
232             throw new ConnectionException(this, __FILE__, __LINE__);
233 
234         string res = buf.fromStringz.to!string;
235 
236         PQfreemem(buf);
237 
238         return res;
239     }
240 
241     string escapeIdentifier(string msg)
242     {
243         assert(conn);
244 
245         auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);
246 
247         if(buf is null)
248             throw new ConnectionException(this, __FILE__, __LINE__);
249 
250         string res = buf.fromStringz.to!string;
251 
252         PQfreemem(buf);
253 
254         return res;
255     }
256 
257     string dbName() const nothrow
258     {
259         assert(conn);
260 
261         return PQdb(conn).fromStringz.to!string;
262     }
263 
264     string host() const nothrow
265     {
266         assert(conn);
267 
268         return PQhost(conn).fromStringz.to!string;
269     }
270 
271     int protocolVersion() const nothrow
272     {
273         assert(conn);
274 
275         return PQprotocolVersion(conn);
276     }
277 
278     int serverVersion() const nothrow
279     {
280         assert(conn);
281 
282         return PQserverVersion(conn);
283     }
284 
285     void trace(ref File stream)
286     {
287         PQtrace(conn, stream.getFP);
288     }
289 
290     void untrace()
291     {
292         PQuntrace(conn);
293     }
294 
295     void setClientEncoding(string encoding)
296     {
297         if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
298             throw new ConnectionException(this, __FILE__, __LINE__);
299     }
300 }
301 
302 void connStringCheck(string connString)
303 {
304     char* errmsg = null;
305     PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);
306 
307     if(r is null)
308     {
309         enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
310     }
311     else
312     {
313         PQconninfoFree(r);
314     }
315 
316     if(errmsg !is null)
317     {
318         string s = errmsg.fromStringz.to!string;
319         PQfreemem(cast(void*) errmsg);
320 
321         throw new ConnectionException(s, __FILE__, __LINE__);
322     }
323 }
324 
325 unittest
326 {
327     connStringCheck("dbname=postgres user=postgres");
328 
329     {
330         bool raised = false;
331 
332         try
333             connStringCheck("wrong conninfo string");
334         catch(ConnectionException e)
335             raised = true;
336 
337         assert(raised);
338     }
339 }
340 
341 /// Doing canceling queries in progress
342 class Cancellation
343 {
344     private PGcancel* cancel;
345 
346     this(Connection c)
347     {
348         cancel = PQgetCancel(c.conn);
349 
350         if(cancel is null)
351             throw new ConnectionException(c, __FILE__, __LINE__);
352     }
353 
354     ~this()
355     {
356         PQfreeCancel(cancel);
357     }
358 
359     /**
360      Successful dispatch is no guarantee that the request will have any
361      effect, however. If the cancellation is effective, the current
362      command will terminate early and return an error result
363      (exception). If the cancellation fails (say, because the server
364      was already done processing the command), then there will be no
365      visible result at all.
366     */
367     void doCancel()
368     {
369         char[256] errbuf;
370         auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);
371 
372         if(res != 1)
373             throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
374     }
375 }
376 
377 class CancellationException : Dpq2Exception
378 {
379     this(string msg, string file, size_t line)
380     {
381         super(msg, file, line);
382     }
383 }
384 
385 /// Connection exception
386 class ConnectionException : Dpq2Exception
387 {
388     this(in Connection c, string file, size_t line)
389     {
390         super(c.errorMessage(), file, line);
391     }
392 
393     this(string msg, string file, size_t line)
394     {
395         super(msg, file, line);
396     }
397 }
398 
399 void _integration_test( string connParam )
400 {
401     assert( PQlibVersion() >= 9_0100 );
402 
403     {
404         debug import std.experimental.logger;
405 
406         auto c = new Connection(connParam);
407         auto dbname = c.dbName();
408         auto pver = c.protocolVersion();
409         auto sver = c.serverVersion();
410 
411         debug
412         {
413             trace("DB name: ", dbname);
414             trace("Protocol version: ", pver);
415             trace("Server version: ", sver);
416         }
417 
418         destroy(c);
419     }
420 
421     {
422         bool exceptionFlag = false;
423 
424         try
425             auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
426         catch(ConnectionException e)
427         {
428             exceptionFlag = true;
429             assert(e.msg.length > 40); // error message check
430         }
431         finally
432             assert(exceptionFlag);
433     }
434 
435     {
436         auto c = new Connection(connParam);
437 
438         assert(c.escapeLiteral("abc'def") == "'abc''def'");
439         assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");
440 
441         c.setClientEncoding("WIN866");
442         assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
443     }
444 }