1 module dpq2.query;
2 
3 public import dpq2.args;
4 
5 import dpq2;
6 import core.time: Duration;
7 
8 mixin template Queries()
9 {
10     /// Perform SQL query to DB
11     immutable (Answer) exec( string SQLcmd )
12     {
13         auto pgResult = PQexec(conn, toStringz( SQLcmd ));
14 
15         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
16         auto container = createResultContainer(cast(immutable) pgResult);
17 
18         return new immutable Answer(container);
19     }
20     
21     /// Perform SQL query to DB
22     immutable (Answer) execParams(in ref QueryParams qp)
23     {
24         auto p = InternalQueryParams(qp);
25         auto pgResult = PQexecParams (
26                 conn,
27                 p.command,
28                 p.nParams,
29                 cast(uint*) p.paramTypes, //TODO: need report to derelict pq
30                 cast(const(ubyte)**) p.paramValues, //TODO: need report to derelict pq
31                 cast(int*) p.paramLengths, //TODO: need report to derelict pq
32                 cast(int*) p.paramFormats, //TODO: need report to derelict pq
33                 p.resultFormat
34         );
35 
36         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
37         auto container = createResultContainer(cast(immutable) pgResult);
38 
39         return new immutable Answer(container);
40     }
41     
42     /// Submits a command to the server without waiting for the result(s)
43     void sendQuery( string SQLcmd )
44     {
45         const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
46         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
47     }
48 
49     /// Submits a command and separate parameters to the server without waiting for the result(s)
50     void sendQueryParams(in ref QueryParams qp)
51     {
52         auto p = InternalQueryParams(qp);
53         size_t r = PQsendQueryParams (
54                 conn,
55                 p.command,
56                 p.nParams,
57                 cast(uint*) p.paramTypes, //TODO: need report to derelict pq
58                 cast(const(ubyte)**) p.paramValues, //TODO: need report to derelict pq
59                 cast(int*) p.paramLengths, //TODO: need report to derelict pq
60                 cast(int*) p.paramFormats, //TODO: need report to derelict pq
61                 p.resultFormat
62             );
63 
64         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
65     }
66 
67     /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
68     void sendQueryPrepared(in ref QueryParams qp)
69     {
70         auto p = InternalQueryParams(qp);
71         size_t r = PQsendQueryPrepared(
72                 conn,
73                 cast(char*) p.stmtName, //TODO: need report to derelict pq
74                 p.nParams,
75                 cast(char**) p.paramValues, //TODO: need report to derelict pq
76                 cast(int*) p.paramLengths, //TODO: need report to derelict pq
77                 cast(int*) p.paramFormats, //TODO: need report to derelict pq
78                 p.resultFormat
79             );
80 
81         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
82     }
83 
84     /// Returns null if no notifies was received
85     Notify getNextNotify()
86     {
87         consumeInput();
88         auto n = PQnotifies(conn);
89         return n is null ? null : new Notify( n );
90     }
91 
92     /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
93     immutable(Result) prepare(string statementName, string sqlStatement)
94     {
95         PGresult* pgResult = PQprepare(
96                 conn,
97                 cast(char*)toStringz(statementName), //TODO: need report to derelict pq
98                 cast(char*)toStringz(sqlStatement), //TODO: need report to derelict pq
99                 0,
100                 null
101             );
102 
103         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
104         auto container = createResultContainer(cast(immutable) pgResult);
105 
106         return new immutable Result(container);
107     }
108 
109     /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
110     void sendPrepare(string statementName, string sqlStatement)
111     {
112         size_t r = PQsendPrepare(
113                 conn,
114                 cast(char*)toStringz(statementName), //TODO: need report to derelict pq
115                 cast(char*)toStringz(sqlStatement), //TODO: need report to derelict pq
116                 0,
117                 null
118             );
119 
120         if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
121     }
122 
123     immutable(Answer) describePrepared(string statementName)
124     {
125         PGresult* pgResult = PQdescribePrepared(conn, cast(char*)toStringz(statementName)); //TODO: need report to derelict pq
126 
127         // is guaranteed by libpq that the result will not be changed until it will not be destroyed
128         auto container = createResultContainer(cast(immutable) pgResult);
129 
130         return new immutable Answer(container);
131     }
132 
133     /// Waiting for completion of reading or writing
134     /// Return: timeout not occured
135     bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
136     {
137         import std.socket;
138 
139         auto socket = socket();
140         auto set = new SocketSet;
141         set.add(socket);
142 
143         while(true)
144         {
145             if(status() == CONNECTION_BAD)
146                 throw new ConnectionException(this, __FILE__, __LINE__);
147 
148             if(poll() == PGRES_POLLING_OK)
149             {
150                 return true;
151             }
152             else
153             {
154                 size_t sockNum;
155 
156                 with(WaitType)
157                 final switch(type)
158                 {
159                     case READ:
160                         sockNum = Socket.select(set, null, set, timeout);
161                         break;
162 
163                     case WRITE:
164                         sockNum = Socket.select(null, set, set, timeout);
165                         break;
166 
167                     case READ_WRITE:
168                         sockNum = Socket.select(set, set, set, timeout);
169                         break;
170                 }
171 
172                 enforce(sockNum >= 0);
173                 if(sockNum == 0) return false; // timeout is occurred
174 
175                 continue;
176             }
177         }
178     }
179 }
180 
/// Kind of socket activity that waitEndOf should wait for
enum WaitType
{
    READ,       /// wait on the read set
    WRITE,      /// wait on the write set
    READ_WRITE  /// wait on both the read and the write set
}
187 
/// Integration test of the query API: opens a Connection from connParam and
/// runs the checks below against it, finishing with a deliberately broken
/// connection to verify error reporting
void _integration_test( string connParam ) @trusted
{
    auto conn = new Connection(connParam);

    // Collects everything conn.getResult() yields until it returns null
    immutable(Result)[] drainResults()
    {
        immutable(Result)[] collected;

        while(true)
        {
            auto next = conn.getResult();
            if(next is null) break;
            collected ~= next;
        }

        return collected;
    }

    // plain query, text result format
    {
        string unionQuery =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto answer = conn.exec( unionQuery );

        assert( answer.cmdStatus.length > 2 );
        assert( answer.columnCount == 4 );
        assert( answer.length == 3 );
        assert( answer.columnFormat(1) == ValueFormat.TEXT );
        assert( answer.columnFormat(2) == ValueFormat.TEXT );
    }

    // parameterized query, including NULLs and a binary array argument
    {
        import vibe.data.bson: Bson;

        const string paramQuery =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        Value[5] queryArgs;
        queryArgs[0] = toValue("абвгд");
        queryArgs[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        queryArgs[2] = toValue("123");
        queryArgs[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value
        queryArgs[4] = bsonToValue(binArray);

        QueryParams params;
        params.sqlCommand = paramQuery;
        params.args = queryArgs[];

        auto answer = conn.execParams( params );

        foreach(col; 0 .. queryArgs.length)
            assert(answer.columnFormat(col) == ValueFormat.BINARY);

        assert( answer.OID(0) == OidType.Text );
        assert( answer.OID(1) == OidType.Int4 );
        assert( answer.OID(2) == OidType.Text );
        assert( answer.OID(3) == OidType.Int8 );
        assert( answer.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( answer[0][4].as!Bson == binArray );
    }

    // prepared statements: synchronous creation (PQprepare)
    {
        auto prepared = conn.prepare("prepared statement 1", "SELECT $1::integer");
        assert(prepared.status == PGRES_COMMAND_OK);
    }

    // prepared statements: asynchronous creation (PQsendPrepare)
    {
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto results = drainResults();

        assert(results.length == 1);
        assert(results[0].status == PGRES_COMMAND_OK);
    }

    // prepared statements: argument types reported by the server
    {
        auto description = conn.describePrepared("prepared statement 2");

        assert(description.nParams == 2);
        assert(description.paramType(0) == OidType.Text);
        assert(description.paramType(1) == OidType.Int4);
    }

    // prepared statements: asynchronous execution
    {
        QueryParams params;
        params.preparedStatementName = "prepared statement 2";
        params.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(params);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto results = drainResults();

        assert(results.length == 1);
        assert(results[0].getAnswer[0][0].as!PGtext == "abc");
        assert(results[0].getAnswer[0][1].as!PGinteger == 123456);
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    // a query on the broken connection must raise ConnectionException
    {
        bool sawConnectionError = false;

        try
        {
            conn.exec("SELECT 'abc'::text").getAnswer;
        }
        catch(ConnectionException e)
        {
            sawConnectionError = true;
            assert(e.msg.length > 15); // error message check
        }
        finally
        {
            assert(sawConnectionError);
        }
    }
}