/// Query methods
module dpq2.query;

public import dpq2.args;

import dpq2.connection: Connection, ConnectionException;
import dpq2.result: Result;
import dpq2.value;
import dpq2.oids: OidType;
import derelict.pq.pq;
import core.time: Duration, dur;
import std.exception: enforce;

/// Extends Connection by adding query methods
///
/// Just use it as Connection.* methods.
mixin template Queries()
{
    /// Perform SQL query to DB
    ///
    /// It uses the old wire protocol and all values are returned in textual
    /// form. This means that the dpq2.conv.to_d_types.as template will likely
    /// not work for anything but strings.
    /// Try to use execParams instead, even if no parameters are present.
    ///
    /// Params:
    ///     SQLcmd = text of the SQL command passed to PQexec
    /// Returns: the answer built from the libpq result
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Perform SQL query to DB, passing the parameters separately from the command text
    ///
    /// Params:
    ///     qp = command and arguments; converted by InternalQueryParams into the form expected by PQexecParams
    /// Returns: the answer built from the libpq result
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQuery does not return 1
    void sendQuery( string SQLcmd )
    {
        const int r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryParams does not return 1
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryPrepared does not return 1
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryPrepared(
                conn,
                p.stmtName,
                p.nParams,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Fetches the next pending notification, consuming available input first
    ///
    /// Returns: null if no notification was received
    Notify getNextNotify()
    {
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Params:
    ///     statementName = name of the statement to create
    ///     sqlStatement = text of the SQL statement
    ///     oids = optional parameter types; their count and pointer are passed to PQprepare
    /// Returns: Result of query preparing
    immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        PGresult* pgResult = PQprepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Throws: ResponseException if the status of the result is not PGRES_COMMAND_OK
    void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        auto r = prepare(statementName, sqlStatement, oids);

        if(r.status != PGRES_COMMAND_OK)
            throw new ResponseException(r, __FILE__, __LINE__);
    }

    /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
    ///
    /// Returns: the answer built from the libpq result
    immutable(Answer) execPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecPrepared(
                conn,
                p.stmtName,
                p.nParams,
                cast(const(char*)*)p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendPrepare does not return 1
    void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        const int r = PQsendPrepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a request to obtain information about the specified prepared statement, and waits for completion.
    ///
    /// Returns: the answer built from the libpq result
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a request to obtain information about the specified prepared statement, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendDescribePrepared does not return 1
    void sendDescribePrepared(string statementName)
    {
        const int r = PQsendDescribePrepared(conn, statementName.toStringz);

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a request to obtain information about the specified portal, and waits for completion.
    ///
    /// Returns: the answer built from the libpq result
    immutable(Answer) describePortal(string portalName)
    {
        PGresult* pgResult = PQdescribePortal(conn, portalName.toStringz);

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Sends a buffer of CSV data to the COPY command
    ///
    /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode)
    /// Throws: ConnectionException if PQputCopyData returns -1
    bool putCopyData( string data )
    {
        const int r = PQputCopyData(conn, data.toStringz, data.length.to!int);

        if(r == -1) throw new ConnectionException(this);

        return r != 0;
    }

    /// Signals that COPY data send is finished. Finalize and flush the COPY command.
    ///
    /// Only for blocking connections (checked by an assertion).
    ///
    /// Returns: the answer built from the result of the COPY command
    immutable(Answer) putCopyEnd()
    {
        assert(!isNonBlocking, "Only for blocking connections");

        const bool r = sendPutCopyEnd;

        assert(r, "Impossible status for blocking connections");

        // after the copying is finished, and there is no connection error, we must still get the command result
        // this reports any errors of the process (invalid data format, constraint violation, etc.)
        auto pgResult = PQgetResult(conn);

        // libpq guarantees that the result is not changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Signals that COPY data send is finished.
    ///
    /// Returns: true if the termination data was sent, false if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode)
    /// Throws: ConnectionException if PQputCopyEnd returns -1
    bool sendPutCopyEnd()
    {
        // The second argument of PQputCopyEnd is an input: a non-null message
        // forces the COPY to fail with that text. It is never written by libpq,
        // so null (normal completion) is passed and there is nothing to read back.
        const int r = PQputCopyEnd(conn, null);

        if(r == -1) throw new ConnectionException(this);

        return r != 0;
    }

    // Waiting for completion of reading or writing
    // Returns: true if poll() reported PGRES_POLLING_OK, false if the timeout occurred
    // Throws: ConnectionException if the connection status is CONNECTION_BAD
    version(integration_tests)
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        auto sock = this.socket();

        // Socket.select() rewrites the sets it receives and requires every
        // non-null set to be a distinct object, so a fresh set is built for
        // each role on each wait.
        SocketSet makeSet()
        {
            auto s = new SocketSet;
            s.add(sock);
            return s;
        }

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            // Socket.select() returns a signed int: number of changed sockets,
            // 0 on timeout, -1 on interruption
            int sockNum;

            with(WaitType)
            final switch(type)
            {
                case READ:
                    sockNum = Socket.select(makeSet(), null, makeSet(), timeout);
                    break;

                case WRITE:
                    sockNum = Socket.select(null, makeSet(), makeSet(), timeout);
                    break;

                case READ_WRITE:
                    sockNum = Socket.select(makeSet(), makeSet(), makeSet(), timeout);
                    break;
            }

            if(sockNum == 0) return false; // timeout occurred

            // sockNum > 0: socket state changed; sockNum == -1: wait was
            // interrupted. In both cases poll the connection again.
        }
    }
}

version(integration_tests)
enum WaitType
{
    READ,
    WRITE,
    READ_WRITE
}

version (integration_tests)
void _integration_test( string connParam ) @trusted
{
    import dpq2.conv.to_d_types;
    import dpq2.conv.to_bson;
    import dpq2.connection: createTestConn;

    auto conn = createTestConn(connParam);

    // Text type arguments testing
    {
        string sql_query =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto a = conn.exec( sql_query );

        assert( a.cmdStatus.length > 2 );
        assert( a.columnCount == 4 );
        assert( a.length == 3 );
        assert( a.columnFormat(1) == ValueFormat.TEXT );
        assert( a.columnFormat(2) == ValueFormat.TEXT );
    }

    // Binary type arguments testing
    {
        import vibe.data.bson: Bson;

        const string sql_query =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        args[4] = bsonToValue(binArray);

        QueryParams p;
        p.sqlCommand = sql_query;
        p.args = args[];

        auto a = conn.execParams( p );

        foreach(i; 0 .. args.length)
            assert(a.columnFormat(i) == ValueFormat.BINARY);

        assert( a.OID(0) == OidType.Text );
        assert( a.OID(1) == OidType.Int4 );
        assert( a.OID(2) == OidType.Text );
        assert( a.OID(3) == OidType.Int8 );
        assert( a.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( a[0][4].as!Bson == binArray );
    }

    {
        // Bug #52: empty text argument
        QueryParams p;
        Value v = toValue("");

        p.sqlCommand = "SELECT $1";
        p.args = [v];

        auto a = conn.execParams(p);

        assert( !a[0][0].isNull );
        assert( a[0][0].as!string == "" );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        conn.prepareEx("prepared statement 1", "SELECT $1::integer");

        QueryParams p;
        p.preparedStatementName = "prepared statement 1";
        p.args = [42.toValue];
        auto r = conn.execPrepared(p);
        assert (r[0][0].as!int == 42);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);
    }
    {
        // check prepared arg types and result types
        auto a = conn.describePrepared("prepared statement 2");

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }

    // checking portal description
    {
        conn.exec(`BEGIN`);
        conn.exec(`DECLARE test_cursor1 CURSOR FOR SELECT 123::integer`);
        auto r = conn.describePortal(`test_cursor1`);
        conn.exec(`COMMIT`);
    }

    {
        // async check prepared arg types and result types
        conn.sendDescribePrepared("prepared statement 2");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);

        auto a = res[0].getAnswer;

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        QueryParams p;
        p.preparedStatementName = "prepared statement 2";
        p.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(p);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
    }
    {
        // test COPY
        conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)");

        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
        conn.putCopyData("Val1,1\nval2,2\n");
        conn.putCopyData("Val3,3\nval4,4\n");
        conn.putCopyEnd();

        auto res = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy");
        assert(res.length == 1);
        assert(res[0][0].as!string == "4");
        assert(res[0][1].as!string == "10");

        // This time with error
        import std.exception: assertThrown;
        import dpq2.result: ResponseException;

        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
        conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n");

        assertThrown!ResponseException(conn.putCopyEnd());
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    {
        bool exceptionFlag = false;
        string errorMsg;

        try conn.exec("SELECT 'abc'::text").getAnswer;
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            errorMsg = e.msg;
            assert(e.msg.length > 15); // error message check
        }
        finally {
            assert(exceptionFlag, errorMsg);
        }
    }
}