module dpq2.query;

public import dpq2.args;

import dpq2;
import core.time: Duration, dur;
import std.exception: enforce;

/// Query methods intended to be mixed into the connection type.
///
/// The mixin body relies on names supplied by the host type: `conn`
/// (the libpq connection handle passed to every PQ* call), `status()`,
/// `poll()`, `socket()` and `consumeInput()`.
mixin template Queries()
{
    /// Perform SQL query to DB
    ///
    /// Params:
    ///     SQLcmd = SQL text handed to PQexec
    /// Returns: the result of the command wrapped in an Answer
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result is not modified until it is
        // destroyed, which is what makes the cast to immutable acceptable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Perform SQL query to DB, passing the arguments separately from the SQL text
    ///
    /// Params:
    ///     qp = command text, arguments and result format
    /// Returns: the result of the command wrapped in an Answer
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        // libpq guarantees that the result is not modified until it is
        // destroyed, which is what makes the cast to immutable acceptable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQuery does not return 1
    void sendQuery( string SQLcmd )
    {
        const r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryParams does not return 1
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        const r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                p.paramTypes,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if PQsendQueryPrepared does not return 1
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        const r = PQsendQueryPrepared(
                conn,
                p.stmtName,
                p.nParams,
                p.paramValues,
                p.paramLengths,
                p.paramFormats,
                p.resultFormat
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Returns null if no notification was received
    Notify getNextNotify()
    {
        // pull pending data from the socket first so PQnotifies can see it
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    ///
    /// No parameter types are passed to PQprepare (nParams = 0, paramTypes = null).
    immutable(Result) prepare(string statementName, string sqlStatement)
    {
        PGresult* pgResult = PQprepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                0,
                null
            );

        // libpq guarantees that the result is not modified until it is
        // destroyed, which is what makes the cast to immutable acceptable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendPrepare does not return 1
    void sendPrepare(string statementName, string sqlStatement)
    {
        const r = PQsendPrepare(
                conn,
                toStringz(statementName),
                toStringz(sqlStatement),
                0,
                null
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQdescribePrepared for the named prepared statement and waits for completion.
    ///
    /// Returns: the description wrapped in an Answer
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, toStringz(statementName));

        // libpq guarantees that the result is not modified until it is
        // destroyed, which is what makes the cast to immutable acceptable
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Calls PQsendDescribePrepared for the named prepared statement, without waiting for completion.
    ///
    /// Throws: ConnectionException if PQsendDescribePrepared does not return 1
    void sendDescribePrepared(string statementName)
    {
        const r = PQsendDescribePrepared(conn, toStringz(statementName));

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Waiting for completion of reading or writing
    ///
    /// Params:
    ///     type    = which socket activity to wait for
    ///     timeout = timeout handed to every Socket.select call; the loop may
    ///               call select more than once, each time with the full value
    /// Returns: true once poll() reports PGRES_POLLING_OK,
    ///          false if a select call timed out
    /// Throws: ConnectionException if status() reports CONNECTION_BAD;
    ///         an enforce failure if select was interrupted
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        auto socket = this.socket();

        // Socket.select must not be given the same SocketSet object for two
        // of its arguments, so every role gets a set of its own.
        auto readSet = new SocketSet;
        auto writeSet = new SocketSet;
        auto errorSet = new SocketSet;

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            // select rewrites the sets in place (only sockets with a status
            // change remain), so they have to be refilled on every pass
            readSet.reset();
            readSet.add(socket);
            writeSet.reset();
            writeSet.add(socket);
            errorSet.reset();
            errorSet.add(socket);

            // Socket.select returns a signed int: -1 means "interrupted".
            // Keeping it signed is what makes the enforce below meaningful.
            int sockNum;

            with(WaitType)
            final switch(type)
            {
                case READ:
                    sockNum = Socket.select(readSet, null, errorSet, timeout);
                    break;

                case WRITE:
                    sockNum = Socket.select(null, writeSet, errorSet, timeout);
                    break;

                case READ_WRITE:
                    sockNum = Socket.select(readSet, writeSet, errorSet, timeout);
                    break;
            }

            enforce(sockNum >= 0, "Socket.select was interrupted");
            if(sockNum == 0) return false; // timeout is occurred
        }
    }
}

/// Kind of socket activity waitEndOf() waits for
enum WaitType
{
    READ,
    WRITE,
    READ_WRITE
}

/// Integration test; needs a reachable server described by connParam.
/// The connection is deliberately broken at the end of the test.
void _integration_test( string connParam ) @trusted
{
    auto conn = new Connection(connParam);

    {
        string sql_query =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto a = conn.exec( sql_query );

        assert( a.cmdStatus.length > 2 );
        assert( a.columnCount == 4 );
        assert( a.length == 3 );
        assert( a.columnFormat(1) == ValueFormat.TEXT );
        assert( a.columnFormat(2) == ValueFormat.TEXT );
    }

    {
        import vibe.data.bson: Bson;

        const string sql_query =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        args[4] = bsonToValue(binArray);

        QueryParams p;
        p.sqlCommand = sql_query;
        p.args = args[];

        auto a = conn.execParams( p );

        foreach(i; 0 .. args.length)
            assert(a.columnFormat(i) == ValueFormat.BINARY);

        assert( a.OID(0) == OidType.Text );
        assert( a.OID(1) == OidType.Int4 );
        assert( a.OID(2) == OidType.Text );
        assert( a.OID(3) == OidType.Int8 );
        assert( a.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( a[0][4].as!Bson == binArray );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
        assert(s.status == PGRES_COMMAND_OK);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // drain every pending result; getResult() yields null when done
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);
    }
    {
        // check prepared arg types and result types
        auto a = conn.describePrepared("prepared statement 2");

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        // async check prepared arg types and result types
        conn.sendDescribePrepared("prepared statement 2");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // drain every pending result; getResult() yields null when done
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);

        auto a = res[0].getAnswer;

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        QueryParams p;
        p.preparedStatementName = "prepared statement 2";
        p.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(p);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // drain every pending result; getResult() yields null when done
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    {
        bool exceptionFlag = false;

        try conn.exec("SELECT 'abc'::text").getAnswer;
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 15); // error message check
        }
        finally
            assert(exceptionFlag);
    }
}