From 2940ce438f76dba20ff45095c90f0a29292c827c Mon Sep 17 00:00:00 2001 From: Zhiteng Huang Date: Sat, 10 Nov 2012 01:32:22 +0800 Subject: [PATCH] Volume RPC API Versioning Add versioning to Volume Rpc API version. This is initial version 1.0, which is compatible with previous non-versioned RPC API. Note: this patch slightly change the db.volume_update() behavior, which now returns updated volume info. Change-Id: I78036b6ed97c5bc369d8c85307ecaaad8e31ff90 --- cinder/db/sqlalchemy/api.py | 1 + cinder/scheduler/chance.py | 7 +- cinder/scheduler/driver.py | 42 ++---- cinder/scheduler/simple.py | 17 ++- cinder/tests/scheduler/test_scheduler.py | 92 +------------ cinder/tests/test_volume_rpcapi.py | 164 +++++++++++++++++++++++ cinder/volume/api.py | 95 +++++-------- cinder/volume/manager.py | 3 + cinder/volume/rpcapi.py | 97 ++++++++++++++ 9 files changed, 324 insertions(+), 194 deletions(-) create mode 100644 cinder/tests/test_volume_rpcapi.py create mode 100644 cinder/volume/rpcapi.py diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 8dd3abc28..73374ef21 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -1046,6 +1046,7 @@ def volume_update(context, volume_id, values): volume_ref = volume_get(context, volume_id, session=session) volume_ref.update(values) volume_ref.save(session=session) + return volume_ref #################### diff --git a/cinder/scheduler/chance.py b/cinder/scheduler/chance.py index e5af364f2..363d73857 100644 --- a/cinder/scheduler/chance.py +++ b/cinder/scheduler/chance.py @@ -67,7 +67,6 @@ class ChanceScheduler(driver.Scheduler): snapshot_id = request_spec['snapshot_id'] image_id = request_spec['image_id'] - driver.cast_to_host(context, topic, host, 'create_volume', - volume_id=volume_id, - snapshot_id=snapshot_id, - image_id=image_id) + updated_volume = driver.volume_update_db(context, volume_id, host) + self.volume_rpcapi.create_volume(context, updated_volume, host, + snapshot_id, image_id) diff --git a/cinder/scheduler/driver.py b/cinder/scheduler/driver.py index 9f5646dd9..be1c3f320 100644 --- a/cinder/scheduler/driver.py +++ b/cinder/scheduler/driver.py @@ -25,14 +25,11 @@ from cinder import db from cinder import flags from cinder.openstack.common import cfg from cinder.openstack.common import importutils -from cinder.openstack.common import log as logging -from cinder.openstack.common import rpc from cinder.openstack.common import timeutils from cinder import utils +from cinder.volume import rpcapi as volume_rpcapi -LOG = logging.getLogger(__name__) - scheduler_driver_opts = [ cfg.StrOpt('scheduler_host_manager', default='cinder.scheduler.host_manager.HostManager', @@ -43,36 +40,14 @@ FLAGS = flags.FLAGS FLAGS.register_opts(scheduler_driver_opts) -def cast_to_volume_host(context, host, method, update_db=True, **kwargs): - """Cast request to a volume host queue""" - - if update_db: - volume_id = kwargs.get('volume_id', None) - if volume_id is not None: - now = timeutils.utcnow() - db.volume_update(context, volume_id, - {'host': host, 'scheduled_at': now}) - rpc.cast(context, - rpc.queue_get_for(context, FLAGS.volume_topic, host), - {"method": method, "args": kwargs}) - LOG.debug(_("Casted '%(method)s' to host '%(host)s'") % locals()) - - -def cast_to_host(context, topic, host, method, update_db=True, **kwargs): - """Generic cast to host""" - - topic_mapping = { - "volume": cast_to_volume_host} +def volume_update_db(context, volume_id, host): + '''Set the host and set the scheduled_at field of a volume. 
- func = topic_mapping.get(topic) - if func: - func(context, host, method, update_db=update_db, **kwargs) - else: - rpc.cast(context, - rpc.queue_get_for(context, topic, host), - {"method": method, "args": kwargs}) - LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'") - % locals()) + :returns: A Volume with the updated fields set properly. + ''' + now = timeutils.utcnow() + values = {'host': host, 'scheduled_at': now} + return db.volume_update(context, volume_id, values) class Scheduler(object): @@ -81,6 +56,7 @@ class Scheduler(object): def __init__(self): self.host_manager = importutils.import_object( FLAGS.scheduler_host_manager) + self.volume_rpcapi = volume_rpcapi.VolumeAPI() def get_host_list(self): """Get a list of hosts from the HostManager.""" diff --git a/cinder/scheduler/simple.py b/cinder/scheduler/simple.py index 6ced96a51..9e138a79f 100644 --- a/cinder/scheduler/simple.py +++ b/cinder/scheduler/simple.py @@ -62,9 +62,11 @@ class SimpleScheduler(chance.ChanceScheduler): service = db.service_get_by_args(elevated, host, topic) if not utils.service_is_up(service): raise exception.WillNotSchedule(host=host) - driver.cast_to_volume_host(context, host, 'create_volume', - volume_id=volume_id, snapshot_id=snapshot_id, - image_id=image_id) + updated_volume = driver.volume_update_db(context, volume_id, host) + self.volume_rpcapi.create_volume(context, updated_volume, + host, + snapshot_id, + image_id) return None results = db.service_get_all_volume_sorted(elevated) @@ -77,9 +79,12 @@ class SimpleScheduler(chance.ChanceScheduler): msg = _("Not enough allocatable volume gigabytes remaining") raise exception.NoValidHost(reason=msg) if utils.service_is_up(service) and not service['disabled']: - driver.cast_to_volume_host(context, service['host'], - 'create_volume', volume_id=volume_id, - snapshot_id=snapshot_id, image_id=image_id) + updated_volume = driver.volume_update_db(context, volume_id, + service['host']) + self.volume_rpcapi.create_volume(context, updated_volume, + service['host'], + snapshot_id, + image_id) return None msg = _("Is the appropriate service running?") raise exception.NoValidHost(reason=msg) diff --git a/cinder/tests/scheduler/test_scheduler.py b/cinder/tests/scheduler/test_scheduler.py index d0026a7c2..97fe9b8fd 100644 --- a/cinder/tests/scheduler/test_scheduler.py +++ b/cinder/tests/scheduler/test_scheduler.py @@ -24,13 +24,13 @@ from cinder import context from cinder import db from cinder import exception from cinder import flags -from cinder.openstack.common import rpc from cinder.openstack.common import timeutils from cinder.scheduler import driver from cinder.scheduler import manager from cinder import test from cinder import utils + FLAGS = flags.FLAGS @@ -179,97 +179,13 @@ class SchedulerDriverModuleTestCase(test.TestCase): super(SchedulerDriverModuleTestCase, self).setUp() self.context = context.RequestContext('fake_user', 'fake_project') - def test_cast_to_volume_host_update_db_with_volume_id(self): - host = 'fake_host1' - method = 'fake_method' - fake_kwargs = {'volume_id': 31337, - 'extra_arg': 'meow'} - queue = 'fake_queue' - + def test_volume_host_update_db(self): self.mox.StubOutWithMock(timeutils, 'utcnow') self.mox.StubOutWithMock(db, 'volume_update') - self.mox.StubOutWithMock(rpc, 'queue_get_for') - self.mox.StubOutWithMock(rpc, 'cast') timeutils.utcnow().AndReturn('fake-now') db.volume_update(self.context, 31337, - {'host': host, 'scheduled_at': 'fake-now'}) - rpc.queue_get_for(self.context, - FLAGS.volume_topic, host).AndReturn(queue) - 
rpc.cast(self.context, queue, - {'method': method, - 'args': fake_kwargs}) - - self.mox.ReplayAll() - driver.cast_to_volume_host(self.context, host, method, - update_db=True, **fake_kwargs) - - def test_cast_to_volume_host_update_db_without_volume_id(self): - host = 'fake_host1' - method = 'fake_method' - fake_kwargs = {'extra_arg': 'meow'} - queue = 'fake_queue' - - self.mox.StubOutWithMock(rpc, 'queue_get_for') - self.mox.StubOutWithMock(rpc, 'cast') - - rpc.queue_get_for(self.context, - FLAGS.volume_topic, host).AndReturn(queue) - rpc.cast(self.context, queue, - {'method': method, - 'args': fake_kwargs}) - - self.mox.ReplayAll() - driver.cast_to_volume_host(self.context, host, method, - update_db=True, **fake_kwargs) - - def test_cast_to_volume_host_no_update_db(self): - host = 'fake_host1' - method = 'fake_method' - fake_kwargs = {'extra_arg': 'meow'} - queue = 'fake_queue' - - self.mox.StubOutWithMock(rpc, 'queue_get_for') - self.mox.StubOutWithMock(rpc, 'cast') - - rpc.queue_get_for(self.context, - FLAGS.volume_topic, host).AndReturn(queue) - rpc.cast(self.context, queue, - {'method': method, - 'args': fake_kwargs}) - - self.mox.ReplayAll() - driver.cast_to_volume_host(self.context, host, method, - update_db=False, **fake_kwargs) - - def test_cast_to_host_volume_topic(self): - host = 'fake_host1' - method = 'fake_method' - fake_kwargs = {'extra_arg': 'meow'} - - self.mox.StubOutWithMock(driver, 'cast_to_volume_host') - driver.cast_to_volume_host(self.context, host, method, - update_db=False, **fake_kwargs) - - self.mox.ReplayAll() - driver.cast_to_host(self.context, 'volume', host, method, - update_db=False, **fake_kwargs) - - def test_cast_to_host_unknown_topic(self): - host = 'fake_host1' - method = 'fake_method' - fake_kwargs = {'extra_arg': 'meow'} - topic = 'unknown' - queue = 'fake_queue' - - self.mox.StubOutWithMock(rpc, 'queue_get_for') - self.mox.StubOutWithMock(rpc, 'cast') - - rpc.queue_get_for(self.context, topic, host).AndReturn(queue) - rpc.cast(self.context, queue, - {'method': method, - 'args': fake_kwargs}) + {'host': 'fake_host', 'scheduled_at': 'fake-now'}) self.mox.ReplayAll() - driver.cast_to_host(self.context, topic, host, method, - update_db=False, **fake_kwargs) + driver.volume_update_db(self.context, 31337, 'fake_host') diff --git a/cinder/tests/test_volume_rpcapi.py b/cinder/tests/test_volume_rpcapi.py new file mode 100644 index 000000000..4cd571f6e --- /dev/null +++ b/cinder/tests/test_volume_rpcapi.py @@ -0,0 +1,164 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Intel, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +""" +Unit Tests for cinder.volume.rpcapi +""" + + +from cinder import context +from cinder import db +from cinder import flags +from cinder.openstack.common import jsonutils +from cinder.openstack.common import rpc +from cinder import test +from cinder.volume import rpcapi as volume_rpcapi + + +FLAGS = flags.FLAGS + + +class VolumeRpcAPITestCase(test.TestCase): + + def setUp(self): + self.context = context.get_admin_context() + vol = {} + vol['host'] = 'fake_host' + vol['availability_zone'] = FLAGS.storage_availability_zone + vol['status'] = "available" + vol['attach_status'] = "detached" + volume = db.volume_create(self.context, vol) + + snpshot = { + 'volume_id': 'fake_id', + 'status': "creating", + 'progress': '0%', + 'volume_size': 0, + 'display_name': 'fake_name', + 'display_description': 'fake_description'} + snapshot = db.snapshot_create(self.context, snpshot) + self.fake_volume = jsonutils.to_primitive(volume) + self.fake_snapshot = jsonutils.to_primitive(snapshot) + super(VolumeRpcAPITestCase, self).setUp() + + def test_serialized_volume_has_id(self): + self.assertTrue('id' in self.fake_volume) + + def _test_volume_api(self, method, rpc_method, **kwargs): + ctxt = context.RequestContext('fake_user', 'fake_project') + + if 'rpcapi_class' in kwargs: + rpcapi_class = kwargs['rpcapi_class'] + del kwargs['rpcapi_class'] + else: + rpcapi_class = volume_rpcapi.VolumeAPI + rpcapi = rpcapi_class() + expected_retval = 'foo' if method == 'call' else None + + expected_version = kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION) + expected_msg = rpcapi.make_msg(method, **kwargs) + if 'volume' in expected_msg['args']: + volume = expected_msg['args']['volume'] + del expected_msg['args']['volume'] + expected_msg['args']['volume_id'] = volume['id'] + if 'snapshot' in expected_msg['args']: + snapshot = expected_msg['args']['snapshot'] + del expected_msg['args']['snapshot'] + expected_msg['args']['snapshot_id'] = snapshot['id'] + if 'host' in expected_msg['args']: + del expected_msg['args']['host'] + + expected_msg['version'] = expected_version + + if 'host' in kwargs: + host = kwargs['host'] + else: + host = kwargs['volume']['host'] + expected_topic = '%s.%s' % (FLAGS.volume_topic, host) + + self.fake_args = None + self.fake_kwargs = None + + def _fake_rpc_method(*args, **kwargs): + self.fake_args = args + self.fake_kwargs = kwargs + if expected_retval: + return expected_retval + + self.stubs.Set(rpc, rpc_method, _fake_rpc_method) + + retval = getattr(rpcapi, method)(ctxt, **kwargs) + + self.assertEqual(retval, expected_retval) + expected_args = [ctxt, expected_topic, expected_msg] + for arg, expected_arg in zip(self.fake_args, expected_args): + self.assertEqual(arg, expected_arg) + + def test_create_volume(self): + self._test_volume_api('create_volume', + rpc_method='cast', + volume=self.fake_volume, + host='fake_host1', + snapshot_id='fake_snapshot_id', + image_id='fake_image_id') + + def test_delete_volume(self): + self._test_volume_api('delete_volume', + rpc_method='cast', + volume=self.fake_volume) + + def test_create_snapshot(self): + self._test_volume_api('create_snapshot', + rpc_method='cast', + volume=self.fake_volume, + snapshot=self.fake_snapshot) + + def test_delete_snapshot(self): + self._test_volume_api('delete_snapshot', + rpc_method='cast', + snapshot=self.fake_snapshot, + host='fake_host') + + def test_attach_volume(self): + self._test_volume_api('attach_volume', + rpc_method='call', + volume=self.fake_volume, + instance_uuid='fake_uuid', + mountpoint='fake_mountpoint') + + def 
test_detach_volume(self): + self._test_volume_api('detach_volume', + rpc_method='call', + volume=self.fake_volume) + + def test_copy_volume_to_image(self): + self._test_volume_api('copy_volume_to_image', + rpc_method='cast', + volume=self.fake_volume, + image_id='fake_image_id') + + def test_initialize_connection(self): + self._test_volume_api('initialize_connection', + rpc_method='call', + volume=self.fake_volume, + connector='fake_connector') + + def test_terminate_connection(self): + self._test_volume_api('terminate_connection', + rpc_method='call', + volume=self.fake_volume, + connector='fake_connector', + force=False) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 7ba5969bb..faed8a35b 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -34,7 +34,7 @@ from cinder.openstack.common import timeutils import cinder.policy from cinder import quota from cinder.scheduler import rpcapi as scheduler_rpcapi -from cinder.volume import volume_types +from cinder.volume import rpcapi as volume_rpcapi from cinder.volume import volume_types volume_host_opt = cfg.BoolOpt('snapshot_same_host', @@ -81,6 +81,7 @@ class API(base.Base): self.image_service = (image_service or glance.get_default_image_service()) self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI() + self.volume_rpcapi = volume_rpcapi.VolumeAPI() super(API, self).__init__(db_driver) def create(self, context, size, name, description, snapshot=None, @@ -207,24 +208,20 @@ class API(base.Base): snapshot_ref = self.db.snapshot_get(context, snapshot_id) src_volume_ref = self.db.volume_get(context, snapshot_ref['volume_id']) - topic = rpc.queue_get_for(context, - FLAGS.volume_topic, - src_volume_ref['host']) # bypass scheduler and send request directly to volume - rpc.cast(context, - topic, - {"method": "create_volume", - "args": {"volume_id": volume_id, - "snapshot_id": snapshot_id, - "image_id": image_id}}) + self.volume_rpcapi.create_volume(context, + src_volume_ref, + src_volume_ref['host'], + snapshot_id, + image_id) else: self.scheduler_rpcapi.create_volume(context, - FLAGS.volume_topic, - volume_id, - snapshot_id, - image_id, - request_spec=request_spec, - filter_properties=filter_properties) + FLAGS.volume_topic, + volume_id, + snapshot_id, + image_id, + request_spec=request_spec, + filter_properties=filter_properties) @wrap_check_policy def delete(self, context, volume, force=False): @@ -256,11 +253,8 @@ class API(base.Base): now = timeutils.utcnow() self.db.volume_update(context, volume_id, {'status': 'deleting', 'terminated_at': now}) - host = volume['host'] - rpc.cast(context, - rpc.queue_get_for(context, FLAGS.volume_topic, host), - {"method": "delete_volume", - "args": {"volume_id": volume_id}}) + + self.volume_rpcapi.delete_volume(context, volume) @wrap_check_policy def update(self, context, volume, fields): @@ -388,40 +382,28 @@ class API(base.Base): @wrap_check_policy def attach(self, context, volume, instance_uuid, mountpoint): - host = volume['host'] - queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) - return rpc.call(context, queue, - {"method": "attach_volume", - "args": {"volume_id": volume['id'], - "instance_uuid": instance_uuid, - "mountpoint": mountpoint}}) + return self.volume_rpcapi.attach_volume(context, + volume, + instance_uuid, + mountpoint) @wrap_check_policy def detach(self, context, volume): - host = volume['host'] - queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) - return rpc.call(context, queue, - {"method": "detach_volume", - "args": {"volume_id": 
volume['id']}}) + return self.volume_rpcapi.detach_volume(context, volume) @wrap_check_policy def initialize_connection(self, context, volume, connector): - host = volume['host'] - queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) - return rpc.call(context, queue, - {"method": "initialize_connection", - "args": {"volume_id": volume['id'], - "connector": connector}}) + return self.volume_rpcapi.initialize_connection(context, + volume, + connector) @wrap_check_policy def terminate_connection(self, context, volume, connector, force=False): self.unreserve_volume(context, volume) - host = volume['host'] - queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) - return rpc.call(context, queue, - {"method": "terminate_connection", - "args": {"volume_id": volume['id'], - "connector": connector, 'force': force}}) + return self.volume_rpcapi.terminate_connection(context, + volume, + connector, + force) def _create_snapshot(self, context, volume, name, description, force=False): @@ -442,12 +424,8 @@ class API(base.Base): 'display_description': description} snapshot = self.db.snapshot_create(context, options) - host = volume['host'] - rpc.cast(context, - rpc.queue_get_for(context, FLAGS.volume_topic, host), - {"method": "create_snapshot", - "args": {"volume_id": volume['id'], - "snapshot_id": snapshot['id']}}) + self.volume_rpcapi.create_snapshot(context, volume, snapshot) + return snapshot def create_snapshot(self, context, volume, name, description): @@ -466,11 +444,7 @@ class API(base.Base): self.db.snapshot_update(context, snapshot['id'], {'status': 'deleting'}) volume = self.db.volume_get(context, snapshot['volume_id']) - host = volume['host'] - rpc.cast(context, - rpc.queue_get_for(context, FLAGS.volume_topic, host), - {"method": "delete_snapshot", - "args": {"snapshot_id": snapshot['id']}}) + self.volume_rpcapi.delete_snapshot(context, snapshot, volume['host']) @wrap_check_policy def update_snapshot(self, context, snapshot, fields): @@ -529,13 +503,8 @@ class API(base.Base): recv_metadata = self.image_service.create(context, metadata) self.update(context, volume, {'status': 'uploading'}) - rpc.cast(context, - rpc.queue_get_for(context, - FLAGS.volume_topic, - volume['host']), - {"method": "copy_volume_to_image", - "args": {"volume_id": volume['id'], - "image_id": recv_metadata['id']}}) + self.volume_rpcapi.copy_volume_to_image(context, volume, + recv_metadata['id']) response = {"id": volume['id'], "updated_at": volume['updated_at'], diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index 378259e10..7e5c2bb56 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -77,6 +77,9 @@ MAPPING = { class VolumeManager(manager.SchedulerDependentManager): """Manages attachable block storage devices.""" + + RPC_API_VERSION = '1.0' + def __init__(self, volume_driver=None, *args, **kwargs): """Load the driver from the one specified in args, or from flags.""" if not volume_driver: diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py new file mode 100644 index 000000000..640875c04 --- /dev/null +++ b/cinder/volume/rpcapi.py @@ -0,0 +1,97 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012, Intel, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Client side of the volume RPC API. +""" + +from cinder import exception +from cinder import flags +from cinder.openstack.common import rpc +import cinder.openstack.common.rpc.proxy + + +FLAGS = flags.FLAGS + + +class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy): + '''Client side of the volume rpc API. + + API version history: + + 1.0 - Initial version. + ''' + + BASE_RPC_API_VERSION = '1.0' + + def __init__(self): + super(VolumeAPI, self).__init__(topic=FLAGS.volume_topic, + default_version=self.BASE_RPC_API_VERSION) + + def create_volume(self, ctxt, volume, host, + snapshot_id=None, image_id=None): + self.cast(ctxt, self.make_msg('create_volume', + volume_id=volume['id'], + snapshot_id=snapshot_id, + image_id=image_id), + topic=rpc.queue_get_for(ctxt, self.topic, host)) + + def delete_volume(self, ctxt, volume): + self.cast(ctxt, self.make_msg('delete_volume', + volume_id=volume['id']), + topic=rpc.queue_get_for(ctxt, self.topic, volume['host'])) + + def create_snapshot(self, ctxt, volume, snapshot): + self.cast(ctxt, self.make_msg('create_snapshot', + volume_id=volume['id'], + snapshot_id=snapshot['id']), + topic=rpc.queue_get_for(ctxt, self.topic, volume['host'])) + + def delete_snapshot(self, ctxt, snapshot, host): + self.cast(ctxt, self.make_msg('delete_snapshot', + snapshot_id=snapshot['id']), + topic=rpc.queue_get_for(ctxt, self.topic, host)) + + def attach_volume(self, ctxt, volume, instance_uuid, mountpoint): + return self.call(ctxt, self.make_msg('attach_volume', + volume_id=volume['id'], + instance_uuid=instance_uuid, + mountpoint=mountpoint), + topic=rpc.queue_get_for(ctxt, self.topic, volume['host'])) + + def detach_volume(self, ctxt, volume): + return self.call(ctxt, self.make_msg('detach_volume', + volume_id=volume['id']), + topic=rpc.queue_get_for(ctxt, self.topic, volume['host'])) + + def copy_volume_to_image(self, ctxt, volume, image_id): + self.cast(ctxt, self.make_msg('copy_volume_to_image', + volume_id=volume['id'], + image_id=image_id), + topic=rpc.queue_get_for(ctxt, self.topic, volume['host'])) + + def initialize_connection(self, ctxt, volume, connector): + return self.call(ctxt, self.make_msg('initialize_connection', + volume_id=volume['id'], + connector=connector), + topic=rpc.queue_get_for(ctxt, self.topic, volume['host'])) + + def terminate_connection(self, ctxt, volume, connector, force=False): + return self.call(ctxt, self.make_msg('terminate_connection', + volume_id=volume['id'], + connector=connector, + force=force), + topic=rpc.queue_get_for(ctxt, self.topic, volume['host'])) -- 2.45.2
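
Usage sketch (illustrative only, not part of the patch): the proxy added above replaces the hand-built rpc.cast()/rpc.call() payloads, resolves the per-host '<volume_topic>.<host>' queue itself via rpc.queue_get_for(), and stamps every message with version 1.0. The volume mapping, host name and connector dict below are placeholder values, not anything defined by this change.

    from cinder import context
    from cinder.volume import rpcapi as volume_rpcapi

    ctxt = context.get_admin_context()

    # Placeholder volume ref; in the services this is the record returned
    # by the database layer (the proxy only reads 'id' and 'host' from it).
    volume = {'id': 'fake-volume-id', 'host': 'fake-host'}

    volume_api = volume_rpcapi.VolumeAPI()

    # Asynchronous cast to the volume service on the scheduled host, as the
    # scheduler drivers now do after driver.volume_update_db().
    volume_api.create_volume(ctxt, volume, volume['host'],
                             snapshot_id=None, image_id=None)

    # Blocking call that returns the manager's connection info.
    connection = volume_api.initialize_connection(ctxt, volume,
                                                  {'ip': '10.0.0.1'})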