1 module dpq2.connection;
2 
3 import dpq2;
4 
5 import std.conv: to;
6 import std.string: toStringz, fromStringz;
7 import std.exception: enforce, enforceEx;
8 import std.range;
9 import std.stdio: File;
10 import std.socket;
11 import core.exception;
12 import core.time: Duration;
13 
/*
 * Bugs: On Unix, a connection is not thread-safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.
 *
 * int PQisthreadsafe();
 * Returns 1 if libpq is thread-safe and 0 if it is not.
 */
28 
/// Empty tag type: passing it as the first Connection constructor argument
/// selects the constructor overload that connects via PQconnectStart
struct ConnectionStart
{
}
31 
/**
 * Connection to a database server, wrapping a libpq PGconn handle.
 *
 * The handle is obtained by one of the constructors and is passed to
 * PQfinish by the destructor.
 */
class Connection
{
    //string connString; /// Database connection parameters
    package PGconn* conn; /// libpq connection handle used by every method

    invariant
    {
        assert(conn !is null);
    }

    /**
     * Connect to DB using PQconnectdb
     *
     * Throws: OutOfMemoryError if libpq returns a null handle,
     * ConnectionException if the connection status is not CONNECTION_OK
     */
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Connect to DB in a nonblocking manner (PQconnectStart)
     *
     * Throws: OutOfMemoryError if libpq returns a null handle,
     * ConnectionException if the connection status is CONNECTION_BAD
     */
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Hands the libpq handle to PQfinish
    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// Returns: true if PQisnonblocking reports 1 for this connection
    @property bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Calls PQsetnonblocking with 1 or 0; throws ConnectionException if it returns -1
    private void setNonBlocking( bool state )
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQresetStart; throws ConnectionException if it returns 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns: the value of PQconnectPoll for this connection
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns: the value of PQresetPoll for this connection
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns: the value of PQstatus for this connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /// Calls PQconsumeInput; throws ConnectionException unless it returns 1
    void consumeInput()
    {
        assert(conn);

        // PQconsumeInput returns an int; compare it directly instead of
        // storing it in an unsigned size_t
        if( PQconsumeInput( conn ) != 1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Calls PQflush
     *
     * Returns: true if PQflush returned 0, false for any other non-error value
     * Throws: ConnectionException if PQflush returned -1
     */
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /**
     * Returns: the descriptor number reported by PQsocket
     * Throws: ConnectionException if PQsocket returned -1
     */
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /**
     * Creates a std.socket.Socket over a duplicate (dup) of the descriptor
     * returned by posixSocket. Only implemented for Posix.
     *
     * Throws: ConnectionException if the descriptor is unavailable or
     * cannot be duplicated
     */
    Socket socket()
    {
        version(Posix)
        {
            import core.sys.posix.unistd: dup;

            const int duplicated = dup(posixSocket());

            // dup signals failure with -1; wrapping that value into a Socket
            // would hand the caller an object with an invalid handle
            if(duplicated == -1)
                throw new ConnectionException("Unable to duplicate connection socket descriptor", __FILE__, __LINE__);

            return new Socket(cast(socket_t) duplicated, AddressFamily.UNSPEC);
        }
        else version(Windows)
        {
            assert(false, "FIXME: implement socket duplication");
        }
    }

    /// Returns: a copy of the text returned by PQerrorMessage
    string errorMessage() const nothrow
    {
        return PQerrorMessage(conn).to!string;
    }

    /**
     * returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Get for the next result from a sendQuery. Can return null.
    immutable(Result) getResult()
    {
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r is null)
            return null;

        auto container = new immutable ResultContainer(r);
        return new immutable Result(container);
    }

    /// Get result from PQexec* functions or throw error if pull is empty
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// Returns: true if PQsetSingleRowMode returned 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     If the cancellation is effective, the current command will
     terminate early and return an error result (exception). If the
     cancellation fails (say, because the server was already done
     processing the command), then there will be no visible result at
     all.
    */
    void cancel()
    {
        auto c = new Cancellation(this);

        // release the cancel handle right away instead of waiting for the GC
        scope(exit) destroy(c);

        c.doCancel();
    }

    /// Returns: true if PQisBusy returned 1
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /**
     * Returns: a copy of the value PQparameterStatus gives for paramName
     * Throws: ConnectionException if PQparameterStatus returned null
     */
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, toStringz(paramName));

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /**
     * Returns: a copy of the string produced by PQescapeLiteral for msg
     * Throws: ConnectionException if PQescapeLiteral returned null
     */
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // the buffer belongs to libpq; free it on every exit path,
        // including a failure while copying it into a D string
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /**
     * Returns: a copy of the string produced by PQescapeIdentifier for msg
     * Throws: ConnectionException if PQescapeIdentifier returned null
     */
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // the buffer belongs to libpq; free it on every exit path,
        // including a failure while copying it into a D string
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns: a copy of the text returned by PQhost
    string host() const nothrow
    {
        assert(conn);

        return PQhost(conn).fromStringz.to!string;
    }

    /// Passes the C FILE pointer of stream to PQtrace
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Calls PQuntrace for this connection
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Calls PQsetClientEncoding; throws ConnectionException if it returns non-zero
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}
280 
/**
 * Validates a connection string by passing it to PQconninfoParse.
 *
 * Returns normally when no error message was produced.
 *
 * Throws: ConnectionException carrying the libpq error message when one
 * is produced, OutOfMemoryError when parsing returns neither a result
 * nor an error message
 */
void connStringCheck(string connString)
{
    char* parseError = null;
    PQconninfoOption* options = PQconninfoParse(connString.toStringz, &parseError);

    if(options !is null)
        PQconninfoFree(options); // the parsed options themselves are not needed
    else
        enforceEx!OutOfMemoryError(parseError, "Unable to allocate libpq conninfo data");

    if(parseError is null)
        return;

    // copy the message before handing the libpq buffer back
    string text = parseError.fromStringz.to!string;
    PQfreemem(cast(void*) parseError);

    throw new ConnectionException(text, __FILE__, __LINE__);
}
303 
unittest
{
    // this conninfo string is expected to pass the check without throwing
    connStringCheck("dbname=postgres user=postgres");

    // this one is expected to be rejected with ConnectionException
    bool rejected = false;

    try
    {
        connStringCheck("wrong conninfo string");
    }
    catch(ConnectionException)
    {
        rejected = true;
    }

    assert(rejected);
}
319 
/// Cancels queries in progress; holds a PGcancel handle obtained from a Connection
class Cancellation
{
    private PGcancel* cancel;

