1 module dpq2.query; 2 3 public import dpq2.args; 4 5 import dpq2.connection: Connection, ConnectionException; 6 import dpq2.result: Result; 7 import dpq2.value; 8 import dpq2.oids: OidType; 9 import derelict.pq.pq; 10 import core.time: Duration, dur; 11 import std.exception: enforce; 12 13 mixin template Queries() 14 { 15 /// Perform SQL query to DB 16 immutable (Answer) exec( string SQLcmd ) 17 { 18 auto pgResult = PQexec(conn, toStringz( SQLcmd )); 19 20 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 21 auto container = createResultContainer(cast(immutable) pgResult); 22 23 return new immutable Answer(container); 24 } 25 26 /// Perform SQL query to DB 27 immutable (Answer) execParams(in ref QueryParams qp) 28 { 29 auto p = InternalQueryParams(&qp); 30 auto pgResult = PQexecParams ( 31 conn, 32 p.command, 33 p.nParams, 34 p.paramTypes, 35 p.paramValues, 36 p.paramLengths, 37 p.paramFormats, 38 p.resultFormat 39 ); 40 41 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 42 auto container = createResultContainer(cast(immutable) pgResult); 43 44 return new immutable Answer(container); 45 } 46 47 /// Submits a command to the server without waiting for the result(s) 48 void sendQuery( string SQLcmd ) 49 { 50 const size_t r = PQsendQuery( conn, toStringz(SQLcmd) ); 51 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 52 } 53 54 /// Submits a command and separate parameters to the server without waiting for the result(s) 55 void sendQueryParams(in ref QueryParams qp) 56 { 57 auto p = InternalQueryParams(&qp); 58 size_t r = PQsendQueryParams ( 59 conn, 60 p.command, 61 p.nParams, 62 p.paramTypes, 63 p.paramValues, 64 p.paramLengths, 65 p.paramFormats, 66 p.resultFormat 67 ); 68 69 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 70 } 71 72 /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s) 73 void sendQueryPrepared(in ref QueryParams qp) 74 { 75 auto p = InternalQueryParams(&qp); 76 size_t r = PQsendQueryPrepared( 77 conn, 78 p.stmtName, 79 p.nParams, 80 p.paramValues, 81 p.paramLengths, 82 p.paramFormats, 83 p.resultFormat 84 ); 85 86 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 87 } 88 89 /// Returns null if no notifies was received 90 Notify getNextNotify() 91 { 92 consumeInput(); 93 auto n = PQnotifies(conn); 94 return n is null ? null : new Notify( n ); 95 } 96 97 /// Submits a request to create a prepared statement with the given parameters, and waits for completion. 98 immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null) 99 { 100 PGresult* pgResult = PQprepare( 101 conn, 102 toStringz(statementName), 103 toStringz(sqlStatement), 104 oids.length.to!int, 105 oids.ptr 106 ); 107 108 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 109 auto container = createResultContainer(cast(immutable) pgResult); 110 111 return new immutable Result(container); 112 } 113 114 /// Submits a request to execute a prepared statement with given parameters, and waits for completion. 115 immutable(Answer) execPrepared(in ref QueryParams qp) 116 { 117 auto p = InternalQueryParams(&qp); 118 auto pgResult = PQexecPrepared( 119 conn, 120 p.stmtName, 121 p.nParams, 122 cast(const(char*)*)p.paramValues, 123 p.paramLengths, 124 p.paramFormats, 125 p.resultFormat 126 ); 127 128 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 129 auto container = createResultContainer(cast(immutable) pgResult); 130 131 return new immutable Answer(container); 132 } 133 134 /// Sends a request to create a prepared statement with the given parameters, without waiting for completion. 135 void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null) 136 { 137 size_t r = PQsendPrepare( 138 conn, 139 toStringz(statementName), 140 toStringz(sqlStatement), 141 oids.length.to!int, 142 cast(Oid*)oids.ptr //const should be accepted here, see https://github.com/DerelictOrg/DerelictPQ/issues/21 143 ); 144 145 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 146 } 147 148 immutable(Answer) describePrepared(string statementName) 149 { 150 PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName)); 151 152 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 153 auto container = createResultContainer(cast(immutable) pgResult); 154 155 return new immutable Answer(container); 156 } 157 158 void sendDescribePrepared(string statementName) 159 { 160 size_t r = PQsendDescribePrepared(conn, statementName.toStringz); 161 162 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 163 } 164 165 /// Waiting for completion of reading or writing 166 /// Return: timeout not occured 167 bool waitEndOf(WaitType type, Duration timeout = Duration.zero) 168 { 169 import std.socket; 170 171 auto socket = this.socket(); 172 auto set = new SocketSet; 173 set.add(socket); 174 175 while(true) 176 { 177 if(status() == CONNECTION_BAD) 178 throw new ConnectionException(this, __FILE__, __LINE__); 179 180 if(poll() == PGRES_POLLING_OK) 181 { 182 return true; 183 } 184 else 185 { 186 size_t sockNum; 187 188 with(WaitType) 189 final switch(type) 190 { 191 case READ: 192 sockNum = Socket.select(set, null, set, timeout); 193 break; 194 195 case WRITE: 196 sockNum = Socket.select(null, set, set, timeout); 197 break; 198 199 case READ_WRITE: 200 sockNum = Socket.select(set, set, set, timeout); 201 break; 202 } 203 204 enforce(sockNum >= 0); 205 if(sockNum == 0) return false; // timeout is occurred 206 207 continue; 208 } 209 } 210 } 211 } 212 213 enum WaitType 214 { 215 READ, 216 WRITE, 217 READ_WRITE 218 } 219 220 void _integration_test( string connParam ) @trusted 221 { 222 import dpq2.conv.to_d_types; 223 import dpq2.conv.to_bson; 224 225 auto conn = new Connection(connParam); 226 227 // Text type arguments testing 228 { 229 string sql_query = 230 "select now() as time, 'abc'::text as string, 123, 456.78\n"~ 231 "union all\n"~ 232 "select now(), 'абвгд'::text, 777, 910.11\n"~ 233 "union all\n"~ 234 "select NULL, 'ijk'::text, 789, 12345.115345"; 235 236 auto a = conn.exec( sql_query ); 237 238 assert( a.cmdStatus.length > 2 ); 239 assert( a.columnCount == 4 ); 240 assert( a.length == 3 ); 241 assert( a.columnFormat(1) == ValueFormat.TEXT ); 242 assert( a.columnFormat(2) == ValueFormat.TEXT ); 243 } 244 245 // Binary type arguments testing 246 { 247 import vibe.data.bson: Bson; 248 249 const string sql_query = 250 "select $1::text, $2::integer, $3::text, $4, $5::integer[]"; 251 252 Value[5] args; 253 args[0] = toValue("абвгд"); 254 args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value 255 args[2] = toValue("123"); 256 args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value 257 258 Bson binArray = Bson([ 259 Bson([Bson(null), Bson(123), Bson(456)]), 260 Bson([Bson(0), Bson(789), Bson(null)]) 261 ]); 262 263 args[4] = bsonToValue(binArray); 264 265 QueryParams p; 266 p.sqlCommand = sql_query; 267 p.args = args[]; 268 269 auto a = conn.execParams( p ); 270 271 foreach(i; 0 .. args.length) 272 assert(a.columnFormat(i) == ValueFormat.BINARY); 273 274 assert( a.OID(0) == OidType.Text ); 275 assert( a.OID(1) == OidType.Int4 ); 276 assert( a.OID(2) == OidType.Text ); 277 assert( a.OID(3) == OidType.Int8 ); 278 assert( a.OID(4) == OidType.Int4Array ); 279 280 // binary args array test 281 assert( a[0][4].as!Bson == binArray ); 282 } 283 284 { 285 // Bug #52: empty text argument 286 QueryParams p; 287 Value v = toValue(""); 288 289 p.sqlCommand = "SELECT $1"; 290 p.args = [v]; 291 292 auto a = conn.execParams(p); 293 294 assert( !a[0][0].isNull ); 295 assert( a[0][0].as!string == "" ); 296 } 297 298 // checking prepared statements 299 { 300 // uses PQprepare: 301 auto s = conn.prepare("prepared statement 1", "SELECT $1::integer"); 302 assert(s.status == PGRES_COMMAND_OK); 303 304 QueryParams p; 305 p.preparedStatementName = "prepared statement 1"; 306 p.args = [42.toValue]; 307 auto r = conn.execPrepared(p); 308 assert (r[0][0].as!int == 42); 309 } 310 { 311 // uses PQsendPrepare: 312 conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer"); 313 314 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 315 conn.consumeInput(); 316 317 immutable(Result)[] res; 318 319 while(true) 320 { 321 auto r = conn.getResult(); 322 if(r is null) break; 323 res ~= r; 324 } 325 326 assert(res.length == 1); 327 assert(res[0].status == PGRES_COMMAND_OK); 328 } 329 { 330 // check prepared arg types and result types 331 auto a = conn.describePrepared("prepared statement 2"); 332 333 assert(a.nParams == 2); 334 assert(a.paramType(0) == OidType.Text); 335 assert(a.paramType(1) == OidType.Int4); 336 } 337 { 338 // async check prepared arg types and result types 339 conn.sendDescribePrepared("prepared statement 2"); 340 341 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 342 conn.consumeInput(); 343 344 immutable(Result)[] res; 345 346 while(true) 347 { 348 auto r = conn.getResult(); 349 if(r is null) break; 350 res ~= r; 351 } 352 353 assert(res.length == 1); 354 assert(res[0].status == PGRES_COMMAND_OK); 355 356 auto a = res[0].getAnswer; 357 358 assert(a.nParams == 2); 359 assert(a.paramType(0) == OidType.Text); 360 assert(a.paramType(1) == OidType.Int4); 361 } 362 { 363 QueryParams p; 364 p.preparedStatementName = "prepared statement 2"; 365 p.argsFromArray = ["abc", "123456"]; 366 367 conn.sendQueryPrepared(p); 368 369 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 370 conn.consumeInput(); 371 372 immutable(Result)[] res; 373 374 while(true) 375 { 376 auto r = conn.getResult(); 377 if(r is null) break; 378 res ~= r; 379 } 380 381 assert(res.length == 1); 382 assert(res[0].getAnswer[0][0].as!PGtext == "abc"); 383 assert(res[0].getAnswer[0][1].as!PGinteger == 123456); 384 } 385 386 import std.socket; 387 conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection 388 389 { 390 bool exceptionFlag = false; 391 392 try conn.exec("SELECT 'abc'::text").getAnswer; 393 catch(ConnectionException e) 394 { 395 exceptionFlag = true; 396 assert(e.msg.length > 15); // error message check 397 } 398 finally 399 assert(exceptionFlag); 400 } 401 }