1 /// Query methods 2 module dpq2.query; 3 4 public import dpq2.args; 5 6 import dpq2.connection: Connection, ConnectionException; 7 import dpq2.result: Result; 8 import dpq2.value; 9 import dpq2.oids: OidType; 10 import derelict.pq.pq; 11 import core.time: Duration, dur; 12 import std.exception: enforce; 13 14 /// Extends Connection by adding query methods 15 /// 16 /// Just use it as Connection.* methods. 17 mixin template Queries() 18 { 19 /// Perform SQL query to DB 20 immutable (Answer) exec( string SQLcmd ) 21 { 22 auto pgResult = PQexec(conn, toStringz( SQLcmd )); 23 24 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 25 auto container = createResultContainer(cast(immutable) pgResult); 26 27 return new immutable Answer(container); 28 } 29 30 /// Perform SQL query to DB 31 immutable (Answer) execParams(in ref QueryParams qp) 32 { 33 auto p = InternalQueryParams(&qp); 34 auto pgResult = PQexecParams ( 35 conn, 36 p.command, 37 p.nParams, 38 p.paramTypes, 39 p.paramValues, 40 p.paramLengths, 41 p.paramFormats, 42 p.resultFormat 43 ); 44 45 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 46 auto container = createResultContainer(cast(immutable) pgResult); 47 48 return new immutable Answer(container); 49 } 50 51 /// Submits a command to the server without waiting for the result(s) 52 void sendQuery( string SQLcmd ) 53 { 54 const size_t r = PQsendQuery( conn, toStringz(SQLcmd) ); 55 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 56 } 57 58 /// Submits a command and separate parameters to the server without waiting for the result(s) 59 void sendQueryParams(in ref QueryParams qp) 60 { 61 auto p = InternalQueryParams(&qp); 62 size_t r = PQsendQueryParams ( 63 conn, 64 p.command, 65 p.nParams, 66 p.paramTypes, 67 p.paramValues, 68 p.paramLengths, 69 p.paramFormats, 70 p.resultFormat 71 ); 72 73 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 74 } 75 76 /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s) 77 void sendQueryPrepared(in ref QueryParams qp) 78 { 79 auto p = InternalQueryParams(&qp); 80 size_t r = PQsendQueryPrepared( 81 conn, 82 p.stmtName, 83 p.nParams, 84 p.paramValues, 85 p.paramLengths, 86 p.paramFormats, 87 p.resultFormat 88 ); 89 90 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 91 } 92 93 /// Returns null if no notifies was received 94 Notify getNextNotify() 95 { 96 consumeInput(); 97 auto n = PQnotifies(conn); 98 return n is null ? null : new Notify( n ); 99 } 100 101 /// Submits a request to create a prepared statement with the given parameters, and waits for completion. 102 /// Returns: Result of query preparing 103 immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null) 104 { 105 PGresult* pgResult = PQprepare( 106 conn, 107 toStringz(statementName), 108 toStringz(sqlStatement), 109 oids.length.to!int, 110 oids.ptr 111 ); 112 113 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 114 auto container = createResultContainer(cast(immutable) pgResult); 115 116 return new immutable Result(container); 117 } 118 119 /// Submits a request to create a prepared statement with the given parameters, and waits for completion. 120 /// 121 /// Throws an exception if preparing failed. 122 void prepareEx(string statementName, string sqlStatement, in Oid[] oids = null) 123 { 124 auto r = prepare(statementName, sqlStatement, oids); 125 126 if(r.status != PGRES_COMMAND_OK) 127 throw new ResponseException(r, __FILE__, __LINE__); 128 } 129 130 /// Submits a request to execute a prepared statement with given parameters, and waits for completion. 131 immutable(Answer) execPrepared(in ref QueryParams qp) 132 { 133 auto p = InternalQueryParams(&qp); 134 auto pgResult = PQexecPrepared( 135 conn, 136 p.stmtName, 137 p.nParams, 138 cast(const(char*)*)p.paramValues, 139 p.paramLengths, 140 p.paramFormats, 141 p.resultFormat 142 ); 143 144 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 145 auto container = createResultContainer(cast(immutable) pgResult); 146 147 return new immutable Answer(container); 148 } 149 150 /// Sends a request to create a prepared statement with the given parameters, without waiting for completion. 151 void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null) 152 { 153 size_t r = PQsendPrepare( 154 conn, 155 toStringz(statementName), 156 toStringz(sqlStatement), 157 oids.length.to!int, 158 oids.ptr 159 ); 160 161 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 162 } 163 164 /// Submits a request to obtain information about the specified prepared statement, and waits for completion. 165 immutable(Answer) describePrepared(string statementName) 166 { 167 PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName)); 168 169 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 170 auto container = createResultContainer(cast(immutable) pgResult); 171 172 return new immutable Answer(container); 173 } 174 175 /// Submits a request to obtain information about the specified prepared statement, without waiting for completion. 176 void sendDescribePrepared(string statementName) 177 { 178 size_t r = PQsendDescribePrepared(conn, statementName.toStringz); 179 180 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 181 } 182 183 /// Sends a buffer of CSV data to the COPY command 184 /// 185 /// Returns: true if the data was queued, false if it was not queued because of full buffers (this will only happen in nonblocking mode) 186 bool putCopyData( string data ) 187 { 188 const int r = PQputCopyData(conn, data.toStringz, data.length.to!int); 189 190 if(r == -1) throw new ConnectionException(this); 191 192 return r != 0; 193 } 194 195 /// Signals that COPY data send is finished. Finalize and flush the COPY command. 196 immutable(Answer) putCopyEnd() 197 { 198 assert(!isNonBlocking, "Only for blocking connections"); 199 200 const bool r = sendPutCopyEnd; 201 202 assert(r, "Impossible status for blocking connections"); 203 204 // after the copying is finished, and there is no connection error, we must still get the command result 205 // this will get if there is any errors in the process (invalid data format or constraint violation, etc.) 206 auto pgResult = PQgetResult(conn); 207 208 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 209 auto container = createResultContainer(cast(immutable) pgResult); 210 211 return new immutable Answer(container); 212 } 213 214 /// Signals that COPY data send is finished. 215 /// 216 /// Returns: true if the termination data was sent, zero if it was not sent because the attempt would block (this case is only possible if the connection is in nonblocking mode) 217 bool sendPutCopyEnd() 218 { 219 const char* error; 220 const int r = PQputCopyEnd(conn, error); 221 222 if(error !is null) throw new ConnectionException(error.to!string); 223 224 if(r == -1) throw new ConnectionException(this); 225 226 return r != 0; 227 } 228 229 // Waiting for completion of reading or writing 230 // Returns: timeout is not occured 231 version(integration_tests) 232 bool waitEndOf(WaitType type, Duration timeout = Duration.zero) 233 { 234 import std.socket; 235 236 auto socket = this.socket(); 237 auto set = new SocketSet; 238 set.add(socket); 239 240 while(true) 241 { 242 if(status() == CONNECTION_BAD) 243 throw new ConnectionException(this, __FILE__, __LINE__); 244 245 if(poll() == PGRES_POLLING_OK) 246 { 247 return true; 248 } 249 else 250 { 251 size_t sockNum; 252 253 with(WaitType) 254 final switch(type) 255 { 256 case READ: 257 sockNum = Socket.select(set, null, set, timeout); 258 break; 259 260 case WRITE: 261 sockNum = Socket.select(null, set, set, timeout); 262 break; 263 264 case READ_WRITE: 265 sockNum = Socket.select(set, set, set, timeout); 266 break; 267 } 268 269 enforce(sockNum >= 0); 270 if(sockNum == 0) return false; // timeout is occurred 271 272 continue; 273 } 274 } 275 } 276 } 277 278 version(integration_tests) 279 enum WaitType 280 { 281 READ, 282 WRITE, 283 READ_WRITE 284 } 285 286 version (integration_tests) 287 void _integration_test( string connParam ) @trusted 288 { 289 import dpq2.conv.to_d_types; 290 import dpq2.conv.to_bson; 291 292 auto conn = new Connection(connParam); 293 294 // Text type arguments testing 295 { 296 string sql_query = 297 "select now() as time, 'abc'::text as string, 123, 456.78\n"~ 298 "union all\n"~ 299 "select now(), 'абвгд'::text, 777, 910.11\n"~ 300 "union all\n"~ 301 "select NULL, 'ijk'::text, 789, 12345.115345"; 302 303 auto a = conn.exec( sql_query ); 304 305 assert( a.cmdStatus.length > 2 ); 306 assert( a.columnCount == 4 ); 307 assert( a.length == 3 ); 308 assert( a.columnFormat(1) == ValueFormat.TEXT ); 309 assert( a.columnFormat(2) == ValueFormat.TEXT ); 310 } 311 312 // Binary type arguments testing 313 { 314 import vibe.data.bson: Bson; 315 316 const string sql_query = 317 "select $1::text, $2::integer, $3::text, $4, $5::integer[]"; 318 319 Value[5] args; 320 args[0] = toValue("абвгд"); 321 args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value 322 args[2] = toValue("123"); 323 args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value 324 325 Bson binArray = Bson([ 326 Bson([Bson(null), Bson(123), Bson(456)]), 327 Bson([Bson(0), Bson(789), Bson(null)]) 328 ]); 329 330 args[4] = bsonToValue(binArray); 331 332 QueryParams p; 333 p.sqlCommand = sql_query; 334 p.args = args[]; 335 336 auto a = conn.execParams( p ); 337 338 foreach(i; 0 .. args.length) 339 assert(a.columnFormat(i) == ValueFormat.BINARY); 340 341 assert( a.OID(0) == OidType.Text ); 342 assert( a.OID(1) == OidType.Int4 ); 343 assert( a.OID(2) == OidType.Text ); 344 assert( a.OID(3) == OidType.Int8 ); 345 assert( a.OID(4) == OidType.Int4Array ); 346 347 // binary args array test 348 assert( a[0][4].as!Bson == binArray ); 349 } 350 351 { 352 // Bug #52: empty text argument 353 QueryParams p; 354 Value v = toValue(""); 355 356 p.sqlCommand = "SELECT $1"; 357 p.args = [v]; 358 359 auto a = conn.execParams(p); 360 361 assert( !a[0][0].isNull ); 362 assert( a[0][0].as!string == "" ); 363 } 364 365 // checking prepared statements 366 { 367 // uses PQprepare: 368 conn.prepareEx("prepared statement 1", "SELECT $1::integer"); 369 370 QueryParams p; 371 p.preparedStatementName = "prepared statement 1"; 372 p.args = [42.toValue]; 373 auto r = conn.execPrepared(p); 374 assert (r[0][0].as!int == 42); 375 } 376 { 377 // uses PQsendPrepare: 378 conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer"); 379 380 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 381 conn.consumeInput(); 382 383 immutable(Result)[] res; 384 385 while(true) 386 { 387 auto r = conn.getResult(); 388 if(r is null) break; 389 res ~= r; 390 } 391 392 assert(res.length == 1); 393 assert(res[0].status == PGRES_COMMAND_OK); 394 } 395 { 396 // check prepared arg types and result types 397 auto a = conn.describePrepared("prepared statement 2"); 398 399 assert(a.nParams == 2); 400 assert(a.paramType(0) == OidType.Text); 401 assert(a.paramType(1) == OidType.Int4); 402 } 403 { 404 // async check prepared arg types and result types 405 conn.sendDescribePrepared("prepared statement 2"); 406 407 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 408 conn.consumeInput(); 409 410 immutable(Result)[] res; 411 412 while(true) 413 { 414 auto r = conn.getResult(); 415 if(r is null) break; 416 res ~= r; 417 } 418 419 assert(res.length == 1); 420 assert(res[0].status == PGRES_COMMAND_OK); 421 422 auto a = res[0].getAnswer; 423 424 assert(a.nParams == 2); 425 assert(a.paramType(0) == OidType.Text); 426 assert(a.paramType(1) == OidType.Int4); 427 } 428 { 429 QueryParams p; 430 p.preparedStatementName = "prepared statement 2"; 431 p.argsFromArray = ["abc", "123456"]; 432 433 conn.sendQueryPrepared(p); 434 435 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 436 conn.consumeInput(); 437 438 immutable(Result)[] res; 439 440 while(true) 441 { 442 auto r = conn.getResult(); 443 if(r is null) break; 444 res ~= r; 445 } 446 447 assert(res.length == 1); 448 assert(res[0].getAnswer[0][0].as!PGtext == "abc"); 449 assert(res[0].getAnswer[0][1].as!PGinteger == 123456); 450 } 451 { 452 // test COPY 453 conn.exec("CREATE TEMP TABLE test_copy (text_field TEXT, int_field INT8)"); 454 455 conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)"); 456 conn.putCopyData("Val1,1\nval2,2\n"); 457 conn.putCopyData("Val3,3\nval4,4\n"); 458 conn.putCopyEnd(); 459 460 auto res = conn.exec("SELECT count(text_field), sum(int_field) FROM test_copy"); 461 assert(res.length == 1); 462 assert(res[0][0].as!string == "4"); 463 assert(res[0][1].as!string == "10"); 464 465 // This time with error 466 import std.exception: assertThrown; 467 import dpq2.result: ResponseException; 468 469 conn.exec("COPY test_copy FROM STDIN WITH (FORMAT csv)"); 470 conn.putCopyData("Val1,2\nval2,4,POORLY_FORMATTED_CSV\n"); 471 472 assertThrown!ResponseException(conn.putCopyEnd()); 473 } 474 475 import std.socket; 476 conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection 477 478 { 479 bool exceptionFlag = false; 480 481 try conn.exec("SELECT 'abc'::text").getAnswer; 482 catch(ConnectionException e) 483 { 484 exceptionFlag = true; 485 assert(e.msg.length > 15); // error message check 486 } 487 finally 488 assert(exceptionFlag); 489 } 490 }