1 /// Query methods 2 module dpq2.query; 3 4 public import dpq2.args; 5 6 import dpq2.connection: Connection, ConnectionException; 7 import dpq2.result: Result; 8 import dpq2.value; 9 import dpq2.oids: OidType; 10 import derelict.pq.pq; 11 import core.time: Duration, dur; 12 import std.exception: enforce; 13 14 /// Extends Connection by adding query methods 15 /// 16 /// Just use it as Connection.* methods. 17 mixin template Queries() 18 { 19 /// Perform SQL query to DB 20 /// It uses the old wire protocol and all values are returned in textual 21 /// form. This means that the dpq2.conv.to_d_types.as template will likely 22 /// not work for anything but strings. 23 /// Try to used execParams instead, even if now parameters are present. 24 immutable (Answer) exec( string SQLcmd ) 25 { 26 auto pgResult = PQexec(conn, toStringz( SQLcmd )); 27 28 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 29 auto container = createResultContainer(cast(immutable) pgResult); 30 31 return new immutable Answer(container); 32 } 33 34 /// Perform SQL query to DB 35 immutable (Answer) execParams(in ref QueryParams qp) 36 { 37 auto p = InternalQueryParams(&qp); 38 auto pgResult = PQexecParams ( 39 conn, 40 p.command, 41 p.nParams, 42 p.paramTypes, 43 p.paramValues, 44 p.paramLengths, 45 p.paramFormats, 46 p.resultFormat 47 ); 48 49 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 50 auto container = createResultContainer(cast(immutable) pgResult); 51 52 return new immutable Answer(container); 53 } 54 55 /// Submits a command to the server without waiting for the result(s) 56 void sendQuery( string SQLcmd ) 57 { 58 const size_t r = PQsendQuery( conn, toStringz(SQLcmd) ); 59 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 60 } 61 62 /// Submits a command and separate parameters to the server without waiting for the result(s) 63 void sendQueryParams(in ref QueryParams qp) 64 { 65 auto p = InternalQueryParams(&qp); 66 size_t r = PQsendQueryParams ( 67 conn, 68 p.command, 69 p.nParams, 70 p.paramTypes, 71 p.paramValues, 72 p.paramLengths, 73 p.paramFormats, 74 p.resultFormat 75 ); 76 77 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 78 } 79 80 /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s) 81 void sendQueryPrepared(in ref QueryParams qp) 82 { 83 auto p = InternalQueryParams(&qp); 84 size_t r = PQsendQueryPrepared( 85 conn, 86 p.stmtName, 87 p.nParams, 88 p.paramValues, 89 p.paramLengths, 90 p.paramFormats, 91 p.resultFormat 92 ); 93 94 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 95 } 96 97 /// Returns null if no notifies was received 98 Notify getNextNotify() 99 { 100 consumeInput(); 101 auto n = PQnotifies(conn); 102 return n is null ? null : new Notify( n ); 103 } 104 105 /// Submits a request to create a prepared statement with the given parameters, and waits for completion. 106 /// Returns: Result of query preparing 107 immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null) 108 { 109 PGresult* pgResult = PQprepare( 110 conn, 111 toStringz(statementName), 112 toStringz(sqlStatement), 113 oids.length.to!int, 114 oids.ptr 115 ); 116 117 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 118 auto container = createResultContainer(cast(immutable) pgResult); 119 120 return new immutable Result(container); 121 } 122 123 /// Submits a request to create a prepared statement with the given parameters, and waits for completion. 124 /// 125 /// Throws an exception if preparing failed. 126 void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null) 127 { 128 auto r = prepare(statementName, sqlStatement, oids); 129 130 if(r.status != PGRES_COMMAND_OK) 131 throw new ResponseException(r, __FILE__, __LINE__); 132 } 133 134 /// Submits a request to execute a prepared statement with given parameters, and waits for completion. 135 immutable(Answer) execPrepared(in ref QueryParams qp) 136 { 137 auto p = InternalQueryParams(&qp); 138 auto pgResult = PQexecPrepared( 139 conn, 140 p.stmtName, 141 p.nParams, 142 cast(const(char*)*)p.paramValues, 143 p.paramLengths, 144 p.paramFormats, 145 p.resultFormat 146 ); 147 148 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 149 auto container = createResultContainer(cast(immutable) pgResult); 150 151 return new immutable Answer(container); 152 } 153 154 /// Sends a request to create a prepared statement with the given parameters, without waiting for completion. 155 void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null) 156 { 157 size_t r = PQsendPrepare( 158 conn, 159 toStringz(statementName), 160 toStringz(sqlStatement), 161 oids.length.to!int, 162 oids.ptr 163 ); 164 165 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 166 } 167 168 /// Submits a request to obtain information about the specified prepared statement, and waits for completion. 169 immutable(Answer) describePrepared(string statementName) 170 { 171 PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName)); 172 173 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 174 auto container = createResultContainer(cast(immutable) pgResult); 175 176 return new immutable Answer(container); 177 } 178 179 /// Submits a request to obtain information about the specified prepared statement, without waiting for completion. 180 void sendDescribePrepared(string statementName) 181 { 182 size_t r = PQsendDescribePrepared(conn, statementName.toStringz); 183 184 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 185 } 186 187 /// Submits a request to obtain information about the specified portal, and waits for completion. 188 immutable(Answer) describePortal(string portalName) 189 { 190 PGresult* pgResult = PQdescribePortal(conn, portalName.toStringz); 191 192 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 193 auto container = createResultContainer(cast(immutable) pgResult); 194 195 return new immutable Answer(container); 196 } 197 198 /// Sends a buffer of CSV data to the COPY command 199 /// 200 /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode) 201 bool putCopyData( string data ) 202 { 203 const int r = PQputCopyData(conn, data.toStringz, data.length.to!int); 204 205 if(r == -1) throw new ConnectionException(this); 206 207 return r != 0; 208 } 209 210 /// Signals that COPY data send is finished. Finalize and flush the COPY command. 211 immutable(Answer) putCopyEnd() 212 { 213 assert(!isNonBlocking, "Only for blocking connections"); 214 215 const bool r = sendPutCopyEnd; 216 217 assert(r, "Impossible status for blocking connections"); 218 219 // after the copying is finished, and there is no connection error, we must still get the command result 220 // this will get if there is any errors in the process (invalid data format or constraint violation, etc.) 221 auto pgResult = PQgetResult(conn); 222 223 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 224 auto container = createResultContainer(cast(immutable) pgResult); 225 226 return new immutable Answer(container); 227 } 228 229 /// Signals that COPY data send is finished. 230 /// 231 /// Returns: true if the termination data was sent, zero if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode) 232 bool sendPutCopyEnd() 233 { 234 const char* error; 235 const int r = PQputCopyEnd(conn, error); 236 237 if(error !is null) throw new ConnectionException(error.to!string); 238 239 if(r == -1) throw new ConnectionException(this); 240 241 return r != 0; 242 } 243 244 // Waiting for completion of reading or writing 245 // Returns: timeout is not occured 246 version(integration_tests) 247 bool waitEndOf(WaitType type, Duration timeout = Duration.zero) 248 { 249 import std.socket; 250 251 auto socket = this.socket(); 252 auto set = new SocketSet; 253 set.add(socket); 254 255 while(true) 256 { 257 if(status() == CONNECTION_BAD) 258 throw new ConnectionException(this, __FILE__, __LINE__); 259 260 if(poll() == PGRES_POLLING_OK) 261 { 262 return true; 263 } 264 else 265 { 266 size_t sockNum; 267 268 with(WaitType) 269 final switch(type) 270 { 271 case READ: 272 sockNum = Socket.select(set, null, set, timeout); 273 break; 274 275 case WRITE: 276 sockNum = Socket.select(null, set, set, timeout); 277 break; 278 279 case READ_WRITE: 280 sockNum = Socket.select(set, set, set, timeout); 281 break; 282 } 283 284 enforce(sockNum >= 0); 285 if(sockNum == 0) return false; // timeout is occurred 286 287 continue; 288 } 289 } 290 } 291 } 292 293 version(integration_tests) 294 enum WaitType 295 { 296 READ, 297 WRITE, 298 READ_WRITE 299 } 300 301 version (integration_tests) 302 void _integration_test( string connParam ) @trusted 303 { 304 import dpq2.conv.to_d_types; 305 import dpq2.conv.to_bson; 306 307 auto conn = new Connection(connParam); 308 309 // Text type arguments testing 310 { 311 string sql_query = 312 "select now() as time, 'abc'::text as string, 123, 456.78\n"~ 313 "union all\n"~ 314 "select now(), 'абвгд'::text, 777, 910.11\n"~ 315 "union all\n"~ 316 "select NULL, 'ijk'::text, 789, 12345.115345"; 317 318 auto a = conn.exec( sql_query ); 319 320 assert( a.cmdStatus.length > 2 ); 321 assert( a.columnCount == 4 ); 322 assert( a.length == 3 ); 323 assert( a.columnFormat(1) == ValueFormat.TEXT ); 324 assert( a.columnFormat(2) == ValueFormat.TEXT ); 325 } 326 327 // Binary type arguments testing 328 { 329 import vibe.data.bson: Bson; 330 331 const string sql_query = 332 "select $1::text, $2::integer, $3::text, $4, $5::integer[]"; 333 334 Value[5] args; 335 args[0] = toValue("абвгд"); 336 args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value 337 args[2] = toValue("123"); 338 args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value 339 340 Bson binArray = Bson([ 341 Bson([Bson(null), Bson(123), Bson(456)]), 342 Bson([Bson(0), Bson(789), Bson(null)]) 343 ]); 344 345 args[4] = bsonToValue(binArray); 346 347 QueryParams p; 348 p.sqlCommand = sql_query; 349 p.args = args[]; 350 351 auto a = conn.execParams( p ); 352 353 foreach(i; 0 .. args.length) 354 assert(a.columnFormat(i) == ValueFormat.BINARY); 355 356 assert( a.OID(0) == OidType.Text ); 357 assert( a.OID(1) == OidType.Int4 ); 358 assert( a.OID(2) == OidType.Text ); 359 assert( a.OID(3) == OidType.Int8 ); 360 assert( a.OID(4) == OidType.Int4Array ); 361 362 // binary args array test 363 assert( a[0][4].as!Bson == binArray ); 364 } 365 366 { 367 // Bug #52: empty text argument 368 QueryParams p; 369 Value v = toValue(""); 370 371 p.sqlCommand = "SELECT $1"; 372 p.args = [v]; 373 374 auto a = conn.execParams(p); 375 376 assert( !a[0][0].isNull ); 377 assert( a[0][0].as!string == "" ); 378 } 379 380 // checking prepared statements 381 { 382 // uses PQprepare: 383 conn.prepareEx("prepared statement 1", "SELECT $1::integer"); 384 385 QueryParams p; 386 p.preparedStatementName = "prepared statement 1"; 387 p.args = [42.toValue]; 388 auto r = conn.execPrepared(p); 389 assert (r[0][0].as!int == 42); 390 } 391 { 392 // uses PQsendPrepare: 393 conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer"); 394 395 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 396 conn.consumeInput(); 397 398 immutable(Result)[] res; 399 400 while(true) 401 { 402 auto r = conn.getResult(); 403 if(r is null) break; 404 res ~= r; 405 } 406 407 assert(res.length == 1); 408 assert(res[0].status == PGRES_COMMAND_OK); 409 } 410 { 411 // check prepared arg types and result types 412 auto a = conn.describePrepared("prepared statement 2"); 413 414 assert(a.nParams == 2); 415 assert(a.paramType(0) == OidType.Text); 416 assert(a.paramType(1) == OidType.Int4); 417 } 418 419 // checking portal description 420 { 421 conn.exec(`BEGIN`); 422 conn.exec(`DECLARE test_cursor1 CURSOR FOR SELECT 123::integer`); 423 auto r = conn.describePortal(`test_cursor1`); 424 conn.exec(`COMMIT`); 425 } 426 427 { 428 // async check prepared arg types and result types 429 conn.sendDescribePrepared("prepared statement 2"); 430 431 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 432 conn.consumeInput(); 433 434 immutable(Result)[] res; 435 436 while(true) 437 { 438 auto r = conn.getResult(); 439 if(r is null) break; 440 res ~= r; 441 } 442 443 assert(res.length == 1); 444 assert(res[0].status == PGRES_COMMAND_OK); 445 446 auto a = res[0].getAnswer; 447 448 assert(a.nParams == 2); 449 assert(a.paramType(0) == OidType.Text); 450 assert(a.paramType(1) == OidType.Int4); 451 } 452 { 453 QueryParams p; 454 p.preparedStatementName = "prepared statement 2"; 455 p.argsFromArray = ["abc", "123456"]; 456 457 conn.sendQueryPrepared(p); 458 459 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 460 conn.consumeInput(); 461 462 immutable(Result)[] res; 463 464 while(true) 465 { 466 auto r = conn.getResult(); 467 if(r is null) break; 468 res ~= r; 469 } 470 471 assert(res.length == 1); 472 assert(res[0].getAnswer[0][0].as!PGtext == "abc"); 473 assert(res[0].getAnswer[0][1].as!PGinteger == 123456); 474 } 475 { 476 // test COPY 477 conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)"); 478 479 conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)"); 480 conn.putCopyData("Val1,1\nval2,2\n"); 481 conn.putCopyData("Val3,3\nval4,4\n"); 482 conn.putCopyEnd(); 483 484 auto res = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy"); 485 assert(res.length == 1); 486 assert(res[0][0].as!string == "4"); 487 assert(res[0][1].as!string == "10"); 488 489 // This time with error 490 import std.exception: assertThrown; 491 import dpq2.result: ResponseException; 492 493 conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)"); 494 conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n"); 495 496 assertThrown!ResponseException(conn.putCopyEnd()); 497 } 498 499 import std.socket; 500 conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection 501 502 { 503 bool exceptionFlag = false; 504 string errorMsg; 505 506 try conn.exec("SELECT 'abc'::text").getAnswer; 507 catch(ConnectionException e) 508 { 509 exceptionFlag = true; 510 errorMsg = e.msg; 511 assert(e.msg.length > 15); // error message check 512 } 513 finally { 514 assert(exceptionFlag, errorMsg); 515 } 516 } 517 }