from heat.api.aws import exception
import heat.api.cfn.v1.stacks as stacks
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
+from heat.tests import utils
policy_path = os.path.dirname(os.path.realpath(__file__)) + "/policy/"
qs = "&".join(["=".join([k, str(params[k])]) for k in params])
environ = {'REQUEST_METHOD': 'GET', 'QUERY_STRING': qs}
req = Request(environ)
- req.context = dummy_context()
+ req.context = utils.dummy_context()
return req
# The tests
import heat.api.cloudwatch.watch as watches
from heat.rpc import api as engine_api
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
+from heat.tests import utils
class WatchControllerTest(HeatTestCase):
qs = "&".join(["=".join([k, str(params[k])]) for k in params])
environ = {'REQUEST_METHOD': 'GET', 'QUERY_STRING': qs}
req = Request(environ)
- req.context = dummy_context()
+ req.context = utils.dummy_context()
return req
# The tests
import heat.api.openstack.v1.resources as resources
import heat.api.openstack.v1.events as events
import heat.api.openstack.v1.actions as actions
-from heat.tests.utils import dummy_context
+from heat.tests import utils
import heat.api.middleware.fault as fault
environ['REQUEST_METHOD'] = method
req = Request(environ)
- req.context = dummy_context('api_test_user', self.tenant)
+ req.context = utils.dummy_context('api_test_user', self.tenant)
return req
def _get(self, path):
environ['REQUEST_METHOD'] = method
req = Request(environ)
- req.context = dummy_context('api_test_user', self.tenant)
+ req.context = utils.dummy_context('api_test_user', self.tenant)
req.body = data
return req
from heat.openstack.common import timeutils
from heat.tests.common import HeatTestCase
from heat.tests import fakes
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
+from heat.tests import utils
+
as_template = '''
{
def setUp(self):
super(AutoScalingTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
cfg.CONF.set_default('heat_waitcondition_server_url',
'http://127.0.0.1:8000/v1/waitcondition')
self.fc = fakes.FakeKeystoneClient()
properties = t['Resources']['WebServerGroup']['Properties']
properties['MinSize'] = '0'
properties['MaxSize'] = '0'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
now = timeutils.utcnow()
self.m.ReplayAll()
properties = t['Resources']['WebServerGroup']['Properties']
properties['MinSize'] = '1'
properties['MaxSize'] = '1'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
def test_scaling_group_update_replace(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
def test_scaling_group_suspend(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
def test_scaling_group_resume(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
t = template_format.parse(as_template)
properties = t['Resources']['WebServerGroup']['Properties']
properties['DesiredCapacity'] = '2'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(2)
now = timeutils.utcnow()
t = template_format.parse(as_template)
properties = t['Resources']['WebServerGroup']['Properties']
properties['DesiredCapacity'] = '2'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(2)
now = timeutils.utcnow()
def test_scaling_group_suspend_fail(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
def test_scaling_group_resume_fail(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
def test_scaling_group_create_error(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self.m.StubOutWithMock(instance.Instance, 'handle_create')
self.m.StubOutWithMock(instance.Instance, 'check_create_complete')
properties = t['Resources']['WebServerGroup']['Properties']
properties['MinSize'] = '1'
properties['MaxSize'] = '3'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
properties = t['Resources']['WebServerGroup']['Properties']
properties['MinSize'] = '1'
properties['MaxSize'] = '3'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
properties = t['Resources']['WebServerGroup']['Properties']
properties['MinSize'] = '1'
properties['MaxSize'] = '3'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
t = template_format.parse(as_template)
properties = t['Resources']['WebServerGroup']['Properties']
properties['DesiredCapacity'] = '2'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(2)
now = timeutils.utcnow()
t = template_format.parse(as_template)
properties = t['Resources']['WebServerGroup']['Properties']
properties['Cooldown'] = '60'
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
self._stub_lb_reload(1)
now = timeutils.utcnow()
self._stub_meta_expected(now, 'ExactCapacity : 1')
self._stub_create(1)
self.m.ReplayAll()
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
rsrc = self.create_scaling_group(t, stack, 'WebServerGroup')
self.assertEqual('WebServerGroup', rsrc.FnGetRefId())
def test_scaling_group_adjust(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# start with 3
properties = t['Resources']['WebServerGroup']['Properties']
def test_scaling_group_scale_up_failure(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group
self._stub_lb_reload(1)
def test_scaling_group_nochange(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group, 2 instances
properties = t['Resources']['WebServerGroup']['Properties']
def test_scaling_group_percent(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group, 2 instances
properties = t['Resources']['WebServerGroup']['Properties']
def test_scaling_group_cooldown_toosoon(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group, 2 instances, Cooldown 60s
properties = t['Resources']['WebServerGroup']['Properties']
def test_scaling_group_cooldown_ok(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group, 2 instances, Cooldown 60s
properties = t['Resources']['WebServerGroup']['Properties']
def test_scaling_group_cooldown_zero(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group, 2 instances, Cooldown 0
properties = t['Resources']['WebServerGroup']['Properties']
def test_scaling_policy_up(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group
self._stub_lb_reload(1)
def test_scaling_policy_down(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group, 2 instances
properties = t['Resources']['WebServerGroup']['Properties']
def test_scaling_policy_cooldown_toosoon(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group
self._stub_lb_reload(1)
def test_scaling_policy_cooldown_ok(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group
self._stub_lb_reload(1)
def test_scaling_policy_cooldown_zero(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group
self._stub_lb_reload(1)
def test_scaling_policy_cooldown_none(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group
self._stub_lb_reload(1)
def test_scaling_policy_update(self):
t = template_format.parse(as_template)
- stack = parse_stack(t, params=self.params)
+ stack = utils.parse_stack(t, params=self.params)
# Create initial group
self._stub_lb_reload(1)
from heat.engine.resources.rackspace import clouddatabase
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
+from heat.tests import utils
wp_template = '''
class CloudDBInstanceTest(HeatTestCase):
def setUp(self):
super(CloudDBInstanceTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
# Test environment may not have pyrax client library installed and if
# pyrax is not installed resource class would not be registered.
# So register resource provider class explicitly for unit testing.
stack_name = '%s_stack' % name
t = template_format.parse(wp_template)
template = parser.Template(t)
- stack = parser.Stack(dummy_context(),
+ stack = parser.Stack(utils.dummy_context(),
stack_name,
template,
environment.Environment({'InstanceName': 'Test',
from heat.common import policy
from heat.common import exception
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
+from heat.tests import utils
policy_path = os.path.dirname(os.path.realpath(__file__)) + "/policy/"
def test_policy_cfn_default(self):
enforcer = policy.Enforcer(scope='cloudformation')
- ctx = dummy_context(roles=[])
+ ctx = utils.dummy_context(roles=[])
for action in self.cfn_actions:
# Everything should be allowed
enforcer.enforce(ctx, action, {})
enforcer = policy.Enforcer(scope='cloudformation')
- ctx = dummy_context(roles=[])
+ ctx = utils.dummy_context(roles=[])
for action in self.cfn_actions:
# Everything should raise the default exception.Forbidden
self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
enforcer = policy.Enforcer(scope='cloudformation')
- ctx = dummy_context(roles=['heat_stack_user'])
+ ctx = utils.dummy_context(roles=['heat_stack_user'])
for action in self.cfn_actions:
# Everything apart from DescribeStackResource should be Forbidden
if action == "DescribeStackResource":
enforcer = policy.Enforcer(scope='cloudformation')
- ctx = dummy_context(roles=['not_a_stack_user'])
+ ctx = utils.dummy_context(roles=['not_a_stack_user'])
for action in self.cfn_actions:
# Everything should be allowed
enforcer.enforce(ctx, action, {})
enforcer = policy.Enforcer(scope='cloudwatch')
- ctx = dummy_context(roles=['heat_stack_user'])
+ ctx = utils.dummy_context(roles=['heat_stack_user'])
for action in self.cw_actions:
# Everything apart from PutMetricData should be Forbidden
if action == "PutMetricData":
enforcer = policy.Enforcer(scope='cloudwatch')
- ctx = dummy_context(roles=['not_a_stack_user'])
+ ctx = utils.dummy_context(roles=['not_a_stack_user'])
for action in self.cw_actions:
# Everything should be allowed
enforcer.enforce(ctx, action, {})
from heat.engine import scheduler
from heat.engine.resources import dbinstance as dbi
from heat.tests.common import HeatTestCase
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
+from heat.tests import utils
rds_template = '''
class DBInstanceTest(HeatTestCase):
def setUp(self):
super(DBInstanceTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
self.m.StubOutWithMock(dbi.DBInstance, 'create_with_template')
self.m.StubOutWithMock(dbi.DBInstance, 'check_create_complete')
self.m.StubOutWithMock(dbi.DBInstance, 'nested')
self.m.ReplayAll()
t = template_format.parse(rds_template)
- s = parse_stack(t)
+ s = utils.parse_stack(t)
resource = self.create_dbinstance(t, s, 'DatabaseServer')
self.assertEqual('0.0.0.0', resource.FnGetAtt('Endpoint.Address'))
from heat.engine import scheduler
from heat.tests.common import HeatTestCase
from heat.tests.v1_1 import fakes
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
+from heat.tests import utils
eip_template = '''
self.m.StubOutWithMock(eip.ElasticIp, 'nova')
self.m.StubOutWithMock(eip.ElasticIpAssociation, 'nova')
self.m.StubOutWithMock(self.fc.servers, 'get')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_eip(self, t, stack, resource_name):
rsrc = eip.ElasticIp(resource_name,
self.m.ReplayAll()
t = template_format.parse(eip_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_eip(t, stack, 'IPAddress')
self.m.ReplayAll()
t = template_format.parse(eip_template_ipassoc)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_eip(t, stack, 'IPAddress')
association = self.create_association(t, stack, 'IPAssoc')
from heat.rpc import api as rpc_api
from heat.tests.common import HeatTestCase
from heat.tests import generic_resource as generic_rsrc
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
+from heat.tests import utils
class EngineApiTest(HeatTestCase):
def setUp(self):
super(FormatTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
template = parser.Template({
'Resources': {
})
resource._register_class('GenericResourceType',
generic_rsrc.GenericResource)
- self.stack = parser.Stack(dummy_context(), 'test_stack', template,
- stack_id=uuidutils.generate_uuid())
+ self.stack = parser.Stack(utils.dummy_context(), 'test_stack',
+ template, stack_id=uuidutils.generate_uuid())
def test_format_stack_resource(self):
res = self.stack['generic1']
from heat.openstack.common import threadgroup
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
wp_template = '''
def create_context(mocks, user='stacks_test_user',
tenant_id='test_admin', password='stacks_test_password'):
- ctx = dummy_context()
+ ctx = utils.dummy_context()
mocks.StubOutWithMock(ctx, 'username')
mocks.StubOutWithMock(ctx, 'tenant_id')
mocks.StubOutWithMock(ctx, 'password')
class StackCreateTest(HeatTestCase):
def setUp(self):
super(StackCreateTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
def test_wordpress_single_instance_stack_create(self):
stack = get_wordpress_stack('test_stack', create_context(self.m))
def setUp(self):
super(StackServiceCreateUpdateDeleteTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
self.ctx = create_context(self.m)
self.man = service.EngineService('a-host', 'a-topic')
def setUp(self):
super(StackServiceSuspendResumeTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
self.ctx = create_context(self.m)
self.man = service.EngineService('a-host', 'a-topic')
self.eng = service.EngineService('a-host', 'a-topic')
cfg.CONF.set_default('heat_stack_user_role', 'stack_user_role')
- setup_dummy_db()
+ utils.setup_dummy_db()
@stack_context('service_identify_test_stack', False)
def test_stack_identify(self):
from heat.engine import event
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
from heat.tests import generic_resource as generic_rsrc
+from heat.tests import utils
tmpl = {
super(EventTest, self).setUp()
self.username = 'event_test_user'
- setup_dummy_db()
- self.ctx = dummy_context()
+ utils.setup_dummy_db()
+ self.ctx = utils.dummy_context()
self.m.ReplayAll()
from heat.common import heat_keystoneclient
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
+from heat.tests import utils
class KeystoneClientTest(HeatTestCase):
# long_user_name, keystone was actually called with a truncated
# user name
heat_ks_client = heat_keystoneclient.KeystoneClient(
- dummy_context())
+ utils.dummy_context())
heat_ks_client.create_stack_user(long_user_name, password='password')
from heat.tests.common import HeatTestCase
from heat.tests import test_parser
-from heat.tests.utils import stack_delete_after
+from heat.tests import utils
hot_tpl_empty = template_format.parse('''
class StackTest(test_parser.StackTest):
"""Test stack function when stack was created from HOT template."""
- @stack_delete_after
+ @utils.stack_delete_after
def test_get_attr(self):
"""Test resolution of get_attr occurrences in HOT template."""
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
wp_template = '''
def setUp(self):
super(InstancesTest, self).setUp()
self.fc = fakes.FakeClient()
- setup_dummy_db()
+ utils.setup_dummy_db()
def _setup_test_stack(self, stack_name):
t = template_format.parse(wp_template)
template = parser.Template(t)
- stack = parser.Stack(dummy_context(), stack_name, template,
+ stack = parser.Stack(utils.dummy_context(), stack_name, template,
environment.Environment({'KeyName': 'test'}),
stack_id=uuidutils.generate_uuid())
return (t, stack)
from heat.engine import resources
from heat.engine import scheduler
from heat.tests.common import HeatTestCase
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
+from heat.tests import utils
ig_template = '''
class InstanceGroupTest(HeatTestCase):
def setUp(self):
super(InstanceGroupTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
def _stub_create(self, num, instance_class=instance.Instance):
"""
def test_instance_group(self):
t = template_format.parse(ig_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
# start with min then delete
self._stub_create(1)
original_instance)
t = template_format.parse(ig_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
self._stub_create(1, instance_class=MyInstance)
self.m.ReplayAll()
def test_missing_image(self):
t = template_format.parse(ig_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = asc.InstanceGroup('JobServerGroup',
t['Resources']['JobServerGroup'],
t = template_format.parse(ig_template)
properties = t['Resources']['JobServerGroup']['Properties']
properties['Size'] = '2'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
self._stub_create(2)
self.m.ReplayAll()
t = template_format.parse(ig_template)
properties = t['Resources']['JobServerGroup']['Properties']
properties['Size'] = '2'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
self._stub_create(2)
self.m.ReplayAll()
t = template_format.parse(ig_template)
properties = t['Resources']['JobServerGroup']['Properties']
properties['Size'] = '2'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
self._stub_create(2)
self.m.ReplayAll()
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
wp_template = '''
def setUp(self):
super(instancesTest, self).setUp()
self.fc = fakes.FakeClient()
- setup_dummy_db()
+ utils.setup_dummy_db()
def _create_test_instance(self, return_server, name):
stack_name = '%s_stack' % name
kwargs = {'KeyName': 'test',
'InstanceType': 'm1.large',
'SubnetId': '4156c7a5-e8c4-4aff-a6e1-8f3c7bc83861'}
- stack = parser.Stack(dummy_context(), stack_name, template,
+ stack = parser.Stack(utils.dummy_context(), stack_name, template,
environment.Environment(kwargs),
stack_id=uuidutils.generate_uuid())
kwargs = {'KeyName': 'test',
'InstanceType': 'm1.large',
'SubnetId': '4156c7a5-e8c4-4aff-a6e1-8f3c7bc83861'}
- stack = parser.Stack(dummy_context(), stack_name, template,
+ stack = parser.Stack(utils.dummy_context(), stack_name, template,
environment.Environment(kwargs),
stack_id=uuidutils.generate_uuid())
from heat.engine.resource import Metadata
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
from heat.tests.v1_1 import fakes
from heat.tests import fakes as test_fakes
cfg.CONF.set_default('heat_waitcondition_server_url',
'http://127.0.0.1:8000/v1/waitcondition')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_loadbalancer(self, t, stack, resource_name):
rsrc = lb.LoadBalancer(resource_name,
self.m.ReplayAll()
t = template_format.parse(lb_template)
- s = parse_stack(t)
+ s = utils.parse_stack(t)
s.store()
rsrc = self.create_loadbalancer(t, s, 'LoadBalancer')
self.m.ReplayAll()
t = template_format.parse(lb_template_nokey)
- s = parse_stack(t)
+ s = utils.parse_stack(t)
s.store()
rsrc = self.create_loadbalancer(t, s, 'LoadBalancer')
from oslo.config import cfg
from heat.tests import fakes
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import stack_delete_after
+from heat.tests import utils
from heat.db import api as db_api
from heat.engine import environment
def setUp(self):
super(MetadataRefreshTest, self).setUp()
self.fc = fakes.FakeKeystoneClient()
- setup_dummy_db()
+ utils.setup_dummy_db()
# Note tests creating a stack should be decorated with @stack_delete_after
# to ensure the stack is properly cleaned up
def create_stack(self, stack_name='test_stack', params={}):
temp = template_format.parse(test_template_metadata)
template = parser.Template(temp)
- ctx = dummy_context()
+ ctx = utils.dummy_context()
stack = parser.Stack(ctx, stack_name, template,
environment.Environment(params),
disable_rollback=True)
return stack
- @stack_delete_after
+ @utils.stack_delete_after
def test_FnGetAtt(self):
self.stack = self.create_stack()
class WaitCondMetadataUpdateTest(HeatTestCase):
def setUp(self):
super(WaitCondMetadataUpdateTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
self.fc = fakes.FakeKeystoneClient()
self.man = service.EngineService('a-host', 'a-topic')
cfg.CONF.set_default('heat_waitcondition_server_url',
def create_stack(self, stack_name='test_stack'):
temp = template_format.parse(test_template_waitcondition)
template = parser.Template(temp)
- stack = parser.Stack(dummy_context(), stack_name, template,
+ stack = parser.Stack(utils.dummy_context(), stack_name, template,
disable_rollback=True)
self.stack_id = stack.store()
return stack
- @stack_delete_after
+ @utils.stack_delete_after
def test_wait_meta(self):
'''
1 create stack
self.assertEqual(inst.metadata['test'], None)
def update_metadata(id, data, reason):
- self.man.metadata_update(dummy_context(),
+ self.man.metadata_update(utils.dummy_context(),
dict(self.stack.identifier()),
'WH',
{'Data': data, 'Reason': reason,
from heat.tests import generic_resource as generic_rsrc
from heat.tests import utils
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
class NestedStackTest(HeatTestCase):
def setUp(self):
super(NestedStackTest, self).setUp()
self.m.StubOutWithMock(urlfetch, 'get')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_stack(self, template):
t = template_format.parse(template)
return stack
def parse_stack(self, t):
- ctx = dummy_context('test_username', 'aaaa', 'password')
+ ctx = utils.dummy_context('test_username', 'aaaa', 'password')
stack_name = 'test_stack'
tmpl = parser.Template(t)
stack = parser.Stack(ctx, stack_name, tmpl)
from heat.tests.common import HeatTestCase
from heat.tests import fakes
from heat.tests import utils
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
neutronclient = try_import('neutronclient.v2_0.client')
qe = try_import('neutronclient.common.exceptions')
self.m.StubOutWithMock(neutronclient.Client, 'delete_network')
self.m.StubOutWithMock(neutronclient.Client, 'show_network')
self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_net(self, t, stack, resource_name):
rsrc = net.Net('test_net', t['Resources'][resource_name], stack)
self.m.ReplayAll()
t = template_format.parse(neutron_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_net(t, stack, 'network')
# assert the implicit dependency between the gateway and the interface
self.m.StubOutWithMock(neutronclient.Client, 'delete_subnet')
self.m.StubOutWithMock(neutronclient.Client, 'show_subnet')
self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_subnet(self, t, stack, resource_name):
rsrc = subnet.Subnet('test_subnet', t['Resources'][resource_name],
self.m.ReplayAll()
t = template_format.parse(neutron_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_subnet(t, stack, 'subnet')
rsrc.validate()
self.m.ReplayAll()
t = template_format.parse(neutron_template)
t['Resources']['subnet']['Properties']['enable_dhcp'] = 'False'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_subnet(t, stack, 'subnet')
rsrc.validate()
self.m.StubOutWithMock(neutronclient.Client, 'add_gateway_router')
self.m.StubOutWithMock(neutronclient.Client, 'remove_gateway_router')
self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_router(self, t, stack, resource_name):
rsrc = router.Router('router', t['Resources'][resource_name], stack)
self.m.ReplayAll()
t = template_format.parse(neutron_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_router(t, stack, 'router')
rsrc.validate()
).AndRaise(qe.NeutronClientException(status_code=404))
self.m.ReplayAll()
t = template_format.parse(neutron_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_router_interface(
t, stack, 'router_interface', properties={
).AndRaise(qe.NeutronClientException(status_code=404))
self.m.ReplayAll()
t = template_format.parse(neutron_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_gateway_router(
t, stack, 'gateway', properties={
self.m.StubOutWithMock(neutronclient.Client, 'delete_port')
self.m.StubOutWithMock(neutronclient.Client, 'show_port')
self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
- setup_dummy_db()
+ utils.setup_dummy_db()
def test_floating_ip(self):
self.m.ReplayAll()
t = template_format.parse(neutron_floating_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
# assert the implicit dependency between the floating_ip
# and the gateway
self.m.ReplayAll()
t = template_format.parse(neutron_floating_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
p = stack['port_floating']
scheduler.TaskRunner(p.create)()
self.m.ReplayAll()
t = template_format.parse(neutron_floating_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
fip = stack['floating_ip']
scheduler.TaskRunner(fip.create)()
from heat.engine import scheduler
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
nokey_template = '''
def setUp(self):
super(nokeyTest, self).setUp()
self.fc = fakes.FakeClient()
- setup_dummy_db()
+ utils.setup_dummy_db()
def test_nokey_create(self):
stack_name = 'instance_create_test_nokey_stack'
t = template_format.parse(nokey_template)
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
t['Resources']['WebServer']['Properties']['ImageId'] = 'CentOS 5.2'
t['Resources']['WebServer']['Properties']['InstanceType'] = \
from heat.tests.fakes import FakeKeystoneClient
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
+from heat.tests import utils
from heat.tests.v1_1 import fakes
-from heat.tests.utils import stack_delete_after
from heat.tests import generic_resource as generic_rsrc
import heat.db.api as db_api
def setUp(self):
super(TemplateTest, self).setUp()
- self.ctx = dummy_context()
+ self.ctx = utils.dummy_context()
def test_defaults(self):
empty = parser.Template({})
self.username = 'parser_stack_test_user'
- setup_dummy_db()
- self.ctx = dummy_context()
+ utils.setup_dummy_db()
+ self.ctx = utils.dummy_context()
resource._register_class('GenericResourceType',
generic_rsrc.GenericResource)
self.assertEqual(stack.status_reason, '')
def test_no_auth_token(self):
- ctx = dummy_context()
+ ctx = utils.dummy_context()
ctx.auth_token = None
self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
clients.OpenStackClients.keystone().AndReturn(FakeKeystoneClient())
self.assertRaises(exception.NotFound, parser.Stack.load,
None, -1)
- @stack_delete_after
+ @utils.stack_delete_after
def test_load_parent_resource(self):
self.stack = parser.Stack(self.ctx, 'load_parent_resource',
parser.Template({}))
# Note tests creating a stack should be decorated with @stack_delete_after
# to ensure the self.stack is properly cleaned up
- @stack_delete_after
+ @utils.stack_delete_after
def test_identifier(self):
self.stack = parser.Stack(self.ctx, 'identifier_test',
parser.Template({}))
self.assertTrue(identifier.stack_id)
self.assertFalse(identifier.path)
- @stack_delete_after
+ @utils.stack_delete_after
def test_set_param_id(self):
self.stack = parser.Stack(self.ctx, 'param_arn_test',
parser.Template({}))
identifier.arn())
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_load_param_id(self):
self.stack = parser.Stack(self.ctx, 'param_load_arn_test',
parser.Template({}))
newstack = parser.Stack.load(self.ctx, stack_id=self.stack.id)
self.assertEqual(newstack.parameters['AWS::StackId'], identifier.arn())
- @stack_delete_after
+ @utils.stack_delete_after
def test_created_time(self):
self.stack = parser.Stack(self.ctx, 'creation_time_test',
parser.Template({}))
self.stack.store()
self.assertNotEqual(self.stack.created_time, None)
- @stack_delete_after
+ @utils.stack_delete_after
def test_updated_time(self):
self.stack = parser.Stack(self.ctx, 'update_time_test',
parser.Template({}))
self.assertNotEqual(self.stack.updated_time, None)
self.assertNotEqual(self.stack.updated_time, stored_time)
- @stack_delete_after
+ @utils.stack_delete_after
def test_delete(self):
self.stack = parser.Stack(self.ctx, 'delete_test',
parser.Template({}))
self.assertEqual(self.stack.state,
(parser.Stack.DELETE, parser.Stack.COMPLETE))
- @stack_delete_after
+ @utils.stack_delete_after
def test_suspend_resume(self):
self.m.ReplayAll()
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_suspend_fail(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
'Resource suspend failed: Exception: foo')
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_resume_fail(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_resume')
'Resource resume failed: Exception: foo')
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_suspend_timeout(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_suspend')
self.assertEqual(self.stack.status_reason, 'Suspend timed out')
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_resume_timeout(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_resume')
self.assertEqual(self.stack.status_reason, 'Resume timed out')
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_delete_rollback(self):
self.stack = parser.Stack(self.ctx, 'delete_rollback_test',
parser.Template({}), disable_rollback=False)
self.assertEqual(self.stack.state,
(parser.Stack.ROLLBACK, parser.Stack.COMPLETE))
- @stack_delete_after
+ @utils.stack_delete_after
def test_delete_badaction(self):
self.stack = parser.Stack(self.ctx, 'delete_badaction_test',
parser.Template({}))
self.assertEqual(self.stack.state,
(parser.Stack.DELETE, parser.Stack.FAILED))
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_badstate(self):
self.stack = parser.Stack(self.ctx, 'test_stack', parser.Template({}),
action=parser.Stack.CREATE,
self.assertEqual(self.stack.state,
(parser.Stack.UPDATE, parser.Stack.FAILED))
- @stack_delete_after
+ @utils.stack_delete_after
def test_resource_by_refid(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
finally:
rsrc.state_set(rsrc.CREATE, rsrc.COMPLETE)
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_add(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
(parser.Stack.UPDATE, parser.Stack.COMPLETE))
self.assertTrue('BResource' in self.stack)
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_remove(self):
tmpl = {'Resources': {
'AResource': {'Type': 'GenericResourceType'},
(parser.Stack.UPDATE, parser.Stack.COMPLETE))
self.assertFalse('BResource' in self.stack)
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_description(self):
tmpl = {'Description': 'ATemplate',
'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
(parser.Stack.UPDATE, parser.Stack.COMPLETE))
self.assertEqual(self.stack.t[template.DESCRIPTION], 'BTemplate')
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_modify_ok_replace(self):
tmpl = {'Resources': {'AResource': {'Type': 'ResourceWithPropsType',
'Properties': {'Foo': 'abc'}}}}
self.assertEqual(self.stack['AResource'].properties['Foo'], 'xyz')
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_modify_update_failed(self):
tmpl = {'Resources': {'AResource': {'Type': 'ResourceWithPropsType',
'Properties': {'Foo': 'abc'}}}}
(parser.Stack.UPDATE, parser.Stack.FAILED))
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_modify_replace_failed_delete(self):
tmpl = {'Resources': {'AResource': {'Type': 'ResourceWithPropsType',
'Properties': {'Foo': 'abc'}}}}
# Unset here so destroy() is not stubbed for stack.delete cleanup
self.m.UnsetStubs()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_modify_replace_failed_create(self):
tmpl = {'Resources': {'AResource': {'Type': 'ResourceWithPropsType',
'Properties': {'Foo': 'abc'}}}}
(parser.Stack.UPDATE, parser.Stack.FAILED))
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_add_failed_create(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.assertTrue('BResource' in re_stack)
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_rollback(self):
tmpl = {'Resources': {'AResource': {'Type': 'ResourceWithPropsType',
'Properties': {'Foo': 'abc'}}}}
self.assertEqual(self.stack['AResource'].properties['Foo'], 'abc')
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_rollback_fail(self):
tmpl = {'Resources': {'AResource': {'Type': 'ResourceWithPropsType',
'Properties': {'Foo': 'abc'}}}}
(parser.Stack.ROLLBACK, parser.Stack.FAILED))
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_rollback_add(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
self.assertFalse('BResource' in self.stack)
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_rollback_remove(self):
tmpl = {'Resources': {
'AResource': {'Type': 'GenericResourceType'},
# Unset here so delete() is not stubbed for stack.delete cleanup
self.m.UnsetStubs()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_replace_by_reference(self):
'''
assertion:
self.assertEqual(self.stack['BResource'].properties['Foo'], 'inst-007')
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_by_reference_and_rollback_1(self):
'''
assertion:
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_by_reference_and_rollback_2(self):
'''
assertion:
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_replace_parameters(self):
'''
assertion:
self.assertRaises(ValueError, parser.Stack, self.ctx, '#test',
parser.Template({}))
- @stack_delete_after
+ @utils.stack_delete_after
def test_resource_state_get_att(self):
tmpl = {
'Resources': {'AResource': {'Type': 'GenericResourceType'}},
rsrc.state_set(action, status)
self.assertEqual(None, self.stack.output('TestOutput'))
- @stack_delete_after
+ @utils.stack_delete_after
def test_resource_required_by(self):
tmpl = {'Resources': {'AResource': {'Type': 'GenericResourceType'},
'BResource': {'Type': 'GenericResourceType',
from heat.tests import generic_resource as generic_rsrc
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
+from heat.tests import utils
class MyCloudResource(generic_rsrc.GenericResource):
class ProviderTemplateTest(HeatTestCase):
def setUp(self):
super(ProviderTemplateTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
resource._register_class('OS::ResourceType',
generic_rsrc.GenericResource)
resource._register_class('myCloud::ResourceType',
def test_to_parameters(self):
"""Tests property conversion to parameter values."""
- setup_dummy_db()
- stack = parser.Stack(dummy_context(), 'test_stack',
+ utils.setup_dummy_db()
+ stack = parser.Stack(utils.dummy_context(), 'test_stack',
parser.Template({}),
stack_id=uuidutils.generate_uuid())
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
wp_template = '''
def setUp(self):
super(RackspaceCloudServerTest, self).setUp()
self.fc = fakes.FakeClient()
- setup_dummy_db()
+ utils.setup_dummy_db()
# Test environment may not have pyrax client library installed and if
# pyrax is not installed resource class would not be registered.
# So register resource provider class explicitly for unit testing.
def _setup_test_stack(self, stack_name):
t = template_format.parse(wp_template)
template = parser.Template(t)
- stack = parser.Stack(dummy_context(), stack_name, template,
+ stack = parser.Stack(utils.dummy_context(), stack_name, template,
environment.Environment({'flavor': '2'}),
stack_id=uuidutils.generate_uuid())
return (t, stack)
from heat.tests import generic_resource as generic_rsrc
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
+from heat.tests import utils
class ResourceTest(HeatTestCase):
def setUp(self):
super(ResourceTest, self).setUp()
- setup_dummy_db()
- self.stack = parser.Stack(dummy_context(), 'test_stack',
+ utils.setup_dummy_db()
+ self.stack = parser.Stack(utils.dummy_context(), 'test_stack',
parser.Template({}),
stack_id=uuidutils.generate_uuid())
tmpl1 = {'Type': 'Foo'}
tmpl2 = {'Type': 'Foo'}
tmpl3 = {'Type': 'Bar'}
- stack2 = parser.Stack(dummy_context(), 'test_stack',
+ stack2 = parser.Stack(utils.dummy_context(), 'test_stack',
parser.Template({}), stack_id=-1)
res1 = generic_rsrc.GenericResource('test_resource', tmpl1, self.stack)
res2 = generic_rsrc.GenericResource('test_resource', tmpl2, stack2)
'Type': 'Foo',
'Metadata': {'Test': 'Initial metadata'}
}
- setup_dummy_db()
- self.stack = parser.Stack(dummy_context(),
+ utils.setup_dummy_db()
+ self.stack = parser.Stack(utils.dummy_context(),
'test_stack', parser.Template({}))
self.stack.store()
self.res = generic_rsrc.GenericResource('metadata_resource',
from heat.rpc import api as rpc_api
from heat.rpc import client as rpc_client
from heat.openstack.common import rpc
-from heat.tests.utils import dummy_context
+from heat.tests import utils
class EngineRpcAPITestCase(testtools.TestCase):
def setUp(self):
- self.context = dummy_context()
+ self.context = utils.dummy_context()
cfg.CONF.set_default('rpc_backend',
'heat.openstack.common.rpc.impl_fake')
cfg.CONF.set_default('verbose', True)
super(EngineRpcAPITestCase, self).setUp()
def _test_engine_api(self, method, rpc_method, **kwargs):
- ctxt = dummy_context()
+ ctxt = utils.dummy_context()
if 'rpcapi_class' in kwargs:
rpcapi_class = kwargs['rpcapi_class']
del kwargs['rpcapi_class']
from heat.tests.common import HeatTestCase
from heat.tests import utils
from heat.tests import fakes
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
swiftclient = try_import('swiftclient.client')
self.m.StubOutWithMock(swiftclient.Connection, 'get_auth')
self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_resource(self, t, stack, resource_name):
rsrc = s3.S3Bucket('test_resource',
self.m.ReplayAll()
t = template_format.parse(swift_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'S3Bucket')
ref_id = rsrc.FnGetRefId()
t = template_format.parse(swift_template)
properties = t['Resources']['S3Bucket']['Properties']
properties['AccessControl'] = 'PublicRead'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'S3Bucket')
rsrc.delete()
self.m.VerifyAll()
t = template_format.parse(swift_template)
properties = t['Resources']['S3Bucket']['Properties']
properties['AccessControl'] = 'PublicReadWrite'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'S3Bucket')
rsrc.delete()
self.m.VerifyAll()
t = template_format.parse(swift_template)
properties = t['Resources']['S3Bucket']['Properties']
properties['AccessControl'] = 'AuthenticatedRead'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'S3Bucket')
rsrc.delete()
self.m.VerifyAll()
self.m.ReplayAll()
t = template_format.parse(swift_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'S3BucketWebsite')
rsrc.delete()
self.m.VerifyAll()
self.m.ReplayAll()
t = template_format.parse(swift_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'S3Bucket')
rsrc.delete()
bucket = t['Resources']['S3Bucket']
bucket['DeletionPolicy'] = 'Retain'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'S3Bucket')
rsrc.delete()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
from heat.engine import resource
from heat.tests.common import HeatTestCase
from heat.tests.fakes import FakeKeystoneClient
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
from heat.tests.v1_1 import fakes
from heat.tests import utils
-from heat.tests.utils import stack_delete_after
from novaclient.v1_1 import security_groups as nova_sg
from novaclient.v1_1 import security_group_rules as nova_sgr
self.m.StubOutWithMock(nova_sg.SecurityGroupManager, 'delete')
self.m.StubOutWithMock(nova_sg.SecurityGroupManager, 'get')
self.m.StubOutWithMock(nova_sg.SecurityGroupManager, 'list')
- setup_dummy_db()
+ utils.setup_dummy_db()
self.m.StubOutWithMock(neutronclient.Client, 'create_security_group')
self.m.StubOutWithMock(
neutronclient.Client, 'create_security_group_rule')
def parse_stack(self, t):
stack_name = 'test_stack'
tmpl = parser.Template(t)
- stack = parser.Stack(dummy_context(), stack_name, tmpl)
+ stack = parser.Stack(utils.dummy_context(), stack_name, tmpl)
stack.store()
return stack
self.assertEqual(ref_id, rsrc.FnGetRefId())
self.assertEqual(metadata, dict(rsrc.metadata))
- @stack_delete_after
+ @utils.stack_delete_after
def test_security_group_nova(self):
#create script
clients.OpenStackClients.nova('compute').AndReturn(self.fc)
stack.delete()
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_security_group_nova_exception(self):
#create script
clients.OpenStackClients.nova('compute').AndReturn(self.fc)
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_security_group_neutron(self):
#create script
clients.OpenStackClients.keystone().AndReturn(
stack.delete()
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_security_group_neutron_exception(self):
#create script
clients.OpenStackClients.keystone().AndReturn(
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
instance_template = '''
def setUp(self):
super(ServerTagsTest, self).setUp()
self.fc = fakes.FakeClient()
- setup_dummy_db()
+ utils.setup_dummy_db()
def _setup_test_instance(self, intags=None, nova_tags=None):
stack_name = 'tag_test'
t = template_format.parse(instance_template)
template = parser.Template(t)
- stack = parser.Stack(dummy_context(), stack_name, template,
+ stack = parser.Stack(utils.dummy_context(), stack_name, template,
environment.Environment({'KeyName': 'test'}),
stack_id=uuidutils.generate_uuid())
stack_name = 'tag_test'
t = template_format.parse(group_template)
template = parser.Template(t)
- stack = parser.Stack(dummy_context(), stack_name, template,
+ stack = parser.Stack(utils.dummy_context(), stack_name, template,
environment.Environment({'KeyName': 'test'}),
stack_id=uuidutils.generate_uuid())
from heat.tests import fakes
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import reset_dummy_db
from heat.common import context
from heat.common import exception
def tearDown(self):
super(SignalTest, self).tearDown()
- reset_dummy_db()
+ utils.reset_dummy_db()
# Note tests creating a stack should be decorated with @stack_delete_after
# to ensure the stack is properly cleaned up
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import dummy_context
-from heat.tests.utils import reset_dummy_db
+
from heat.engine.clients import novaclient
def setUp(self):
super(SqlAlchemyTest, self).setUp()
self.fc = fakes.FakeClient()
- setup_dummy_db()
- reset_dummy_db()
- self.ctx = dummy_context()
+ utils.setup_dummy_db()
+ utils.reset_dummy_db()
+ self.ctx = utils.dummy_context()
def tearDown(self):
super(SqlAlchemyTest, self).tearDown()
from heat.openstack.common import uuidutils
from heat.tests.common import HeatTestCase
from heat.tests import generic_resource as generic_rsrc
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import stack_delete_after
+from heat.tests import utils
ws_res_snippet = {"Type": "some_magic_type",
"metadata": {
def setUp(self):
super(StackResourceTest, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
resource._register_class('some_magic_type',
MyStackResource)
resource._register_class('GenericResource',
generic_rsrc.GenericResource)
t = parser.Template({template.RESOURCES:
{"provider_resource": ws_res_snippet}})
- self.parent_stack = parser.Stack(dummy_context(), 'test_stack', t,
- stack_id=uuidutils.generate_uuid())
+ self.parent_stack = parser.Stack(utils.dummy_context(), 'test_stack',
+ t, stack_id=uuidutils.generate_uuid())
self.parent_resource = MyStackResource('test',
ws_res_snippet,
self.parent_stack)
self.templ = template_format.parse(wp_template)
self.generic_template = template_format.parse(generic_template)
- @stack_delete_after
+ @utils.stack_delete_after
def test_create_with_template_ok(self):
self.parent_resource.create_with_template(self.templ,
{"KeyName": "key"})
self.assertEqual(self.templ, self.stack.t.t)
self.assertEqual(self.stack.id, self.parent_resource.resource_id)
- @stack_delete_after
+ @utils.stack_delete_after
def test_update_with_template_ok(self):
"""
The update_with_template method updates the nested stack with the
self.assertEqual(set(self.stack.resources.keys()),
set(["WebServer", "WebServer2"]))
- @stack_delete_after
+ @utils.stack_delete_after
def test_load_nested_ok(self):
self.parent_resource.create_with_template(self.templ,
{"KeyName": "key"})
self.parent_resource.nested()
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_load_nested_non_exist(self):
self.parent_resource.create_with_template(self.templ,
{"KeyName": "key"})
self.m.VerifyAll()
- @stack_delete_after
+ @utils.stack_delete_after
def test_create_complete_state_err(self):
"""
check_create_complete should raise error when create task is
# Restore state_set to let clean up proceed
self.stack.state_set = st_set
- @stack_delete_after
+ @utils.stack_delete_after
def test_suspend_complete_state_err(self):
"""
check_suspend_complete should raise error when suspend task is
# Restore state_set to let clean up proceed
self.stack.state_set = st_set
- @stack_delete_after
+ @utils.stack_delete_after
def test_resume_complete_state_err(self):
"""
check_resume_complete should raise error when resume task is
from heat.tests.common import HeatTestCase
from heat.tests import fakes
from heat.tests import utils
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
swiftclient = try_import('swiftclient.client')
self.m.StubOutWithMock(swiftclient.Connection, 'get_auth')
self.m.StubOutWithMock(clients.OpenStackClients, 'keystone')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_resource(self, t, stack, resource_name):
rsrc = swift.SwiftContainer(
self.m.ReplayAll()
t = template_format.parse(swift_template)
t['Resources']['SwiftContainer']['Properties']['name'] = 'the_name'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = swift.SwiftContainer(
'test_resource',
t['Resources']['SwiftContainer'],
self.m.ReplayAll()
t = template_format.parse(swift_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'SwiftContainer')
ref_id = rsrc.FnGetRefId()
t = template_format.parse(swift_template)
properties = t['Resources']['SwiftContainer']['Properties']
properties['X-Container-Read'] = '.r:*'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'SwiftContainer')
rsrc.delete()
self.m.VerifyAll()
properties = t['Resources']['SwiftContainer']['Properties']
properties['X-Container-Read'] = '.r:*'
properties['X-Container-Write'] = '.r:*'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'SwiftContainer')
rsrc.delete()
self.m.VerifyAll()
self.m.ReplayAll()
t = template_format.parse(swift_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'SwiftContainerWebsite')
rsrc.delete()
self.m.VerifyAll()
self.m.ReplayAll()
t = template_format.parse(swift_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'SwiftContainer')
rsrc.delete()
container = t['Resources']['SwiftContainer']
container['DeletionPolicy'] = 'Retain'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_resource(t, stack, 'SwiftContainer')
rsrc.delete()
self.assertEqual((rsrc.DELETE, rsrc.COMPLETE), rsrc.state)
from heat.engine import clients
from heat.common import template_format
from heat.tests.common import HeatTestCase
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
+from heat.tests import utils
class JsonToYamlTest(HeatTestCase):
super(JsonYamlResolvedCompareTest, self).setUp()
self.longMessage = True
self.maxDiff = None
- setup_dummy_db()
+ utils.setup_dummy_db()
def load_template(self, file_name):
filepath = os.path.join(os.path.dirname(os.path.realpath(__file__)),
t2 = self.load_template(yaml_file)
del(t2[u'HeatTemplateFormatVersion'])
- stack1 = parse_stack(t1, parameters)
- stack2 = parse_stack(t2, parameters)
+ stack1 = utils.parse_stack(t1, parameters)
+ stack2 = utils.parse_stack(t2, parameters)
# compare resources separately so that resolved static data
# is compared
from heat.tests.common import HeatTestCase
from heat.tests import fakes
from heat.tests import utils
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
import keystoneclient.exceptions
username = utils.PhysName('test_stack', 'CfnUser')
self.fc = fakes.FakeKeystoneClient(username=username)
cfg.CONF.set_default('heat_stack_user_role', 'stack_user_role')
- setup_dummy_db()
+ utils.setup_dummy_db()
class UserTest(UserPolicyTestCase):
self.m.ReplayAll()
t = template_format.parse(user_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_user(t, stack, 'CfnUser')
self.assertEqual(self.fc.user_id, rsrc.resource_id)
self.m.ReplayAll()
t = template_format.parse(user_policy_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_user(t, stack, 'CfnUser')
self.assertEqual(self.fc.user_id, rsrc.resource_id)
t = template_format.parse(user_policy_template)
t['Resources']['CfnUser']['Properties']['Policies'] = ['NoExistBad']
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
resource_name = 'CfnUser'
rsrc = user.User(resource_name,
t['Resources'][resource_name],
self.m.ReplayAll()
t = template_format.parse(user_policy_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_user(t, stack, 'CfnUser')
self.assertEqual(self.fc.user_id, rsrc.resource_id)
t = template_format.parse(user_policy_template)
t['Resources']['CfnUser']['Properties']['Policies'] = [
'WebServerAccessPolicy', {'an_ignored': 'policy'}]
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = self.create_user(t, stack, 'CfnUser')
self.assertEqual(self.fc.user_id, rsrc.resource_id)
t = template_format.parse(user_accesskey_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
self.create_user(t, stack, 'CfnUser')
rsrc = self.create_access_key(t, stack, 'HostKeys')
self.m.ReplayAll()
t = template_format.parse(user_accesskey_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
self.create_user(t, stack, 'CfnUser')
rsrc = self.create_access_key(t, stack, 'HostKeys')
t = template_format.parse(user_accesskey_template)
# Set the resource properties UserName to an unknown user
t['Resources']['HostKeys']['Properties']['UserName'] = 'NonExistant'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
stack.resources['CfnUser'].resource_id = self.fc.user_id
rsrc = user.AccessKey('HostKeys',
def test_accesspolicy_create_ok(self):
t = template_format.parse(user_policy_template)
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
resource_name = 'WebServerAccessPolicy'
rsrc = user.AccessPolicy(resource_name,
t = template_format.parse(user_policy_template)
resource_name = 'WebServerAccessPolicy'
t['Resources'][resource_name]['Properties']['AllowedResources'] = []
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = user.AccessPolicy(resource_name,
t['Resources'][resource_name],
resource_name = 'WebServerAccessPolicy'
t['Resources'][resource_name]['Properties']['AllowedResources'] = [
'NoExistResource']
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = user.AccessPolicy(resource_name,
t['Resources'][resource_name],
def test_accesspolicy_update(self):
t = template_format.parse(user_policy_template)
resource_name = 'WebServerAccessPolicy'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = user.AccessPolicy(resource_name,
t['Resources'][resource_name],
def test_accesspolicy_access_allowed(self):
t = template_format.parse(user_policy_template)
resource_name = 'WebServerAccessPolicy'
- stack = parse_stack(t)
+ stack = utils.parse_stack(t)
rsrc = user.AccessPolicy(resource_name,
t['Resources'][resource_name],
import heat.db.api as db_api
from heat.engine import parser
from heat.tests.common import HeatTestCase
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
+from heat.tests import utils
test_template_volumeattach = '''
{
resources.initialise()
self.fc = fakes.FakeClient()
resources.initialise()
- setup_dummy_db()
- self.ctx = dummy_context()
+ utils.setup_dummy_db()
+ self.ctx = utils.dummy_context()
def test_validate_volumeattach_valid(self):
t = template_format.parse(test_template_volumeattach % 'vdq')
from heat.tests.common import HeatTestCase
from heat.tests.v1_1 import fakes
from heat.tests import utils
-from heat.tests.utils import setup_dummy_db
-from heat.tests.utils import parse_stack
from cinderclient.v1 import client as cinderclient
self.m.StubOutWithMock(self.cinder_fc.volumes, 'delete')
self.m.StubOutWithMock(self.fc.volumes, 'create_server_volume')
self.m.StubOutWithMock(self.fc.volumes, 'delete_server_volume')
- setup_dummy_db()
+ utils.setup_dummy_db()
def create_volume(self, t, stack, resource_name):
data = t['Resources'][resource_name]
self.m.ReplayAll()
t = template_format.parse(volume_template)
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = self.create_volume(t, stack, 'DataVolume')
self.assertEqual(fv.status, 'available')
self.m.ReplayAll()
t = template_format.parse(volume_template)
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = stack['DataVolume']
self.assertEqual(rsrc.validate(), None)
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = vol.Volume('DataVolume',
t['Resources']['DataVolume'],
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
scheduler.TaskRunner(stack['DataVolume'].create)()
self.assertEqual(fv.status, 'available')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
scheduler.TaskRunner(stack['DataVolume'].create)()
self.assertEqual(fv.status, 'available')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
scheduler.TaskRunner(stack['DataVolume'].create)()
self.assertEqual(fv.status, 'available')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
scheduler.TaskRunner(stack['DataVolume'].create)()
rsrc = self.create_attachment(t, stack, 'MountPoint')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
scheduler.TaskRunner(stack['DataVolume'].create)()
self.assertEqual(fv.status, 'available')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
scheduler.TaskRunner(stack['DataVolume'].create)()
self.assertEqual(fv.status, 'available')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['DeletionPolicy'] = 'Snapshot'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = self.create_volume(t, stack, 'DataVolume')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['DeletionPolicy'] = 'Snapshot'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = self.create_volume(t, stack, 'DataVolume')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['DeletionPolicy'] = 'Snapshot'
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = vol.Volume('DataVolume',
t['Resources']['DataVolume'],
stack)
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['SnapshotId'] = 'backup-123'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
self.create_volume(t, stack, 'DataVolume')
self.assertEqual(fv.status, 'available')
t = template_format.parse(volume_template)
t['Resources']['DataVolume']['Properties']['SnapshotId'] = 'backup-123'
t['Resources']['DataVolume']['Properties']['AvailabilityZone'] = 'nova'
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = vol.Volume('DataVolume',
t['Resources']['DataVolume'],
'snapshot_id': 'snap-123',
'source_volid': 'vol-012',
}
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = vol.CinderVolume('DataVolume',
t['Resources']['DataVolume'],
'size': '1',
'availability_zone': 'nova',
}
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = vol.CinderVolume('DataVolume',
t['Resources']['DataVolume'],
'size': '1',
'availability_zone': 'nova',
}
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
rsrc = vol.CinderVolume('DataVolume',
t['Resources']['DataVolume'],
'volume_id': {'Ref': 'DataVolume'},
'mountpoint': '/dev/vdc'
}
- stack = parse_stack(t, stack_name=stack_name)
+ stack = utils.parse_stack(t, stack_name=stack_name)
scheduler.TaskRunner(stack['DataVolume'].create)()
self.assertEqual(fv.status, 'available')
from heat.tests.common import HeatTestCase
from heat.tests import fakes
from heat.tests import utils
-from heat.tests.utils import dummy_context
-from heat.tests.utils import setup_dummy_db
try:
from neutronclient.common.exceptions import NeutronClientException
@skipIf(neutronclient is None, 'neutronclient unavaialble')
def setUp(self):
super(VPCTestBase, self).setUp()
- setup_dummy_db()
+ utils.setup_dummy_db()
self.m.StubOutWithMock(neutronclient.Client, 'add_interface_router')
self.m.StubOutWithMock(neutronclient.Client, 'add_gateway_router')
self.m.StubOutWithMock(neutronclient.Client, 'create_network')
def parse_stack(self, t):
stack_name = 'test_stack'
tmpl = parser.Template(t)
- stack = parser.Stack(dummy_context(), stack_name, tmpl)
+ stack = parser.Stack(utils.dummy_context(), stack_name, tmpl)
stack.store()
return stack
from heat.tests.common import HeatTestCase
from heat.tests import fakes
from heat.tests import utils
-from heat.tests.utils import reset_dummy_db
import heat.db.api as db_api
from heat.common import template_format
def tearDown(self):
super(WaitConditionTest, self).tearDown()
- reset_dummy_db()
+ utils.reset_dummy_db()
# Note tests creating a stack should be decorated with @stack_delete_after
# to ensure the stack is properly cleaned up
def tearDown(self):
super(WaitConditionHandleTest, self).tearDown()
- reset_dummy_db()
+ utils.reset_dummy_db()
def create_stack(self, stack_name='test_stack2', params={}):
temp = template_format.parse(test_template_waitcondition)
from heat.engine import parser
from heat.tests.common import HeatTestCase
from heat.tests import utils
-from heat.tests.utils import dummy_context
class WatchData(object):
# Create a dummy stack in the DB as WatchRule instances
# must be associated with a stack
utils.setup_dummy_db()
- ctx = dummy_context()
+ ctx = utils.dummy_context()
ctx.auth_token = 'abcd1234'
empty_tmpl = {"template": {}}
tmpl = parser.Template(empty_tmpl)
self.setUpDatabase()
self.username = 'watchrule_test_user'
- self.ctx = dummy_context()
+ self.ctx = utils.dummy_context()
self.ctx.auth_token = 'abcd1234'
self.m.ReplayAll()