resource_name=resource_name, metadata=metadata),
topic=_engine_topic(self.topic, ctxt, None))
- def event_create(self, ctxt, event):
- return self.call(ctxt, self.make_msg('event_create',
- event=event),
- topic=_engine_topic(self.topic, ctxt, None))
-
def create_watch_data(self, ctxt, watch_name, stats_data):
'''
This could be used by CloudWatch and WaitConditions
return {'events': [api.format_event(context, e) for e in events]}
- def event_create(self, context, event):
- stack_name = event['stack']
- resource_name = event['resource']
- stack = db_api.stack_get_by_name(context, stack_name)
- resource = db_api.resource_get_by_name_and_stack(context,
- resource_name,
- stack.id)
- if not resource:
- return ['Unknown resource', None]
- new_event = {
- 'name': event['message'],
- 'resource_status_reason': event['reason'],
- 'StackId': stack.id,
- 'LogicalResourceId': resource.name,
- 'PhysicalResourceId': None,
- 'ResourceType': event['resource_type'],
- 'ResourceProperties': {},
- }
- try:
- result = db_api.event_create(context, new_event)
- new_event['id'] = result.id
- return [None, new_event]
- except Exception as ex:
- logger.warn('db error %s' % str(ex))
- msg = 'Error creating event'
- return [msg, None]
-
def describe_stack_resource(self, context, stack_identity, resource_name):
s = self._get_stack(context, stack_identity)
resource_name='LogicalResourceId',
metadata={u'wordpress': []})
- def test_event_create(self):
- self._test_engine_api('event_create', 'call',
- event={
- 'stack': 'wordpress',
- 'resource': 'LogicalResourceId',
- 'message': 'Foo',
- 'reason': 'Bar'})
-
def test_create_watch_data(self):
self._test_engine_api('create_watch_data', 'call',
watch_name='watch1',