From 53609f29f3c8fcadc545afb891189253c07b33c3 Mon Sep 17 00:00:00 2001 From: Roman Podoliaka Date: Tue, 8 Apr 2014 11:47:48 +0300 Subject: [PATCH] Sync db code from oslo-incubator Synced from commit 5fb343faee1442921e2d610b1a5222e67418c4cd. Due to changes in oslo.db API, this sync requires a bit more work on Neutron side. Sync includes the following commits: 5b7e61c Dispose db connections pool on disconnect d1988b9 Set sql_mode callback on connect instead of checkout a1a8280 Fix excessive logging from db.sqlalchemy.session 9933bdd Get mysql_sql_mode parameter from config 96a2217 Prevent incorrect usage of _wrap_db_error() 20a7510 Add from_config() method to EngineFacade fea119e Drop special case for MySQL traditional mode, update unit tests dda24eb Introduce mysql_sql_mode option, remove old warning 0b5af67 Introduce a method to set any MySQL session SQL mode 8dccc7b Handle ibm_db_sa DBDuplicateEntry integrity errors 5b9e9f4 Fix doc build errors in db.sqlalchemy ac84a40 Update log translation domains 86707cd Remove None for dict.get() 0545121 Fix duplicating of SQL queries in logs fcf517d Update oslo log messages with translation domains 630d395 Don't use cfg.CONF in oslo.db ce69e7f Don't store engine instances in oslo.db Change-Id: I0e1d86878d3eb924b01e04dced0f90b4e57757d8 --- neutron/common/config.py | 4 +- neutron/db/api.py | 37 +- neutron/openstack/common/db/api.py | 178 +++--- neutron/openstack/common/db/options.py | 171 ++++++ .../openstack/common/db/sqlalchemy/models.py | 16 +- .../common/db/sqlalchemy/provision.py | 157 +++++ .../openstack/common/db/sqlalchemy/session.py | 546 +++++++++--------- .../common/db/sqlalchemy/test_base.py | 153 +++++ .../openstack/common/db/sqlalchemy/utils.py | 122 +++- neutron/service.py | 4 +- 10 files changed, 1010 insertions(+), 378 deletions(-) create mode 100644 neutron/openstack/common/db/options.py create mode 100644 neutron/openstack/common/db/sqlalchemy/provision.py create mode 100644 neutron/openstack/common/db/sqlalchemy/test_base.py diff --git a/neutron/common/config.py b/neutron/common/config.py index c38a1c13c..76a651a5c 100644 --- a/neutron/common/config.py +++ b/neutron/common/config.py @@ -24,7 +24,7 @@ from paste import deploy from neutron.api.v2 import attributes from neutron.common import utils -from neutron.openstack.common.db.sqlalchemy import session as db_session +from neutron.openstack.common.db import options as db_options from neutron.openstack.common import log as logging from neutron.openstack.common import rpc from neutron.version import version_info as neutron_version @@ -129,7 +129,7 @@ rpc.set_defaults(control_exchange='neutron') _SQL_CONNECTION_DEFAULT = 'sqlite://' # Update the default QueuePool parameters. These can be tweaked by the # configuration variables - max_pool_size, max_overflow and pool_timeout -db_session.set_defaults(sql_connection=_SQL_CONNECTION_DEFAULT, +db_options.set_defaults(sql_connection=_SQL_CONNECTION_DEFAULT, sqlite_db='', max_pool_size=10, max_overflow=20, pool_timeout=10) diff --git a/neutron/db/api.py b/neutron/db/api.py index 85bd42821..3ccf40624 100644 --- a/neutron/db/api.py +++ b/neutron/db/api.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg import sqlalchemy as sql from neutron.db import model_base @@ -23,6 +24,22 @@ LOG = logging.getLogger(__name__) BASE = model_base.BASEV2 +cfg.CONF.import_opt('connection', + 'neutron.openstack.common.db.options', + group='database') + +_FACADE = None + + +def _create_facade_lazily(): + global _FACADE + + if _FACADE is None: + _FACADE = session.EngineFacade.from_config( + cfg.CONF.database.connection, cfg.CONF, sqlite_fk=True) + + return _FACADE + def configure_db(): """Configure database. @@ -30,26 +47,31 @@ def configure_db(): Establish the database, create an engine if needed, and register the models. """ - session.get_engine(sqlite_fk=True) register_models() def clear_db(base=BASE): unregister_models(base) - session.cleanup() + + +def get_engine(): + """Helper method to grab engine.""" + facade = _create_facade_lazily() + return facade.get_engine() def get_session(autocommit=True, expire_on_commit=False): """Helper method to grab session.""" - return session.get_session(autocommit=autocommit, - expire_on_commit=expire_on_commit, - sqlite_fk=True) + facade = _create_facade_lazily() + return facade.get_session(autocommit=autocommit, + expire_on_commit=expire_on_commit) def register_models(base=BASE): """Register Models and create properties.""" try: - engine = session.get_engine(sqlite_fk=True) + facade = _create_facade_lazily() + engine = facade.get_engine() base.metadata.create_all(engine) except sql.exc.OperationalError as e: LOG.info(_("Database registration exception: %s"), e) @@ -60,7 +82,8 @@ def register_models(base=BASE): def unregister_models(base=BASE): """Unregister Models, useful clearing out data before testing.""" try: - engine = session.get_engine(sqlite_fk=True) + facade = _create_facade_lazily() + engine = facade.get_engine() base.metadata.drop_all(engine) except Exception: LOG.exception(_("Database exception")) diff --git a/neutron/openstack/common/db/api.py b/neutron/openstack/common/db/api.py index a0cec8562..7f71d6a2b 100644 --- a/neutron/openstack/common/db/api.py +++ b/neutron/openstack/common/db/api.py @@ -15,11 +15,6 @@ """Multiple DB API backend support. -Supported configuration options: - -The following two parameters are in the 'database' group: -`backend`: DB backend name or full module path to DB backend module. - A DB backend module should implement a method named 'get_backend' which takes no arguments. The method can return any object that implements DB API methods. @@ -27,45 +22,14 @@ API methods. import functools import logging +import threading import time -from oslo.config import cfg - from neutron.openstack.common.db import exception -from neutron.openstack.common.gettextutils import _ # noqa +from neutron.openstack.common.gettextutils import _LE from neutron.openstack.common import importutils -db_opts = [ - cfg.StrOpt('backend', - default='sqlalchemy', - deprecated_name='db_backend', - deprecated_group='DEFAULT', - help='The backend to use for db'), - cfg.BoolOpt('use_db_reconnect', - default=False, - help='Enable the experimental use of database reconnect ' - 'on connection lost'), - cfg.IntOpt('db_retry_interval', - default=1, - help='seconds between db connection retries'), - cfg.BoolOpt('db_inc_retry_interval', - default=True, - help='Whether to increase interval between db connection ' - 'retries, up to db_max_retry_interval'), - cfg.IntOpt('db_max_retry_interval', - default=10, - help='max seconds between db connection retries, if ' - 'db_inc_retry_interval is enabled'), - cfg.IntOpt('db_max_retries', - default=20, - help='maximum db connection retries before error is raised. ' - '(setting -1 implies an infinite retry count)'), -] - -CONF = cfg.CONF -CONF.register_opts(db_opts, 'database') - LOG = logging.getLogger(__name__) @@ -75,7 +39,7 @@ def safe_for_db_retry(f): return f -def _wrap_db_retry(f): +class wrap_db_retry(object): """Retry db.api methods, if DBConnectionError() raised Retry decorated db.api methods. If we enabled `use_db_reconnect` @@ -84,53 +48,115 @@ def _wrap_db_retry(f): Decorator catchs DBConnectionError() and retries function in a loop until it succeeds, or until maximum retries count will be reached. """ - @functools.wraps(f) - def wrapper(*args, **kwargs): - next_interval = CONF.database.db_retry_interval - remaining = CONF.database.db_max_retries - - while True: - try: - return f(*args, **kwargs) - except exception.DBConnectionError as e: - if remaining == 0: - LOG.exception(_('DB exceeded retry limit.')) - raise exception.DBError(e) - if remaining != -1: - remaining -= 1 - LOG.exception(_('DB connection error.')) - # NOTE(vsergeyev): We are using patched time module, so this - # effectively yields the execution context to - # another green thread. - time.sleep(next_interval) - if CONF.database.db_inc_retry_interval: - next_interval = min( - next_interval * 2, - CONF.database.db_max_retry_interval - ) - return wrapper + + def __init__(self, retry_interval, max_retries, inc_retry_interval, + max_retry_interval): + super(wrap_db_retry, self).__init__() + + self.retry_interval = retry_interval + self.max_retries = max_retries + self.inc_retry_interval = inc_retry_interval + self.max_retry_interval = max_retry_interval + + def __call__(self, f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + next_interval = self.retry_interval + remaining = self.max_retries + + while True: + try: + return f(*args, **kwargs) + except exception.DBConnectionError as e: + if remaining == 0: + LOG.exception(_LE('DB exceeded retry limit.')) + raise exception.DBError(e) + if remaining != -1: + remaining -= 1 + LOG.exception(_LE('DB connection error.')) + # NOTE(vsergeyev): We are using patched time module, so + # this effectively yields the execution + # context to another green thread. + time.sleep(next_interval) + if self.inc_retry_interval: + next_interval = min( + next_interval * 2, + self.max_retry_interval + ) + return wrapper class DBAPI(object): - def __init__(self, backend_mapping=None): - if backend_mapping is None: - backend_mapping = {} - backend_name = CONF.database.backend - # Import the untranslated name if we don't have a - # mapping. - backend_path = backend_mapping.get(backend_name, backend_name) - backend_mod = importutils.import_module(backend_path) - self.__backend = backend_mod.get_backend() + def __init__(self, backend_name, backend_mapping=None, lazy=False, + **kwargs): + """Initialize the chosen DB API backend. + + :param backend_name: name of the backend to load + :type backend_name: str + + :param backend_mapping: backend name -> module/class to load mapping + :type backend_mapping: dict + + :param lazy: load the DB backend lazily on the first DB API method call + :type lazy: bool + + Keyword arguments: + + :keyword use_db_reconnect: retry DB transactions on disconnect or not + :type use_db_reconnect: bool + + :keyword retry_interval: seconds between transaction retries + :type retry_interval: int + + :keyword inc_retry_interval: increase retry interval or not + :type inc_retry_interval: bool + + :keyword max_retry_interval: max interval value between retries + :type max_retry_interval: int + + :keyword max_retries: max number of retries before an error is raised + :type max_retries: int + + """ + + self._backend = None + self._backend_name = backend_name + self._backend_mapping = backend_mapping or {} + self._lock = threading.Lock() + + if not lazy: + self._load_backend() + + self.use_db_reconnect = kwargs.get('use_db_reconnect', False) + self.retry_interval = kwargs.get('retry_interval', 1) + self.inc_retry_interval = kwargs.get('inc_retry_interval', True) + self.max_retry_interval = kwargs.get('max_retry_interval', 10) + self.max_retries = kwargs.get('max_retries', 20) + + def _load_backend(self): + with self._lock: + if not self._backend: + # Import the untranslated name if we don't have a mapping + backend_path = self._backend_mapping.get(self._backend_name, + self._backend_name) + backend_mod = importutils.import_module(backend_path) + self._backend = backend_mod.get_backend() def __getattr__(self, key): - attr = getattr(self.__backend, key) + if not self._backend: + self._load_backend() + attr = getattr(self._backend, key) if not hasattr(attr, '__call__'): return attr # NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry # DB API methods, decorated with @safe_for_db_retry # on disconnect. - if CONF.database.use_db_reconnect and hasattr(attr, 'enable_retry'): - attr = _wrap_db_retry(attr) + if self.use_db_reconnect and hasattr(attr, 'enable_retry'): + attr = wrap_db_retry( + retry_interval=self.retry_interval, + max_retries=self.max_retries, + inc_retry_interval=self.inc_retry_interval, + max_retry_interval=self.max_retry_interval)(attr) return attr diff --git a/neutron/openstack/common/db/options.py b/neutron/openstack/common/db/options.py new file mode 100644 index 000000000..66443162c --- /dev/null +++ b/neutron/openstack/common/db/options.py @@ -0,0 +1,171 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +from oslo.config import cfg + + +database_opts = [ + cfg.StrOpt('sqlite_db', + deprecated_group='DEFAULT', + default='neutron.sqlite', + help='The file name to use with SQLite'), + cfg.BoolOpt('sqlite_synchronous', + deprecated_group='DEFAULT', + default=True, + help='If True, SQLite uses synchronous mode'), + cfg.StrOpt('backend', + default='sqlalchemy', + deprecated_name='db_backend', + deprecated_group='DEFAULT', + help='The backend to use for db'), + cfg.StrOpt('connection', + help='The SQLAlchemy connection string used to connect to the ' + 'database', + secret=True, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_connection', + group='DATABASE'), + cfg.DeprecatedOpt('connection', + group='sql'), ]), + cfg.StrOpt('mysql_sql_mode', + default='TRADITIONAL', + help='The SQL mode to be used for MySQL sessions. ' + 'This option, including the default, overrides any ' + 'server-set SQL mode. To use whatever SQL mode ' + 'is set by the server configuration, ' + 'set this to no value. Example: mysql_sql_mode='), + cfg.IntOpt('idle_timeout', + default=3600, + deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_idle_timeout', + group='DATABASE'), + cfg.DeprecatedOpt('idle_timeout', + group='sql')], + help='Timeout before idle sql connections are reaped'), + cfg.IntOpt('min_pool_size', + default=1, + deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_min_pool_size', + group='DATABASE')], + help='Minimum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('max_pool_size', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_pool_size', + group='DATABASE')], + help='Maximum number of SQL connections to keep open in a ' + 'pool'), + cfg.IntOpt('max_retries', + default=10, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', + group='DEFAULT'), + cfg.DeprecatedOpt('sql_max_retries', + group='DATABASE')], + help='Maximum db connection retries during startup. ' + '(setting -1 implies an infinite retry count)'), + cfg.IntOpt('retry_interval', + default=10, + deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', + group='DEFAULT'), + cfg.DeprecatedOpt('reconnect_interval', + group='DATABASE')], + help='Interval between retries of opening a sql connection'), + cfg.IntOpt('max_overflow', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', + group='DEFAULT'), + cfg.DeprecatedOpt('sqlalchemy_max_overflow', + group='DATABASE')], + help='If set, use this value for max_overflow with sqlalchemy'), + cfg.IntOpt('connection_debug', + default=0, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', + group='DEFAULT')], + help='Verbosity of SQL debugging information. 0=None, ' + '100=Everything'), + cfg.BoolOpt('connection_trace', + default=False, + deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', + group='DEFAULT')], + help='Add python stack traces to SQL as comment strings'), + cfg.IntOpt('pool_timeout', + default=None, + deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', + group='DATABASE')], + help='If set, use this value for pool_timeout with sqlalchemy'), + cfg.BoolOpt('use_db_reconnect', + default=False, + help='Enable the experimental use of database reconnect ' + 'on connection lost'), + cfg.IntOpt('db_retry_interval', + default=1, + help='seconds between db connection retries'), + cfg.BoolOpt('db_inc_retry_interval', + default=True, + help='Whether to increase interval between db connection ' + 'retries, up to db_max_retry_interval'), + cfg.IntOpt('db_max_retry_interval', + default=10, + help='max seconds between db connection retries, if ' + 'db_inc_retry_interval is enabled'), + cfg.IntOpt('db_max_retries', + default=20, + help='maximum db connection retries before error is raised. ' + '(setting -1 implies an infinite retry count)'), +] + +CONF = cfg.CONF +CONF.register_opts(database_opts, 'database') + + +def set_defaults(sql_connection, sqlite_db, max_pool_size=None, + max_overflow=None, pool_timeout=None): + """Set defaults for configuration variables.""" + cfg.set_defaults(database_opts, + connection=sql_connection, + sqlite_db=sqlite_db) + # Update the QueuePool defaults + if max_pool_size is not None: + cfg.set_defaults(database_opts, + max_pool_size=max_pool_size) + if max_overflow is not None: + cfg.set_defaults(database_opts, + max_overflow=max_overflow) + if pool_timeout is not None: + cfg.set_defaults(database_opts, + pool_timeout=pool_timeout) + + +def list_opts(): + """Returns a list of oslo.config options available in the library. + + The returned list includes all oslo.config options which may be registered + at runtime by the library. + + Each element of the list is a tuple. The first element is the name of the + group under which the list of elements in the second element will be + registered. A group name of None corresponds to the [DEFAULT] group in + config files. + + The purpose of this is to allow tools like the Oslo sample config file + generator to discover the options exposed to users by this library. + + :returns: a list of (group_name, opts) tuples + """ + return [('database', copy.deepcopy(database_opts))] diff --git a/neutron/openstack/common/db/sqlalchemy/models.py b/neutron/openstack/common/db/sqlalchemy/models.py index 688f8c887..12cfc7645 100644 --- a/neutron/openstack/common/db/sqlalchemy/models.py +++ b/neutron/openstack/common/db/sqlalchemy/models.py @@ -26,18 +26,16 @@ from sqlalchemy import Column, Integer from sqlalchemy import DateTime from sqlalchemy.orm import object_mapper -from neutron.openstack.common.db.sqlalchemy import session as sa from neutron.openstack.common import timeutils -class ModelBase(object): +class ModelBase(six.Iterator): """Base class for models.""" __table_initialized__ = False - def save(self, session=None): + def save(self, session): """Save this object.""" - if not session: - session = sa.get_session() + # NOTE(boris-42): This part of code should be look like: # session.add(self) # session.flush() @@ -80,10 +78,14 @@ class ModelBase(object): self._i = iter(columns) return self - def next(self): + # In Python 3, __next__() has replaced next(). + def __next__(self): n = six.advance_iterator(self._i) return n, getattr(self, n) + def next(self): + return self.__next__() + def update(self, values): """Make the model object behave like a dict.""" for k, v in six.iteritems(values): @@ -110,7 +112,7 @@ class SoftDeleteMixin(object): deleted_at = Column(DateTime) deleted = Column(Integer, default=0) - def soft_delete(self, session=None): + def soft_delete(self, session): """Mark this object as deleted.""" self.deleted = self.id self.deleted_at = timeutils.utcnow() diff --git a/neutron/openstack/common/db/sqlalchemy/provision.py b/neutron/openstack/common/db/sqlalchemy/provision.py new file mode 100644 index 000000000..70c89a921 --- /dev/null +++ b/neutron/openstack/common/db/sqlalchemy/provision.py @@ -0,0 +1,157 @@ +# Copyright 2013 Mirantis.inc +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Provision test environment for specific DB backends""" + +import argparse +import logging +import os +import random +import string + +from six import moves +import sqlalchemy + +from neutron.openstack.common.db import exception as exc + + +LOG = logging.getLogger(__name__) + + +def get_engine(uri): + """Engine creation + + Call the function without arguments to get admin connection. Admin + connection required to create temporary user and database for each + particular test. Otherwise use existing connection to recreate connection + to the temporary database. + """ + return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool) + + +def _execute_sql(engine, sql, driver): + """Initialize connection, execute sql query and close it.""" + try: + with engine.connect() as conn: + if driver == 'postgresql': + conn.connection.set_isolation_level(0) + for s in sql: + conn.execute(s) + except sqlalchemy.exc.OperationalError: + msg = ('%s does not match database admin ' + 'credentials or database does not exist.') + LOG.exception(msg % engine.url) + raise exc.DBConnectionError(msg % engine.url) + + +def create_database(engine): + """Provide temporary user and database for each particular test.""" + driver = engine.name + + auth = { + 'database': ''.join(random.choice(string.ascii_lowercase) + for i in moves.range(10)), + 'user': engine.url.username, + 'passwd': engine.url.password, + } + + sqls = [ + "drop database if exists %(database)s;", + "create database %(database)s;" + ] + + if driver == 'sqlite': + return 'sqlite:////tmp/%s' % auth['database'] + elif driver in ['mysql', 'postgresql']: + sql_query = map(lambda x: x % auth, sqls) + _execute_sql(engine, sql_query, driver) + else: + raise ValueError('Unsupported RDBMS %s' % driver) + + params = auth.copy() + params['backend'] = driver + return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params + + +def drop_database(admin_engine, current_uri): + """Drop temporary database and user after each particular test.""" + + engine = get_engine(current_uri) + driver = engine.name + auth = {'database': engine.url.database, 'user': engine.url.username} + + if driver == 'sqlite': + try: + os.remove(auth['database']) + except OSError: + pass + elif driver in ['mysql', 'postgresql']: + sql = "drop database if exists %(database)s;" + _execute_sql(admin_engine, [sql % auth], driver) + else: + raise ValueError('Unsupported RDBMS %s' % driver) + + +def main(): + """Controller to handle commands + + ::create: Create test user and database with random names. + ::drop: Drop user and database created by previous command. + """ + parser = argparse.ArgumentParser( + description='Controller to handle database creation and dropping' + ' commands.', + epilog='Under normal circumstances is not used directly.' + ' Used in .testr.conf to automate test database creation' + ' and dropping processes.') + subparsers = parser.add_subparsers( + help='Subcommands to manipulate temporary test databases.') + + create = subparsers.add_parser( + 'create', + help='Create temporary test ' + 'databases and users.') + create.set_defaults(which='create') + create.add_argument( + 'instances_count', + type=int, + help='Number of databases to create.') + + drop = subparsers.add_parser( + 'drop', + help='Drop temporary test databases and users.') + drop.set_defaults(which='drop') + drop.add_argument( + 'instances', + nargs='+', + help='List of databases uri to be dropped.') + + args = parser.parse_args() + + connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', + 'sqlite://') + engine = get_engine(connection_string) + which = args.which + + if which == "create": + for i in range(int(args.instances_count)): + print(create_database(engine)) + elif which == "drop": + for db in args.instances: + drop_database(engine, db) + + +if __name__ == "__main__": + main() diff --git a/neutron/openstack/common/db/sqlalchemy/session.py b/neutron/openstack/common/db/sqlalchemy/session.py index 9e095f0d6..6592cb6bf 100644 --- a/neutron/openstack/common/db/sqlalchemy/session.py +++ b/neutron/openstack/common/db/sqlalchemy/session.py @@ -16,19 +16,6 @@ """Session Handling for SQLAlchemy backend. -Initializing: - -* Call `set_defaults()` with the minimal of the following kwargs: - ``sql_connection``, ``sqlite_db`` - - Example: - - .. code:: python - - session.set_defaults( - sql_connection="sqlite:///var/lib/neutron/sqlite.db", - sqlite_db="/var/lib/neutron/sqlite.db") - Recommended ways to use sessions within this framework: * Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``. @@ -87,7 +74,7 @@ Recommended ways to use sessions within this framework: .. code:: python def create_many_foo(context, foos): - session = get_session() + session = sessionmaker() with session.begin(): for foo in foos: foo_ref = models.Foo() @@ -95,7 +82,7 @@ Recommended ways to use sessions within this framework: session.add(foo_ref) def update_bar(context, foo_id, newbar): - session = get_session() + session = sessionmaker() with session.begin(): foo_ref = (model_query(context, models.Foo, session). filter_by(id=foo_id). @@ -142,7 +129,7 @@ Recommended ways to use sessions within this framework: foo1 = models.Foo() foo2 = models.Foo() foo1.id = foo2.id = 1 - session = get_session() + session = sessionmaker() try: with session.begin(): session.add(foo1) @@ -168,7 +155,7 @@ Recommended ways to use sessions within this framework: .. code:: python def myfunc(foo): - session = get_session() + session = sessionmaker() with session.begin(): # do some database things bar = _private_func(foo, session) @@ -176,7 +163,7 @@ Recommended ways to use sessions within this framework: def _private_func(foo, session=None): if not session: - session = get_session() + session = sessionmaker() with session.begin(subtransaction=True): # do some other database things return bar @@ -240,7 +227,7 @@ Efficient use of soft deletes: def complex_soft_delete_with_synchronization_bar(session=None): if session is None: - session = get_session() + session = sessionmaker() with session.begin(subtransactions=True): count = (model_query(BarModel). find(some_condition). @@ -257,7 +244,7 @@ Efficient use of soft deletes: .. code:: python def soft_delete_bar_model(): - session = get_session() + session = sessionmaker() with session.begin(): bar_ref = model_query(BarModel).find(some_condition).first() # Work with bar_ref @@ -269,7 +256,7 @@ Efficient use of soft deletes: .. code:: python def soft_delete_multi_models(): - session = get_session() + session = sessionmaker() with session.begin(): query = (model_query(BarModel, session=session). find(some_condition)) @@ -293,11 +280,9 @@ Efficient use of soft deletes: import functools import logging -import os.path import re import time -from oslo.config import cfg import six from sqlalchemy import exc as sqla_exc from sqlalchemy.interfaces import PoolListener @@ -306,150 +291,12 @@ from sqlalchemy.pool import NullPool, StaticPool from sqlalchemy.sql.expression import literal_column from neutron.openstack.common.db import exception -from neutron.openstack.common.gettextutils import _ +from neutron.openstack.common.gettextutils import _LE, _LW from neutron.openstack.common import timeutils -sqlite_db_opts = [ - cfg.StrOpt('sqlite_db', - default='neutron.sqlite', - help='The file name to use with SQLite'), - cfg.BoolOpt('sqlite_synchronous', - default=True, - help='If True, SQLite uses synchronous mode'), -] - -database_opts = [ - cfg.StrOpt('connection', - default='sqlite:///' + - os.path.abspath(os.path.join(os.path.dirname(__file__), - '../', '$sqlite_db')), - help='The SQLAlchemy connection string used to connect to the ' - 'database', - secret=True, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_connection', - group='DATABASE'), - cfg.DeprecatedOpt('connection', - group='sql'), ]), - cfg.StrOpt('slave_connection', - default='', - secret=True, - help='The SQLAlchemy connection string used to connect to the ' - 'slave database'), - cfg.IntOpt('idle_timeout', - default=3600, - deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_idle_timeout', - group='DATABASE'), - cfg.DeprecatedOpt('idle_timeout', - group='sql')], - help='Timeout before idle sql connections are reaped'), - cfg.IntOpt('min_pool_size', - default=1, - deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_min_pool_size', - group='DATABASE')], - help='Minimum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('max_pool_size', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_max_pool_size', - group='DATABASE')], - help='Maximum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('max_retries', - default=10, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_max_retries', - group='DATABASE')], - help='Maximum db connection retries during startup. ' - '(setting -1 implies an infinite retry count)'), - cfg.IntOpt('retry_interval', - default=10, - deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', - group='DEFAULT'), - cfg.DeprecatedOpt('reconnect_interval', - group='DATABASE')], - help='Interval between retries of opening a sql connection'), - cfg.IntOpt('max_overflow', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', - group='DEFAULT'), - cfg.DeprecatedOpt('sqlalchemy_max_overflow', - group='DATABASE')], - help='If set, use this value for max_overflow with sqlalchemy'), - cfg.IntOpt('connection_debug', - default=0, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', - group='DEFAULT')], - help='Verbosity of SQL debugging information. 0=None, ' - '100=Everything'), - cfg.BoolOpt('connection_trace', - default=False, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', - group='DEFAULT')], - help='Add python stack traces to SQL as comment strings'), - cfg.IntOpt('pool_timeout', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', - group='DATABASE')], - help='If set, use this value for pool_timeout with sqlalchemy'), -] - -CONF = cfg.CONF -CONF.register_opts(sqlite_db_opts) -CONF.register_opts(database_opts, 'database') LOG = logging.getLogger(__name__) -_ENGINE = None -_MAKER = None -_SLAVE_ENGINE = None -_SLAVE_MAKER = None - - -def set_defaults(sql_connection, sqlite_db, max_pool_size=None, - max_overflow=None, pool_timeout=None): - """Set defaults for configuration variables.""" - cfg.set_defaults(database_opts, - connection=sql_connection) - cfg.set_defaults(sqlite_db_opts, - sqlite_db=sqlite_db) - # Update the QueuePool defaults - if max_pool_size is not None: - cfg.set_defaults(database_opts, - max_pool_size=max_pool_size) - if max_overflow is not None: - cfg.set_defaults(database_opts, - max_overflow=max_overflow) - if pool_timeout is not None: - cfg.set_defaults(database_opts, - pool_timeout=pool_timeout) - - -def cleanup(): - global _ENGINE, _MAKER - global _SLAVE_ENGINE, _SLAVE_MAKER - - if _MAKER: - _MAKER.close_all() - _MAKER = None - if _ENGINE: - _ENGINE.dispose() - _ENGINE = None - if _SLAVE_MAKER: - _SLAVE_MAKER.close_all() - _SLAVE_MAKER = None - if _SLAVE_ENGINE: - _SLAVE_ENGINE.dispose() - _SLAVE_ENGINE = None - class SqliteForeignKeysListener(PoolListener): """Ensures that the foreign key constraints are enforced in SQLite. @@ -462,30 +309,6 @@ class SqliteForeignKeysListener(PoolListener): dbapi_con.execute('pragma foreign_keys=ON') -def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False, - slave_session=False, mysql_traditional_mode=False): - """Return a SQLAlchemy session.""" - global _MAKER - global _SLAVE_MAKER - maker = _MAKER - - if slave_session: - maker = _SLAVE_MAKER - - if maker is None: - engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session, - mysql_traditional_mode=mysql_traditional_mode) - maker = get_maker(engine, autocommit, expire_on_commit) - - if slave_session: - _SLAVE_MAKER = maker - else: - _MAKER = maker - - session = maker() - return session - - # note(boris-42): In current versions of DB backends unique constraint # violation messages follow the structure: # @@ -509,11 +332,20 @@ def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False, # 'c1'") # N columns - (IntegrityError) (1062, "Duplicate entry 'values joined # with -' for key 'name_of_our_constraint'") +# +# ibm_db_sa: +# N columns - (IntegrityError) SQL0803N One or more values in the INSERT +# statement, UPDATE statement, or foreign key update caused by a +# DELETE statement are not valid because the primary key, unique +# constraint or unique index identified by "2" constrains table +# "NOVA.KEY_PAIRS" from having duplicate values for the index +# key. _DUP_KEY_RE_DB = { "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")), "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),), - "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),) + "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),), + "ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),), } @@ -535,7 +367,7 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name): return [columns] return columns[len(uniqbase):].split("0")[1:] - if engine_name not in ["mysql", "sqlite", "postgresql"]: + if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]: return # FIXME(johannes): The usage of the .message attribute has been @@ -550,7 +382,12 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name): else: return - columns = match.group(1) + # NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the + # columns so we have to omit that from the DBDuplicateEntry error. + columns = '' + + if engine_name != 'ibm_db_sa': + columns = match.group(1) if engine_name == "sqlite": columns = [c.split('.')[-1] for c in columns.strip().split(", ")] @@ -592,14 +429,19 @@ def _raise_if_deadlock_error(operational_error, engine_name): def _wrap_db_error(f): @functools.wraps(f) - def _wrap(*args, **kwargs): + def _wrap(self, *args, **kwargs): try: - return f(*args, **kwargs) + assert issubclass( + self.__class__, sqlalchemy.orm.session.Session + ), ('_wrap_db_error() can only be applied to methods of ' + 'subclasses of sqlalchemy.orm.session.Session.') + + return f(self, *args, **kwargs) except UnicodeEncodeError: raise exception.DBInvalidUnicodeParameter() except sqla_exc.OperationalError as e: - _raise_if_db_connection_lost(e, get_engine()) - _raise_if_deadlock_error(e, get_engine().name) + _raise_if_db_connection_lost(e, self.bind) + _raise_if_deadlock_error(e, self.bind.dialect.name) # NOTE(comstud): A lot of code is checking for OperationalError # so let's not wrap it for now. raise @@ -612,37 +454,14 @@ def _wrap_db_error(f): # instance_types) there are more than one unique constraint. This # means we should get names of columns, which values violate # unique constraint, from error message. - _raise_if_duplicate_entry_error(e, get_engine().name) + _raise_if_duplicate_entry_error(e, self.bind.dialect.name) raise exception.DBError(e) except Exception as e: - LOG.exception(_('DB exception wrapped.')) + LOG.exception(_LE('DB exception wrapped.')) raise exception.DBError(e) return _wrap -def get_engine(sqlite_fk=False, slave_engine=False, - mysql_traditional_mode=False): - """Return a SQLAlchemy engine.""" - global _ENGINE - global _SLAVE_ENGINE - engine = _ENGINE - db_uri = CONF.database.connection - - if slave_engine: - engine = _SLAVE_ENGINE - db_uri = CONF.database.slave_connection - - if engine is None: - engine = create_engine(db_uri, sqlite_fk=sqlite_fk, - mysql_traditional_mode=mysql_traditional_mode) - if slave_engine: - _SLAVE_ENGINE = engine - else: - _ENGINE = engine - - return engine - - def _synchronous_switch_listener(dbapi_conn, connection_rec): """Switch sqlite connections to non-synchronous mode.""" dbapi_conn.execute("PRAGMA synchronous = OFF") @@ -684,22 +503,78 @@ def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy): cursor.execute(ping_sql) except Exception as ex: if engine.dialect.is_disconnect(ex, dbapi_conn, cursor): - msg = _('Database server has gone away: %s') % ex + msg = _LW('Database server has gone away: %s') % ex LOG.warning(msg) + + # if the database server has gone away, all connections in the pool + # have become invalid and we can safely close all of them here, + # rather than waste time on checking of every single connection + engine.dispose() + + # this will be handled by SQLAlchemy and will force it to create + # a new connection and retry the original action raise sqla_exc.DisconnectionError(msg) else: raise -def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy): - """Set engine mode to 'traditional'. +def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None): + """Set the sql_mode session variable. + + MySQL supports several server modes. The default is None, but sessions + may choose to enable server modes like TRADITIONAL, ANSI, + several STRICT_* modes and others. + + Note: passing in '' (empty string) for sql_mode clears + the SQL mode for the session, overriding a potentially set + server default. + """ + + cursor = dbapi_con.cursor() + cursor.execute("SET SESSION sql_mode = %s", [sql_mode]) + + +def _mysql_get_effective_sql_mode(engine): + """Returns the effective SQL mode for connections from the engine pool. + + Returns ``None`` if the mode isn't available, otherwise returns the mode. - Required to prevent silent truncates at insert or update operations - under MySQL. By default MySQL truncates inserted string if it longer - than a declared field just with warning. That is fraught with data - corruption. """ - dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;") + # Get the real effective SQL mode. Even when unset by + # our own config, the server may still be operating in a specific + # SQL mode as set by the server configuration. + # Also note that the checkout listener will be called on execute to + # set the mode if it's registered. + row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone() + if row is None: + return + return row[1] + + +def _mysql_check_effective_sql_mode(engine): + """Logs a message based on the effective SQL mode for MySQL connections.""" + realmode = _mysql_get_effective_sql_mode(engine) + + if realmode is None: + LOG.warning(_LW('Unable to detect effective SQL mode')) + return + + LOG.debug('MySQL server mode set to %s', realmode) + # 'TRADITIONAL' mode enables several other modes, so + # we need a substring match here + if not ('TRADITIONAL' in realmode.upper() or + 'STRICT_ALL_TABLES' in realmode.upper()): + LOG.warning(_LW("MySQL SQL mode is '%s', " + "consider enabling TRADITIONAL or STRICT_ALL_TABLES"), + realmode) + + +def _mysql_set_mode_callback(engine, sql_mode): + if sql_mode is not None: + mode_callback = functools.partial(_set_session_sql_mode, + sql_mode=sql_mode) + sqlalchemy.event.listen(engine, 'connect', mode_callback) + _mysql_check_effective_sql_mode(engine) def _is_db_connection_error(args): @@ -726,69 +601,63 @@ def _raise_if_db_connection_lost(error, engine): raise exception.DBConnectionError(error) -def create_engine(sql_connection, sqlite_fk=False, - mysql_traditional_mode=False): +def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, + idle_timeout=3600, + connection_debug=0, max_pool_size=None, max_overflow=None, + pool_timeout=None, sqlite_synchronous=True, + connection_trace=False, max_retries=10, retry_interval=10): """Return a new SQLAlchemy engine.""" - # NOTE(geekinutah): At this point we could be connecting to the normal - # db handle or the slave db handle. Things like - # _wrap_db_error aren't going to work well if their - # backends don't match. Let's check. - _assert_matching_drivers() + connection_dict = sqlalchemy.engine.url.make_url(sql_connection) engine_args = { - "pool_recycle": CONF.database.idle_timeout, - "echo": False, + "pool_recycle": idle_timeout, 'convert_unicode': True, } - # Map our SQL debug level to SQLAlchemy's options - if CONF.database.connection_debug >= 100: - engine_args['echo'] = 'debug' - elif CONF.database.connection_debug >= 50: - engine_args['echo'] = True + logger = logging.getLogger('sqlalchemy.engine') + + # Map SQL debug level to Python log level + if connection_debug >= 100: + logger.setLevel(logging.DEBUG) + elif connection_debug >= 50: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.WARNING) if "sqlite" in connection_dict.drivername: if sqlite_fk: engine_args["listeners"] = [SqliteForeignKeysListener()] engine_args["poolclass"] = NullPool - if CONF.database.connection == "sqlite://": + if sql_connection == "sqlite://": engine_args["poolclass"] = StaticPool engine_args["connect_args"] = {'check_same_thread': False} else: - if CONF.database.max_pool_size is not None: - engine_args['pool_size'] = CONF.database.max_pool_size - if CONF.database.max_overflow is not None: - engine_args['max_overflow'] = CONF.database.max_overflow - if CONF.database.pool_timeout is not None: - engine_args['pool_timeout'] = CONF.database.pool_timeout + if max_pool_size is not None: + engine_args['pool_size'] = max_pool_size + if max_overflow is not None: + engine_args['max_overflow'] = max_overflow + if pool_timeout is not None: + engine_args['pool_timeout'] = pool_timeout engine = sqlalchemy.create_engine(sql_connection, **engine_args) sqlalchemy.event.listen(engine, 'checkin', _thread_yield) if engine.name in ['mysql', 'ibm_db_sa']: - callback = functools.partial(_ping_listener, engine) - sqlalchemy.event.listen(engine, 'checkout', callback) + ping_callback = functools.partial(_ping_listener, engine) + sqlalchemy.event.listen(engine, 'checkout', ping_callback) if engine.name == 'mysql': - if mysql_traditional_mode: - sqlalchemy.event.listen(engine, 'checkout', - _set_mode_traditional) - else: - LOG.warning(_("This application has not enabled MySQL " - "traditional mode, which means silent " - "data corruption may occur. " - "Please encourage the application " - "developers to enable this mode.")) + if mysql_sql_mode: + _mysql_set_mode_callback(engine, mysql_sql_mode) elif 'sqlite' in connection_dict.drivername: - if not CONF.sqlite_synchronous: + if not sqlite_synchronous: sqlalchemy.event.listen(engine, 'connect', _synchronous_switch_listener) sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener) - if (CONF.database.connection_trace and - engine.dialect.dbapi.__name__ == 'MySQLdb'): + if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb': _patch_mysqldb_with_stacktrace_comments() try: @@ -797,15 +666,15 @@ def create_engine(sql_connection, sqlite_fk=False, if not _is_db_connection_error(e.args[0]): raise - remaining = CONF.database.max_retries + remaining = max_retries if remaining == -1: remaining = 'infinite' while True: - msg = _('SQL connection failed. %s attempts left.') + msg = _LW('SQL connection failed. %s attempts left.') LOG.warning(msg % remaining) if remaining != 'infinite': remaining -= 1 - time.sleep(CONF.database.retry_interval) + time.sleep(retry_interval) try: engine.connect() break @@ -892,13 +761,144 @@ def _patch_mysqldb_with_stacktrace_comments(): setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) -def _assert_matching_drivers(): - """Make sure slave handle and normal handle have the same driver.""" - # NOTE(geekinutah): There's no use case for writing to one backend and - # reading from another. Who knows what the future holds? - if CONF.database.slave_connection == '': - return +class EngineFacade(object): + """A helper class for removing of global engine instances from neutron.db. + + As a library, neutron.db can't decide where to store/when to create engine + and sessionmaker instances, so this must be left for a target application. + + On the other hand, in order to simplify the adoption of neutron.db changes, + we'll provide a helper class, which creates engine and sessionmaker + on its instantiation and provides get_engine()/get_session() methods + that are compatible with corresponding utility functions that currently + exist in target projects, e.g. in Nova. + + engine/sessionmaker instances will still be global (and they are meant to + be global), but they will be stored in the app context, rather that in the + neutron.db context. + + Note: using of this helper is completely optional and you are encouraged to + integrate engine/sessionmaker instances into your apps any way you like + (e.g. one might want to bind a session to a request context). Two important + things to remember: + + 1. An Engine instance is effectively a pool of DB connections, so it's + meant to be shared (and it's thread-safe). + 2. A Session instance is not meant to be shared and represents a DB + transactional context (i.e. it's not thread-safe). sessionmaker is + a factory of sessions. + + """ - normal = sqlalchemy.engine.url.make_url(CONF.database.connection) - slave = sqlalchemy.engine.url.make_url(CONF.database.slave_connection) - assert normal.drivername == slave.drivername + def __init__(self, sql_connection, + sqlite_fk=False, autocommit=True, + expire_on_commit=False, **kwargs): + """Initialize engine and sessionmaker instances. + + :param sqlite_fk: enable foreign keys in SQLite + :type sqlite_fk: bool + + :param autocommit: use autocommit mode for created Session instances + :type autocommit: bool + + :param expire_on_commit: expire session objects on commit + :type expire_on_commit: bool + + Keyword arguments: + + :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions. + (defaults to TRADITIONAL) + :keyword idle_timeout: timeout before idle sql connections are reaped + (defaults to 3600) + :keyword connection_debug: verbosity of SQL debugging information. + 0=None, 100=Everything (defaults to 0) + :keyword max_pool_size: maximum number of SQL connections to keep open + in a pool (defaults to SQLAlchemy settings) + :keyword max_overflow: if set, use this value for max_overflow with + sqlalchemy (defaults to SQLAlchemy settings) + :keyword pool_timeout: if set, use this value for pool_timeout with + sqlalchemy (defaults to SQLAlchemy settings) + :keyword sqlite_synchronous: if True, SQLite uses synchronous mode + (defaults to True) + :keyword connection_trace: add python stack traces to SQL as comment + strings (defaults to False) + :keyword max_retries: maximum db connection retries during startup. + (setting -1 implies an infinite retry count) + (defaults to 10) + :keyword retry_interval: interval between retries of opening a sql + connection (defaults to 10) + + """ + + super(EngineFacade, self).__init__() + + self._engine = create_engine( + sql_connection=sql_connection, + sqlite_fk=sqlite_fk, + mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'), + idle_timeout=kwargs.get('idle_timeout', 3600), + connection_debug=kwargs.get('connection_debug', 0), + max_pool_size=kwargs.get('max_pool_size'), + max_overflow=kwargs.get('max_overflow'), + pool_timeout=kwargs.get('pool_timeout'), + sqlite_synchronous=kwargs.get('sqlite_synchronous', True), + connection_trace=kwargs.get('connection_trace', False), + max_retries=kwargs.get('max_retries', 10), + retry_interval=kwargs.get('retry_interval', 10)) + self._session_maker = get_maker( + engine=self._engine, + autocommit=autocommit, + expire_on_commit=expire_on_commit) + + def get_engine(self): + """Get the engine instance (note, that it's shared).""" + + return self._engine + + def get_session(self, **kwargs): + """Get a Session instance. + + If passed, keyword arguments values override the ones used when the + sessionmaker instance was created. + + :keyword autocommit: use autocommit mode for created Session instances + :type autocommit: bool + + :keyword expire_on_commit: expire session objects on commit + :type expire_on_commit: bool + + """ + + for arg in kwargs: + if arg not in ('autocommit', 'expire_on_commit'): + del kwargs[arg] + + return self._session_maker(**kwargs) + + @classmethod + def from_config(cls, connection_string, conf, + sqlite_fk=False, autocommit=True, expire_on_commit=False): + """Initialize EngineFacade using oslo.config config instance options. + + :param connection_string: SQLAlchemy connection string + :type connection_string: string + + :param conf: oslo.config config instance + :type conf: oslo.config.cfg.ConfigOpts + + :param sqlite_fk: enable foreign keys in SQLite + :type sqlite_fk: bool + + :param autocommit: use autocommit mode for created Session instances + :type autocommit: bool + + :param expire_on_commit: expire session objects on commit + :type expire_on_commit: bool + + """ + + return cls(sql_connection=connection_string, + sqlite_fk=sqlite_fk, + autocommit=autocommit, + expire_on_commit=expire_on_commit, + **dict(conf.database.items())) diff --git a/neutron/openstack/common/db/sqlalchemy/test_base.py b/neutron/openstack/common/db/sqlalchemy/test_base.py new file mode 100644 index 000000000..e6e047fbf --- /dev/null +++ b/neutron/openstack/common/db/sqlalchemy/test_base.py @@ -0,0 +1,153 @@ +# Copyright (c) 2013 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import functools +import os + +import fixtures +import six + +from neutron.openstack.common.db.sqlalchemy import session +from neutron.openstack.common.db.sqlalchemy import utils +from neutron.openstack.common.fixture import lockutils +from neutron.openstack.common import test + + +class DbFixture(fixtures.Fixture): + """Basic database fixture. + + Allows to run tests on various db backends, such as SQLite, MySQL and + PostgreSQL. By default use sqlite backend. To override default backend + uri set env variable OS_TEST_DBAPI_CONNECTION with database admin + credentials for specific backend. + """ + + def _get_uri(self): + return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://') + + def __init__(self, test): + super(DbFixture, self).__init__() + + self.test = test + + def setUp(self): + super(DbFixture, self).setUp() + + self.test.engine = session.create_engine(self._get_uri()) + self.test.sessionmaker = session.get_maker(self.test.engine) + self.addCleanup(self.test.engine.dispose) + + +class DbTestCase(test.BaseTestCase): + """Base class for testing of DB code. + + Using `DbFixture`. Intended to be the main database test case to use all + the tests on a given backend with user defined uri. Backend specific + tests should be decorated with `backend_specific` decorator. + """ + + FIXTURE = DbFixture + + def setUp(self): + super(DbTestCase, self).setUp() + self.useFixture(self.FIXTURE(self)) + + +ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql'] + + +def backend_specific(*dialects): + """Decorator to skip backend specific tests on inappropriate engines. + + ::dialects: list of dialects names under which the test will be launched. + """ + def wrap(f): + @functools.wraps(f) + def ins_wrap(self): + if not set(dialects).issubset(ALLOWED_DIALECTS): + raise ValueError( + "Please use allowed dialects: %s" % ALLOWED_DIALECTS) + if self.engine.name not in dialects: + msg = ('The test "%s" can be run ' + 'only on %s. Current engine is %s.') + args = (f.__name__, ' '.join(dialects), self.engine.name) + self.skip(msg % args) + else: + return f(self) + return ins_wrap + return wrap + + +@six.add_metaclass(abc.ABCMeta) +class OpportunisticFixture(DbFixture): + """Base fixture to use default CI databases. + + The databases exist in OpenStack CI infrastructure. But for the + correct functioning in local environment the databases must be + created manually. + """ + + DRIVER = abc.abstractproperty(lambda: None) + DBNAME = PASSWORD = USERNAME = 'openstack_citest' + + def _get_uri(self): + return utils.get_connect_string(backend=self.DRIVER, + user=self.USERNAME, + passwd=self.PASSWORD, + database=self.DBNAME) + + +@six.add_metaclass(abc.ABCMeta) +class OpportunisticTestCase(DbTestCase): + """Base test case to use default CI databases. + + The subclasses of the test case are running only when openstack_citest + database is available otherwise a tests will be skipped. + """ + + FIXTURE = abc.abstractproperty(lambda: None) + + def setUp(self): + # TODO(bnemec): Remove this once infra is ready for + # https://review.openstack.org/#/c/74963/ to merge. + self.useFixture(lockutils.LockFixture('opportunistic-db')) + credentials = { + 'backend': self.FIXTURE.DRIVER, + 'user': self.FIXTURE.USERNAME, + 'passwd': self.FIXTURE.PASSWORD, + 'database': self.FIXTURE.DBNAME} + + if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials): + msg = '%s backend is not available.' % self.FIXTURE.DRIVER + return self.skip(msg) + + super(OpportunisticTestCase, self).setUp() + + +class MySQLOpportunisticFixture(OpportunisticFixture): + DRIVER = 'mysql' + + +class PostgreSQLOpportunisticFixture(OpportunisticFixture): + DRIVER = 'postgresql' + + +class MySQLOpportunisticTestCase(OpportunisticTestCase): + FIXTURE = MySQLOpportunisticFixture + + +class PostgreSQLOpportunisticTestCase(OpportunisticTestCase): + FIXTURE = PostgreSQLOpportunisticFixture diff --git a/neutron/openstack/common/db/sqlalchemy/utils.py b/neutron/openstack/common/db/sqlalchemy/utils.py index ff35a1276..878431773 100644 --- a/neutron/openstack/common/db/sqlalchemy/utils.py +++ b/neutron/openstack/common/db/sqlalchemy/utils.py @@ -19,7 +19,6 @@ import logging import re -from migrate.changeset import UniqueConstraint import sqlalchemy from sqlalchemy import Boolean from sqlalchemy import CheckConstraint @@ -30,14 +29,16 @@ from sqlalchemy import func from sqlalchemy import Index from sqlalchemy import Integer from sqlalchemy import MetaData +from sqlalchemy import or_ from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql.expression import UpdateBase -from sqlalchemy.sql import select from sqlalchemy import String from sqlalchemy import Table from sqlalchemy.types import NullType -from neutron.openstack.common.gettextutils import _ +from neutron.openstack.common import context as request_context +from neutron.openstack.common.db.sqlalchemy import models +from neutron.openstack.common.gettextutils import _, _LI, _LW from neutron.openstack.common import timeutils @@ -93,7 +94,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None, if 'id' not in sort_keys: # TODO(justinsb): If this ever gives a false-positive, check # the actual primary key, rather than assuming its id - LOG.warning(_('Id not in sort_keys; is sort_keys unique?')) + LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?')) assert(not (sort_dir and sort_dirs)) @@ -156,6 +157,98 @@ def paginate_query(query, model, limit, sort_keys, marker=None, return query +def _read_deleted_filter(query, db_model, read_deleted): + if 'deleted' not in db_model.__table__.columns: + raise ValueError(_("There is no `deleted` column in `%s` table. " + "Project doesn't use soft-deleted feature.") + % db_model.__name__) + + default_deleted_value = db_model.__table__.c.deleted.default.arg + if read_deleted == 'no': + query = query.filter(db_model.deleted == default_deleted_value) + elif read_deleted == 'yes': + pass # omit the filter to include deleted and active + elif read_deleted == 'only': + query = query.filter(db_model.deleted != default_deleted_value) + else: + raise ValueError(_("Unrecognized read_deleted value '%s'") + % read_deleted) + return query + + +def _project_filter(query, db_model, context, project_only): + if project_only and 'project_id' not in db_model.__table__.columns: + raise ValueError(_("There is no `project_id` column in `%s` table.") + % db_model.__name__) + + if request_context.is_user_context(context) and project_only: + if project_only == 'allow_none': + is_none = None + query = query.filter(or_(db_model.project_id == context.project_id, + db_model.project_id == is_none)) + else: + query = query.filter(db_model.project_id == context.project_id) + + return query + + +def model_query(context, model, session, args=None, project_only=False, + read_deleted=None): + """Query helper that accounts for context's `read_deleted` field. + + :param context: context to query under + + :param model: Model to query. Must be a subclass of ModelBase. + :type model: models.ModelBase + + :param session: The session to use. + :type session: sqlalchemy.orm.session.Session + + :param args: Arguments to query. If None - model is used. + :type args: tuple + + :param project_only: If present and context is user-type, then restrict + query to match the context's project_id. If set to + 'allow_none', restriction includes project_id = None. + :type project_only: bool + + :param read_deleted: If present, overrides context's read_deleted field. + :type read_deleted: bool + + Usage: + + ..code:: python + + result = (utils.model_query(context, models.Instance, session=session) + .filter_by(uuid=instance_uuid) + .all()) + + query = utils.model_query( + context, Node, + session=session, + args=(func.count(Node.id), func.sum(Node.ram)) + ).filter_by(project_id=project_id) + + """ + + if not read_deleted: + if hasattr(context, 'read_deleted'): + # NOTE(viktors): some projects use `read_deleted` attribute in + # their contexts instead of `show_deleted`. + read_deleted = context.read_deleted + else: + read_deleted = context.show_deleted + + if not issubclass(model, models.ModelBase): + raise TypeError(_("model should be a subclass of ModelBase")) + + query = session.query(model) if not args else session.query(*args) + query = _read_deleted_filter(query, model, read_deleted) + query = _project_filter(query, model, context, project_only) + + return query + + def get_table(engine, name): """Returns an sqlalchemy table dynamically from db. @@ -207,6 +300,10 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns, **col_name_col_instance): """Drop unique constraint from table. + DEPRECATED: this function is deprecated and will be removed from neutron.db + in a few releases. Please use UniqueConstraint.drop() method directly for + sqlalchemy-migrate migration scripts. + This method drops UC from table and works for mysql, postgresql and sqlite. In mysql and postgresql we are able to use "alter table" construction. Sqlalchemy doesn't support some sqlite column types and replaces their @@ -223,6 +320,8 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns, types by sqlite. For example BigInteger. """ + from migrate.changeset import UniqueConstraint + meta = MetaData() meta.bind = migrate_engine t = Table(table_name, meta, autoload=True) @@ -262,9 +361,9 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name, columns_for_select = [func.max(table.c.id)] columns_for_select.extend(columns_for_group_by) - duplicated_rows_select = select(columns_for_select, - group_by=columns_for_group_by, - having=func.count(table.c.id) > 1) + duplicated_rows_select = sqlalchemy.sql.select( + columns_for_select, group_by=columns_for_group_by, + having=func.count(table.c.id) > 1) for row in migrate_engine.execute(duplicated_rows_select): # NOTE(boris-42): Do not remove row that has the biggest ID. @@ -274,10 +373,11 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name, for name in uc_column_names: delete_condition &= table.c[name] == row[name] - rows_to_delete_select = select([table.c.id]).where(delete_condition) + rows_to_delete_select = sqlalchemy.sql.select( + [table.c.id]).where(delete_condition) for row in migrate_engine.execute(rows_to_delete_select).fetchall(): - LOG.info(_("Deleting duplicated row with id: %(id)s from table: " - "%(table)s") % dict(id=row[0], table=table_name)) + LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: " + "%(table)s") % dict(id=row[0], table=table_name)) if use_soft_delete: delete_statement = table.update().\ @@ -385,7 +485,7 @@ def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name, else: c_select.append(table.c.deleted == table.c.id) - ins = InsertFromSelect(new_table, select(c_select)) + ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select)) migrate_engine.execute(ins) table.drop() diff --git a/neutron/service.py b/neutron/service.py index 8e4d5f514..a3224545b 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -23,8 +23,8 @@ from oslo.config import cfg from neutron.common import config from neutron import context +from neutron.db import api as session from neutron import manager -from neutron.openstack.common.db.sqlalchemy import session from neutron.openstack.common import excutils from neutron.openstack.common import importutils from neutron.openstack.common import log as logging @@ -118,7 +118,7 @@ class RpcWorker(object): # We may have just forked from parent process. A quick disposal of the # existing sql connections avoids producing errors later when they are # discovered to be broken. - session.get_engine(sqlite_fk=True).pool.dispose() + session.get_engine().pool.dispose() self._server = self._plugin.start_rpc_listener() def wait(self): -- 2.45.2