from heat.common import context
from heat.engine import auth
from heat.openstack.common import rpc
+import heat.openstack.common.rpc.common as rpc_common
from heat.common.wsgi import Request
from heat.api.v1 import exception
import heat.api.v1.stacks as stacks
u'StackName': u'wordpress', u'StackStatus': u'CREATE_COMPLETE'}]}}}
self.assertEqual(result, expected)
+ def test_list_rmt_aterr(self):
+ params = {'Action': 'ListStacks'}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Insert an engine RPC error and ensure we map correctly to the
+ # heat exception type
+ self.m.StubOutWithMock(rpc, 'call')
+ rpc.call(dummy_req.context, 'engine', {'method': 'show_stack',
+ 'args': {'stack_name': None,
+ 'params': dict(dummy_req.params)}}
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
+
+ self.m.ReplayAll()
+
+ # Call the list controller function and compare the response
+ result = self.controller.list(dummy_req)
+ self.assert_(type(result) == exception.HeatInvalidParameterValueError)
+
+ def test_list_rmt_interr(self):
+ params = {'Action': 'ListStacks'}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Insert an engine RPC error and ensure we map correctly to the
+ # heat exception type
+ self.m.StubOutWithMock(rpc, 'call')
+ rpc.call(dummy_req.context, 'engine', {'method': 'show_stack',
+ 'args': {'stack_name': None,
+ 'params': dict(dummy_req.params)}}
+ ).AndRaise(rpc_common.RemoteError("Exception"))
+
+ self.m.ReplayAll()
+
+ # Call the list controller function and compare the response
+ result = self.controller.list(dummy_req)
+ self.assert_(type(result) == exception.HeatInternalFailureError)
+
def test_describe(self):
# Format a dummy GET request to pass into the WSGI handler
stack_name = "wordpress"
self.assert_(response == expected)
+ def test_describe_aterr(self):
+ stack_name = "wordpress"
+ params = {'Action': 'DescribeStacks', 'StackName': stack_name}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Insert an engine RPC error and ensure we map correctly to the
+ # heat exception type
+ self.m.StubOutWithMock(rpc, 'call')
+ rpc.call(dummy_req.context, 'engine', {'method': 'show_stack', 'args':
+ {'stack_name': stack_name,
+ 'params': dict(dummy_req.params)}}
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
+
+ self.m.ReplayAll()
+
+ result = self.controller.describe(dummy_req)
+ self.assert_(type(result) == exception.HeatInvalidParameterValueError)
+
def test_get_template_int_body(self):
''' Test the internal _get_template function '''
params = {'TemplateBody': "abcdef"}
self.assert_(response == expected)
+ def test_create_err_no_template(self):
+ # Format a dummy request with a missing template field
+ stack_name = "wordpress"
+ params = {'Action': 'CreateStack', 'StackName': stack_name}
+ dummy_req = self._dummy_GET_request(params)
+
+ result = self.controller.create(dummy_req)
+ self.assert_(type(result) == exception.HeatMissingParameterError)
+
+ def test_create_err_inval_template(self):
+ # Format a dummy request with an invalid TemplateBody
+ stack_name = "wordpress"
+ json_template = "!$%**_+}@~?"
+ params = {'Action': 'CreateStack', 'StackName': stack_name,
+ 'TemplateBody': '%s' % json_template}
+ dummy_req = self._dummy_GET_request(params)
+
+ result = self.controller.create(dummy_req)
+ self.assert_(type(result) == exception.HeatInvalidParameterValueError)
+
+ def test_create_err_rpcerr(self):
+ # Format a dummy request
+ stack_name = "wordpress"
+ template = {u'Foo': u'bar'}
+ json_template = json.dumps(template)
+ params = {'Action': 'CreateStack', 'StackName': stack_name,
+ 'TemplateBody': '%s' % json_template,
+ 'TimeoutInMinutes': 30,
+ 'Parameters.member.1.ParameterKey': 'InstanceType',
+ 'Parameters.member.1.ParameterValue': 'm1.xlarge'}
+ engine_parms = {u'InstanceType': u'm1.xlarge'}
+ engine_args = {'timeout_mins': u'30'}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Insert an engine RPC error and ensure we map correctly to the
+ # heat exception type
+ self.m.StubOutWithMock(rpc, 'call')
+
+ rpc.call(dummy_req.context, 'engine', {'method': 'create_stack',
+ 'args':
+ {'stack_name': stack_name,
+ 'template': template,
+ 'params': engine_parms,
+ 'args': engine_args}}
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
+
+ self.m.ReplayAll()
+
+ result = self.controller.create(dummy_req)
+
+ self.assert_(type(result) == exception.HeatInvalidParameterValueError)
+
def test_update(self):
# Format a dummy request
stack_name = "wordpress"
self.assert_(response == expected)
+ def test_create_or_update_err(self):
+ result = self.controller.create_or_update(req={}, action="dsdgfdf")
+ self.assert_(type(result) == exception.HeatInternalFailureError)
+
def test_get_template(self):
# Format a dummy request
stack_name = "wordpress"
self.assert_(response == expected)
+ def test_get_template_err_rpcerr(self):
+ stack_name = "wordpress"
+ template = {u'Foo': u'bar'}
+ params = {'Action': 'GetTemplate', 'StackName': stack_name}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Insert an engine RPC error and ensure we map correctly to the
+ # heat exception type
+ self.m.StubOutWithMock(rpc, 'call')
+ rpc.call(dummy_req.context, 'engine', {'method': 'get_template',
+ 'args':
+ {'stack_name': stack_name,
+ 'params': dict(dummy_req.params)}}
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
+
+ self.m.ReplayAll()
+
+ result = self.controller.get_template(dummy_req)
+
+ self.assert_(type(result) == exception.HeatInvalidParameterValueError)
+
+ def test_get_template_err_none(self):
+ stack_name = "wordpress"
+ template = {u'Foo': u'bar'}
+ params = {'Action': 'GetTemplate', 'StackName': stack_name}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Stub out the RPC call to the engine to return None
+ # this test the "no such stack" error path
+ engine_resp = None
+
+ self.m.StubOutWithMock(rpc, 'call')
+ rpc.call(dummy_req.context, 'engine', {'method': 'get_template',
+ 'args':
+ {'stack_name': stack_name,
+ 'params': dict(dummy_req.params)}}).AndReturn(engine_resp)
+
+ self.m.ReplayAll()
+
+ result = self.controller.get_template(dummy_req)
+
+ self.assert_(type(result) == exception.HeatInvalidParameterValueError)
+
+ def test_validate_err_no_template(self):
+ # Format a dummy request with a missing template field
+ stack_name = "wordpress"
+ params = {'Action': 'ValidateTemplate'}
+ dummy_req = self._dummy_GET_request(params)
+
+ result = self.controller.validate_template(dummy_req)
+ self.assert_(type(result) == exception.HeatMissingParameterError)
+
+ def test_validate_err_inval_template(self):
+ # Format a dummy request with an invalid TemplateBody
+ json_template = "!$%**_+}@~?"
+ params = {'Action': 'ValidateTemplate',
+ 'TemplateBody': '%s' % json_template}
+ dummy_req = self._dummy_GET_request(params)
+
+ result = self.controller.validate_template(dummy_req)
+ self.assert_(type(result) == exception.HeatInvalidParameterValueError)
+
def test_delete(self):
# Format a dummy request
stack_name = "wordpress"
self.assert_(response == expected)
+ def test_delete_err_rpcerr(self):
+ stack_name = "wordpress"
+ params = {'Action': 'DeleteStack', 'StackName': stack_name}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Insert an engine RPC error and ensure we map correctly to the
+ # heat exception type
+ self.m.StubOutWithMock(rpc, 'call')
+ rpc.call(dummy_req.context, 'engine', {'method': 'delete_stack',
+ 'args':
+ {'stack_name': stack_name,
+ 'params': dict(dummy_req.params)}}
+ ).AndRaise(rpc_common.RemoteError("AttributeError"))
+
+ self.m.ReplayAll()
+
+ result = self.controller.delete(dummy_req)
+
+ self.assert_(type(result) == exception.HeatInvalidParameterValueError)
+
def test_events_list(self):
# Format a dummy request
stack_name = "wordpress"
self.assert_(response == expected)
+ def test_events_list_err_rpcerr(self):
+ stack_name = "wordpress"
+ params = {'Action': 'DescribeStackEvents', 'StackName': stack_name}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Insert an engine RPC error and ensure we map correctly to the
+ # heat exception type
+ self.m.StubOutWithMock(rpc, 'call')
+ rpc.call(dummy_req.context, 'engine', {'method': 'list_events',
+ 'args':
+ {'stack_name': stack_name,
+ 'params': dict(dummy_req.params)}}
+ ).AndRaise(rpc_common.RemoteError("Exception"))
+
+ self.m.ReplayAll()
+
+ result = self.controller.events_list(dummy_req)
+
+ self.assert_(type(result) == exception.HeatInternalFailureError)
+
def test_describe_stack_resource(self):
# Format a dummy request
stack_name = "wordpress"