1 /// Query methods
2 module dpq2.query;
3 
4 public import dpq2.args;
5 
6 import dpq2.connection: Connection, ConnectionException;
7 import dpq2.result: Result;
8 import dpq2.value;
9 import dpq2.oids: OidType;
10 import derelict.pq.pq;
11 import core.time: Duration, dur;
12 import std.exception: enforce;
13 
/// Extends Connection by adding query methods
///
/// Just use it as Connection.* methods.
///
/// The methods refer to names that must be available where the template is
/// mixed in (for example conn, consumeInput, socket, status, poll,
/// isNonBlocking and createResultContainer).
mixin template Queries()
{
    /// Perform SQL query to DB
    ///
    /// It uses the old wire protocol and all values are returned in textual
    /// form. This means that the dpq2.conv.to_d_types.as template will likely
    /// not work for anything but strings.
    /// Try to use execParams instead, even if no parameters are present.
    ///
    /// Params:
    ///     SQLcmd = text of the SQL command, passed to PQexec
    /// Returns: Answer built from the result returned by PQexec
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Perform SQL query to DB, passing the arguments separately from the command text
    ///
    /// Params:
    ///     qp = command text, arguments and result format
    /// Returns: Answer built from the result returned by PQexecParams
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQuery does not report success (1)
    void sendQuery( string SQLcmd )
    {
        const int r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryParams does not report success (1)
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryPrepared does not report success (1)
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryPrepared(
                conn,
                p.stmtName,
                p.nParams,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Consumes pending input and fetches the next notification, if any
    ///
    /// Returns: null if no notification was received
    Notify getNextNotify()
    {
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Params:
    ///     statementName = name of the prepared statement
    ///     sqlStatement = text of the statement
    ///     oids = optional parameter type OIDs, passed to PQprepare as-is
    /// Returns: Result of query preparing
    immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        PGresult* pgResult = PQprepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Throws: ResponseException if the status of the result is not PGRES_COMMAND_OK
    void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        auto r = prepare(statementName, sqlStatement, oids);

        if(r.status != PGRES_COMMAND_OK)
            throw new ResponseException(r, __FILE__, __LINE__);
    }

    /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
    ///
    /// Returns: Answer built from the result returned by PQexecPrepared
    immutable(Answer) execPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecPrepared(
                conn,
                p.stmtName,
                p.nParams,
                cast(const(char*)*)p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendPrepare does not report success (1)
    void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        const int r = PQsendPrepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a request to obtain information about the specified prepared statement, and waits for completion.
    ///
    /// Returns: Answer built from the result returned by PQdescribePrepared
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a request to obtain information about the specified prepared statement, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendDescribePrepared does not report success (1)
    void sendDescribePrepared(string statementName)
    {
        const int r = PQsendDescribePrepared(conn, statementName.toStringz);

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a buffer of CSV data to the COPY command
    ///
    /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode)
    /// Throws: ConnectionException if PQputCopyData returns -1
    bool putCopyData( string data )
    {
        const int r = PQputCopyData(conn, data.toStringz, data.length.to!int);

        if(r == -1) throw new ConnectionException(this);

        return r != 0;
    }

    /// Signals that COPY data send is finished. Finalize and flush the COPY command.
    ///
    /// Only for blocking connections (checked by assert).
    ///
    /// Returns: Answer built from the result fetched with PQgetResult after the COPY was terminated
    immutable(Answer) putCopyEnd()
    {
        assert(!isNonBlocking, "Only for blocking connections");

        const bool r = sendPutCopyEnd;

        assert(r, "Impossible status for blocking connections");

        // after the copying is finished, and there is no connection error, we must still get the command result
        // this reveals any errors that happened in the process (invalid data format or constraint violation, etc.)
        auto pgResult = PQgetResult(conn);

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Signals that COPY data send is finished.
    ///
    /// Returns: true if the termination data was sent, false if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode)
    /// Throws: ConnectionException if PQputCopyEnd returns -1
    bool sendPutCopyEnd()
    {
        // The second argument of PQputCopyEnd is an input, not an output:
        // a non-null message forces the COPY to fail with that message.
        // null requests a normal termination, so there is nothing to read back.
        const int r = PQputCopyEnd(conn, null);

        if(r == -1) throw new ConnectionException(this);

        return r != 0;
    }

    // Waiting for completion of reading or writing
    // Returns: false if the timeout has occurred, true otherwise
    // Throws: ConnectionException if the connection status is CONNECTION_BAD
    version(integration_tests)
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        auto socket = this.socket();

        // Socket.select must be given distinct SocketSet objects and updates
        // them in place, so separate sets are used and refilled on every pass
        auto readSet = new SocketSet;
        auto writeSet = new SocketSet;
        auto errorSet = new SocketSet;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            readSet.reset();
            writeSet.reset();
            errorSet.reset();
            errorSet.add(socket);

            // Socket.select returns int: number of sockets with status
            // changes, 0 on timeout, -1 on interruption
            int sockNum;

            with(WaitType)
            final switch(type)
            {
                case READ:
                    readSet.add(socket);
                    sockNum = Socket.select(readSet, null, errorSet, timeout);
                    break;

                case WRITE:
                    writeSet.add(socket);
                    sockNum = Socket.select(null, writeSet, errorSet, timeout);
                    break;

                case READ_WRITE:
                    readSet.add(socket);
                    writeSet.add(socket);
                    sockNum = Socket.select(readSet, writeSet, errorSet, timeout);
                    break;
            }

            if(sockNum == 0) return false; // timeout has occurred

            // sockNum < 0 means the wait was interrupted: just try again;
            // sockNum > 0 means the socket changed status: re-check the connection
        }
    }
}
281 
/// Selects which kind of socket readiness waitEndOf waits for.
/// Declared only when the integration_tests version is enabled.
version(integration_tests)
enum WaitType
{
    READ, /// the connection socket is passed to Socket.select as the read set
    WRITE, /// the connection socket is passed to Socket.select as the write set
    READ_WRITE /// the connection socket is passed as both the read and the write set
}
289 
/// Integration test of the query methods against a live server.
///
/// Exercises, in order: text-format exec, binary execParams, the empty text
/// argument case, synchronous and asynchronous prepared statements, COPY
/// (successful and failing) and finally the behaviour on a broken connection.
///
/// Params:
///     connParam = connection string passed to the Connection constructor
version (integration_tests)
void _integration_test( string connParam ) @trusted
{
    import dpq2.conv.to_d_types;
    import dpq2.conv.to_bson;

    auto conn = new Connection(connParam);

    // Collects results from conn.getResult() until it returns null
    immutable(Result)[] collectResults()
    {
        immutable(Result)[] collected;

        while(true)
        {
            auto next = conn.getResult();
            if(next is null) return collected;
            collected ~= next;
        }
    }

    // Text type arguments testing
    {
        string textQuery =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto answer = conn.exec( textQuery );

        assert( answer.cmdStatus.length > 2 );
        assert( answer.columnCount == 4 );
        assert( answer.length == 3 );
        assert( answer.columnFormat(1) == ValueFormat.TEXT );
        assert( answer.columnFormat(2) == ValueFormat.TEXT );
    }

    // Binary type arguments testing
    {
        import vibe.data.bson: Bson;

        const string binaryQuery =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
        args[4] = bsonToValue(binArray);

        QueryParams params;
        params.sqlCommand = binaryQuery;
        params.args = args[];

        auto answer = conn.execParams( params );

        // every column is expected to come back in binary format
        foreach(column; 0 .. args.length)
            assert(answer.columnFormat(column) == ValueFormat.BINARY);

        assert( answer.OID(0) == OidType.Text );
        assert( answer.OID(1) == OidType.Int4 );
        assert( answer.OID(2) == OidType.Text );
        assert( answer.OID(3) == OidType.Int8 );
        assert( answer.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( answer[0][4].as!Bson == binArray );
    }

    {
        // Bug #52: empty text argument
        Value emptyText = toValue("");

        QueryParams params;
        params.sqlCommand = "SELECT $1";
        params.args = [emptyText];

        auto answer = conn.execParams(params);

        assert( !answer[0][0].isNull );
        assert( answer[0][0].as!string == "" );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        conn.prepareEx("prepared statement 1", "SELECT $1::integer");

        QueryParams params;
        params.preparedStatementName = "prepared statement 1";
        params.args = [42.toValue];

        auto answer = conn.execPrepared(params);
        assert (answer[0][0].as!int == 42);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto results = collectResults();

        assert(results.length == 1);
        assert(results[0].status == PGRES_COMMAND_OK);
    }
    {
        // check prepared arg types and result types
        auto description = conn.describePrepared("prepared statement 2");

        assert(description.nParams == 2);
        assert(description.paramType(0) == OidType.Text);
        assert(description.paramType(1) == OidType.Int4);
    }
    {
        // async check prepared arg types and result types
        conn.sendDescribePrepared("prepared statement 2");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto results = collectResults();

        assert(results.length == 1);
        assert(results[0].status == PGRES_COMMAND_OK);

        auto description = results[0].getAnswer;

        assert(description.nParams == 2);
        assert(description.paramType(0) == OidType.Text);
        assert(description.paramType(1) == OidType.Int4);
    }
    {
        // async execution of a prepared statement
        QueryParams params;
        params.preparedStatementName = "prepared statement 2";
        params.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(params);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto results = collectResults();

        assert(results.length == 1);
        assert(results[0].getAnswer[0][0].as!PGtext == "abc");
        assert(results[0].getAnswer[0][1].as!PGinteger == 123456);
    }
    {
        // test COPY
        conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)");

        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
        conn.putCopyData("Val1,1\nval2,2\n");
        conn.putCopyData("Val3,3\nval4,4\n");
        conn.putCopyEnd();

        auto totals = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy");
        assert(totals.length == 1);
        assert(totals[0][0].as!string == "4");
        assert(totals[0][1].as!string == "10");

        // This time with error
        import std.exception: assertThrown;
        import dpq2.result: ResponseException;

        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
        conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n");

        assertThrown!ResponseException(conn.putCopyEnd());
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    {
        // any query on the broken connection must raise ConnectionException
        bool wasThrown = false;
        string caughtMsg;

        try
        {
            conn.exec("SELECT 'abc'::text").getAnswer;
        }
        catch(ConnectionException e)
        {
            wasThrown = true;
            caughtMsg = e.msg;
            assert(e.msg.length > 15); // error message check
        }
        finally
        {
            assert(wasThrown, caughtMsg);
        }
    }
}