All unit tests now run in 4 seconds, instead of 64 seconds on a HDD.
_ENGINE global setting had to be moved into get_engine() so that migration works, otherwise get_engine() creates a new isolated in-memory database for
every call.
Other unit test speedups in this change are to stub out some eventlet sleeps
which saves about 5 seconds.
Change-Id: I3643b73dd9bd86c414934b7c78db67402587f570
from heat.openstack.common import cfg
import heat.utils
-SQL_CONNECTION = 'sqlite:///heat-test.db/'
+SQL_CONNECTION = 'sqlite://'
SQL_IDLE_TIMEOUT = 3600
db_opts = [
cfg.StrOpt('db_backend',
import sqlalchemy.interfaces
import sqlalchemy.orm
+import sqlalchemy.engine
from sqlalchemy.exc import DisconnectionError
from heat.openstack.common import log as logging
from heat.db import api as db_api
-from heat.openstack.common import cfg
logger = logging.getLogger('heat.db.sqlalchemy.session')
_ENGINE = None
def get_session(autocommit=True, expire_on_commit=False):
"""Return a SQLAlchemy session."""
- global _ENGINE, _MAKER
+ global _MAKER
- if _MAKER is None or _ENGINE is None:
- _ENGINE = get_engine()
- _MAKER = get_maker(_ENGINE, autocommit, expire_on_commit)
+ if _MAKER is None:
+ _MAKER = get_maker(get_engine(), autocommit, expire_on_commit)
return _MAKER()
def get_engine():
"""Return a SQLAlchemy engine."""
- connection_dict = sqlalchemy.engine.url.make_url(_get_sql_connection())
- engine_args = {
- "pool_recycle": _get_sql_idle_timeout(),
- "echo": False,
- 'convert_unicode': True,
- }
-
- if 'mysql' in connection_dict.drivername:
- engine_args['listeners'] = [MySQLPingListener()]
-
- return sqlalchemy.create_engine(_get_sql_connection(), **engine_args)
+ global _ENGINE
+ if _ENGINE is None:
+ connection_dict = sqlalchemy.engine.url.make_url(_get_sql_connection())
+ engine_args = {
+ "pool_recycle": _get_sql_idle_timeout(),
+ "echo": False,
+ 'convert_unicode': True
+ }
+
+ if 'mysql' in connection_dict.drivername:
+ engine_args['listeners'] = [MySQLPingListener()]
+
+ _ENGINE = sqlalchemy.create_engine(_get_sql_connection(),
+ **engine_args)
+ return _ENGINE
def get_maker(engine, autocommit=True, expire_on_commit=False):
import sys
import os
+import eventlet
import json
import nose
import mox
self.m.StubOutWithMock(self.fc.ec2, 'create')
self.m.StubOutWithMock(self.fc.ec2, 'get')
self.m.StubOutWithMock(self.fc.ec2, 'delete')
+ self.m.StubOutWithMock(eventlet, 'sleep')
def tearDown(self):
self.m.UnsetStubs()
# delete script
user.User.keystone().AndReturn(self.fc)
self.fc.users.get(user.DummyId('1')).AndRaise(Exception('not found'))
+ eventlet.sleep(1).AndReturn(None)
user.User.keystone().AndReturn(self.fc)
self.fc.users.get(user.DummyId('1')).AndReturn(fake_user)
import sys
import os
+import eventlet
import json
import nose
import mox
self.m.StubOutWithMock(self.fc.volumes, 'delete')
self.m.StubOutWithMock(self.fc.volumes, 'create_server_volume')
self.m.StubOutWithMock(self.fc.volumes, 'delete_server_volume')
+ self.m.StubOutWithMock(eventlet, 'sleep')
def tearDown(self):
self.m.UnsetStubs()
# delete script
vol.Volume.nova('volume').AndReturn(self.fc)
self.fc.volumes.get('vol-123').AndReturn(fv)
+ eventlet.sleep(1).AndReturn(None)
vol.Volume.nova('volume').AndReturn(self.fc)
self.fc.volumes.get('vol-123').AndReturn(fv)
display_description='test_stack.DataVolume',
display_name='test_stack.DataVolume').AndReturn(fv)
+ eventlet.sleep(1).AndReturn(None)
+
self.m.ReplayAll()
t = self.load_template()
vol.VolumeAttachment.nova('volume').AndReturn(self.fc)
self.fc.volumes.get('vol-123').AndReturn(fv)
+ eventlet.sleep(1).AndReturn(None)
self.m.ReplayAll()
vol.VolumeAttachment.nova('volume').AndReturn(self.fc)
self.fc.volumes.get('vol-123').AndReturn(fv)
+ eventlet.sleep(1).AndReturn(None)
# delete script
'vol-123').AndReturn(None)
vol.VolumeAttachment.nova('volume').AndReturn(self.fc)
self.fc.volumes.get('vol-123').AndReturn(fv)
+ eventlet.sleep(1).AndReturn(None)
self.m.ReplayAll()