1 diff --git a/neunton/common/exceptions.py b/neunton/common/exceptions.py
2 index c99c254..e24f7bc 100644
3 --- a/neunton/common/exceptions.py
4 +++ b/neunton/common/exceptions.py
5 @@ -235,3 +235,7 @@ class InvalidSharedSetting(QuantumException):
7 class InvalidExtenstionEnv(QuantumException):
8 message = _("Invalid extension environment: %(reason)s")
10 +class DBError(Error):
11 + message = _("Database error")
13 diff --git a/neunton/db/api.py b/neunton/db/api.py
14 index 238a9f9..737c748 100644
15 --- a/neunton/db/api.py
16 +++ b/neunton/db/api.py
23 import sqlalchemy as sql
24 from sqlalchemy import create_engine
25 from sqlalchemy.exc import DisconnectionError
26 +from sqlalchemy.exc import OperationalError
27 from sqlalchemy.orm import sessionmaker, exc
29 from quantum.db import model_base
30 +from quantum.common.exceptions import DBError
32 LOG = logging.getLogger(__name__)
34 @@ -33,28 +37,61 @@ LOG = logging.getLogger(__name__)
37 BASE = model_base.BASE
40 +def is_db_connection_error(args):
41 + """Return True if error in connecting to db."""
42 + # NOTE(adam_g): This is currently MySQL specific and needs to be extended
43 + # to support Postgres and others.
44 + conn_err_codes = ('2002', '2003', '2006', '2013', '2014', '2045', '2055')
45 + for err_code in conn_err_codes:
46 + if args.find(err_code) != -1:
50 -class MySQLPingListener(object):
53 - Ensures that MySQL connections checked out of the
55 +def wrap_db_error(f):
56 + """Function wrapper to capture DB errors
59 - http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
61 + If an exception is thrown by the wrapped function,
62 + determine if it represents a database connection error.
63 + If so, retry the wrapped function, and repeat until it succeeds
64 + or we reach a configurable maximum number of retries.
65 + If it is not a connection error, or we exceeded the retry limit,
68 - def checkout(self, dbapi_con, con_record, con_proxy):
70 - dbapi_con.cursor().execute('select 1')
71 - except dbapi_con.OperationalError, ex:
72 - if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
73 - LOG.warn('Got mysql server has gone away: %s', ex)
74 - raise DisconnectionError("Database server went away")
78 + def _wrap_db_error(*args, **kwargs):
79 + next_interval = OPTIONS.get('reconnect_interval', 1)
80 + remaining = OPTIONS.get('sql_max_retries', -1)
82 + remaining = 'infinite'
85 + return f(*args, **kwargs)
86 + except OperationalError, e:
87 + if is_db_connection_error(e.args[0]):
89 + logging.warn('DB exceeded retry limit.')
91 + if remaining != 'infinite':
93 + logging.warn('DB connection error, '
94 + 'retrying in %i seconds.' % next_interval)
95 + time.sleep(next_interval)
96 + if OPTIONS.get('inc_reconnect_interval', True):
97 + next_interval = min(next_interval * 2,
98 + OPTIONS.get('max_reconnect_interval', 60))
100 + logging.warn('DB exception wrapped.')
102 + except Exception, e:
105 + _wrap_db_error.func_name = f.func_name
106 + return _wrap_db_error
109 def configure_db(options):
111 @@ -63,6 +100,8 @@ def configure_db(options):
113 :param options: Mapping of configuration options
119 connection_dict = sql.engine.url.make_url(options['sql_connection'])
120 @@ -72,9 +111,6 @@ def configure_db(options):
121 'convert_unicode': True,
124 - if 'mysql' in connection_dict.drivername:
125 - engine_args['listeners'] = [MySQLPingListener()]
127 _ENGINE = create_engine(options['sql_connection'], **engine_args)
128 base = options.get('base', BASE)
129 if not register_models(base):
130 @@ -101,10 +137,18 @@ def get_session(autocommit=True, expire_on_commit=False):
131 global _MAKER, _ENGINE
134 + class OurQuery(sql.orm.query.Query):
137 + query.all = wrap_db_error(query.all)
138 + query.first = wrap_db_error(query.first)
139 _MAKER = sessionmaker(bind=_ENGINE,
140 autocommit=autocommit,
141 - expire_on_commit=expire_on_commit)
143 + expire_on_commit=expire_on_commit,
144 + query_cls=OurQuery)
146 + session.flush = wrap_db_error(session.flush)
150 def retry_registration(remaining, reconnect_interval, base=BASE):