1 /// Query methods 2 module dpq2.query; 3 4 public import dpq2.args; 5 6 import dpq2.connection: Connection, ConnectionException; 7 import dpq2.result: Result; 8 import dpq2.value; 9 import dpq2.oids: OidType; 10 import derelict.pq.pq; 11 import core.time: Duration, dur; 12 import std.exception: enforce; 13 14 /// Extends Connection by adding query methods 15 /// 16 /// Just use it as Connection.* methods. 17 mixin template Queries() 18 { 19 /// Perform SQL query to DB 20 immutable (Answer) exec( string SQLcmd ) 21 { 22 auto pgResult = PQexec(conn, toStringz( SQLcmd )); 23 24 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 25 auto container = createResultContainer(cast(immutable) pgResult); 26 27 return new immutable Answer(container); 28 } 29 30 /// Perform SQL query to DB 31 immutable (Answer) execParams(in ref QueryParams qp) 32 { 33 auto p = InternalQueryParams(&qp); 34 auto pgResult = PQexecParams ( 35 conn, 36 p.command, 37 p.nParams, 38 p.paramTypes, 39 p.paramValues, 40 p.paramLengths, 41 p.paramFormats, 42 p.resultFormat 43 ); 44 45 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 46 auto container = createResultContainer(cast(immutable) pgResult); 47 48 return new immutable Answer(container); 49 } 50 51 /// Submits a command to the server without waiting for the result(s) 52 void sendQuery( string SQLcmd ) 53 { 54 const size_t r = PQsendQuery( conn, toStringz(SQLcmd) ); 55 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 56 } 57 58 /// Submits a command and separate parameters to the server without waiting for the result(s) 59 void sendQueryParams(in ref QueryParams qp) 60 { 61 auto p = InternalQueryParams(&qp); 62 size_t r = PQsendQueryParams ( 63 conn, 64 p.command, 65 p.nParams, 66 p.paramTypes, 67 p.paramValues, 68 p.paramLengths, 69 p.paramFormats, 70 p.resultFormat 71 ); 72 73 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 74 } 75 76 /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s) 77 void sendQueryPrepared(in ref QueryParams qp) 78 { 79 auto p = InternalQueryParams(&qp); 80 size_t r = PQsendQueryPrepared( 81 conn, 82 p.stmtName, 83 p.nParams, 84 p.paramValues, 85 p.paramLengths, 86 p.paramFormats, 87 p.resultFormat 88 ); 89 90 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 91 } 92 93 /// Returns null if no notifies was received 94 Notify getNextNotify() 95 { 96 consumeInput(); 97 auto n = PQnotifies(conn); 98 return n is null ? null : new Notify( n ); 99 } 100 101 /// Submits a request to create a prepared statement with the given parameters, and waits for completion. 102 /// Returns: Result of query preparing 103 immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null) 104 { 105 PGresult* pgResult = PQprepare( 106 conn, 107 toStringz(statementName), 108 toStringz(sqlStatement), 109 oids.length.to!int, 110 oids.ptr 111 ); 112 113 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 114 auto container = createResultContainer(cast(immutable) pgResult); 115 116 return new immutable Result(container); 117 } 118 119 /// Submits a request to create a prepared statement with the given parameters, and waits for completion. 120 /// 121 /// Throws an exception if preparing failed. 122 void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null) 123 { 124 auto r = prepare(statementName, sqlStatement, oids); 125 126 if(r.status != PGRES_COMMAND_OK) 127 throw new ResponseException(r, __FILE__, __LINE__); 128 } 129 130 /// Submits a request to execute a prepared statement with given parameters, and waits for completion. 131 immutable(Answer) execPrepared(in ref QueryParams qp) 132 { 133 auto p = InternalQueryParams(&qp); 134 auto pgResult = PQexecPrepared( 135 conn, 136 p.stmtName, 137 p.nParams, 138 cast(const(char*)*)p.paramValues, 139 p.paramLengths, 140 p.paramFormats, 141 p.resultFormat 142 ); 143 144 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 145 auto container = createResultContainer(cast(immutable) pgResult); 146 147 return new immutable Answer(container); 148 } 149 150 /// Sends a request to create a prepared statement with the given parameters, without waiting for completion. 151 void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null) 152 { 153 size_t r = PQsendPrepare( 154 conn, 155 toStringz(statementName), 156 toStringz(sqlStatement), 157 oids.length.to!int, 158 oids.ptr 159 ); 160 161 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 162 } 163 164 /// Submits a request to obtain information about the specified prepared statement, and waits for completion. 165 immutable(Answer) describePrepared(string statementName) 166 { 167 PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName)); 168 169 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 170 auto container = createResultContainer(cast(immutable) pgResult); 171 172 return new immutable Answer(container); 173 } 174 175 /// Submits a request to obtain information about the specified prepared statement, without waiting for completion. 176 void sendDescribePrepared(string statementName) 177 { 178 size_t r = PQsendDescribePrepared(conn, statementName.toStringz); 179 180 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 181 } 182 183 // Waiting for completion of reading or writing 184 // Returns: timeout is not occured 185 version(integration_tests) 186 bool waitEndOf(WaitType type, Duration timeout = Duration.zero) 187 { 188 import std.socket; 189 190 auto socket = this.socket(); 191 auto set = new SocketSet; 192 set.add(socket); 193 194 while(true) 195 { 196 if(status() == CONNECTION_BAD) 197 throw new ConnectionException(this, __FILE__, __LINE__); 198 199 if(poll() == PGRES_POLLING_OK) 200 { 201 return true; 202 } 203 else 204 { 205 size_t sockNum; 206 207 with(WaitType) 208 final switch(type) 209 { 210 case READ: 211 sockNum = Socket.select(set, null, set, timeout); 212 break; 213 214 case WRITE: 215 sockNum = Socket.select(null, set, set, timeout); 216 break; 217 218 case READ_WRITE: 219 sockNum = Socket.select(set, set, set, timeout); 220 break; 221 } 222 223 enforce(sockNum >= 0); 224 if(sockNum == 0) return false; // timeout is occurred 225 226 continue; 227 } 228 } 229 } 230 } 231 232 version(integration_tests) 233 enum WaitType 234 { 235 READ, 236 WRITE, 237 READ_WRITE 238 } 239 240 version (integration_tests) 241 void _integration_test( string connParam ) @trusted 242 { 243 import dpq2.conv.to_d_types; 244 import dpq2.conv.to_bson; 245 246 auto conn = new Connection(connParam); 247 248 // Text type arguments testing 249 { 250 string sql_query = 251 "select now() as time, 'abc'::text as string, 123, 456.78\n"~ 252 "union all\n"~ 253 "select now(), 'абвгд'::text, 777, 910.11\n"~ 254 "union all\n"~ 255 "select NULL, 'ijk'::text, 789, 12345.115345"; 256 257 auto a = conn.exec( sql_query ); 258 259 assert( a.cmdStatus.length > 2 ); 260 assert( a.columnCount == 4 ); 261 assert( a.length == 3 ); 262 assert( a.columnFormat(1) == ValueFormat.TEXT ); 263 assert( a.columnFormat(2) == ValueFormat.TEXT ); 264 } 265 266 // Binary type arguments testing 267 { 268 import vibe.data.bson: Bson; 269 270 const string sql_query = 271 "select $1::text, $2::integer, $3::text, $4, $5::integer[]"; 272 273 Value[5] args; 274 args[0] = toValue("абвгд"); 275 args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value 276 args[2] = toValue("123"); 277 args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value 278 279 Bson binArray = Bson([ 280 Bson([Bson(null), Bson(123), Bson(456)]), 281 Bson([Bson(0), Bson(789), Bson(null)]) 282 ]); 283 284 args[4] = bsonToValue(binArray); 285 286 QueryParams p; 287 p.sqlCommand = sql_query; 288 p.args = args[]; 289 290 auto a = conn.execParams( p ); 291 292 foreach(i; 0 .. args.length) 293 assert(a.columnFormat(i) == ValueFormat.BINARY); 294 295 assert( a.OID(0) == OidType.Text ); 296 assert( a.OID(1) == OidType.Int4 ); 297 assert( a.OID(2) == OidType.Text ); 298 assert( a.OID(3) == OidType.Int8 ); 299 assert( a.OID(4) == OidType.Int4Array ); 300 301 // binary args array test 302 assert( a[0][4].as!Bson == binArray ); 303 } 304 305 { 306 // Bug #52: empty text argument 307 QueryParams p; 308 Value v = toValue(""); 309 310 p.sqlCommand = "SELECT $1"; 311 p.args = [v]; 312 313 auto a = conn.execParams(p); 314 315 assert( !a[0][0].isNull ); 316 assert( a[0][0].as!string == "" ); 317 } 318 319 // checking prepared statements 320 { 321 // uses PQprepare: 322 conn.prepareEx("prepared statement 1", "SELECT $1::integer"); 323 324 QueryParams p; 325 p.preparedStatementName = "prepared statement 1"; 326 p.args = [42.toValue]; 327 auto r = conn.execPrepared(p); 328 assert (r[0][0].as!int == 42); 329 } 330 { 331 // uses PQsendPrepare: 332 conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer"); 333 334 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 335 conn.consumeInput(); 336 337 immutable(Result)[] res; 338 339 while(true) 340 { 341 auto r = conn.getResult(); 342 if(r is null) break; 343 res ~= r; 344 } 345 346 assert(res.length == 1); 347 assert(res[0].status == PGRES_COMMAND_OK); 348 } 349 { 350 // check prepared arg types and result types 351 auto a = conn.describePrepared("prepared statement 2"); 352 353 assert(a.nParams == 2); 354 assert(a.paramType(0) == OidType.Text); 355 assert(a.paramType(1) == OidType.Int4); 356 } 357 { 358 // async check prepared arg types and result types 359 conn.sendDescribePrepared("prepared statement 2"); 360 361 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 362 conn.consumeInput(); 363 364 immutable(Result)[] res; 365 366 while(true) 367 { 368 auto r = conn.getResult(); 369 if(r is null) break; 370 res ~= r; 371 } 372 373 assert(res.length == 1); 374 assert(res[0].status == PGRES_COMMAND_OK); 375 376 auto a = res[0].getAnswer; 377 378 assert(a.nParams == 2); 379 assert(a.paramType(0) == OidType.Text); 380 assert(a.paramType(1) == OidType.Int4); 381 } 382 { 383 QueryParams p; 384 p.preparedStatementName = "prepared statement 2"; 385 p.argsFromArray = ["abc", "123456"]; 386 387 conn.sendQueryPrepared(p); 388 389 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 390 conn.consumeInput(); 391 392 immutable(Result)[] res; 393 394 while(true) 395 { 396 auto r = conn.getResult(); 397 if(r is null) break; 398 res ~= r; 399 } 400 401 assert(res.length == 1); 402 assert(res[0].getAnswer[0][0].as!PGtext == "abc"); 403 assert(res[0].getAnswer[0][1].as!PGinteger == 123456); 404 } 405 406 import std.socket; 407 conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection 408 409 { 410 bool exceptionFlag = false; 411 412 try conn.exec("SELECT 'abc'::text").getAnswer; 413 catch(ConnectionException e) 414 { 415 exceptionFlag = true; 416 assert(e.msg.length > 15); // error message check 417 } 418 finally 419 assert(exceptionFlag); 420 } 421 }