/// Query methods
module dpq2.query;

public import dpq2.args;

import dpq2.connection: Connection, ConnectionException;
import dpq2.result: Result;
import dpq2.value;
import dpq2.oids: OidType;
import derelict.pq.pq;
import core.time: Duration, dur;
import std.exception: enforce;

/// Extends Connection by adding query methods
///
/// Just use it as Connection.* methods.
mixin template Queries()
{
    /// Perform SQL query to DB
    ///
    /// It uses the old wire protocol and all values are returned in textual
    /// form. This means that the dpq2.conv.to_d_types.as template will likely
    /// not work for anything but strings.
    /// Try to use execParams instead, even if no parameters are present.
    ///
    /// Params:
    ///     SQLcmd = text of the SQL command passed to PQexec
    /// Returns: answer wrapping the libpq result
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Perform SQL query to DB
    ///
    /// Params:
    ///     qp = command text, arguments and result format passed to PQexecParams
    /// Returns: answer wrapping the libpq result
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQuery returns anything other than 1
    void sendQuery( string SQLcmd )
    {
        const int r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryParams returns anything other than 1
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryPrepared returns anything other than 1
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        const int r = PQsendQueryPrepared(
                conn,
                p.stmtName,
                p.nParams,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Consumes pending input and fetches the next notification
    ///
    /// Returns: null if no notification was received
    Notify getNextNotify()
    {
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Params:
    ///     statementName = name of the statement to create
    ///     sqlStatement = text of the SQL statement
    ///     oids = optional parameter type OIDs passed to PQprepare
    /// Returns: Result of query preparing
    immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        PGresult* pgResult = PQprepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// Throws an exception if preparing failed.
    void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        auto r = prepare(statementName, sqlStatement, oids);

        if(r.status != PGRES_COMMAND_OK)
            throw new ResponseException(r, __FILE__, __LINE__);
    }

    /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
    ///
    /// Params:
    ///     qp = prepared statement name, arguments and result format passed to PQexecPrepared
    /// Returns: answer wrapping the libpq result
    immutable(Answer) execPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecPrepared(
                conn,
                p.stmtName,
                p.nParams,
                cast(const(char*)*)p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendPrepare returns anything other than 1
    void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        const int r = PQsendPrepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a request to obtain information about the specified prepared statement, and waits for completion.
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a request to obtain information about the specified prepared statement, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendDescribePrepared returns anything other than 1
    void sendDescribePrepared(string statementName)
    {
        const int r = PQsendDescribePrepared(conn, statementName.toStringz);

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a buffer of CSV data to the COPY command
    ///
    /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode)
    /// Throws: ConnectionException if PQputCopyData returns -1
    bool putCopyData( string data )
    {
        const int r = PQputCopyData(conn, data.toStringz, data.length.to!int);

        if(r == -1) throw new ConnectionException(this);

        return r != 0;
    }

    /// Signals that COPY data send is finished. Finalize and flush the COPY command.
    ///
    /// Only for blocking connections (checked by assert).
    /// Returns: answer wrapping the result fetched by PQgetResult after the COPY was terminated
    immutable(Answer) putCopyEnd()
    {
        assert(!isNonBlocking, "Only for blocking connections");

        const bool r = sendPutCopyEnd;

        assert(r, "Impossible status for blocking connections");

        // after the copying is finished, and there is no connection error, we must still get the command result
        // this reports any errors in the process (invalid data format or constraint violation, etc.)
        auto pgResult = PQgetResult(conn);

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Signals that COPY data send is finished.
    ///
    /// Returns: true if the termination data was sent, false if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode)
    /// Throws: ConnectionException if PQputCopyEnd returns -1
    bool sendPutCopyEnd()
    {
        // The second argument of PQputCopyEnd is an input: a non-null message
        // forces the COPY to fail. It is never written by libpq, so null is
        // passed to request a normal termination.
        const int r = PQputCopyEnd(conn, null);

        if(r == -1) throw new ConnectionException(this);

        return r != 0;
    }

    // Waiting for completion of reading or writing
    // Returns: timeout is not occurred
    version(integration_tests)
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        auto sock = this.socket();

        // Socket.select needs distinct SocketSet objects for every role and
        // overwrites them with the sockets whose status changed, so separate
        // sets are kept and refilled before each call.
        auto readSet = new SocketSet;
        auto writeSet = new SocketSet;
        auto errorSet = new SocketSet;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            SocketSet checkRead = null;
            SocketSet checkWrite = null;

            with(WaitType)
            final switch(type)
            {
                case READ:
                    checkRead = readSet;
                    break;

                case WRITE:
                    checkWrite = writeSet;
                    break;

                case READ_WRITE:
                    checkRead = readSet;
                    checkWrite = writeSet;
                    break;
            }

            if(checkRead !is null)
            {
                checkRead.reset();
                checkRead.add(sock);
            }

            if(checkWrite !is null)
            {
                checkWrite.reset();
                checkWrite.add(sock);
            }

            errorSet.reset();
            errorSet.add(sock);

            // select returns int: number of changed sockets, 0 on timeout, -1 on interruption
            const int sockNum = Socket.select(checkRead, checkWrite, errorSet, timeout);

            if(sockNum == 0) return false; // timeout is occurred

            // status changed (> 0) or the wait was interrupted (-1):
            // re-check the connection state on the next pass
        }
    }
}

/// Kind of socket activity that waitEndOf waits for
version(integration_tests)
enum WaitType
{
    READ,
    WRITE,
    READ_WRITE
}

/// Integration test of the query methods; connParam is passed to the Connection constructor
version (integration_tests)
void _integration_test( string connParam ) @trusted
{
    import dpq2.conv.to_d_types;
    import dpq2.conv.to_bson;

    auto conn = new Connection(connParam);

    // Text type arguments testing
    {
        string sql_query =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto a = conn.exec( sql_query );

        assert( a.cmdStatus.length > 2 );
        assert( a.columnCount == 4 );
        assert( a.length == 3 );
        assert( a.columnFormat(1) == ValueFormat.TEXT );
        assert( a.columnFormat(2) == ValueFormat.TEXT );
    }

    // Binary type arguments testing
    {
        import vibe.data.bson: Bson;

        const string sql_query =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        args[4] = bsonToValue(binArray);

        QueryParams p;
        p.sqlCommand = sql_query;
        p.args = args[];

        auto a = conn.execParams( p );

        foreach(i; 0 .. args.length)
            assert(a.columnFormat(i) == ValueFormat.BINARY);

        assert( a.OID(0) == OidType.Text );
        assert( a.OID(1) == OidType.Int4 );
        assert( a.OID(2) == OidType.Text );
        assert( a.OID(3) == OidType.Int8 );
        assert( a.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( a[0][4].as!Bson == binArray );
    }

    {
        // Bug #52: empty text argument
        QueryParams p;
        Value v = toValue("");

        p.sqlCommand = "SELECT $1";
        p.args = [v];

        auto a = conn.execParams(p);

        assert( !a[0][0].isNull );
        assert( a[0][0].as!string == "" );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        conn.prepareEx("prepared statement 1", "SELECT $1::integer");

        QueryParams p;
        p.preparedStatementName = "prepared statement 1";
        p.args = [42.toValue];
        auto r = conn.execPrepared(p);
        assert (r[0][0].as!int == 42);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);
    }
    {
        // check prepared arg types and result types
        auto a = conn.describePrepared("prepared statement 2");

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        // async check prepared arg types and result types
        conn.sendDescribePrepared("prepared statement 2");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);

        auto a = res[0].getAnswer;

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        QueryParams p;
        p.preparedStatementName = "prepared statement 2";
        p.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(p);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
    }
    {
        // test COPY
        conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)");

        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
        conn.putCopyData("Val1,1\nval2,2\n");
        conn.putCopyData("Val3,3\nval4,4\n");
        conn.putCopyEnd();

        auto res = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy");
        assert(res.length == 1);
        assert(res[0][0].as!string == "4");
        assert(res[0][1].as!string == "10");

        // This time with error
        import std.exception: assertThrown;
        import dpq2.result: ResponseException;

        conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)");
        conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n");

        assertThrown!ResponseException(conn.putCopyEnd());
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    {
        bool exceptionFlag = false;
        string errorMsg;

        try conn.exec("SELECT 'abc'::text").getAnswer;
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            errorMsg = e.msg;
            assert(e.msg.length > 15); // error message check
        }
        finally {
            assert(exceptionFlag, errorMsg);
        }
    }
}