from webob import Request
from heat.api.cfn.v1 import stacks
-from heat.api.cfn.v1 import waitcondition
+from heat.api.cfn.v1 import signal
from heat.common import wsgi
from heat.openstack.common import log as logging
mapper.connect("/", controller=stacks_resource, action="index")
- # Add controller which handles waitcondition notifications
+ # Add controller which handles signals on resources like:
+ # waitconditions and alarms.
# This is not part of the main CFN API spec, hence handle it
# separately via a different path
- waitcondition_controller = waitcondition.create_resource(conf)
+ signal_controller = signal.create_resource(conf)
mapper.connect('/waitcondition/{arn:.*}',
- controller=waitcondition_controller,
+ controller=signal_controller,
action='update_waitcondition',
conditions=dict(method=['PUT']))
+ mapper.connect('/signal/{arn:.*}',
+ controller=signal_controller,
+ action='signal',
+ conditions=dict(method=['POST']))
super(API, self).__init__(mapper)
import heat.openstack.common.rpc.common as rpc_common
-class WaitConditionController(object):
+class SignalController(object):
def __init__(self, options):
self.options = options
self.engine = rpc_client.EngineClient()
return {'resource': identity.resource_name, 'metadata': md}
+ def signal(self, req, body, arn):
+ con = req.context
+ identity = identifier.ResourceIdentifier.from_arn(arn)
+ try:
+ md = self.engine.resource_signal(
+ con,
+ stack_identity=dict(identity.stack()),
+ resource_name=identity.resource_name,
+ details=body)
+ except rpc_common.RemoteError as ex:
+ return exception.map_remote_error(ex)
+
def create_resource(options):
"""
- Stacks resource factory method.
+ Signal resource factory method.
"""
deserializer = wsgi.JSONRequestDeserializer()
- return wsgi.Resource(WaitConditionController(options), deserializer)
+ return wsgi.Resource(SignalController(options), deserializer)
'''
return base64.b64encode(data)
+ def signal(self, details=None):
+ '''
+ signal the resource. Subclasses should provide a handle_signal() method
+ to implement the signal, the base-class raise an exception if no
+ handler is implemented.
+ '''
+ if self.action in (self.SUSPEND, self.DELETE):
+ raise exception.ResourceFailure(Exception(
+ 'Can not send a signal to a Resource whilst actioning a %s' %
+ self.action))
+
+ if not callable(getattr(self, 'handle_signal', None)):
+ raise exception.ResourceFailure(Exception(
+ 'Resource %s is not able to receive a signal' % str(self)))
+
+ try:
+ self._add_event('signal', self.status, details)
+ self.handle_signal(details)
+ except Exception as ex:
+ logger.exception('signal %s : %s' % (str(self), str(ex)))
+ failure = exception.ResourceFailure(ex)
+ raise failure
+
def handle_update(self, json_snippet=None, tmpl_diff=None, prop_diff=None):
raise UpdateReplace(self.name)
return api.format_stack_resource(stack[resource_name])
+ @request_context
+ def resource_signal(self, cnxt, stack_identity, resource_name, details):
+ s = self._get_stack(cnxt, stack_identity)
+
+ # This is not "nice" converting to the stored context here,
+ # but this happens because the keystone user associated with the
+ # signal doesn't have permission to read the secret key of
+ # the user associated with the cfn-credentials file
+ user_creds = db_api.user_creds_get(s.user_creds_id)
+ stack_context = context.RequestContext.from_dict(user_creds)
+ stack = parser.Stack.load(stack_context, stack=s)
+
+ if resource_name not in stack:
+ raise exception.ResourceNotFound(resource_name=resource_name,
+ stack_name=stack.name)
+
+ resource = stack[resource_name]
+ if resource.id is None:
+ raise exception.ResourceNotAvailable(resource_name=resource_name)
+
+ if callable(stack[resource_name].signal):
+ stack[resource_name].signal(details)
+
@request_context
def find_physical_resource(self, cnxt, physical_resource_id):
"""
) = (
'/waitcondition', '/signal'
)
+SIGNAL_VERB = {WAITCONDITION: 'PUT',
+ SIGNAL: 'POST'}
class SignalResponder(resource.Resource):
# ensure the actual URL contains the quoted version...
unquoted_path = urllib.unquote(host_url.path + path)
request = {'host': host_url.netloc.lower(),
- 'verb': 'PUT',
+ 'verb': SIGNAL_VERB[signal_type],
'path': unquoted_path,
'params': {'SignatureMethod': 'HmacSHA256',
'SignatureVersion': '2',
resource_name=resource_name,
metadata=metadata))
+ def resource_signal(self, ctxt, stack_identity, resource_name, details):
+ """
+ Generate an alarm on the resource.
+ :param ctxt: RPC context.
+ :param stack_identity: Name of the stack.
+ :param resource_name: the Resource.
+ :param details: the details of the signal.
+ """
+ return self.call(ctxt, self.make_msg('resource_signal',
+ stack_identity=stack_identity,
+ resource_name=resource_name,
+ details=details))
+
def create_watch_data(self, ctxt, watch_name, stats_data):
'''
This could be used by CloudWatch and WaitConditions
# under the License.
from heat.engine import resource
+from heat.engine import signal_responder
from heat.openstack.common import log as logging
class ResourceWithRequiredProps(GenericResource):
properties_schema = {'Foo': {'Type': 'String',
'Required': True}}
+
+
+class SignalResource(signal_responder.SignalResponder):
+ properties_schema = {}
+ attributes_schema = {'AlarmUrl': 'Get a signed webhook'}
+
+ def handle_signal(self, details=None):
+ logger.warning('Signaled resource (Type "%s") %s' % (self.type(),
+ details))
+
+ def _resolve_attribute(self, name):
+ if name == 'AlarmUrl' and self.resource_id is not None:
+ return unicode(self._get_signed_url())
from heat.engine import service
from heat.engine.properties import Properties
from heat.engine.resources import instance as instances
+from heat.engine import resource as rsrs
from heat.engine import watchrule
from heat.openstack.common import threadgroup
from heat.tests.common import HeatTestCase
}
'''
+policy_template = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "alarming",
+ "Resources" : {
+ "WebServerScaleDownPolicy" : {
+ "Type" : "AWS::AutoScaling::ScalingPolicy",
+ "Properties" : {
+ "AdjustmentType" : "ChangeInCapacity",
+ "AutoScalingGroupName" : "",
+ "Cooldown" : "60",
+ "ScalingAdjustment" : "-1"
+ }
+ }
+ }
+}
+'''
+
def create_context(mocks, user='stacks_test_user',
tenant_id='test_admin', password='stacks_test_password'):
return stack
-def get_alarm_stack(stack_name, ctx):
- t = template_format.parse(alarm_template)
+def get_stack(stack_name, ctx, template):
+ t = template_format.parse(template)
template = parser.Template(t)
stack = parser.Stack(ctx, stack_name, template)
return stack
self.m.VerifyAll()
+ def test_signal_reception(self):
+ stack = get_stack('signal_reception',
+ self.ctx,
+ policy_template)
+ self.stack = stack
+ self.m.ReplayAll()
+ stack.store()
+ stack.create()
+ test_data = {'food': 'yum'}
+
+ self.m.StubOutWithMock(service.EngineService, '_get_stack')
+ s = db_api.stack_get(self.ctx, self.stack.id)
+ service.EngineService._get_stack(self.ctx,
+ self.stack.identifier()).AndReturn(s)
+
+ self.m.StubOutWithMock(db_api, 'user_creds_get')
+ db_api.user_creds_get(mox.IgnoreArg()).MultipleTimes().AndReturn(
+ self.ctx.to_dict())
+
+ self.m.StubOutWithMock(rsrs.Resource, 'signal')
+ rsrs.Resource.signal(mox.IgnoreArg()).AndReturn(None)
+ self.m.ReplayAll()
+
+ result = self.eng.resource_signal(self.ctx,
+ dict(self.stack.identifier()),
+ 'WebServerScaleDownPolicy',
+ test_data)
+ self.m.VerifyAll()
+ self.stack.delete()
+
+ def test_signal_reception_no_resource(self):
+ stack = get_stack('signal_reception_no_resource',
+ self.ctx,
+ policy_template)
+ self.stack = stack
+ self.m.ReplayAll()
+ stack.store()
+ stack.create()
+ test_data = {'food': 'yum'}
+
+ self.m.StubOutWithMock(service.EngineService, '_get_stack')
+ s = db_api.stack_get(self.ctx, self.stack.id)
+ service.EngineService._get_stack(self.ctx,
+ self.stack.identifier()).AndReturn(s)
+
+ self.m.StubOutWithMock(db_api, 'user_creds_get')
+ db_api.user_creds_get(mox.IgnoreArg()).MultipleTimes().AndReturn(
+ self.ctx.to_dict())
+ self.m.ReplayAll()
+
+ self.assertRaises(exception.ResourceNotFound,
+ self.eng.resource_signal, self.ctx,
+ dict(self.stack.identifier()),
+ 'resource_does_not_exist',
+ test_data)
+ self.m.VerifyAll()
+ self.stack.delete()
+
@stack_context('service_metadata_test_stack')
def test_metadata(self):
test_metadata = {'foo': 'bar', 'baz': 'quux', 'blarg': 'wibble'}
self.assertEqual([], self.eng.stg[self.stack.id].threads)
def test_periodic_watch_task_created(self):
- stack = get_alarm_stack('period_watch_task_created',
- create_context(self.m))
+ stack = get_stack('period_watch_task_created',
+ create_context(self.m),
+ alarm_template)
self.stack = stack
self.m.ReplayAll()
stack.store()
resource_name='LogicalResourceId',
metadata={u'wordpress': []})
+ def test_resource_signal(self):
+ self._test_engine_api('resource_signal', 'call',
+ stack_identity=self.identity,
+ resource_name='LogicalResourceId',
+ details={u'wordpress': []})
+
def test_create_watch_data(self):
self._test_engine_api('create_watch_data', 'call',
watch_name='watch1',
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import uuid
+
+from oslo.config import cfg
+
+from heat.tests import generic_resource
+from heat.tests import fakes
+from heat.tests.common import HeatTestCase
+from heat.tests.utils import stack_delete_after
+from heat.tests.utils import setup_dummy_db
+
+from heat.common import context
+from heat.common import exception
+from heat.common import template_format
+
+from heat.engine import parser
+from heat.engine import resource
+from heat.engine import signal_responder as sr
+
+
+test_template_signal = '''
+{
+ "AWSTemplateFormatVersion" : "2010-09-09",
+ "Description" : "Just a test.",
+ "Parameters" : {},
+ "Resources" : {
+ "signal_handler" : {"Type" : "SignalResourceType"},
+ "resource_X" : {"Type" : "GenericResourceType"}
+ },
+ "Outputs": {
+ "signed_url": {"Fn::GetAtt": ["signal_handler", "AlarmUrl"]}
+ }
+}
+'''
+
+
+class UUIDStub(object):
+ def __init__(self, value):
+ self.value = value
+
+ def __enter__(self):
+ self.uuid4 = uuid.uuid4
+ uuid_stub = lambda: self.value
+ uuid.uuid4 = uuid_stub
+
+ def __exit__(self, *exc_info):
+ uuid.uuid4 = self.uuid4
+
+
+class SignalTest(HeatTestCase):
+
+ def setUp(self):
+ super(SignalTest, self).setUp()
+ setup_dummy_db()
+
+ resource._register_class('SignalResourceType',
+ generic_resource.SignalResource)
+ resource._register_class('GenericResourceType',
+ generic_resource.GenericResource)
+
+ cfg.CONF.set_default('heat_waitcondition_server_url',
+ 'http://127.0.0.1:8000/v1/waitcondition')
+
+ self.stack_id = 'STACKABCD1234'
+ self.fc = fakes.FakeKeystoneClient()
+
+ # Note tests creating a stack should be decorated with @stack_delete_after
+ # to ensure the stack is properly cleaned up
+ def create_stack(self, stack_name='test_stack', stub=True):
+ temp = template_format.parse(test_template_signal)
+ template = parser.Template(temp)
+ ctx = context.get_admin_context()
+ ctx.tenant_id = 'test_tenant'
+ stack = parser.Stack(ctx, stack_name, template,
+ disable_rollback=True)
+
+ # Stub out the stack ID so we have a known value
+ with UUIDStub(self.stack_id):
+ stack.store()
+
+ if stub:
+ self.m.StubOutWithMock(sr.SignalResponder, 'keystone')
+ sr.SignalResponder.keystone().MultipleTimes().AndReturn(
+ self.fc)
+ return stack
+
+ @stack_delete_after
+ def test_FnGetAtt_Alarm_Url(self):
+ self.stack = self.create_stack()
+
+ self.m.ReplayAll()
+ self.stack.create()
+
+ rsrc = self.stack.resources['signal_handler']
+ created_time = datetime.datetime(2012, 11, 29, 13, 49, 37)
+ rsrc.created_time = created_time
+ self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
+
+ expected_url = "".join([
+ 'http://127.0.0.1:8000/v1/signal/',
+ 'arn%3Aopenstack%3Aheat%3A%3Atest_tenant%3Astacks%2F',
+ 'test_stack%2FSTACKABCD1234%2Fresources%2F',
+ 'signal_handler?',
+ 'Timestamp=2012-11-29T13%3A49%3A37Z&',
+ 'SignatureMethod=HmacSHA256&',
+ 'AWSAccessKeyId=4567&',
+ 'SignatureVersion=2&',
+ 'Signature=',
+ 'MJIFh7LKCpVlK6pCxe2WfYrRsfO7FU3Wt%2BzQFo2rYSY%3D'])
+
+ self.assertEqual(expected_url, rsrc.FnGetAtt('AlarmUrl'))
+ self.m.VerifyAll()
+
+ @stack_delete_after
+ def test_signal(self):
+ test_d = {'Data': 'foo', 'Reason': 'bar',
+ 'Status': 'SUCCESS', 'UniqueId': '123'}
+
+ self.stack = self.create_stack()
+
+ # to confirm we get a call to handle_signal
+ self.m.StubOutWithMock(generic_resource.SignalResource,
+ 'handle_signal')
+ generic_resource.SignalResource.handle_signal(test_d).AndReturn(None)
+
+ self.m.ReplayAll()
+ self.stack.create()
+
+ rsrc = self.stack.resources['signal_handler']
+ self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
+
+ rsrc.signal(details=test_d)
+
+ self.m.VerifyAll()
+
+ @stack_delete_after
+ def test_signal_wrong_resource(self):
+ # assert that we get the correct exception when calling a
+ # resource.signal() that does not have a handle_signal()
+ self.stack = self.create_stack()
+
+ self.m.ReplayAll()
+ self.stack.create()
+
+ rsrc = self.stack.resources['resource_X']
+ self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
+
+ err_metadata = {'Data': 'foo', 'Status': 'SUCCESS', 'UniqueId': '123'}
+ self.assertRaises(exception.ResourceFailure, rsrc.signal,
+ details=err_metadata)
+
+ self.m.VerifyAll()
+
+ @stack_delete_after
+ def test_signal_reception_wrong_state(self):
+ # assert that we get the correct exception when calling a
+ # resource.signal() that is in having a destructive action.
+ self.stack = self.create_stack()
+
+ self.m.ReplayAll()
+ self.stack.create()
+
+ rsrc = self.stack.resources['signal_handler']
+ self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
+ # manually override the action to DELETE
+ rsrc.action = rsrc.DELETE
+
+ err_metadata = {'Data': 'foo', 'Status': 'SUCCESS', 'UniqueId': '123'}
+ self.assertRaises(exception.ResourceFailure, rsrc.signal,
+ details=err_metadata)
+
+ self.m.VerifyAll()
+
+ @stack_delete_after
+ def test_signal_reception_failed_call(self):
+ # assert that we get the correct exception from resource.signal()
+ # when resource.handle_signal() raises an exception.
+ self.stack = self.create_stack()
+
+ test_d = {'Data': 'foo', 'Reason': 'bar',
+ 'Status': 'SUCCESS', 'UniqueId': '123'}
+
+ # to confirm we get a call to handle_signal
+ self.m.StubOutWithMock(generic_resource.SignalResource,
+ 'handle_signal')
+ generic_resource.SignalResource.handle_signal(test_d).AndRaise(
+ ValueError)
+
+ self.m.ReplayAll()
+ self.stack.create()
+
+ rsrc = self.stack.resources['signal_handler']
+ self.assertEqual(rsrc.state, (rsrc.CREATE, rsrc.COMPLETE))
+
+ self.assertRaises(exception.ResourceFailure,
+ rsrc.signal, details=test_d)
+
+ self.m.VerifyAll()