1 module dpq2.query;
2 
3 @safe:
4 
5 public import dpq2.args;
6 
7 import dpq2;
8 import core.time: Duration;
9 
mixin template Queries()
{
    /**
     * Performs an SQL query and waits for its result.
     *
     * Params:
     *   SQLcmd = command text, passed to PQexec as a zero-terminated string
     *
     * Returns: the result of the command wrapped into an immutable Answer
     */
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result is not modified until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /**
     * Performs an SQL query with separately passed parameters and waits for its result.
     *
     * Params:
     *   p = command text, arguments and requested result format
     *
     * Returns: the result of the command wrapped into an immutable Answer
     */
    immutable (Answer) execParams(ref QueryParams p)
    {
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                cast(uint*) p.paramTypes, //TODO: need report to derelict pq
                cast(const(ubyte)**) p.paramValues, //TODO: need report to derelict pq
                cast(int*) p.paramLengths, //TODO: need report to derelict pq
                cast(int*) p.paramFormats, //TODO: need report to derelict pq
                p.resultFormat
        );

        // libpq guarantees that the result is not modified until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /**
     * Submits a command to the server without waiting for the result(s).
     *
     * Throws: ConnectionException if PQsendQuery returns anything other than 1
     */
    void sendQuery( string SQLcmd )
    {
        const r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Submits a command and separate parameters to the server without waiting for the result(s).
     *
     * Throws: ConnectionException if PQsendQueryParams returns anything other than 1
     */
    void sendQueryParams(ref QueryParams p)
    {
        const r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                cast(uint*) p.paramTypes, //TODO: need report to derelict pq
                cast(const(ubyte)**) p.paramValues, //TODO: need report to derelict pq
                cast(int*) p.paramLengths, //TODO: need report to derelict pq
                cast(int*) p.paramFormats, //TODO: need report to derelict pq
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Sends a request to execute a prepared statement with given parameters,
     * without waiting for the result(s).
     *
     * Throws: ConnectionException if PQsendQueryPrepared returns anything other than 1
     */
    void sendQueryPrepared(ref QueryParams p)
    {
        const r = PQsendQueryPrepared(
                conn,
                cast(char*) p.stmtName, //TODO: need report to derelict pq
                p.nParams,
                cast(char**) p.paramValues, //TODO: need report to derelict pq
                cast(int*) p.paramLengths, //TODO: need report to derelict pq
                cast(int*) p.paramFormats, //TODO: need report to derelict pq
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Consumes pending input and returns the next notification,
    /// or null if no notification was received
    Notify getNextNotify()
    {
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /**
     * Submits a request to create a prepared statement with the given parameters,
     * and waits for completion.
     *
     * Params:
     *   statementName = name under which the statement is registered
     *   sqlStatement  = text of the statement
     *
     * Returns: the result of the PQprepare call wrapped into an immutable Result
     */
    immutable(Result) prepare(string statementName, string sqlStatement)
    {
        PGresult* pgResult = PQprepare(
                conn,
                cast(char*)toStringz(statementName), //TODO: need report to derelict pq
                cast(char*)toStringz(sqlStatement), //TODO: need report to derelict pq
                0,
                null
            );

        // libpq guarantees that the result is not modified until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /**
     * Sends a request to create a prepared statement with the given parameters,
     * without waiting for completion.
     *
     * Throws: ConnectionException if PQsendPrepare returns anything other than 1
     */
    void sendPrepare(string statementName, string sqlStatement)
    {
        const r = PQsendPrepare(
                conn,
                cast(char*)toStringz(statementName), //TODO: need report to derelict pq
                cast(char*)toStringz(sqlStatement), //TODO: need report to derelict pq
                0,
                null
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /**
     * Waits on the connection socket for the requested kind of readiness.
     *
     * Params:
     *   type    = which socket sets are watched (read, write or both);
     *             the error set is always watched
     *   timeout = timeout handed to every Socket.select call
     *
     * Returns: true as soon as poll() reports PGRES_POLLING_OK,
     *          false if Socket.select timed out
     *
     * Throws: ConnectionException if status() reports CONNECTION_BAD
     */
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        // Named differently from the member so the call is not shadowed by the local
        auto sock = this.socket();

        // Socket.select requires distinct SocketSet objects for each argument;
        // sets that are not of interest stay null, as before
        SocketSet readSet, writeSet;
        auto errorSet = new SocketSet;

        final switch(type)
        {
            case WaitType.READ:
                readSet = new SocketSet;
                break;

            case WaitType.WRITE:
                writeSet = new SocketSet;
                break;

            case WaitType.READ_WRITE:
                readSet = new SocketSet;
                writeSet = new SocketSet;
                break;
        }

        // select() overwrites the sets it is given, so they are refilled before every call
        void rearm(SocketSet s)
        {
            if(s is null) return;

            s.reset();
            s.add(sock);
        }

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            rearm(readSet);
            rearm(writeSet);
            rearm(errorSet);

            // Kept as a signed int: select() returns -1 when it was interrupted
            const int changed = Socket.select(readSet, writeSet, errorSet, timeout);

            if(changed == 0) return false; // timeout has occurred

            // changed > 0: socket state changed; changed < 0: interrupted.
            // In both cases re-check the connection and try again.
        }
    }
}
169 
/// Kind of socket readiness that waitEndOf() waits for
enum WaitType
{
    /// Wait until the socket is reported in the read set
    READ,

    /// Wait until the socket is reported in the write set
    WRITE,

    /// Wait until the socket is reported in the read or the write set
    READ_WRITE
}
176 
/**
 * Integration test of the query methods; requires a reachable database.
 *
 * Covers exec, execParams, prepare, sendPrepare and sendQueryPrepared, then
 * shuts the socket down and checks that a query raises ConnectionException.
 *
 * Params:
 *   connParam = connection string passed to the Connection constructor
 */
void _integration_test( string connParam ) @trusted
{
    auto conn = new Connection(connParam);

    // Drains every pending result of the last asynchronous command
    immutable(Result)[] collectResults()
    {
        immutable(Result)[] collected;

        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            collected ~= r;
        }

        return collected;
    }

    // plain text query
    {
        string sql_query =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto a = conn.exec( sql_query );

        assert( a.cmdStatus.length > 2 );
        assert( a.columnCount == 4 );
        assert( a.length == 3 );
        assert( a.columnFormat(1) == ValueFormat.TEXT );
        assert( a.columnFormat(2) == ValueFormat.TEXT );
    }

    // query with separately passed (binary) arguments
    {
        import vibe.data.bson: Bson;

        const string sql_query =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        args[4] = bsonToValue(binArray);

        QueryParams p;
        p.sqlCommand = sql_query;
        p.args = args[];

        auto a = conn.execParams( p );

        foreach(i; 0 .. args.length)
            assert(a.columnFormat(i) == ValueFormat.BINARY);

        assert( a.OID(0) == OidType.Text );
        assert( a.OID(1) == OidType.Int4 );
        assert( a.OID(2) == OidType.Text );
        assert( a.OID(3) == OidType.Int8 );
        assert( a.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( a[0][4].toBson == binArray );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
        assert(s.status == PGRES_COMMAND_OK);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto res = collectResults();

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);
    }
    {
        QueryParams p;
        p.preparedStatementName = "prepared statement 2";
        p.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(p);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        auto res = collectResults();

        assert(res.length == 1);
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    {
        bool exceptionFlag = false;

        try conn.exec("SELECT 'abc'::text").getAnswer;
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 15); // error message check
        }

        // Checked after the try/catch rather than in a finally block, so that an
        // unexpected exception type propagates instead of being masked by this assert
        assert(exceptionFlag);
    }
}