1 /// Query methods
2 module dpq2.query;
3 
4 public import dpq2.args;
5 
6 import dpq2.connection: Connection, ConnectionException;
7 import dpq2.result: Result;
8 import dpq2.value;
9 import dpq2.oids: OidType;
10 import derelict.pq.pq;
11 import core.time: Duration, dur;
12 import std.exception: enforce;
13 
14 /// Extends Connection by adding query methods
15 ///
16 /// Just use it as Connection.* methods.
17 mixin template Queries()
18 {
19     /// Perform SQL query to DB
20     /// It uses the old wire protocol and all values are returned in textual
21     /// form. This means that the dpq2.conv.to_d_types.as template will likely
22     /// not work for anything but strings.
23     /// Try to used execParams instead, even if now parameters are present.
24     immutable (Answer) exec( string SQLcmd )
25     {
26         auto pgResult = PQexec(conn, toStringz( SQLcmd ));
27 
28         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
29         auto container = createResultContainer(cast(immutable) pgResult);
30 
31         return new immutable Answer(container);
32     }
33 
34     /// Perform SQL query to DB
35     immutable (Answer) execParams(in ref QueryParams qp)
36     {
37         auto p = InternalQueryParams(&qp);
38         auto pgResult = PQexecParams (
39                 conn,
40                 p.command,
41                 p.nParams,
42                 p.paramTypes,
43                 p.paramValues,
44                 p.paramLengths,
45                 p.paramFormats,
46                 p.resultFormat
47         );
48 
49         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
50         auto container = createResultContainer(cast(immutable) pgResult);
51 
52         return new immutable Answer(container);
53     }
54 
55     /// Submits a command to the server without waiting for the result(s)
56     void sendQuery( string SQLcmd )
57     {
58         const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
59         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
60     }
61 
62     /// Submits a command and separate parameters to the server without waiting for the result(s)
63     void sendQueryParams(in ref QueryParams qp)
64     {
65         auto p = InternalQueryParams(&qp);
66         size_t r = PQsendQueryParams (
67                 conn,
68                 p.command,
69                 p.nParams,
70                 p.paramTypes,
71                 p.paramValues,
72                 p.paramLengths,
73                 p.paramFormats,
74                 p.resultFormat
75             );
76 
77         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
78     }
79 
80     /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
81     void sendQueryPrepared(in ref QueryParams qp)
82     {
83         auto p = InternalQueryParams(&qp);
84         size_t r = PQsendQueryPrepared(
85                 conn,
86                 p.stmtName,
87                 p.nParams,
88                 p.paramValues,
89                 p.paramLengths,
90                 p.paramFormats,
91                 p.resultFormat
92             );
93 
94         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
95     }
96 
97     /// Returns null if no notifies was received
98     Notify getNextNotify()
99     {
100         consumeInput();
101         auto n = PQnotifies(conn);
102         return n is null ? null : new Notify( n );
103     }
104 
105     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
106     /// Returns: Result of query preparing
107     immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
108     {
109         PGresult* pgResult = PQprepare(
110                 conn,
111                 toStringz(statementName),
112                 toStringz(sqlStatement),
113                 oids.length.to!int,
114                 oids.ptr
115             );
116 
117         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
118         auto container = createResultContainer(cast(immutable) pgResult);
119 
120         return new immutable Result(container);
121     }
122 
123     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
124     ///
125     /// Throws an exception if preparing failed.
126     void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null)
127     {
128         auto r = prepare(statementName, sqlStatement, oids);
129 
130         if(r.status != PGRES_COMMAND_OK)
131             throw new ResponseException(r, __FILE__, __LINE__);
132     }
133 
134     /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
135     immutable(Answer) execPrepared(in ref QueryParams qp)
136     {
137         auto p = InternalQueryParams(&qp);
138         auto pgResult = PQexecPrepared(
139                 conn,
140                 p.stmtName,
141                 p.nParams,
142                 cast(const(char*)*)p.paramValues,
143                 p.paramLengths,
144                 p.paramFormats,
145                 p.resultFormat
146             );
147 
148         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
149         auto container = createResultContainer(cast(immutable) pgResult);
150 
151         return new immutable Answer(container);
152     }
153 
154     /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
155     void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
156     {
157         size_t r = PQsendPrepare(
158                 conn,
159                 toStringz(statementName),
160                 toStringz(sqlStatement),
161                 oids.length.to!int,
162                 oids.ptr
163             );
164 
165         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
166     }
167 
168     /// Submits a request to obtain information about the specified prepared statement, and waits for completion.
169     immutable(Answer) describePrepared(string statementName)
170     {
171         PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));
172 
173         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
174         auto container = createResultContainer(cast(immutable) pgResult);
175 
176         return new immutable Answer(container);
177     }
178 
179     /// Submits a request to obtain information about the specified prepared statement, without waiting for completion.
180     void sendDescribePrepared(string statementName)
181     {
182         size_t r = PQsendDescribePrepared(conn, statementName.toStringz);
183 
184         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
185     }
186 
187     /// Submits a request to obtain information about the specified portal, and waits for completion.
188     immutable(Answer) describePortal(string portalName)
189     {
190         PGresult* pgResult = PQdescribePortal(conn, portalName.toStringz);
191 
192         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
193         auto container = createResultContainer(cast(immutable) pgResult);
194 
195         return new immutable Answer(container);
196     }
197 
198     /// Sends a buffer of CSV data to the COPY command
199     ///
200     /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode)
201     bool putCopyData( string data )
202     {
203         const int r = PQputCopyData(conn, data.toStringz, data.length.to!int);
204 
205         if(r == -1) throw new ConnectionException(this);
206 
207         return r != 0;
208     }
209 
210     /// Signals that COPY data send is finished. Finalize and flush the COPY command.
211     immutable(Answer) putCopyEnd()
212     {
213         assert(!isNonBlocking, "Only for blocking connections");
214 
215         const bool r = sendPutCopyEnd;
216 
217         assert(r, "Impossible status for blocking connections");
218 
219         // after the copying is finished, and there is no connection error, we must still get the command result
220         // this will get if there is any errors in the process (invalid data format or constraint violation, etc.)
221         auto pgResult = PQgetResult(conn);
222 
223         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
224         auto container = createResultContainer(cast(immutable) pgResult);
225 
226         return new immutable Answer(container);
227     }
228 
229     /// Signals that COPY data send is finished.
230     ///
231     /// Returns: true if the termination data was sent, zero if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode)
232     bool sendPutCopyEnd()
233     {
234         const char* error;
235         const int r = PQputCopyEnd(conn, error);
236 
237         if(error !is null) throw new ConnectionException(error.to!string);
238 
239         if(r == -1) throw new ConnectionException(this);
240 
241         return r != 0;
242     }
243 
244     // Waiting for completion of reading or writing
245     // Returns: timeout is not occured
246     version(integration_tests)
247     bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
248     {
249         import std.socket;
250 
251         auto socket = this.socket();
252         auto set = new SocketSet;
253         set.add(socket);
254 
255         while(true)
256         {
257             if(status() == CONNECTION_BAD)
258                 throw new ConnectionException(this, __FILE__, __LINE__);
259 
260             if(poll() == PGRES_POLLING_OK)
261             {
262                 return true;
263             }
264             else
265             {
266                 size_t sockNum;
267 
268                 with(WaitType)
269                 final switch(type)
270                 {
271                     case READ:
272                         sockNum = Socket.select(set, null, set, timeout);
273                         break;
274 
275                     case WRITE:
276                         sockNum = Socket.select(null, set, set, timeout);
277                         break;
278 
279                     case READ_WRITE:
280                         sockNum = Socket.select(set, set, set, timeout);
281                         break;
282                 }
283 
284                 enforce(sockNum >= 0);
285                 if(sockNum == 0) return false; // timeout is occurred
286 
287                 continue;
288             }
289         }
290     }
291 }
292 
293 version(integration_tests)
294 enum WaitType
295 {
296     READ,
297     WRITE,
298     READ_WRITE
299 }
300 
301 version (integration_tests)
302 void _integration_test( string connParam ) @trusted
303 {
304     import dpq2.conv.to_d_types;
305     import dpq2.conv.to_bson;
306     import dpq2.connection: createTestConn;
307 
308     auto conn = createTestConn(connParam);
309 
310     // Text type arguments testing
311     {
312         string sql_query =
313         "select now() as time, 'abc'::text as string, 123, 456.78\n"~
314         "union all\n"~
315         "select now(), 'абвгд'::text, 777, 910.11\n"~
316         "union all\n"~
317         "select NULL, 'ijk'::text, 789, 12345.115345";
318 
319         auto a = conn.exec( sql_query );
320 
321         assert( a.cmdStatus.length > 2 );
322         assert( a.columnCount == 4 );
323         assert( a.length == 3 );
324         assert( a.columnFormat(1) == ValueFormat.TEXT );
325         assert( a.columnFormat(2) == ValueFormat.TEXT );
326     }
327 
328     // Binary type arguments testing
329     {
330         import vibe.data.bson: Bson;
331 
332         const string sql_query =
333         "select $1::text, $2::integer, $3::text, $4, $5::integer[]";
334 
335         Value[5] args;
336         args[0] = toValue("абвгд");
337         args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
338         args[2] = toValue("123");
339         args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
340 
341         Bson binArray = Bson([
342             Bson([Bson(null), Bson(123), Bson(456)]),
343             Bson([Bson(0), Bson(789), Bson(null)])
344         ]);
345 
346         args[4] = bsonToValue(binArray);
347 
348         QueryParams p;
349         p.sqlCommand = sql_query;
350         p.args = args[];
351 
352         auto a = conn.execParams( p );
353 
354         foreach(i; 0 .. args.length)
355             assert(a.columnFormat(i) == ValueFormat.BINARY);
356 
357         assert( a.OID(0) == OidType.Text );
358         assert( a.OID(1) == OidType.Int4 );
359         assert( a.OID(2) == OidType.Text );
360         assert( a.OID(3) == OidType.Int8 );
361         assert( a.OID(4) == OidType.Int4Array );
362 
363         // binary args array test
364         assert( a[0][4].as!Bson == binArray );
365     }
366 
367     {
368         // Bug #52: empty text argument
369         QueryParams p;
370         Value v = toValue("");
371 
372         p.sqlCommand = "SELECT $1";
373         p.args = [v];
374 
375         auto a = conn.execParams(p);
376 
377         assert( !a[0][0].isNull );
378         assert( a[0][0].as!string == "" );
379     }
380 
381     // checking prepared statements
382     {
383         // uses PQprepare:
384         conn.prepareEx("prepared statement 1", "SELECT $1::integer");
385 
386         QueryParams p;
387         p.preparedStatementName = "prepared statement 1";
388         p.args = [42.toValue];
389         auto r = conn.execPrepared(p);
390         assert (r[0][0].as!int == 42);
391     }
392     {
393         // uses PQsendPrepare:
394         conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");
395 
396         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
397         conn.consumeInput();
398 
399         immutable(Result)[] res;
400 
401         while(true)
402         {
403             auto r = conn.getResult();
404             if(r is null) break;
405             res ~= r;
406         }
407 
408         assert(res.length == 1);
409         assert(res[0].status == PGRES_COMMAND_OK);
410     }
411     {
412         // check prepared arg types and result types
413         auto a = conn.describePrepared("prepared statement 2");
414 
415         assert(a.nParams == 2);
416         assert(a.paramType(0) == OidType.Text);
417         assert(a.paramType(1) == OidType.Int4);
418     }
419 
420     // checking portal description
421     {
422         conn.exec(`BEGIN`);
423         conn.exec(`DECLARE test_cursor1 CURSOR FOR SELECT 123::integer`);
424         auto r = conn.describePortal(`test_cursor1`);
425         conn.exec(`COMMIT`);
426     }
427 
428     {
429         // async check prepared arg types and result types
430         conn.sendDescribePrepared("prepared statement 2");
431 
432         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
433         conn.consumeInput();
434 
435         immutable(Result)[] res;
436 
437         while(true)
438         {
439             auto r = conn.getResult();
440             if(r is null) break;
441             res ~= r;
442         }
443 
444         assert(res.length == 1);
445         assert(res[0].status == PGRES_COMMAND_OK);
446 
447         auto a = res[0].getAnswer;
448 
449         assert(a.nParams == 2);
450         assert(a.paramType(0) == OidType.Text);
451         assert(a.paramType(1) == OidType.Int4);
452     }
453     {
454         QueryParams p;
455         p.preparedStatementName = "prepared statement 2";
456         p.argsFromArray = ["abc", "123456"];
457 
458         conn.sendQueryPrepared(p);
459 
460         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
461         conn.consumeInput();
462 
463         immutable(Result)[] res;
464 
465         while(true)
466         {
467             auto r = conn.getResult();
468             if(r is null) break;
469             res ~= r;
470         }
471 
472         assert(res.length == 1);
473         assert(res[0].getAnswer[0][0].as!PGtext == "abc");
474         assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
475     }
476     {
477         // test COPY
478         conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)");
479 
480         conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
481         conn.putCopyData("Val1,1\nval2,2\n");
482         conn.putCopyData("Val3,3\nval4,4\n");
483         conn.putCopyEnd();
484 
485         auto res = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy");
486         assert(res.length == 1);
487         assert(res[0][0].as!string == "4");
488         assert(res[0][1].as!string == "10");
489 
490         // This time with error
491         import std.exception: assertThrown;
492         import dpq2.result: ResponseException;
493 
494         conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
495         conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n");
496 
497         assertThrown!ResponseException(conn.putCopyEnd());
498     }
499 
500     import std.socket;
501     conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection
502 
503     {
504         bool exceptionFlag = false;
505         string errorMsg;
506 
507         try conn.exec("SELECT 'abc'::text").getAnswer;
508         catch(ConnectionException e)
509         {
510             exceptionFlag = true;
511             errorMsg = e.msg;
512             assert(e.msg.length > 15); // error message check
513         }
514         finally {
515             assert(exceptionFlag, errorMsg);
516         }
517     }
518 }