Add python-eventlet 0.16.1
[packages/trusty/python-eventlet.git] / eventlet / eventlet / db_pool.py
1 from __future__ import print_function
2
3 from collections import deque
4 from contextlib import contextmanager
5 import sys
6 import time
7
8 from eventlet.pools import Pool
9 from eventlet import timeout
10 from eventlet import hubs
11 from eventlet.hubs.timer import Timer
12 from eventlet.greenthread import GreenThread
13
14
# Sentinel used as the default for ``cleanup`` arguments.  It lets ``put()``
# tell "argument not supplied" (fall back to the pool's configured cleanup)
# apart from an explicit ``None`` (skip cleanup entirely).
_MISSING = object()
16
17
class ConnectTimeout(Exception):
    """Raised when opening a database connection exceeds the pool's
    *connect_timeout*.
    """
20
21
def cleanup_rollback(conn):
    """Default pool cleanup hook: roll back whatever transaction *conn*
    may still have open.  The result of ``rollback()`` is discarded.
    """
    rollback = conn.rollback
    rollback()
24
25
class BaseConnectionPool(Pool):
    """Pool of database connections with idle/age based expiration.

    Free connections are stored in ``self.free_items`` as
    ``(last_used, created_at, conn)`` tuples.  Connections handed out by
    :meth:`get` are wrapped in a ``PooledConnectionWrapper``; :meth:`put`
    unwraps them again.  Subclasses are expected to provide ``create()``.
    """

    def __init__(self, db_module,
                 min_size=0, max_size=4,
                 max_idle=10, max_age=30,
                 connect_timeout=5,
                 cleanup=cleanup_rollback,
                 *args, **kwargs):
        """
        Constructs a pool with at least *min_size* connections and at most
        *max_size* connections.  Uses *db_module* to construct new connections.

        The *max_idle* parameter determines how long pooled connections can
        remain idle, in seconds.  After *max_idle* seconds have elapsed
        without the connection being used, the pool closes the connection.

        *max_age* is how long any particular connection is allowed to live.
        Connections that have been open for longer than *max_age* seconds are
        closed, regardless of idle time.  If *max_age* is 0, all connections are
        closed on return to the pool, reducing it to a concurrency limiter.

        *connect_timeout* is the duration in seconds that the pool will wait
        before timing out on connect() to the database.  If triggered, the
        timeout will raise a ConnectTimeout from get().

        *cleanup* is the callable :meth:`put` applies to a returned
        connection by default (rollback unless overridden).

        The remainder of the arguments are used as parameters to the
        *db_module*'s connection constructor.
        """
        assert(db_module)
        self._db_module = db_module
        self._args = args
        self._kwargs = kwargs
        self.max_idle = max_idle
        self.max_age = max_age
        self.connect_timeout = connect_timeout
        self._expiration_timer = None
        self.cleanup = cleanup
        super(BaseConnectionPool, self).__init__(min_size=min_size,
                                                 max_size=max_size,
                                                 order_as_stack=True)

    def _schedule_expiration(self):
        """Sets up a timer that will call _expire_old_connections when the
        oldest connection currently in the free pool is ready to expire.  This
        is the earliest possible time that a connection could expire, thus, the
        timer will be running as infrequently as possible without missing a
        possible expiration.

        If this function is called when a timer is already scheduled, it does
        nothing.

        If max_age or max_idle is 0, _schedule_expiration likewise does nothing.
        """
        # Compare by value: ``is 0`` relies on int interning, misses 0.0 and
        # triggers a SyntaxWarning on newer interpreters.
        if self.max_age == 0 or self.max_idle == 0:
            # expiration is unnecessary because all connections will be expired
            # on put
            return

        if (self._expiration_timer is not None
                and not getattr(self._expiration_timer, 'called', False)):
            # the next timer is already scheduled
            return

        try:
            now = time.time()
            self._expire_old_connections(now)
            # the last item in the list, because of the stack ordering,
            # is going to be the most-idle
            idle_delay = (self.free_items[-1][0] - now) + self.max_idle
            oldest = min([t[1] for t in self.free_items])
            age_delay = (oldest - now) + self.max_age

            next_delay = min(idle_delay, age_delay)
        except (IndexError, ValueError):
            # no free items, unschedule ourselves
            self._expiration_timer = None
            return

        if next_delay > 0:
            # set up a continuous self-calling loop
            self._expiration_timer = Timer(next_delay, GreenThread(hubs.get_hub().greenlet).switch,
                                           self._schedule_expiration, [], {})
            self._expiration_timer.schedule()

    def _expire_old_connections(self, now):
        """Iterates through the open connections contained in the pool, closing
        ones that have remained idle for longer than max_idle seconds, or have
        been in existence for longer than max_age seconds.

        *now* is the current time, as returned by time.time().
        """
        # Partition the free list in a single pass so _is_expired is
        # evaluated exactly once per entry.
        kept = []
        expired = []
        for last_used, created_at, conn in self.free_items:
            if self._is_expired(now, last_used, created_at):
                expired.append(conn)
            else:
                kept.append((last_used, created_at, conn))

        self.free_items.clear()
        self.free_items.extend(kept)

        # adjust the current size counter to account for expired
        # connections
        self.current_size -= len(expired)

        for conn in expired:
            self._safe_close(conn, quiet=True)

    def _is_expired(self, now, last_used, created_at):
        """Returns True if a connection with the given timestamps should be
        discarded: either limit is non-positive, it has been idle for more
        than max_idle seconds, or it is older than max_age seconds.

        This only evaluates the condition; closing is left to the caller.
        """
        return (self.max_idle <= 0 or self.max_age <= 0
                or now - last_used > self.max_idle
                or now - created_at > self.max_age)

    def _unwrap_connection(self, conn):
        """If the connection was wrapped by a subclass of
        BaseConnectionWrapper and is still functional (as determined
        by the __nonzero__, or __bool__ in python3, method), returns
        the unwrapped connection.  If anything goes wrong with this
        process, returns None.
        """
        base = None
        try:
            if conn:
                base = conn._base
                conn._destroy()
        except AttributeError:
            # not a wrapper (or only half of one); hand back whatever
            # was extracted before the failure
            pass
        return base

    def _safe_close(self, conn, quiet=False):
        """Closes the (already unwrapped) connection, squelching any
        exceptions.  Unless *quiet* is true, a failure of ``close()`` is
        reported on stdout.
        """
        try:
            conn.close()
        except AttributeError:
            pass  # conn is None, or junk
        except Exception:
            if not quiet:
                print("Connection.close raised: %s" % (sys.exc_info()[1]))

    def get(self):
        """Returns a wrapped connection, creating a new one when the
        underlying pool hands back the ``None`` flag value.

        Exceptions raised by ``create()`` propagate after the pool's size
        counter has been decremented.
        """
        conn = super(BaseConnectionPool, self).get()

        # None is a flag value that means that put got called with
        # something it couldn't use
        if conn is None:
            try:
                conn = self.create()
            except Exception:
                # unconditionally increase the free pool because
                # even if there are waiters, doing a full put
                # would incur a greenlib switch and thus lose the
                # exception stack
                self.current_size -= 1
                raise

        # if the call to get() draws from the free pool, it will come
        # back as a tuple
        if isinstance(conn, tuple):
            _last_used, created_at, conn = conn
        else:
            created_at = time.time()

        # wrap the connection so the consumer can call close() safely
        wrapped = PooledConnectionWrapper(conn, self)
        # annotating the wrapper so that when it gets put in the pool
        # again, we'll know how old it is
        wrapped._db_pool_created_at = created_at
        return wrapped

    def put(self, conn, cleanup=_MISSING):
        """Returns *conn* to the pool.

        Expired connections are closed instead of being pooled.  Otherwise
        *cleanup* is applied to the unwrapped connection first: when omitted
        the pool's configured ``self.cleanup`` is used, and ``None`` skips
        cleanup.  A connection whose cleanup raises is discarded.
        """
        created_at = getattr(conn, '_db_pool_created_at', 0)
        now = time.time()
        conn = self._unwrap_connection(conn)

        if self._is_expired(now, now, created_at):
            self._safe_close(conn, quiet=False)
            conn = None
        elif cleanup is not None:
            if cleanup is _MISSING:
                cleanup = self.cleanup
            # by default, call rollback in case the connection is in the middle
            # of a transaction. However, rollback has performance implications
            # so optionally do nothing or call something else like ping
            try:
                if conn:
                    cleanup(conn)
            except Exception as e:
                # we don't care what the exception was, we just know the
                # connection is dead
                print("WARNING: cleanup %s raised: %s" % (cleanup, e))
                conn = None
            # NOTE(review): anything that is not an Exception subclass
            # propagates from here unchanged, and none of the bookkeeping
            # below runs in that case -- confirm whether the slot should be
            # released on that path.

        if conn is not None:
            super(BaseConnectionPool, self).put((now, created_at, conn))
        else:
            # wake up any waiters with a flag value that indicates
            # they need to manufacture a connection
            if self.waiting() > 0:
                super(BaseConnectionPool, self).put(None)
            else:
                # no waiters -- just change the size
                self.current_size -= 1
        self._schedule_expiration()

    @contextmanager
    def item(self, cleanup=_MISSING):
        """Context manager yielding a connection from :meth:`get` and handing
        it back through :meth:`put` (with *cleanup*) on exit.
        """
        conn = self.get()
        try:
            yield conn
        finally:
            self.put(conn, cleanup=cleanup)

    def clear(self):
        """Close all connections that this pool still holds a reference to,
        and removes all references to them.
        """
        # Read attributes defensively: __del__ calls this even when
        # __init__ failed before they were assigned.
        timer = getattr(self, '_expiration_timer', None)
        if timer:
            timer.cancel()
        free_items, self.free_items = getattr(self, 'free_items', ()), deque()
        for item in free_items:
            # Free items created using min_size>0 are not tuples.
            conn = item[2] if isinstance(item, tuple) else item
            self._safe_close(conn, quiet=True)
            self.current_size -= 1

    def __del__(self):
        self.clear()
