1 module dpq2.connection;
2 
3 @trusted:
4 
5 public import derelict.pq.pq;
6 import dpq2.answer: Answer;
7 import std.conv: to;
8 import std.string: toStringz;
9 import std.exception: enforceEx;
10 import std.range;
11 import core.exception;
12 
13 /*
14  * Bugs: On Unix connection is not thread safe.
15  * 
16  * On Unix, forking a process with open libpq connections can lead
17  * to unpredictable results because the parent and child processes share
18  * the same sockets and operating system resources. For this reason,
19  * such usage is not recommended, though doing an exec from the child
20  * process to load a new executable is safe.
21 
22 
23 
24 int PQisthreadsafe();
25 Returns 1 if the libpq is thread-safe and 0 if it is not.
26 */
27 
28 /// BaseConnection
29 package class BaseConnection
30 {
31     string connString; /// Database connection parameters
32     
33     package PGconn* conn;
34     private
35     {
36         bool readyForQuery; // connection started and not disconnect() was called
37 
38         enum ConsumeResult
39         {
40             PQ_CONSUME_ERROR,
41             PQ_CONSUME_OK
42         }
43     }
44     
45     @property bool nonBlocking()
46     {
47         return PQisnonblocking(conn) == 1;
48     }
49 
50     private void setNonBlocking( bool state )
51     {
52         if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 )
53             throw new ConnException(this, __FILE__, __LINE__);
54     }
55     
56 	/// Connect to DB
57     void connect()
58     {
59         assert( !readyForQuery );
60         
61         conn = PQconnectdb(toStringz(connString));
62         
63         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
64         
65         if( !nonBlocking && status != CONNECTION_OK )
66             throw new ConnException(this, __FILE__, __LINE__);
67         
68         readyForQuery = true;
69     }
70 
71 	/// Connect to DB in a nonblocking manner
72     void connectNonblockingStart()
73     {
74         assert( !readyForQuery );
75 
76         conn = PQconnectStart(cast(char*) toStringz(connString)); // TODO: wrong DerelictPQ args
77 
78         enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data");
79 
80         if( status == CONNECTION_BAD )
81             throw new ConnException(this, __FILE__, __LINE__);
82 
83         readyForQuery = true;
84     }
85 
86     PostgresPollingStatusType poll()
87     {
88         assert( readyForQuery );
89 
90         return PQconnectPoll(conn);
91     }
92 
93     ConnStatusType status()
94     {
95         return PQstatus(conn);
96     }
97 
98 	/// Disconnect from DB
99     void disconnect()
100     {
101         if( readyForQuery )
102         {
103             readyForQuery = false;
104             PQfinish( conn );
105             // TODO: remove readyForQuery and just use conn = null as flag
106         }
107     }
108 
109     package void consumeInput()
110     {
111         assert( readyForQuery );
112 
113         const size_t r = PQconsumeInput( conn );
114         if( r != ConsumeResult.PQ_CONSUME_OK ) throw new ConnException(this, __FILE__, __LINE__);
115     }
116     
117     package bool flush()
118     {
119         assert( readyForQuery );
120 
121         auto r = PQflush(conn);
122         if( r == -1 ) throw new ConnException(this, __FILE__, __LINE__);
123         return r == 0;
124     }
125     
126     package size_t socket()
127     {
128         auto r = PQsocket( conn );
129         assert( r >= 0 );
130         return r;
131     }
132 
133     package string errorMessage()
134     {
135         return to!(string)(PQerrorMessage(conn));
136     }
137 
138     ~this()
139     {
140         disconnect();
141     }
142 
143     /**
144      * returns the previous notice receiver or processor function pointer, and sets the new value.
145      * If you supply a null function pointer, no action is taken, but the current pointer is returned.
146      */
147     PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow
148     {
149         assert( readyForQuery );
150 
151         return PQsetNoticeProcessor(conn, proc, arg);
152     }
153 
154     /// Get for the next result from a sendQuery. Can return null.
155     package immutable(Answer) getAnswer()
156     {
157         return _getAnswer(PQgetResult(conn));
158     }
159 
160     /// Get Answer from PQexec* functions or throw error if pull is empty
161     package immutable(Answer) getAnswer( PGresult* r )
162     {
163         auto a = _getAnswer(r);
164 
165         if(!a) throw new ConnException(this, __FILE__, __LINE__);
166 
167         return a;
168     }
169 
170     /// Get Answer from PQexec* functions
171     private immutable(Answer) _getAnswer(in PGresult* r)
172     {
173         if(r)
174         {
175             auto res = new immutable Answer(cast(immutable) r);
176             res.checkAnswerForErrors(); // It is important to do a separate check because of Answer ctor is nothrow
177             return res;
178         }
179 
180         return null;
181     }
182 }
183 
184 /// Connection exception
185 class ConnException : Dpq2Exception
186 {
187     private BaseConnection conn;
188 
189     this(BaseConnection c, string file, size_t line)
190     {
191         conn = c;
192 
193         super(conn.errorMessage(), file, line);
194     }
195 }
196 
197 class Dpq2Exception : Exception
198 {
199     this(string msg, string file, size_t line)
200     {
201         super(msg, file, line);
202     }
203 }
204 
205 void _integration_test( string connParam )
206 {
207     assert( PQlibVersion() >= 9_0100 );
208 
209     {
210         auto c = new BaseConnection;
211         c.connString = connParam;
212 
213         c.connect();
214         c.disconnect();
215 
216         c.connect();
217         c.disconnect();
218 
219         destroy(c);
220     }
221 
222     {
223         bool exceptionFlag = false;
224         auto c = new BaseConnection;
225         c.connString = "!!!some incorrect connection string!!!";
226 
227         try c.connect();
228         catch(ConnException e)
229         {
230             exceptionFlag = true;
231             assert(e.msg.length > 40); // error message check
232         }
233         finally
234             assert(exceptionFlag);
235     }
236 }