1 module dpq2.query; 2 3 @safe: 4 5 public import dpq2.args; 6 7 import dpq2; 8 import core.time: Duration; 9 10 mixin template Queries() 11 { 12 /// Perform SQL query to DB 13 immutable (Answer) exec( string SQLcmd ) 14 { 15 auto pgResult = PQexec(conn, toStringz( SQLcmd )); 16 17 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 18 auto container = createResultContainer(cast(immutable) pgResult); 19 20 return new immutable Answer(container); 21 } 22 23 /// Perform SQL query to DB 24 immutable (Answer) execParams(ref QueryParams p) 25 { 26 auto pgResult = PQexecParams ( 27 conn, 28 p.command, 29 p.nParams, 30 cast(uint*) p.paramTypes, //TODO: need report to derelict pq 31 cast(const(ubyte)**) p.paramValues, //TODO: need report to derelict pq 32 cast(int*) p.paramLengths, //TODO: need report to derelict pq 33 cast(int*) p.paramFormats, //TODO: need report to derelict pq 34 p.resultFormat 35 ); 36 37 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 38 auto container = createResultContainer(cast(immutable) pgResult); 39 40 return new immutable Answer(container); 41 } 42 43 /// Submits a command to the server without waiting for the result(s) 44 void sendQuery( string SQLcmd ) 45 { 46 const size_t r = PQsendQuery( conn, toStringz(SQLcmd) ); 47 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 48 } 49 50 /// Submits a command and separate parameters to the server without waiting for the result(s) 51 void sendQueryParams(ref QueryParams p) 52 { 53 size_t r = PQsendQueryParams ( 54 conn, 55 p.command, 56 p.nParams, 57 cast(uint*) p.paramTypes, //TODO: need report to derelict pq 58 cast(const(ubyte)**) p.paramValues, //TODO: need report to derelict pq 59 cast(int*) p.paramLengths, //TODO: need report to derelict pq 60 cast(int*) p.paramFormats, //TODO: need report to derelict pq 61 p.resultFormat 62 ); 63 64 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 65 } 66 67 /// Sends a request to execute a prepared statement with given parameters, without waiting for the result(s) 68 void sendQueryPrepared(ref QueryParams p) 69 { 70 size_t r = PQsendQueryPrepared( 71 conn, 72 cast(char*) p.stmtName, //TODO: need report to derelict pq 73 p.nParams, 74 cast(char**) p.paramValues, //TODO: need report to derelict pq 75 cast(int*) p.paramLengths, //TODO: need report to derelict pq 76 cast(int*) p.paramFormats, //TODO: need report to derelict pq 77 p.resultFormat 78 ); 79 80 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 81 } 82 83 /// Returns null if no notifies was received 84 Notify getNextNotify() 85 { 86 consumeInput(); 87 auto n = PQnotifies(conn); 88 return n is null ? null : new Notify( n ); 89 } 90 91 /// Submits a request to create a prepared statement with the given parameters, and waits for completion. 92 immutable(Result) prepare(string statementName, string sqlStatement) 93 { 94 PGresult* pgResult = PQprepare( 95 conn, 96 cast(char*)toStringz(statementName), //TODO: need report to derelict pq 97 cast(char*)toStringz(sqlStatement), //TODO: need report to derelict pq 98 0, 99 null 100 ); 101 102 // is guaranteed by libpq that the result will not be changed until it will not be destroyed 103 auto container = createResultContainer(cast(immutable) pgResult); 104 105 return new immutable Result(container); 106 } 107 108 /// Sends a request to create a prepared statement with the given parameters, without waiting for completion. 109 void sendPrepare(string statementName, string sqlStatement) 110 { 111 size_t r = PQsendPrepare( 112 conn, 113 cast(char*)toStringz(statementName), //TODO: need report to derelict pq 114 cast(char*)toStringz(sqlStatement), //TODO: need report to derelict pq 115 0, 116 null 117 ); 118 119 if(r != 1) throw new ConnectionException(this, __FILE__, __LINE__); 120 } 121 122 /// Waiting for completion of reading or writing 123 /// Return: timeout not occured 124 bool waitEndOf(WaitType type, Duration timeout = Duration.zero) 125 { 126 import std.socket; 127 128 auto socket = socket(); 129 auto set = new SocketSet; 130 set.add(socket); 131 132 while(true) 133 { 134 if(status() == CONNECTION_BAD) 135 throw new ConnectionException(this, __FILE__, __LINE__); 136 137 if(poll() == PGRES_POLLING_OK) 138 { 139 return true; 140 } 141 else 142 { 143 size_t sockNum; 144 145 with(WaitType) 146 final switch(type) 147 { 148 case READ: 149 sockNum = Socket.select(set, null, set, timeout); 150 break; 151 152 case WRITE: 153 sockNum = Socket.select(null, set, set, timeout); 154 break; 155 156 case READ_WRITE: 157 sockNum = Socket.select(set, set, set, timeout); 158 break; 159 } 160 161 enforce(sockNum >= 0); 162 if(sockNum == 0) return false; // timeout is occurred 163 164 continue; 165 } 166 } 167 } 168 } 169 170 enum WaitType 171 { 172 READ, 173 WRITE, 174 READ_WRITE 175 } 176 177 void _integration_test( string connParam ) @trusted 178 { 179 auto conn = new Connection(connParam); 180 181 { 182 string sql_query = 183 "select now() as time, 'abc'::text as string, 123, 456.78\n"~ 184 "union all\n"~ 185 "select now(), 'абвгд'::text, 777, 910.11\n"~ 186 "union all\n"~ 187 "select NULL, 'ijk'::text, 789, 12345.115345"; 188 189 auto a = conn.exec( sql_query ); 190 191 assert( a.cmdStatus.length > 2 ); 192 assert( a.columnCount == 4 ); 193 assert( a.length == 3 ); 194 assert( a.columnFormat(1) == ValueFormat.TEXT ); 195 assert( a.columnFormat(2) == ValueFormat.TEXT ); 196 } 197 198 { 199 import vibe.data.bson: Bson; 200 201 const string sql_query = 202 "select $1::text, $2::integer, $3::text, $4, $5::integer[]"; 203 204 Value[5] args; 205 args[0] = toValue("абвгд"); 206 args[1] = Value(ValueFormat.BINARY, OidType.Undefined); // undefined type NULL value 207 args[2] = toValue("123"); 208 args[3] = Value(ValueFormat.BINARY, OidType.Int8); // NULL value 209 210 Bson binArray = Bson([ 211 Bson([Bson(null), Bson(123), Bson(456)]), 212 Bson([Bson(0), Bson(789), Bson(null)]) 213 ]); 214 215 args[4] = bsonToValue(binArray); 216 217 QueryParams p; 218 p.sqlCommand = sql_query; 219 p.args = args[]; 220 221 auto a = conn.execParams( p ); 222 223 foreach(i; 0 .. args.length) 224 assert(a.columnFormat(i) == ValueFormat.BINARY); 225 226 assert( a.OID(0) == OidType.Text ); 227 assert( a.OID(1) == OidType.Int4 ); 228 assert( a.OID(2) == OidType.Text ); 229 assert( a.OID(3) == OidType.Int8 ); 230 assert( a.OID(4) == OidType.Int4Array ); 231 232 // binary args array test 233 assert( a[0][4].toBson == binArray ); 234 } 235 236 // checking prepared statements 237 { 238 // uses PQprepare: 239 auto s = conn.prepare("prepared statement 1", "SELECT $1::integer"); 240 assert(s.status == PGRES_COMMAND_OK); 241 } 242 { 243 // uses PQsendPrepare: 244 conn.sendPrepare("prepared statement 2", "SELECT $1::text, $2::integer"); 245 246 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 247 conn.consumeInput(); 248 249 immutable(Result)[] res; 250 251 while(true) 252 { 253 auto r = conn.getResult(); 254 if(r is null) break; 255 res ~= r; 256 } 257 258 assert(res.length == 1); 259 assert(res[0].status == PGRES_COMMAND_OK); 260 } 261 { 262 QueryParams p; 263 p.preparedStatementName = "prepared statement 2"; 264 p.argsFromArray = ["abc", "123456"]; 265 266 conn.sendQueryPrepared(p); 267 268 conn.waitEndOf(WaitType.READ, dur!"seconds"(5)); 269 conn.consumeInput(); 270 271 immutable(Result)[] res; 272 273 while(true) 274 { 275 auto r = conn.getResult(); 276 if(r is null) break; 277 res ~= r; 278 } 279 280 assert(res.length == 1); 281 assert(res[0].getAnswer[0][0].as!PGtext == "abc"); 282 assert(res[0].getAnswer[0][1].as!PGinteger == 123456); 283 } 284 285 import std.socket; 286 conn.socket.shutdown(SocketShutdown.BOTH); // breaks connection 287 288 { 289 bool exceptionFlag = false; 290 291 try conn.exec("SELECT 'abc'::text").getAnswer; 292 catch(ConnectionException e) 293 { 294 exceptionFlag = true; 295 assert(e.msg.length > 15); // error message check 296 } 297 finally 298 assert(exceptionFlag); 299 } 300 }