266
267
class TpooledConnectionPool(BaseConnectionPool):
    """Connection pool whose connections are handed out as
    :class:`~eventlet.tpool.Proxy` objects.
    """

    def create(self):
        """Open a fresh connection and return it as a
        ``(last_used, created_at, connection)`` tuple, both timestamps
        being the current time.
        """
        stamp = time.time()
        connection = self.connect(
            self._db_module, self.connect_timeout, *self._args, **self._kwargs)
        return (stamp, stamp, connection)

    @classmethod
    def connect(cls, db_module, connect_timeout, *args, **kw):
        """Call ``db_module.connect(*args, **kw)`` through ``tpool.execute``
        and return the result wrapped in a ``tpool.Proxy``.

        A ``Timeout`` carrying :class:`ConnectTimeout` is armed for
        *connect_timeout* around the attempt and always cancelled afterwards.
        """
        guard = timeout.Timeout(connect_timeout, ConnectTimeout())
        try:
            from eventlet import tpool
            raw_conn = tpool.execute(db_module.connect, *args, **kw)
            proxied = tpool.Proxy(raw_conn, autowrap_names=('cursor',))
        finally:
            guard.cancel()
        return proxied
287
288
class RawConnectionPool(BaseConnectionPool):
    """Connection pool that hands out the database module's own connection
    objects, without any proxying.
    """

    def create(self):
        """Open a fresh connection and return it as a
        ``(last_used, created_at, connection)`` tuple, both timestamps
        being the current time.
        """
        stamp = time.time()
        connection = self.connect(
            self._db_module, self.connect_timeout, *self._args, **self._kwargs)
        return (stamp, stamp, connection)

    @classmethod
    def connect(cls, db_module, connect_timeout, *args, **kw):
        """Return ``db_module.connect(*args, **kw)``.

        A ``Timeout`` carrying :class:`ConnectTimeout` is armed for
        *connect_timeout* around the call and always cancelled afterwards.
        """
        guard = timeout.Timeout(connect_timeout, ConnectTimeout())
        try:
            connection = db_module.connect(*args, **kw)
        finally:
            guard.cancel()
        return connection
