hosts = []
for host in services:
delta = curr_time - (host['updated_at'] or host['created_at'])
- alive = abs(utils.total_seconds(delta)) <= CONF.service_down_time
+ alive = abs(delta.total_seconds()) <= CONF.service_down_time
status = (alive and "available") or "unavailable"
active = 'enabled'
if host['disabled']:
svcs = []
for svc in services:
delta = now - (svc['updated_at'] or svc['created_at'])
- alive = abs(utils.total_seconds(delta)) <= CONF.service_down_time
+ alive = abs(delta.total_seconds()) <= CONF.service_down_time
art = (alive and "up") or "down"
active = 'enabled'
if svc['disabled']:
from oslo.utils import timeutils
import stubout
import testtools
-from testtools import matchers
from cinder.common import config # noqa Need to register global_opts
from cinder.db import migration
'd1value': d1value,
'd2value': d2value,
})
-
- def assertGreater(self, first, second, msg=None):
- """Python < v2.7 compatibility. Assert 'first' > 'second'."""
- try:
- f = super(TestCase, self).assertGreater
- except AttributeError:
- self.assertThat(first,
- matchers.GreaterThan(second),
- message=msg or '')
- else:
- f(first, second, msg=msg)
-
- def assertGreaterEqual(self, first, second, msg=None):
- """Python < v2.7 compatibility. Assert 'first' >= 'second'."""
- try:
- f = super(TestCase, self).assertGreaterEqual
- except AttributeError:
- self.assertThat(first,
- matchers.Not(matchers.LessThan(second)),
- message=msg or '')
- else:
- f(first, second, msg=msg)
self.driver.delete_volume(clone)
self._assert_vol_exists(clone['name'], False)
- # Note defined in python 2.6, so define here...
- def assertLessEqual(self, a, b, msg=None):
- if not a <= b:
- self.fail('%s not less than or equal to %s' % (repr(a), repr(b)))
-
def test_storwize_svc_get_volume_stats(self):
self._set_flag('reserved_percentage', 25)
stats = self.driver.get_volume_stats()
return path
-def total_seconds(td):
- """Local total_seconds implementation for compatibility with python 2.6."""
- if hasattr(td, 'total_seconds'):
- return td.total_seconds()
- else:
- return ((td.days * 86400 + td.seconds) * 10 ** 6 +
- td.microseconds) / 10.0 ** 6
-
-
def sanitize_hostname(hostname):
"""Return a hostname which conforms to RFC-952 and RFC-1123 specs."""
if isinstance(hostname, unicode):
"""Check whether a service is up based on last heartbeat."""
last_heartbeat = service['updated_at'] or service['created_at']
# Timestamps in DB are UTC.
- elapsed = total_seconds(timeutils.utcnow() - last_heartbeat)
+ elapsed = (timeutils.utcnow() - last_heartbeat).total_seconds()
return abs(elapsed) <= CONF.service_down_time