1 module dpq2.connection; 2 3 @trusted: 4 5 public import derelict.pq.pq; 6 import dpq2.answer: Answer; 7 import std.conv: to; 8 import std.string: toStringz; 9 import std.exception: enforceEx; 10 import std.range; 11 import core.exception; 12 13 /* 14 * Bugs: On Unix connection is not thread safe. 15 * 16 * On Unix, forking a process with open libpq connections can lead 17 * to unpredictable results because the parent and child processes share 18 * the same sockets and operating system resources. For this reason, 19 * such usage is not recommended, though doing an exec from the child 20 * process to load a new executable is safe. 21 22 23 24 int PQisthreadsafe(); 25 Returns 1 if the libpq is thread-safe and 0 if it is not. 26 */ 27 28 /// BaseConnection 29 package class BaseConnection 30 { 31 string connString; /// Database connection parameters 32 33 package PGconn* conn; 34 private 35 { 36 bool readyForQuery; // connection started and not disconnect() was called 37 38 enum ConsumeResult 39 { 40 PQ_CONSUME_ERROR, 41 PQ_CONSUME_OK 42 } 43 } 44 45 @property bool nonBlocking() 46 { 47 return PQisnonblocking(conn) == 1; 48 } 49 50 private void setNonBlocking( bool state ) 51 { 52 if( PQsetnonblocking(conn, state ? 1 : 0 ) == -1 ) 53 throw new ConnException(this, __FILE__, __LINE__); 54 } 55 56 /// Connect to DB 57 void connect() 58 { 59 assert( !readyForQuery ); 60 61 conn = PQconnectdb(toStringz(connString)); 62 63 enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 64 65 if( !nonBlocking && status != CONNECTION_OK ) 66 throw new ConnException(this, __FILE__, __LINE__); 67 68 readyForQuery = true; 69 } 70 71 /// Connect to DB in a nonblocking manner 72 void connectNonblockingStart() 73 { 74 assert( !readyForQuery ); 75 76 conn = PQconnectStart(cast(char*) toStringz(connString)); // TODO: wrong DerelictPQ args 77 78 enforceEx!OutOfMemoryError(conn, "Unable to allocate libpq connection data"); 79 80 if( status == CONNECTION_BAD ) 81 throw new ConnException(this, __FILE__, __LINE__); 82 83 readyForQuery = true; 84 } 85 86 PostgresPollingStatusType poll() 87 { 88 assert( readyForQuery ); 89 90 return PQconnectPoll(conn); 91 } 92 93 ConnStatusType status() 94 { 95 return PQstatus(conn); 96 } 97 98 /// Disconnect from DB 99 void disconnect() 100 { 101 if( readyForQuery ) 102 { 103 readyForQuery = false; 104 PQfinish( conn ); 105 // TODO: remove readyForQuery and just use conn = null as flag 106 } 107 } 108 109 package void consumeInput() 110 { 111 assert( readyForQuery ); 112 113 const size_t r = PQconsumeInput( conn ); 114 if( r != ConsumeResult.PQ_CONSUME_OK ) throw new ConnException(this, __FILE__, __LINE__); 115 } 116 117 package bool flush() 118 { 119 assert( readyForQuery ); 120 121 auto r = PQflush(conn); 122 if( r == -1 ) throw new ConnException(this, __FILE__, __LINE__); 123 return r == 0; 124 } 125 126 package size_t socket() 127 { 128 auto r = PQsocket( conn ); 129 assert( r >= 0 ); 130 return r; 131 } 132 133 package string errorMessage() 134 { 135 return to!(string)(PQerrorMessage(conn)); 136 } 137 138 ~this() 139 { 140 disconnect(); 141 } 142 143 /** 144 * returns the previous notice receiver or processor function pointer, and sets the new value. 145 * If you supply a null function pointer, no action is taken, but the current pointer is returned. 146 */ 147 PQnoticeProcessor setNoticeProcessor(PQnoticeProcessor proc, void* arg) nothrow 148 { 149 assert( readyForQuery ); 150 151 return PQsetNoticeProcessor(conn, proc, arg); 152 } 153 154 /// Get for the next result from a sendQuery. Can return null. 155 package immutable(Answer) getAnswer() 156 { 157 return _getAnswer(PQgetResult(conn)); 158 } 159 160 /// Get Answer from PQexec* functions or throw error if pull is empty 161 package immutable(Answer) getAnswer( PGresult* r ) 162 { 163 auto a = _getAnswer(r); 164 165 if(!a) throw new ConnException(this, __FILE__, __LINE__); 166 167 return a; 168 } 169 170 /// Get Answer from PQexec* functions 171 private immutable(Answer) _getAnswer(in PGresult* r) 172 { 173 if(r) 174 { 175 auto res = new immutable Answer(cast(immutable) r); 176 res.checkAnswerForErrors(); // It is important to do a separate check because of Answer ctor is nothrow 177 return res; 178 } 179 180 return null; 181 } 182 } 183 184 /// Connection exception 185 class ConnException : Dpq2Exception 186 { 187 private BaseConnection conn; 188 189 this(BaseConnection c, string file, size_t line) 190 { 191 conn = c; 192 193 super(conn.errorMessage(), file, line); 194 } 195 } 196 197 class Dpq2Exception : Exception 198 { 199 this(string msg, string file, size_t line) 200 { 201 super(msg, file, line); 202 } 203 } 204 205 void _integration_test( string connParam ) 206 { 207 assert( PQlibVersion() >= 9_0100 ); 208 209 { 210 auto c = new BaseConnection; 211 c.connString = connParam; 212 213 c.connect(); 214 c.disconnect(); 215 216 c.connect(); 217 c.disconnect(); 218 219 destroy(c); 220 } 221 222 { 223 bool exceptionFlag = false; 224 auto c = new BaseConnection; 225 c.connString = "!!!some incorrect connection string!!!"; 226 227 try c.connect(); 228 catch(ConnException e) 229 { 230 exceptionFlag = true; 231 assert(e.msg.length > 40); // error message check 232 } 233 finally 234 assert(exceptionFlag); 235 } 236 }