1 /// Query methods
2 module dpq2.query;
3 
4 public import dpq2.args;
5 
6 import dpq2.connection: Connection, ConnectionException;
7 import dpq2.result: Result;
8 import dpq2.value;
9 import dpq2.oids: OidType;
10 import derelict.pq.pq;
11 import core.time: Duration, dur;
12 import std.exception: enforce;
13 
/// Extends Connection by adding query methods
///
/// Mix it into Connection and call its members as ordinary Connection.* methods.
mixin template Queries()
{
    /// Performs an SQL query and waits for its completion (PQexec)
    ///
    /// Params:
    ///     SQLcmd = text of the SQL command
    /// Returns: result of the command wrapped into an immutable Answer
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that a result is not changed until it is destroyed,
        // so it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Performs an SQL query with separately passed parameters and waits
    /// for its completion (PQexecParams)
    ///
    /// Params:
    ///     qp = command text, arguments and requested result format
    /// Returns: result of the command wrapped into an immutable Answer
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        // p supplies the C-level arguments handed to libpq below
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        // libpq guarantees that a result is not changed until it is destroyed,
        // so it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQuery did not return 1
    void sendQuery( string SQLcmd )
    {
        // PQsend* functions return a C int: 1 on success, 0 on failure
        const int r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryParams did not return 1
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryPrepared did not return 1
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryPrepared(
                conn,
                p.stmtName,
                p.nParams,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Consumes available input and fetches the next pending notification
    ///
    /// Returns: the notification, or null if no notification was received
    Notify getNextNotify()
    {
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Params:
    ///     statementName = name under which the statement is prepared
    ///     sqlStatement = text of the statement
    ///     oids = parameter types passed to PQprepare (may be null)
    /// Returns: Result of query preparing
    immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        PGresult* pgResult = PQprepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        // libpq guarantees that a result is not changed until it is destroyed,
        // so it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Throws: ResponseException if the result status is not PGRES_COMMAND_OK
    void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        auto r = prepare(statementName, sqlStatement, oids);

        if(r.status != PGRES_COMMAND_OK)
            throw new ResponseException(r, __FILE__, __LINE__);
    }

    /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
    ///
    /// Returns: result of the statement wrapped into an immutable Answer
    immutable(Answer) execPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecPrepared(
                conn,
                p.stmtName,
                p.nParams,
                cast(const(char*)*)p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        // libpq guarantees that a result is not changed until it is destroyed,
        // so it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendPrepare did not return 1
    void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        const int r = PQsendPrepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a request to obtain information about the specified prepared statement, and waits for completion.
    ///
    /// Returns: description of the statement wrapped into an immutable Answer
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));

        // libpq guarantees that a result is not changed until it is destroyed,
        // so it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a request to obtain information about the specified prepared statement, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendDescribePrepared did not return 1
    void sendDescribePrepared(string statementName)
    {
        const int r = PQsendDescribePrepared(conn, statementName.toStringz);

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    // Waits until the connection socket is ready for the requested kind of I/O
    //
    // Returns: true if poll() reported PGRES_POLLING_OK, false if the timeout occurred
    // Throws: ConnectionException if the connection status is CONNECTION_BAD
    version(integration_tests)
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        auto socket = this.socket();

        // Socket.select() requires the sets passed to it to be distinct objects
        // and overwrites their content, so each role gets its own set
        auto readSet = new SocketSet;
        auto writeSet = new SocketSet;
        auto errorSet = new SocketSet;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            // Re-arm the sets: the previous select() call left in them only
            // the sockets whose status had changed
            readSet.reset();
            readSet.add(socket);
            writeSet.reset();
            writeSet.add(socket);
            errorSet.reset();
            errorSet.add(socket);

            SocketSet checkRead = null;
            SocketSet checkWrite = null;

            with(WaitType)
            final switch(type)
            {
                case READ:
                    checkRead = readSet;
                    break;

                case WRITE:
                    checkWrite = writeSet;
                    break;

                case READ_WRITE:
                    checkRead = readSet;
                    checkWrite = writeSet;
                    break;
            }

            // Signed on purpose: select() returns -1 if it was interrupted
            const int sockNum = Socket.select(checkRead, checkWrite, errorSet, timeout);

            if(sockNum == 0) return false; // timeout occurred

            // sockNum == -1: the wait was interrupted, retry;
            // sockNum > 0: socket status changed, check the connection again
        }
    }
}
231 
/// Kind of socket readiness that waitEndOf should wait for
version(integration_tests)
enum WaitType
{
    READ,       /// wait until the socket is selected for reading
    WRITE,      /// wait until the socket is selected for writing
    READ_WRITE, /// wait until the socket is selected for reading or writing
}
239 
/// Integration test of the query methods; needs a reachable server.
///
/// Params:
///     connParam = connection string passed to the Connection constructor
version (integration_tests)
void _integration_test( string connParam ) @trusted
{
    import dpq2.conv.to_d_types;
    import dpq2.conv.to_bson;

    auto conn = new Connection(connParam);

    // Collects results from conn.getResult() until it returns null
    immutable(Result)[] fetchAllResults()
    {
        immutable(Result)[] collected;

        while(true)
        {
            auto next = conn.getResult();
            if(next is null) break;
            collected ~= next;
        }

        return collected;
    }

    // Plain text query: results are expected in text format
    {
        string sqlText =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto answer = conn.exec( sqlText );

        assert( answer.cmdStatus.length > 2 );
        assert( answer.columnCount == 4 );
        assert( answer.length == 3 );
        assert( answer.columnFormat(1) == ValueFormat.TEXT );
        assert( answer.columnFormat(2) == ValueFormat.TEXT );
    }

    // Parameterized query with binary arguments
    {
        import vibe.data.bson: Bson;

        const string sqlText =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
        args[4] = bsonToValue(binArray);

        QueryParams params;
        params.sqlCommand = sqlText;
        params.args = args[];

        auto answer = conn.execParams( params );

        // every column is expected in binary format
        foreach(col; 0 .. args.length)
        {
            assert(answer.columnFormat(col) == ValueFormat.BINARY);
        }

        assert( answer.OID(0) == OidType.Text );
        assert( answer.OID(1) == OidType.Int4 );
        assert( answer.OID(2) == OidType.Text );
        assert( answer.OID(3) == OidType.Int8 );
        assert( answer.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( answer[0][4].as!Bson == binArray );
    }

    // Bug #52: empty text argument
    {
        Value emptyText = toValue("");

        QueryParams params;
        params.sqlCommand = "SELECT $1";
        params.args = [emptyText];

        auto answer = conn.execParams(params);

        assert( !answer[0][0].isNull );
        assert( answer[0][0].as!string == "" );
    }

    // Prepared statement created with prepareEx (PQprepare)
    {
        conn.prepareEx("prepared statement 1", "SELECT $1::integer");

        QueryParams params;
        params.preparedStatementName = "prepared statement 1";
        params.args = [toValue(42)];

        auto answer = conn.execPrepared(params);
        assert (answer[0][0].as!int == 42);
    }

    // Prepared statement created with sendPrepare (PQsendPrepare)
    {
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto results = fetchAllResults();

        assert(results.length == 1);
        assert(results[0].status == PGRES_COMMAND_OK);
    }

    // Check prepared arg types and result types
    {
        auto description = conn.describePrepared("prepared statement 2");

        assert(description.nParams == 2);
        assert(description.paramType(0) == OidType.Text);
        assert(description.paramType(1) == OidType.Int4);
    }

    // Async check of prepared arg types and result types
    {
        conn.sendDescribePrepared("prepared statement 2");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto results = fetchAllResults();

        assert(results.length == 1);
        assert(results[0].status == PGRES_COMMAND_OK);

        auto description = results[0].getAnswer;

        assert(description.nParams == 2);
        assert(description.paramType(0) == OidType.Text);
        assert(description.paramType(1) == OidType.Int4);
    }

    // Async execution of a prepared statement
    {
        QueryParams params;
        params.preparedStatementName = "prepared statement 2";
        params.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(params);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto results = fetchAllResults();

        assert(results.length == 1);
        assert(results[0].getAnswer[0][0].as!PGtext == "abc");
        assert(results[0].getAnswer[0][1].as!PGinteger == 123456);
    }

    import std.socket: SocketShutdown;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    // A query on the broken connection is expected to raise ConnectionException
    {
        bool gotConnectionException = false;

        try
        {
            conn.exec("SELECT 'abc'::text").getAnswer;
        }
        catch(ConnectionException e)
        {
            gotConnectionException = true;
            assert(e.msg.length > 15); // error message check
        }
        finally
        {
            assert(gotConnectionException);
        }
    }
}