305
306
# default connection pool is the tpool one; DatabaseConnector falls back to
# this alias when it is not given an explicit pool class
ConnectionPool = TpooledConnectionPool
309
310
def _base_forwarder(name, takes_args):
    """Build a method named *name* that forwards the call to ``self._base``.

    With *takes_args* true the method accepts and passes on arbitrary
    positional and keyword arguments; otherwise it takes none.
    """
    if takes_args:
        def forwarder(self, *args, **kwargs):
            return getattr(self._base, name)(*args, **kwargs)
    else:
        def forwarder(self):
            return getattr(self._base, name)()
    forwarder.__name__ = name
    forwarder.__doc__ = "Forward to ``self._base.%s``." % name
    return forwarder


def _install_base_forwarders(cls):
    """Class decorator: attach one forwarding method per name listed in
    ``cls._NO_ARG_FORWARDS`` and ``cls._VARIADIC_FORWARDS``.
    """
    for name in cls._NO_ARG_FORWARDS:
        setattr(cls, name, _base_forwarder(name, False))
    for name in cls._VARIADIC_FORWARDS:
        setattr(cls, name, _base_forwarder(name, True))
    return cls


@_install_base_forwarders
class GenericConnectionWrapper(object):
    """Thin wrapper that proxies a fixed set of connection methods to the
    wrapped object stored in ``self._base``.

    The forwarding methods are generated from the two name tuples below
    rather than written out one by one; their signatures match the
    hand-written originals (no-argument vs. ``*args, **kwargs``).
    """

    # Forwarded as ``self._base.<name>()`` -- these accept no arguments.
    _NO_ARG_FORWARDS = (
        'affected_rows',
        'begin',
        'show_warnings',
        'warning_count',
    )

    # Forwarded as ``self._base.<name>(*args, **kwargs)``.
    _VARIADIC_FORWARDS = (
        'autocommit',
        'change_user',
        'character_set_name',
        'close',
        'commit',
        'cursor',
        'dump_debug_info',
        'errno',
        'error',
        'errorhandler',
        'insert_id',
        'literal',
        'set_character_set',
        'set_sql_mode',
        'ping',
        'query',
        'rollback',
        'select_db',
        'set_server_option',
        'server_capabilities',
        'shutdown',
        'sqlstate',
        'stat',
        'store_result',
        'string_literal',
        'thread_id',
        'use_result',
    )

    def __init__(self, baseconn):
        self._base = baseconn

    # Special methods are looked up on the type, so they have to be real
    # methods on the class and are spelled out explicitly.
    def __enter__(self):
        return self._base.__enter__()

    def __exit__(self, exc, value, tb):
        return self._base.__exit__(exc, value, tb)

    def __repr__(self):
        return self._base.__repr__()