    /// Obtains the handle with PQgetCancel; throws ConnectionException if it is null
    this(Connection c)
    {
        auto handle = PQgetCancel(c.conn);

        if(handle is null)
            throw new ConnectionException(c, __FILE__, __LINE__);

        cancel = handle;
    }

    /// Hands the handle to PQfreeCancel
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     Successful dispatch is no guarantee that the request will have any
     effect, however. If the cancellation is effective, the current
     command will terminate early and return an error result
     (exception). If the cancellation fails (say, because the server
     was already done processing the command), then there will be no
     visible result at all.

     Throws: CancellationException with the text from the error buffer
     if PQcancel does not return 1
    */
    void doCancel()
    {
        char[256] errorText;

        if(PQcancel(cancel, errorText.ptr, errorText.length) == 1)
            return;

        throw new CancellationException(errorText.ptr.fromStringz.to!string, __FILE__, __LINE__);
    }
}
355 
/// Exception thrown by Cancellation.doCancel when PQcancel does not return 1
class CancellationException : Dpq2Exception
{
    /// Forwards message text and source location to Dpq2Exception
    this(string msg, string file, size_t line) { super(msg, file, line); }
}
363 
/// Exception describing a failed connection-level operation
class ConnectionException : Dpq2Exception
{
    /// Takes the message text from the connection's errorMessage()
    this(in Connection c, string file, size_t line)
    {
        string text = c.errorMessage();
        super(text, file, line);
    }

    /// Uses an explicitly supplied message text
    this(string msg, string file, size_t line) { super(msg, file, line); }
}
377 
/**
 * Integration test for this module.
 *
 * Params:
 *   connParam = connection string passed to the Connection constructor
 *
 * Checks the libpq version, a plain connect followed by destroy, the
 * error raised for an incorrect connection string, the escaping helpers
 * and the client encoding switch.
 */
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    {
        auto c = new Connection(connParam);

        destroy(c);
    }

    {
        bool exceptionFlag = false;

        try
            auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
            assert(exceptionFlag);
    }

    {
        auto c = new Connection(connParam);

        // close this connection deterministically, as the first scope does,
        // instead of leaving it open until the GC collects it
        scope(exit) destroy(c);

        assert(c.escapeLiteral("abc'def") == "'abc''def'");
        assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");

        c.setClientEncoding("WIN866");
        assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}