1 /// Query methods
2 module dpq2.query;
3 
4 public import dpq2.args;
5 
6 import dpq2.connection: Connection, ConnectionException;
7 import dpq2.result: Result;
8 import dpq2.value;
9 import dpq2.oids: OidType;
10 import derelict.pq.pq;
11 import core.time: Duration, dur;
12 import std.exception: enforce;
13 
14 /// Extends Connection by adding query methods
15 ///
16 /// Just use it as Connection.* methods.
17 mixin template Queries()
18 {
19     /// Perform SQL query to DB
20     /// It uses the old wire protocol and all values are returned in textual
21     /// form. This means that the dpq2.conv.to_d_types.as template will likely
22     /// not work for anything but strings.
23     /// Try to used execParams instead, even if now parameters are present.
24     immutable (Answer) exec( string SQLcmd )
25     {
26         auto pgResult = PQexec(conn, toStringz( SQLcmd ));
27 
28         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
29         auto container = createResultContainer(cast(immutable) pgResult);
30 
31         return new immutable Answer(container);
32     }
33 
34     /// Perform SQL query to DB
35     immutable (Answer) execParams(in ref QueryParams qp)
36     {
37         auto p = InternalQueryParams(&qp);
38         auto pgResult = PQexecParams (
39                 conn,
40                 p.command,
41                 p.nParams,
42                 p.paramTypes,
43                 p.paramValues,
44                 p.paramLengths,
45                 p.paramFormats,
46                 p.resultFormat
47         );
48 
49         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
50         auto container = createResultContainer(cast(immutable) pgResult);
51 
52         return new immutable Answer(container);
53     }
54 
55     /// Submits a command to the server without waiting for the result(s)
56     void sendQuery( string SQLcmd )
57     {
58         const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
59         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
60     }
61 
62     /// Submits a command and separate parameters to the server without waiting for the result(s)
63     void sendQueryParams(in ref QueryParams qp)
64     {
65         auto p = InternalQueryParams(&qp);
66         size_t r = PQsendQueryParams (
67                 conn,
68                 p.command,
69                 p.nParams,
70                 p.paramTypes,
71                 p.paramValues,
72                 p.paramLengths,
73                 p.paramFormats,
74                 p.resultFormat
75             );
76 
77         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
78     }
79 
80     /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
81     void sendQueryPrepared(in ref QueryParams qp)
82     {
83         auto p = InternalQueryParams(&qp);
84         size_t r = PQsendQueryPrepared(
85                 conn,
86                 p.stmtName,
87                 p.nParams,
88                 p.paramValues,
89                 p.paramLengths,
90                 p.paramFormats,
91                 p.resultFormat
92             );
93 
94         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
95     }
96 
97     /// Returns null if no notifies was received
98     Notify getNextNotify()
99     {
100         consumeInput();
101         auto n = PQnotifies(conn);
102         return n is null ? null : new Notify( n );
103     }
104 
105     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
106     /// Returns: Result of query preparing
107     immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
108     {
109         PGresult* pgResult = PQprepare(
110                 conn,
111                 toStringz(statementName),
112                 toStringz(sqlStatement),
113                 oids.length.to!int,
114                 oids.ptr
115             );
116 
117         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
118         auto container = createResultContainer(cast(immutable) pgResult);
119 
120         return new immutable Result(container);
121     }
122 
123     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
124     ///
125     /// Throws an exception if preparing failed.
126     void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null)
127     {
128         auto r = prepare(statementName, sqlStatement, oids);
129 
130         if(r.status != PGRES_COMMAND_OK)
131             throw new ResponseException(r, __FILE__, __LINE__);
132     }
133 
134     /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
135     immutable(Answer) execPrepared(in ref QueryParams qp)
136     {
137         auto p = InternalQueryParams(&qp);
138         auto pgResult = PQexecPrepared(
139                 conn,
140                 p.stmtName,
141                 p.nParams,
142                 cast(const(char*)*)p.paramValues,
143                 p.paramLengths,
144                 p.paramFormats,
145                 p.resultFormat
146             );
147 
148         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
149         auto container = createResultContainer(cast(immutable) pgResult);
150 
151         return new immutable Answer(container);
152     }
153 
154     /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
155     void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
156     {
157         size_t r = PQsendPrepare(
158                 conn,
159                 toStringz(statementName),
160                 toStringz(sqlStatement),
161                 oids.length.to!int,
162                 oids.ptr
163             );
164 
165         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
166     }
167 
168     /// Submits a request to obtain information about the specified prepared statement, and waits for completion.
169     immutable(Answer) describePrepared(string statementName)
170     {
171         PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));
172 
173         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
174         auto container = createResultContainer(cast(immutable) pgResult);
175 
176         return new immutable Answer(container);
177     }
178 
179     /// Submits a request to obtain information about the specified prepared statement, without waiting for completion.
180     void sendDescribePrepared(string statementName)
181     {
182         size_t r = PQsendDescribePrepared(conn, statementName.toStringz);
183 
184         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
185     }
186 
187     /// Submits a request to obtain information about the specified portal, and waits for completion.
188     immutable(Answer) describePortal(string portalName)
189     {
190         PGresult* pgResult = PQdescribePortal(conn, portalName.toStringz);
191 
192         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
193         auto container = createResultContainer(cast(immutable) pgResult);
194 
195         return new immutable Answer(container);
196     }
197 
198     /// Sends a buffer of CSV data to the COPY command
199     ///
200     /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode)
201     bool putCopyData( string data )
202     {
203         const int r = PQputCopyData(conn, data.toStringz, data.length.to!int);
204 
205         if(r == -1) throw new ConnectionException(this);
206 
207         return r != 0;
208     }
209 
210     /// Signals that COPY data send is finished. Finalize and flush the COPY command.
211     immutable(Answer) putCopyEnd()
212     {
213         assert(!isNonBlocking, "Only for blocking connections");
214 
215         const bool r = sendPutCopyEnd;
216 
217         assert(r, "Impossible status for blocking connections");
218 
219         // after the copying is finished, and there is no connection error, we must still get the command result
220         // this will get if there is any errors in the process (invalid data format or constraint violation, etc.)
221         auto pgResult = PQgetResult(conn);
222 
223         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
224         auto container = createResultContainer(cast(immutable) pgResult);
225 
226         return new immutable Answer(container);
227     }
228 
229     /// Signals that COPY data send is finished.
230     ///
231     /// Returns: true if the termination data was sent, zero if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode)
232     bool sendPutCopyEnd()
233     {
234         const char* error;
235         const int r = PQputCopyEnd(conn, error);
236 
237         if(error !is null) throw new ConnectionException(error.to!string);
238 
239         if(r == -1) throw new ConnectionException(this);
240 
241         return r != 0;
242     }
243 
244     // Waiting for completion of reading or writing
245     // Returns: timeout is not occured
246     version(integration_tests)
247     bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
248     {
249         import std.socket;
250 
251         auto socket = this.socket();
252         auto set = new SocketSet;
253         set.add(socket);
254 
255         while(true)
256         {
257             if(status() == CONNECTION_BAD)
258                 throw new ConnectionException(this, __FILE__, __LINE__);
259 
260             if(poll() == PGRES_POLLING_OK)
261             {
262                 return true;
263             }
264             else
265             {
266                 size_t sockNum;
267 
268                 with(WaitType)
269                 final switch(type)
270                 {
271                     case READ:
272                         sockNum = Socket.select(set, null, set, timeout);
273                         break;
274 
275                     case WRITE:
276                         sockNum = Socket.select(null, set, set, timeout);
277                         break;
278 
279                     case READ_WRITE:
280                         sockNum = Socket.select(set, set, set, timeout);
281                         break;
282                 }
283 
284                 enforce(sockNum >= 0);
285                 if(sockNum == 0) return false; // timeout is occurred
286 
287                 continue;
288             }
289         }
290     }
291 }
292 
293 version(integration_tests)
294 enum WaitType
295 {
296     READ,
297     WRITE,
298     READ_WRITE
299 }
300 
301 version (integration_tests)
302 void _integration_test( string connParam ) @trusted
303 {
304     import dpq2.conv.to_d_types;
305     import dpq2.conv.to_bson;
306 
307     auto conn = new Connection(connParam);
308 
309     // Text type arguments testing
310     {
311         string sql_query =
312         "select now() as time, 'abc'::text as string, 123, 456.78\n"~
313         "union all\n"~
314         "select now(), 'абвгд'::text, 777, 910.11\n"~
315         "union all\n"~
316         "select NULL, 'ijk'::text, 789, 12345.115345";
317 
318         auto a = conn.exec( sql_query );
319 
320         assert( a.cmdStatus.length > 2 );
321         assert( a.columnCount == 4 );
322         assert( a.length == 3 );
323         assert( a.columnFormat(1) == ValueFormat.TEXT );
324         assert( a.columnFormat(2) == ValueFormat.TEXT );
325     }
326 
327     // Binary type arguments testing
328     {
329         import vibe.data.bson: Bson;
330 
331         const string sql_query =
332         "select $1::text, $2::integer, $3::text, $4, $5::integer[]";
333 
334         Value[5] args;
335         args[0] = toValue("абвгд");
336         args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
337         args[2] = toValue("123");
338         args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
339 
340         Bson binArray = Bson([
341             Bson([Bson(null), Bson(123), Bson(456)]),
342             Bson([Bson(0), Bson(789), Bson(null)])
343         ]);
344 
345         args[4] = bsonToValue(binArray);
346 
347         QueryParams p;
348         p.sqlCommand = sql_query;
349         p.args = args[];
350 
351         auto a = conn.execParams( p );
352 
353         foreach(i; 0 .. args.length)
354             assert(a.columnFormat(i) == ValueFormat.BINARY);
355 
356         assert( a.OID(0) == OidType.Text );
357         assert( a.OID(1) == OidType.Int4 );
358         assert( a.OID(2) == OidType.Text );
359         assert( a.OID(3) == OidType.Int8 );
360         assert( a.OID(4) == OidType.Int4Array );
361 
362         // binary args array test
363         assert( a[0][4].as!Bson == binArray );
364     }
365 
366     {
367         // Bug #52: empty text argument
368         QueryParams p;
369         Value v = toValue("");
370 
371         p.sqlCommand = "SELECT $1";
372         p.args = [v];
373 
374         auto a = conn.execParams(p);
375 
376         assert( !a[0][0].isNull );
377         assert( a[0][0].as!string == "" );
378     }
379 
380     // checking prepared statements
381     {
382         // uses PQprepare:
383         conn.prepareEx("prepared statement 1", "SELECT $1::integer");
384 
385         QueryParams p;
386         p.preparedStatementName = "prepared statement 1";
387         p.args = [42.toValue];
388         auto r = conn.execPrepared(p);
389         assert (r[0][0].as!int == 42);
390     }
391     {
392         // uses PQsendPrepare:
393         conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");
394 
395         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
396         conn.consumeInput();
397 
398         immutable(Result)[] res;
399 
400         while(true)
401         {
402             auto r = conn.getResult();
403             if(r is null) break;
404             res ~= r;
405         }
406 
407         assert(res.length == 1);
408         assert(res[0].status == PGRES_COMMAND_OK);
409     }
410     {
411         // check prepared arg types and result types
412         auto a = conn.describePrepared("prepared statement 2");
413 
414         assert(a.nParams == 2);
415         assert(a.paramType(0) == OidType.Text);
416         assert(a.paramType(1) == OidType.Int4);
417     }
418 
419     // checking portal description
420     {
421         conn.exec(`BEGIN`);
422         conn.exec(`DECLARE test_cursor1 CURSOR FOR SELECT 123::integer`);
423         auto r = conn.describePortal(`test_cursor1`);
424         conn.exec(`COMMIT`);
425     }
426 
427     {
428         // async check prepared arg types and result types
429         conn.sendDescribePrepared("prepared statement 2");
430 
431         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
432         conn.consumeInput();
433 
434         immutable(Result)[] res;
435 
436         while(true)
437         {
438             auto r = conn.getResult();
439             if(r is null) break;
440             res ~= r;
441         }
442 
443         assert(res.length == 1);
444         assert(res[0].status == PGRES_COMMAND_OK);
445 
446         auto a = res[0].getAnswer;
447 
448         assert(a.nParams == 2);
449         assert(a.paramType(0) == OidType.Text);
450         assert(a.paramType(1) == OidType.Int4);
451     }
452     {
453         QueryParams p;
454         p.preparedStatementName = "prepared statement 2";
455         p.argsFromArray = ["abc", "123456"];
456 
457         conn.sendQueryPrepared(p);
458 
459         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
460         conn.consumeInput();
461 
462         immutable(Result)[] res;
463 
464         while(true)
465         {
466             auto r = conn.getResult();
467             if(r is null) break;
468             res ~= r;
469         }
470 
471         assert(res.length == 1);
472         assert(res[0].getAnswer[0][0].as!PGtext == "abc");
473         assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
474     }
475     {
476         // test COPY
477         conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)");
478 
479         conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
480         conn.putCopyData("Val1,1\nval2,2\n");
481         conn.putCopyData("Val3,3\nval4,4\n");
482         conn.putCopyEnd();
483 
484         auto res = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy");
485         assert(res.length == 1);
486         assert(res[0][0].as!string == "4");
487         assert(res[0][1].as!string == "10");
488 
489         // This time with error
490         import std.exception: assertThrown;
491         import dpq2.result: ResponseException;
492 
493         conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
494         conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n");
495 
496         assertThrown!ResponseException(conn.putCopyEnd());
497     }
498 
499     import std.socket;
500     conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection
501 
502     {
503         bool exceptionFlag = false;
504         string errorMsg;
505 
506         try conn.exec("SELECT 'abc'::text").getAnswer;
507         catch(ConnectionException e)
508         {
509             exceptionFlag = true;
510             errorMsg = e.msg;
511             assert(e.msg.length > 15); // error message check
512         }
513         finally {
514             assert(exceptionFlag, errorMsg);
515         }
516     }
517 }