]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Support ZeroMQ messaging driver in cinder
authorVivek Dhayaal <vivekdhayaal@gmail.com>
Sun, 24 Jan 2016 05:47:37 +0000 (11:17 +0530)
committerVivek Dhayaal <vivekdhayaal@gmail.com>
Wed, 27 Jan 2016 08:02:46 +0000 (13:32 +0530)
NOTE:This patch introduces support for ZeroMQ driver for cinder single
backend case.
Multi-backend will be addressed in the next patch as part of the
blueprint.

CHANGES:ZeroMQ driver requires hostname for message delivery as there is no
broker inbetween.
So, extract the hostname and feed to the messaging client for zeromq
driver.

For the record, ZeroMq is a very lightweight distributed messaging system
specially designed for high throughput/low latency scenarios.
Addition of support for ZeroMQ would help cinder scale out with high
performance and be highly available as there is no centralised broker.

DocImpact
Document the configurations for ZeroMQ driver for Cinder

Change-Id: Ic4b4301e5d7ca1692fc91155ba53f2dd12f99311
Closes-Bug: #1440631
partially Implements bp cinder-zeromq-support

cinder/tests/unit/test_volume_utils.py
cinder/volume/rpcapi.py
cinder/volume/utils.py
releasenotes/notes/support-zeromq-messaging-driver-d26a1141290f5548.yaml [new file with mode: 0644]

index f425ea7736131916df725a08b1e4dc993c5dc940..f461b5fb7b09ad47cfe28956bb01668b684ee772 100644 (file)
@@ -789,6 +789,13 @@ class VolumeUtilsTestCase(test.TestCase):
         self.assertEqual(pool,
                          volume_utils.extract_host(host, 'pool', True))
 
+    def test_get_volume_rpc_host(self):
+        host = 'Host@backend'
+        # default level is 'backend'
+        # check if host with backend is returned
+        self.assertEqual(volume_utils.extract_host(host),
+                         volume_utils.get_volume_rpc_host(host))
+
     def test_append_host(self):
         host = 'Host'
         pool = 'Pool'
index 7fd60cb86b0bfdf03c77ce327e4034f3b2617f0d..c0e8e9cce5e80a497366e1771669c3128ef10f11 100644 (file)
@@ -91,22 +91,23 @@ class VolumeAPI(rpc.RPCAPI):
     TOPIC = CONF.volume_topic
     BINARY = 'cinder-volume'
 
+    def _get_cctxt(self, host, version):
+        new_host = utils.get_volume_rpc_host(host)
+        return self.client.prepare(server=new_host, version=version)
+
     def create_consistencygroup(self, ctxt, group, host):
-        new_host = utils.extract_host(host)
-        cctxt = self.client.prepare(server=new_host, version='1.26')
+        cctxt = self._get_cctxt(host, '1.26')
         cctxt.cast(ctxt, 'create_consistencygroup',
                    group=group)
 
     def delete_consistencygroup(self, ctxt, group):
-        host = utils.extract_host(group.host)
-        cctxt = self.client.prepare(server=host, version='1.26')
+        cctxt = self._get_cctxt(group.host, '1.26')
         cctxt.cast(ctxt, 'delete_consistencygroup',
                    group=group)
 
     def update_consistencygroup(self, ctxt, group, add_volumes=None,
                                 remove_volumes=None):
