module dpq2.query;

public import dpq2.args;

import dpq2;
import core.time: Duration, dur;
import std.exception: enforce;

/// Query methods intended to be mixed into a connection type.
///
/// The mixed-in code refers to `conn`, `consumeInput`, `socket`, `status`
/// and `poll`; these must be available in the scope where the template
/// is mixed in.
mixin template Queries()
{
    /// Perform SQL query to DB
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Perform SQL query to DB, passing the arguments separately from the command text
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQuery does not return 1
    void sendQuery( string SQLcmd )
    {
        const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryParams does not return 1
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        size_t r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryPrepared does not return 1
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        size_t r = PQsendQueryPrepared(
                conn,
                p.stmtName,
                p.nParams,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
        );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns null if no notification was received
    Notify getNextNotify()
    {
        // read whatever is pending on the connection before asking libpq for notifications
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    immutable(Result) prepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        PGresult* pgResult = PQprepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                oids.ptr
            );

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Submits a request to execute a prepared statement with given parameters, and waits for completion.
    immutable(Answer) execPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(&qp);
        auto pgResult = PQexecPrepared(
                conn,
                p.stmtName,
                p.nParams,
                cast(const(char*)*)p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendPrepare does not return 1
    void sendPrepare(string statementName, string sqlStatement, in Oid[] oids = null)
    {
        size_t r = PQsendPrepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                oids.length.to!int,
                cast(Oid*)oids.ptr //const should be accepted here, see https://github.com/DerelictOrg/DerelictPQ/issues/21
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQdescribePrepared for the named prepared statement and wraps the result into an Answer
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));

        // libpq guarantees that the result will not be changed until it is destroyed
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Calls PQsendDescribePrepared for the named prepared statement, without waiting for the result
    ///
    /// Throws: ConnectionException if PQsendDescribePrepared does not return 1
    void sendDescribePrepared(string statementName)
    {
        size_t r = PQsendDescribePrepared(conn, statementName.toStringz);

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Waiting for completion of reading or writing
    ///
    /// Params:
    ///     type = which kind of socket readiness to wait for
    ///     timeout = timeout handed to every Socket.select call
    ///
    /// Returns: true if poll() reported PGRES_POLLING_OK,
    ///          false if Socket.select timed out
    ///
    /// Throws: ConnectionException if status() reports CONNECTION_BAD
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        auto socket = this.socket();

        // Socket.select requires distinct SocketSet instances for each role
        // and overwrites them on return, so separate sets are kept and
        // refilled before every call.
        auto readSet = new SocketSet;
        auto writeSet = new SocketSet;
        auto errorSet = new SocketSet;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            readSet.reset();
            writeSet.reset();
            errorSet.reset();
            errorSet.add(socket);

            // Socket.select returns int: the number of sockets with status
            // changes, 0 on timeout or -1 on interruption. Keeping it signed
            // makes the negative case distinguishable.
            int sockNum;

            with(WaitType)
            final switch(type)
            {
                case READ:
                    readSet.add(socket);
                    sockNum = Socket.select(readSet, null, errorSet, timeout);
                    break;

                case WRITE:
                    writeSet.add(socket);
                    sockNum = Socket.select(null, writeSet, errorSet, timeout);
                    break;

                case READ_WRITE:
                    readSet.add(socket);
                    writeSet.add(socket);
                    sockNum = Socket.select(readSet, writeSet, errorSet, timeout);
                    break;
            }

            if(sockNum == 0) return false; // timeout is occurred

            // sockNum < 0: the wait was interrupted, try again
            // sockNum > 0: socket state changed, re-check status() and poll()
        }
    }
}

/// Kind of socket readiness that waitEndOf waits for
enum WaitType
{
    READ,
    WRITE,
    READ_WRITE
}

/// Integration test: runs the query methods against the database described by connParam
void _integration_test( string connParam ) @trusted
{
    auto conn = new Connection(connParam);

    // Text type arguments testing
    {
        string sql_query =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto a = conn.exec( sql_query );

        assert( a.cmdStatus.length > 2 );
        assert( a.columnCount == 4 );
        assert( a.length == 3 );
        assert( a.columnFormat(1) == ValueFormat.TEXT );
        assert( a.columnFormat(2) == ValueFormat.TEXT );
    }

    // Binary type arguments testing
    {
        import vibe.data.bson: Bson;

        const string sql_query =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        args[4] = bsonToValue(binArray);

        QueryParams p;
        p.sqlCommand = sql_query;
        p.args = args[];

        auto a = conn.execParams( p );

        foreach(i; 0 .. args.length)
            assert(a.columnFormat(i) == ValueFormat.BINARY);

        assert( a.OID(0) == OidType.Text );
        assert( a.OID(1) == OidType.Int4 );
        assert( a.OID(2) == OidType.Text );
        assert( a.OID(3) == OidType.Int8 );
        assert( a.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( a[0][4].as!Bson == binArray );
    }

    {
        // Bug #52: empty text argument
        QueryParams p;
        Value v = toValue("");

        p.sqlCommand = "SELECT $1";
        p.args = [v];

        auto a = conn.execParams(p);

        assert( !a[0][0].isNull );
        assert( a[0][0].as!string == "" );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
        assert(s.status == PGRES_COMMAND_OK);

        QueryParams p;
        p.preparedStatementName = "prepared statement 1";
        p.args = [42.toValue];
        auto r = conn.execPrepared(p);
        assert (r[0][0].as!int == 42);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // collect results until getResult() returns null
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);
    }
    {
        // check prepared arg types and result types
        auto a = conn.describePrepared("prepared statement 2");

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        // async check prepared arg types and result types
        conn.sendDescribePrepared("prepared statement 2");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // collect results until getResult() returns null
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);

        auto a = res[0].getAnswer;

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        // asynchronous execution of a prepared statement
        QueryParams p;
        p.preparedStatementName = "prepared statement 2";
        p.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(p);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // collect results until getResult() returns null
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    {
        // a query over the broken connection is expected to raise ConnectionException
        bool exceptionFlag = false;

        try conn.exec("SELECT 'abc'::text").getAnswer;
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 15); // error message check
        }
        finally
            assert(exceptionFlag);
    }
}