module dpq2.query;

public import dpq2.args;

import dpq2;
import core.time: Duration;

/// Query methods intended to be mixed into the connection class.
///
/// The code below uses the following members of the host type: `conn`,
/// `status()`, `poll()`, `socket()`, `consumeInput()` and
/// `createResultContainer()`.
mixin template Queries()
{
    /// Perform SQL query to DB
    immutable (Answer) exec( string SQLcmd )
    {
        auto pgResult = PQexec(conn, toStringz( SQLcmd ));

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it is cast to immutable here
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Perform SQL query with separately passed parameters to DB
    immutable (Answer) execParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        auto pgResult = PQexecParams (
                conn,
                p.command,
                p.nParams,
                cast(uint*) p.paramTypes, //TODO: need report to derelict pq
                cast(const(ubyte)**) p.paramValues, //TODO: need report to derelict pq
                cast(int*) p.paramLengths, //TODO: need report to derelict pq
                cast(int*) p.paramFormats, //TODO: need report to derelict pq
                p.resultFormat
        );

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it is cast to immutable here
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Submits a command to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if libpq does not report success (return code 1)
    void sendQuery( string SQLcmd )
    {
        // keep the native return type instead of widening it to size_t
        const r = PQsendQuery( conn, toStringz(SQLcmd) );
        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Submits a command and separate parameters to the server without waiting for the result(s)
    ///
    /// Throws: ConnectionException if libpq does not report success (return code 1)
    void sendQueryParams(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        const r = PQsendQueryParams (
                conn,
                p.command,
                p.nParams,
                cast(uint*) p.paramTypes, //TODO: need report to derelict pq
                cast(const(ubyte)**) p.paramValues, //TODO: need report to derelict pq
                cast(int*) p.paramLengths, //TODO: need report to derelict pq
                cast(int*) p.paramFormats, //TODO: need report to derelict pq
                p.resultFormat
        );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s)
    ///
    /// Throws: ConnectionException if libpq does not report success (return code 1)
    void sendQueryPrepared(in ref QueryParams qp)
    {
        auto p = InternalQueryParams(qp);
        const r = PQsendQueryPrepared(
                conn,
                cast(char*) p.stmtName, //TODO: need report to derelict pq
                p.nParams,
                cast(char**) p.paramValues, //TODO: need report to derelict pq
                cast(int*) p.paramLengths, //TODO: need report to derelict pq
                cast(int*) p.paramFormats, //TODO: need report to derelict pq
                p.resultFormat
        );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Consumes pending input and fetches the next notification
    ///
    /// Returns: null if no notification was received
    Notify getNextNotify()
    {
        consumeInput();
        auto n = PQnotifies(conn);
        return n is null ? null : new Notify( n );
    }

    /// Submits a request to create a prepared statement with the given parameters, and waits for completion.
    immutable(Result) prepare(string statementName, string sqlStatement)
    {
        PGresult* pgResult = PQprepare(
                conn,
                cast(char*)toStringz(statementName), //TODO: need report to derelict pq
                cast(char*)toStringz(sqlStatement), //TODO: need report to derelict pq
                0,
                null
            );

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it is cast to immutable here
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Result(container);
    }

    /// Sends a request to create a prepared statement with the given parameters, without waiting for completion.
    ///
    /// Throws: ConnectionException if libpq does not report success (return code 1)
    void sendPrepare(string statementName, string sqlStatement)
    {
        const r = PQsendPrepare(
                conn,
                cast(char*)toStringz(statementName), //TODO: need report to derelict pq
                cast(char*)toStringz(sqlStatement), //TODO: need report to derelict pq
                0,
                null
            );

        if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__);
    }

    /// Calls PQdescribePrepared for the named prepared statement and wraps the result into an Answer
    immutable(Answer) describePrepared(string statementName)
    {
        PGresult* pgResult = PQdescribePrepared(conn, cast(char*)toStringz(statementName)); //TODO: need report to derelict pq

        // libpq guarantees that the result is not modified until it is destroyed,
        // which is why it is cast to immutable here
        auto container = createResultContainer(cast(immutable) pgResult);

        return new immutable Answer(container);
    }

    /// Waiting for completion of reading or writing
    ///
    /// Params:
    ///   type    = which kind of socket readiness to wait for
    ///   timeout = timeout handed to every individual Socket.select() call
    ///
    /// Returns: true once poll() reports PGRES_POLLING_OK,
    ///          false if a Socket.select() call timed out
    ///
    /// Throws: ConnectionException if status() reports CONNECTION_BAD
    bool waitEndOf(WaitType type, Duration timeout = Duration.zero)
    {
        import std.socket;

        // explicit `this.` so that the local variable does not shadow the member being called
        auto connSocket = this.socket();

        // Socket.select() requires the sets to be distinct objects
        auto errorSet = new SocketSet;
        SocketSet readSet = null;
        SocketSet writeSet = null;

        with(WaitType)
        final switch(type)
        {
            case READ:
                readSet = new SocketSet;
                break;

            case WRITE:
                writeSet = new SocketSet;
                break;

            case READ_WRITE:
                readSet = new SocketSet;
                writeSet = new SocketSet;
                break;
        }

        while(true)
        {
            if(status() == CONNECTION_BAD)
                throw new ConnectionException(this, __FILE__, __LINE__);

            if(poll() == PGRES_POLLING_OK)
                return true;

            // Socket.select() overwrites the sets it is given, so they are refilled every time
            errorSet.reset();
            errorSet.add(connSocket);

            if(readSet !is null)
            {
                readSet.reset();
                readSet.add(connSocket);
            }

            if(writeSet !is null)
            {
                writeSet.reset();
                writeSet.add(connSocket);
            }

            // signed on purpose: Socket.select() returns -1 on interruption
            const int sockNum = Socket.select(readSet, writeSet, errorSet, timeout);

            if(sockNum == 0) return false; // timeout is occurred

            // sockNum > 0: socket state changed; sockNum < 0: select was interrupted.
            // In both cases the connection state is re-checked by the next iteration.
        }
    }
}

/// Kind of socket readiness that waitEndOf() waits for
enum WaitType
{
    READ,
    WRITE,
    READ_WRITE
}

/// Integration test: runs the query methods against the server described by connParam
void _integration_test( string connParam ) @trusted
{
    auto conn = new Connection(connParam);

    // plain text query
    {
        string sql_query =
        "select now() as time, 'abc'::text as string, 123, 456.78\n"~
        "union all\n"~
        "select now(), 'абвгд'::text, 777, 910.11\n"~
        "union all\n"~
        "select NULL, 'ijk'::text, 789, 12345.115345";

        auto a = conn.exec( sql_query );

        assert( a.cmdStatus.length > 2 );
        assert( a.columnCount == 4 );
        assert( a.length == 3 );
        assert( a.columnFormat(1) == ValueFormat.TEXT );
        assert( a.columnFormat(2) == ValueFormat.TEXT );
    }

    // query with separately passed arguments
    {
        import vibe.data.bson: Bson;

        const string sql_query =
        "select $1::text, $2::integer, $3::text, $4, $5::integer[]";

        Value[5] args;
        args[0] = toValue("абвгд");
        args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value
        args[2] = toValue("123");
        args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value

        Bson binArray = Bson([
            Bson([Bson(null), Bson(123), Bson(456)]),
            Bson([Bson(0), Bson(789), Bson(null)])
        ]);

        args[4] = bsonToValue(binArray);

        QueryParams p;
        p.sqlCommand = sql_query;
        p.args = args[];

        auto a = conn.execParams( p );

        foreach(i; 0 .. args.length)
            assert(a.columnFormat(i) == ValueFormat.BINARY);

        assert( a.OID(0) == OidType.Text );
        assert( a.OID(1) == OidType.Int4 );
        assert( a.OID(2) == OidType.Text );
        assert( a.OID(3) == OidType.Int8 );
        assert( a.OID(4) == OidType.Int4Array );

        // binary args array test
        assert( a[0][4].as!Bson == binArray );
    }

    // checking prepared statements
    {
        // uses PQprepare:
        auto s = conn.prepare("prepared statement 1", "SELECT $1::integer");
        assert(s.status == PGRES_COMMAND_OK);
    }
    {
        // uses PQsendPrepare:
        conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer");

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // collect results until getResult() returns null
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].status == PGRES_COMMAND_OK);
    }
    {
        // check prepared arg types and result types
        auto a = conn.describePrepared("prepared statement 2");

        assert(a.nParams == 2);
        assert(a.paramType(0) == OidType.Text);
        assert(a.paramType(1) == OidType.Int4);
    }
    {
        QueryParams p;
        p.preparedStatementName = "prepared statement 2";
        p.argsFromArray = ["abc", "123456"];

        conn.sendQueryPrepared(p);

        conn.waitEndOf(WaitType.READ, dur!"seconds"(5));
        conn.consumeInput();

        immutable(Result)[] res;

        // collect results until getResult() returns null
        while(true)
        {
            auto r = conn.getResult();
            if(r is null) break;
            res ~= r;
        }

        assert(res.length == 1);
        assert(res[0].getAnswer[0][0].as!PGtext == "abc");
        assert(res[0].getAnswer[0][1].as!PGinteger == 123456);
    }

    import std.socket;
    conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection

    // a query on the broken connection must raise ConnectionException
    {
        bool exceptionFlag = false;

        try conn.exec("SELECT 'abc'::text").getAnswer;
        catch(ConnectionException e)
        {
            exceptionFlag = true;
            assert(e.msg.length > 15); // error message check
        }
        finally
            assert(exceptionFlag);
    }
}