Adjust the package revision; no actual code changes
[packages/trusty/python-eventlet.git] / eventlet / eventlet / db_pool.py
1 from __future__ import print_function
2
3 from collections import deque
4 from contextlib import contextmanager
5 import sys
6 import time
7
8 from eventlet.pools import Pool
9 from eventlet import timeout
10 from eventlet import hubs
11 from eventlet.hubs.timer import Timer
12 from eventlet.greenthread import GreenThread
13
14
15 _MISSING = object()
16
17
18 class ConnectTimeout(Exception):
19     pass
20
21
22 def cleanup_rollback(conn):
23     conn.rollback()
24
25
26 class BaseConnectionPool(Pool):
27     def __init__(self, db_module,
28                  min_size=0, max_size=4,
29                  max_idle=10, max_age=30,
30                  connect_timeout=5,
31                  cleanup=cleanup_rollback,
32                  *args, **kwargs):
33         """
34         Constructs a pool with at least *min_size* connections and at most
35         *max_size* connections.  Uses *db_module* to construct new connections.
36
37         The *max_idle* parameter determines how long pooled connections can
38         remain idle, in seconds.  After *max_idle* seconds have elapsed
39         without the connection being used, the pool closes the connection.
40
41         *max_age* is how long any particular connection is allowed to live.
42         Connections that have been open for longer than *max_age* seconds are
43         closed, regardless of idle time.  If *max_age* is 0, all connections are
44         closed on return to the pool, reducing it to a concurrency limiter.
45
46         *connect_timeout* is the duration in seconds that the pool will wait
47         before timing out on connect() to the database.  If triggered, the
48         timeout will raise a ConnectTimeout from get().
49
50         The remainder of the arguments are used as parameters to the
51         *db_module*'s connection constructor.
52         """
53         assert(db_module)
54         self._db_module = db_module
55         self._args = args
56         self._kwargs = kwargs
57         self.max_idle = max_idle
58         self.max_age = max_age
59         self.connect_timeout = connect_timeout
60         self._expiration_timer = None
61         self.cleanup = cleanup
62         super(BaseConnectionPool, self).__init__(min_size=min_size,
63                                                  max_size=max_size,
64                                                  order_as_stack=True)
65
66     def _schedule_expiration(self):
67         """Sets up a timer that will call _expire_old_connections when the
68         oldest connection currently in the free pool is ready to expire.  This
69         is the earliest possible time that a connection could expire, thus, the
70         timer will be running as infrequently as possible without missing a
71         possible expiration.
72
73         If this function is called when a timer is already scheduled, it does
74         nothing.
75
76         If max_age or max_idle is 0, _schedule_expiration likewise does nothing.
77         """
78         if self.max_age is 0 or self.max_idle is 0:
79             # expiration is unnecessary because all connections will be expired
80             # on put
81             return
82
83         if (self._expiration_timer is not None
84                 and not getattr(self._expiration_timer, 'called', False)):
85             # the next timer is already scheduled
86             return
87
88         try:
89             now = time.time()
90             self._expire_old_connections(now)
91             # the last item in the list, because of the stack ordering,
92             # is going to be the most-idle
93             idle_delay = (self.free_items[-1][0] - now) + self.max_idle
94             oldest = min([t[1] for t in self.free_items])
95             age_delay = (oldest - now) + self.max_age
96
97             next_delay = min(idle_delay, age_delay)
98         except (IndexError, ValueError):
99             # no free items, unschedule ourselves
100             self._expiration_timer = None
101             return
102
103         if next_delay > 0:
104             # set up a continuous self-calling loop
105             self._expiration_timer = Timer(next_delay, GreenThread(hubs.get_hub().greenlet).switch,
106                                            self._schedule_expiration, [], {})
107             self._expiration_timer.schedule()
108
109     def _expire_old_connections(self, now):
110         """Iterates through the open connections contained in the pool, closing
111         ones that have remained idle for longer than max_idle seconds, or have
112         been in existence for longer than max_age seconds.
113
114         *now* is the current time, as returned by time.time().
115         """
116         original_count = len(self.free_items)
117         expired = [
118             conn
119             for last_used, created_at, conn in self.free_items
120             if self._is_expired(now, last_used, created_at)]
121
122         new_free = [
123             (last_used, created_at, conn)
124             for last_used, created_at, conn in self.free_items
125             if not self._is_expired(now, last_used, created_at)]
126         self.free_items.clear()
127         self.free_items.extend(new_free)
128
129         # adjust the current size counter to account for expired
130         # connections
131         self.current_size -= original_count - len(self.free_items)
132
133         for conn in expired:
134             self._safe_close(conn, quiet=True)
135
136     def _is_expired(self, now, last_used, created_at):
137         """Returns true and closes the connection if it's expired.
138         """
139         if (self.max_idle <= 0 or self.max_age <= 0
140                 or now - last_used > self.max_idle
141                 or now - created_at > self.max_age):
142             return True
143         return False
144
145     def _unwrap_connection(self, conn):
146         """If the connection was wrapped by a subclass of
147         BaseConnectionWrapper and is still functional (as determined
148         by the __nonzero__, or __bool__ in python3, method), returns
149         the unwrapped connection.  If anything goes wrong with this
150         process, returns None.
151         """
152         base = None
153         try:
154             if conn:
155                 base = conn._base
156                 conn._destroy()
157             else:
158                 base = None
159         except AttributeError:
160             pass
161         return base
162
163     def _safe_close(self, conn, quiet=False):
164         """Closes the (already unwrapped) connection, squelching any
165         exceptions.
166         """
167         try:
168             conn.close()
169         except AttributeError:
170             pass  # conn is None, or junk
171         except Exception:
172             if not quiet:
173                 print("Connection.close raised: %s" % (sys.exc_info()[1]))
174
175     def get(self):
176         conn = super(BaseConnectionPool, self).get()
177
178         # None is a flag value that means that put got called with
179         # something it couldn't use
180         if conn is None:
181             try:
182                 conn = self.create()
183             except Exception:
184                 # unconditionally increase the free pool because
185                 # even if there are waiters, doing a full put
186                 # would incur a greenlib switch and thus lose the
187                 # exception stack
188                 self.current_size -= 1
189                 raise
190
191         # if the call to get() draws from the free pool, it will come
192         # back as a tuple
193         if isinstance(conn, tuple):
194             _last_used, created_at, conn = conn
195         else:
196             created_at = time.time()
197
198         # wrap the connection so the consumer can call close() safely
199         wrapped = PooledConnectionWrapper(conn, self)
200         # annotating the wrapper so that when it gets put in the pool
201         # again, we'll know how old it is
202         wrapped._db_pool_created_at = created_at
203         return wrapped
204
205     def put(self, conn, cleanup=_MISSING):
206         created_at = getattr(conn, '_db_pool_created_at', 0)
207         now = time.time()
208         conn = self._unwrap_connection(conn)
209
210         if self._is_expired(now, now, created_at):
211             self._safe_close(conn, quiet=False)
212             conn = None
213         elif cleanup is not None:
214             if cleanup is _MISSING:
215                 cleanup = self.cleanup
216             # by default, call rollback in case the connection is in the middle
217             # of a transaction. However, rollback has performance implications
218             # so optionally do nothing or call something else like ping
219             try:
220                 if conn:
221                     cleanup(conn)
222             except Exception as e:
223                 # we don't care what the exception was, we just know the
224                 # connection is dead
225                 print("WARNING: cleanup %s raised: %s" % (cleanup, e))
226                 conn = None
227             except:
228                 conn = None
229                 raise
230
231         if conn is not None:
232             super(BaseConnectionPool, self).put((now, created_at, conn))
233         else:
234             # wake up any waiters with a flag value that indicates
235             # they need to manufacture a connection
236             if self.waiting() > 0:
237                 super(BaseConnectionPool, self).put(None)
238             else:
239                 # no waiters -- just change the size
240                 self.current_size -= 1
241         self._schedule_expiration()
242
243     @contextmanager
244     def item(self, cleanup=_MISSING):
245         conn = self.get()
246         try:
247             yield conn
248         finally:
249             self.put(conn, cleanup=cleanup)
250
251     def clear(self):
252         """Close all connections that this pool still holds a reference to,
253         and removes all references to them.
254         """
255         if self._expiration_timer:
256             self._expiration_timer.cancel()
257         free_items, self.free_items = self.free_items, deque()
258         for item in free_items:
259             # Free items created using min_size>0 are not tuples.
260             conn = item[2] if isinstance(item, tuple) else item
261             self._safe_close(conn, quiet=True)
262
263     def __del__(self):
264         self.clear()
265
266
267 class TpooledConnectionPool(BaseConnectionPool):
268     """A pool which gives out :class:`~eventlet.tpool.Proxy`-based database
269     connections.
270     """
271
272     def create(self):
273         now = time.time()
274         return now, now, self.connect(
275             self._db_module, self.connect_timeout, *self._args, **self._kwargs)
276
277     @classmethod
278     def connect(cls, db_module, connect_timeout, *args, **kw):
279         t = timeout.Timeout(connect_timeout, ConnectTimeout())
280         try:
281             from eventlet import tpool
282             conn = tpool.execute(db_module.connect, *args, **kw)
283             return tpool.Proxy(conn, autowrap_names=('cursor',))
284         finally:
285             t.cancel()
286
287
288 class RawConnectionPool(BaseConnectionPool):
289     """A pool which gives out plain database connections.
290     """
291
292     def create(self):
293         now = time.time()
294         return now, now, self.connect(
295             self._db_module, self.connect_timeout, *self._args, **self._kwargs)
296
297     @classmethod
298     def connect(cls, db_module, connect_timeout, *args, **kw):
299         t = timeout.Timeout(connect_timeout, ConnectTimeout())
300         try:
301             return db_module.connect(*args, **kw)
302         finally:
303             t.cancel()
304
305
306 # default connection pool is the tpool one
307 ConnectionPool = TpooledConnectionPool
308
309
310 class GenericConnectionWrapper(object):
311     def __init__(self, baseconn):
312         self._base = baseconn
313
314     # Proxy all method calls to self._base
315     # FIXME: remove repetition; options to consider:
316     # * for name in (...):
317     #     setattr(class, name, lambda self, *a, **kw: getattr(self._base, name)(*a, **kw))
318     # * def __getattr__(self, name): if name in (...): return getattr(self._base, name)
319     # * other?
320     def __enter__(self): return self._base.__enter__()
321
322     def __exit__(self, exc, value, tb): return self._base.__exit__(exc, value, tb)
323
324     def __repr__(self): return self._base.__repr__()
325
326     def affected_rows(self): return self._base.affected_rows()
327
328     def autocommit(self, *args, **kwargs): return self._base.autocommit(*args, **kwargs)
329
330     def begin(self): return self._base.begin()
331
332     def change_user(self, *args, **kwargs): return self._base.change_user(*args, **kwargs)
333
334     def character_set_name(self, *args, **kwargs): return self._base.character_set_name(*args, **kwargs)
335
336     def close(self, *args, **kwargs): return self._base.close(*args, **kwargs)
337
338     def commit(self, *args, **kwargs): return self._base.commit(*args, **kwargs)
339
340     def cursor(self, *args, **kwargs): return self._base.cursor(*args, **kwargs)
341
342     def dump_debug_info(self, *args, **kwargs): return self._base.dump_debug_info(*args, **kwargs)
343
344     def errno(self, *args, **kwargs): return self._base.errno(*args, **kwargs)
345
346     def error(self, *args, **kwargs): return self._base.error(*args, **kwargs)
347
348     def errorhandler(self, *args, **kwargs): return self._base.errorhandler(*args, **kwargs)
349
350     def insert_id(self, *args, **kwargs): return self._base.insert_id(*args, **kwargs)
351
352     def literal(self, *args, **kwargs): return self._base.literal(*args, **kwargs)
353
354     def set_character_set(self, *args, **kwargs): return self._base.set_character_set(*args, **kwargs)
355
356     def set_sql_mode(self, *args, **kwargs): return self._base.set_sql_mode(*args, **kwargs)
357
358     def show_warnings(self): return self._base.show_warnings()
359
360     def warning_count(self): return self._base.warning_count()
361
362     def ping(self, *args, **kwargs): return self._base.ping(*args, **kwargs)
363
364     def query(self, *args, **kwargs): return self._base.query(*args, **kwargs)
365
366     def rollback(self, *args, **kwargs): return self._base.rollback(*args, **kwargs)
367
368     def select_db(self, *args, **kwargs): return self._base.select_db(*args, **kwargs)
369
370     def set_server_option(self, *args, **kwargs): return self._base.set_server_option(*args, **kwargs)
371
372     def server_capabilities(self, *args, **kwargs): return self._base.server_capabilities(*args, **kwargs)
373
374     def shutdown(self, *args, **kwargs): return self._base.shutdown(*args, **kwargs)
375
376     def sqlstate(self, *args, **kwargs): return self._base.sqlstate(*args, **kwargs)
377
378     def stat(self, *args, **kwargs): return self._base.stat(*args, **kwargs)
379
380     def store_result(self, *args, **kwargs): return self._base.store_result(*args, **kwargs)
381
382     def string_literal(self, *args, **kwargs): return self._base.string_literal(*args, **kwargs)
383
384     def thread_id(self, *args, **kwargs): return self._base.thread_id(*args, **kwargs)
385
386     def use_result(self, *args, **kwargs): return self._base.use_result(*args, **kwargs)
387
388
389 class PooledConnectionWrapper(GenericConnectionWrapper):
390     """A connection wrapper where:
391     - the close method returns the connection to the pool instead of closing it directly
392     - ``bool(conn)`` returns a reasonable value
393     - returns itself to the pool if it gets garbage collected
394     """
395
396     def __init__(self, baseconn, pool):
397         super(PooledConnectionWrapper, self).__init__(baseconn)
398         self._pool = pool
399
400     def __nonzero__(self):
401         return (hasattr(self, '_base') and bool(self._base))
402
403     __bool__ = __nonzero__
404
405     def _destroy(self):
406         self._pool = None
407         try:
408             del self._base
409         except AttributeError:
410             pass
411
412     def close(self):
413         """Return the connection to the pool, and remove the
414         reference to it so that you can't use it again through this
415         wrapper object.
416         """
417         if self and self._pool:
418             self._pool.put(self)
419         self._destroy()
420
421     def __del__(self):
422         return  # this causes some issues if __del__ is called in the
423                 # main coroutine, so for now this is disabled
424         # self.close()
425
426
427 class DatabaseConnector(object):
428     """
429     This is an object which will maintain a collection of database
430     connection pools on a per-host basis.
431     """
432
433     def __init__(self, module, credentials,
434                  conn_pool=None, *args, **kwargs):
435         """constructor
436         *module*
437             Database module to use.
438         *credentials*
439             Mapping of hostname to connect arguments (e.g. username and password)
440         """
441         assert(module)
442         self._conn_pool_class = conn_pool
443         if self._conn_pool_class is None:
444             self._conn_pool_class = ConnectionPool
445         self._module = module
446         self._args = args
447         self._kwargs = kwargs
448         # this is a map of hostname to username/password
449         self._credentials = credentials
450         self._databases = {}
451
452     def credentials_for(self, host):
453         if host in self._credentials:
454             return self._credentials[host]
455         else:
456             return self._credentials.get('default', None)
457
458     def get(self, host, dbname):
459         """Returns a ConnectionPool to the target host and schema.
460         """
461         key = (host, dbname)
462         if key not in self._databases:
463             new_kwargs = self._kwargs.copy()
464             new_kwargs['db'] = dbname
465             new_kwargs['host'] = host
466             new_kwargs.update(self.credentials_for(host))
467             dbpool = self._conn_pool_class(
468                 self._module, *self._args, **new_kwargs)
469             self._databases[key] = dbpool
470
471         return self._databases[key]