1 module dpq2.query;
2 
3 public import dpq2.args;
4 
5 import dpq2;
6 import core.time: Duration, dur;
7 import std.exception: enforce;
8 
9 mixin template Queries()
10 {
11     /// Perform SQL query to DB
12     immutable (Answer) exec( string SQLcmd )
13     {
14         auto pgResult = PQexec(conn, toStringz( SQLcmd ));
15 
16         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
17         auto container = createResultContainer(cast(immutable) pgResult);
18 
19         return new immutable Answer(container);
20     }
21 
22     /// Perform SQL query to DB
23     immutable (Answer) execParams(in ref QueryParams qp)
24     {
25         auto p = InternalQueryParams(&qp);
26         auto pgResult = PQexecParams (
27                 conn,
28                 p.command,
29                 p.nParams,
30                 p.paramTypes,
31                 p.paramValues,
32                 p.paramLengths,
33                 p.paramFormats,
34                 p.resultFormat
35         );
36 
37         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
38         auto container = createResultContainer(cast(immutable) pgResult);
39 
40         return new immutable Answer(container);
41     }
42     
43     /// Submits a command to the server without waiting for the result(s)
44     void sendQuery( string SQLcmd )
45     {
46         const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
47         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
48     }
49 
50     /// Submits a command and separate parameters to the server without waiting for the result(s)
51     void sendQueryParams(in ref QueryParams qp)
52     {
53         auto p = InternalQueryParams(&qp);
54         size_t r = PQsendQueryParams (
55                 conn,
56                 p.command,
57                 p.nParams,
58                 p.paramTypes,
59                 p.paramValues,
60                 p.paramLengths,
61                 p.paramFormats,
62                 p.resultFormat
63             );
64 
65         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
66     }
67 
68     /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
69     void sendQueryPrepared(in ref QueryParams qp)
70     {
71         auto p = InternalQueryParams(&qp);
72         size_t r = PQsendQueryPrepared(
73                 conn,
74                 p.stmtName,
75                 p.nParams,
76                 p.paramValues,
77                 p.paramLengths,
78                 p.paramFormats,
79                 p.resultFormat
80             );
81 
82         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
83     }
84 
85     /// Returns null if no notifies was received
86     Notify getNextNotify()
87     {
88         consumeInput();
89         auto n = PQnotifies(conn);
90         return n is null ? null : new Notify( n );
91     }
92 
93     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
94     immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
95     {
96         PGresult* pgResult = PQprepare(
97                 conn,
98                 toStringz(statementName),
99                 toStringz(sqlStatement),
100                 oids.length.to!int,
101                 oids.ptr
102             );
103 
104         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
105         auto container = createResultContainer(cast(immutable) pgResult);
106 
107         return new immutable Result(container);
108     }
109 
110     /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
111     immutable(Answer) execPrepared(in ref QueryParams qp)
112     {
113         auto p = InternalQueryParams(&qp);
114         auto pgResult = PQexecPrepared(
115                 conn,
116                 p.stmtName,
117                 p.nParams,
118                 cast(const(char*)*)p.paramValues,
119                 p.paramLengths,
120                 p.paramFormats,
121                 p.resultFormat
122             );
123 
124         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
125         auto container = createResultContainer(cast(immutable) pgResult);
126 
127         return new immutable Answer(container);
128     }
129 
130     /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
131     void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
132     {
133         size_t r = PQsendPrepare(
134                 conn,
135                 toStringz(statementName),
136                 toStringz(sqlStatement),
137                 oids.length.to!int,
138                 cast(Oid*)oids.ptr //const should be accepted here, see https://github.com/DerelictOrg/DerelictPQ/issues/21
139             );
140 
141         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
142     }
143 
144     immutable(Answer) describePrepared(string statementName)
145     {
146         PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));
147 
148         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
149         auto container = createResultContainer(cast(immutable) pgResult);
150 
151         return new immutable Answer(container);
152     }
153 
154     void sendDescribePrepared(string statementName)
155     {
156         size_t r = PQsendDescribePrepared(conn, statementName.toStringz);
157 
158         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
159     }
160 
161     /// Waiting for completion of reading or writing
162     /// Return: timeout not occured
163     bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
164     {
165         import std.socket;
166 
167         auto socket = this.socket();
168         auto set = new SocketSet;
169         set.add(socket);
170 
171         while(true)
172         {
173             if(status() == CONNECTION_BAD)
174                 throw new ConnectionException(this, __FILE__, __LINE__);
175 
176             if(poll() == PGRES_POLLING_OK)
177             {
178                 return true;
179             }
180             else
181             {
182                 size_t sockNum;
183 
184                 with(WaitType)
185                 final switch(type)
186                 {
187                     case READ:
188                         sockNum = Socket.select(set, null, set, timeout);
189                         break;
190 
191                     case WRITE:
192                         sockNum = Socket.select(null, set, set, timeout);
193                         break;
194 
195                     case READ_WRITE:
196                         sockNum = Socket.select(set, set, set, timeout);
197                         break;
198                 }
199 
200                 enforce(sockNum >= 0);
201                 if(sockNum == 0) return false; // timeout is occurred
202 
203                 continue;
204             }
205         }
206     }
207 }
208 
209 enum WaitType
210 {
211     READ,
212     WRITE,
213     READ_WRITE
214 }
215 
216 void _integration_test( string connParam ) @trusted
217 {
218     auto conn = new Connection(connParam);
219 
220     // Text type arguments testing
221     {    
222         string sql_query =
223         "select now() as time, 'abc'::text as string, 123, 456.78\n"~
224         "union all\n"~
225         "select now(), 'абвгд'::text, 777, 910.11\n"~
226         "union all\n"~
227         "select NULL, 'ijk'::text, 789, 12345.115345";
228 
229         auto a = conn.exec( sql_query );
230 
231         assert( a.cmdStatus.length > 2 );
232         assert( a.columnCount == 4 );
233         assert( a.length == 3 );
234         assert( a.columnFormat(1) == ValueFormat.TEXT );
235         assert( a.columnFormat(2) == ValueFormat.TEXT );
236     }
237 
238     // Binary type arguments testing
239     {
240         import vibe.data.bson: Bson;
241 
242         const string sql_query =
243         "select $1::text, $2::integer, $3::text, $4, $5::integer[]";
244 
245         Value[5] args;
246         args[0] = toValue("абвгд");
247         args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
248         args[2] = toValue("123");
249         args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
250 
251         Bson binArray = Bson([
252             Bson([Bson(null), Bson(123), Bson(456)]),
253             Bson([Bson(0), Bson(789), Bson(null)])
254         ]);
255 
256         args[4] = bsonToValue(binArray);
257 
258         QueryParams p;
259         p.sqlCommand = sql_query;
260         p.args = args[];
261 
262         auto a = conn.execParams( p );
263 
264         foreach(i; 0 .. args.length)
265             assert(a.columnFormat(i) == ValueFormat.BINARY);
266 
267         assert( a.OID(0) == OidType.Text );
268         assert( a.OID(1) == OidType.Int4 );
269         assert( a.OID(2) == OidType.Text );
270         assert( a.OID(3) == OidType.Int8 );
271         assert( a.OID(4) == OidType.Int4Array );
272 
273         // binary args array test
274         assert( a[0][4].as!Bson == binArray );
275     }
276 
277     {
278         // Bug #52: empty text argument
279         QueryParams p;
280         Value v = toValue("");
281 
282         p.sqlCommand = "SELECT $1";
283         p.args = [v];
284 
285         auto a = conn.execParams(p);
286 
287         assert( !a[0][0].isNull );
288         assert( a[0][0].as!string == "" );
289     }
290 
291     // checking prepared statements
292     {
293         // uses PQprepare:
294         auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
295         assert(s.status == PGRES_COMMAND_OK);
296 
297         QueryParams p;
298         p.preparedStatementName = "prepared statement 1";
299         p.args = [42.toValue];
300         auto r = conn.execPrepared(p);
301         assert (r[0][0].as!int == 42);
302     }
303     {
304         // uses PQsendPrepare:
305         conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");
306 
307         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
308         conn.consumeInput();
309 
310         immutable(Result)[] res;
311 
312         while(true)
313         {
314             auto r = conn.getResult();
315             if(r is null) break;
316             res ~= r;
317         }
318 
319         assert(res.length == 1);
320         assert(res[0].status == PGRES_COMMAND_OK);
321     }
322     {
323         // check prepared arg types and result types
324         auto a = conn.describePrepared("prepared statement 2");
325 
326         assert(a.nParams == 2);
327         assert(a.paramType(0) == OidType.Text);
328         assert(a.paramType(1) == OidType.Int4);
329     }
330     {
331         // async check prepared arg types and result types
332         conn.sendDescribePrepared("prepared statement 2");
333 
334         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
335         conn.consumeInput();
336 
337         immutable(Result)[] res;
338 
339         while(true)
340         {
341             auto r = conn.getResult();
342             if(r is null) break;
343             res ~= r;
344         }
345 
346         assert(res.length == 1);
347         assert(res[0].status == PGRES_COMMAND_OK);
348 
349         auto a = res[0].getAnswer;
350 
351         assert(a.nParams == 2);
352         assert(a.paramType(0) == OidType.Text);
353         assert(a.paramType(1) == OidType.Int4);
354     }
355     {
356         QueryParams p;
357         p.preparedStatementName = "prepared statement 2";
358         p.argsFromArray = ["abc", "123456"];
359 
360         conn.sendQueryPrepared(p);
361 
362         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
363         conn.consumeInput();
364 
365         immutable(Result)[] res;
366 
367         while(true)
368         {
369             auto r = conn.getResult();
370             if(r is null) break;
371             res ~= r;
372         }
373 
374         assert(res.length == 1);
375         assert(res[0].getAnswer[0][0].as!PGtext == "abc");
376         assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
377     }
378 
379     import std.socket;
380     conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection
381 
382     {
383         bool exceptionFlag = false;
384 
385         try conn.exec("SELECT 'abc'::text").getAnswer;
386         catch(ConnectionException e)
387         {
388             exceptionFlag = true;
389             assert(e.msg.length > 15); // error message check
390         }
391         finally
392             assert(exceptionFlag);
393     }
394 }