1 module dpq2.connection;
2 
3 import dpq2;
4 import dpq2.exception;
5 
6 import std.conv: to;
7 import std..string: toStringz, fromStringz;
8 import std.exception: enforce, enforceEx;
9 import std.range;
10 import std.stdio: File;
11 import std.socket;
12 import core.exception;
13 import core.time: Duration;
14 
15 /*
16  * Bugs: On Unix connection is not thread safe.
17  * 
18  * On Unix, forking a process with open libpq connections can lead
19  * to unpredictable results because the parent and child processes share
20  * the same sockets and operating system resources. For this reason,
21  * such usage is not recommended, though doing an exec from the child
22  * process to load a new executable is safe.
23 
24 
25 
26 int PQisthreadsafe();
27 Returns 1 if the libpq is thread-safe and 0 if it is not.
28 */
29 
30 /// dumb flag for Connection ctor parametrization
31 struct ConnectionStart {};
32 
33 /// Connection
34 class Connection
35 {
36     //string connString; /// Database connection parameters
37     package PGconn* conn;
38 
39     invariant
40     {
41         assert(conn !is null);
42     }
43 
44     this(string connString)
45     {
46         conn = PQconnectdb(toStringz(connString));
47 
48         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
49 
50         if(status != CONNECTION_OK)
51             throw new ConnectionException(this, __FILE__, __LINE__);
52     }
53 
54 	/// Connect to DB in a nonblocking manner
55     this(ConnectionStart, string connString)
56     {
57         conn = PQconnectStart(toStringz(connString));
58 
59         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
60 
61         if( status == CONNECTION_BAD )
62             throw new ConnectionException(this, __FILE__, __LINE__);
63     }
64 
65     ~this()
66     {
67         PQfinish( conn );
68     }
69 
70     mixin Queries;
71 
72     @property bool isNonBlocking()
73     {
74         return PQisnonblocking(conn) == 1;
75     }
76 
77     private void setNonBlocking( bool state )
78     {
79         if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
80             throw new ConnectionException(this, __FILE__, __LINE__);
81     }
82 
83     void resetStart()
84     {
85         if(PQresetStart(conn) == 0)
86             throw new ConnectionException(this, __FILE__, __LINE__);
87     }
88 
89     PostgresPollingStatusType poll() nothrow
90     {
91         assert(conn);
92 
93         return PQconnectPoll(conn);
94     }
95 
96     PostgresPollingStatusType resetPoll() nothrow
97     {
98         assert(conn);
99 
100         return PQresetPoll(conn);
101     }
102 
103     ConnStatusType status() nothrow
104     {
105         return PQstatus(conn);
106     }
107 
108     void consumeInput()
109     {
110         assert(conn);
111 
112         const size_t r = PQconsumeInput( conn );
113         if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
114     }
115     
116     package bool flush()
117     {
118         assert(conn);
119 
120         auto r = PQflush(conn);
121         if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
122         return r == 0;
123     }
124 
125     int posixSocket()
126     {
127         int r = PQsocket(conn);
128 
129         if(r == -1)
130             throw new ConnectionException(this, __FILE__, __LINE__);
131 
132         return r;
133     }
134 
135     Socket socket()
136     {
137         version(Posix)
138         {
139             import core.sys.posix.unistd: dup;
140 
141             socket_t s = cast(socket_t) dup(cast(socket_t) posixSocket);
142             return new Socket(s, AddressFamily.UNSPEC);
143         }
144         else version(Windows)
145         {
146             assert(false, "FIXME: implement socket duplication");
147         }
148     }
149 
150     string errorMessage() const nothrow
151     {
152         return PQerrorMessage(conn).to!string;
153     }
154 
155     /**
156      * returns the previous notice receiver or processor function pointer, and sets the new value.
157      * If you supply a null function pointer, no action is taken, but the current pointer is returned.
158      */
159     PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
160     {
161         assert(conn);
162 
163         return PQsetNoticeProcessor(conn, proc, arg);
164     }
165 
166     /// Get for the next result from a sendQuery. Can return null.
167     immutable(Result) getResult()
168     {
169         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
170         auto r = cast(immutable) PQgetResult(conn);
171 
172         if(r)
173         {
174             auto container = new immutable ResultContainer(r);
175             return new immutable Result(container);
176         }
177 
178         return null;
179     }
180 
181     /// Get result from PQexec* functions or throw error if pull is empty
182     package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
183     {
184         if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);
185 
186         return new immutable ResultContainer(r);
187     }
188 
189     bool setSingleRowMode()
190     {
191         return PQsetSingleRowMode(conn) == 1;
192     }
193 
194     /**
195      If the cancellation is effective, the current command will
196      terminate early and return an error result (exception). If the
197      cancellation fails (say, because the server was already done
198      processing the command), then there will be no visible result at
199      all.
200     */
201     void cancel()
202     {
203         auto c = new Cancellation(this);
204         c.doCancel;
205     }
206 
207     bool isBusy() nothrow
208     {
209         assert(conn);
210 
211         return PQisBusy(conn) == 1;
212     }
213 
214     string parameterStatus(string paramName)
215     {
216         assert(conn);
217 
218         auto res = PQparameterStatus(conn, toStringz(paramName));
219 
220         if(res is null)
221             throw new ConnectionException(this, __FILE__, __LINE__);
222 
223         return to!string(fromStringz(res));
224     }
225 
226     string escapeLiteral(string msg)
227     {
228         assert(conn);
229 
230         auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);
231 
232         if(buf is null)
233             throw new ConnectionException(this, __FILE__, __LINE__);
234 
235         string res = buf.fromStringz.to!string;
236 
237         PQfreemem(buf);
238 
239         return res;
240     }
241 
242     string escapeIdentifier(string msg)
243     {
244         assert(conn);
245 
246         auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);
247 
248         if(buf is null)
249             throw new ConnectionException(this, __FILE__, __LINE__);
250 
251         string res = buf.fromStringz.to!string;
252 
253         PQfreemem(buf);
254 
255         return res;
256     }
257 
258     string host() const nothrow
259     {
260         assert(conn);
261 
262         return PQhost(conn).fromStringz.to!string;
263     }
264 
265     void trace(ref File stream)
266     {
267         PQtrace(conn, stream.getFP);
268     }
269 
270     void untrace()
271     {
272         PQuntrace(conn);
273     }
274 
275     void setClientEncoding(string encoding)
276     {
277         if(PQsetClientEncoding(conn, encoding.toStringz) != 0)
278             throw new ConnectionException(this, __FILE__, __LINE__);
279     }
280 }
281 
282 void connStringCheck(string connString)
283 {
284     char* errmsg = null;
285     PQconninfoOption* r = PQconninfoParse(connString.toStringz, &errmsg);
286 
287     if(r is null)
288     {
289         enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
290     }
291     else
292     {
293         PQconninfoFree(r);
294     }
295 
296     if(errmsg !is null)
297     {
298         string s = errmsg.fromStringz.to!string;
299         PQfreemem(cast(void*) errmsg);
300 
301         throw new ConnectionException(s, __FILE__, __LINE__);
302     }
303 }
304 
305 unittest
306 {
307     connStringCheck("dbname=postgres user=postgres");
308 
309     {
310         bool raised = false;
311 
312         try
313             connStringCheck("wrong conninfo string");
314         catch(ConnectionException e)
315             raised = true;
316 
317         assert(raised);
318     }
319 }
320 
321 /// Doing canceling queries in progress
322 class Cancellation
323 {
324     private PGcancel* cancel;
325 
326     this(Connection c)
327     {
328         cancel = PQgetCancel(c.conn);
329 
330         if(cancel is null)
331             throw new ConnectionException(c, __FILE__, __LINE__);
332     }
333 
334     ~this()
335     {
336         PQfreeCancel(cancel);
337     }
338 
339     /**
340      Successful dispatch is no guarantee that the request will have any
341      effect, however. If the cancellation is effective, the current
342      command will terminate early and return an error result
343      (exception). If the cancellation fails (say, because the server
344      was already done processing the command), then there will be no
345      visible result at all.
346     */
347     void doCancel()
348     {
349         char[256] errbuf;
350         auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);
351 
352         if(res != 1)
353             throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
354     }
355 }
356 
357 class CancellationException : Dpq2Exception
358 {
359     this(string msg, string file, size_t line)
360     {
361         super(msg, file, line);
362     }
363 }
364 
365 /// Connection exception
366 class ConnectionException : Dpq2Exception
367 {
368     this(in Connection c, string file, size_t line)
369     {
370         super(c.errorMessage(), file, line);
371     }
372 
373     this(string msg, string file, size_t line)
374     {
375         super(msg, file, line);
376     }
377 }
378 
379 void _integration_test( string connParam )
380 {
381     assert( PQlibVersion() >= 9_0100 );
382 
383     {
384         auto c = new Connection(connParam);
385 
386         destroy(c);
387     }
388 
389     {
390         bool exceptionFlag = false;
391 
392         try
393             auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
394         catch(ConnectionException e)
395         {
396             exceptionFlag = true;
397             assert(e.msg.length > 40); // error message check
398         }
399         finally
400             assert(exceptionFlag);
401     }
402 
403     {
404         auto c = new Connection(connParam);
405 
406         assert(c.escapeLiteral("abc'def") == "'abc''def'");
407         assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");
408 
409         c.setClientEncoding("WIN866");
410         assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
411     }
412 }