1 module dpq2.query;
2 
3 public import dpq2.args;
4 
5 import dpq2;
6 import core.time: Duration, dur;
7 import std.exception: enforce;
8 
9 mixin template Queries()
10 {
11     /// Perform SQL query to DB
12     immutable (Answer) exec( string SQLcmd )
13     {
14         auto pgResult = PQexec(conn, toStringz( SQLcmd ));
15 
16         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
17         auto container = createResultContainer(cast(immutable) pgResult);
18 
19         return new immutable Answer(container);
20     }
21     
22     /// Perform SQL query to DB
23     immutable (Answer) execParams(in ref QueryParams qp)
24     {
25         auto p = InternalQueryParams(qp);
26         auto pgResult = PQexecParams (
27                 conn,
28                 p.command,
29                 p.nParams,
30                 p.paramTypes,
31                 p.paramValues,
32                 p.paramLengths,
33                 p.paramFormats,
34                 p.resultFormat
35         );
36 
37         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
38         auto container = createResultContainer(cast(immutable) pgResult);
39 
40         return new immutable Answer(container);
41     }
42     
43     /// Submits a command to the server without waiting for the result(s)
44     void sendQuery( string SQLcmd )
45     {
46         const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
47         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
48     }
49 
50     /// Submits a command and separate parameters to the server without waiting for the result(s)
51     void sendQueryParams(in ref QueryParams qp)
52     {
53         auto p = InternalQueryParams(qp);
54         size_t r = PQsendQueryParams (
55                 conn,
56                 p.command,
57                 p.nParams,
58                 p.paramTypes,
59                 p.paramValues,
60                 p.paramLengths,
61                 p.paramFormats,
62                 p.resultFormat
63             );
64 
65         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
66     }
67 
68     /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
69     void sendQueryPrepared(in ref QueryParams qp)
70     {
71         auto p = InternalQueryParams(qp);
72         size_t r = PQsendQueryPrepared(
73                 conn,
74                 p.stmtName,
75                 p.nParams,
76                 p.paramValues,
77                 p.paramLengths,
78                 p.paramFormats,
79                 p.resultFormat
80             );
81 
82         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
83     }
84 
85     /// Returns null if no notifies was received
86     Notify getNextNotify()
87     {
88         consumeInput();
89         auto n = PQnotifies(conn);
90         return n is null ? null : new Notify( n );
91     }
92 
93     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
94     immutable(Result) prepare(string statementName, string sqlStatement)
95     {
96         PGresult* pgResult = PQprepare(
97                 conn,
98                 toStringz(statementName),
99                 toStringz(sqlStatement),
100                 0,
101                 null
102             );
103 
104         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
105         auto container = createResultContainer(cast(immutable) pgResult);
106 
107         return new immutable Result(container);
108     }
109 
110     /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
111     void sendPrepare(string statementName, string sqlStatement)
112     {
113         size_t r = PQsendPrepare(
114                 conn,
115                 toStringz(statementName),
116                 toStringz(sqlStatement),
117                 0,
118                 null
119             );
120 
121         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
122     }
123 
124     immutable(Answer) describePrepared(string statementName)
125     {
126         PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));
127 
128         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
129         auto container = createResultContainer(cast(immutable) pgResult);
130 
131         return new immutable Answer(container);
132     }
133 
134     void sendDescribePrepared(string statementName)
135     {
136         size_t r = PQsendDescribePrepared(conn, statementName.toStringz);
137 
138         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
139     }
140 
141     /// Waiting for completion of reading or writing
142     /// Return: timeout not occured
143     bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
144     {
145         import std.socket;
146 
147         auto socket = this.socket();
148         auto set = new SocketSet;
149         set.add(socket);
150 
151         while(true)
152         {
153             if(status() == CONNECTION_BAD)
154                 throw new ConnectionException(this, __FILE__, __LINE__);
155 
156             if(poll() == PGRES_POLLING_OK)
157             {
158                 return true;
159             }
160             else
161             {
162                 size_t sockNum;
163 
164                 with(WaitType)
165                 final switch(type)
166                 {
167                     case READ:
168                         sockNum = Socket.select(set, null, set, timeout);
169                         break;
170 
171                     case WRITE:
172                         sockNum = Socket.select(null, set, set, timeout);
173                         break;
174 
175                     case READ_WRITE:
176                         sockNum = Socket.select(set, set, set, timeout);
177                         break;
178                 }
179 
180                 enforce(sockNum >= 0);
181                 if(sockNum == 0) return false; // timeout is occurred
182 
183                 continue;
184             }
185         }
186     }
187 }
188 
189 enum WaitType
190 {
191     READ,
192     WRITE,
193     READ_WRITE
194 }
195 
196 void _integration_test( string connParam ) @trusted
197 {
198     auto conn = new Connection(connParam);
199 
200     {    
201         string sql_query =
202         "select now() as time, 'abc'::text as string, 123, 456.78\n"~
203         "union all\n"~
204         "select now(), 'абвгд'::text, 777, 910.11\n"~
205         "union all\n"~
206         "select NULL, 'ijk'::text, 789, 12345.115345";
207 
208         auto a = conn.exec( sql_query );
209 
210         assert( a.cmdStatus.length > 2 );
211         assert( a.columnCount == 4 );
212         assert( a.length == 3 );
213         assert( a.columnFormat(1) == ValueFormat.TEXT );
214         assert( a.columnFormat(2) == ValueFormat.TEXT );
215     }
216 
217     {
218         import vibe.data.bson: Bson;
219 
220         const string sql_query =
221         "select $1::text, $2::integer, $3::text, $4, $5::integer[]";
222 
223         Value[5] args;
224         args[0] = toValue("абвгд");
225         args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
226         args[2] = toValue("123");
227         args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
228 
229         Bson binArray = Bson([
230             Bson([Bson(null), Bson(123), Bson(456)]),
231             Bson([Bson(0), Bson(789), Bson(null)])
232         ]);
233 
234         args[4] = bsonToValue(binArray);
235 
236         QueryParams p;
237         p.sqlCommand = sql_query;
238         p.args = args[];
239 
240         auto a = conn.execParams( p );
241 
242         foreach(i; 0 .. args.length)
243             assert(a.columnFormat(i) == ValueFormat.BINARY);
244 
245         assert( a.OID(0) == OidType.Text );
246         assert( a.OID(1) == OidType.Int4 );
247         assert( a.OID(2) == OidType.Text );
248         assert( a.OID(3) == OidType.Int8 );
249         assert( a.OID(4) == OidType.Int4Array );
250 
251         // binary args array test
252         assert( a[0][4].as!Bson == binArray );
253     }
254 
255     // checking prepared statements
256     {
257         // uses PQprepare:
258         auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
259         assert(s.status == PGRES_COMMAND_OK);
260     }
261     {
262         // uses PQsendPrepare:
263         conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");
264 
265         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
266         conn.consumeInput();
267 
268         immutable(Result)[] res;
269 
270         while(true)
271         {
272             auto r = conn.getResult();
273             if(r is null) break;
274             res ~= r;
275         }
276 
277         assert(res.length == 1);
278         assert(res[0].status == PGRES_COMMAND_OK);
279     }
280     {
281         // check prepared arg types and result types
282         auto a = conn.describePrepared("prepared statement 2");
283 
284         assert(a.nParams == 2);
285         assert(a.paramType(0) == OidType.Text);
286         assert(a.paramType(1) == OidType.Int4);
287     }
288     {
289         // async check prepared arg types and result types
290         conn.sendDescribePrepared("prepared statement 2");
291 
292         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
293         conn.consumeInput();
294 
295         immutable(Result)[] res;
296 
297         while(true)
298         {
299             auto r = conn.getResult();
300             if(r is null) break;
301             res ~= r;
302         }
303 
304         assert(res.length == 1);
305         assert(res[0].status == PGRES_COMMAND_OK);
306 
307         auto a = res[0].getAnswer;
308 
309         assert(a.nParams == 2);
310         assert(a.paramType(0) == OidType.Text);
311         assert(a.paramType(1) == OidType.Int4);
312     }
313     {
314         QueryParams p;
315         p.preparedStatementName = "prepared statement 2";
316         p.argsFromArray = ["abc", "123456"];
317 
318         conn.sendQueryPrepared(p);
319 
320         conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
321         conn.consumeInput();
322 
323         immutable(Result)[] res;
324 
325         while(true)
326         {
327             auto r = conn.getResult();
328             if(r is null) break;
329             res ~= r;
330         }
331 
332         assert(res.length == 1);
333         assert(res[0].getAnswer[0][0].as!PGtext == "abc");
334         assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
335     }
336 
337     import std.socket;
338     conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection
339 
340     {
341         bool exceptionFlag = false;
342 
343         try conn.exec("SELECT 'abc'::text").getAnswer;
344         catch(ConnectionException e)
345         {
346             exceptionFlag = true;
347             assert(e.msg.length > 15); // error message check
348         }
349         finally
350             assert(exceptionFlag);
351     }
352 }