from neutron.api.v2 import attributes
from neutron.common import utils
-from neutron.openstack.common.db.sqlalchemy import session as db_session
+from neutron.openstack.common.db import options as db_options
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.version import version_info as neutron_version
_SQL_CONNECTION_DEFAULT = 'sqlite://'
# Update the default QueuePool parameters. These can be tweaked by the
# configuration variables - max_pool_size, max_overflow and pool_timeout
-db_session.set_defaults(sql_connection=_SQL_CONNECTION_DEFAULT,
+db_options.set_defaults(sql_connection=_SQL_CONNECTION_DEFAULT,
sqlite_db='', max_pool_size=10,
max_overflow=20, pool_timeout=10)
# License for the specific language governing permissions and limitations
# under the License.
+from oslo.config import cfg
import sqlalchemy as sql
from neutron.db import model_base
BASE = model_base.BASEV2
+cfg.CONF.import_opt('connection',
+ 'neutron.openstack.common.db.options',
+ group='database')
+
+_FACADE = None
+
+
+def _create_facade_lazily():
+ global _FACADE
+
+ if _FACADE is None:
+ _FACADE = session.EngineFacade.from_config(
+ cfg.CONF.database.connection, cfg.CONF, sqlite_fk=True)
+
+ return _FACADE
+
def configure_db():
"""Configure database.
Establish the database, create an engine if needed, and register
the models.
"""
- session.get_engine(sqlite_fk=True)
register_models()
def clear_db(base=BASE):
unregister_models(base)
- session.cleanup()
+
+
+def get_engine():
+ """Helper method to grab engine."""
+ facade = _create_facade_lazily()
+ return facade.get_engine()
def get_session(autocommit=True, expire_on_commit=False):
"""Helper method to grab session."""
- return session.get_session(autocommit=autocommit,
- expire_on_commit=expire_on_commit,
- sqlite_fk=True)
+ facade = _create_facade_lazily()
+ return facade.get_session(autocommit=autocommit,
+ expire_on_commit=expire_on_commit)
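For reference, a minimal sketch of how plugin code is expected to consume these helpers after this change; SomeModel is a hypothetical placeholder, not an actual Neutron model.

from neutron.db import api as db_api

session = db_api.get_session()
with session.begin(subtransactions=True):
    record = SomeModel(name='example')  # SomeModel is a hypothetical model
    session.add(record)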
def register_models(base=BASE):
"""Register Models and create properties."""
try:
- engine = session.get_engine(sqlite_fk=True)
+ facade = _create_facade_lazily()
+ engine = facade.get_engine()
base.metadata.create_all(engine)
except sql.exc.OperationalError as e:
LOG.info(_("Database registration exception: %s"), e)
def unregister_models(base=BASE):
"""Unregister Models, useful clearing out data before testing."""
try:
- engine = session.get_engine(sqlite_fk=True)
+ facade = _create_facade_lazily()
+ engine = facade.get_engine()
base.metadata.drop_all(engine)
except Exception:
LOG.exception(_("Database exception"))
"""Multiple DB API backend support.
-Supported configuration options:
-
-The following two parameters are in the 'database' group:
-`backend`: DB backend name or full module path to DB backend module.
-
A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB
API methods.
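As an illustration of the contract described above, a minimal backend module might look like the sketch below; everything except the get_backend() entry point is hypothetical.

# e.g. myproject/db/sqlalchemy/api.py (hypothetical path)
class _SqlalchemyBackend(object):
    def widget_get(self, widget_id):
        # a real backend would query the database here
        return {'id': widget_id}


def get_backend():
    """Entry point looked up by DBAPI; takes no arguments."""
    return _SqlalchemyBackend()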
import functools
import logging
+import threading
import time
-from oslo.config import cfg
-
from neutron.openstack.common.db import exception
-from neutron.openstack.common.gettextutils import _ # noqa
+from neutron.openstack.common.gettextutils import _LE
from neutron.openstack.common import importutils
-db_opts = [
- cfg.StrOpt('backend',
- default='sqlalchemy',
- deprecated_name='db_backend',
- deprecated_group='DEFAULT',
- help='The backend to use for db'),
- cfg.BoolOpt('use_db_reconnect',
- default=False,
- help='Enable the experimental use of database reconnect '
- 'on connection lost'),
- cfg.IntOpt('db_retry_interval',
- default=1,
- help='seconds between db connection retries'),
- cfg.BoolOpt('db_inc_retry_interval',
- default=True,
- help='Whether to increase interval between db connection '
- 'retries, up to db_max_retry_interval'),
- cfg.IntOpt('db_max_retry_interval',
- default=10,
- help='max seconds between db connection retries, if '
- 'db_inc_retry_interval is enabled'),
- cfg.IntOpt('db_max_retries',
- default=20,
- help='maximum db connection retries before error is raised. '
- '(setting -1 implies an infinite retry count)'),
-]
-
-CONF = cfg.CONF
-CONF.register_opts(db_opts, 'database')
-
LOG = logging.getLogger(__name__)
return f
-def _wrap_db_retry(f):
+class wrap_db_retry(object):
"""Retry db.api methods, if DBConnectionError() raised
Retry decorated db.api methods. If we enabled `use_db_reconnect`
Decorator catches DBConnectionError() and retries the function in a
loop until it succeeds, or until the maximum retry count is reached.
"""
- @functools.wraps(f)
- def wrapper(*args, **kwargs):
- next_interval = CONF.database.db_retry_interval
- remaining = CONF.database.db_max_retries
-
- while True:
- try:
- return f(*args, **kwargs)
- except exception.DBConnectionError as e:
- if remaining == 0:
- LOG.exception(_('DB exceeded retry limit.'))
- raise exception.DBError(e)
- if remaining != -1:
- remaining -= 1
- LOG.exception(_('DB connection error.'))
- # NOTE(vsergeyev): We are using patched time module, so this
- # effectively yields the execution context to
- # another green thread.
- time.sleep(next_interval)
- if CONF.database.db_inc_retry_interval:
- next_interval = min(
- next_interval * 2,
- CONF.database.db_max_retry_interval
- )
- return wrapper
+
+ def __init__(self, retry_interval, max_retries, inc_retry_interval,
+ max_retry_interval):
+ super(wrap_db_retry, self).__init__()
+
+ self.retry_interval = retry_interval
+ self.max_retries = max_retries
+ self.inc_retry_interval = inc_retry_interval
+ self.max_retry_interval = max_retry_interval
+
+ def __call__(self, f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ next_interval = self.retry_interval
+ remaining = self.max_retries
+
+ while True:
+ try:
+ return f(*args, **kwargs)
+ except exception.DBConnectionError as e:
+ if remaining == 0:
+ LOG.exception(_LE('DB exceeded retry limit.'))
+ raise exception.DBError(e)
+ if remaining != -1:
+ remaining -= 1
+ LOG.exception(_LE('DB connection error.'))
+ # NOTE(vsergeyev): We are using patched time module, so
+ # this effectively yields the execution
+ # context to another green thread.
+ time.sleep(next_interval)
+ if self.inc_retry_interval:
+ next_interval = min(
+ next_interval * 2,
+ self.max_retry_interval
+ )
+ return wrapper
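A hedged usage sketch of the now-parameterized decorator applied outside of DBAPI; the decorated function is hypothetical.

@wrap_db_retry(retry_interval=1, max_retries=20,
               inc_retry_interval=True, max_retry_interval=10)
def widget_create(values):
    # hypothetical db.api call that may raise exception.DBConnectionError
    return values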
class DBAPI(object):
- def __init__(self, backend_mapping=None):
- if backend_mapping is None:
- backend_mapping = {}
- backend_name = CONF.database.backend
- # Import the untranslated name if we don't have a
- # mapping.
- backend_path = backend_mapping.get(backend_name, backend_name)
- backend_mod = importutils.import_module(backend_path)
- self.__backend = backend_mod.get_backend()
+ def __init__(self, backend_name, backend_mapping=None, lazy=False,
+ **kwargs):
+ """Initialize the chosen DB API backend.
+
+ :param backend_name: name of the backend to load
+ :type backend_name: str
+
+ :param backend_mapping: backend name -> module/class to load mapping
+ :type backend_mapping: dict
+
+ :param lazy: load the DB backend lazily on the first DB API method call
+ :type lazy: bool
+
+ Keyword arguments:
+
+ :keyword use_db_reconnect: retry DB transactions on disconnect or not
+ :type use_db_reconnect: bool
+
+ :keyword retry_interval: seconds between transaction retries
+ :type retry_interval: int
+
+ :keyword inc_retry_interval: increase retry interval or not
+ :type inc_retry_interval: bool
+
+ :keyword max_retry_interval: max interval value between retries
+ :type max_retry_interval: int
+
+ :keyword max_retries: max number of retries before an error is raised
+ :type max_retries: int
+
+ """
+
+ self._backend = None
+ self._backend_name = backend_name
+ self._backend_mapping = backend_mapping or {}
+ self._lock = threading.Lock()
+
+ if not lazy:
+ self._load_backend()
+
+ self.use_db_reconnect = kwargs.get('use_db_reconnect', False)
+ self.retry_interval = kwargs.get('retry_interval', 1)
+ self.inc_retry_interval = kwargs.get('inc_retry_interval', True)
+ self.max_retry_interval = kwargs.get('max_retry_interval', 10)
+ self.max_retries = kwargs.get('max_retries', 20)
+
+ def _load_backend(self):
+ with self._lock:
+ if not self._backend:
+ # Import the untranslated name if we don't have a mapping
+ backend_path = self._backend_mapping.get(self._backend_name,
+ self._backend_name)
+ backend_mod = importutils.import_module(backend_path)
+ self._backend = backend_mod.get_backend()
def __getattr__(self, key):
- attr = getattr(self.__backend, key)
+ if not self._backend:
+ self._load_backend()
+ attr = getattr(self._backend, key)
if not hasattr(attr, '__call__'):
return attr
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
# DB API methods, decorated with @safe_for_db_retry
# on disconnect.
- if CONF.database.use_db_reconnect and hasattr(attr, 'enable_retry'):
- attr = _wrap_db_retry(attr)
+ if self.use_db_reconnect and hasattr(attr, 'enable_retry'):
+ attr = wrap_db_retry(
+ retry_interval=self.retry_interval,
+ max_retries=self.max_retries,
+ inc_retry_interval=self.inc_retry_interval,
+ max_retry_interval=self.max_retry_interval)(attr)
return attr
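A brief sketch of constructing the reworked DBAPI, assuming this module is importable as neutron.openstack.common.db.api; the backend path and option values are illustrative.

from neutron.openstack.common.db import api as db_api

dbapi = db_api.DBAPI(
    'sqlalchemy',
    backend_mapping={'sqlalchemy': 'myproject.db.sqlalchemy.api'},  # hypothetical
    lazy=True,
    use_db_reconnect=True,
    retry_interval=1,
    max_retries=20)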
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from oslo.config import cfg
+
+
+database_opts = [
+ cfg.StrOpt('sqlite_db',
+ deprecated_group='DEFAULT',
+ default='neutron.sqlite',
+ help='The file name to use with SQLite'),
+ cfg.BoolOpt('sqlite_synchronous',
+ deprecated_group='DEFAULT',
+ default=True,
+ help='If True, SQLite uses synchronous mode'),
+ cfg.StrOpt('backend',
+ default='sqlalchemy',
+ deprecated_name='db_backend',
+ deprecated_group='DEFAULT',
+ help='The backend to use for db'),
+ cfg.StrOpt('connection',
+ help='The SQLAlchemy connection string used to connect to the '
+ 'database',
+ secret=True,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_connection',
+ group='DATABASE'),
+ cfg.DeprecatedOpt('connection',
+ group='sql'), ]),
+ cfg.StrOpt('mysql_sql_mode',
+ default='TRADITIONAL',
+ help='The SQL mode to be used for MySQL sessions. '
+ 'This option, including the default, overrides any '
+ 'server-set SQL mode. To use whatever SQL mode '
+ 'is set by the server configuration, '
+ 'set this to no value. Example: mysql_sql_mode='),
+ cfg.IntOpt('idle_timeout',
+ default=3600,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_idle_timeout',
+ group='DATABASE'),
+ cfg.DeprecatedOpt('idle_timeout',
+ group='sql')],
+ help='Timeout before idle sql connections are reaped'),
+ cfg.IntOpt('min_pool_size',
+ default=1,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_min_pool_size',
+ group='DATABASE')],
+ help='Minimum number of SQL connections to keep open in a '
+ 'pool'),
+ cfg.IntOpt('max_pool_size',
+ default=None,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_max_pool_size',
+ group='DATABASE')],
+ help='Maximum number of SQL connections to keep open in a '
+ 'pool'),
+ cfg.IntOpt('max_retries',
+ default=10,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sql_max_retries',
+ group='DATABASE')],
+ help='Maximum db connection retries during startup. '
+ '(setting -1 implies an infinite retry count)'),
+ cfg.IntOpt('retry_interval',
+ default=10,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('reconnect_interval',
+ group='DATABASE')],
+ help='Interval between retries of opening a sql connection'),
+ cfg.IntOpt('max_overflow',
+ default=None,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
+ group='DEFAULT'),
+ cfg.DeprecatedOpt('sqlalchemy_max_overflow',
+ group='DATABASE')],
+ help='If set, use this value for max_overflow with sqlalchemy'),
+ cfg.IntOpt('connection_debug',
+ default=0,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
+ group='DEFAULT')],
+ help='Verbosity of SQL debugging information. 0=None, '
+ '100=Everything'),
+ cfg.BoolOpt('connection_trace',
+ default=False,
+ deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
+ group='DEFAULT')],
+ help='Add python stack traces to SQL as comment strings'),
+ cfg.IntOpt('pool_timeout',
+ default=None,
+ deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
+ group='DATABASE')],
+ help='If set, use this value for pool_timeout with sqlalchemy'),
+ cfg.BoolOpt('use_db_reconnect',
+ default=False,
+ help='Enable the experimental use of database reconnect '
+ 'on connection lost'),
+ cfg.IntOpt('db_retry_interval',
+ default=1,
+ help='seconds between db connection retries'),
+ cfg.BoolOpt('db_inc_retry_interval',
+ default=True,
+ help='Whether to increase interval between db connection '
+ 'retries, up to db_max_retry_interval'),
+ cfg.IntOpt('db_max_retry_interval',
+ default=10,
+ help='max seconds between db connection retries, if '
+ 'db_inc_retry_interval is enabled'),
+ cfg.IntOpt('db_max_retries',
+ default=20,
+ help='maximum db connection retries before error is raised. '
+ '(setting -1 implies an infinite retry count)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(database_opts, 'database')
+
+
+def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
+ max_overflow=None, pool_timeout=None):
+ """Set defaults for configuration variables."""
+ cfg.set_defaults(database_opts,
+ connection=sql_connection,
+ sqlite_db=sqlite_db)
+ # Update the QueuePool defaults
+ if max_pool_size is not None:
+ cfg.set_defaults(database_opts,
+ max_pool_size=max_pool_size)
+ if max_overflow is not None:
+ cfg.set_defaults(database_opts,
+ max_overflow=max_overflow)
+ if pool_timeout is not None:
+ cfg.set_defaults(database_opts,
+ pool_timeout=pool_timeout)
+
+
+def list_opts():
+ """Returns a list of oslo.config options available in the library.
+
+ The returned list includes all oslo.config options which may be registered
+ at runtime by the library.
+
+ Each element of the list is a tuple. The first element is the name of the
+ group under which the list of elements in the second element will be
+ registered. A group name of None corresponds to the [DEFAULT] group in
+ config files.
+
+ The purpose of this is to allow tools like the Oslo sample config file
+ generator to discover the options exposed to users by this library.
+
+ :returns: a list of (group_name, opts) tuples
+ """
+ return [('database', copy.deepcopy(database_opts))]
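For context, this is the call style that the first hunk of this change switches neutron.db to; the values below mirror that hunk rather than the library defaults.

from neutron.openstack.common.db import options as db_options

db_options.set_defaults(sql_connection='sqlite://',
                        sqlite_db='',
                        max_pool_size=10,
                        max_overflow=20,
                        pool_timeout=10)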
from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper
-from neutron.openstack.common.db.sqlalchemy import session as sa
from neutron.openstack.common import timeutils
-class ModelBase(object):
+class ModelBase(six.Iterator):
"""Base class for models."""
__table_initialized__ = False
- def save(self, session=None):
+ def save(self, session):
"""Save this object."""
- if not session:
- session = sa.get_session()
+
    # NOTE(boris-42): This part of the code should look like:
# session.add(self)
# session.flush()
self._i = iter(columns)
return self
- def next(self):
+ # In Python 3, __next__() has replaced next().
+ def __next__(self):
n = six.advance_iterator(self._i)
return n, getattr(self, n)
+ def next(self):
+ return self.__next__()
+
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in six.iteritems(values):
deleted_at = Column(DateTime)
deleted = Column(Integer, default=0)
- def soft_delete(self, session=None):
+ def soft_delete(self, session):
"""Mark this object as deleted."""
self.deleted = self.id
self.deleted_at = timeutils.utcnow()
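A small sketch of the new calling convention: save() and soft_delete() now require an explicit session instead of falling back to a module-level helper. FooModel stands in for a hypothetical declarative model built on this base with the soft-delete columns mixed in.

from neutron.db import api as db_api

session = db_api.get_session()
foo = FooModel(name='example')   # hypothetical declarative model
foo.save(session)                # an explicit session is now mandatory
foo.soft_delete(session)         # likewise for soft_delete()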
--- /dev/null
+# Copyright 2013 Mirantis.inc
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""Provision test environment for specific DB backends"""
+
+import argparse
+import logging
+import os
+import random
+import string
+
+from six import moves
+import sqlalchemy
+
+from neutron.openstack.common.db import exception as exc
+
+
+LOG = logging.getLogger(__name__)
+
+
+def get_engine(uri):
+ """Engine creation
+
+ Call the function without arguments to get admin connection. Admin
+ connection required to create temporary user and database for each
+ particular test. Otherwise use existing connection to recreate connection
+ to the temporary database.
+ """
+ return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
+
+
+def _execute_sql(engine, sql, driver):
+ """Initialize connection, execute sql query and close it."""
+ try:
+ with engine.connect() as conn:
+ if driver == 'postgresql':
+ conn.connection.set_isolation_level(0)
+ for s in sql:
+ conn.execute(s)
+ except sqlalchemy.exc.OperationalError:
+ msg = ('%s does not match database admin '
+ 'credentials or database does not exist.')
+ LOG.exception(msg % engine.url)
+ raise exc.DBConnectionError(msg % engine.url)
+
+
+def create_database(engine):
+ """Provide temporary user and database for each particular test."""
+ driver = engine.name
+
+ auth = {
+ 'database': ''.join(random.choice(string.ascii_lowercase)
+ for i in moves.range(10)),
+ 'user': engine.url.username,
+ 'passwd': engine.url.password,
+ }
+
+ sqls = [
+ "drop database if exists %(database)s;",
+ "create database %(database)s;"
+ ]
+
+ if driver == 'sqlite':
+ return 'sqlite:////tmp/%s' % auth['database']
+ elif driver in ['mysql', 'postgresql']:
+ sql_query = map(lambda x: x % auth, sqls)
+ _execute_sql(engine, sql_query, driver)
+ else:
+ raise ValueError('Unsupported RDBMS %s' % driver)
+
+ params = auth.copy()
+ params['backend'] = driver
+ return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
+
+
+def drop_database(admin_engine, current_uri):
+ """Drop temporary database and user after each particular test."""
+
+ engine = get_engine(current_uri)
+ driver = engine.name
+ auth = {'database': engine.url.database, 'user': engine.url.username}
+
+ if driver == 'sqlite':
+ try:
+ os.remove(auth['database'])
+ except OSError:
+ pass
+ elif driver in ['mysql', 'postgresql']:
+ sql = "drop database if exists %(database)s;"
+ _execute_sql(admin_engine, [sql % auth], driver)
+ else:
+ raise ValueError('Unsupported RDBMS %s' % driver)
+
+
+def main():
+ """Controller to handle commands
+
+ ::create: Create test user and database with random names.
+ ::drop: Drop user and database created by previous command.
+ """
+ parser = argparse.ArgumentParser(
+ description='Controller to handle database creation and dropping'
+ ' commands.',
+        epilog='Under normal circumstances this is not used directly.'
+ ' Used in .testr.conf to automate test database creation'
+ ' and dropping processes.')
+ subparsers = parser.add_subparsers(
+ help='Subcommands to manipulate temporary test databases.')
+
+ create = subparsers.add_parser(
+ 'create',
+ help='Create temporary test '
+ 'databases and users.')
+ create.set_defaults(which='create')
+ create.add_argument(
+ 'instances_count',
+ type=int,
+ help='Number of databases to create.')
+
+ drop = subparsers.add_parser(
+ 'drop',
+ help='Drop temporary test databases and users.')
+ drop.set_defaults(which='drop')
+ drop.add_argument(
+ 'instances',
+ nargs='+',
+ help='List of databases uri to be dropped.')
+
+ args = parser.parse_args()
+
+ connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
+ 'sqlite://')
+ engine = get_engine(connection_string)
+ which = args.which
+
+ if which == "create":
+ for i in range(int(args.instances_count)):
+ print(create_database(engine))
+ elif which == "drop":
+ for db in args.instances:
+ drop_database(engine, db)
+
+
+if __name__ == "__main__":
+ main()
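The same provisioning helpers can also be driven programmatically, which is roughly what the create/drop subcommands do; the admin URI below only mirrors the CI credentials used by the opportunistic fixtures and is illustrative.

admin_engine = get_engine('mysql://openstack_citest:openstack_citest@localhost')
test_uri = create_database(admin_engine)   # URI of a freshly created test DB
# ... run tests against test_uri ...
drop_database(admin_engine, test_uri)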
"""Session Handling for SQLAlchemy backend.
-Initializing:
-
-* Call `set_defaults()` with the minimal of the following kwargs:
- ``sql_connection``, ``sqlite_db``
-
- Example:
-
- .. code:: python
-
- session.set_defaults(
- sql_connection="sqlite:///var/lib/neutron/sqlite.db",
- sqlite_db="/var/lib/neutron/sqlite.db")
-
Recommended ways to use sessions within this framework:
* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
.. code:: python
def create_many_foo(context, foos):
- session = get_session()
+ session = sessionmaker()
with session.begin():
for foo in foos:
foo_ref = models.Foo()
session.add(foo_ref)
def update_bar(context, foo_id, newbar):
- session = get_session()
+ session = sessionmaker()
with session.begin():
foo_ref = (model_query(context, models.Foo, session).
filter_by(id=foo_id).
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
- session = get_session()
+ session = sessionmaker()
try:
with session.begin():
session.add(foo1)
.. code:: python
def myfunc(foo):
- session = get_session()
+ session = sessionmaker()
with session.begin():
# do some database things
bar = _private_func(foo, session)
def _private_func(foo, session=None):
if not session:
- session = get_session()
+ session = sessionmaker()
with session.begin(subtransaction=True):
# do some other database things
return bar
def complex_soft_delete_with_synchronization_bar(session=None):
if session is None:
- session = get_session()
+ session = sessionmaker()
with session.begin(subtransactions=True):
count = (model_query(BarModel).
find(some_condition).
.. code:: python
def soft_delete_bar_model():
- session = get_session()
+ session = sessionmaker()
with session.begin():
bar_ref = model_query(BarModel).find(some_condition).first()
# Work with bar_ref
.. code:: python
def soft_delete_multi_models():
- session = get_session()
+ session = sessionmaker()
with session.begin():
query = (model_query(BarModel, session=session).
find(some_condition))
import functools
import logging
-import os.path
import re
import time
-from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc
from sqlalchemy.interfaces import PoolListener
from sqlalchemy.sql.expression import literal_column
from neutron.openstack.common.db import exception
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _LE, _LW
from neutron.openstack.common import timeutils
-sqlite_db_opts = [
- cfg.StrOpt('sqlite_db',
- default='neutron.sqlite',
- help='The file name to use with SQLite'),
- cfg.BoolOpt('sqlite_synchronous',
- default=True,
- help='If True, SQLite uses synchronous mode'),
-]
-
-database_opts = [
- cfg.StrOpt('connection',
- default='sqlite:///' +
- os.path.abspath(os.path.join(os.path.dirname(__file__),
- '../', '$sqlite_db')),
- help='The SQLAlchemy connection string used to connect to the '
- 'database',
- secret=True,
- deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
- group='DEFAULT'),
- cfg.DeprecatedOpt('sql_connection',
- group='DATABASE'),
- cfg.DeprecatedOpt('connection',
- group='sql'), ]),
- cfg.StrOpt('slave_connection',
- default='',
- secret=True,
- help='The SQLAlchemy connection string used to connect to the '
- 'slave database'),
- cfg.IntOpt('idle_timeout',
- default=3600,
- deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
- group='DEFAULT'),
- cfg.DeprecatedOpt('sql_idle_timeout',
- group='DATABASE'),
- cfg.DeprecatedOpt('idle_timeout',
- group='sql')],
- help='Timeout before idle sql connections are reaped'),
- cfg.IntOpt('min_pool_size',
- default=1,
- deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
- group='DEFAULT'),
- cfg.DeprecatedOpt('sql_min_pool_size',
- group='DATABASE')],
- help='Minimum number of SQL connections to keep open in a '
- 'pool'),
- cfg.IntOpt('max_pool_size',
- default=None,
- deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
- group='DEFAULT'),
- cfg.DeprecatedOpt('sql_max_pool_size',
- group='DATABASE')],
- help='Maximum number of SQL connections to keep open in a '
- 'pool'),
- cfg.IntOpt('max_retries',
- default=10,
- deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
- group='DEFAULT'),
- cfg.DeprecatedOpt('sql_max_retries',
- group='DATABASE')],
- help='Maximum db connection retries during startup. '
- '(setting -1 implies an infinite retry count)'),
- cfg.IntOpt('retry_interval',
- default=10,
- deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
- group='DEFAULT'),
- cfg.DeprecatedOpt('reconnect_interval',
- group='DATABASE')],
- help='Interval between retries of opening a sql connection'),
- cfg.IntOpt('max_overflow',
- default=None,
- deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
- group='DEFAULT'),
- cfg.DeprecatedOpt('sqlalchemy_max_overflow',
- group='DATABASE')],
- help='If set, use this value for max_overflow with sqlalchemy'),
- cfg.IntOpt('connection_debug',
- default=0,
- deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
- group='DEFAULT')],
- help='Verbosity of SQL debugging information. 0=None, '
- '100=Everything'),
- cfg.BoolOpt('connection_trace',
- default=False,
- deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
- group='DEFAULT')],
- help='Add python stack traces to SQL as comment strings'),
- cfg.IntOpt('pool_timeout',
- default=None,
- deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
- group='DATABASE')],
- help='If set, use this value for pool_timeout with sqlalchemy'),
-]
-
-CONF = cfg.CONF
-CONF.register_opts(sqlite_db_opts)
-CONF.register_opts(database_opts, 'database')
LOG = logging.getLogger(__name__)
-_ENGINE = None
-_MAKER = None
-_SLAVE_ENGINE = None
-_SLAVE_MAKER = None
-
-
-def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
- max_overflow=None, pool_timeout=None):
- """Set defaults for configuration variables."""
- cfg.set_defaults(database_opts,
- connection=sql_connection)
- cfg.set_defaults(sqlite_db_opts,
- sqlite_db=sqlite_db)
- # Update the QueuePool defaults
- if max_pool_size is not None:
- cfg.set_defaults(database_opts,
- max_pool_size=max_pool_size)
- if max_overflow is not None:
- cfg.set_defaults(database_opts,
- max_overflow=max_overflow)
- if pool_timeout is not None:
- cfg.set_defaults(database_opts,
- pool_timeout=pool_timeout)
-
-
-def cleanup():
- global _ENGINE, _MAKER
- global _SLAVE_ENGINE, _SLAVE_MAKER
-
- if _MAKER:
- _MAKER.close_all()
- _MAKER = None
- if _ENGINE:
- _ENGINE.dispose()
- _ENGINE = None
- if _SLAVE_MAKER:
- _SLAVE_MAKER.close_all()
- _SLAVE_MAKER = None
- if _SLAVE_ENGINE:
- _SLAVE_ENGINE.dispose()
- _SLAVE_ENGINE = None
-
class SqliteForeignKeysListener(PoolListener):
"""Ensures that the foreign key constraints are enforced in SQLite.
dbapi_con.execute('pragma foreign_keys=ON')
-def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
- slave_session=False, mysql_traditional_mode=False):
- """Return a SQLAlchemy session."""
- global _MAKER
- global _SLAVE_MAKER
- maker = _MAKER
-
- if slave_session:
- maker = _SLAVE_MAKER
-
- if maker is None:
- engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session,
- mysql_traditional_mode=mysql_traditional_mode)
- maker = get_maker(engine, autocommit, expire_on_commit)
-
- if slave_session:
- _SLAVE_MAKER = maker
- else:
- _MAKER = maker
-
- session = maker()
- return session
-
-
# note(boris-42): In current versions of DB backends unique constraint
# violation messages follow the structure:
#
# 'c1'")
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
# with -' for key 'name_of_our_constraint'")
+#
+# ibm_db_sa:
+# N columns - (IntegrityError) SQL0803N One or more values in the INSERT
+# statement, UPDATE statement, or foreign key update caused by a
+# DELETE statement are not valid because the primary key, unique
+# constraint or unique index identified by "2" constrains table
+# "NOVA.KEY_PAIRS" from having duplicate values for the index
+# key.
_DUP_KEY_RE_DB = {
"sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
"postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
- "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),)
+ "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),),
+ "ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),),
}
return [columns]
return columns[len(uniqbase):].split("0")[1:]
- if engine_name not in ["mysql", "sqlite", "postgresql"]:
+ if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]:
return
# FIXME(johannes): The usage of the .message attribute has been
else:
return
- columns = match.group(1)
+ # NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the
+ # columns so we have to omit that from the DBDuplicateEntry error.
+ columns = ''
+
+ if engine_name != 'ibm_db_sa':
+ columns = match.group(1)
if engine_name == "sqlite":
columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
def _wrap_db_error(f):
@functools.wraps(f)
- def _wrap(*args, **kwargs):
+ def _wrap(self, *args, **kwargs):
try:
- return f(*args, **kwargs)
+ assert issubclass(
+ self.__class__, sqlalchemy.orm.session.Session
+ ), ('_wrap_db_error() can only be applied to methods of '
+ 'subclasses of sqlalchemy.orm.session.Session.')
+
+ return f(self, *args, **kwargs)
except UnicodeEncodeError:
raise exception.DBInvalidUnicodeParameter()
except sqla_exc.OperationalError as e:
- _raise_if_db_connection_lost(e, get_engine())
- _raise_if_deadlock_error(e, get_engine().name)
+ _raise_if_db_connection_lost(e, self.bind)
+ _raise_if_deadlock_error(e, self.bind.dialect.name)
# NOTE(comstud): A lot of code is checking for OperationalError
# so let's not wrap it for now.
raise
# instance_types) there are more than one unique constraint. This
# means we should get names of columns, which values violate
# unique constraint, from error message.
- _raise_if_duplicate_entry_error(e, get_engine().name)
+ _raise_if_duplicate_entry_error(e, self.bind.dialect.name)
raise exception.DBError(e)
except Exception as e:
- LOG.exception(_('DB exception wrapped.'))
+ LOG.exception(_LE('DB exception wrapped.'))
raise exception.DBError(e)
return _wrap
-def get_engine(sqlite_fk=False, slave_engine=False,
- mysql_traditional_mode=False):
- """Return a SQLAlchemy engine."""
- global _ENGINE
- global _SLAVE_ENGINE
- engine = _ENGINE
- db_uri = CONF.database.connection
-
- if slave_engine:
- engine = _SLAVE_ENGINE
- db_uri = CONF.database.slave_connection
-
- if engine is None:
- engine = create_engine(db_uri, sqlite_fk=sqlite_fk,
- mysql_traditional_mode=mysql_traditional_mode)
- if slave_engine:
- _SLAVE_ENGINE = engine
- else:
- _ENGINE = engine
-
- return engine
-
-
def _synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode."""
dbapi_conn.execute("PRAGMA synchronous = OFF")
cursor.execute(ping_sql)
except Exception as ex:
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
- msg = _('Database server has gone away: %s') % ex
+ msg = _LW('Database server has gone away: %s') % ex
LOG.warning(msg)
+
+ # if the database server has gone away, all connections in the pool
+ # have become invalid and we can safely close all of them here,
+ # rather than waste time on checking of every single connection
+ engine.dispose()
+
+ # this will be handled by SQLAlchemy and will force it to create
+ # a new connection and retry the original action
raise sqla_exc.DisconnectionError(msg)
else:
raise
-def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
- """Set engine mode to 'traditional'.
+def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
+ """Set the sql_mode session variable.
+
+ MySQL supports several server modes. The default is None, but sessions
+ may choose to enable server modes like TRADITIONAL, ANSI,
+ several STRICT_* modes and others.
+
+ Note: passing in '' (empty string) for sql_mode clears
+ the SQL mode for the session, overriding a potentially set
+ server default.
+ """
+
+ cursor = dbapi_con.cursor()
+ cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
+
+
+def _mysql_get_effective_sql_mode(engine):
+ """Returns the effective SQL mode for connections from the engine pool.
+
+ Returns ``None`` if the mode isn't available, otherwise returns the mode.
- Required to prevent silent truncates at insert or update operations
- under MySQL. By default MySQL truncates inserted string if it longer
- than a declared field just with warning. That is fraught with data
- corruption.
"""
- dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")
+ # Get the real effective SQL mode. Even when unset by
+ # our own config, the server may still be operating in a specific
+ # SQL mode as set by the server configuration.
+ # Also note that the checkout listener will be called on execute to
+ # set the mode if it's registered.
+ row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
+ if row is None:
+ return
+ return row[1]
+
+
+def _mysql_check_effective_sql_mode(engine):
+ """Logs a message based on the effective SQL mode for MySQL connections."""
+ realmode = _mysql_get_effective_sql_mode(engine)
+
+ if realmode is None:
+ LOG.warning(_LW('Unable to detect effective SQL mode'))
+ return
+
+ LOG.debug('MySQL server mode set to %s', realmode)
+ # 'TRADITIONAL' mode enables several other modes, so
+ # we need a substring match here
+ if not ('TRADITIONAL' in realmode.upper() or
+ 'STRICT_ALL_TABLES' in realmode.upper()):
+ LOG.warning(_LW("MySQL SQL mode is '%s', "
+ "consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
+ realmode)
+
+
+def _mysql_set_mode_callback(engine, sql_mode):
+ if sql_mode is not None:
+ mode_callback = functools.partial(_set_session_sql_mode,
+ sql_mode=sql_mode)
+ sqlalchemy.event.listen(engine, 'connect', mode_callback)
+ _mysql_check_effective_sql_mode(engine)
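A short sketch of how the new sql_mode handling is reached through create_engine(); the connection string and mode are illustrative.

engine = create_engine('mysql://user:secret@localhost/neutron',
                       mysql_sql_mode='STRICT_ALL_TABLES')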
def _is_db_connection_error(args):
raise exception.DBConnectionError(error)
-def create_engine(sql_connection, sqlite_fk=False,
- mysql_traditional_mode=False):
+def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
+ idle_timeout=3600,
+ connection_debug=0, max_pool_size=None, max_overflow=None,
+ pool_timeout=None, sqlite_synchronous=True,
+ connection_trace=False, max_retries=10, retry_interval=10):
"""Return a new SQLAlchemy engine."""
- # NOTE(geekinutah): At this point we could be connecting to the normal
- # db handle or the slave db handle. Things like
- # _wrap_db_error aren't going to work well if their
- # backends don't match. Let's check.
- _assert_matching_drivers()
+
connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
engine_args = {
- "pool_recycle": CONF.database.idle_timeout,
- "echo": False,
+ "pool_recycle": idle_timeout,
'convert_unicode': True,
}
- # Map our SQL debug level to SQLAlchemy's options
- if CONF.database.connection_debug >= 100:
- engine_args['echo'] = 'debug'
- elif CONF.database.connection_debug >= 50:
- engine_args['echo'] = True
+ logger = logging.getLogger('sqlalchemy.engine')
+
+ # Map SQL debug level to Python log level
+ if connection_debug >= 100:
+ logger.setLevel(logging.DEBUG)
+ elif connection_debug >= 50:
+ logger.setLevel(logging.INFO)
+ else:
+ logger.setLevel(logging.WARNING)
if "sqlite" in connection_dict.drivername:
if sqlite_fk:
engine_args["listeners"] = [SqliteForeignKeysListener()]
engine_args["poolclass"] = NullPool
- if CONF.database.connection == "sqlite://":
+ if sql_connection == "sqlite://":
engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False}
else:
- if CONF.database.max_pool_size is not None:
- engine_args['pool_size'] = CONF.database.max_pool_size
- if CONF.database.max_overflow is not None:
- engine_args['max_overflow'] = CONF.database.max_overflow
- if CONF.database.pool_timeout is not None:
- engine_args['pool_timeout'] = CONF.database.pool_timeout
+ if max_pool_size is not None:
+ engine_args['pool_size'] = max_pool_size
+ if max_overflow is not None:
+ engine_args['max_overflow'] = max_overflow
+ if pool_timeout is not None:
+ engine_args['pool_timeout'] = pool_timeout
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
if engine.name in ['mysql', 'ibm_db_sa']:
- callback = functools.partial(_ping_listener, engine)
- sqlalchemy.event.listen(engine, 'checkout', callback)
+ ping_callback = functools.partial(_ping_listener, engine)
+ sqlalchemy.event.listen(engine, 'checkout', ping_callback)
if engine.name == 'mysql':
- if mysql_traditional_mode:
- sqlalchemy.event.listen(engine, 'checkout',
- _set_mode_traditional)
- else:
- LOG.warning(_("This application has not enabled MySQL "
- "traditional mode, which means silent "
- "data corruption may occur. "
- "Please encourage the application "
- "developers to enable this mode."))
+ if mysql_sql_mode:
+ _mysql_set_mode_callback(engine, mysql_sql_mode)
elif 'sqlite' in connection_dict.drivername:
- if not CONF.sqlite_synchronous:
+ if not sqlite_synchronous:
sqlalchemy.event.listen(engine, 'connect',
_synchronous_switch_listener)
sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
- if (CONF.database.connection_trace and
- engine.dialect.dbapi.__name__ == 'MySQLdb'):
+ if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb':
_patch_mysqldb_with_stacktrace_comments()
try:
if not _is_db_connection_error(e.args[0]):
raise
- remaining = CONF.database.max_retries
+ remaining = max_retries
if remaining == -1:
remaining = 'infinite'
while True:
- msg = _('SQL connection failed. %s attempts left.')
+ msg = _LW('SQL connection failed. %s attempts left.')
LOG.warning(msg % remaining)
if remaining != 'infinite':
remaining -= 1
- time.sleep(CONF.database.retry_interval)
+ time.sleep(retry_interval)
try:
engine.connect()
break
setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
-def _assert_matching_drivers():
- """Make sure slave handle and normal handle have the same driver."""
- # NOTE(geekinutah): There's no use case for writing to one backend and
- # reading from another. Who knows what the future holds?
- if CONF.database.slave_connection == '':
- return
+class EngineFacade(object):
+ """A helper class for removing of global engine instances from neutron.db.
+
+ As a library, neutron.db can't decide where to store/when to create engine
+ and sessionmaker instances, so this must be left for a target application.
+
+ On the other hand, in order to simplify the adoption of neutron.db changes,
+ we'll provide a helper class, which creates engine and sessionmaker
+ on its instantiation and provides get_engine()/get_session() methods
+ that are compatible with corresponding utility functions that currently
+ exist in target projects, e.g. in Nova.
+
+    engine/sessionmaker instances will still be global (and they are meant to
+    be global), but they will be stored in the app context, rather than in the
+    neutron.db context.
+
+    Note: use of this helper is completely optional and you are encouraged to
+ integrate engine/sessionmaker instances into your apps any way you like
+ (e.g. one might want to bind a session to a request context). Two important
+ things to remember:
+
+ 1. An Engine instance is effectively a pool of DB connections, so it's
+ meant to be shared (and it's thread-safe).
+ 2. A Session instance is not meant to be shared and represents a DB
+ transactional context (i.e. it's not thread-safe). sessionmaker is
+ a factory of sessions.
+
+ """
- normal = sqlalchemy.engine.url.make_url(CONF.database.connection)
- slave = sqlalchemy.engine.url.make_url(CONF.database.slave_connection)
- assert normal.drivername == slave.drivername
+ def __init__(self, sql_connection,
+ sqlite_fk=False, autocommit=True,
+ expire_on_commit=False, **kwargs):
+ """Initialize engine and sessionmaker instances.
+
+ :param sqlite_fk: enable foreign keys in SQLite
+ :type sqlite_fk: bool
+
+ :param autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :param expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ Keyword arguments:
+
+ :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
+ (defaults to TRADITIONAL)
+ :keyword idle_timeout: timeout before idle sql connections are reaped
+ (defaults to 3600)
+ :keyword connection_debug: verbosity of SQL debugging information.
+ 0=None, 100=Everything (defaults to 0)
+ :keyword max_pool_size: maximum number of SQL connections to keep open
+ in a pool (defaults to SQLAlchemy settings)
+ :keyword max_overflow: if set, use this value for max_overflow with
+ sqlalchemy (defaults to SQLAlchemy settings)
+ :keyword pool_timeout: if set, use this value for pool_timeout with
+ sqlalchemy (defaults to SQLAlchemy settings)
+ :keyword sqlite_synchronous: if True, SQLite uses synchronous mode
+ (defaults to True)
+ :keyword connection_trace: add python stack traces to SQL as comment
+ strings (defaults to False)
+ :keyword max_retries: maximum db connection retries during startup.
+ (setting -1 implies an infinite retry count)
+ (defaults to 10)
+ :keyword retry_interval: interval between retries of opening a sql
+ connection (defaults to 10)
+
+ """
+
+ super(EngineFacade, self).__init__()
+
+ self._engine = create_engine(
+ sql_connection=sql_connection,
+ sqlite_fk=sqlite_fk,
+ mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
+ idle_timeout=kwargs.get('idle_timeout', 3600),
+ connection_debug=kwargs.get('connection_debug', 0),
+ max_pool_size=kwargs.get('max_pool_size'),
+ max_overflow=kwargs.get('max_overflow'),
+ pool_timeout=kwargs.get('pool_timeout'),
+ sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
+ connection_trace=kwargs.get('connection_trace', False),
+ max_retries=kwargs.get('max_retries', 10),
+ retry_interval=kwargs.get('retry_interval', 10))
+ self._session_maker = get_maker(
+ engine=self._engine,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit)
+
+ def get_engine(self):
+ """Get the engine instance (note, that it's shared)."""
+
+ return self._engine
+
+ def get_session(self, **kwargs):
+ """Get a Session instance.
+
+ If passed, keyword arguments values override the ones used when the
+ sessionmaker instance was created.
+
+ :keyword autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :keyword expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ """
+
+        # iterate over a copy of the keys so entries can be dropped safely
+        for arg in list(kwargs):
+            if arg not in ('autocommit', 'expire_on_commit'):
+                del kwargs[arg]
+
+ return self._session_maker(**kwargs)
+
+ @classmethod
+ def from_config(cls, connection_string, conf,
+ sqlite_fk=False, autocommit=True, expire_on_commit=False):
+ """Initialize EngineFacade using oslo.config config instance options.
+
+ :param connection_string: SQLAlchemy connection string
+ :type connection_string: string
+
+ :param conf: oslo.config config instance
+ :type conf: oslo.config.cfg.ConfigOpts
+
+ :param sqlite_fk: enable foreign keys in SQLite
+ :type sqlite_fk: bool
+
+ :param autocommit: use autocommit mode for created Session instances
+ :type autocommit: bool
+
+ :param expire_on_commit: expire session objects on commit
+ :type expire_on_commit: bool
+
+ """
+
+ return cls(sql_connection=connection_string,
+ sqlite_fk=sqlite_fk,
+ autocommit=autocommit,
+ expire_on_commit=expire_on_commit,
+ **dict(conf.database.items()))
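For completeness, a hedged sketch of direct EngineFacade instantiation as an alternative to from_config(); the connection string and option values are illustrative.

facade = EngineFacade('mysql://user:secret@localhost/neutron',
                      autocommit=True,
                      expire_on_commit=False,
                      mysql_sql_mode='TRADITIONAL',
                      max_pool_size=10,
                      max_overflow=20)
engine = facade.get_engine()
session = facade.get_session(expire_on_commit=True)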
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import functools
+import os
+
+import fixtures
+import six
+
+from neutron.openstack.common.db.sqlalchemy import session
+from neutron.openstack.common.db.sqlalchemy import utils
+from neutron.openstack.common.fixture import lockutils
+from neutron.openstack.common import test
+
+
+class DbFixture(fixtures.Fixture):
+ """Basic database fixture.
+
+    Allows tests to run against various db backends, such as SQLite, MySQL
+    and PostgreSQL. The SQLite backend is used by default. To override the
+    default backend, set the OS_TEST_DBAPI_CONNECTION environment variable
+    to a connection URI with database admin credentials for the desired
+    backend.
+ """
+
+ def _get_uri(self):
+ return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
+
+ def __init__(self, test):
+ super(DbFixture, self).__init__()
+
+ self.test = test
+
+ def setUp(self):
+ super(DbFixture, self).setUp()
+
+ self.test.engine = session.create_engine(self._get_uri())
+ self.test.sessionmaker = session.get_maker(self.test.engine)
+ self.addCleanup(self.test.engine.dispose)
+
+
+class DbTestCase(test.BaseTestCase):
+ """Base class for testing of DB code.
+
+    Uses `DbFixture`. Intended to be the main database test case, running all
+    tests on a given backend with a user-defined uri. Backend-specific
+    tests should be decorated with the `backend_specific` decorator.
+ """
+
+ FIXTURE = DbFixture
+
+ def setUp(self):
+ super(DbTestCase, self).setUp()
+ self.useFixture(self.FIXTURE(self))
+
+
+ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
+
+
+def backend_specific(*dialects):
+ """Decorator to skip backend specific tests on inappropriate engines.
+
+    ::dialects: list of dialect names under which the test will be launched.
+ """
+ def wrap(f):
+ @functools.wraps(f)
+ def ins_wrap(self):
+ if not set(dialects).issubset(ALLOWED_DIALECTS):
+ raise ValueError(
+ "Please use allowed dialects: %s" % ALLOWED_DIALECTS)
+ if self.engine.name not in dialects:
+ msg = ('The test "%s" can be run '
+ 'only on %s. Current engine is %s.')
+ args = (f.__name__, ' '.join(dialects), self.engine.name)
+ self.skip(msg % args)
+ else:
+ return f(self)
+ return ins_wrap
+ return wrap
+
+
+@six.add_metaclass(abc.ABCMeta)
+class OpportunisticFixture(DbFixture):
+ """Base fixture to use default CI databases.
+
+    The databases exist in the OpenStack CI infrastructure. To use them in a
+    local environment, the databases must be created manually.
+ """
+
+ DRIVER = abc.abstractproperty(lambda: None)
+ DBNAME = PASSWORD = USERNAME = 'openstack_citest'
+
+ def _get_uri(self):
+ return utils.get_connect_string(backend=self.DRIVER,
+ user=self.USERNAME,
+ passwd=self.PASSWORD,
+ database=self.DBNAME)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class OpportunisticTestCase(DbTestCase):
+ """Base test case to use default CI databases.
+
+    Subclasses of this test case run only when the openstack_citest
+    database is available; otherwise the tests are skipped.
+ """
+
+ FIXTURE = abc.abstractproperty(lambda: None)
+
+ def setUp(self):
+ # TODO(bnemec): Remove this once infra is ready for
+ # https://review.openstack.org/#/c/74963/ to merge.
+ self.useFixture(lockutils.LockFixture('opportunistic-db'))
+ credentials = {
+ 'backend': self.FIXTURE.DRIVER,
+ 'user': self.FIXTURE.USERNAME,
+ 'passwd': self.FIXTURE.PASSWORD,
+ 'database': self.FIXTURE.DBNAME}
+
+ if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials):
+ msg = '%s backend is not available.' % self.FIXTURE.DRIVER
+ return self.skip(msg)
+
+ super(OpportunisticTestCase, self).setUp()
+
+
+class MySQLOpportunisticFixture(OpportunisticFixture):
+ DRIVER = 'mysql'
+
+
+class PostgreSQLOpportunisticFixture(OpportunisticFixture):
+ DRIVER = 'postgresql'
+
+
+class MySQLOpportunisticTestCase(OpportunisticTestCase):
+ FIXTURE = MySQLOpportunisticFixture
+
+
+class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):
+ FIXTURE = PostgreSQLOpportunisticFixture
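A brief sketch of a test written against these fixtures, assuming it lives in a module that imports DbTestCase and backend_specific; the queries are illustrative.

class TestDbHelpers(DbTestCase):

    def test_engine_connects(self):
        with self.engine.connect() as conn:
            self.assertEqual(1, conn.execute('SELECT 1').scalar())

    @backend_specific('mysql', 'postgresql')
    def test_server_side_only(self):
        # skipped automatically on other engines
        self.assertIn(self.engine.name, ('mysql', 'postgresql'))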
import logging
import re
-from migrate.changeset import UniqueConstraint
import sqlalchemy
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
+from sqlalchemy import or_
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql.expression import UpdateBase
-from sqlalchemy.sql import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.types import NullType
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common import context as request_context
+from neutron.openstack.common.db.sqlalchemy import models
+from neutron.openstack.common.gettextutils import _, _LI, _LW
from neutron.openstack.common import timeutils
if 'id' not in sort_keys:
# TODO(justinsb): If this ever gives a false-positive, check
# the actual primary key, rather than assuming its id
- LOG.warning(_('Id not in sort_keys; is sort_keys unique?'))
+ LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))
assert(not (sort_dir and sort_dirs))
return query
+def _read_deleted_filter(query, db_model, read_deleted):
+ if 'deleted' not in db_model.__table__.columns:
+ raise ValueError(_("There is no `deleted` column in `%s` table. "
+ "Project doesn't use soft-deleted feature.")
+ % db_model.__name__)
+
+ default_deleted_value = db_model.__table__.c.deleted.default.arg
+ if read_deleted == 'no':
+ query = query.filter(db_model.deleted == default_deleted_value)
+ elif read_deleted == 'yes':
+ pass # omit the filter to include deleted and active
+ elif read_deleted == 'only':
+ query = query.filter(db_model.deleted != default_deleted_value)
+ else:
+ raise ValueError(_("Unrecognized read_deleted value '%s'")
+ % read_deleted)
+ return query
+
+
+def _project_filter(query, db_model, context, project_only):
+ if project_only and 'project_id' not in db_model.__table__.columns:
+ raise ValueError(_("There is no `project_id` column in `%s` table.")
+ % db_model.__name__)
+
+ if request_context.is_user_context(context) and project_only:
+ if project_only == 'allow_none':
+ is_none = None
+ query = query.filter(or_(db_model.project_id == context.project_id,
+ db_model.project_id == is_none))
+ else:
+ query = query.filter(db_model.project_id == context.project_id)
+
+ return query
+
+
+def model_query(context, model, session, args=None, project_only=False,
+ read_deleted=None):
+ """Query helper that accounts for context's `read_deleted` field.
+
+ :param context: context to query under
+
+ :param model: Model to query. Must be a subclass of ModelBase.
+ :type model: models.ModelBase
+
+ :param session: The session to use.
+ :type session: sqlalchemy.orm.session.Session
+
+ :param args: Arguments to query. If None - model is used.
+ :type args: tuple
+
+ :param project_only: If present and context is user-type, then restrict
+ query to match the context's project_id. If set to
+ 'allow_none', restriction includes project_id = None.
+ :type project_only: bool
+
+    :param read_deleted: If present, overrides context's read_deleted field.
+                         Valid values are 'no', 'yes' and 'only'.
+    :type read_deleted: str
+
+ Usage:
+
+ ..code:: python
+
+ result = (utils.model_query(context, models.Instance, session=session)
+ .filter_by(uuid=instance_uuid)
+ .all())
+
+ query = utils.model_query(
+ context, Node,
+ session=session,
+ args=(func.count(Node.id), func.sum(Node.ram))
+ ).filter_by(project_id=project_id)
+
+ """
+
+ if not read_deleted:
+ if hasattr(context, 'read_deleted'):
+ # NOTE(viktors): some projects use `read_deleted` attribute in
+ # their contexts instead of `show_deleted`.
+ read_deleted = context.read_deleted
+ else:
+ read_deleted = context.show_deleted
+
+ if not issubclass(model, models.ModelBase):
+ raise TypeError(_("model should be a subclass of ModelBase"))
+
+ query = session.query(model) if not args else session.query(*args)
+ query = _read_deleted_filter(query, model, read_deleted)
+ query = _project_filter(query, model, context, project_only)
+
+ return query
+
+
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
**col_name_col_instance):
"""Drop unique constraint from table.
+ DEPRECATED: this function is deprecated and will be removed from neutron.db
+ in a few releases. Please use UniqueConstraint.drop() method directly for
+ sqlalchemy-migrate migration scripts.
+
This method drops UC from table and works for mysql, postgresql and sqlite.
In mysql and postgresql we are able to use "alter table" construction.
Sqlalchemy doesn't support some sqlite column types and replaces their
types by sqlite. For example BigInteger.
"""
+ from migrate.changeset import UniqueConstraint
+
meta = MetaData()
meta.bind = migrate_engine
t = Table(table_name, meta, autoload=True)
columns_for_select = [func.max(table.c.id)]
columns_for_select.extend(columns_for_group_by)
- duplicated_rows_select = select(columns_for_select,
- group_by=columns_for_group_by,
- having=func.count(table.c.id) > 1)
+ duplicated_rows_select = sqlalchemy.sql.select(
+ columns_for_select, group_by=columns_for_group_by,
+ having=func.count(table.c.id) > 1)
for row in migrate_engine.execute(duplicated_rows_select):
# NOTE(boris-42): Do not remove row that has the biggest ID.
for name in uc_column_names:
delete_condition &= table.c[name] == row[name]
- rows_to_delete_select = select([table.c.id]).where(delete_condition)
+ rows_to_delete_select = sqlalchemy.sql.select(
+ [table.c.id]).where(delete_condition)
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
- LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
- "%(table)s") % dict(id=row[0], table=table_name))
+ LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
+ "%(table)s") % dict(id=row[0], table=table_name))
if use_soft_delete:
delete_statement = table.update().\
else:
c_select.append(table.c.deleted == table.c.id)
- ins = InsertFromSelect(new_table, select(c_select))
+ ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select))
migrate_engine.execute(ins)
table.drop()
from neutron.common import config
from neutron import context
+from neutron.db import api as session
from neutron import manager
-from neutron.openstack.common.db.sqlalchemy import session
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
# We may have just forked from parent process. A quick disposal of the
# existing sql connections avoids producing errors later when they are
# discovered to be broken.
- session.get_engine(sqlite_fk=True).pool.dispose()
+ session.get_engine().pool.dispose()
self._server = self._plugin.start_rpc_listener()
def wait(self):