module dpq2.query;

public import dpq2.args;

import dpq2;
import core.time: Duration, dur;
import std.exception: enforce;

/// Query methods intended to be mixed into a connection type.
///
/// The methods use `conn`, `status()`, `poll()`, `socket()` and
/// `consumeInput()`, which must be provided by the type this template is
/// mixed into.
mixin template Queries()
{
    /// Perform SQL query to DB
    ///
    /// Calls PQexec and wraps the returned result in an immutable Answer.
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result is not changed until it is destroyed,
        // which is why it can be treated as immutable here
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Perform SQL query to DB, passing the arguments separately from the command text
    ///
    /// Calls PQexecParams and wraps the returned result in an immutable Answer.
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        // libpq guarantees that the result is not changed until it is destroyed,
        // which is why it can be treated as immutable here
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQuery does not return 1
    void sendQuery( string SQLcmd )
    {
        const size_t r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryParams does not return 1
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        size_t r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryPrepared does not return 1
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        size_t r = PQsendQueryPrepared(
                conn,
                p.stmtName,
                p.nParams,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns null if no notification was received
    ///
    /// Calls consumeInput() first, then asks PQnotifies for the next notification.
    Notify getNextNotify()
    {
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// No parameter types are passed to PQprepare (count 0, types null).
    immutable(Result) prepare(string statementName, string sqlStatement)
    {
        PGresult* pgResult = PQprepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                0,
                null
            );

        // libpq guarantees that the result is not changed until it is destroyed,
        // which is why it can be treated as immutable here
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendPrepare does not return 1
    void sendPrepare(string statementName, string sqlStatement)
    {
        size_t r = PQsendPrepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                0,
                null
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQdescribePrepared for the named prepared statement and
    /// wraps the returned result in an immutable Answer.
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));

        // libpq guarantees that the result is not changed until it is destroyed,
        // which is why it can be treated as immutable here
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Calls PQsendDescribePrepared for the named prepared statement;
    /// the result is not read here.
    ///
    /// Throws: ConnectionException if PQsendDescribePrepared does not return 1
    void sendDescribePrepared(string statementName)
    {
        size_t r = PQsendDescribePrepared(conn, statementName.toStringz);

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Waiting for completion of reading or writing
    ///
    /// Loops until poll() reports PGRES_POLLING_OK, waiting on the connection
    /// socket between polls.
    ///
    /// Params:
    ///   type    = which socket events to wait for
    ///   timeout = timeout handed to every Socket.select call
    ///
    /// Returns: true if poll() reported PGRES_POLLING_OK,
    ///          false if a Socket.select call timed out
    ///
    /// Throws: ConnectionException if status() is CONNECTION_BAD
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        auto socket = this.socket();

        // Socket.select requires the read, write and error sets to be
        // distinct objects, so every role gets its own set.
        auto readSet = new SocketSet;
        auto writeSet = new SocketSet;
        auto errorSet = new SocketSet;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            // Socket.select rewrites the sets so that they contain only the
            // sockets with status changes; refill them before every call.
            readSet.reset();
            readSet.add(socket);
            writeSet.reset();
            writeSet.add(socket);
            errorSet.reset();
            errorSet.add(socket);

            // Socket.select returns int: > 0 is the number of sockets with
            // status changes, 0 is a timeout, -1 is an interruption.
            int sockNum;

            with(WaitType)
            final switch(type)
            {
                case READ:
                    sockNum = Socket.select(readSet, null, errorSet, timeout);
                    break;

                case WRITE:
                    sockNum = Socket.select(null, writeSet, errorSet, timeout);
                    break;

                case READ_WRITE:
                    sockNum = Socket.select(readSet, writeSet, errorSet, timeout);
                    break;
            }

            if(sockNum == 0) return false; // timeout is occurred

            // sockNum > 0: something changed on the socket, poll again.
            // sockNum < 0: the wait was interrupted, retry.
        }
    }
}

/// Selects which socket events waitEndOf waits for
enum WaitType
{
    READ, /// wait for the socket to become readable
    WRITE, /// wait for the socket to become writable
    READ_WRITE /// wait for the socket to become readable or writable
}

/// Integration test: runs the query methods against the server described
/// by connParam.
void _integration_test( string connParam ) @trusted
{
    auto conn = new Connection(connParam);

    {
        string sql_query =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto a = conn.exec( sql_query );

        assert( a.cmdStatus.length > 2 );
        assert( a.columnCount == 4 );
        assert( a.length == 3 );
        assert( a.columnFormat(1) == ValueFormat.TEXT );
        assert( a.columnFormat(2) == ValueFormat.TEXT );
    }

    {
        import vibe.data.bson: Bson;

        const string sql_query =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        args[4] = bsonToValue(binArray);

        QueryParams p;
        p.sqlCommand = sql_query;
        p.args = args[];

        auto a = conn.execParams( p );

        foreach(i; 0 .. args.length)
            assert(a.columnFormat(i) == ValueFormat.BINARY);

        assert( a.OID(0) == OidType.Text );
        assert( a.OID(1) == OidType.Int4 );
        assert( a.OID(2) == OidType.Text );
        assert( a.OID(3) == OidType.Int8 );
        assert( a.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( a[0][4].as!Bson == binArray );
    }

    {
        // Bug #52: empty text argument
        QueryParams p;
        Value v = toValue("");

        p.sqlCommand = "SELECT $1";
        p.args = [v];

        auto a = conn.execParams(p);

        assert( !a[0][0].isNull );
        assert( a[0][0].as!string == "" );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
        assert(s.status == PGRES_COMMAND_OK);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // collect results until getResult() returns null
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);
    }
    {
        // check prepared arg types and result types
        auto a = conn.describePrepared("prepared statement 2");

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        // async check prepared arg types and result types
        conn.sendDescribePrepared("prepared statement 2");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // collect results until getResult() returns null
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);

        auto a = res[0].getAnswer;

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        // execute "prepared statement 2" asynchronously with text arguments
        QueryParams p;
        p.preparedStatementName = "prepared statement 2";
        p.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(p);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // collect results until getResult() returns null
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    {
        // a query on the broken connection must raise ConnectionException
        bool exceptionFlag = false;

        try conn.exec("SELECT 'abc'::text").getAnswer;
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 15); // error message check
        }
        finally
            assert(exceptionFlag);
    }
}