1 module dpq2.query;
2 
3 public import dpq2.args;
4 
5 import dpq2.connection: Connection, ConnectionException;
6 import dpq2.result: Result;
7 import dpq2.value;
8 import dpq2.oids: OidType;
9 import derelict.pq.pq;
10 import core.time: Duration, dur;
11 import std.exception: enforce;
12 
13 mixin template Queries()
14 {
15     /// Perform SQL query to DB
16     immutable (Answer) exec( string SQLcmd )
17     {
18         auto pgResult = PQexec(conn, toStringz( SQLcmd ));
19 
20         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
21         auto container = createResultContainer(cast(immutable) pgResult);
22 
23         return new immutable Answer(container);
24     }
25 
26     /// Perform SQL query to DB
27     immutable (Answer) execParams(in ref QueryParams qp)
28     {
29         auto p = InternalQueryParams(&qp);
30         auto pgResult = PQexecParams (
31                 conn,
32                 p.command,
33                 p.nParams,
34                 p.paramTypes,
35                 p.paramValues,
36                 p.paramLengths,
37                 p.paramFormats,
38                 p.resultFormat
39         );
40 
41         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
42         auto container = createResultContainer(cast(immutable) pgResult);
43 
44         return new immutable Answer(container);
45     }
46     
47     /// Submits a command to the server without waiting for the result(s)
48     void sendQuery( string SQLcmd )
49     {
50         const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
51         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
52     }
53 
54     /// Submits a command and separate parameters to the server without waiting for the result(s)
55     void sendQueryParams(in ref QueryParams qp)
56     {
57         auto p = InternalQueryParams(&qp);
58         size_t r = PQsendQueryParams (
59                 conn,
60                 p.command,
61                 p.nParams,
62                 p.paramTypes,
63                 p.paramValues,
64                 p.paramLengths,
65                 p.paramFormats,
66                 p.resultFormat
67             );
68 
69         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
70     }
71 
72     /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
73     void sendQueryPrepared(in ref QueryParams qp)
74     {
75         auto p = InternalQueryParams(&qp);
76         size_t r = PQsendQueryPrepared(
77                 conn,
78                 p.stmtName,
79                 p.nParams,
80                 p.paramValues,
81                 p.paramLengths,
82                 p.paramFormats,
83                 p.resultFormat
84             );
85 
86         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
87     }
88 
89     /// Returns null if no notifies was received
90     Notify getNextNotify()
91     {
92         consumeInput();
93         auto n = PQnotifies(conn);
94         return n is null ? null : new Notify( n );
95     }
96 
97     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
98     immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
99     {
100         PGresult* pgResult = PQprepare(
101                 conn,
102                 toStringz(statementName),
103                 toStringz(sqlStatement),
104                 oids.length.to!int,
105                 oids.ptr
106             );
107 
108         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
109         auto container = createResultContainer(cast(immutable) pgResult);
110 
111         return new immutable Result(container);
112     }
113 
114     /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
115     immutable(Answer) execPrepared(in ref QueryParams qp)
116     {
117         auto p = InternalQueryParams(&qp);
118         auto pgResult = PQexecPrepared(
119                 conn,
120                 p.stmtName,
121                 p.nParams,
122                 cast(const(char*)*)p.paramValues,
123                 p.paramLengths,
124                 p.paramFormats,
125                 p.resultFormat
126             );
127 
128         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
129         auto container = createResultContainer(cast(immutable) pgResult);
130 
131         return new immutable Answer(container);
132     }
133 
134     /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
135     void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
136     {
137         size_t r = PQsendPrepare(
138                 conn,
139                 toStringz(statementName),
140                 toStringz(sqlStatement),
141                 oids.length.to!int,
142                 cast(Oid*)oids.ptr //const should be accepted here, see https://github.com/DerelictOrg/DerelictPQ/issues/21
143             );
144 
145         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
146     }
147 
148     immutable(Answer) describePrepared(string statementName)
149     {
150         PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));
151 
152         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
153         auto container = createResultContainer(cast(immutable) pgResult);
154 
155         return new immutable Answer(container);
156     }
157 
158     void sendDescribePrepared(string statementName)
159     {
160         size_t r = PQsendDescribePrepared(conn, statementName.toStringz);
161 
162         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
163     }
164 
165     /// Waiting for completion of reading or writing
166     /// Return: timeout not occured
167     bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
168     {
169         import std.socket;
170 
171         auto socket = this.socket();
172         auto set = new SocketSet;
173         set.add(socket);
174 
175         while(true)
176         {
177             if(status() == CONNECTION_BAD)
178                 throw new ConnectionException(this, __FILE__, __LINE__);
179 
180             if(poll() == PGRES_POLLING_OK)
181             {
182                 return true;
183             }
184             else
185             {
186                 size_t sockNum;
187 
188                 with(WaitType)
189                 final switch(type)
190                 {
191                     case READ:
192                         sockNum = Socket.select(set, null, set, timeout);
193                         break;
194 
195                     case WRITE:
196                         sockNum = Socket.select(null, set, set, timeout);
197                         break;
198 
199                     case READ_WRITE:
200                         sockNum = Socket.select(set, set, set, timeout);
201                         break;
202                 }
203 
204                 enforce(sockNum >= 0);
205                 if(sockNum == 0) return false; // timeout is occurred
206 
207                 continue;
208             }
209         }
210     }
211 }
212 
213 enum WaitType
214 {
215     READ,
216     WRITE,
217     READ_WRITE
218 }
219 
220 void _integration_test( string connParam ) @trusted
221 {
222     import dpq2.conv.to_d_types; 
223     import dpq2.conv.to_bson;
224 
225     auto conn = new Connection(connParam);
226 
227     // Text type arguments testing
228     {    
229         string sql_query =
230         "select now() as time, 'abc'::text as string, 123, 456.78\n"~
231         "union all\n"~
232         "select now(), 'абвгд'::text, 777, 910.11\n"~
233         "union all\n"~
234         "select NULL, 'ijk'::text, 789, 12345.115345";
235 
236         auto a = conn.exec( sql_query );
237 
238         assert( a.cmdStatus.length > 2 );
239         assert( a.columnCount == 4 );
240         assert( a.length == 3 );
241         assert( a.columnFormat(1) == ValueFormat.TEXT );
242         assert( a.columnFormat(2) == ValueFormat.TEXT );
243     }
244 
245     // Binary type arguments testing
246     {
247         import vibe.data.bson: Bson;
248 
249         const string sql_query =
250         "select $1::text, $2::integer, $3::text, $4, $5::integer[]";
251 
252         Value[5] args;
253         args[0] = toValue("абвгд");
254         args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
255         args[2] = toValue("123");
256         args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
257 
258         Bson binArray = Bson([
259             Bson([Bson(null), Bson(123), Bson(456)]),
260             Bson([Bson(0), Bson(789), Bson(null)])
261         ]);
262 
263         args[4] = bsonToValue(binArray);
264 
265         QueryParams p;
266         p.sqlCommand = sql_query;
267         p.args = args[];
268 
269         auto a = conn.execParams( p );
270 
271         foreach(i; 0 .. args.length)
272             assert(a.columnFormat(i) == ValueFormat.BINARY);
273 
274         assert( a.OID(0) == OidType.Text );
275         assert( a.OID(1) == OidType.Int4 );
276         assert( a.OID(2) == OidType.Text );
277         assert( a.OID(3) == OidType.Int8 );
278         assert( a.OID(4) == OidType.Int4Array );
279 
280         // binary args array test
281         assert( a[0][4].as!Bson == binArray );
282     }
283 
284     {
285         // Bug #52: empty text argument
286         QueryParams p;
287         Value v = toValue("");
288 
289         p.sqlCommand = "SELECT $1";
290         p.args = [v];
291 
292         auto a = conn.execParams(p);
293 
294         assert( !a[0][0].isNull );
295         assert( a[0][0].as!string == "" );
296     }
297 
298     // checking prepared statements
299     {
300         // uses PQprepare:
301         auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
302         assert(s.status == PGRES_COMMAND_OK);
303 
304         QueryParams p;
305         p.preparedStatementName = "prepared statement 1";
306         p.args = [42.toValue];
307         auto r = conn.execPrepared(p);
308         assert (r[0][0].as!int == 42);
309     }
310     {
311         // uses PQsendPrepare:
312         conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");
313 
314         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
315         conn.consumeInput();
316 
317         immutable(Result)[] res;
318 
319         while(true)
320         {
321             auto r = conn.getResult();
322             if(r is null) break;
323             res ~= r;
324         }
325 
326         assert(res.length == 1);
327         assert(res[0].status == PGRES_COMMAND_OK);
328     }
329     {
330         // check prepared arg types and result types
331         auto a = conn.describePrepared("prepared statement 2");
332 
333         assert(a.nParams == 2);
334         assert(a.paramType(0) == OidType.Text);
335         assert(a.paramType(1) == OidType.Int4);
336     }
337     {
338         // async check prepared arg types and result types
339         conn.sendDescribePrepared("prepared statement 2");
340 
341         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
342         conn.consumeInput();
343 
344         immutable(Result)[] res;
345 
346         while(true)
347         {
348             auto r = conn.getResult();
349             if(r is null) break;
350             res ~= r;
351         }
352 
353         assert(res.length == 1);
354         assert(res[0].status == PGRES_COMMAND_OK);
355 
356         auto a = res[0].getAnswer;
357 
358         assert(a.nParams == 2);
359         assert(a.paramType(0) == OidType.Text);
360         assert(a.paramType(1) == OidType.Int4);
361     }
362     {
363         QueryParams p;
364         p.preparedStatementName = "prepared statement 2";
365         p.argsFromArray = ["abc", "123456"];
366 
367         conn.sendQueryPrepared(p);
368 
369         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
370         conn.consumeInput();
371 
372         immutable(Result)[] res;
373 
374         while(true)
375         {
376             auto r = conn.getResult();
377             if(r is null) break;
378             res ~= r;
379         }
380 
381         assert(res.length == 1);
382         assert(res[0].getAnswer[0][0].as!PGtext == "abc");
383         assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
384     }
385 
386     import std.socket;
387     conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection
388 
389     {
390         bool exceptionFlag = false;
391 
392         try conn.exec("SELECT 'abc'::text").getAnswer;
393         catch(ConnectionException e)
394         {
395             exceptionFlag = true;
396             assert(e.msg.length > 15); // error message check
397         }
398         finally
399             assert(exceptionFlag);
400     }
401 }