From 3320a8943827e82a1576d212b1ea076a9381ef63 Mon Sep 17 00:00:00 2001 From: Vivek Dhayaal Date: Sun, 24 Jan 2016 11:17:37 +0530 Subject: [PATCH] Support ZeroMQ messaging driver in cinder NOTE:This patch introduces support for ZeroMQ driver for cinder single backend case. Multi-backend will be addressed in the next patch as part of the blueprint. CHANGES:ZeroMQ driver requires hostname for message delivery as there is no broker inbetween. So, extract the hostname and feed to the messaging client for zeromq driver. For the record, ZeroMq is a very lightweight distributed messaging system specially designed for high throughput/low latency scenarios. Addition of support for ZeroMQ would help cinder scale out with high performance and be highly available as there is no centralised broker. DocImpact Document the configurations for ZeroMQ driver for Cinder Change-Id: Ic4b4301e5d7ca1692fc91155ba53f2dd12f99311 Closes-Bug: #1440631 partially Implements bp cinder-zeromq-support --- cinder/tests/unit/test_volume_utils.py | 7 ++ cinder/volume/rpcapi.py | 96 +++++++------------ cinder/volume/utils.py | 8 ++ ...omq-messaging-driver-d26a1141290f5548.yaml | 3 + 4 files changed, 53 insertions(+), 61 deletions(-) create mode 100644 releasenotes/notes/support-zeromq-messaging-driver-d26a1141290f5548.yaml diff --git a/cinder/tests/unit/test_volume_utils.py b/cinder/tests/unit/test_volume_utils.py index f425ea773..f461b5fb7 100644 --- a/cinder/tests/unit/test_volume_utils.py +++ b/cinder/tests/unit/test_volume_utils.py @@ -789,6 +789,13 @@ class VolumeUtilsTestCase(test.TestCase): self.assertEqual(pool, volume_utils.extract_host(host, 'pool', True)) + def test_get_volume_rpc_host(self): + host = 'Host@backend' + # default level is 'backend' + # check if host with backend is returned + self.assertEqual(volume_utils.extract_host(host), + volume_utils.get_volume_rpc_host(host)) + def test_append_host(self): host = 'Host' pool = 'Pool' diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py index 7fd60cb86..c0e8e9cce 100644 --- a/cinder/volume/rpcapi.py +++ b/cinder/volume/rpcapi.py @@ -91,22 +91,23 @@ class VolumeAPI(rpc.RPCAPI): TOPIC = CONF.volume_topic BINARY = 'cinder-volume' + def _get_cctxt(self, host, version): + new_host = utils.get_volume_rpc_host(host) + return self.client.prepare(server=new_host, version=version) + def create_consistencygroup(self, ctxt, group, host): - new_host = utils.extract_host(host) - cctxt = self.client.prepare(server=new_host, version='1.26') + cctxt = self._get_cctxt(host, '1.26') cctxt.cast(ctxt, 'create_consistencygroup', group=group) def delete_consistencygroup(self, ctxt, group): - host = utils.extract_host(group.host) - cctxt = self.client.prepare(server=host, version='1.26') + cctxt = self._get_cctxt(group.host, '1.26') cctxt.cast(ctxt, 'delete_consistencygroup', group=group) def update_consistencygroup(self, ctxt, group, add_volumes=None, remove_volumes=None): - host = utils.extract_host(group.host) - cctxt = self.client.prepare(server=host, version='1.26') + cctxt = self._get_cctxt(group.host, '1.26') cctxt.cast(ctxt, 'update_consistencygroup', group=group, add_volumes=add_volumes, @@ -114,21 +115,18 @@ class VolumeAPI(rpc.RPCAPI): def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None, source_cg=None): - new_host = utils.extract_host(group.host) - cctxt = self.client.prepare(server=new_host, version='1.31') + cctxt = self._get_cctxt(group.host, '1.31') cctxt.cast(ctxt, 'create_consistencygroup_from_src', group=group, cgsnapshot=cgsnapshot, source_cg=source_cg) def create_cgsnapshot(self, ctxt, cgsnapshot): - host = utils.extract_host(cgsnapshot.consistencygroup.host) - cctxt = self.client.prepare(server=host, version='1.31') + cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31') cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot) def delete_cgsnapshot(self, ctxt, cgsnapshot): - new_host = utils.extract_host(cgsnapshot.consistencygroup.host) - cctxt = self.client.prepare(server=new_host, version='1.31') + cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31') cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot) def create_volume(self, ctxt, volume, host, request_spec, @@ -143,8 +141,7 @@ class VolumeAPI(rpc.RPCAPI): else: version = '1.24' - new_host = utils.extract_host(host) - cctxt = self.client.prepare(server=new_host, version=version) + cctxt = self._get_cctxt(host, version) request_spec_p = jsonutils.to_primitive(request_spec) cctxt.cast(ctxt, 'create_volume', **msg_args) @@ -156,27 +153,23 @@ class VolumeAPI(rpc.RPCAPI): else: version = '1.15' - new_host = utils.extract_host(volume.host) - cctxt = self.client.prepare(server=new_host, version=version) + cctxt = self._get_cctxt(volume.host, version) cctxt.cast(ctxt, 'delete_volume', **msg_args) def create_snapshot(self, ctxt, volume, snapshot): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.20') + cctxt = self._get_cctxt(volume['host'], version='1.20') cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'], snapshot=snapshot) def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False): - new_host = utils.extract_host(host) - cctxt = self.client.prepare(server=new_host, version='1.20') + cctxt = self._get_cctxt(host, version='1.20') cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot, unmanage_only=unmanage_only) def attach_volume(self, ctxt, volume, instance_uuid, host_name, mountpoint, mode): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.11') + cctxt = self._get_cctxt(volume['host'], '1.11') return cctxt.call(ctxt, 'attach_volume', volume_id=volume['id'], instance_uuid=instance_uuid, @@ -185,33 +178,28 @@ class VolumeAPI(rpc.RPCAPI): mode=mode) def detach_volume(self, ctxt, volume, attachment_id): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.20') + cctxt = self._get_cctxt(volume['host'], '1.20') return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'], attachment_id=attachment_id) def copy_volume_to_image(self, ctxt, volume, image_meta): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.3') + cctxt = self._get_cctxt(volume['host'], '1.3') cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'], image_meta=image_meta) def initialize_connection(self, ctxt, volume, connector): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.0') + cctxt = self._get_cctxt(volume['host'], version='1.0') return cctxt.call(ctxt, 'initialize_connection', volume_id=volume['id'], connector=connector) def terminate_connection(self, ctxt, volume, connector, force=False): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.0') + cctxt = self._get_cctxt(volume['host'], version='1.0') return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'], connector=connector, force=force) def remove_export(self, ctxt, volume): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.30') + cctxt = self._get_cctxt(volume['host'], '1.30') cctxt.cast(ctxt, 'remove_export', volume_id=volume['id']) def publish_service_capabilities(self, ctxt): @@ -219,13 +207,11 @@ class VolumeAPI(rpc.RPCAPI): cctxt.cast(ctxt, 'publish_service_capabilities') def accept_transfer(self, ctxt, volume, new_user, new_project): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.9') + cctxt = self._get_cctxt(volume['host'], '1.9') return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'], new_user=new_user, new_project=new_project) def extend_volume(self, ctxt, volume, new_size, reservations): - new_host = utils.extract_host(volume.host) msg_args = {'volume_id': volume.id, 'new_size': new_size, 'reservations': reservations} @@ -235,11 +221,10 @@ class VolumeAPI(rpc.RPCAPI): else: version = '1.14' - cctxt = self.client.prepare(server=new_host, version=version) + cctxt = self._get_cctxt(volume.host, version) cctxt.cast(ctxt, 'extend_volume', **msg_args) def migrate_volume(self, ctxt, volume, dest_host, force_host_copy): - new_host = utils.extract_host(volume.host) host_p = {'host': dest_host.host, 'capabilities': dest_host.capabilities} @@ -251,11 +236,10 @@ class VolumeAPI(rpc.RPCAPI): else: version = '1.8' - cctxt = self.client.prepare(server=new_host, version=version) + cctxt = self._get_cctxt(volume.host, version) cctxt.cast(ctxt, 'migrate_volume', **msg_args) def migrate_volume_completion(self, ctxt, volume, new_volume, error): - new_host = utils.extract_host(volume.host) msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id, 'error': error} @@ -266,7 +250,7 @@ class VolumeAPI(rpc.RPCAPI): else: version = '1.10' - cctxt = self.client.prepare(server=new_host, version=version) + cctxt = self._get_cctxt(volume.host, version) return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args) def retype(self, ctxt, volume, new_type_id, dest_host, @@ -287,29 +271,24 @@ class VolumeAPI(rpc.RPCAPI): else: version = '1.12' - new_host = utils.extract_host(volume.host) - cctxt = self.client.prepare(server=new_host, version=version) + cctxt = self._get_cctxt(volume.host, version) cctxt.cast(ctxt, 'retype', **msg_args) def manage_existing(self, ctxt, volume, ref): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.15') + cctxt = self._get_cctxt(volume['host'], '1.15') cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref) def promote_replica(self, ctxt, volume): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.17') + cctxt = self._get_cctxt(volume['host'], '1.17') cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id']) def reenable_replication(self, ctxt, volume): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.17') + cctxt = self._get_cctxt(volume['host'], '1.17') cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id']) def update_migrated_volume(self, ctxt, volume, new_volume, original_volume_status): - host = utils.extract_host(new_volume['host']) - cctxt = self.client.prepare(server=host, version='1.36') + cctxt = self._get_cctxt(new_volume['host'], '1.36') cctxt.call(ctxt, 'update_migrated_volume', volume=volume, @@ -317,13 +296,11 @@ class VolumeAPI(rpc.RPCAPI): volume_status=original_volume_status) def enable_replication(self, ctxt, volume): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.27') + cctxt = self._get_cctxt(volume['host'], '1.27') cctxt.cast(ctxt, 'enable_replication', volume=volume) def disable_replication(self, ctxt, volume): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.27') + cctxt = self._get_cctxt(volume['host'], '1.27') cctxt.cast(ctxt, 'disable_replication', volume=volume) @@ -331,24 +308,21 @@ class VolumeAPI(rpc.RPCAPI): ctxt, volume, secondary=None): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.27') + cctxt = self._get_cctxt(volume['host'], '1.27') cctxt.cast(ctxt, 'failover_replication', volume=volume, secondary=secondary) def list_replication_targets(self, ctxt, volume): - new_host = utils.extract_host(volume['host']) - cctxt = self.client.prepare(server=new_host, version='1.27') + cctxt = self._get_cctxt(volume['host'], '1.27') return cctxt.call(ctxt, 'list_replication_targets', volume=volume) def manage_existing_snapshot(self, ctxt, snapshot, ref, host): - cctxt = self.client.prepare(server=host, version='1.28') + cctxt = self._get_cctxt(host, '1.28') cctxt.cast(ctxt, 'manage_existing_snapshot', snapshot=snapshot, ref=ref) def get_capabilities(self, ctxt, host, discover): - new_host = utils.extract_host(host) - cctxt = self.client.prepare(server=new_host, version='1.29') + cctxt = self._get_cctxt(host, '1.29') return cctxt.call(ctxt, 'get_capabilities', discover=discover) diff --git a/cinder/volume/utils.py b/cinder/volume/utils.py index f93c17c37..f5752c914 100644 --- a/cinder/volume/utils.py +++ b/cinder/volume/utils.py @@ -623,6 +623,14 @@ def extract_host(host, level='backend', default_pool_name=False): return None +def get_volume_rpc_host(host): + if CONF.rpc_backend and CONF.rpc_backend == "zmq": + # ZeroMQ RPC driver requires only the hostname. + # So, return just that. + return extract_host(host, 'host') + return extract_host(host) + + def append_host(host, pool): """Encode pool into host info.""" if not host or not pool: diff --git a/releasenotes/notes/support-zeromq-messaging-driver-d26a1141290f5548.yaml b/releasenotes/notes/support-zeromq-messaging-driver-d26a1141290f5548.yaml new file mode 100644 index 000000000..8496e2732 --- /dev/null +++ b/releasenotes/notes/support-zeromq-messaging-driver-d26a1141290f5548.yaml @@ -0,0 +1,3 @@ +--- +features: + - Added support for ZeroMQ messaging driver in cinder single backend config -- 2.45.2