1 module dpq2.connection;
2 
3 @trusted:
4 
import dpq2;

import std.conv: to;
import std.string: toStringz, fromStringz;
import std.exception: enforceEx;
import std.range;
import std.stdio: File;
import std.socket;
import core.exception;
14 
/*
 * Bugs: On Unix, a connection is not thread-safe.
 *
 * On Unix, forking a process with open libpq connections can lead
 * to unpredictable results because the parent and child processes share
 * the same sockets and operating system resources. For this reason,
 * such usage is not recommended, though doing an exec from the child
 * process to load a new executable is safe.
 *
 * See also:
 *     int PQisthreadsafe();
 * Returns 1 if libpq is thread-safe and 0 if it is not.
 */
29 
/// Empty tag type: passing a value of it as the first Connection constructor
/// argument selects the overload that starts the connection via PQconnectStart.
struct ConnectionStart
{
}
32 
/**
 * Wrapper around a libpq connection handle (PGconn*).
 *
 * The handle is obtained in the constructors and passed to PQfinish in the
 * destructor. The class invariant requires the handle to be non-null.
 */
class Connection
{
    //string connString; /// Database connection parameters
    package PGconn* conn; /// underlying libpq handle

    invariant
    {
        assert(conn !is null);
    }

    /**
     * Connect to DB using PQconnectdb.
     *
     * Throws: OutOfMemoryError if libpq returns no handle,
     *         ConnectionException if the resulting status is not CONNECTION_OK.
     */
    this(string connString)
    {
        conn = PQconnectdb(toStringz(connString));

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if(status != CONNECTION_OK)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Connect to DB in a nonblocking manner (PQconnectStart).
     *
     * Throws: OutOfMemoryError if libpq returns no handle,
     *         ConnectionException if the status is already CONNECTION_BAD.
     */
    this(ConnectionStart, string connString)
    {
        conn = PQconnectStart(cast(char*) toStringz(connString)); // TODO: wrong DerelictPQ args

        enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");

        if( status == CONNECTION_BAD )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Hands the handle to PQfinish
    ~this()
    {
        PQfinish( conn );
    }

    mixin Queries;

    /// True when PQisnonblocking reports 1 for this connection
    @property bool isNonBlocking()
    {
        return PQisnonblocking(conn) == 1;
    }

    /// Calls PQsetnonblocking with 1 or 0; throws ConnectionException when it returns -1
    private void setNonBlocking( bool state )
    {
        if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQresetStart; throws ConnectionException when it returns 0
    void resetStart()
    {
        if(PQresetStart(conn) == 0)
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns the result of PQconnectPoll for this connection
    PostgresPollingStatusType poll() nothrow
    {
        assert(conn);

        return PQconnectPoll(conn);
    }

    /// Returns the result of PQresetPoll for this connection
    PostgresPollingStatusType resetPoll() nothrow
    {
        assert(conn);

        return PQresetPoll(conn);
    }

    /// Returns the result of PQstatus for this connection
    ConnStatusType status() nothrow
    {
        return PQstatus(conn);
    }

    /// Calls PQconsumeInput; throws ConnectionException unless it returns 1
    void consumeInput()
    {
        assert(conn);

        // Compare the int result directly; no need to widen it to size_t first.
        if( PQconsumeInput( conn ) != 1 )
            throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Calls PQflush.
     *
     * Returns: true when PQflush returned 0, false for any other
     *          non-error value.
     * Throws: ConnectionException when PQflush returned -1.
     */
    package bool flush()
    {
        assert(conn);

        auto r = PQflush(conn);
        if( r == -1 ) throw new ConnectionException(this, __FILE__, __LINE__);
        return r == 0;
    }

    /// Returns the descriptor reported by PQsocket; throws ConnectionException when it is -1
    int posixSocket()
    {
        int r = PQsocket(conn);

        if(r == -1)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return r;
    }

    /**
     * Wraps a duplicate (dup) of the connection descriptor in a std.socket.Socket.
     *
     * NOTE(review): the descriptor is presumably duplicated so that closing the
     * returned Socket does not close the descriptor libpq itself uses - confirm.
     *
     * Throws: ConnectionException if PQsocket or dup() reports failure.
     */
    Socket socket()
    {
        import core.sys.posix.unistd: dup;

        const int duplicated = dup(posixSocket());

        // dup() returns -1 on failure; do not wrap an invalid handle in a Socket.
        if(duplicated == -1)
            throw new ConnectionException("Unable to duplicate libpq socket descriptor", __FILE__, __LINE__);

        return new Socket(cast(socket_t) duplicated, AddressFamily.UNSPEC);
    }

    /// Returns a D string copy of PQerrorMessage for this connection
    string errorMessage() const nothrow
    {
        return to!string(PQerrorMessage(cast(PGconn*) conn)); //TODO: need report to derelict pq
    }

    /**
     * returns the previous notice receiver or processor function pointer, and sets the new value.
     * If you supply a null function pointer, no action is taken, but the current pointer is returned.
     */
    PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
    {
        assert(conn);

        return PQsetNoticeProcessor(conn, proc, arg);
    }

    /// Get for the next result from a sendQuery. Can return null.
    immutable(Result) getResult()
    {
        // is guaranteed by libpq that the result will not be changed until it will not be destroyed
        auto r = cast(immutable) PQgetResult(conn);

        if(r)
        {
            auto container = new immutable ResultContainer(r);
            return new immutable Result(container);
        }

        return null;
    }

    /// Get result from PQexec* functions or throw error if pull is empty
    package immutable(ResultContainer) createResultContainer(immutable PGresult* r) const
    {
        if(r is null) throw new ConnectionException(this, __FILE__, __LINE__);

        return new immutable ResultContainer(r);
    }

    /// True when PQsetSingleRowMode returned 1
    bool setSingleRowMode()
    {
        return PQsetSingleRowMode(conn) == 1;
    }

    /**
     * Creates a Cancellation for this connection and runs doCancel on it.
     *
     * Throws: whatever the Cancellation constructor or doCancel throw.
     */
    void cancel()
    {
        auto c = new Cancellation(this);

        // Run the Cancellation destructor now instead of waiting for a GC cycle.
        scope(exit) destroy(c);

        c.doCancel;
    }

    /// True when PQisBusy returned 1
    bool isBusy() nothrow
    {
        assert(conn);

        return PQisBusy(conn) == 1;
    }

    /**
     * Looks up paramName with PQparameterStatus.
     *
     * Returns: a D string copy of the value.
     * Throws: ConnectionException when libpq returns null for the name.
     */
    string parameterStatus(string paramName)
    {
        assert(conn);

        auto res = PQparameterStatus(conn, cast(char*) toStringz(paramName)); //TODO: need report to derelict pq

        if(res is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        return to!string(fromStringz(res));
    }

    /**
     * Passes msg through PQescapeLiteral.
     *
     * Returns: a D string copy of the escaped text.
     * Throws: ConnectionException when libpq returns null.
     */
    string escapeLiteral(string msg)
    {
        assert(conn);

        auto buf = PQescapeLiteral(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The libpq buffer is released on every exit path, after the copy is made.
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /**
     * Passes msg through PQescapeIdentifier.
     *
     * Returns: a D string copy of the escaped text.
     * Throws: ConnectionException when libpq returns null.
     */
    string escapeIdentifier(string msg)
    {
        assert(conn);

        auto buf = PQescapeIdentifier(conn, msg.toStringz, msg.length);

        if(buf is null)
            throw new ConnectionException(this, __FILE__, __LINE__);

        // The libpq buffer is released on every exit path, after the copy is made.
        scope(exit) PQfreemem(buf);

        return buf.fromStringz.to!string;
    }

    /// Returns a D string copy of PQhost for this connection
    string host() const nothrow
    {
        assert(conn);

        return to!string(PQhost(cast(PGconn*) conn).fromStringz); //TODO: need report to derelict pq
    }

    /// Passes the C FILE* of stream to PQtrace
    void trace(ref File stream)
    {
        PQtrace(conn, stream.getFP);
    }

    /// Calls PQuntrace for this connection
    void untrace()
    {
        PQuntrace(conn);
    }

    /// Calls PQsetClientEncoding; throws ConnectionException when it returns non-zero
    void setClientEncoding(string encoding)
    {
        if(PQsetClientEncoding(conn, cast(char*) encoding.toStringz) != 0) //TODO: need report to derelict pq
            throw new ConnectionException(this, __FILE__, __LINE__);
    }
}
267 
/**
 * Runs connString through PQconninfoParse.
 *
 * Throws: ConnectionException carrying libpq's message when the parser
 *         returned one; OutOfMemoryError when parsing yielded neither
 *         options nor a message.
 */
void connStringCheck(string connString)
{
    char* parseError = null;
    PQconninfoOption* options = PQconninfoParse(cast(char*) connString.toStringz, &parseError); //TODO: need report to derelict pq

    if(options !is null)
        PQconninfoFree(options);
    else
        enforceEx!OutOfMemoryError(parseError, "Unable to allocate libpq conninfo data");

    // No message from libpq means there is nothing to report.
    if(parseError is null)
        return;

    // Copy the text into GC memory before releasing the libpq buffer.
    string text = parseError.fromStringz.to!string;
    PQfreemem(cast(void*) parseError);

    throw new ConnectionException(text, __FILE__, __LINE__);
}
290 
unittest
{
    // This conninfo string is expected to pass without throwing.
    connStringCheck("dbname=postgres user=postgres");

    // This one is expected to be rejected with ConnectionException.
    bool rejected = false;

    try
    {
        connStringCheck("wrong conninfo string");
    }
    catch(ConnectionException)
    {
        rejected = true;
    }

    assert(rejected);
}
306 
/**
 * Doing canceling queries in progress.
 *
 * Holds the PGcancel object obtained from PQgetCancel and passes it to
 * PQfreeCancel in the destructor.
 */
class Cancellation
{
    private PGcancel* cancel; /// handle passed to PQcancel and PQfreeCancel

    /**
     * Obtains the cancel handle for connection c via PQgetCancel.
     *
     * Throws: ConnectionException when libpq returns null.
     */
    this(Connection c)
    {
        cancel = PQgetCancel(c.conn);

        if(cancel is null)
            throw new ConnectionException(c, __FILE__, __LINE__);
    }

    /// Hands the cancel handle to PQfreeCancel
    ~this()
    {
        PQfreeCancel(cancel);
    }

    /**
     * Calls PQcancel with the stored handle.
     *
     * Throws: CancellationException, carrying the text PQcancel wrote into
     *         the error buffer, unless PQcancel returned 1.
     */
    void doCancel()
    {
        // D default-initializes char to 0xFF, not 0. Zero-fill the buffer so
        // fromStringz always finds a terminator inside it, even if PQcancel
        // wrote nothing.
        char[256] errbuf = '\0';
        auto res = PQcancel(cancel, errbuf.ptr, errbuf.length);

        if(res != 1)
            throw new CancellationException(to!string(errbuf.ptr.fromStringz), __FILE__, __LINE__);
    }
}
334 
/// Exception type thrown by Cancellation.doCancel
class CancellationException : Dpq2Exception
{
    /// Passes the message and throw site through to Dpq2Exception unchanged
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }
}
342 
/// Connection exception
class ConnectionException : Dpq2Exception
{
    /// Builds the exception from an explicit message
    this(string msg, string file, size_t line)
    {
        super(msg, file, line);
    }

    /// Builds the exception from the current errorMessage() of connection c
    this(in Connection c, string file, size_t line)
    {
        this(c.errorMessage(), file, line);
    }
}
356 
/// Integration checks against a live server reachable through connParam
void _integration_test( string connParam )
{
    assert( PQlibVersion() >= 9_0100 );

    // Connect, then tear the object down explicitly.
    {
        auto conn = new Connection(connParam);

        destroy(conn);
    }

    // Starting a connection with a malformed conninfo string must throw.
    {
        bool thrown = false;

        try
        {
            auto conn = new Connection(ConnectionStart(), "!!!some incorrect connection string!!!");
        }
        catch(ConnectionException e)
        {
            thrown = true;
            assert(e.msg.length > 40); // error message check
        }
        finally
        {
            assert(thrown);
        }
    }

    // Escaping helpers and client encoding round trip.
    {
        auto conn = new Connection(connParam);

        assert(conn.escapeLiteral("abc'def") == "'abc''def'");
        assert(conn.escapeIdentifier("abc'def") == "\"abc'def\"");

        conn.setClientEncoding("WIN866");
        assert(conn.exec("show client_encoding")[0][0].as!string == "WIN866");
    }
}