1 module dpq2.connection;
2 
3 import dpq2;
4 
5 import std.conv: to;
6 import std..string: toStringz, fromStringz;
7 import std.exception: enforceEx;
8 import std.range;
9 import std.stdio: File;
10 import std.socket;
11 import core.exception;
12 
13 /*
14  * Bugs: On Unix connection is not thread safe.
15  * 
16  * On Unix, forking a process with open libpq connections can lead
17  * to unpredictable results because the parent and child processes share
18  * the same sockets and operating system resources. For this reason,
19  * such usage is not recommended, though doing an exec from the child
20  * process to load a new executable is safe.
21 
22 
23 
24 int PQisthreadsafe();
25 Returns 1 if the libpq is thread-safe and 0 if it is not.
26 */
27 
28 /// dumb flag for Connection ctor parametrization
29 struct ConnectionStart {};
30 
31 /// Connection
32 class Connection
33 {
34     //string connString; /// Database connection parameters
35     package PGconn* conn;
36 
37     invariant
38     {
39         assert(conn !is null);
40     }
41 
42     this(string connString)
43     {
44         conn = PQconnectdb(toStringz(connString));
45 
46         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
47 
48         if(status != CONNECTION_OK)
49             throw new ConnectionException(this, __FILE__, __LINE__);
50     }
51 
52 	/// Connect to DB in a nonblocking manner
53     this(ConnectionStart, string connString)
54     {
55         conn = PQconnectStart(cast(char*) toStringz(connString)); // TODO: wrong DerelictPQ args
56 
57         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
58 
59         if( status == CONNECTION_BAD )
60             throw new ConnectionException(this, __FILE__, __LINE__);
61     }
62 
63     ~this()
64     {
65         PQfinish( conn );
66     }
67 
68     mixin Queries;
69 
70     @property bool isNonBlocking()
71     {
72         return PQisnonblocking(conn) == 1;
73     }
74 
75     private void setNonBlocking( bool state )
76     {
77         if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
78             throw new ConnectionException(this, __FILE__, __LINE__);
79     }
80 
81     void resetStart()
82     {
83         if(PQresetStart(conn) == 0)
84             throw new ConnectionException(this, __FILE__, __LINE__);
85     }
86 
87     PostgresPollingStatusType poll() nothrow
88     {
89         assert(conn);
90 
91         return PQconnectPoll(conn);
92     }
93 
94     PostgresPollingStatusType resetPoll() nothrow
95     {
96         assert(conn);
97 
98         return PQresetPoll(conn);
99     }
100 
101     ConnStatusType status() nothrow
102     {
103         return PQstatus(conn);
104     }
105 
106     void consumeInput()
107     {
108         assert(conn);
109 
110         const size_t r = PQconsumeInput( conn );
111         if( r != 1 ) throw new ConnectionException(this, __FILE__, __LINE__);
112     }
113     
114     package bool flush()
115     {
116         assert(conn);
117 
118         auto r = PQflush(conn);
119         if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
120         return r == 0;
121     }
122 
123     int posixSocket()
124     {
125         int r = PQsocket(conn);
126 
127         if(r == -1)
128             throw new ConnectionException(this, __FILE__, __LINE__);
129 
130         return r;
131     }
132 
133     Socket socket()
134     {
135         import core.sys.posix.unistd: dup;
136 
137         socket_t s = cast(socket_t) dup(cast(socket_t) posixSocket);
138         return new Socket(s, AddressFamily.UNSPEC);
139     }
140 
141     string errorMessage() const nothrow
142     {
143         return to!string(PQerrorMessage(cast(PGconn*) conn)); //TODO: need report to derelict pq
144     }
145 
146     /**
147      * returns the previous notice receiver or processor function pointer, and sets the new value.
148      * If you supply a null function pointer, no action is taken, but the current pointer is returned.
149      */
150     PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
151     {
152         assert(conn);
153 
154         return PQsetNoticeProcessor(conn, proc, arg);
155     }
156 
157     /// Get for the next result from a sendQuery. Can return null.
158     immutable(Result) getResult()
159     {
160         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
161         auto r = cast(immutable) PQgetResult(conn);
162 
163         if(r)
164         {
165             auto container = new immutable ResultContainer(r);
166             return new immutable Result(container);
167         }
168 
169         return null;
170     }
171 
172     /// Get result from PQexec* functions or throw error if pull is empty
173     package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
174     {
175         if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);
176 
177         return new immutable ResultContainer(r);
178     }
179 
180     bool setSingleRowMode()
181     {
182         return PQsetSingleRowMode(conn) == 1;
183     }
184 
185     /**
186      If the cancellation is effective, the current command will
187      terminate early and return an error result (exception). If the
188      cancellation fails (say, because the server was already done
189      processing the command), then there will be no visible result at
190      all.
191     */
192     void cancel()
193     {
194         auto c = new Cancellation(this);
195         c.doCancel;
196     }
197 
198     bool isBusy() nothrow
199     {
200         assert(conn);
201 
202         return PQisBusy(conn) == 1;
203     }
204 
205     string parameterStatus(string paramName)
206     {
207         assert(conn);
208 
209         auto res = PQparameterStatus(conn, cast(char*) toStringz(paramName)); //TODO: need report to derelict pq
210 
211         if(res is null)
212             throw new ConnectionException(this, __FILE__, __LINE__);
213 
214         return to!string(fromStringz(res));
215     }
216 
217     string escapeLiteral(string msg)
218     {
219         assert(conn);
220 
221         auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);
222 
223         if(buf is null)
224             throw new ConnectionException(this, __FILE__, __LINE__);
225 
226         string res = buf.fromStringz.to!string;
227 
228         PQfreemem(buf);
229 
230         return res;
231     }
232 
233     string escapeIdentifier(string msg)
234     {
235         assert(conn);
236 
237         auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);
238 
239         if(buf is null)
240             throw new ConnectionException(this, __FILE__, __LINE__);
241 
242         string res = buf.fromStringz.to!string;
243 
244         PQfreemem(buf);
245 
246         return res;
247     }
248 
249     string host() const nothrow
250     {
251         assert(conn);
252 
253         return to!string(PQhost(cast(PGconn*) conn).fromStringz); //TODO: need report to derelict pq
254     }
255 
256     void trace(ref File stream)
257     {
258         PQtrace(conn, stream.getFP);
259     }
260 
261     void untrace()
262     {
263         PQuntrace(conn);
264     }
265 
266     void setClientEncoding(string encoding)
267     {
268         if(PQsetClientEncoding(conn, cast(char*) encoding.toStringz) != 0) //TODO: need report to derelict pq
269             throw new ConnectionException(this, __FILE__, __LINE__);
270     }
271 }
272 
273 void connStringCheck(string connString)
274 {
275     char* errmsg = null;
276     PQconninfoOption* r = PQconninfoParse(cast(char*) connString.toStringz, &errmsg); //TODO: need report to derelict pq
277 
278     if(r is null)
279     {
280         enforceEx!OutOfMemoryError(errmsg, "Unable to allocate libpq conninfo data");
281     }
282     else
283     {
284         PQconninfoFree(r);
285     }
286 
287     if(errmsg !is null)
288     {
289         string s = errmsg.fromStringz.to!string;
290         PQfreemem(cast(void*) errmsg);
291 
292         throw new ConnectionException(s, __FILE__, __LINE__);
293     }
294 }
295 
296 unittest
297 {
298     connStringCheck("dbname=postgres user=postgres");
299 
300     {
301         bool raised = false;
302 
303         try
304             connStringCheck("wrong conninfo string");
305         catch(ConnectionException e)
306             raised = true;
307 
308         assert(raised);
309     }
310 }
311 
312 /// Doing canceling queries in progress
313 class Cancellation
314 {
315     private PGcancel* cancel;
316 
317     this(Connection c)
318     {
319         cancel = PQgetCancel(c.conn);
320 
321         if(cancel is null)
322             throw new ConnectionException(c, __FILE__, __LINE__);
323     }
324 
325     ~this()
326     {
327         PQfreeCancel(cancel);
328     }
329 
330     /**
331      Successful dispatch is no guarantee that the request will have any
332      effect, however. If the cancellation is effective, the current
333      command will terminate early and return an error result
334      (exception). If the cancellation fails (say, because the server
335      was already done processing the command), then there will be no
336      visible result at all.
337     */
338     void doCancel()
339     {
340         char[256] errbuf;
341         auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);
342 
343         if(res != 1)
344             throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
345     }
346 }
347 
348 class CancellationException : Dpq2Exception
349 {
350     this(string msg, string file, size_t line)
351     {
352         super(msg, file, line);
353     }
354 }
355 
356 /// Connection exception
357 class ConnectionException : Dpq2Exception
358 {
359     this(in Connection c, string file, size_t line)
360     {
361         super(c.errorMessage(), file, line);
362     }
363 
364     this(string msg, string file, size_t line)
365     {
366         super(msg, file, line);
367     }
368 }
369 
370 void _integration_test( string connParam )
371 {
372     assert( PQlibVersion() >= 9_0100 );
373 
374     {
375         auto c = new Connection(connParam);
376 
377         destroy(c);
378     }
379 
380     {
381         bool exceptionFlag = false;
382 
383         try
384             auto c = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
385         catch(ConnectionException e)
386         {
387             exceptionFlag = true;
388             assert(e.msg.length > 40); // error message check
389         }
390         finally
391             assert(exceptionFlag);
392     }
393 
394     {
395         auto c = new Connection(connParam);
396 
397         assert(c.escapeLiteral("abc'def") == "'abc''def'");
398         assert(c.escapeIdentifier("abc'def") == "\"abc'def\"");
399 
400         c.setClientEncoding("WIN866");
401         assert(c.exec("show client_encoding")[0][0].as!string == "WIN866");
402     }
403 }