-        host = utils.extract_host(group.host)
-        cctxt = self.client.prepare(server=host, version='1.26')
+        cctxt = self._get_cctxt(group.host, '1.26')
         cctxt.cast(ctxt, 'update_consistencygroup',
                    group=group,
                    add_volumes=add_volumes,
@@ -114,21 +115,18 @@ class VolumeAPI(rpc.RPCAPI):
 
     def create_consistencygroup_from_src(self, ctxt, group, cgsnapshot=None,
                                          source_cg=None):
-        new_host = utils.extract_host(group.host)
-        cctxt = self.client.prepare(server=new_host, version='1.31')
+        cctxt = self._get_cctxt(group.host, '1.31')
         cctxt.cast(ctxt, 'create_consistencygroup_from_src',
                    group=group,
                    cgsnapshot=cgsnapshot,
                    source_cg=source_cg)
 
     def create_cgsnapshot(self, ctxt, cgsnapshot):
-        host = utils.extract_host(cgsnapshot.consistencygroup.host)
-        cctxt = self.client.prepare(server=host, version='1.31')
+        cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31')
         cctxt.cast(ctxt, 'create_cgsnapshot', cgsnapshot=cgsnapshot)
 
     def delete_cgsnapshot(self, ctxt, cgsnapshot):
-        new_host = utils.extract_host(cgsnapshot.consistencygroup.host)
-        cctxt = self.client.prepare(server=new_host, version='1.31')
+        cctxt = self._get_cctxt(cgsnapshot.consistencygroup.host, '1.31')
         cctxt.cast(ctxt, 'delete_cgsnapshot', cgsnapshot=cgsnapshot)
 
     def create_volume(self, ctxt, volume, host, request_spec,
@@ -143,8 +141,7 @@ class VolumeAPI(rpc.RPCAPI):
         else:
             version = '1.24'
 
-        new_host = utils.extract_host(host)
-        cctxt = self.client.prepare(server=new_host, version=version)
+        cctxt = self._get_cctxt(host, version)
         request_spec_p = jsonutils.to_primitive(request_spec)
         cctxt.cast(ctxt, 'create_volume', **msg_args)
 
@@ -156,27 +153,23 @@ class VolumeAPI(rpc.RPCAPI):
         else:
             version = '1.15'
 
-        new_host = utils.extract_host(volume.host)
-        cctxt = self.client.prepare(server=new_host, version=version)
+        cctxt = self._get_cctxt(volume.host, version)
         cctxt.cast(ctxt, 'delete_volume', **msg_args)
 
     def create_snapshot(self, ctxt, volume, snapshot):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.20')
+        cctxt = self._get_cctxt(volume['host'], version='1.20')
         cctxt.cast(ctxt, 'create_snapshot', volume_id=volume['id'],
                    snapshot=snapshot)
 
     def delete_snapshot(self, ctxt, snapshot, host, unmanage_only=False):
-        new_host = utils.extract_host(host)
-        cctxt = self.client.prepare(server=new_host, version='1.20')
+        cctxt = self._get_cctxt(host, version='1.20')
         cctxt.cast(ctxt, 'delete_snapshot', snapshot=snapshot,
                    unmanage_only=unmanage_only)
 
     def attach_volume(self, ctxt, volume, instance_uuid, host_name,
                       mountpoint, mode):
 
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.11')
+        cctxt = self._get_cctxt(volume['host'], '1.11')
         return cctxt.call(ctxt, 'attach_volume',
                           volume_id=volume['id'],
                           instance_uuid=instance_uuid,
@@ -185,33 +178,28 @@ class VolumeAPI(rpc.RPCAPI):
                           mode=mode)
 
     def detach_volume(self, ctxt, volume, attachment_id):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.20')
+        cctxt = self._get_cctxt(volume['host'], '1.20')
         return cctxt.call(ctxt, 'detach_volume', volume_id=volume['id'],
                           attachment_id=attachment_id)
 
     def copy_volume_to_image(self, ctxt, volume, image_meta):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.3')
+        cctxt = self._get_cctxt(volume['host'], '1.3')
         cctxt.cast(ctxt, 'copy_volume_to_image', volume_id=volume['id'],
                    image_meta=image_meta)
 
     def initialize_connection(self, ctxt, volume, connector):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.0')
+        cctxt = self._get_cctxt(volume['host'], version='1.0')
         return cctxt.call(ctxt, 'initialize_connection',
                           volume_id=volume['id'],
                           connector=connector)
 
     def terminate_connection(self, ctxt, volume, connector, force=False):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.0')
+        cctxt = self._get_cctxt(volume['host'], version='1.0')
         return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
                           connector=connector, force=force)
 
     def remove_export(self, ctxt, volume):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.30')
+        cctxt = self._get_cctxt(volume['host'], '1.30')
         cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
 
     def publish_service_capabilities(self, ctxt):
@@ -219,13 +207,11 @@ class VolumeAPI(rpc.RPCAPI):
         cctxt.cast(ctxt, 'publish_service_capabilities')
 
     def accept_transfer(self, ctxt, volume, new_user, new_project):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.9')
+        cctxt = self._get_cctxt(volume['host'], '1.9')
         return cctxt.call(ctxt, 'accept_transfer', volume_id=volume['id'],
                           new_user=new_user, new_project=new_project)
 
     def extend_volume(self, ctxt, volume, new_size, reservations):
-        new_host = utils.extract_host(volume.host)
 
         msg_args = {'volume_id': volume.id, 'new_size': new_size,
                     'reservations': reservations}
@@ -235,11 +221,10 @@ class VolumeAPI(rpc.RPCAPI):
         else:
             version = '1.14'
 
-        cctxt = self.client.prepare(server=new_host, version=version)
+        cctxt = self._get_cctxt(volume.host, version)
         cctxt.cast(ctxt, 'extend_volume', **msg_args)
 
     def migrate_volume(self, ctxt, volume, dest_host, force_host_copy):
-        new_host = utils.extract_host(volume.host)
         host_p = {'host': dest_host.host,
                   'capabilities': dest_host.capabilities}
 
@@ -251,11 +236,10 @@ class VolumeAPI(rpc.RPCAPI):
         else:
             version = '1.8'
 
-        cctxt = self.client.prepare(server=new_host, version=version)
+        cctxt = self._get_cctxt(volume.host, version)
         cctxt.cast(ctxt, 'migrate_volume', **msg_args)
 
     def migrate_volume_completion(self, ctxt, volume, new_volume, error):
-        new_host = utils.extract_host(volume.host)
 
         msg_args = {'volume_id': volume.id, 'new_volume_id': new_volume.id,
                     'error': error}
@@ -266,7 +250,7 @@ class VolumeAPI(rpc.RPCAPI):
         else:
             version = '1.10'
 
-        cctxt = self.client.prepare(server=new_host, version=version)
+        cctxt = self._get_cctxt(volume.host, version)
         return cctxt.call(ctxt, 'migrate_volume_completion', **msg_args)
 
     def retype(self, ctxt, volume, new_type_id, dest_host,
@@ -287,29 +271,24 @@ class VolumeAPI(rpc.RPCAPI):
             else:
                 version = '1.12'
 
-        new_host = utils.extract_host(volume.host)
-        cctxt = self.client.prepare(server=new_host, version=version)
+        cctxt = self._get_cctxt(volume.host, version)
         cctxt.cast(ctxt, 'retype', **msg_args)
 
     def manage_existing(self, ctxt, volume, ref):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.15')
+        cctxt = self._get_cctxt(volume['host'], '1.15')
         cctxt.cast(ctxt, 'manage_existing', volume_id=volume['id'], ref=ref)
 
     def promote_replica(self, ctxt, volume):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.17')
+        cctxt = self._get_cctxt(volume['host'], '1.17')
         cctxt.cast(ctxt, 'promote_replica', volume_id=volume['id'])
 
     def reenable_replication(self, ctxt, volume):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.17')
+        cctxt = self._get_cctxt(volume['host'], '1.17')
         cctxt.cast(ctxt, 'reenable_replication', volume_id=volume['id'])
 
     def update_migrated_volume(self, ctxt, volume, new_volume,
                                original_volume_status):
-        host = utils.extract_host(new_volume['host'])
-        cctxt = self.client.prepare(server=host, version='1.36')
+        cctxt = self._get_cctxt(new_volume['host'], '1.36')
         cctxt.call(ctxt,
                    'update_migrated_volume',
                    volume=volume,
@@ -317,13 +296,11 @@ class VolumeAPI(rpc.RPCAPI):
                    volume_status=original_volume_status)
 
     def enable_replication(self, ctxt, volume):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.27')
+        cctxt = self._get_cctxt(volume['host'], '1.27')
         cctxt.cast(ctxt, 'enable_replication', volume=volume)
 
     def disable_replication(self, ctxt, volume):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.27')
+        cctxt = self._get_cctxt(volume['host'], '1.27')
         cctxt.cast(ctxt, 'disable_replication',
                    volume=volume)
 
@@ -331,24 +308,21 @@ class VolumeAPI(rpc.RPCAPI):
                              ctxt,
                              volume,
                              secondary=None):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.27')
+        cctxt = self._get_cctxt(volume['host'], '1.27')
         cctxt.cast(ctxt, 'failover_replication',
                    volume=volume,
                    secondary=secondary)
 
     def list_replication_targets(self, ctxt, volume):
-        new_host = utils.extract_host(volume['host'])
-        cctxt = self.client.prepare(server=new_host, version='1.27')
+        cctxt = self._get_cctxt(volume['host'], '1.27')
         return cctxt.call(ctxt, 'list_replication_targets', volume=volume)
 
     def manage_existing_snapshot(self, ctxt, snapshot, ref, host):
-        cctxt = self.client.prepare(server=host, version='1.28')
+        cctxt = self._get_cctxt(host, '1.28')
         cctxt.cast(ctxt, 'manage_existing_snapshot',
                    snapshot=snapshot,
                    ref=ref)
 
     def get_capabilities(self, ctxt, host, discover):
-        new_host = utils.extract_host(host)
-        cctxt = self.client.prepare(server=new_host, version='1.29')
+        cctxt = self._get_cctxt(host, '1.29')
         return cctxt.call(ctxt, 'get_capabilities', discover=discover)
index f93c17c37ed7ec21656e8df0cd4d7d37e5107432..f5752c914032ade62ed2e8ee52c93ad578416af8 100644 (file)
@@ -623,6 +623,14 @@ def extract_host(host, level='backend', default_pool_name=False):
             return None
 
 
+def get_volume_rpc_host(host):
+    if CONF.rpc_backend and CONF.rpc_backend == "zmq":
+        # ZeroMQ RPC driver requires only the hostname.
+        # So, return just that.
+        return extract_host(host, 'host')
+    return extract_host(host)
+
+
 def append_host(host, pool):
     """Encode pool into host info."""
     if not host or not pool:
diff --git a/releasenotes/notes/support-zeromq-messaging-driver-d26a1141290f5548.yaml b/releasenotes/notes/support-zeromq-messaging-driver-d26a1141290f5548.yaml
new file mode 100644 (file)
index 0000000..8496e27
--- /dev/null
@@ -0,0 +1,3 @@
+---
+features:
+  - Added support for ZeroMQ messaging driver in cinder single backend config