1 from __future__ import print_function
3 from collections import deque
4 from contextlib import contextmanager
8 from eventlet.pools import Pool
9 from eventlet import timeout
10 from eventlet import hubs
11 from eventlet.hubs.timer import Timer
12 from eventlet.greenthread import GreenThread
18 class ConnectTimeout(Exception):
22 def cleanup_rollback(conn):
26 class BaseConnectionPool(Pool):
27 def __init__(self, db_module,
28 min_size=0, max_size=4,
29 max_idle=10, max_age=30,
31 cleanup=cleanup_rollback,
34 Constructs a pool with at least *min_size* connections and at most
35 *max_size* connections. Uses *db_module* to construct new connections.
37 The *max_idle* parameter determines how long pooled connections can
38 remain idle, in seconds. After *max_idle* seconds have elapsed
39 without the connection being used, the pool closes the connection.
41 *max_age* is how long any particular connection is allowed to live.
42 Connections that have been open for longer than *max_age* seconds are
43 closed, regardless of idle time. If *max_age* is 0, all connections are
44 closed on return to the pool, reducing it to a concurrency limiter.
46 *connect_timeout* is the duration in seconds that the pool will wait
47 before timing out on connect() to the database. If triggered, the
48 timeout will raise a ConnectTimeout from get().
50 The remainder of the arguments are used as parameters to the
51 *db_module*'s connection constructor.
54 self._db_module = db_module
57 self.max_idle = max_idle
58 self.max_age = max_age
59 self.connect_timeout = connect_timeout
60 self._expiration_timer = None
61 self.cleanup = cleanup
62 super(BaseConnectionPool, self).__init__(min_size=min_size,
66 def _schedule_expiration(self):
67 """Sets up a timer that will call _expire_old_connections when the
68 oldest connection currently in the free pool is ready to expire. This
69 is the earliest possible time that a connection could expire, thus, the
70 timer will be running as infrequently as possible without missing a
73 If this function is called when a timer is already scheduled, it does
76 If max_age or max_idle is 0, _schedule_expiration likewise does nothing.
78 if self.max_age is 0 or self.max_idle is 0:
79 # expiration is unnecessary because all connections will be expired
83 if (self._expiration_timer is not None
84 and not getattr(self._expiration_timer, 'called', False)):
85 # the next timer is already scheduled
90 self._expire_old_connections(now)
91 # the last item in the list, because of the stack ordering,
92 # is going to be the most-idle
93 idle_delay = (self.free_items[-1][0] - now) + self.max_idle
94 oldest = min([t[1] for t in self.free_items])
95 age_delay = (oldest - now) + self.max_age
97 next_delay = min(idle_delay, age_delay)
98 except (IndexError, ValueError):
99 # no free items, unschedule ourselves
100 self._expiration_timer = None
104 # set up a continuous self-calling loop
105 self._expiration_timer = Timer(next_delay, GreenThread(hubs.get_hub().greenlet).switch,
106 self._schedule_expiration, [], {})
107 self._expiration_timer.schedule()
109 def _expire_old_connections(self, now):
110 """Iterates through the open connections contained in the pool, closing
111 ones that have remained idle for longer than max_idle seconds, or have
112 been in existence for longer than max_age seconds.
114 *now* is the current time, as returned by time.time().
116 original_count = len(self.free_items)
119 for last_used, created_at, conn in self.free_items
120 if self._is_expired(now, last_used, created_at)]
123 (last_used, created_at, conn)
124 for last_used, created_at, conn in self.free_items
125 if not self._is_expired(now, last_used, created_at)]
126 self.free_items.clear()
127 self.free_items.extend(new_free)
129 # adjust the current size counter to account for expired
131 self.current_size -= original_count - len(self.free_items)
134 self._safe_close(conn, quiet=True)
136 def _is_expired(self, now, last_used, created_at):
137 """Returns true and closes the connection if it's expired.
139 if (self.max_idle <= 0 or self.max_age <= 0
140 or now - last_used > self.max_idle
141 or now - created_at > self.max_age):
145 def _unwrap_connection(self, conn):
146 """If the connection was wrapped by a subclass of
147 BaseConnectionWrapper and is still functional (as determined
148 by the __nonzero__, or __bool__ in python3, method), returns
149 the unwrapped connection. If anything goes wrong with this
150 process, returns None.
159 except AttributeError:
163 def _safe_close(self, conn, quiet=False):
164 """Closes the (already unwrapped) connection, squelching any
169 except AttributeError:
170 pass # conn is None, or junk
173 print("Connection.close raised: %s" % (sys.exc_info()[1]))
176 conn = super(BaseConnectionPool, self).get()
178 # None is a flag value that means that put got called with
179 # something it couldn't use
184 # unconditionally increase the free pool because
185 # even if there are waiters, doing a full put
186 # would incur a greenlib switch and thus lose the
188 self.current_size -= 1
191 # if the call to get() draws from the free pool, it will come
193 if isinstance(conn, tuple):
194 _last_used, created_at, conn = conn
196 created_at = time.time()
198 # wrap the connection so the consumer can call close() safely
199 wrapped = PooledConnectionWrapper(conn, self)
200 # annotating the wrapper so that when it gets put in the pool
201 # again, we'll know how old it is
202 wrapped._db_pool_created_at = created_at
205 def put(self, conn, cleanup=_MISSING):
206 created_at = getattr(conn, '_db_pool_created_at', 0)
208 conn = self._unwrap_connection(conn)
210 if self._is_expired(now, now, created_at):
211 self._safe_close(conn, quiet=False)
213 elif cleanup is not None:
214 if cleanup is _MISSING:
215 cleanup = self.cleanup
216 # by default, call rollback in case the connection is in the middle
217 # of a transaction. However, rollback has performance implications
218 # so optionally do nothing or call something else like ping
222 except Exception as e:
223 # we don't care what the exception was, we just know the
225 print("WARNING: cleanup %s raised: %s" % (cleanup, e))
232 super(BaseConnectionPool, self).put((now, created_at, conn))
234 # wake up any waiters with a flag value that indicates
235 # they need to manufacture a connection
236 if self.waiting() > 0:
237 super(BaseConnectionPool, self).put(None)
239 # no waiters -- just change the size
240 self.current_size -= 1
241 self._schedule_expiration()
244 def item(self, cleanup=_MISSING):
249 self.put(conn, cleanup=cleanup)
252 """Close all connections that this pool still holds a reference to,
253 and removes all references to them.
255 if self._expiration_timer:
256 self._expiration_timer.cancel()
257 free_items, self.free_items = self.free_items, deque()
258 for item in free_items:
259 # Free items created using min_size>0 are not tuples.
260 conn = item[2] if isinstance(item, tuple) else item
261 self._safe_close(conn, quiet=True)
262 self.current_size -= 1
268 class TpooledConnectionPool(BaseConnectionPool):
269 """A pool which gives out :class:`~eventlet.tpool.Proxy`-based database
275 return now, now, self.connect(
276 self._db_module, self.connect_timeout, *self._args, **self._kwargs)
279 def connect(cls, db_module, connect_timeout, *args, **kw):
280 t = timeout.Timeout(connect_timeout, ConnectTimeout())
282 from eventlet import tpool
283 conn = tpool.execute(db_module.connect, *args, **kw)
284 return tpool.Proxy(conn, autowrap_names=('cursor',))
289 class RawConnectionPool(BaseConnectionPool):
290 """A pool which gives out plain database connections.
295 return now, now, self.connect(
296 self._db_module, self.connect_timeout, *self._args, **self._kwargs)
299 def connect(cls, db_module, connect_timeout, *args, **kw):
300 t = timeout.Timeout(connect_timeout, ConnectTimeout())
302 return db_module.connect(*args, **kw)
307 # default connection pool is the tpool one
308 ConnectionPool = TpooledConnectionPool
311 class GenericConnectionWrapper(object):
312 def __init__(self, baseconn):
313 self._base = baseconn
315 # Proxy all method calls to self._base
316 # FIXME: remove repetition; options to consider:
317 # * for name in (...):
318 # setattr(class, name, lambda self, *a, **kw: getattr(self._base, name)(*a, **kw))
319 # * def __getattr__(self, name): if name in (...): return getattr(self._base, name)
322 return self._base.__enter__()
324 def __exit__(self, exc, value, tb):
325 return self._base.__exit__(exc, value, tb)
328 return self._base.__repr__()
330 def affected_rows(self):
331 return self._base.affected_rows()
333 def autocommit(self, *args, **kwargs):
334 return self._base.autocommit(*args, **kwargs)
337 return self._base.begin()
339 def change_user(self, *args, **kwargs):
340 return self._base.change_user(*args, **kwargs)
342 def character_set_name(self, *args, **kwargs):
343 return self._base.character_set_name(*args, **kwargs)
345 def close(self, *args, **kwargs):
346 return self._base.close(*args, **kwargs)
348 def commit(self, *args, **kwargs):
349 return self._base.commit(*args, **kwargs)
351 def cursor(self, *args, **kwargs):
352 return self._base.cursor(*args, **kwargs)
354 def dump_debug_info(self, *args, **kwargs):
355 return self._base.dump_debug_info(*args, **kwargs)
357 def errno(self, *args, **kwargs):
358 return self._base.errno(*args, **kwargs)
360 def error(self, *args, **kwargs):
361 return self._base.error(*args, **kwargs)
363 def errorhandler(self, *args, **kwargs):
364 return self._base.errorhandler(*args, **kwargs)
366 def insert_id(self, *args, **kwargs):
367 return self._base.insert_id(*args, **kwargs)
369 def literal(self, *args, **kwargs):
370 return self._base.literal(*args, **kwargs)
372 def set_character_set(self, *args, **kwargs):
373 return self._base.set_character_set(*args, **kwargs)
375 def set_sql_mode(self, *args, **kwargs):
376 return self._base.set_sql_mode(*args, **kwargs)
378 def show_warnings(self):
379 return self._base.show_warnings()
381 def warning_count(self):
382 return self._base.warning_count()
384 def ping(self, *args, **kwargs):
385 return self._base.ping(*args, **kwargs)
387 def query(self, *args, **kwargs):
388 return self._base.query(*args, **kwargs)
390 def rollback(self, *args, **kwargs):
391 return self._base.rollback(*args, **kwargs)
393 def select_db(self, *args, **kwargs):
394 return self._base.select_db(*args, **kwargs)
396 def set_server_option(self, *args, **kwargs):
397 return self._base.set_server_option(*args, **kwargs)
399 def server_capabilities(self, *args, **kwargs):
400 return self._base.server_capabilities(*args, **kwargs)
402 def shutdown(self, *args, **kwargs):
403 return self._base.shutdown(*args, **kwargs)
405 def sqlstate(self, *args, **kwargs):
406 return self._base.sqlstate(*args, **kwargs)
408 def stat(self, *args, **kwargs):
409 return self._base.stat(*args, **kwargs)
411 def store_result(self, *args, **kwargs):
412 return self._base.store_result(*args, **kwargs)
414 def string_literal(self, *args, **kwargs):
415 return self._base.string_literal(*args, **kwargs)
417 def thread_id(self, *args, **kwargs):
418 return self._base.thread_id(*args, **kwargs)
420 def use_result(self, *args, **kwargs):
421 return self._base.use_result(*args, **kwargs)
424 class PooledConnectionWrapper(GenericConnectionWrapper):
425 """A connection wrapper where:
426 - the close method returns the connection to the pool instead of closing it directly
427 - ``bool(conn)`` returns a reasonable value
428 - returns itself to the pool if it gets garbage collected
431 def __init__(self, baseconn, pool):
432 super(PooledConnectionWrapper, self).__init__(baseconn)
435 def __nonzero__(self):
436 return (hasattr(self, '_base') and bool(self._base))
438 __bool__ = __nonzero__
444 except AttributeError:
448 """Return the connection to the pool, and remove the
449 reference to it so that you can't use it again through this
452 if self and self._pool:
457 return # this causes some issues if __del__ is called in the
458 # main coroutine, so for now this is disabled
462 class DatabaseConnector(object):
464 This is an object which will maintain a collection of database
465 connection pools on a per-host basis.
468 def __init__(self, module, credentials,
469 conn_pool=None, *args, **kwargs):
472 Database module to use.
474 Mapping of hostname to connect arguments (e.g. username and password)
477 self._conn_pool_class = conn_pool
478 if self._conn_pool_class is None:
479 self._conn_pool_class = ConnectionPool
480 self._module = module
482 self._kwargs = kwargs
483 # this is a map of hostname to username/password
484 self._credentials = credentials
487 def credentials_for(self, host):
488 if host in self._credentials:
489 return self._credentials[host]
491 return self._credentials.get('default', None)
493 def get(self, host, dbname):
494 """Returns a ConnectionPool to the target host and schema.
497 if key not in self._databases:
498 new_kwargs = self._kwargs.copy()
499 new_kwargs['db'] = dbname
500 new_kwargs['host'] = host
501 new_kwargs.update(self.credentials_for(host))
502 dbpool = self._conn_pool_class(
503 self._module, *self._args, **new_kwargs)
504 self._databases[key] = dbpool
506 return self._databases[key]