from cinder.brick.initiator import linuxfc
from cinder.brick.initiator import linuxscsi
from cinder.openstack.common import log as logging
+from cinder.openstack.common import loopingcall
from cinder.openstack.common import processutils as putils
from cinder import test
connection_info['data'])
+class FakeFixedIntervalLoopingCall(object):
+ def __init__(self, f=None, *args, **kw):
+ self.args = args
+ self.kw = kw
+ self.f = f
+ self._stop = False
+
+ def stop(self):
+ self._stop = True
+
+ def wait(self):
+ return self
+
+ def start(self, interval, initial_delay=None):
+ while not self._stop:
+ try:
+ self.f(*self.args, **self.kw)
+ except loopingcall.LoopingCallDone:
+ return self
+ except Exception:
+ LOG.exception(_('in fixed duration looping call'))
+ raise
+
+
class AoEConnectorTestCase(ConnectorTestCase):
"""Test cases for AoE initiator class."""
def setUp(self):
self.connector = connector.AoEConnector()
self.connection_properties = {'target_shelf': 'fake_shelf',
'target_lun': 'fake_lun'}
+ self.stubs.Set(loopingcall,
+ 'FixedIntervalLoopingCall',
+ FakeFixedIntervalLoopingCall)
def tearDown(self):
self.mox.VerifyAll()