import urlparse
import json
-import eventlet
from oslo.config import cfg
from keystoneclient.contrib.ec2.utils import Ec2Signer
from heat.common import exception
from heat.common import identifier
from heat.engine import resource
+from heat.engine import scheduler
from heat.openstack.common import log as logging
"""
expected_keys = ['Data', 'Reason', 'Status', 'UniqueId']
if sorted(metadata.keys()) == expected_keys:
- return metadata['Status'] in (SUCCESS, FAILURE)
+ return metadata['Status'] in WAIT_STATUSES
def metadata_update(self, new_metadata=None):
'''
WAIT_STATUSES = (
- FAILURE,
- TIMEDOUT,
- SUCCESS,
+ STATUS_FAILURE,
+ STATUS_SUCCESS,
) = (
'FAILURE',
- 'TIMEDOUT',
'SUCCESS',
)
+class WaitConditionFailure(Exception):
+ def __init__(self, wait_condition, handle):
+ reasons = handle.get_status_reason(STATUS_FAILURE)
+ super(WaitConditionFailure, self).__init__(reasons)
+
+
+class WaitConditionTimeout(Exception):
+ def __init__(self, wait_condition, handle):
+ reasons = handle.get_status_reason(STATUS_SUCCESS)
+ message = '%d of %d received' % (len(reasons), wait_condition.count)
+ if reasons:
+ message += ' - %s' % reasons
+
+ super(WaitConditionTimeout, self).__init__(message)
+
+
class WaitCondition(resource.Resource):
properties_schema = {'Handle': {'Type': 'String',
'Required': True},
'Count': {'Type': 'Number',
'MinValue': '1'}}
- # Sleep time between polling for wait completion
- # is calculated as a fraction of timeout time
- # bounded by MIN_SLEEP and MAX_SLEEP
- MIN_SLEEP = 1 # seconds
- MAX_SLEEP = 10
- SLEEP_DIV = 100 # 1/100'th of timeout
-
def __init__(self, name, json_snippet, stack):
super(WaitCondition, self).__init__(name, json_snippet, stack)
- self.timeout = int(self.t['Properties']['Timeout'])
self.count = int(self.t['Properties'].get('Count', '1'))
- self.sleep_time = max(min(self.MAX_SLEEP,
- self.timeout / self.SLEEP_DIV),
- self.MIN_SLEEP)
def _validate_handle_url(self):
handle_url = self.properties['Handle']
handle_id = identifier.ResourceIdentifier.from_arn_url(handle_url)
return handle_id.resource_name
- def _create_timeout(self):
- return eventlet.Timeout(self.timeout)
+ def _wait(self, handle):
+ while True:
+ try:
+ yield
+ except scheduler.Timeout:
+ timeout = WaitConditionTimeout(self, handle)
+ logger.info('%s Timed out (%s)' % (str(self), str(timeout)))
+ raise timeout
+
+ handle_status = handle.get_status()
+
+ if any(s != STATUS_SUCCESS for s in handle_status):
+ failure = WaitConditionFailure(self, handle)
+ logger.info('%s Failed (%s)' % (str(self), str(failure)))
+ raise failure
+
+ if len(handle_status) >= self.count:
+ logger.info("%s Succeeded" % str(self))
+ return
def handle_create(self):
self._validate_handle_url()
- tmo = None
- status = FAILURE
- reason = "Unknown reason"
- try:
- # keep polling our Metadata to see if the cfn-signal has written
- # it yet. The execution here is limited by timeout.
- with self._create_timeout() as tmo:
- handle_res_name = self._get_handle_resource_name()
- handle = self.stack[handle_res_name]
- self.resource_id_set(handle_res_name)
-
- # Poll for WaitConditionHandle signals indicating
- # SUCCESS/FAILURE. We need self.count SUCCESS signals
- # before we can declare the WaitCondition CREATE_COMPLETE
- handle_status = handle.get_status()
- while (FAILURE not in handle_status
- and len(handle_status) < self.count):
- logger.debug('Polling for WaitCondition completion,' +
- ' sleeping for %s seconds, timeout %s' %
- (self.sleep_time, self.timeout))
- eventlet.sleep(self.sleep_time)
- handle_status = handle.get_status()
-
- if FAILURE in handle_status:
- reason = handle.get_status_reason(FAILURE)
- elif (len(handle_status) == self.count and
- handle_status == [SUCCESS] * self.count):
- logger.debug("WaitCondition %s SUCCESS" % self.name)
- status = SUCCESS
-
- except eventlet.Timeout as t:
- if t is not tmo:
- # not my timeout
- raise
- else:
- (status, reason) = (TIMEDOUT, 'Timed out waiting for instance')
-
- if status != SUCCESS:
- raise exception.Error(reason)
+ handle_res_name = self._get_handle_resource_name()
+ handle = self.stack[handle_res_name]
+ self.resource_id_set(handle_res_name)
+
+ runner = scheduler.TaskRunner(self._wait, handle)
+ runner.start(timeout=float(self.properties['Timeout']))
+ return runner
+
+ def check_create_complete(self, runner):
+ return runner.step()
def handle_update(self, json_snippet):
return self.UPDATE_REPLACE
# License for the specific language governing permissions and limitations
# under the License.
+import mox
-import uuid
import datetime
+import time
import json
+import uuid
import eventlet
from oslo.config import cfg
from heat.common import template_format
from heat.common import identifier
from heat.engine import parser
+from heat.engine import scheduler
from heat.engine.resources import wait_condition as wc
from heat.common import config
from heat.common import context
setup_dummy_db()
self.m.StubOutWithMock(wc.WaitConditionHandle,
'get_status')
- self.m.StubOutWithMock(wc.WaitCondition,
- '_create_timeout')
self.m.StubOutWithMock(eventlet, 'sleep')
cfg.CONF.set_default('heat_waitcondition_server_url',
@stack_delete_after
def test_post_success_to_handle(self):
self.stack = self.create_stack()
- wc.WaitCondition._create_timeout().AndReturn(eventlet.Timeout(5))
wc.WaitConditionHandle.get_status().AndReturn([])
eventlet.sleep(1).AndReturn(None)
wc.WaitConditionHandle.get_status().AndReturn([])
@stack_delete_after
def test_post_failure_to_handle(self):
self.stack = self.create_stack()
- wc.WaitCondition._create_timeout().AndReturn(eventlet.Timeout(5))
wc.WaitConditionHandle.get_status().AndReturn([])
eventlet.sleep(1).AndReturn(None)
wc.WaitConditionHandle.get_status().AndReturn([])
self.stack.create()
resource = self.stack.resources['WaitForTheHandle']
- self.assertEqual(resource.state,
- 'CREATE_FAILED')
+ self.assertEqual(resource.state, resource.CREATE_FAILED)
+ reason = resource.state_description
+ self.assertTrue(reason.startswith('WaitConditionFailure:'))
r = db_api.resource_get_by_name_and_stack(None, 'WaitHandle',
self.stack.id)
@stack_delete_after
def test_post_success_to_handle_count(self):
self.stack = self.create_stack(template=test_template_wc_count)
- wc.WaitCondition._create_timeout().AndReturn(eventlet.Timeout(5))
wc.WaitConditionHandle.get_status().AndReturn([])
eventlet.sleep(1).AndReturn(None)
wc.WaitConditionHandle.get_status().AndReturn(['SUCCESS'])
@stack_delete_after
def test_post_failure_to_handle_count(self):
self.stack = self.create_stack(template=test_template_wc_count)
- wc.WaitCondition._create_timeout().AndReturn(eventlet.Timeout(5))
wc.WaitConditionHandle.get_status().AndReturn([])
eventlet.sleep(1).AndReturn(None)
wc.WaitConditionHandle.get_status().AndReturn(['SUCCESS'])
self.stack.create()
resource = self.stack.resources['WaitForTheHandle']
- self.assertEqual(resource.state,
- 'CREATE_FAILED')
+ self.assertEqual(resource.state, resource.CREATE_FAILED)
+ reason = resource.state_description
+ self.assertTrue(reason.startswith('WaitConditionFailure:'))
r = db_api.resource_get_by_name_and_stack(None, 'WaitHandle',
self.stack.id)
@stack_delete_after
def test_timeout(self):
+ st = time.time()
+
self.stack = self.create_stack()
- tmo = eventlet.Timeout(6)
- wc.WaitCondition._create_timeout().AndReturn(tmo)
+
+ self.m.StubOutWithMock(scheduler, 'wallclock')
+
+ scheduler.wallclock().AndReturn(st)
+ scheduler.wallclock().AndReturn(st + 0.001)
+ scheduler.wallclock().AndReturn(st + 0.1)
wc.WaitConditionHandle.get_status().AndReturn([])
- eventlet.sleep(1).AndReturn(None)
+ eventlet.sleep(mox.IsA(int)).AndReturn(None)
+ scheduler.wallclock().AndReturn(st + 4.1)
wc.WaitConditionHandle.get_status().AndReturn([])
- eventlet.sleep(1).AndRaise(tmo)
+ eventlet.sleep(mox.IsA(int)).AndReturn(None)
+ scheduler.wallclock().AndReturn(st + 5.1)
self.m.ReplayAll()
resource = self.stack.resources['WaitForTheHandle']
- self.assertEqual(resource.state,
- 'CREATE_FAILED')
+ self.assertEqual(resource.state, resource.CREATE_FAILED)
+ reason = resource.state_description
+ self.assertTrue(reason.startswith('WaitConditionTimeout:'))
+
self.assertEqual(wc.WaitCondition.UPDATE_REPLACE,
resource.handle_update({}))
self.m.VerifyAll()
@stack_delete_after
def test_FnGetAtt(self):
self.stack = self.create_stack()
- wc.WaitCondition._create_timeout().AndReturn(eventlet.Timeout(5))
wc.WaitConditionHandle.get_status().AndReturn(['SUCCESS'])
self.m.ReplayAll()
# Stub waitcondition status so all goes CREATE_COMPLETE
self.m.StubOutWithMock(wc.WaitConditionHandle, 'get_status')
wc.WaitConditionHandle.get_status().AndReturn(['SUCCESS'])
- self.m.StubOutWithMock(wc.WaitCondition, '_create_timeout')
- wc.WaitCondition._create_timeout().AndReturn(eventlet.Timeout(5))
# Stub keystone() with fake client
self.m.StubOutWithMock(wc.WaitConditionHandle, 'keystone')