1 /// Query methods
2 module dpq2.query;
3 
4 public import dpq2.args;
5 
6 import dpq2.connection: Connection, ConnectionException;
7 import dpq2.result: Result;
8 import dpq2.value;
9 import dpq2.oids: OidType;
10 import derelict.pq.pq;
11 import core.time: Duration, dur;
12 import std.exception: enforce;
13 
/// Extends Connection with query methods
///
/// The template is mixed into Connection, so every method below is called
/// as an ordinary Connection method.
mixin template Queries()
{
    /// Performs an SQL query and waits for the result
    ///
    /// Params:
    ///     SQLcmd = text of the SQL command
    /// Returns: answer built from the result returned by PQexec
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Performs an SQL query with separately passed parameters and waits for the result
    ///
    /// Params:
    ///     qp = command text, arguments and result format
    /// Returns: answer built from the result returned by PQexecParams
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if the command could not be dispatched
    void sendQuery( string SQLcmd )
    {
        // PQsendQuery returns 1 on success
        const int r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if the command could not be dispatched
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if the request could not be dispatched
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryPrepared(
                conn,
                p.stmtName,
                p.nParams,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Consumes pending input and fetches the next notification
    ///
    /// Returns: null if no notifications were received
    Notify getNextNotify()
    {
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Params:
    ///     statementName = name of the prepared statement
    ///     sqlStatement = text of the statement
    ///     oids = optional parameter type OIDs
    /// Returns: Result of query preparing
    immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        PGresult* pgResult = PQprepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Throws: ResponseException if the result status is not PGRES_COMMAND_OK
    void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        auto r = prepare(statementName, sqlStatement, oids);

        if(r.status != PGRES_COMMAND_OK)
            throw new ResponseException(r, __FILE__, __LINE__);
    }

    /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
    ///
    /// Returns: answer built from the result returned by PQexecPrepared
    immutable(Answer) execPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecPrepared(
                conn,
                p.stmtName,
                p.nParams,
                cast(const(char*)*)p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if the request could not be dispatched
    void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        const int r = PQsendPrepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a request to obtain information about the specified prepared statement, and waits for completion.
    ///
    /// Returns: answer built from the result returned by PQdescribePrepared
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a request to obtain information about the specified prepared statement, without waiting for completion.
    ///
    /// Throws: ConnectionException if the request could not be dispatched
    void sendDescribePrepared(string statementName)
    {
        const int r = PQsendDescribePrepared(conn, statementName.toStringz);

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a buffer of CSV data to the COPY command
    ///
    /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode)
    /// Throws: ConnectionException if libpq reports an error
    bool putCopyData( string data )
    {
        // The length is passed explicitly, so no NUL-terminated copy of the buffer is needed
        const int r = PQputCopyData(conn, data.ptr, data.length.to!int);

        if(r == -1) throw new ConnectionException(this, __FILE__, __LINE__);

        return r != 0;
    }

    /// Signals that COPY data send is finished. Finalize and flush the COPY command.
    ///
    /// Only for blocking connections.
    /// Returns: answer built from the command result fetched after the COPY was ended
    immutable(Answer) putCopyEnd()
    {
        assert(!isNonBlocking, "Only for blocking connections");

        const bool r = sendPutCopyEnd;

        assert(r, "Impossible status for blocking connections");

        // after the copying is finished, and there is no connection error, we must still get the command result
        // this will reveal any errors in the process (invalid data format or constraint violation, etc.)
        auto pgResult = PQgetResult(conn);

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it can be treated as immutable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Signals that COPY data send is finished.
    ///
    /// Returns: true if the termination data was sent, false if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode)
    /// Throws: ConnectionException if libpq reports an error
    bool sendPutCopyEnd()
    {
        // The second argument of PQputCopyEnd is an input (a message that forces
        // the COPY to fail), not an output: null requests normal completion
        const int r = PQputCopyEnd(conn, null);

        if(r == -1) throw new ConnectionException(this, __FILE__, __LINE__);

        return r != 0;
    }

    // Waits for completion of reading or writing
    // Returns: true if polling reported PGRES_POLLING_OK, false if the timeout occurred
    // Throws: ConnectionException if the connection status is CONNECTION_BAD
    version(integration_tests)
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        auto socket = this.socket();

        // Each argument of Socket.select gets its own SocketSet object, so the
        // read/write results and the error results never share one set
        auto readSet = new SocketSet;
        auto writeSet = new SocketSet;
        auto errorSet = new SocketSet;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            // Socket.select is assumed to rewrite the sets it is given, so they
            // are filled in again before every call
            readSet.reset();
            writeSet.reset();
            errorSet.reset();
            errorSet.add(socket);

            // Socket.select returns a signed value: -1 on interruption, 0 on timeout
            int sockNum;

            with(WaitType)
            final switch(type)
            {
                case READ:
                    readSet.add(socket);
                    sockNum = Socket.select(readSet, null, errorSet, timeout);
                    break;

                case WRITE:
                    writeSet.add(socket);
                    sockNum = Socket.select(null, writeSet, errorSet, timeout);
                    break;

                case READ_WRITE:
                    readSet.add(socket);
                    writeSet.add(socket);
                    sockNum = Socket.select(readSet, writeSet, errorSet, timeout);
                    break;
            }

            if(sockNum < 0) continue; // interrupted: check the connection and wait again
            if(sockNum == 0) return false; // timeout occurred
        }
    }
}
277 
/// Kind of socket readiness awaited by waitEndOf
version(integration_tests)
enum WaitType
{
    /// the socket is checked for readability
    READ,

    /// the socket is checked for writability
    WRITE,

    /// the socket is checked for both readability and writability
    READ_WRITE
}
285 
/// Integration test of the query methods against a live server
///
/// Params:
///     connParam = connection string passed to the Connection constructor
version (integration_tests)
void _integration_test( string connParam ) @trusted
{
    import dpq2.conv.to_d_types;
    import dpq2.conv.to_bson;

    auto conn = new Connection(connParam);

    // Waits for the reply to a previously sent asynchronous request and
    // collects every result that is available for it
    immutable(Result)[] receiveResults()
    {
        // The call is kept outside of assert so that it is not compiled out
        // together with the assertion
        const bool ready = conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        assert(ready, "Timed out while waiting for the server reply");

        conn.consumeInput();

        immutable(Result)[] res;

        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        return res;
    }

    // Text type arguments testing
    {
        string sql_query =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto a = conn.exec( sql_query );

        assert( a.cmdStatus.length > 2 );
        assert( a.columnCount == 4 );
        assert( a.length == 3 );
        assert( a.columnFormat(1) == ValueFormat.TEXT );
        assert( a.columnFormat(2) == ValueFormat.TEXT );
    }

    // Binary type arguments testing
    {
        import vibe.data.bson: Bson;

        const string sql_query =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        args[4] = bsonToValue(binArray);

        QueryParams p;
        p.sqlCommand = sql_query;
        p.args = args[];

        auto a = conn.execParams( p );

        foreach(i; 0 .. args.length)
            assert(a.columnFormat(i) == ValueFormat.BINARY);

        assert( a.OID(0) == OidType.Text );
        assert( a.OID(1) == OidType.Int4 );
        assert( a.OID(2) == OidType.Text );
        assert( a.OID(3) == OidType.Int8 );
        assert( a.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( a[0][4].as!Bson == binArray );
    }

    {
        // Bug #52: empty text argument
        QueryParams p;
        Value v = toValue("");

        p.sqlCommand = "SELECT $1";
        p.args = [v];

        auto a = conn.execParams(p);

        assert( !a[0][0].isNull );
        assert( a[0][0].as!string == "" );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        conn.prepareEx("prepared statement 1", "SELECT $1::integer");

        QueryParams p;
        p.preparedStatementName = "prepared statement 1";
        p.args = [42.toValue];
        auto r = conn.execPrepared(p);
        assert (r[0][0].as!int == 42);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        auto res = receiveResults();

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);
    }
    {
        // check prepared arg types and result types
        auto a = conn.describePrepared("prepared statement 2");

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        // async check prepared arg types and result types
        conn.sendDescribePrepared("prepared statement 2");

        auto res = receiveResults();

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);

        auto a = res[0].getAnswer;

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        QueryParams p;
        p.preparedStatementName = "prepared statement 2";
        p.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(p);

        auto res = receiveResults();

        assert(res.length == 1);
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
    }
    {
        // test COPY
        conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)");

        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
        conn.putCopyData("Val1,1\nval2,2\n");
        conn.putCopyData("Val3,3\nval4,4\n");
        conn.putCopyEnd();

        auto res = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy");
        assert(res.length == 1);
        assert(res[0][0].as!string == "4");
        assert(res[0][1].as!string == "10");

        // This time with error
        import std.exception: assertThrown;
        import dpq2.result: ResponseException;

        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
        conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n");

        assertThrown!ResponseException(conn.putCopyEnd());
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    {
        bool exceptionFlag = false;

        try conn.exec("SELECT 'abc'::text").getAnswer;
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 15); // error message check
        }

        // Checked after the try block rather than in finally, so that an
        // exception of another type propagates as is
        assert(exceptionFlag);
    }
}