'StackNotFound': exc.HTTPNotFound,
'ResourceNotFound': exc.HTTPNotFound,
'ResourceNotAvailable': exc.HTTPNotFound,
+ 'PhysicalResourceNotFound': exc.HTTPNotFound,
'InvalidTenant': exc.HTTPForbidden,
'StackExists': exc.HTTPConflict,
}
rs = db_api.resource_get_by_physical_resource_id(context,
physical_resource_id)
if not rs:
- msg = "The specified PhysicalResourceId doesn't exist"
- raise AttributeError(msg)
+ raise exception.PhysicalResourceNotFound(
+ resource_id=physical_resource_id)
stack = parser.Stack.load(context, stack=rs.stack)
resource = stack[rs.name]
self.assertEqual(response, expected)
+ def test_describe_stack_resources_physical_not_found(self):
+ # Format a dummy request
+ stack_name = "wordpress"
+ identity = dict(identifier.HeatIdentifier('t', stack_name, '6'))
+ params = {'Action': 'DescribeStackResources',
+ 'LogicalResourceId': "WikiDatabase",
+ 'PhysicalResourceId': 'aaaaaaaa-9f88-404d-cccc-ffffffffffff'}
+ dummy_req = self._dummy_GET_request(params)
+
+ # Stub out the RPC call to the engine with a pre-canned response
+ self.m.StubOutWithMock(rpc, 'call')
+ rpc.call(dummy_req.context, self.topic,
+ {'method': 'find_physical_resource',
+ 'args': {'physical_resource_id':
+ 'aaaaaaaa-9f88-404d-cccc-ffffffffffff'},
+ 'version': self.api_version},
+ None).AndRaise(
+ rpc_common.RemoteError("PhysicalResourceNotFound"))
+
+ self.m.ReplayAll()
+
+ response = self.controller.describe_stack_resources(dummy_req)
+
+ self.assertEqual(type(response),
+ exception.HeatInvalidParameterValueError)
+
def test_describe_stack_resources_err_inval(self):
# Format a dummy request containing both StackName and
# PhysicalResourceId, which is invalid and should throw a
self.assertEqual(resource_identity.resource_name, 'WebServer')
def test_find_physical_resource_nonexist(self):
- self.assertRaises(AttributeError,
+ self.assertRaises(exception.PhysicalResourceNotFound,
self.man.find_physical_resource,
self.ctx, 'foo')