1 module dpq2.query;
2 
3 public import dpq2.args;
4 
5 import dpq2;
6 import core.time: Duration, dur;
7 import std.exception: enforce;
8 
9 mixin template Queries()
10 {
11     /// Perform SQL query to DB
12     immutable (Answer) exec( string SQLcmd )
13     {
14         auto pgResult = PQexec(conn, toStringz( SQLcmd ));
15 
16         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
17         auto container = createResultContainer(cast(immutable) pgResult);
18 
19         return new immutable Answer(container);
20     }
21     
22     /// Perform SQL query to DB
23     immutable (Answer) execParams(in ref QueryParams qp)
24     {
25         auto p = InternalQueryParams(qp);
26         auto pgResult = PQexecParams (
27                 conn,
28                 p.command,
29                 p.nParams,
30                 p.paramTypes,
31                 p.paramValues,
32                 p.paramLengths,
33                 p.paramFormats,
34                 p.resultFormat
35         );
36 
37         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
38         auto container = createResultContainer(cast(immutable) pgResult);
39 
40         return new immutable Answer(container);
41     }
42     
43     /// Submits a command to the server without waiting for the result(s)
44     void sendQuery( string SQLcmd )
45     {
46         const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
47         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
48     }
49 
50     /// Submits a command and separate parameters to the server without waiting for the result(s)
51     void sendQueryParams(in ref QueryParams qp)
52     {
53         auto p = InternalQueryParams(qp);
54         size_t r = PQsendQueryParams (
55                 conn,
56                 p.command,
57                 p.nParams,
58                 p.paramTypes,
59                 p.paramValues,
60                 p.paramLengths,
61                 p.paramFormats,
62                 p.resultFormat
63             );
64 
65         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
66     }
67 
68     /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
69     void sendQueryPrepared(in ref QueryParams qp)
70     {
71         auto p = InternalQueryParams(qp);
72         size_t r = PQsendQueryPrepared(
73                 conn,
74                 p.stmtName,
75                 p.nParams,
76                 p.paramValues,
77                 p.paramLengths,
78                 p.paramFormats,
79                 p.resultFormat
80             );
81 
82         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
83     }
84 
85     /// Returns null if no notifies was received
86     Notify getNextNotify()
87     {
88         consumeInput();
89         auto n = PQnotifies(conn);
90         return n is null ? null : new Notify( n );
91     }
92 
93     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
94     immutable(Result) prepare(string statementName, string sqlStatement)
95     {
96         PGresult* pgResult = PQprepare(
97                 conn,
98                 toStringz(statementName),
99                 toStringz(sqlStatement),
100                 0,
101                 null
102             );
103 
104         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
105         auto container = createResultContainer(cast(immutable) pgResult);
106 
107         return new immutable Result(container);
108     }
109 
110     /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
111     void sendPrepare(string statementName, string sqlStatement)
112     {
113         size_t r = PQsendPrepare(
114                 conn,
115                 toStringz(statementName),
116                 toStringz(sqlStatement),
117                 0,
118                 null
119             );
120 
121         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
122     }
123 
124     immutable(Answer) describePrepared(string statementName)
125     {
126         PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));
127 
128         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
129         auto container = createResultContainer(cast(immutable) pgResult);
130 
131         return new immutable Answer(container);
132     }
133 
134     void sendDescribePrepared(string statementName)
135     {
136         size_t r = PQsendDescribePrepared(conn, statementName.toStringz);
137 
138         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
139     }
140 
141     /// Waiting for completion of reading or writing
142     /// Return: timeout not occured
143     bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
144     {
145         import std.socket;
146 
147         auto socket = this.socket();
148         auto set = new SocketSet;
149         set.add(socket);
150 
151         while(true)
152         {
153             if(status() == CONNECTION_BAD)
154                 throw new ConnectionException(this, __FILE__, __LINE__);
155 
156             if(poll() == PGRES_POLLING_OK)
157             {
158                 return true;
159             }
160             else
161             {
162                 size_t sockNum;
163 
164                 with(WaitType)
165                 final switch(type)
166                 {
167                     case READ:
168                         sockNum = Socket.select(set, null, set, timeout);
169                         break;
170 
171                     case WRITE:
172                         sockNum = Socket.select(null, set, set, timeout);
173                         break;
174 
175                     case READ_WRITE:
176                         sockNum = Socket.select(set, set, set, timeout);
177                         break;
178                 }
179 
180                 enforce(sockNum >= 0);
181                 if(sockNum == 0) return false; // timeout is occurred
182 
183                 continue;
184             }
185         }
186     }
187 }
188 
189 enum WaitType
190 {
191     READ,
192     WRITE,
193     READ_WRITE
194 }
195 
196 void _integration_test( string connParam ) @trusted
197 {
198     auto conn = new Connection(connParam);
199 
200     {    
201         string sql_query =
202         "select now() as time, 'abc'::text as string, 123, 456.78\n"~
203         "union all\n"~
204         "select now(), 'абвгд'::text, 777, 910.11\n"~
205         "union all\n"~
206         "select NULL, 'ijk'::text, 789, 12345.115345";
207 
208         auto a = conn.exec( sql_query );
209 
210         assert( a.cmdStatus.length > 2 );
211         assert( a.columnCount == 4 );
212         assert( a.length == 3 );
213         assert( a.columnFormat(1) == ValueFormat.TEXT );
214         assert( a.columnFormat(2) == ValueFormat.TEXT );
215     }
216 
217     {
218         import vibe.data.bson: Bson;
219 
220         const string sql_query =
221         "select $1::text, $2::integer, $3::text, $4, $5::integer[]";
222 
223         Value[5] args;
224         args[0] = toValue("абвгд");
225         args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
226         args[2] = toValue("123");
227         args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
228 
229         Bson binArray = Bson([
230             Bson([Bson(null), Bson(123), Bson(456)]),
231             Bson([Bson(0), Bson(789), Bson(null)])
232         ]);
233 
234         args[4] = bsonToValue(binArray);
235 
236         QueryParams p;
237         p.sqlCommand = sql_query;
238         p.args = args[];
239 
240         auto a = conn.execParams( p );
241 
242         foreach(i; 0 .. args.length)
243             assert(a.columnFormat(i) == ValueFormat.BINARY);
244 
245         assert( a.OID(0) == OidType.Text );
246         assert( a.OID(1) == OidType.Int4 );
247         assert( a.OID(2) == OidType.Text );
248         assert( a.OID(3) == OidType.Int8 );
249         assert( a.OID(4) == OidType.Int4Array );
250 
251         // binary args array test
252         assert( a[0][4].as!Bson == binArray );
253     }
254 
255     {
256         // Bug #52: empty text argument
257         QueryParams p;
258         Value v = toValue("");
259 
260         p.sqlCommand = "SELECT $1";
261         p.args = [v];
262 
263         auto a = conn.execParams(p);
264 
265         assert( !a[0][0].isNull );
266         assert( a[0][0].as!string == "" );
267     }
268 
269     // checking prepared statements
270     {
271         // uses PQprepare:
272         auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
273         assert(s.status == PGRES_COMMAND_OK);
274     }
275     {
276         // uses PQsendPrepare:
277         conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");
278 
279         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
280         conn.consumeInput();
281 
282         immutable(Result)[] res;
283 
284         while(true)
285         {
286             auto r = conn.getResult();
287             if(r is null) break;
288             res ~= r;
289         }
290 
291         assert(res.length == 1);
292         assert(res[0].status == PGRES_COMMAND_OK);
293     }
294     {
295         // check prepared arg types and result types
296         auto a = conn.describePrepared("prepared statement 2");
297 
298         assert(a.nParams == 2);
299         assert(a.paramType(0) == OidType.Text);
300         assert(a.paramType(1) == OidType.Int4);
301     }
302     {
303         // async check prepared arg types and result types
304         conn.sendDescribePrepared("prepared statement 2");
305 
306         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
307         conn.consumeInput();
308 
309         immutable(Result)[] res;
310 
311         while(true)
312         {
313             auto r = conn.getResult();
314             if(r is null) break;
315             res ~= r;
316         }
317 
318         assert(res.length == 1);
319         assert(res[0].status == PGRES_COMMAND_OK);
320 
321         auto a = res[0].getAnswer;
322 
323         assert(a.nParams == 2);
324         assert(a.paramType(0) == OidType.Text);
325         assert(a.paramType(1) == OidType.Int4);
326     }
327     {
328         QueryParams p;
329         p.preparedStatementName = "prepared statement 2";
330         p.argsFromArray = ["abc", "123456"];
331 
332         conn.sendQueryPrepared(p);
333 
334         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
335         conn.consumeInput();
336 
337         immutable(Result)[] res;
338 
339         while(true)
340         {
341             auto r = conn.getResult();
342             if(r is null) break;
343             res ~= r;
344         }
345 
346         assert(res.length == 1);
347         assert(res[0].getAnswer[0][0].as!PGtext == "abc");
348         assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
349     }
350 
351     import std.socket;
352     conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection
353 
354     {
355         bool exceptionFlag = false;
356 
357         try conn.exec("SELECT 'abc'::text").getAnswer;
358         catch(ConnectionException e)
359         {
360             exceptionFlag = true;
361             assert(e.msg.length > 15); // error message check
362         }
363         finally
364             assert(exceptionFlag);
365     }
366 }