except AssertionError:
extra_keys = set(keys).difference(set(required + optional))
raise AssertionError("found unexpected keys: %s" %
- list(extra_keys))
+ list(extra_keys))
class FakeClient(object):
called = self.client.callstack[pos][0:2]
assert self.client.callstack, \
- "Expected %s %s but no calls were made." % expected
+ "Expected %s %s but no calls were made." % expected
assert expected == called, 'Expected %s %s; got %s %s' % \
- (expected + called)
+ (expected + called)
if body is not None:
assert self.client.callstack[pos][2] == body
expected = (method, url)
assert self.client.callstack, \
- "Expected %s %s but no calls were made." % expected
+ "Expected %s %s but no calls were made." % expected
found = False
for entry in self.client.callstack:
break
assert found, 'Expected %s %s; got %s' % \
- (expected, self.client.callstack)
+ (expected, self.client.callstack)
if body is not None:
try:
assert entry[2] == body
'Parameters.member.2.ParameterKey': 'blarg',
'Parameters.member.2.ParameterValue': 'wibble'}
params = api_utils.extract_param_pairs(p, prefix='Parameters',
- keyname='ParameterKey',
- valuename='ParameterValue')
+ keyname='ParameterKey',
+ valuename='ParameterValue')
self.assertEqual(len(params), 2)
self.assertTrue('foo' in params)
self.assertEqual(params['foo'], 'bar')
'Parameters.member.2.1.ParameterKey': 'blarg',
'Parameters.member.2.1.ParameterValue': 'wibble'}
params = api_utils.extract_param_pairs(p, prefix='Parameters',
- keyname='ParameterKey',
- valuename='ParameterValue')
+ keyname='ParameterKey',
+ valuename='ParameterValue')
self.assertFalse(params)
def test_params_extract_garbage(self):
'Foo.1.ParameterKey': 'blarg',
'Foo.1.ParameterValue': 'wibble'}
params = api_utils.extract_param_pairs(p, prefix='Parameters',
- keyname='ParameterKey',
- valuename='ParameterValue')
+ keyname='ParameterKey',
+ valuename='ParameterValue')
self.assertEqual(len(params), 1)
self.assertTrue('foo' in params)
self.assertEqual(params['foo'], 'bar')
p = {'prefixParameters.member.Foo.Bar.ParameterKey': 'foo',
'Parameters.member.Foo.Bar.ParameterValue': 'bar'}
params = api_utils.extract_param_pairs(p, prefix='Parameters',
- keyname='ParameterKey',
- valuename='ParameterValue')
+ keyname='ParameterKey',
+ valuename='ParameterValue')
self.assertFalse(params)
def test_params_extract_garbage_suffix(self):
p = {'Parameters.member.1.ParameterKeysuffix': 'foo',
'Parameters.member.1.ParameterValue': 'bar'}
params = api_utils.extract_param_pairs(p, prefix='Parameters',
- keyname='ParameterKey',
- valuename='ParameterValue')
+ keyname='ParameterKey',
+ valuename='ParameterValue')
self.assertFalse(params)
def test_extract_param_list(self):
p = {'MetricData.member.1.MetricName': 'foo',
- 'MetricData.member.1.Unit': 'Bytes',
- 'MetricData.member.1.Value': 234333}
+ 'MetricData.member.1.Unit': 'Bytes',
+ 'MetricData.member.1.Value': 234333}
params = api_utils.extract_param_list(p, prefix='MetricData')
self.assertEqual(len(params), 1)
self.assertTrue('MetricName' in params[0])
def test_extract_param_list_garbage_prefix(self):
p = {'AMetricData.member.1.MetricName': 'foo',
- 'MetricData.member.1.Unit': 'Bytes',
- 'MetricData.member.1.Value': 234333}
+ 'MetricData.member.1.Unit': 'Bytes',
+ 'MetricData.member.1.Value': 234333}
params = api_utils.extract_param_list(p, prefix='MetricData')
self.assertEqual(len(params), 1)
self.assertTrue('MetricName' not in params[0])
def test_extract_param_list_garbage_prefix2(self):
p = {'AMetricData.member.1.MetricName': 'foo',
- 'BMetricData.member.1.Unit': 'Bytes',
- 'CMetricData.member.1.Value': 234333}
+ 'BMetricData.member.1.Unit': 'Bytes',
+ 'CMetricData.member.1.Value': 234333}
params = api_utils.extract_param_list(p, prefix='MetricData')
self.assertEqual(len(params), 0)
def test_extract_param_list_garbage_suffix(self):
p = {'MetricData.member.1.AMetricName': 'foo',
- 'MetricData.member.1.Unit': 'Bytes',
- 'MetricData.member.1.Value': 234333}
+ 'MetricData.member.1.Unit': 'Bytes',
+ 'MetricData.member.1.Value': 234333}
params = api_utils.extract_param_list(p, prefix='MetricData')
self.assertEqual(len(params), 1)
self.assertTrue('MetricName' not in params[0])
def test_extract_param_list_multiple(self):
p = {'MetricData.member.1.MetricName': 'foo',
- 'MetricData.member.1.Unit': 'Bytes',
- 'MetricData.member.1.Value': 234333,
- 'MetricData.member.2.MetricName': 'foo2',
- 'MetricData.member.2.Unit': 'Bytes',
- 'MetricData.member.2.Value': 12345}
+ 'MetricData.member.1.Unit': 'Bytes',
+ 'MetricData.member.1.Value': 234333,
+ 'MetricData.member.2.MetricName': 'foo2',
+ 'MetricData.member.2.Unit': 'Bytes',
+ 'MetricData.member.2.Value': 12345}
params = api_utils.extract_param_list(p, prefix='MetricData')
self.assertEqual(len(params), 2)
self.assertTrue('MetricName' in params[0])
def test_extract_param_list_multiple_missing(self):
# Handle case where there is an empty list item
p = {'MetricData.member.1.MetricName': 'foo',
- 'MetricData.member.1.Unit': 'Bytes',
- 'MetricData.member.1.Value': 234333,
- 'MetricData.member.3.MetricName': 'foo2',
- 'MetricData.member.3.Unit': 'Bytes',
- 'MetricData.member.3.Value': 12345}
+ 'MetricData.member.1.Unit': 'Bytes',
+ 'MetricData.member.1.Value': 234333,
+ 'MetricData.member.3.MetricName': 'foo2',
+ 'MetricData.member.3.Unit': 'Bytes',
+ 'MetricData.member.3.Value': 12345}
params = api_utils.extract_param_list(p, prefix='MetricData')
self.assertEqual(len(params), 2)
self.assertTrue('MetricName' in params[0])
def test_extract_param_list_badindex(self):
p = {'MetricData.member.xyz.MetricName': 'foo',
- 'MetricData.member.$!&^.Unit': 'Bytes',
- 'MetricData.member.+.Value': 234333,
- 'MetricData.member.--.MetricName': 'foo2',
- 'MetricData.member._3.Unit': 'Bytes',
- 'MetricData.member.-1000.Value': 12345}
+ 'MetricData.member.$!&^.Unit': 'Bytes',
+ 'MetricData.member.+.Value': 234333,
+ 'MetricData.member.--.MetricName': 'foo2',
+ 'MetricData.member._3.Unit': 'Bytes',
+ 'MetricData.member.-1000.Value': 12345}
params = api_utils.extract_param_list(p, prefix='MetricData')
self.assertEqual(len(params), 0)
# Stub out the RPC call to the engine with a pre-canned response
engine_resp = {u'stacks': [
- {u'stack_identity': {u'tenant': u't',
- u'stack_name': u'wordpress',
- u'stack_id': u'1',
- u'path': u''},
+ {u'stack_identity': {u'tenant': u't',
+ u'stack_name': u'wordpress',
+ u'stack_id': u'1',
+ u'path': u''},
u'updated_time': u'2012-07-09T09:13:11Z',
u'template_description': u'blah',
u'stack_status_reason': u'Stack successfully created',
u'stack_name': u'wordpress',
u'stack_status': u'CREATE_COMPLETE'}]}
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'list_stacks',
- 'args': {},
- 'version': self.api_version}, None
- ).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'list_stacks',
+ 'args': {},
+ 'version': self.api_version},
+ None).AndReturn(engine_resp)
self.m.ReplayAll()
# Call the list controller function and compare the response
result = self.controller.list(dummy_req)
expected = {'ListStacksResponse': {'ListStacksResult':
- {'StackSummaries': [
- {u'StackId': u'arn:openstack:heat::t:stacks/wordpress/1',
- u'LastUpdatedTime': u'2012-07-09T09:13:11Z',
- u'TemplateDescription': u'blah',
- u'StackStatusReason': u'Stack successfully created',
- u'CreationTime': u'2012-07-09T09:12:45Z',
- u'StackName': u'wordpress', u'StackStatus': u'CREATE_COMPLETE'}]}}}
+ {'StackSummaries':
+ [{u'StackId': u'arn:openstack:heat::t:stacks/wordpress/1',
+ u'LastUpdatedTime': u'2012-07-09T09:13:11Z',
+ u'TemplateDescription': u'blah',
+ u'StackStatusReason': u'Stack successfully created',
+ u'CreationTime': u'2012-07-09T09:12:45Z',
+ u'StackName': u'wordpress',
+ u'StackStatus': u'CREATE_COMPLETE'}]}}}
self.assertEqual(result, expected)
def test_list_rmt_aterr(self):
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'list_stacks',
- 'args': {},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'list_stacks',
+ 'args': {},
+ 'version': self.api_version},
+ None).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'list_stacks',
- 'args': {},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("Exception"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'list_stacks',
+ 'args': {},
+ 'version': self.api_version},
+ None).AndRaise(rpc_common.RemoteError("Exception"))
self.m.ReplayAll()
# Stub out the RPC call to the engine with a pre-canned response
# Note the engine returns a load of keys we don't actually use
# so this is a subset of the real response format
- engine_resp = {u'stacks': [
- {u'stack_identity': {u'tenant': u't',
- u'stack_name': u'wordpress',
- u'stack_id': u'6',
- u'path': u''},
- u'updated_time': u'2012-07-09T09:13:11Z',
- u'parameters':{
- u'DBUsername': u'admin',
- u'LinuxDistribution': u'F17',
- u'InstanceType': u'm1.large',
- u'DBRootPassword': u'admin',
- u'DBPassword': u'admin',
- u'DBName': u'wordpress'},
- u'outputs':
- [{u'output_key': u'WebsiteURL',
- u'description': u'URL for Wordpress wiki',
- u'output_value': u'http://10.0.0.8/wordpress'}],
- u'stack_status_reason': u'Stack successfully created',
- u'creation_time': u'2012-07-09T09:12:45Z',
- u'stack_name': u'wordpress',
- u'notification_topics': [],
- u'stack_status': u'CREATE_COMPLETE',
- u'description': u'blah',
- u'disable_rollback': True,
- u'timeout_mins':60,
- u'capabilities':[]}]}
+ engine_resp = {u'stacks':
+ [{u'stack_identity':
+ {u'tenant': u't',
+ u'stack_name': u'wordpress',
+ u'stack_id': u'6',
+ u'path': u''},
+ u'updated_time': u'2012-07-09T09:13:11Z',
+ u'parameters': {u'DBUsername': u'admin',
+ u'LinuxDistribution': u'F17',
+ u'InstanceType': u'm1.large',
+ u'DBRootPassword': u'admin',
+ u'DBPassword': u'admin',
+ u'DBName': u'wordpress'},
+ u'outputs':
+ [{u'output_key': u'WebsiteURL',
+ u'description': u'URL for Wordpress wiki',
+ u'output_value': u'http://10.0.0.8/wordpress'}],
+ u'stack_status_reason': u'Stack successfully created',
+ u'creation_time': u'2012-07-09T09:12:45Z',
+ u'stack_name': u'wordpress',
+ u'notification_topics': [],
+ u'stack_status': u'CREATE_COMPLETE',
+ u'description': u'blah',
+ u'disable_rollback': True,
+ u'timeout_mins':60,
+ u'capabilities':[]}]}
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
- rpc.call(dummy_req.context, self.topic, {'method': 'show_stack',
- 'args': {'stack_identity': identity},
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'show_stack',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
# Call the list controller function and compare the response
response = self.controller.describe(dummy_req)
- expected = {'DescribeStacksResponse': {'DescribeStacksResult':
- {'Stacks':
- [{'StackId': u'arn:openstack:heat::t:stacks/wordpress/6',
- 'StackStatusReason': u'Stack successfully created',
- 'Description': u'blah',
- 'Parameters':
- [{'ParameterValue': u'admin',
- 'ParameterKey': u'DBUsername'},
- {'ParameterValue': u'F17',
- 'ParameterKey': u'LinuxDistribution'},
- {'ParameterValue': u'm1.large',
- 'ParameterKey': u'InstanceType'},
- {'ParameterValue': u'admin',
- 'ParameterKey': u'DBRootPassword'},
- {'ParameterValue': u'admin',
- 'ParameterKey': u'DBPassword'},
- {'ParameterValue': u'wordpress',
- 'ParameterKey': u'DBName'}],
- 'Outputs':
- [{'OutputKey': u'WebsiteURL',
- 'OutputValue': u'http://10.0.0.8/wordpress',
- 'Description': u'URL for Wordpress wiki'}],
- 'TimeoutInMinutes': 60,
- 'CreationTime': u'2012-07-09T09:12:45Z',
- 'Capabilities': [],
- 'StackName': u'wordpress',
- 'NotificationARNs': [],
- 'StackStatus': u'CREATE_COMPLETE',
- 'DisableRollback': True,
- 'LastUpdatedTime': u'2012-07-09T09:13:11Z'}]}}}
+ expected = {'DescribeStacksResponse':
+ {'DescribeStacksResult':
+ {'Stacks':
+ [{'StackId': u'arn:openstack:heat::t:stacks/wordpress/6',
+ 'StackStatusReason': u'Stack successfully created',
+ 'Description': u'blah',
+ 'Parameters':
+ [{'ParameterValue': u'admin',
+ 'ParameterKey': u'DBUsername'},
+ {'ParameterValue': u'F17',
+ 'ParameterKey': u'LinuxDistribution'},
+ {'ParameterValue': u'm1.large',
+ 'ParameterKey': u'InstanceType'},
+ {'ParameterValue': u'admin',
+ 'ParameterKey': u'DBRootPassword'},
+ {'ParameterValue': u'admin',
+ 'ParameterKey': u'DBPassword'},
+ {'ParameterValue': u'wordpress',
+ 'ParameterKey': u'DBName'}],
+ 'Outputs':
+ [{'OutputKey': u'WebsiteURL',
+ 'OutputValue': u'http://10.0.0.8/wordpress',
+ 'Description': u'URL for Wordpress wiki'}],
+ 'TimeoutInMinutes': 60,
+ 'CreationTime': u'2012-07-09T09:12:45Z',
+ 'Capabilities': [],
+ 'StackName': u'wordpress',
+ 'NotificationARNs': [],
+ 'StackStatus': u'CREATE_COMPLETE',
+ 'DisableRollback': True,
+ 'LastUpdatedTime': u'2012-07-09T09:13:11Z'}]}}}
self.assertEqual(response, expected)
# Stub out the RPC call to the engine with a pre-canned response
# Note the engine returns a load of keys we don't actually use
# so this is a subset of the real response format
- engine_resp = {u'stacks': [
- {u'stack_identity': {u'tenant': u't',
- u'stack_name': u'wordpress',
- u'stack_id': u'6',
- u'path': u''},
- u'updated_time': u'2012-07-09T09:13:11Z',
- u'parameters':{
- u'DBUsername': u'admin',
- u'LinuxDistribution': u'F17',
- u'InstanceType': u'm1.large',
- u'DBRootPassword': u'admin',
- u'DBPassword': u'admin',
- u'DBName': u'wordpress'},
- u'outputs':
- [{u'output_key': u'WebsiteURL',
- u'description': u'URL for Wordpress wiki',
- u'output_value': u'http://10.0.0.8/wordpress'}],
- u'stack_status_reason': u'Stack successfully created',
- u'creation_time': u'2012-07-09T09:12:45Z',
- u'stack_name': u'wordpress',
- u'notification_topics': [],
- u'stack_status': u'CREATE_COMPLETE',
- u'description': u'blah',
- u'disable_rollback': True,
- u'timeout_mins':60,
- u'capabilities':[]}]}
+ engine_resp = {u'stacks':
+ [{u'stack_identity': {u'tenant': u't',
+ u'stack_name': u'wordpress',
+ u'stack_id': u'6',
+ u'path': u''},
+ u'updated_time': u'2012-07-09T09:13:11Z',
+ u'parameters': {u'DBUsername': u'admin',
+ u'LinuxDistribution': u'F17',
+ u'InstanceType': u'm1.large',
+ u'DBRootPassword': u'admin',
+ u'DBPassword': u'admin',
+ u'DBName': u'wordpress'},
+ u'outputs':
+ [{u'output_key': u'WebsiteURL',
+ u'description': u'URL for Wordpress wiki',
+ u'output_value': u'http://10.0.0.8/wordpress'}],
+ u'stack_status_reason': u'Stack successfully created',
+ u'creation_time': u'2012-07-09T09:12:45Z',
+ u'stack_name': u'wordpress',
+ u'notification_topics': [],
+ u'stack_status': u'CREATE_COMPLETE',
+ u'description': u'blah',
+ u'disable_rollback': True,
+ u'timeout_mins':60,
+ u'capabilities':[]}]}
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'show_stack',
- 'args': {'stack_identity': identity},
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'show_stack',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
# Call the list controller function and compare the response
response = self.controller.describe(dummy_req)
- expected = {'DescribeStacksResponse': {'DescribeStacksResult':
- {'Stacks':
- [{'StackId': u'arn:openstack:heat::t:stacks/wordpress/6',
- 'StackStatusReason': u'Stack successfully created',
- 'Description': u'blah',
- 'Parameters':
- [{'ParameterValue': u'admin',
- 'ParameterKey': u'DBUsername'},
- {'ParameterValue': u'F17',
- 'ParameterKey': u'LinuxDistribution'},
- {'ParameterValue': u'm1.large',
- 'ParameterKey': u'InstanceType'},
- {'ParameterValue': u'admin',
- 'ParameterKey': u'DBRootPassword'},
- {'ParameterValue': u'admin',
- 'ParameterKey': u'DBPassword'},
- {'ParameterValue': u'wordpress',
- 'ParameterKey': u'DBName'}],
- 'Outputs':
- [{'OutputKey': u'WebsiteURL',
- 'OutputValue': u'http://10.0.0.8/wordpress',
- 'Description': u'URL for Wordpress wiki'}],
- 'TimeoutInMinutes': 60,
- 'CreationTime': u'2012-07-09T09:12:45Z',
- 'Capabilities': [],
- 'StackName': u'wordpress',
- 'NotificationARNs': [],
- 'StackStatus': u'CREATE_COMPLETE',
- 'DisableRollback': True,
- 'LastUpdatedTime': u'2012-07-09T09:13:11Z'}]}}}
+ expected = {'DescribeStacksResponse':
+ {'DescribeStacksResult':
+ {'Stacks':
+ [{'StackId': u'arn:openstack:heat::t:stacks/wordpress/6',
+ 'StackStatusReason': u'Stack successfully created',
+ 'Description': u'blah',
+ 'Parameters':
+ [{'ParameterValue': u'admin',
+ 'ParameterKey': u'DBUsername'},
+ {'ParameterValue': u'F17',
+ 'ParameterKey': u'LinuxDistribution'},
+ {'ParameterValue': u'm1.large',
+ 'ParameterKey': u'InstanceType'},
+ {'ParameterValue': u'admin',
+ 'ParameterKey': u'DBRootPassword'},
+ {'ParameterValue': u'admin',
+ 'ParameterKey': u'DBPassword'},
+ {'ParameterValue': u'wordpress',
+ 'ParameterKey': u'DBName'}],
+ 'Outputs':
+ [{'OutputKey': u'WebsiteURL',
+ 'OutputValue': u'http://10.0.0.8/wordpress',
+ 'Description': u'URL for Wordpress wiki'}],
+ 'TimeoutInMinutes': 60,
+ 'CreationTime': u'2012-07-09T09:12:45Z',
+ 'Capabilities': [],
+ 'StackName': u'wordpress',
+ 'NotificationARNs': [],
+ 'StackStatus': u'CREATE_COMPLETE',
+ 'DisableRollback': True,
+ 'LastUpdatedTime': u'2012-07-09T09:13:11Z'}]}}}
self.assertEqual(response, expected)
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
- rpc.call(dummy_req.context, self.topic, {'method': 'show_stack',
- 'args': {'stack_identity': identity},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'show_stack',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
u'path': u''}
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'create_stack',
- 'args':
- {'stack_name': stack_name,
- 'template': template,
- 'params': engine_parms,
- 'args': engine_args},
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'create_stack',
+ 'args': {'stack_name': stack_name,
+ 'template': template,
+ 'params': engine_parms,
+ 'args': engine_args},
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'create_stack',
- 'args':
- {'stack_name': stack_name,
- 'template': template,
- 'params': engine_parms,
- 'args': engine_args},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'create_stack',
+ 'args': {'stack_name': stack_name,
+ 'template': template,
+ 'params': engine_parms,
+ 'args': engine_args},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
self.m.StubOutWithMock(rpc, 'call')
engine_err = {'Description': 'Something went wrong'}
- rpc.call(dummy_req.context, self.topic, {'method': 'create_stack',
- 'args':
- {'stack_name': stack_name,
- 'template': template,
- 'params': engine_parms,
- 'args': engine_args},
- 'version': self.api_version}, None
- ).AndReturn(engine_err)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'create_stack',
+ 'args': {'stack_name': stack_name,
+ 'template': template,
+ 'params': engine_parms,
+ 'args': engine_args},
+ 'version': self.api_version}, None).AndReturn(engine_err)
self.m.ReplayAll()
identity = dict(identifier.HeatIdentifier('t', stack_name, '1'))
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
-
- rpc.call(dummy_req.context, self.topic, {'method': 'update_stack',
- 'args':
- {'stack_identity': identity,
- 'template': template,
- 'params': engine_parms,
- 'args': engine_args},
- 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'update_stack',
+ 'args': {'stack_identity': identity,
+ 'template': template,
+ 'params': engine_parms,
+ 'args': engine_args},
+ 'version': self.api_version},
+ None).AndReturn(identity)
self.m.ReplayAll()
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
engine_resp = template
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
- rpc.call(dummy_req.context, self.topic, {'method': 'get_template',
- 'args':
- {'stack_identity': identity},
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'get_template',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
response = self.controller.get_template(dummy_req)
- expected = {'GetTemplateResponse': {'GetTemplateResult':
- {'TemplateBody': template}}}
+ expected = {'GetTemplateResponse':
+ {'GetTemplateResult':
+ {'TemplateBody': template}}}
self.assertEqual(response, expected)
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
- rpc.call(dummy_req.context, self.topic, {'method': 'get_template',
- 'args':
- {'stack_identity': identity},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'get_template',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
engine_resp = None
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
- rpc.call(dummy_req.context, self.topic, {'method': 'get_template',
- 'args':
- {'stack_identity': identity},
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'get_template',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
# Engine returns None when delete successful
- rpc.call(dummy_req.context, self.topic, {'method': 'delete_stack',
- 'args': {'stack_identity': identity},
- 'version': self.api_version}, None).AndReturn(None)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'delete_stack',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None).AndReturn(None)
self.m.ReplayAll()
# Stub out the RPC call to the engine with a pre-canned response
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
- rpc.call(dummy_req.context, self.topic, {'method': 'delete_stack',
- 'args': {'stack_identity': identity},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'delete_stack',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
dummy_req = self._dummy_GET_request(params)
# Stub out the RPC call to the engine with a pre-canned response
- engine_resp = {u'events': [{u'stack_name': u'wordpress',
- u'event_time': u'2012-07-23T13:05:39Z',
- u'stack_identity': {u'tenant': u't',
- u'stack_name': u'wordpress',
- u'stack_id': u'6',
- u'path': u''},
- u'logical_resource_id': u'WikiDatabase',
- u'resource_status_reason': u'state changed',
- u'event_identity': {
- u'tenant': u't',
- u'stack_name': u'wordpress',
- u'stack_id': u'6',
- u'path': u'/resources/WikiDatabase/events/42'
- },
- u'resource_status': u'IN_PROGRESS',
- u'physical_resource_id': None,
- u'resource_properties':
- {u'UserData': u'blah'},
- u'resource_type': u'AWS::EC2::Instance'}]}
+ engine_resp = {u'events':
+ [{u'stack_name': u'wordpress',
+ u'event_time': u'2012-07-23T13:05:39Z',
+ u'stack_identity': {u'tenant': u't',
+ u'stack_name': u'wordpress',
+ u'stack_id': u'6',
+ u'path': u''},
+ u'logical_resource_id': u'WikiDatabase',
+ u'resource_status_reason': u'state changed',
+ u'event_identity':
+ {u'tenant': u't',
+ u'stack_name': u'wordpress',
+ u'stack_id': u'6',
+ u'path': u'/resources/WikiDatabase/events/42'},
+ u'resource_status': u'IN_PROGRESS',
+ u'physical_resource_id': None,
+ u'resource_properties': {u'UserData': u'blah'},
+ u'resource_type': u'AWS::EC2::Instance'}]}
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
- rpc.call(dummy_req.context, self.topic, {'method': 'list_events',
- 'args':
- {'stack_identity': identity},
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'list_events',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
response = self.controller.events_list(dummy_req)
expected = {'DescribeStackEventsResponse':
- {'DescribeStackEventsResult':
- {'StackEvents':
- [{'EventId': u'42',
- 'StackId': u'arn:openstack:heat::t:stacks/wordpress/6',
- 'ResourceStatus': u'IN_PROGRESS',
- 'ResourceType': u'AWS::EC2::Instance',
- 'Timestamp': u'2012-07-23T13:05:39Z',
- 'StackName': u'wordpress',
- 'ResourceProperties': json.dumps({u'UserData': u'blah'}),
- 'PhysicalResourceId': None,
- 'ResourceStatusReason': u'state changed',
- 'LogicalResourceId': u'WikiDatabase'}]}}}
+ {'DescribeStackEventsResult':
+ {'StackEvents':
+ [{'EventId': u'42',
+ 'StackId': u'arn:openstack:heat::t:stacks/wordpress/6',
+ 'ResourceStatus': u'IN_PROGRESS',
+ 'ResourceType': u'AWS::EC2::Instance',
+ 'Timestamp': u'2012-07-23T13:05:39Z',
+ 'StackName': u'wordpress',
+ 'ResourceProperties':
+ json.dumps({u'UserData': u'blah'}),
+ 'PhysicalResourceId': None,
+ 'ResourceStatusReason': u'state changed',
+ 'LogicalResourceId': u'WikiDatabase'}]}}}
self.assertEqual(response, expected)
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
- rpc.call(dummy_req.context, self.topic, {'method': 'list_events',
- 'args':
- {'stack_identity': identity},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("Exception"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'list_events',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("Exception"))
self.m.ReplayAll()
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
u'path': u''},
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_type': u'AWS::EC2::Instance',
u'metadata': {u'wordpress': []}}
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
args = {
'stack_identity': identity,
'resource_name': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
- {'method': 'describe_stack_resource',
- 'args': args,
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ {'method': 'describe_stack_resource',
+ 'args': args,
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
'LastUpdatedTimestamp': u'2012-07-23T13:06:00Z',
'StackName': u'wordpress',
'PhysicalResourceId':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
'Metadata': {u'wordpress': []},
'LogicalResourceId': u'WikiDatabase'}}}}
u'path': u''},
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_type': u'AWS::EC2::Instance',
u'metadata': {u'ensureRunning': u'true''true'}}]
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
args = {
'stack_identity': identity,
'physical_resource_id': None,
'logical_resource_id': dummy_req.params.get('LogicalResourceId'),
}
rpc.call(dummy_req.context, self.topic,
- {'method': 'describe_stack_resources',
- 'args': args,
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ {'method': 'describe_stack_resources',
+ 'args': args,
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
expected = {'DescribeStackResourcesResponse':
{'DescribeStackResourcesResult':
{'StackResources':
- [{'StackId': u'arn:openstack:heat::t:stacks/wordpress/6',
- 'ResourceStatus': u'CREATE_COMPLETE',
- 'Description': u'',
- 'ResourceType': u'AWS::EC2::Instance',
- 'Timestamp': u'2012-07-23T13:06:00Z',
- 'ResourceStatusReason': None,
- 'StackName': u'wordpress',
- 'PhysicalResourceId':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
- 'LogicalResourceId': u'WikiDatabase'}]}}}
+ [{'StackId': u'arn:openstack:heat::t:stacks/wordpress/6',
+ 'ResourceStatus': u'CREATE_COMPLETE',
+ 'Description': u'',
+ 'ResourceType': u'AWS::EC2::Instance',
+ 'Timestamp': u'2012-07-23T13:06:00Z',
+ 'ResourceStatusReason': None,
+ 'StackName': u'wordpress',
+ 'PhysicalResourceId':
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ 'LogicalResourceId': u'WikiDatabase'}]}}}
self.assertEqual(response, expected)
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
dummy_req = self._dummy_GET_request(params)
ret = self.controller.describe_stack_resources(dummy_req)
self.assertEqual(type(ret),
- exception.HeatInvalidParameterCombinationError)
+ exception.HeatInvalidParameterCombinationError)
def test_list_stack_resources(self):
# Format a dummy request
dummy_req = self._dummy_GET_request(params)
# Stub out the RPC call to the engine with a pre-canned response
- engine_resp = [{u'resource_identity': {
- u'tenant': u't',
- u'stack_name': u'wordpress',
- u'stack_id': u'6',
- u'path': u'/resources/WikiDatabase'
- },
+ engine_resp = [{u'resource_identity':
+ {u'tenant': u't',
+ u'stack_name': u'wordpress',
+ u'stack_id': u'6',
+ u'path': u'/resources/WikiDatabase'},
u'stack_name': u'wordpress',
u'logical_resource_id': u'WikiDatabase',
u'resource_status_reason': None,
u'path': u''},
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_type': u'AWS::EC2::Instance'}]
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None).AndReturn(identity)
rpc.call(dummy_req.context, self.topic,
- {'method': 'list_stack_resources',
- 'args': {'stack_identity': identity},
- 'version': self.api_version}, None).AndReturn(engine_resp)
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None).AndReturn(identity)
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'list_stack_resources',
+ 'args': {'stack_identity': identity},
+ 'version': self.api_version}, None).AndReturn(engine_resp)
self.m.ReplayAll()
expected = {'ListStackResourcesResponse': {'ListStackResourcesResult':
{'StackResourceSummaries':
- [{'ResourceStatus': u'CREATE_COMPLETE',
- 'ResourceType': u'AWS::EC2::Instance',
- 'ResourceStatusReason': None,
- 'LastUpdatedTimestamp': u'2012-07-23T13:06:00Z',
- 'PhysicalResourceId':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
- 'LogicalResourceId': u'WikiDatabase'}]}}}
+ [{'ResourceStatus': u'CREATE_COMPLETE',
+ 'ResourceType': u'AWS::EC2::Instance',
+ 'ResourceStatusReason': None,
+ 'LastUpdatedTimestamp': u'2012-07-23T13:06:00Z',
+ 'PhysicalResourceId':
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ 'LogicalResourceId': u'WikiDatabase'}]}}}
self.assertEqual(response, expected)
# Insert an engine RPC error and ensure we map correctly to the
# heat exception type
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'method': 'identify_stack',
- 'args': {'stack_name': stack_name},
- 'version': self.api_version}, None
- ).AndRaise(rpc_common.RemoteError("AttributeError"))
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'identify_stack',
+ 'args': {'stack_name': stack_name},
+ 'version': self.api_version}, None
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
self.m.ReplayAll()
u'updated_time': u'2012-08-30T14:10:46Z'}]
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'args':
- {'watch_name': watch_name},
- 'method': 'show_watch',
- 'version': self.api_version},
- None).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'args': {'watch_name': watch_name},
+ 'method': 'show_watch',
+ 'version': self.api_version},
+ None).AndReturn(engine_resp)
self.m.ReplayAll()
response = self.controller.describe_alarms(dummy_req)
expected = {'DescribeAlarmsResponse': {'DescribeAlarmsResult':
- {'MetricAlarms': [
+ {'MetricAlarms': [
{'EvaluationPeriods': u'1',
- 'StateReasonData': None,
- 'AlarmArn': None,
- 'StateUpdatedTimestamp': u'2012-08-30T14:13:21Z',
- 'AlarmConfigurationUpdatedTimestamp':
- u'2012-08-30T14:10:46Z',
- 'AlarmActions': [u'WebServerRestartPolicy'],
- 'Threshold': u'2',
- 'AlarmDescription': u'Restart the WikiDatabase',
- 'Namespace': u'system/linux',
- 'Period': u'300',
- 'StateValue': u'NORMAL',
- 'ComparisonOperator': u'GreaterThanThreshold',
- 'AlarmName': u'HttpFailureAlarm',
- 'Unit': None,
- 'Statistic': u'SampleCount',
- 'StateReason': None,
- 'InsufficientDataActions': None,
- 'OKActions': None,
- 'MetricName': u'ServiceFailure',
- 'ActionsEnabled': None,
- 'Dimensions': [
- {'Name': 'StackId',
- 'Value': u'21617058-781e-4262-97ab-5f9df371ee52'}
- ]}]}}}
+ 'StateReasonData': None,
+ 'AlarmArn': None,
+ 'StateUpdatedTimestamp': u'2012-08-30T14:13:21Z',
+ 'AlarmConfigurationUpdatedTimestamp':
+ u'2012-08-30T14:10:46Z',
+ 'AlarmActions': [u'WebServerRestartPolicy'],
+ 'Threshold': u'2',
+ 'AlarmDescription': u'Restart the WikiDatabase',
+ 'Namespace': u'system/linux',
+ 'Period': u'300',
+ 'StateValue': u'NORMAL',
+ 'ComparisonOperator': u'GreaterThanThreshold',
+ 'AlarmName': u'HttpFailureAlarm',
+ 'Unit': None,
+ 'Statistic': u'SampleCount',
+ 'StateReason': None,
+ 'InsufficientDataActions': None,
+ 'OKActions': None,
+ 'MetricName': u'ServiceFailure',
+ 'ActionsEnabled': None,
+ 'Dimensions':
+ [{'Name': 'StackId',
+ 'Value': u'21617058-781e-4262-97ab-5f9df371ee52'}]
+ }]}}}
self.assert_(response == expected)
# Stub out the RPC call to the engine with a pre-canned response
# We dummy three different metrics and namespaces to test
# filtering by parameter
- engine_resp = [
- {u'timestamp': u'2012-08-30T15:09:02Z',
+ engine_resp = [{u'timestamp': u'2012-08-30T15:09:02Z',
u'watch_name': u'HttpFailureAlarm',
u'namespace': u'system/linux',
u'metric_name': u'ServiceFailure',
u'data': {u'Units': u'Counter', u'Value': 1}},
- {u'timestamp': u'2012-08-30T15:10:03Z',
+ {u'timestamp': u'2012-08-30T15:10:03Z',
u'watch_name': u'HttpFailureAlarm2',
u'namespace': u'system/linux2',
u'metric_name': u'ServiceFailure2',
u'data': {u'Units': u'Counter', u'Value': 1}},
- {u'timestamp': u'2012-08-30T15:16:03Z',
+ {u'timestamp': u'2012-08-30T15:16:03Z',
u'watch_name': u'HttpFailureAlar3m',
u'namespace': u'system/linux3',
u'metric_name': u'ServiceFailure3',
# Current engine implementation means we filter in the API
# and pass None/None for namespace/watch_name which returns
# all metric data which we post-process in the API
- rpc.call(dummy_req.context, self.topic, {'args':
- {'namespace': None,
- 'metric_name': None},
- 'method': 'show_watch_metric', 'version': self.api_version},
+ rpc.call(dummy_req.context, self.topic,
+ {'args': {'namespace': None, 'metric_name': None},
+ 'method': 'show_watch_metric',
+ 'version': self.api_version},
None).AndReturn(engine_resp)
self.m.ReplayAll()
# First pass no query paramters filtering, should get all three
response = self.controller.list_metrics(dummy_req)
- expected = {'ListMetricsResponse': {'ListMetricsResult': {'Metrics': [
- {'Namespace': u'system/linux',
- 'Dimensions': [
- {'Name': 'AlarmName', 'Value': u'HttpFailureAlarm'},
- {'Name': 'Timestamp',
- 'Value': u'2012-08-30T15:09:02Z'},
- {'Name': u'Units', 'Value': u'Counter'},
- {'Name': u'Value', 'Value': 1}],
- 'MetricName': u'ServiceFailure'},
-
- {'Namespace': u'system/linux2',
- 'Dimensions': [
- {'Name': 'AlarmName', 'Value': u'HttpFailureAlarm2'},
- {'Name': 'Timestamp',
- 'Value': u'2012-08-30T15:10:03Z'},
- {'Name': u'Units', 'Value': u'Counter'},
- {'Name': u'Value', 'Value': 1}],
- 'MetricName': u'ServiceFailure2'},
-
- {'Namespace': u'system/linux3',
- 'Dimensions': [
- {'Name': 'AlarmName', 'Value': u'HttpFailureAlar3m'},
- {'Name': 'Timestamp',
- 'Value': u'2012-08-30T15:16:03Z'},
- {'Name': u'Units', 'Value': u'Counter'},
- {'Name': u'Value', 'Value': 1}],
- 'MetricName': u'ServiceFailure3'}]}}}
+ expected = {'ListMetricsResponse':
+ {'ListMetricsResult':
+ {'Metrics': [{'Namespace': u'system/linux',
+ 'Dimensions':
+ [{'Name': 'AlarmName',
+ 'Value': u'HttpFailureAlarm'},
+ {'Name': 'Timestamp',
+ 'Value': u'2012-08-30T15:09:02Z'},
+ {'Name': u'Units',
+ 'Value': u'Counter'},
+ {'Name': u'Value',
+ 'Value': 1}],
+ 'MetricName': u'ServiceFailure'},
+ {'Namespace': u'system/linux2',
+ 'Dimensions':
+ [{'Name': 'AlarmName',
+ 'Value': u'HttpFailureAlarm2'},
+ {'Name': 'Timestamp',
+ 'Value': u'2012-08-30T15:10:03Z'},
+ {'Name': u'Units',
+ 'Value': u'Counter'},
+ {'Name': u'Value',
+ 'Value': 1}],
+ 'MetricName': u'ServiceFailure2'},
+ {'Namespace': u'system/linux3',
+ 'Dimensions':
+ [{'Name': 'AlarmName',
+ 'Value': u'HttpFailureAlar3m'},
+ {'Name': 'Timestamp',
+ 'Value': u'2012-08-30T15:16:03Z'},
+ {'Name': u'Units',
+ 'Value': u'Counter'},
+ {'Name': u'Value',
+ 'Value': 1}],
+ 'MetricName': u'ServiceFailure3'}]}}}
self.assert_(response == expected)
def test_list_metrics_filter_name(self):
# Stub out the RPC call to the engine with a pre-canned response
# We dummy three different metrics and namespaces to test
# filtering by parameter
- engine_resp = [
- {u'timestamp': u'2012-08-30T15:09:02Z',
+ engine_resp = [{u'timestamp': u'2012-08-30T15:09:02Z',
u'watch_name': u'HttpFailureAlarm',
u'namespace': u'system/linux',
u'metric_name': u'ServiceFailure',
u'data': {u'Units': u'Counter', u'Value': 1}},
- {u'timestamp': u'2012-08-30T15:10:03Z',
+ {u'timestamp': u'2012-08-30T15:10:03Z',
u'watch_name': u'HttpFailureAlarm2',
u'namespace': u'system/linux2',
u'metric_name': u'ServiceFailure2',
u'data': {u'Units': u'Counter', u'Value': 1}},
- {u'timestamp': u'2012-08-30T15:16:03Z',
+ {u'timestamp': u'2012-08-30T15:16:03Z',
u'watch_name': u'HttpFailureAlar3m',
u'namespace': u'system/linux3',
u'metric_name': u'ServiceFailure3',
# First pass no query paramters filtering, should get all three
response = self.controller.list_metrics(dummy_req)
- expected = {'ListMetricsResponse': {'ListMetricsResult': {'Metrics': [
- {'Namespace': u'system/linux',
- 'Dimensions': [
- {'Name': 'AlarmName', 'Value': u'HttpFailureAlarm'},
- {'Name': 'Timestamp',
- 'Value': u'2012-08-30T15:09:02Z'},
- {'Name': u'Units', 'Value': u'Counter'},
- {'Name': u'Value', 'Value': 1}],
- 'MetricName': u'ServiceFailure'},
- ]}}}
+ expected = {'ListMetricsResponse':
+ {'ListMetricsResult':
+ {'Metrics':
+ [{'Namespace': u'system/linux',
+ 'Dimensions':
+ [{'Name': 'AlarmName',
+ 'Value': u'HttpFailureAlarm'},
+ {'Name': 'Timestamp',
+ 'Value': u'2012-08-30T15:09:02Z'},
+ {'Name': u'Units',
+ 'Value': u'Counter'},
+ {'Name': u'Value',
+ 'Value': 1}],
+ 'MetricName': u'ServiceFailure'}]}}}
self.assert_(response == expected)
def test_list_metrics_filter_namespace(self):
# Stub out the RPC call to the engine with a pre-canned response
# We dummy three different metrics and namespaces to test
# filtering by parameter
- engine_resp = [
- {u'timestamp': u'2012-08-30T15:09:02Z',
+ engine_resp = [{u'timestamp': u'2012-08-30T15:09:02Z',
u'watch_name': u'HttpFailureAlarm',
u'namespace': u'atestnamespace/foo',
u'metric_name': u'ServiceFailure',
u'data': {u'Units': u'Counter', u'Value': 1}},
- {u'timestamp': u'2012-08-30T15:10:03Z',
+ {u'timestamp': u'2012-08-30T15:10:03Z',
u'watch_name': u'HttpFailureAlarm2',
u'namespace': u'atestnamespace/foo',
u'metric_name': u'ServiceFailure2',
u'data': {u'Units': u'Counter', u'Value': 1}},
- {u'timestamp': u'2012-08-30T15:16:03Z',
+ {u'timestamp': u'2012-08-30T15:16:03Z',
u'watch_name': u'HttpFailureAlar3m',
u'namespace': u'system/linux3',
u'metric_name': u'ServiceFailure3',
self.m.ReplayAll()
response = self.controller.list_metrics(dummy_req)
- expected = {'ListMetricsResponse': {'ListMetricsResult': {'Metrics': [
- {'Namespace': u'atestnamespace/foo',
- 'Dimensions': [
- {'Name': 'AlarmName', 'Value': u'HttpFailureAlarm'},
- {'Name': 'Timestamp', 'Value': u'2012-08-30T15:09:02Z'},
- {'Name': u'Units', 'Value': u'Counter'},
- {'Name': u'Value', 'Value': 1}],
- 'MetricName': u'ServiceFailure'},
-
- {'Namespace': u'atestnamespace/foo',
- 'Dimensions': [
- {'Name': 'AlarmName', 'Value': u'HttpFailureAlarm2'},
- {'Name': 'Timestamp', 'Value': u'2012-08-30T15:10:03Z'},
- {'Name': u'Units', 'Value': u'Counter'},
- {'Name': u'Value', 'Value': 1}],
- 'MetricName': u'ServiceFailure2'}]}}}
+ expected = {'ListMetricsResponse':
+ {'ListMetricsResult':
+ {'Metrics':
+ [{'Namespace': u'atestnamespace/foo',
+ 'Dimensions':
+ [{'Name': 'AlarmName',
+ 'Value': u'HttpFailureAlarm'},
+ {'Name': 'Timestamp',
+ 'Value': u'2012-08-30T15:09:02Z'},
+ {'Name': u'Units',
+ 'Value': u'Counter'},
+ {'Name': u'Value',
+ 'Value': 1}],
+ 'MetricName': u'ServiceFailure'},
+ {'Namespace': u'atestnamespace/foo',
+ 'Dimensions':
+ [{'Name': 'AlarmName',
+ 'Value': u'HttpFailureAlarm2'},
+ {'Name': 'Timestamp',
+ 'Value': u'2012-08-30T15:10:03Z'},
+ {'Name': u'Units',
+ 'Value': u'Counter'},
+ {'Name': u'Value',
+ 'Value': 1}],
+ 'MetricName': u'ServiceFailure2'}]}}}
self.assert_(response == expected)
def test_put_metric_alarm(self):
u'MetricData.member.1.Value': u'1',
u'MetricData.member.1.MetricName': u'ServiceFailure',
u'MetricData.member.1.Dimensions.member.1.Name':
- u'AlarmName',
+ u'AlarmName',
u'MetricData.member.1.Dimensions.member.1.Value':
- u'HttpFailureAlarm',
+ u'HttpFailureAlarm',
u'Action': u'PutMetricData'}
dummy_req = self._dummy_GET_request(params)
engine_resp = {}
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'args': {'stats_data':
- {'Namespace': u'system/linux',
- u'ServiceFailure':
- {'Value': u'1',
- 'Unit': u'Count',
- 'Dimensions': []}},
- 'watch_name': u'HttpFailureAlarm'},
+ rpc.call(dummy_req.context, self.topic,
+ {'args':
+ {'stats_data':
+ {'Namespace': u'system/linux',
+ u'ServiceFailure':
+ {'Value': u'1',
+ 'Unit': u'Count',
+ 'Dimensions': []}},
+ 'watch_name': u'HttpFailureAlarm'},
'method': 'create_watch_data',
'version': self.api_version},
None).AndReturn(engine_resp)
def test_set_alarm_state(self):
state_map = {'OK': engine_api.WATCH_STATE_OK,
- 'ALARM': engine_api.WATCH_STATE_ALARM,
- 'INSUFFICIENT_DATA': engine_api.WATCH_STATE_NODATA}
+ 'ALARM': engine_api.WATCH_STATE_ALARM,
+ 'INSUFFICIENT_DATA': engine_api.WATCH_STATE_NODATA}
for state in state_map.keys():
params = {u'StateValue': state,
engine_resp = {}
self.m.StubOutWithMock(rpc, 'call')
- rpc.call(dummy_req.context, self.topic, {'args':
- {'state': state_map[state],
- 'watch_name': u'HttpFailureAlarm'},
- 'method': 'set_watch_state',
- 'version': self.api_version},
- None).AndReturn(engine_resp)
+ rpc.call(dummy_req.context, self.topic,
+ {'args':
+ {'state': state_map[state],
+ 'watch_name': u'HttpFailureAlarm'},
+ 'method': 'set_watch_state',
+ 'version': self.api_version},
+ None).AndReturn(engine_resp)
self.m.ReplayAll()
blarg: wibble
'''
parsed = {u'HeatTemplateFormatVersion': u'2012-12-12',
- u'Mappings': {},
- u'Outputs': {},
- u'Parameters': {},
- u'Resources': {},
- u'blarg': u'wibble',
- u'foo': u'bar'}
+ u'Mappings': {},
+ u'Outputs': {},
+ u'Parameters': {},
+ u'Resources': {},
+ u'blarg': u'wibble',
+ u'foo': u'bar'}
body = {'template': template}
data = stacks.InstantiationData(body)
stack_name=identity.stack_name,
path='resources')
except webob.exc.HTTPFound as found:
- self.assertEqual(found.location, self._url(identity) +
- '/resources')
+ self.assertEqual(found.location,
+ self._url(identity) + '/resources')
else:
self.fail('No redirect generated')
self.m.VerifyAll()
u'stack_identity': stack_identity,
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_type': u'AWS::EC2::Instance',
}
]
stack_id=stack_identity.stack_id)
expected = {
- 'resources': [
- {
- 'links': [
- {'href': self._url(res_identity), 'rel': 'self'},
- {'href': self._url(stack_identity), 'rel': 'stack'},
- ],
- u'logical_resource_id': res_name,
- u'resource_status_reason': None,
- u'updated_time': u'2012-07-23T13:06:00Z',
- u'resource_status': u'CREATE_COMPLETE',
- u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
- u'resource_type': u'AWS::EC2::Instance',
- }
- ]
- }
+ 'resources': [{'links': [{'href': self._url(res_identity),
+ 'rel': 'self'},
+ {'href': self._url(stack_identity),
+ 'rel': 'stack'}],
+ u'logical_resource_id': res_name,
+ u'resource_status_reason': None,
+ u'updated_time': u'2012-07-23T13:06:00Z',
+ u'resource_status': u'CREATE_COMPLETE',
+ u'physical_resource_id':
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'resource_type': u'AWS::EC2::Instance'}]}
self.assertEqual(result, expected)
self.m.VerifyAll()
u'stack_identity': dict(stack_identity),
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_type': u'AWS::EC2::Instance',
u'metadata': {u'ensureRunning': u'true'}
}
u'updated_time': u'2012-07-23T13:06:00Z',
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_type': u'AWS::EC2::Instance',
}
}
u'stack_identity': dict(stack_identity),
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_type': u'AWS::EC2::Instance',
u'metadata': {u'ensureRunning': u'true'}
}
u'event_identity': dict(ev_identity),
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_properties': {u'UserData': u'blah'},
u'resource_type': u'AWS::EC2::Instance',
}
u'event_time': u'2012-07-23T13:06:00Z',
u'resource_status': u'CREATE_COMPLETE',
u'physical_resource_id':
- u'a3455d8c-9f88-404d-a85b-5315293e67de',
+ u'a3455d8c-9f88-404d-a85b-5315293e67de',
u'resource_type': u'AWS::EC2::Instance',
u'resource_properties': {u'UserData': u'blah'},
}
res_identity = identifier.ResourceIdentifier(resource_name=res_name,
**stack_identity)
ev_identity = identifier.EventIdentifier(event_id='41',
- **res_identity)
+ **res_identity)
req = self._get(stack_identity._tenant_path() +
'/resources/' + res_name + '/events/' + event_id)
res_identity = identifier.ResourceIdentifier(resource_name=res_name,
**stack_identity)
ev_identity = identifier.EventIdentifier(event_id='41',
- **res_identity)
+ **res_identity)
req = self._get(stack_identity._tenant_path() +
'/resources/' + res_name + '/events/' + event_id)
def create_scaling_group(self, t, stack, resource_name):
resource = asc.AutoScalingGroup(resource_name,
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.validate())
self.assertEqual(None, resource.create())
self.assertEqual(asc.AutoScalingGroup.CREATE_COMPLETE, resource.state)
def create_scaling_policy(self, t, stack, resource_name):
resource = asc.ScalingPolicy(resource_name,
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.validate())
self.assertEqual(None, resource.create())
self.assertEqual(asc.ScalingPolicy.CREATE_COMPLETE,
self.assertEqual('WebServerGroup', resource.FnGetRefId())
self.assertEqual('WebServerGroup-0', resource.resource_id)
self.assertEqual(asc.AutoScalingGroup.UPDATE_REPLACE,
- resource.handle_update())
+ resource.handle_update())
resource.delete()
# reduce by 50%
resource.adjust(-50, 'PercentChangeInCapacity')
self.assertEqual('WebServerGroup-0',
- resource.resource_id)
+ resource.resource_id)
# raise by 200%
resource.adjust(200, 'PercentChangeInCapacity')
self.assertEqual('WebServerGroup-0,WebServerGroup-1,WebServerGroup-2',
- resource.resource_id)
+ resource.resource_id)
resource.delete()
'WebServerScaleUpPolicy')
up_policy.alarm()
self.assertEqual('WebServerGroup-0,WebServerGroup-1',
- resource.resource_id)
+ resource.resource_id)
down_policy = self.create_scaling_policy(t, stack,
'WebServerScaleDownPolicy')
down_policy.alarm()
- self.assertEqual('WebServerGroup-0',
- resource.resource_id)
+ self.assertEqual('WebServerGroup-0', resource.resource_id)
resource.delete()
fullpath = basepath + '/bin/' + bin
proc = subprocess.Popen(fullpath,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
if proc.returncode:
def create_dbinstance(self, t, stack, resource_name):
resource = dbi.DBInstance(resource_name,
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.validate())
self.assertEqual(None, resource.create())
self.assertEqual(dbi.DBInstance.CREATE_COMPLETE, resource.state)
def create_eip(self, t, stack, resource_name):
resource = eip.ElasticIp(resource_name,
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.validate())
self.assertEqual(None, resource.create())
self.assertEqual(eip.ElasticIp.CREATE_COMPLETE, resource.state)
def create_association(self, t, stack, resource_name):
resource = eip.ElasticIpAssociation(resource_name,
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.validate())
self.assertEqual(None, resource.create())
self.assertEqual(eip.ElasticIpAssociation.CREATE_COMPLETE,
self.assertEqual('1', resource.FnGetAtt('AllocationId'))
self.assertEqual(eip.ElasticIp.UPDATE_REPLACE,
- resource.handle_update())
+ resource.handle_update())
try:
resource.FnGetAtt('Foo')
stack_name = 'service_create/test_stack'
stack = get_wordpress_stack('test_stack', self.ctx)
- self.assertRaises(ValueError, self.man.create_stack,
- self.ctx, stack_name,
- stack.t, {}, {})
+ self.assertRaises(ValueError,
+ self.man.create_stack,
+ self.ctx, stack_name, stack.t, {}, {})
def test_stack_create_invalid_resource_name(self):
stack_name = 'service_create_test_stack_invalid_res'
tmpl['Resources']['Web/Server'] = tmpl['Resources']['WebServer']
del tmpl['Resources']['WebServer']
- self.assertRaises(ValueError, self.man.create_stack,
- self.ctx, stack_name,
- stack.t, {}, {})
+ self.assertRaises(ValueError,
+ self.man.create_stack,
+ self.ctx, stack_name,
+ stack.t, {}, {})
def test_stack_delete(self):
stack_name = 'service_delete_test_stack'
def test_stack_by_name_tenants(self):
self.assertEqual(self.stack.id,
- db_api.stack_get_by_name(self.ctx, self.stack_name).id)
+ db_api.stack_get_by_name(self.ctx,
+ self.stack_name).id)
ctx2 = create_context(self.m, self.username,
- 'stack_service_test_tenant2')
+ 'stack_service_test_tenant2')
self.assertEqual(None, db_api.stack_get_by_name(ctx2, self.stack_name))
def test_stack_event_list(self):
self.assertTrue('resource_status' in ev)
self.assertTrue(ev['resource_status'] in ('IN_PROGRESS',
- 'CREATE_COMPLETE'))
+ 'CREATE_COMPLETE'))
self.assertTrue('resource_status_reason' in ev)
self.assertEqual(ev['resource_status_reason'], 'state changed')
def test_stack_resources_describe_no_filter(self):
resources = self.man.describe_stack_resources(self.ctx,
- self.stack_identity,
- None, None)
+ self.stack_identity,
+ None, None)
self.assertEqual(len(resources), 1)
r = resources[0]
values = {'stack_id': self.stack.id,
'state': 'NORMAL',
'name': u'HttpFailureAlarm',
- 'rule': {
- u'EvaluationPeriods': u'1',
- u'AlarmActions': [u'WebServerRestartPolicy'],
- u'AlarmDescription': u'Restart the WikiDatabase',
- u'Namespace': u'system/linux',
- u'Period': u'300',
- u'ComparisonOperator': u'GreaterThanThreshold',
- u'Statistic': u'SampleCount',
- u'Threshold': u'2',
- u'MetricName': u'ServiceFailure'}}
+ 'rule': {u'EvaluationPeriods': u'1',
+ u'AlarmActions': [u'WebServerRestartPolicy'],
+ u'AlarmDescription': u'Restart the WikiDatabase',
+ u'Namespace': u'system/linux',
+ u'Period': u'300',
+ u'ComparisonOperator': u'GreaterThanThreshold',
+ u'Statistic': u'SampleCount',
+ u'Threshold': u'2',
+ u'MetricName': u'ServiceFailure'}}
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
values['name'] = "AnotherWatch"
values = {'stack_id': self.stack.id,
'state': 'NORMAL',
'name': u'HttpFailureAlarm',
- 'rule': {
- u'EvaluationPeriods': u'1',
- u'AlarmActions': [u'WebServerRestartPolicy'],
- u'AlarmDescription': u'Restart the WikiDatabase',
- u'Namespace': u'system/linux',
- u'Period': u'300',
- u'ComparisonOperator': u'GreaterThanThreshold',
- u'Statistic': u'SampleCount',
- u'Threshold': u'2',
- u'MetricName': u'ServiceFailure'}}
+ 'rule': {u'EvaluationPeriods': u'1',
+ u'AlarmActions': [u'WebServerRestartPolicy'],
+ u'AlarmDescription': u'Restart the WikiDatabase',
+ u'Namespace': u'system/linux',
+ u'Period': u'300',
+ u'ComparisonOperator': u'GreaterThanThreshold',
+ u'Statistic': u'SampleCount',
+ u'Threshold': u'2',
+ u'MetricName': u'ServiceFailure'}}
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
watch = db_api.watch_rule_get_by_name(self.ctx, "HttpFailureAlarm")
self.assertNotEqual(watch, None)
values = {'watch_rule_id': watch.id,
- 'data': {
- u'Namespace': u'system/linux',
- u'ServiceFailure': {
- u'Units': u'Counter', u'Value': 1}}}
+ 'data': {u'Namespace': u'system/linux',
+ u'ServiceFailure': {
+ u'Units': u'Counter', u'Value': 1}}}
watch = db_api.watch_data_create(self.ctx, values)
# Check there is one result returned
values = {'stack_id': self.stack.id,
'state': 'NORMAL',
'name': u'OverrideAlarm',
- 'rule': {
- u'EvaluationPeriods': u'1',
- u'AlarmActions': [u'WebServerRestartPolicy'],
- u'AlarmDescription': u'Restart the WikiDatabase',
- u'Namespace': u'system/linux',
- u'Period': u'300',
- u'ComparisonOperator': u'GreaterThanThreshold',
- u'Statistic': u'SampleCount',
- u'Threshold': u'2',
- u'MetricName': u'ServiceFailure'}}
+ 'rule': {u'EvaluationPeriods': u'1',
+ u'AlarmActions': [u'WebServerRestartPolicy'],
+ u'AlarmDescription': u'Restart the WikiDatabase',
+ u'Namespace': u'system/linux',
+ u'Period': u'300',
+ u'ComparisonOperator': u'GreaterThanThreshold',
+ u'Statistic': u'SampleCount',
+ u'Threshold': u'2',
+ u'MetricName': u'ServiceFailure'}}
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
values = {'stack_id': self.stack.id,
'state': 'NORMAL',
'name': u'OverrideAlarm2',
- 'rule': {
- u'EvaluationPeriods': u'1',
- u'AlarmActions': [u'WebServerRestartPolicy'],
- u'AlarmDescription': u'Restart the WikiDatabase',
- u'Namespace': u'system/linux',
- u'Period': u'300',
- u'ComparisonOperator': u'GreaterThanThreshold',
- u'Statistic': u'SampleCount',
- u'Threshold': u'2',
- u'MetricName': u'ServiceFailure'}}
+ 'rule': {u'EvaluationPeriods': u'1',
+ u'AlarmActions': [u'WebServerRestartPolicy'],
+ u'AlarmDescription': u'Restart the WikiDatabase',
+ u'Namespace': u'system/linux',
+ u'Period': u'300',
+ u'ComparisonOperator': u'GreaterThanThreshold',
+ u'Statistic': u'SampleCount',
+ u'Threshold': u'2',
+ u'MetricName': u'ServiceFailure'}}
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
def test_arn_url_parse_qs(self):
url = self.url_prefix +\
- 'arn%3Aopenstack%3Aheat%3A%3At%3Astacks/s/i/p?foo=bar'
+ 'arn%3Aopenstack%3Aheat%3A%3At%3Astacks/s/i/p?foo=bar'
hi = identifier.HeatIdentifier.from_arn_url(url)
self.assertEqual(hi.tenant, 't')
self.assertEqual(hi.stack_name, 's')
# need to resolve the template functions
server_userdata = instance._build_userdata(
- instance.t['Properties']['UserData'])
+ instance.t['Properties']['UserData'])
self.m.StubOutWithMock(self.fc.servers, 'create')
- self.fc.servers.create(image=1, flavor=1, key_name='test',
- name='%s.%s' % (stack_name, instance.name),
- security_groups=None,
- userdata=server_userdata, scheduler_hints=None,
- meta=None).AndReturn(self.fc.servers.list()[1])
+ self.fc.servers.create(
+ image=1, flavor=1, key_name='test',
+ name='%s.%s' % (stack_name, instance.name),
+ security_groups=None,
+ userdata=server_userdata, scheduler_hints=None,
+ meta=None).AndReturn(self.fc.servers.list()[1])
self.m.ReplayAll()
self.assertEqual(instance.create(), None)
# need to resolve the template functions
server_userdata = instance._build_userdata(
- instance.t['Properties']['UserData'])
+ instance.t['Properties']['UserData'])
self.m.StubOutWithMock(self.fc.servers, 'create')
- self.fc.servers.create(image=1, flavor=1, key_name='test',
- name='%s.%s' % (stack_name, instance.name),
- security_groups=None,
- userdata=server_userdata, scheduler_hints=None,
- meta=None).AndReturn(self.fc.servers.list()[1])
+ self.fc.servers.create(
+ image=1, flavor=1, key_name='test',
+ name='%s.%s' % (stack_name, instance.name),
+ security_groups=None,
+ userdata=server_userdata, scheduler_hints=None,
+ meta=None).AndReturn(self.fc.servers.list()[1])
self.m.ReplayAll()
self.assertEqual(instance.create(), None)
def create_loadbalancer(self, t, stack, resource_name):
resource = lb.LoadBalancer(resource_name,
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.validate())
self.assertEqual(None, resource.create())
self.assertEqual(lb.LoadBalancer.CREATE_COMPLETE, resource.state)
def test_loadbalancer(self):
lb.LoadBalancer.nova().AndReturn(self.fc)
-# parser.Stack.store(mox.IgnoreArg()).AndReturn('5678')
instance.Instance.nova().MultipleTimes().AndReturn(self.fc)
- self.fc.servers.create(flavor=2, image=745, key_name='test',
- meta=None, name=u'test_stack.LoadBalancer.LB_instance',
- scheduler_hints=None, userdata=mox.IgnoreArg(),
- security_groups=None).AndReturn(self.fc.servers.list()[1])
+ self.fc.servers.create(
+ flavor=2, image=745, key_name='test',
+ meta=None, name=u'test_stack.LoadBalancer.LB_instance',
+ scheduler_hints=None, userdata=mox.IgnoreArg(),
+ security_groups=None).AndReturn(self.fc.servers.list()[1])
#stack.Stack.create_with_template(mox.IgnoreArg()).AndReturn(None)
Metadata.__set__(mox.IgnoreArg(),
mox.IgnoreArg()).AndReturn(None)
self.assertEqual(None, resource.validate())
hc['Timeout'] = 35
- self.assertEqual({'Error':
- 'Interval must be larger than Timeout'},
- resource.validate())
+ self.assertEqual(
+ {'Error': 'Interval must be larger than Timeout'},
+ resource.validate())
hc['Timeout'] = 5
self.assertEqual('LoadBalancer', resource.FnGetRefId())
def test_schema_invariance(self):
params1 = parameters.Parameters('test', params_schema,
- {'Defaulted': 'wibble'})
+ {'Defaulted': 'wibble'})
self.assertEqual(params1['Defaulted'], 'wibble')
params2 = parameters.Parameters('test', params_schema)
self.assertTrue(parsed is not raw)
def test_join_recursive(self):
- raw = {'Fn::Join': ['\n', [{'Fn::Join': [' ', ['foo', 'bar']]},
- 'baz']]}
+ raw = {'Fn::Join': ['\n', [{'Fn::Join':
+ [' ', ['foo', 'bar']]}, 'baz']]}
self.assertEqual(join(raw), 'foo bar\nbaz')
def test_join_reduce(self):
join = {"Fn::Join": [" ", ["foo", "bar", "baz", {'Ref': 'baz'},
- "bink", "bonk"]]}
- self.assertEqual(parser.Template.reduce_joins(join),
- {"Fn::Join": [" ", ["foo bar baz", {'Ref': 'baz'},
- "bink bonk"]]})
+ "bink", "bonk"]]}
+ self.assertEqual(
+ parser.Template.reduce_joins(join),
+ {"Fn::Join": [" ", ["foo bar baz", {'Ref': 'baz'}, "bink bonk"]]})
join = {"Fn::Join": [" ", ["foo", {'Ref': 'baz'},
- "bink"]]}
- self.assertEqual(parser.Template.reduce_joins(join),
- {"Fn::Join": [" ", ["foo", {'Ref': 'baz'},
- "bink"]]})
+ "bink"]]}
+ self.assertEqual(
+ parser.Template.reduce_joins(join),
+ {"Fn::Join": [" ", ["foo", {'Ref': 'baz'}, "bink"]]})
join = {"Fn::Join": [" ", [{'Ref': 'baz'}]]}
- self.assertEqual(parser.Template.reduce_joins(join),
+ self.assertEqual(
+ parser.Template.reduce_joins(join),
{"Fn::Join": [" ", [{'Ref': 'baz'}]]})
def test_join(self):
map_schema = {'valid': {'Type': 'Boolean'}}
list_schema = {'Type': 'Map', 'Schema': map_schema}
p = properties.Property({'Type': 'List', 'Schema': list_schema})
- self.assertEqual(p.validate_data([{'valid': 'TRUE'},
- {'valid': 'False'}]),
- [{'valid': True},
- {'valid': False}])
+ self.assertEqual(p.validate_data(
+ [{'valid': 'TRUE'}, {'valid': 'False'}]),
+ [{'valid': True}, {'valid': False}])
def test_list_schema_bad_data(self):
map_schema = {'valid': {'Type': 'Boolean'}}
return stack
def create_net(self, t, stack, resource_name):
- resource = net.Net('test_net',
- t['Resources'][resource_name],
- stack)
+ resource = net.Net('test_net', t['Resources'][resource_name], stack)
self.assertEqual(None, resource.create())
self.assertEqual(net.Net.CREATE_COMPLETE, resource.state)
return resource
def test_validate_properties(self):
vs = {'router:external': True}
- data = {
- 'admin_state_up': False,
- 'value_specs': vs
- }
+ data = {'admin_state_up': False,
+ 'value_specs': vs}
p = properties.Properties(net.Net.properties_schema, data)
self.assertEqual(None, qr.validate_properties(p))
vs['shared'] = True
self.assertEqual('shared not allowed in value_specs',
- qr.validate_properties(p))
+ qr.validate_properties(p))
vs.pop('shared')
vs['name'] = 'foo'
self.assertEqual('name not allowed in value_specs',
- qr.validate_properties(p))
+ qr.validate_properties(p))
vs.pop('name')
vs['tenant_id'] = '1234'
self.assertEqual('tenant_id not allowed in value_specs',
- qr.validate_properties(p))
+ qr.validate_properties(p))
vs.pop('tenant_id')
vs['foo'] = '1234'
self.assertEqual(None, qr.validate_properties(p))
def test_prepare_properties(self):
- data = {
- 'admin_state_up': False,
- 'value_specs': {'router:external': True}
- }
+ data = {'admin_state_up': False,
+ 'value_specs': {'router:external': True}}
p = properties.Properties(net.Net.properties_schema, data)
props = qr.prepare_properties(p, 'resource_name')
- self.assertEqual({
- 'name': 'resource_name',
- 'router:external': True,
- 'admin_state_up': False
- }, props)
+ self.assertEqual({'name': 'resource_name',
+ 'router:external': True,
+ 'admin_state_up': False}, props)
def test_net(self):
fq = FakeQuantum()
pass
self.assertEqual('fc68ea2c-b60b-4b4f-bd82-94ec81110766',
- resource.FnGetAtt('id'))
+ resource.FnGetAtt('id'))
self.assertEqual(net.Net.UPDATE_REPLACE, resource.handle_update())
tmpl2 = {'Type': 'Foo'}
tmpl3 = {'Type': 'Bar'}
stack2 = parser.Stack(None, 'test_stack', parser.Template({}),
- stack_id=-1)
+ stack_id=-1)
res1 = resource.GenericResource('test_resource', tmpl1, self.stack)
res2 = resource.GenericResource('test_resource', tmpl2, stack2)
res3 = resource.GenericResource('test_resource2', tmpl3, stack2)
self.stack = parser.Stack(ctx, 'test_stack', parser.Template({}))
self.stack.store()
self.res = resource.GenericResource('metadata_resource',
- tmpl, self.stack)
+ tmpl, self.stack)
self.res.create()
def tearDown(self):
def create_resource(self, t, stack, resource_name):
resource = s3.S3Bucket('test_resource',
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.create())
self.assertEqual(s3.S3Bucket.CREATE_COMPLETE, resource.state)
return resource
def test_create_container_name(self):
self.m.UnsetStubs()
self.assertTrue(re.match(self.container_pattern,
- s3.S3Bucket._create_container_name('test_stack.test_resource')))
+ s3.S3Bucket._create_container_name(
+ 'test_stack.test_resource')))
@skip_if(skip_test, 'unable to import swiftclient')
def test_attributes(self):
swiftclient.Connection.put_container(
mox.Regex(self.container_pattern),
{'X-Container-Write': 'test_tenant:test_username',
- 'X-Container-Read': 'test_tenant:test_username'}).AndReturn(None)
+ 'X-Container-Read': 'test_tenant:test_username'}
+ ).AndReturn(None)
swiftclient.Connection.get_auth().MultipleTimes().AndReturn(
- ('http://localhost:8080/v_2', None))
+ ('http://localhost:8080/v_2', None))
swiftclient.Connection.delete_container(
mox.Regex(self.container_pattern)).AndReturn(None)
swiftclient.Connection.put_container(
mox.Regex(self.container_pattern),
{'X-Container-Write': 'test_tenant:test_username',
- 'X-Container-Read': '.r:*'}).AndReturn(None)
+ 'X-Container-Read': '.r:*'}).AndReturn(None)
swiftclient.Connection.delete_container(
mox.Regex(self.container_pattern)).AndReturn(None)
swiftclient.Connection.put_container(
mox.Regex(self.container_pattern),
{'X-Container-Write': '.r:*',
- 'X-Container-Read': '.r:*'}).AndReturn(None)
+ 'X-Container-Read': '.r:*'}).AndReturn(None)
swiftclient.Connection.delete_container(
mox.Regex(self.container_pattern)).AndReturn(None)
swiftclient.Connection.put_container(
mox.Regex(self.container_pattern),
{'X-Container-Write': 'test_tenant:test_username',
- 'X-Container-Read': 'test_tenant'}).AndReturn(None)
+ 'X-Container-Read': 'test_tenant'}).AndReturn(None)
swiftclient.Connection.delete_container(
mox.Regex(self.container_pattern)).AndReturn(None)
swiftclient.Connection.put_container(
mox.Regex(self.container_pattern),
{'X-Container-Write': 'test_tenant:test_username',
- 'X-Container-Read': 'test_tenant:test_username'}).AndReturn(None)
+ 'X-Container-Read': 'test_tenant:test_username'}).AndReturn(None)
swiftclient.Connection.delete_container(
mox.Regex(self.container_pattern)).AndRaise(
- swiftclient.ClientException('Test delete failure'))
+ swiftclient.ClientException('Test delete failure'))
self.m.ReplayAll()
t = self.load_template()
swiftclient.Connection.put_container(
mox.Regex(self.container_pattern),
{'X-Container-Write': 'test_tenant:test_username',
- 'X-Container-Read': 'test_tenant:test_username'}).AndReturn(None)
+ 'X-Container-Read': 'test_tenant:test_username'}).AndReturn(None)
# This should not be called
swiftclient.Connection.delete_container(
mox.Regex(self.container_pattern)).AndReturn(None)
template_test_count = 0
for (json_str,
- yml_str,
- file_name) in self.convert_all_json_to_yaml(path):
+ yml_str,
+ file_name) in self.convert_all_json_to_yaml(path):
self.compare_json_vs_yaml(json_str, yml_str, file_name)
template_test_count += 1
break
self.assertTrue(template_test_count >= self.expected_test_count,
- 'Expected at least %d templates to be tested' %
- self.expected_test_count)
+ 'Expected at least %d templates to be tested' %
+ self.expected_test_count)
def compare_json_vs_yaml(self, json_str, yml_str, file_name):
yml = template_format.parse(yml_str)
self.assertEqual(u'2012-12-12', yml[u'HeatTemplateFormatVersion'],
- file_name)
+ file_name)
self.assertFalse(u'AWSTemplateFormatVersion' in yml, file_name)
del(yml[u'HeatTemplateFormatVersion'])
jsn = template_format.parse(json_str)
template_format.default_for_missing(jsn, 'AWSTemplateFormatVersion',
- template_format.CFN_VERSIONS)
+ template_format.CFN_VERSIONS)
if u'AWSTemplateFormatVersion' in jsn:
del(jsn[u'AWSTemplateFormatVersion'])
def compare_stacks(self, json_file, yaml_file, parameters):
t1 = self.load_template(json_file)
template_format.default_for_missing(t1, 'AWSTemplateFormatVersion',
- template_format.CFN_VERSIONS)
+ template_format.CFN_VERSIONS)
del(t1[u'AWSTemplateFormatVersion'])
t2 = self.load_template(yaml_file)
self.assertEqual(t1nr, t2nr)
self.assertEquals(set(stack1.resources.keys()),
- set(stack2.resources.keys()))
+ set(stack2.resources.keys()))
for key in stack1.resources:
self.assertEqual(stack1.resources[key].t, stack2.resources[key].t)
def test_wordpress_resolved(self):
self.compare_stacks('WordPress_Single_Instance.template',
- 'WordPress_Single_Instance.yaml',
- {'KeyName': 'test'})
+ 'WordPress_Single_Instance.yaml',
+ {'KeyName': 'test'})
def create_user(self, t, stack, resource_name):
resource = user.User(resource_name,
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.validate())
self.assertEqual(None, resource.create())
self.assertEqual(user.User.CREATE_COMPLETE, resource.state)
self.assertEqual('CREATE_COMPLETE', resource.state)
self.assertEqual(user.User.UPDATE_REPLACE,
- resource.handle_update())
+ resource.handle_update())
resource.resource_id = None
self.assertEqual(None, resource.delete())
def create_access_key(self, t, stack, resource_name):
resource = user.AccessKey(resource_name,
- t['Resources'][resource_name],
- stack)
+ t['Resources'][resource_name],
+ stack)
self.assertEqual(None, resource.validate())
self.assertEqual(None, resource.create())
self.assertEqual(user.AccessKey.CREATE_COMPLETE,
resource = self.create_access_key(t, stack, 'HostKeys')
self.assertEqual(user.AccessKey.UPDATE_REPLACE,
- resource.handle_update())
+ resource.handle_update())
self.assertEqual(self.fc.access,
resource.resource_id)
"KeyName" : {
''' + \
- '"Description" : "Name of an existing EC2' + \
- 'KeyPair to enable SSH access to the instances",' + \
-'''
- "Type" : "String"
- }
- },
-
- "Resources" : {
- "WikiDatabase": {
- "Type": "AWS::EC2::Instance",
- "Properties": {
- "ImageId": "image_name",
- "InstanceType": "m1.large",
- "KeyName": { "Ref" : "KeyName" }
- }
- },
- "DataVolume" : {
- "Type" : "AWS::EC2::Volume",
- "Properties" : {
- "Size" : "6",
- "AvailabilityZone" : "nova"
- }
- },
- "MountPoint" : {
- "Type" : "AWS::EC2::VolumeAttachment",
- "Properties" : {
- "InstanceId" : { "Ref" : "%s" },
- "VolumeId" : { "Ref" : "DataVolume" },
- "Device" : "/dev/vdb"
+ '"Description" : "Name of an existing EC2' + \
+ 'KeyPair to enable SSH access to the instances",' + \
+ '''
+ "Type" : "String"
+ }
+ },
+
+ "Resources" : {
+ "WikiDatabase": {
+ "Type": "AWS::EC2::Instance",
+ "Properties": {
+ "ImageId": "image_name",
+ "InstanceType": "m1.large",
+ "KeyName": { "Ref" : "KeyName" }
+ }
+ },
+ "DataVolume" : {
+ "Type" : "AWS::EC2::Volume",
+ "Properties" : {
+ "Size" : "6",
+ "AvailabilityZone" : "nova"
+ }
+ },
+ "MountPoint" : {
+ "Type" : "AWS::EC2::VolumeAttachment",
+ "Properties" : {
+ "InstanceId" : { "Ref" : "%s" },
+ "VolumeId" : { "Ref" : "DataVolume" },
+ "Device" : "/dev/vdb"
+ }
+ }
}
}
- }
-}
-'''
+ '''
test_template_findinmap_valid = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Parameters" : {
"KeyName" : {
''' + \
- '"Description" : "Name of an existing EC2 KeyPair to' + \
- 'enable SSH access to the instances",' + \
-'''
- "Type" : "String"
- }
- },
-
- "Resources" : {
- "WikiDatabase": {
- "Type": "AWS::EC2::Instance",
- "Properties": {
- "ImageId": "image_name",
- "InstanceType": "m1.large",
- "KeyName": { "Ref" : "KeyName" }
- }
- },
- "DataVolume" : {
- "Type" : "AWS::EC2::Volume",
- "Properties" : {
- "Size" : "6",
- "AvailabilityZone" : "nova"
- }
- },
-
- "MountPoint" : {
- "Type" : "AWS::EC2::VolumeAttachment",
- "Properties" : {
- "InstanceId" : { "Ref" : "WikiDatabase" },
- "VolumeId" : { "Ref" : "DataVolume" },
- "Device" : "/dev/vdb"
+ '"Description" : "Name of an existing EC2 KeyPair to' + \
+ 'enable SSH access to the instances",' + \
+ '''
+ "Type" : "String"
+ }
+ },
+
+ "Resources" : {
+ "WikiDatabase": {
+ "Type": "AWS::EC2::Instance",
+ "Properties": {
+ "ImageId": "image_name",
+ "InstanceType": "m1.large",
+ "KeyName": { "Ref" : "KeyName" }
+ }
+ },
+ "DataVolume" : {
+ "Type" : "AWS::EC2::Volume",
+ "Properties" : {
+ "Size" : "6",
+ "AvailabilityZone" : "nova"
+ }
+ },
+
+ "MountPoint" : {
+ "Type" : "AWS::EC2::VolumeAttachment",
+ "Properties" : {
+ "InstanceId" : { "Ref" : "WikiDatabase" },
+ "VolumeId" : { "Ref" : "DataVolume" },
+ "Device" : "/dev/vdb"
+ }
+ }
}
}
- }
-}
-'''
+ '''
test_template_findinmap_invalid = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
"KeyName" : {
''' + \
- '"Description" : "Name of an existing EC2 KeyPair to enable SSH ' + \
- 'access to the instances",' + \
-''' "Type" : "String"
- }
- },
-
- "Mappings" : {
- "AWSInstanceType2Arch" : {
- "t1.micro" : { "Arch" : "64" },
- "m1.small" : { "Arch" : "64" },
- "m1.medium" : { "Arch" : "64" },
- "m1.large" : { "Arch" : "64" },
- "m1.xlarge" : { "Arch" : "64" },
- "m2.xlarge" : { "Arch" : "64" },
- "m2.2xlarge" : { "Arch" : "64" },
- "m2.4xlarge" : { "Arch" : "64" },
- "c1.medium" : { "Arch" : "64" },
- "c1.xlarge" : { "Arch" : "64" },
- "cc1.4xlarge" : { "Arch" : "64HVM" },
- "cc2.8xlarge" : { "Arch" : "64HVM" },
- "cg1.4xlarge" : { "Arch" : "64HVM" }
- }
- },
- "Resources" : {
- "WikiDatabase": {
- "Type": "AWS::EC2::Instance",
- "Properties": {
-''' + \
- '"ImageId" : { "Fn::FindInMap" : [ "DistroArch2AMI", { "Ref" : ' + \
- '"LinuxDistribution" },' + \
- '{ "Fn::FindInMap" : [ "AWSInstanceType2Arch", { "Ref" : ' + \
- '"InstanceType" }, "Arch" ] } ] },' + \
-'''
+ '"Description" : "Name of an existing EC2 KeyPair to enable SSH ' + \
+ 'access to the instances",' + \
+ ''' "Type" : "String"
+ }
+ },
+
+ "Mappings" : {
+ "AWSInstanceType2Arch" : {
+ "t1.micro" : { "Arch" : "64" },
+ "m1.small" : { "Arch" : "64" },
+ "m1.medium" : { "Arch" : "64" },
+ "m1.large" : { "Arch" : "64" },
+ "m1.xlarge" : { "Arch" : "64" },
+ "m2.xlarge" : { "Arch" : "64" },
+ "m2.2xlarge" : { "Arch" : "64" },
+ "m2.4xlarge" : { "Arch" : "64" },
+ "c1.medium" : { "Arch" : "64" },
+ "c1.xlarge" : { "Arch" : "64" },
+ "cc1.4xlarge" : { "Arch" : "64HVM" },
+ "cc2.8xlarge" : { "Arch" : "64HVM" },
+ "cg1.4xlarge" : { "Arch" : "64HVM" }
+ }
+ },
+ "Resources" : {
+ "WikiDatabase": {
+ "Type": "AWS::EC2::Instance",
+ "Properties": {
+ ''' + \
+ '"ImageId" : { "Fn::FindInMap" : [ "DistroArch2AMI", { "Ref" : ' + \
+ '"LinuxDistribution" },' + \
+ '{ "Fn::FindInMap" : [ "AWSInstanceType2Arch", { "Ref" : ' + \
+ '"InstanceType" }, "Arch" ] } ] },' + \
+ '''
"InstanceType": "m1.large",
"KeyName": { "Ref" : "KeyName"}
}
self.m.ReplayAll()
engine = service.EngineService('a', 't')
- res = dict(engine.
- validate_template(None, t))
+ res = dict(engine.validate_template(None, t))
print 'res %s' % res
self.assertEqual(res['Description'], 'test.')
self.m.ReplayAll()
engine = service.EngineService('a', 't')
- res = dict(engine.
- validate_template(None, t))
+ res = dict(engine.validate_template(None, t))
self.assertNotEqual(res['Description'], 'Successfully validated')
def test_validate_findinmap_valid(self):
self.m.ReplayAll()
engine = service.EngineService('a', 't')
- res = dict(engine.
- validate_template(None, t))
+ res = dict(engine.validate_template(None, t))
self.assertEqual(res['Description'], 'test.')
def test_validate_findinmap_invalid(self):
self.m.ReplayAll()
engine = service.EngineService('a', 't')
- res = dict(engine.
- validate_template(None, t))
+ res = dict(engine.validate_template(None, t))
self.assertNotEqual(res['Description'], 'Successfully validated')
# create script
vol.Volume.nova('volume').MultipleTimes().AndReturn(self.fc)
- self.fc.volumes.create(u'1',
- display_description='%s.DataVolume' % stack_name,
- display_name='%s.DataVolume' % stack_name).AndReturn(fv)
+ self.fc.volumes.create(
+ u'1', display_description='%s.DataVolume' % stack_name,
+ display_name='%s.DataVolume' % stack_name).AndReturn(fv)
# delete script
self.fc.volumes.get('vol-123').AndReturn(fv)
# create script
vol.Volume.nova('volume').AndReturn(self.fc)
- self.fc.volumes.create(u'1',
- display_description='%s.DataVolume' % stack_name,
- display_name='%s.DataVolume' % stack_name).AndReturn(fv)
+ self.fc.volumes.create(
+ u'1', display_description='%s.DataVolume' % stack_name,
+ display_name='%s.DataVolume' % stack_name).AndReturn(fv)
eventlet.sleep(1).AndReturn(None)
# volume create
vol.Volume.nova('volume').MultipleTimes().AndReturn(self.fc)
- self.fc.volumes.create(u'1',
- display_description='%s.DataVolume' % stack_name,
- display_name='%s.DataVolume' % stack_name).AndReturn(fv)
+ self.fc.volumes.create(
+ u'1', display_description='%s.DataVolume' % stack_name,
+ display_name='%s.DataVolume' % stack_name).AndReturn(fv)
# create script
vol.VolumeAttachment.nova().MultipleTimes().AndReturn(self.fc)
vol.VolumeAttachment.nova('volume').MultipleTimes().AndReturn(self.fc)
eventlet.sleep(1).MultipleTimes().AndReturn(None)
- self.fc.volumes.create_server_volume(device=u'/dev/vdc',
- server_id=u'WikiDatabase',
- volume_id=u'vol-123').AndReturn(fva)
+ self.fc.volumes.create_server_volume(
+ device=u'/dev/vdc',
+ server_id=u'WikiDatabase',
+ volume_id=u'vol-123').AndReturn(fva)
self.fc.volumes.get('vol-123').AndReturn(fva)
# volume create
vol.Volume.nova('volume').MultipleTimes().AndReturn(self.fc)
- self.fc.volumes.create(u'1',
- display_description='%s.DataVolume' % stack_name,
- display_name='%s.DataVolume' % stack_name).AndReturn(fv)
+ self.fc.volumes.create(
+ u'1', display_description='%s.DataVolume' % stack_name,
+ display_name='%s.DataVolume' % stack_name).AndReturn(fv)
# create script
vol.VolumeAttachment.nova().MultipleTimes().AndReturn(self.fc)
vol.VolumeAttachment.nova('volume').MultipleTimes().AndReturn(self.fc)
eventlet.sleep(1).MultipleTimes().AndReturn(None)
- self.fc.volumes.create_server_volume(device=u'/dev/vdc',
- server_id=u'WikiDatabase',
- volume_id=u'vol-123').AndReturn(fva)
+ self.fc.volumes.create_server_volume(
+ device=u'/dev/vdc',
+ server_id=u'WikiDatabase',
+ volume_id=u'vol-123').AndReturn(fva)
self.fc.volumes.get('vol-123').AndReturn(fva)
return stack
def create_vpc(self, t, stack, resource_name):
- resource = vpc.VPC('the_vpc',
- t['Resources'][resource_name],
- stack)
+ resource = vpc.VPC('the_vpc', t['Resources'][resource_name], stack)
self.assertEqual(None, resource.create())
self.assertEqual(vpc.VPC.CREATE_COMPLETE, resource.state)
return resource
wc.WaitCondition._create_timeout().AndReturn(eventlet.Timeout(5))
wc.WaitCondition._get_status_reason(
- mox.IgnoreArg()).AndReturn(('WAITING', ''))
+ mox.IgnoreArg()).AndReturn(('WAITING', ''))
eventlet.sleep(1).AndReturn(None)
wc.WaitCondition._get_status_reason(
- mox.IgnoreArg()).AndReturn(('WAITING', ''))
+ mox.IgnoreArg()).AndReturn(('WAITING', ''))
eventlet.sleep(1).AndReturn(None)
wc.WaitCondition._get_status_reason(
- mox.IgnoreArg()).AndReturn(('SUCCESS', 'woot toot'))
+ mox.IgnoreArg()).AndReturn(('SUCCESS', 'woot toot'))
self.m.StubOutWithMock(wc.WaitConditionHandle, 'keystone')
wc.WaitConditionHandle.keystone().MultipleTimes().AndReturn(self.fc)
tmo = eventlet.Timeout(6)
wc.WaitCondition._create_timeout().AndReturn(tmo)
wc.WaitCondition._get_status_reason(
- mox.IgnoreArg()).AndReturn(('WAITING', ''))
+ mox.IgnoreArg()).AndReturn(('WAITING', ''))
eventlet.sleep(1).AndReturn(None)
wc.WaitCondition._get_status_reason(
- mox.IgnoreArg()).AndReturn(('WAITING', ''))
+ mox.IgnoreArg()).AndReturn(('WAITING', ''))
eventlet.sleep(1).AndRaise(tmo)
self.m.StubOutWithMock(wc.WaitConditionHandle, 'keystone')
self.assertEqual(resource.state,
'CREATE_FAILED')
self.assertEqual(wc.WaitCondition.UPDATE_REPLACE,
- resource.handle_update())
+ resource.handle_update())
stack.delete()
# Stub waitcondition status so all goes CREATE_COMPLETE
self.m.StubOutWithMock(wc.WaitCondition, '_get_status_reason')
wc.WaitCondition._get_status_reason(
- mox.IgnoreArg()).AndReturn(('SUCCESS', 'woot toot'))
+ mox.IgnoreArg()).AndReturn(('SUCCESS', 'woot toot'))
self.m.StubOutWithMock(wc.WaitCondition, '_create_timeout')
wc.WaitCondition._create_timeout().AndReturn(eventlet.Timeout(5))
resource = stack.resources['WaitHandle']
self.assertEqual(resource.state, 'CREATE_COMPLETE')
- expected_url = "".join(
- ['http://127.0.0.1:8000/v1/waitcondition/',
- 'arn%3Aopenstack%3Aheat%3A%3Atest_tenant%3Astacks%2F',
- 'test_stack2%2FSTACKABCD1234%2Fresources%2F',
- 'WaitHandle?',
- 'Timestamp=2012-11-29T13%3A49%3A37Z&',
- 'SignatureMethod=HmacSHA256&',
- 'AWSAccessKeyId=4567&',
- 'SignatureVersion=2&',
- 'Signature=',
- 'ePyTwmC%2F1kSigeo%2Fha7kP8Avvb45G9Y7WOQWe4F%2BnXM%3D'
- ])
+ expected_url = "".join([
+ 'http://127.0.0.1:8000/v1/waitcondition/',
+ 'arn%3Aopenstack%3Aheat%3A%3Atest_tenant%3Astacks%2F',
+ 'test_stack2%2FSTACKABCD1234%2Fresources%2F',
+ 'WaitHandle?',
+ 'Timestamp=2012-11-29T13%3A49%3A37Z&',
+ 'SignatureMethod=HmacSHA256&',
+ 'AWSAccessKeyId=4567&',
+ 'SignatureVersion=2&',
+ 'Signature=',
+ 'ePyTwmC%2F1kSigeo%2Fha7kP8Avvb45G9Y7WOQWe4F%2BnXM%3D'])
self.assertEqual(expected_url, resource.FnGetRefId())
- self.assertEqual(resource.UPDATE_REPLACE,
- resource.handle_update())
+ self.assertEqual(resource.UPDATE_REPLACE, resource.handle_update())
stack.delete()
ctx = context.get_admin_context()
tmpl = db_api.raw_template_create(ctx, {'foo': 'bar'})
dummy_stack = {'id': '6754d843-bed2-40dc-a325-84882bb90a98',
- 'name': 'dummystack',
- 'raw_template_id': tmpl.id,
- 'user_creds_id': 1,
- 'username': 'dummyuser',
- 'owner_id': None,
- 'status': 'CREATE_COMPLETE',
- 'status_reason': 'foo status',
- 'parameters': {'foo': 'bar'},
- 'timeout': 60,
- 'tenant': 123456}
+ 'name': 'dummystack',
+ 'raw_template_id': tmpl.id,
+ 'user_creds_id': 1,
+ 'username': 'dummyuser',
+ 'owner_id': None,
+ 'status': 'CREATE_COMPLETE',
+ 'status_reason': 'foo status',
+ 'parameters': {'foo': 'bar'},
+ 'timeout': 60,
+ 'tenant': 123456}
db_ret = db_api.stack_create(ctx, dummy_stack)
cls.stack_id = db_ret.id
self.m.UnsetStubs()
def test_minimum(self):
- rule = {
- 'EvaluationPeriods': '1',
- 'MetricName': 'test_metric',
- 'Period': '300',
- 'Statistic': 'Minimum',
- 'ComparisonOperator': 'LessThanOrEqualToThreshold',
- 'Threshold': '50'}
+ rule = {'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'Minimum',
+ 'ComparisonOperator': 'LessThanOrEqualToThreshold',
+ 'Threshold': '50'}
now = timeutils.utcnow()
last = now - datetime.timedelta(seconds=320)
self.assertEqual(new_state, 'ALARM')
def test_maximum(self):
- rule = {
- 'EvaluationPeriods': '1',
- 'MetricName': 'test_metric',
- 'Period': '300',
- 'Statistic': 'Maximum',
- 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
- 'Threshold': '30'}
+ rule = {'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'Maximum',
+ 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
+ 'Threshold': '30'}
now = timeutils.utcnow()
last = now - datetime.timedelta(seconds=320)
def test_samplecount(self):
- rule = {
- 'EvaluationPeriods': '1',
- 'MetricName': 'test_metric',
- 'Period': '300',
- 'Statistic': 'SampleCount',
- 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
- 'Threshold': '3'}
+ rule = {'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'SampleCount',
+ 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
+ 'Threshold': '3'}
now = timeutils.utcnow()
last = now - datetime.timedelta(seconds=320)
self.assertEqual(new_state, 'NORMAL')
def test_sum(self):
- rule = {
- 'EvaluationPeriods': '1',
- 'MetricName': 'test_metric',
- 'Period': '300',
- 'Statistic': 'Sum',
- 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
- 'Threshold': '100'}
+ rule = {'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'Sum',
+ 'ComparisonOperator': 'GreaterThanOrEqualToThreshold',
+ 'Threshold': '100'}
now = timeutils.utcnow()
last = now - datetime.timedelta(seconds=320)
self.assertEqual(new_state, 'ALARM')
def test_ave(self):
- rule = {
- 'EvaluationPeriods': '1',
- 'MetricName': 'test_metric',
- 'Period': '300',
- 'Statistic': 'Average',
- 'ComparisonOperator': 'GreaterThanThreshold',
- 'Threshold': '100'}
+ rule = {'EvaluationPeriods': '1',
+ 'MetricName': 'test_metric',
+ 'Period': '300',
+ 'Statistic': 'Average',
+ 'ComparisonOperator': 'GreaterThanThreshold',
+ 'Threshold': '100'}
now = timeutils.utcnow()
last = now - datetime.timedelta(seconds=320)
values = {'stack_id': self.stack_id,
'state': 'NORMAL',
'name': u'HttpFailureAlarm',
- 'rule': {
- u'EvaluationPeriods': u'1',
- u'AlarmActions': [u'WebServerRestartPolicy'],
- u'AlarmDescription': u'Restart the WikiDatabase',
- u'Namespace': u'system/linux',
- u'Period': u'300',
- u'ComparisonOperator': u'GreaterThanThreshold',
- u'Statistic': u'SampleCount',
- u'Threshold': u'2',
- u'MetricName': u'ServiceFailure'}}
+ 'rule': {u'EvaluationPeriods': u'1',
+ u'AlarmActions': [u'WebServerRestartPolicy'],
+ u'AlarmDescription': u'Restart the WikiDatabase',
+ u'Namespace': u'system/linux',
+ u'Period': u'300',
+ u'ComparisonOperator': u'GreaterThanThreshold',
+ u'Statistic': u'SampleCount',
+ u'Threshold': u'2',
+ u'MetricName': u'ServiceFailure'}}
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
values['name'] = 'AnotherWatch'
def get_limits(self, **kw):
return (200, {"limits": {
- "rate": [
- {
- "uri": "*",
- "regex": ".*",
- "limit": [
- {
- "value": 10,
- "verb": "POST",
- "remaining": 2,
- "unit": "MINUTE",
- "next-available": "2011-12-15T22:42:45Z"
- },
- {
- "value": 10,
- "verb": "PUT",
- "remaining": 2,
- "unit": "MINUTE",
- "next-available": "2011-12-15T22:42:45Z"
- },
- {
- "value": 100,
- "verb": "DELETE",
- "remaining": 100,
- "unit": "MINUTE",
- "next-available": "2011-12-15T22:42:45Z"
- }
- ]
- },
- {
- "uri": "*/servers",
- "regex": "^/servers",
- "limit": [
- {
- "verb": "POST",
- "value": 25,
- "remaining": 24,
- "unit": "DAY",
- "next-available": "2011-12-15T22:42:45Z"
- }
- ]
- }
- ],
- "absolute": {
- "maxTotalRAMSize": 51200,
- "maxServerMeta": 5,
- "maxImageMeta": 5,
- "maxPersonality": 5,
- "maxPersonalitySize": 10240
- },
- },
- })
+ "rate": [{"uri": "*",
+ "regex": ".*",
+ "limit": [
+ {"value": 10,
+ "verb": "POST",
+ "remaining": 2,
+ "unit": "MINUTE",
+ "next-available": "2011-12-15T22:42:45Z"},
+ {"value": 10,
+ "verb": "PUT",
+ "remaining": 2,
+ "unit": "MINUTE",
+ "next-available": "2011-12-15T22:42:45Z"},
+ {"value": 100,
+ "verb": "DELETE",
+ "remaining": 100,
+ "unit": "MINUTE",
+ "next-available": "2011-12-15T22:42:45Z"}]},
+ {"uri": "*/servers",
+ "regex": "^/servers",
+ "limit": [{"verb": "POST",
+ "value": 25,
+ "remaining": 24,
+ "unit": "DAY",
+ "next-available":
+ "2011-12-15T22:42:45Z"}]}],
+ "absolute": {"maxTotalRAMSize": 51200,
+ "maxServerMeta": 5,
+ "maxImageMeta": 5,
+ "maxPersonality": 5,
+ "maxPersonalitySize": 10240}}})
#
# Servers
]})
def get_servers_detail(self, **kw):
- return (200, {"servers": [
- {
- "id": 1234,
- "name": "sample-server",
- "image": {
- "id": 2,
- "name": "sample image",
- },
- "flavor": {
- "id": 1,
- "name": "256 MB Server",
- },
- "hostId": "e4d909c290d0fb1ca068ffaddf22cbd0",
- "status": "BUILD",
- "progress": 60,
- "addresses": {
- "public": [{
- "version": 4,
- "addr": "1.2.3.4",
- },
- {
- "version": 4,
- "addr": "5.6.7.8",
- }],
- "private": [{
- "version": 4,
- "addr": "10.11.12.13",
- }],
- },
- "metadata": {
- "Server Label": "Web Head 1",
- "Image Version": "2.1"
- }
- },
- {
- "id": 5678,
- "name": "sample-server2",
- "image": {
- "id": 2,
- "name": "sample image",
- },
- "flavor": {
- "id": 1,
- "name": "256 MB Server",
- },
- "hostId": "9e107d9d372bb6826bd81d3542a419d6",
- "status": "ACTIVE",
- "addresses": {
- "public": [{
- "version": 4,
- "addr": "4.5.6.7",
- },
- {
- "version": 4,
- "addr": "5.6.9.8",
- }],
- "private": [{
- "version": 4,
- "addr": "10.13.12.13",
- }],
- },
- "metadata": {
- "Server Label": "DB 1"
- }
- },
- {
- "id": 9999,
- "name": "sample-server3",
- "image": {
- "id": 3,
- "name": "sample image",
- },
- "flavor": {
- "id": 3,
- "name": "m1.large",
- },
- "hostId": "9e107d9d372bb6826bd81d3542a419d6",
- "status": "ACTIVE",
- "addresses": {
- "public": [{
- "version": 4,
- "addr": "4.5.6.7",
- },
- {
- "version": 4,
- "addr": "5.6.9.8",
- }],
- "private": [{
- "version": 4,
- "addr": "10.13.12.13",
- }],
- },
- "metadata": {
- "Server Label": "DB 1"
- }
- }
- ]})
+ return (200, {"servers": [{"id": 1234,
+ "name": "sample-server",
+ "image": {"id": 2,
+ "name": "sample image"},
+ "flavor": {"id": 1,
+ "name": "256 MB Server"},
+ "hostId":
+ "e4d909c290d0fb1ca068ffaddf22cbd0",
+ "status": "BUILD",
+ "progress": 60,
+ "addresses": {"public": [{"version": 4,
+ "addr":
+ "1.2.3.4"},
+ {"version": 4,
+ "addr":
+ "5.6.7.8"}],
+ "private": [{"version": 4,
+ "addr": "10.11.12.13"}]},
+ "metadata": {"Server Label": "Web Head 1",
+ "Image Version": "2.1"}},
+ {"id": 5678,
+ "name": "sample-server2",
+ "image": {"id": 2,
+ "name": "sample image"},
+ "flavor": {"id": 1,
+ "name": "256 MB Server"},
+ "hostId":
+ "9e107d9d372bb6826bd81d3542a419d6",
+ "status": "ACTIVE",
+ "addresses": {"public": [{"version": 4,
+ "addr":
+ "4.5.6.7"},
+ {"version": 4,
+ "addr":
+ "5.6.9.8"}],
+ "private": [{"version": 4,
+ "addr": "10.13.12.13"}]},
+ "metadata": {"Server Label": "DB 1"}},
+ {"id": 9999,
+ "name": "sample-server3",
+ "image": {"id": 3,
+ "name": "sample image"},
+ "flavor": {"id": 3,
+ "name": "m1.large"},
+ "hostId":
+ "9e107d9d372bb6826bd81d3542a419d6",
+ "status": "ACTIVE",
+ "addresses": {
+ "public": [{"version": 4,
+ "addr": "4.5.6.7"},
+ {"version": 4,
+ "addr": "5.6.9.8"}],
+ "private": [{"version": 4,
+ "addr": "10.13.12.13"}]},
+ "metadata": {"Server Label": "DB 1"}}]})
def post_servers(self, body, **kw):
assert body.keys() == ['server']
fakes.assert_has_keys(body['server'],
- required=['name', 'imageRef', 'flavorRef'],
- optional=['metadata', 'personality'])
+ required=['name', 'imageRef', 'flavorRef'],
+ optional=['metadata', 'personality'])
if 'personality' in body['server']:
for pfile in body['server']['personality']:
fakes.assert_has_keys(pfile, required=['path', 'contents'])
return (200, 'Fake diagnostics')
def get_servers_1234_actions(self, **kw):
- return (200, {'actions': [
- {
- 'action': 'rebuild',
- 'error': None,
- 'created_at': '2011-12-30 11:45:36'
- },
- {
- 'action': 'reboot',
- 'error': 'Failed!',
- 'created_at': '2011-12-30 11:40:29'
- },
- ]})
+ return (200, {'actions': [{'action': 'rebuild',
+ 'error': None,
+ 'created_at': '2011-12-30 11:45:36'},
+ {'action': 'reboot',
+ 'error': 'Failed!',
+ 'created_at': '2011-12-30 11:40:29'}]})
#
# Server Addresses
#
def get_os_cloudpipe(self, **kw):
- return (200, {'cloudpipes': [
- {'project_id':1}
- ]})
+ return (200, {'cloudpipes': [{'project_id': 1}]})
def post_os_cloudpipe(self, **ks):
return (202, {'instance_id': '9d5824aa-20e6-4b9f-b967-76a699fc51fd'})
]})
def get_os_floating_ips_1(self, **kw):
- return (200, {'floating_ip':
- {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1'}
- })
+ return (200, {'floating_ip': {'id': 1,
+ 'fixed_ip': '10.0.0.1',
+ 'ip': '11.0.0.1'}})
def post_os_floating_ips(self, body, **kw):
return (202, self.get_os_floating_ips_1()[1])
def post_os_floating_ips(self, body):
if body.get('pool'):
- return (200, {'floating_ip':
- {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1',
- 'pool': 'nova'}})
+ return (200, {'floating_ip': {'id': 1,
+ 'fixed_ip': '10.0.0.1',
+ 'ip': '11.0.0.1',
+ 'pool': 'nova'}})
else:
- return (200, {'floating_ip':
- {'id': 1, 'fixed_ip': '10.0.0.1', 'ip': '11.0.0.1',
- 'pool': None}})
+ return (200, {'floating_ip': {'id': 1,
+ 'fixed_ip': '10.0.0.1',
+ 'ip': '11.0.0.1',
+ 'pool': None}})
def delete_os_floating_ips_1(self, **kw):
return (204, None)
def get_os_floating_ip_dns_testdomain_entries(self, **kw):
if kw.get('ip'):
return (205, {'dns_entries':
- [{'dns_entry':
- {'ip': kw.get('ip'),
- 'name': "host1",
- 'type': "A",
- 'domain': 'testdomain'}},
- {'dns_entry':
- {'ip': kw.get('ip'),
- 'name': "host2",
- 'type': "A",
- 'domain': 'testdomain'}}]})
+ [{'dns_entry': {'ip': kw.get('ip'),
+ 'name': "host1",
+ 'type': "A",
+ 'domain': 'testdomain'}},
+ {'dns_entry': {'ip': kw.get('ip'),
+ 'name': "host2",
+ 'type': "A",
+ 'domain': 'testdomain'}}]})
else:
return (404, None)
def get_os_floating_ip_dns_testdomain_entries_testname(self, **kw):
- return (205, {'dns_entry':
- {'ip': "10.10.10.10",
- 'name': 'testname',
- 'type': "A",
- 'domain': 'testdomain'}})
+ return (205, {'dns_entry': {'ip': "10.10.10.10",
+ 'name': 'testname',
+ 'type': "A",
+ 'domain': 'testdomain'}})
def put_os_floating_ip_dns_testdomain(self, body, **kw):
if body['domain_entry']['scope'] == 'private':
fakes.assert_has_keys(body['domain_entry'],
- required=['availability_zone', 'scope'])
+ required=['availability_zone', 'scope'])
elif body['domain_entry']['scope'] == 'public':
fakes.assert_has_keys(body['domain_entry'],
- required=['project', 'scope'])
+ required=['project', 'scope'])
else:
fakes.assert_has_keys(body['domain_entry'],
- required=['project', 'scope'])
+ required=['project', 'scope'])
return (205, None)
def put_os_floating_ip_dns_testdomain_entries_testname(self, body, **kw):
fakes.assert_has_keys(body['dns_entry'],
- required=['ip', 'dns_type'])
+ required=['ip', 'dns_type'])
return (205, None)
def delete_os_floating_ip_dns_testdomain(self, **kw):
# Images
#
def get_images(self, **kw):
- return (200, {'images': [
- {'id': 1, 'name': 'CentOS 5.2'},
- {'id': 2, 'name': 'My Server Backup'},
- {'id': 3, 'name': 'F17-x86_64-gold'},
- {'id': 4, 'name': 'F17-x86_64-cfntools'}
- ]})
+ return (200, {'images': [{'id': 1, 'name': 'CentOS 5.2'},
+ {'id': 2, 'name': 'My Server Backup'},
+ {'id': 3, 'name': 'F17-x86_64-gold'},
+ {'id': 4, 'name': 'F17-x86_64-cfntools'}]})
def get_images_detail(self, **kw):
- return (200, {'images': [
- {
- 'id': 1,
- 'name': 'CentOS 5.2',
- "updated": "2010-10-10T12:00:00Z",
- "created": "2010-08-10T12:00:00Z",
- "status": "ACTIVE",
- "metadata": {
- "test_key": "test_value",
- },
- "links": {},
- },
- {
- "id": 743,
- "name": "My Server Backup",
- "serverId": 1234,
- "updated": "2010-10-10T12:00:00Z",
- "created": "2010-08-10T12:00:00Z",
- "status": "SAVING",
- "progress": 80,
- "links": {},
- },
- {
- "id": 744,
- "name": "F17-x86_64-gold",
- "serverId": 9999,
- "updated": "2010-10-10T12:00:00Z",
- "created": "2010-08-10T12:00:00Z",
- "status": "SAVING",
- "progress": 80,
- "links": {},
- },
- {
- "id": 745,
- "name": "F17-x86_64-cfntools",
- "serverId": 9998,
- "updated": "2010-10-10T12:00:00Z",
- "created": "2010-08-10T12:00:00Z",
- "status": "SAVING",
- "progress": 80,
- "links": {},
- }
- ]})
+ return (200, {'images': [{'id': 1,
+ 'name': 'CentOS 5.2',
+ "updated": "2010-10-10T12:00:00Z",
+ "created": "2010-08-10T12:00:00Z",
+ "status": "ACTIVE",
+ "metadata": {"test_key": "test_value"},
+ "links": {}},
+ {"id": 743,
+ "name": "My Server Backup",
+ "serverId": 1234,
+ "updated": "2010-10-10T12:00:00Z",
+ "created": "2010-08-10T12:00:00Z",
+ "status": "SAVING",
+ "progress": 80,
+ "links": {}},
+ {"id": 744,
+ "name": "F17-x86_64-gold",
+ "serverId": 9999,
+ "updated": "2010-10-10T12:00:00Z",
+ "created": "2010-08-10T12:00:00Z",
+ "status": "SAVING",
+ "progress": 80,
+ "links": {}},
+ {"id": 745,
+ "name": "F17-x86_64-cfntools",
+ "serverId": 9998,
+ "updated": "2010-10-10T12:00:00Z",
+ "created": "2010-08-10T12:00:00Z",
+ "status": "SAVING",
+ "progress": 80,
+ "links": {}}]})
def get_images_1(self, **kw):
return (200, {'image': self.get_images_detail()[1]['images'][0]})
assert body.keys() == ['metadata']
fakes.assert_has_keys(body['metadata'],
required=['test_key'])
- return (200,
- {'metadata': self.get_images_1()[1]['image']['metadata']})
+ return (200, {'metadata': self.get_images_1()[1]['image']['metadata']})
def delete_images_1(self, **kw):
return (204, None)
# Keypairs
#
def get_os_keypairs(self, *kw):
- return (200, {"keypairs": [
- {'fingerprint': 'FAKE_KEYPAIR', 'name': 'test'}
- ]})
+ return (200, {"keypairs": [{'fingerprint': 'FAKE_KEYPAIR',
+ 'name': 'test'}]})
def delete_os_keypairs_test(self, **kw):
return (202, None)
# Security Groups
#
def get_os_security_groups(self, **kw):
- return (200, {"security_groups": [
- {'id': 1, 'name': 'test', 'description': 'FAKE_SECURITY_GROUP'}
- ]})
+ return (200, {"security_groups": [{'id': 1,
+ 'name': 'test',
+ 'description':
+ 'FAKE_SECURITY_GROUP'}]})
def get_os_security_groups_1(self, **kw):
- return (200, {"security_group":
- {'id': 1, 'name': 'test', 'description': 'FAKE_SECURITY_GROUP'}
- })
+ return (200, {"security_group": {'id': 1,
+ 'name': 'test',
+ 'description':
+ 'FAKE_SECURITY_GROUP'}})
def delete_os_security_groups_1(self, **kw):
return (202, None)
fakes.assert_has_keys(body['security_group'],
required=['name', 'description'])
r = {'security_group':
- self.get_os_security_groups()[1]['security_groups'][0]}
+ self.get_os_security_groups()[1]['security_groups'][0]}
return (202, r)
#
# Security Group Rules
#
def get_os_security_group_rules(self, **kw):
- return (200, {"security_group_rules": [
- {'id': 1, 'parent_group_id': 1, 'group_id': 2,
- 'ip_protocol': 'TCP', 'from_port': '22', 'to_port': 22,
- 'cidr': '10.0.0.0/8'}
- ]})
+ return (200, {"security_group_rules": [{'id': 1,
+ 'parent_group_id': 1,
+ 'group_id': 2,
+ 'ip_protocol': 'TCP',
+ 'from_port': '22',
+ 'to_port': 22,
+ 'cidr': '10.0.0.0/8'}]})
def delete_os_security_group_rules_1(self, **kw):
return (202, None)
def post_os_security_group_rules(self, body, **kw):
assert body.keys() == ['security_group_rule']
fakes.assert_has_keys(body['security_group_rule'],
- required=['parent_group_id'],
- optional=['group_id', 'ip_protocol', 'from_port',
- 'to_port', 'cidr'])
+ required=['parent_group_id'],
+ optional=['group_id', 'ip_protocol', 'from_port',
+ 'to_port', 'cidr'])
r = {'security_group_rule':
- self.get_os_security_group_rules()[1]['security_group_rules'][0]}
+ self.get_os_security_group_rules()[1]['security_group_rules'][0]}
return (202, r)
#
#
def get_os_aggregates(self, *kw):
return (200, {"aggregates": [
- {'id':'1',
+ {'id': '1',
'name': 'test',
'availability_zone': 'nova1'},
- {'id':'2',
+ {'id': '2',
'name': 'test2',
'availability_zone': 'nova1'},
]})