scheduler_driver = CONF.scheduler_driver
self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs)
+ self.additional_endpoints.append(_SchedulerV2Proxy(self))
self._startup_delay = True
def init_host_with_rpc(self):
rpc.get_notifier("scheduler").error(context,
'scheduler.' + method,
payload)
+
+
+# TODO(dulek): This goes away immediately in Newton and is just present in
+# Mitaka so that we can receive v1.x and v2.0 messages.
+class _SchedulerV2Proxy(object):
+
+ target = messaging.Target(version='2.0')
+
+ def __init__(self, manager):
+ self.manager = manager
+
+ def update_service_capabilities(self, context, service_name=None,
+ host=None, capabilities=None, **kwargs):
+ return self.manager.update_service_capabilities(
+ context, service_name=service_name, host=host,
+ capabilities=capabilities, **kwargs)
+
+ def create_consistencygroup(self, context, topic, group,
+ request_spec_list=None,
+ filter_properties_list=None):
+ return self.manager.create_consistencygroup(
+ context, topic, group, request_spec_list=request_spec_list,
+ filter_properties_list=None)
+
+ def create_volume(self, context, topic, volume_id, snapshot_id=None,
+ image_id=None, request_spec=None, filter_properties=None,
+ volume=None):
+ return self.manager.create_volume(
+ context, topic, volume_id, snapshot_id=snapshot_id,
+ image_id=image_id, request_spec=request_spec,
+ filter_properties=filter_properties, volume=volume)
+
+ def request_service_capabilities(self, context):
+ return self.manager.request_service_capabilities(context)
+
+ def migrate_volume_to_host(self, context, topic, volume_id, host,
+ force_host_copy, request_spec,
+ filter_properties=None, volume=None):
+ return self.manager.migrate_volume_to_host(
+ context, topic, volume_id, host, force_host_copy, request_spec,
+ filter_properties=filter_properties, volume=volume)
+
+ def retype(self, context, topic, volume_id, request_spec,
+ filter_properties=None, volume=None):
+ return self.manager.retype(context, topic, volume_id, request_spec,
+ filter_properties=filter_properties,
+ volume=volume)
+
+ def manage_existing(self, context, topic, volume_id, request_spec,
+ filter_properties=None):
+ return self.manager.manage_existing(
+ context, topic, volume_id, request_spec,
+ filter_properties=filter_properties)
+
+ def get_pools(self, context, filters=None):
+ return self.manager.get_pools(context, filters=filters)
1.10 - Adds support for sending objects over RPC in retype()
1.11 - Adds support for sending objects over RPC in
migrate_volume_to_host()
+
+ ... Mitaka supports messaging 1.11. Any changes to existing methods in
+ 1.x after this point should be done so that they can handle version cap
+ set to 1.11.
+
+ 2.0 - Remove 1.x compatibility
"""
- RPC_API_VERSION = '1.11'
+ RPC_API_VERSION = '2.0'
TOPIC = CONF.scheduler_topic
BINARY = 'cinder-scheduler'
+ def _compat_ver(self, current, legacy):
+ if self.client.can_send_version(current):
+ return current
+ else:
+ return legacy
+
def create_consistencygroup(self, ctxt, topic, group,
request_spec_list=None,
filter_properties_list=None):
-
- cctxt = self.client.prepare(version='1.8')
+ version = self._compat_ver('2.0', '1.8')
+ cctxt = self.client.prepare(version=version)
request_spec_p_list = []
for request_spec in request_spec_list:
request_spec_p = jsonutils.to_primitive(request_spec)
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
image_id=None, request_spec=None,
filter_properties=None, volume=None):
-
request_spec_p = jsonutils.to_primitive(request_spec)
msg_args = {'topic': topic, 'volume_id': volume_id,
'snapshot_id': snapshot_id, 'image_id': image_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
- if self.client.can_send_version('1.9'):
+ if self.client.can_send_version('2.0'):
+ version = '2.0'
+ msg_args['volume'] = volume
+ elif self.client.can_send_version('1.9'):
version = '1.9'
msg_args['volume'] = volume
else:
'host': host, 'force_host_copy': force_host_copy,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
- if self.client.can_send_version('1.11'):
+ if self.client.can_send_version('2.0'):
+ version = '2.0'
+ msg_args['volume'] = volume
+ elif self.client.can_send_version('1.11'):
version = '1.11'
msg_args['volume'] = volume
else:
msg_args = {'topic': topic, 'volume_id': volume_id,
'request_spec': request_spec_p,
'filter_properties': filter_properties}
- if self.client.can_send_version('1.10'):
+ if self.client.can_send_version('2.0'):
+ version = '2.0'
+ msg_args['volume'] = volume
+ elif self.client.can_send_version('1.10'):
version = '1.10'
msg_args['volume'] = volume
else:
def manage_existing(self, ctxt, topic, volume_id,
request_spec=None, filter_properties=None):
- cctxt = self.client.prepare(version='1.5')
+ version = self._compat_ver('2.0', '1.5')
+ cctxt = self.client.prepare(version=version)
request_spec_p = jsonutils.to_primitive(request_spec)
return cctxt.cast(ctxt, 'manage_existing',
topic=topic,
filter_properties=filter_properties)
def get_pools(self, ctxt, filters=None):
- cctxt = self.client.prepare(version='1.7')
+ version = self._compat_ver('2.0', '1.7')
+ cctxt = self.client.prepare(version=version)
return cctxt.call(ctxt, 'get_pools',
filters=filters)
service_name, host,
capabilities):
# FIXME(flaper87): What to do with fanout?
- cctxt = self.client.prepare(fanout=True, version='1.0')
+ version = self._compat_ver('2.0', '1.0')
+ cctxt = self.client.prepare(fanout=True, version=version)
cctxt.cast(ctxt, 'update_service_capabilities',
service_name=service_name, host=host,
capabilities=capabilities)
for kwarg, value in self.fake_kwargs.items():
self.assertEqual(expected_msg[kwarg], value)
- def test_update_service_capabilities(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
+ def test_update_service_capabilities(self, can_send_version):
+ self._test_scheduler_api('update_service_capabilities',
+ rpc_method='cast',
+ service_name='fake_name',
+ host='fake_host',
+ capabilities='fake_capabilities',
+ fanout=True,
+ version='2.0')
+ can_send_version.assert_called_once_with('2.0')
+
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_update_service_capabilities_old(self, can_send_version):
self._test_scheduler_api('update_service_capabilities',
rpc_method='cast',
service_name='fake_name',
capabilities='fake_capabilities',
fanout=True,
version='1.0')
+ can_send_version.assert_called_once_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
- version='1.9')
- can_send_version.assert_called_once_with('1.9')
+ version='2.0')
+ can_send_version.assert_called_once_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.2')
- can_send_version.assert_called_once_with('1.9')
+ can_send_version.assert_has_calls([mock.call('2.0'), mock.call('1.9')])
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
- version='1.11')
- can_send_version.assert_called_once_with('1.11')
+ version='2.0')
+ can_send_version.assert_called_once_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
filter_properties='filter_properties',
volume='volume',
version='1.3')
- can_send_version.assert_called_once_with('1.11')
+ can_send_version.assert_has_calls([mock.call('2.0'),
+ mock.call('1.11')])
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=True)
request_spec='fake_request_spec',
filter_properties='filter_properties',
volume='volume',
- version='1.10')
- can_send_version.assert_called_with('1.10')
+ version='2.0')
+ can_send_version.assert_called_with('2.0')
@mock.patch('oslo_messaging.RPCClient.can_send_version',
return_value=False)
filter_properties='filter_properties',
volume='volume',
version='1.4')
- can_send_version.assert_called_with('1.10')
+ can_send_version.assert_has_calls([mock.call('2.0'),
+ mock.call('1.10')])
+
+ @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
+ def test_manage_existing(self, can_send_version):
+ self._test_scheduler_api('manage_existing',
+ rpc_method='cast',
+ topic='topic',
+ volume_id='volume_id',
+ request_spec='fake_request_spec',
+ filter_properties='filter_properties',
+ version='2.0')
+ can_send_version.assert_called_with('2.0')
- def test_manage_existing(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_manage_existing_old(self, can_send_version):
self._test_scheduler_api('manage_existing',
rpc_method='cast',
topic='topic',
request_spec='fake_request_spec',
filter_properties='filter_properties',
version='1.5')
+ can_send_version.assert_called_with('2.0')
- def test_get_pools(self):
+ @mock.patch('oslo_messaging.RPCClient.can_send_version', return_value=True)
+ def test_get_pools(self, can_send_version):
+ self._test_scheduler_api('get_pools',
+ rpc_method='call',
+ filters=None,
+ version='2.0')
+ can_send_version.assert_called_with('2.0')
+
+ @mock.patch('oslo_messaging.RPCClient.can_send_version',
+ return_value=False)
+ def test_get_pools_old(self, can_send_version):
self._test_scheduler_api('get_pools',
rpc_method='call',
filters=None,
version='1.7')
+ can_send_version.assert_called_with('2.0')