422
423
class PooledConnectionWrapper(GenericConnectionWrapper):
    """Connection wrapper handed out by a pool.

    - ``close()`` gives the connection back to the pool rather than closing it
    - ``bool(conn)`` is false once the wrapper no longer holds a connection
    - garbage collection does *not* return it to the pool (see ``__del__``)
    """

    def __init__(self, baseconn, pool):
        super(PooledConnectionWrapper, self).__init__(baseconn)
        self._pool = pool

    def __nonzero__(self):
        # A destroyed wrapper has no ``_base`` attribute at all; treat that
        # the same as a falsy base connection.
        return bool(getattr(self, '_base', None))

    __bool__ = __nonzero__

    def _destroy(self):
        """Drop the references to the pool and to the base connection."""
        self._pool = None
        if hasattr(self, '_base'):
            del self._base

    def close(self):
        """Hand the connection back to the owning pool, then detach this
        wrapper from it so the connection can no longer be used through
        this object.
        """
        still_usable = self and self._pool
        if still_usable:
            self._pool.put(self)
        self._destroy()

    def __del__(self):
        # Intentionally a no-op: calling self.close() here causes some
        # issues if __del__ runs in the main coroutine, so automatic
        # return-to-pool on garbage collection is disabled for now.
        return
460
461
class DatabaseConnector(object):
    """
    This is an object which will maintain a collection of database
    connection pools on a per-host basis.
    """

    def __init__(self, module, credentials,
                 conn_pool=None, *args, **kwargs):
        """constructor
        *module*
            Database module to use.
        *credentials*
            Mapping of hostname to connect arguments (e.g. username and password)
        *conn_pool*
            Pool class to instantiate per (host, database); defaults to
            ``ConnectionPool``.

        Remaining positional and keyword arguments are passed on to every
        pool that gets created.
        """
        assert(module)
        self._conn_pool_class = ConnectionPool if conn_pool is None else conn_pool
        self._module = module
        self._args = args
        self._kwargs = kwargs
        # this is a map of hostname to username/password
        self._credentials = credentials
        # cache of pools already built, keyed by (host, dbname)
        self._databases = {}

    def credentials_for(self, host):
        """Returns the connect arguments registered for *host*, falling back
        to the ``'default'`` entry, or None when neither exists.
        """
        if host in self._credentials:
            return self._credentials[host]
        return self._credentials.get('default')

    def get(self, host, dbname):
        """Returns a ConnectionPool to the target host and schema.

        Pools are created on first request and reused afterwards.  The pool
        is built with the constructor's keyword arguments plus ``db`` and
        ``host``, overlaid with the credentials for *host* when there are any.
        """
        key = (host, dbname)
        if key not in self._databases:
            new_kwargs = self._kwargs.copy()
            new_kwargs['db'] = dbname
            new_kwargs['host'] = host
            credentials = self.credentials_for(host)
            # credentials_for() yields None for an unknown host without a
            # 'default' entry; dict.update(None) would raise TypeError.
            if credentials is not None:
                new_kwargs.update(credentials)
            self._databases[key] = self._conn_pool_class(
                self._module, *self._args, **new_kwargs)

        return self._databases[key]