]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Volume RPC API Versioning
authorZhiteng Huang <zhiteng.huang@intel.com>
Fri, 9 Nov 2012 17:32:22 +0000 (01:32 +0800)
committerZhiteng Huang <zhiteng.huang@intel.com>
Thu, 15 Nov 2012 15:50:45 +0000 (23:50 +0800)
Add versioning to Volume Rpc API version.  This is initial version
1.0, which is compatible with previous non-versioned RPC API.

Note: this patch slightly change the db.volume_update() behavior,
which now returns updated volume info.

Change-Id: I78036b6ed97c5bc369d8c85307ecaaad8e31ff90

cinder/db/sqlalchemy/api.py
cinder/scheduler/chance.py
cinder/scheduler/driver.py
cinder/scheduler/simple.py
cinder/tests/scheduler/test_scheduler.py
cinder/tests/test_volume_rpcapi.py [new file with mode: 0644]
cinder/volume/api.py
cinder/volume/manager.py
cinder/volume/rpcapi.py [new file with mode: 0644]

index 8dd3abc283becee3f040b232065901de26ff4f92..73374ef21d5c908375f94df32dc38408a2381403 100644 (file)
@@ -1046,6 +1046,7 @@ def volume_update(context, volume_id, values):
         volume_ref = volume_get(context, volume_id, session=session)
         volume_ref.update(values)
         volume_ref.save(session=session)
+        return volume_ref
 
 
 ####################
index e5af364f2765f0f79e321d21ac98c9923ed9f252..363d73857f80cbd49405608a8121c980ac8eead9 100644 (file)
@@ -67,7 +67,6 @@ class ChanceScheduler(driver.Scheduler):
         snapshot_id = request_spec['snapshot_id']
         image_id = request_spec['image_id']
 
-        driver.cast_to_host(context, topic, host, 'create_volume',
-                            volume_id=volume_id,
-                            snapshot_id=snapshot_id,
-                            image_id=image_id)
+        updated_volume = driver.volume_update_db(context, volume_id, host)
+        self.volume_rpcapi.create_volume(context, updated_volume, host,
+                                    snapshot_id, image_id)
index 9f5646dd92419d797eda4a122f41f86915bd9166..be1c3f320943ddf2111b6d3a6650406625e1da77 100644 (file)
@@ -25,14 +25,11 @@ from cinder import db
 from cinder import flags
 from cinder.openstack.common import cfg
 from cinder.openstack.common import importutils
-from cinder.openstack.common import log as logging
-from cinder.openstack.common import rpc
 from cinder.openstack.common import timeutils
 from cinder import utils
+from cinder.volume import rpcapi as volume_rpcapi
 
 
-LOG = logging.getLogger(__name__)
-
 scheduler_driver_opts = [
     cfg.StrOpt('scheduler_host_manager',
                default='cinder.scheduler.host_manager.HostManager',
@@ -43,36 +40,14 @@ FLAGS = flags.FLAGS
 FLAGS.register_opts(scheduler_driver_opts)
 
 
-def cast_to_volume_host(context, host, method, update_db=True, **kwargs):
-    """Cast request to a volume host queue"""
-
-    if update_db:
-        volume_id = kwargs.get('volume_id', None)
-        if volume_id is not None:
-            now = timeutils.utcnow()
-            db.volume_update(context, volume_id,
-                    {'host': host, 'scheduled_at': now})
-    rpc.cast(context,
-             rpc.queue_get_for(context, FLAGS.volume_topic, host),
-             {"method": method, "args": kwargs})
-    LOG.debug(_("Casted '%(method)s' to host '%(host)s'") % locals())
-
-
-def cast_to_host(context, topic, host, method, update_db=True, **kwargs):
-    """Generic cast to host"""
-
-    topic_mapping = {
-            "volume": cast_to_volume_host}
+def volume_update_db(context, volume_id, host):
+    '''Set the host and set the scheduled_at field of a volume.
 
-    func = topic_mapping.get(topic)
-    if func:
-        func(context, host, method, update_db=update_db, **kwargs)
-    else:
-        rpc.cast(context,
-                 rpc.queue_get_for(context, topic, host),
-                 {"method": method, "args": kwargs})
-        LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'")
-                % locals())
+    :returns: A Volume with the updated fields set properly.
+    '''
+    now = timeutils.utcnow()
+    values = {'host': host, 'scheduled_at': now}
+    return db.volume_update(context, volume_id, values)
 
 
 class Scheduler(object):
@@ -81,6 +56,7 @@ class Scheduler(object):
     def __init__(self):
         self.host_manager = importutils.import_object(
                 FLAGS.scheduler_host_manager)
+        self.volume_rpcapi = volume_rpcapi.VolumeAPI()
 
     def get_host_list(self):
         """Get a list of hosts from the HostManager."""
index 6ced96a51401abc2e3209f321f8f2a14fbe9a8c2..9e138a79f2a774bcc1d4323aea10c0fa6fb23dce 100644 (file)
@@ -62,9 +62,11 @@ class SimpleScheduler(chance.ChanceScheduler):
             service = db.service_get_by_args(elevated, host, topic)
             if not utils.service_is_up(service):
                 raise exception.WillNotSchedule(host=host)
-            driver.cast_to_volume_host(context, host, 'create_volume',
-                    volume_id=volume_id, snapshot_id=snapshot_id,
-                    image_id=image_id)
+            updated_volume = driver.volume_update_db(context, volume_id, host)
+            self.volume_rpcapi.create_volume(context, updated_volume,
+                    host,
+                    snapshot_id,
+                    image_id)
             return None
 
         results = db.service_get_all_volume_sorted(elevated)
@@ -77,9 +79,12 @@ class SimpleScheduler(chance.ChanceScheduler):
                 msg = _("Not enough allocatable volume gigabytes remaining")
                 raise exception.NoValidHost(reason=msg)
             if utils.service_is_up(service) and not service['disabled']:
-                driver.cast_to_volume_host(context, service['host'],
-                        'create_volume', volume_id=volume_id,
-                        snapshot_id=snapshot_id, image_id=image_id)
+                updated_volume = driver.volume_update_db(context, volume_id,
+                                                         service['host'])
+                self.volume_rpcapi.create_volume(context, updated_volume,
+                                            service['host'],
+                                            snapshot_id,
+                                            image_id)
                 return None
         msg = _("Is the appropriate service running?")
         raise exception.NoValidHost(reason=msg)
index d0026a7c275a447be2213cadff7b4134e680754b..97fe9b8fdf75fb75605aa6a5410e944c608110af 100644 (file)
@@ -24,13 +24,13 @@ from cinder import context
 from cinder import db
 from cinder import exception
 from cinder import flags
-from cinder.openstack.common import rpc
 from cinder.openstack.common import timeutils
 from cinder.scheduler import driver
 from cinder.scheduler import manager
 from cinder import test
 from cinder import utils
 
+
 FLAGS = flags.FLAGS
 
 
@@ -179,97 +179,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
         super(SchedulerDriverModuleTestCase, self).setUp()
         self.context = context.RequestContext('fake_user', 'fake_project')
 
-    def test_cast_to_volume_host_update_db_with_volume_id(self):
-        host = 'fake_host1'
-        method = 'fake_method'
-        fake_kwargs = {'volume_id': 31337,
-                       'extra_arg': 'meow'}
-        queue = 'fake_queue'
-
+    def test_volume_host_update_db(self):
         self.mox.StubOutWithMock(timeutils, 'utcnow')
         self.mox.StubOutWithMock(db, 'volume_update')
-        self.mox.StubOutWithMock(rpc, 'queue_get_for')
-        self.mox.StubOutWithMock(rpc, 'cast')
 
         timeutils.utcnow().AndReturn('fake-now')
         db.volume_update(self.context, 31337,
-                {'host': host, 'scheduled_at': 'fake-now'})
-        rpc.queue_get_for(self.context,
-                         FLAGS.volume_topic, host).AndReturn(queue)
-        rpc.cast(self.context, queue,
-                {'method': method,
-                 'args': fake_kwargs})
-
-        self.mox.ReplayAll()
-        driver.cast_to_volume_host(self.context, host, method,
-                update_db=True, **fake_kwargs)
-
-    def test_cast_to_volume_host_update_db_without_volume_id(self):
-        host = 'fake_host1'
-        method = 'fake_method'
-        fake_kwargs = {'extra_arg': 'meow'}
-        queue = 'fake_queue'
-
-        self.mox.StubOutWithMock(rpc, 'queue_get_for')
-        self.mox.StubOutWithMock(rpc, 'cast')
-
-        rpc.queue_get_for(self.context,
-                         FLAGS.volume_topic, host).AndReturn(queue)
-        rpc.cast(self.context, queue,
-                {'method': method,
-                 'args': fake_kwargs})
-
-        self.mox.ReplayAll()
-        driver.cast_to_volume_host(self.context, host, method,
-                update_db=True, **fake_kwargs)
-
-    def test_cast_to_volume_host_no_update_db(self):
-        host = 'fake_host1'
-        method = 'fake_method'
-        fake_kwargs = {'extra_arg': 'meow'}
-        queue = 'fake_queue'
-
-        self.mox.StubOutWithMock(rpc, 'queue_get_for')
-        self.mox.StubOutWithMock(rpc, 'cast')
-
-        rpc.queue_get_for(self.context,
-                         FLAGS.volume_topic, host).AndReturn(queue)
-        rpc.cast(self.context, queue,
-                {'method': method,
-                 'args': fake_kwargs})
-
-        self.mox.ReplayAll()
-        driver.cast_to_volume_host(self.context, host, method,
-                update_db=False, **fake_kwargs)
-
-    def test_cast_to_host_volume_topic(self):
-        host = 'fake_host1'
-        method = 'fake_method'
-        fake_kwargs = {'extra_arg': 'meow'}
-
-        self.mox.StubOutWithMock(driver, 'cast_to_volume_host')
-        driver.cast_to_volume_host(self.context, host, method,
-                update_db=False, **fake_kwargs)
-
-        self.mox.ReplayAll()
-        driver.cast_to_host(self.context, 'volume', host, method,
-                update_db=False, **fake_kwargs)
-
-    def test_cast_to_host_unknown_topic(self):
-        host = 'fake_host1'
-        method = 'fake_method'
-        fake_kwargs = {'extra_arg': 'meow'}
-        topic = 'unknown'
-        queue = 'fake_queue'
-
-        self.mox.StubOutWithMock(rpc, 'queue_get_for')
-        self.mox.StubOutWithMock(rpc, 'cast')
-
-        rpc.queue_get_for(self.context, topic, host).AndReturn(queue)
-        rpc.cast(self.context, queue,
-                {'method': method,
-                 'args': fake_kwargs})
+                {'host': 'fake_host', 'scheduled_at': 'fake-now'})
 
         self.mox.ReplayAll()
-        driver.cast_to_host(self.context, topic, host, method,
-                update_db=False, **fake_kwargs)
+        driver.volume_update_db(self.context, 31337, 'fake_host')
diff --git a/cinder/tests/test_volume_rpcapi.py b/cinder/tests/test_volume_rpcapi.py
new file mode 100644 (file)
index 0000000..4cd571f
--- /dev/null
@@ -0,0 +1,164 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Intel, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Unit Tests for cinder.volume.rpcapi
+"""
+
+
+from cinder import context
+from cinder import db
+from cinder import flags
+from cinder.openstack.common import jsonutils
+from cinder.openstack.common import rpc
+from cinder import test
+from cinder.volume import rpcapi as volume_rpcapi
+
+
+FLAGS = flags.FLAGS
+
+
+class VolumeRpcAPITestCase(test.TestCase):
+
+    def setUp(self):
+        self.context = context.get_admin_context()
+        vol = {}
+        vol['host'] = 'fake_host'
+        vol['availability_zone'] = FLAGS.storage_availability_zone
+        vol['status'] = "available"
+        vol['attach_status'] = "detached"
+        volume = db.volume_create(self.context, vol)
+
+        snpshot = {
+            'volume_id': 'fake_id',
+            'status': "creating",
+            'progress': '0%',
+            'volume_size': 0,
+            'display_name': 'fake_name',
+            'display_description': 'fake_description'}
+        snapshot = db.snapshot_create(self.context, snpshot)
+        self.fake_volume = jsonutils.to_primitive(volume)
+        self.fake_snapshot = jsonutils.to_primitive(snapshot)
+        super(VolumeRpcAPITestCase, self).setUp()
+
+    def test_serialized_volume_has_id(self):
+        self.assertTrue('id' in self.fake_volume)
+
+    def _test_volume_api(self, method, rpc_method, **kwargs):
+        ctxt = context.RequestContext('fake_user', 'fake_project')
+
+        if 'rpcapi_class' in kwargs:
+            rpcapi_class = kwargs['rpcapi_class']
+            del kwargs['rpcapi_class']
+        else:
+            rpcapi_class = volume_rpcapi.VolumeAPI
+        rpcapi = rpcapi_class()
+        expected_retval = 'foo' if method == 'call' else None
+
+        expected_version = kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
+        expected_msg = rpcapi.make_msg(method, **kwargs)
+        if 'volume' in expected_msg['args']:
+            volume = expected_msg['args']['volume']
+            del expected_msg['args']['volume']
+            expected_msg['args']['volume_id'] = volume['id']
+        if 'snapshot' in expected_msg['args']:
+            snapshot = expected_msg['args']['snapshot']
+            del expected_msg['args']['snapshot']
+            expected_msg['args']['snapshot_id'] = snapshot['id']
+        if 'host' in expected_msg['args']:
+            del expected_msg['args']['host']
+
+        expected_msg['version'] = expected_version
+
+        if 'host' in kwargs:
+            host = kwargs['host']
+        else:
+            host = kwargs['volume']['host']
+        expected_topic = '%s.%s' % (FLAGS.volume_topic, host)
+
+        self.fake_args = None
+        self.fake_kwargs = None
+
+        def _fake_rpc_method(*args, **kwargs):
+            self.fake_args = args
+            self.fake_kwargs = kwargs
+            if expected_retval:
+                return expected_retval
+
+        self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+        retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+        self.assertEqual(retval, expected_retval)
+        expected_args = [ctxt, expected_topic, expected_msg]
+        for arg, expected_arg in zip(self.fake_args, expected_args):
+            self.assertEqual(arg, expected_arg)
+
+    def test_create_volume(self):
+        self._test_volume_api('create_volume',
+                            rpc_method='cast',
+                            volume=self.fake_volume,
+                            host='fake_host1',
+                            snapshot_id='fake_snapshot_id',
+                            image_id='fake_image_id')
+
+    def test_delete_volume(self):
+        self._test_volume_api('delete_volume',
+                            rpc_method='cast',
+                            volume=self.fake_volume)
+
+    def test_create_snapshot(self):
+        self._test_volume_api('create_snapshot',
+                            rpc_method='cast',
+                            volume=self.fake_volume,
+                            snapshot=self.fake_snapshot)
+
+    def test_delete_snapshot(self):
+        self._test_volume_api('delete_snapshot',
+                            rpc_method='cast',
+                            snapshot=self.fake_snapshot,
+                            host='fake_host')
+
+    def test_attach_volume(self):
+        self._test_volume_api('attach_volume',
+                            rpc_method='call',
+                            volume=self.fake_volume,
+                            instance_uuid='fake_uuid',
+                            mountpoint='fake_mountpoint')
+
+    def test_detach_volume(self):
+        self._test_volume_api('detach_volume',
+                            rpc_method='call',
+                            volume=self.fake_volume)
+
+    def test_copy_volume_to_image(self):
+        self._test_volume_api('copy_volume_to_image',
+                            rpc_method='cast',
+                            volume=self.fake_volume,
+                            image_id='fake_image_id')
+
+    def test_initialize_connection(self):
+        self._test_volume_api('initialize_connection',
+                            rpc_method='call',
+                            volume=self.fake_volume,
+                            connector='fake_connector')
+
+    def test_terminate_connection(self):
+        self._test_volume_api('terminate_connection',
+                            rpc_method='call',
+                            volume=self.fake_volume,
+                            connector='fake_connector',
+                            force=False)
index 7ba5969bb540d0f119b45048820170101364ec93..faed8a35b841065b86a7157cb4aad87fa8d89d33 100644 (file)
@@ -34,7 +34,7 @@ from cinder.openstack.common import timeutils
 import cinder.policy
 from cinder import quota
 from cinder.scheduler import rpcapi as scheduler_rpcapi
-from cinder.volume import volume_types
+from cinder.volume import rpcapi as volume_rpcapi
 from cinder.volume import volume_types
 
 volume_host_opt = cfg.BoolOpt('snapshot_same_host',
@@ -81,6 +81,7 @@ class API(base.Base):
         self.image_service = (image_service or
                               glance.get_default_image_service())
         self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
+        self.volume_rpcapi = volume_rpcapi.VolumeAPI()
         super(API, self).__init__(db_driver)
 
     def create(self, context, size, name, description, snapshot=None,
@@ -207,24 +208,20 @@ class API(base.Base):
             snapshot_ref = self.db.snapshot_get(context, snapshot_id)
             src_volume_ref = self.db.volume_get(context,
                                                 snapshot_ref['volume_id'])
-            topic = rpc.queue_get_for(context,
-                                      FLAGS.volume_topic,
-                                      src_volume_ref['host'])
             # bypass scheduler and send request directly to volume
-            rpc.cast(context,
-                     topic,
-                     {"method": "create_volume",
-                      "args": {"volume_id": volume_id,
-                               "snapshot_id": snapshot_id,
-                               "image_id": image_id}})
+            self.volume_rpcapi.create_volume(context,
+                                            src_volume_ref,
+                                            src_volume_ref['host'],
+                                            snapshot_id,
+                                            image_id)
         else:
             self.scheduler_rpcapi.create_volume(context,
-                FLAGS.volume_topic,
-                volume_id,
-                snapshot_id,
-                image_id,
-                request_spec=request_spec,
-                filter_properties=filter_properties)
+                                    FLAGS.volume_topic,
+                                    volume_id,
+                                    snapshot_id,
+                                    image_id,
+                                    request_spec=request_spec,
+                                    filter_properties=filter_properties)
 
     @wrap_check_policy
     def delete(self, context, volume, force=False):
@@ -256,11 +253,8 @@ class API(base.Base):
         now = timeutils.utcnow()
         self.db.volume_update(context, volume_id, {'status': 'deleting',
                                                    'terminated_at': now})
-        host = volume['host']
-        rpc.cast(context,
-                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
-                 {"method": "delete_volume",
-                  "args": {"volume_id": volume_id}})
+
+        self.volume_rpcapi.delete_volume(context, volume)
 
     @wrap_check_policy
     def update(self, context, volume, fields):
@@ -388,40 +382,28 @@ class API(base.Base):
 
     @wrap_check_policy
     def attach(self, context, volume, instance_uuid, mountpoint):
-        host = volume['host']
-        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
-        return rpc.call(context, queue,
-                        {"method": "attach_volume",
-                         "args": {"volume_id": volume['id'],
-                                  "instance_uuid": instance_uuid,
-                                  "mountpoint": mountpoint}})
+        return self.volume_rpcapi.attach_volume(context,
+                                        volume,
+                                        instance_uuid,
+                                        mountpoint)
 
     @wrap_check_policy
     def detach(self, context, volume):
-        host = volume['host']
-        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
-        return rpc.call(context, queue,
-                 {"method": "detach_volume",
-                  "args": {"volume_id": volume['id']}})
+        return self.volume_rpcapi.detach_volume(context, volume)
 
     @wrap_check_policy
     def initialize_connection(self, context, volume, connector):
-        host = volume['host']
-        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
-        return rpc.call(context, queue,
-                        {"method": "initialize_connection",
-                         "args": {"volume_id": volume['id'],
-                                  "connector": connector}})
+        return self.volume_rpcapi.initialize_connection(context,
+                                                volume,
+                                                connector)
 
     @wrap_check_policy
     def terminate_connection(self, context, volume, connector, force=False):
         self.unreserve_volume(context, volume)
-        host = volume['host']
-        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
-        return rpc.call(context, queue,
-                        {"method": "terminate_connection",
-                         "args": {"volume_id": volume['id'],
-                                  "connector": connector, 'force': force}})
+        return self.volume_rpcapi.terminate_connection(context,
+                                                volume,
+                                                connector,
+                                                force)
 
     def _create_snapshot(self, context, volume, name, description,
                          force=False):
@@ -442,12 +424,8 @@ class API(base.Base):
             'display_description': description}
 
         snapshot = self.db.snapshot_create(context, options)
-        host = volume['host']
-        rpc.cast(context,
-                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
-                 {"method": "create_snapshot",
-                  "args": {"volume_id": volume['id'],
-                           "snapshot_id": snapshot['id']}})
+        self.volume_rpcapi.create_snapshot(context, volume, snapshot)
+
         return snapshot
 
     def create_snapshot(self, context, volume, name, description):
@@ -466,11 +444,7 @@ class API(base.Base):
         self.db.snapshot_update(context, snapshot['id'],
                                 {'status': 'deleting'})
         volume = self.db.volume_get(context, snapshot['volume_id'])
-        host = volume['host']
-        rpc.cast(context,
-                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
-                 {"method": "delete_snapshot",
-                  "args": {"snapshot_id": snapshot['id']}})
+        self.volume_rpcapi.delete_snapshot(context, snapshot, volume['host'])
 
     @wrap_check_policy
     def update_snapshot(self, context, snapshot, fields):
@@ -529,13 +503,8 @@ class API(base.Base):
 
         recv_metadata = self.image_service.create(context, metadata)
         self.update(context, volume, {'status': 'uploading'})
-        rpc.cast(context,
-                 rpc.queue_get_for(context,
-                                   FLAGS.volume_topic,
-                                   volume['host']),
-                 {"method": "copy_volume_to_image",
-                  "args": {"volume_id": volume['id'],
-                           "image_id": recv_metadata['id']}})
+        self.volume_rpcapi.copy_volume_to_image(context, volume,
+                                            recv_metadata['id'])
 
         response = {"id": volume['id'],
                "updated_at": volume['updated_at'],
index 378259e10f4bc9bd0aa607615ccc8afc0e68f130..7e5c2bb56429a00b209847d0c891aa6db56db042 100644 (file)
@@ -77,6 +77,9 @@ MAPPING = {
 
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
+
+    RPC_API_VERSION = '1.0'
+
     def __init__(self, volume_driver=None, *args, **kwargs):
         """Load the driver from the one specified in args, or from flags."""
         if not volume_driver:
diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py
new file mode 100644 (file)
index 0000000..640875c
--- /dev/null
@@ -0,0 +1,97 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Intel, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Client side of the volume RPC API.
+"""
+
+from cinder import exception
+from cinder import flags
+from cinder.openstack.common import rpc
+import cinder.openstack.common.rpc.proxy
+
+
+FLAGS = flags.FLAGS
+
+
+class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
+    '''Client side of the volume rpc API.
+
+    API version history:
+
+        1.0 - Initial version.
+    '''
+
+    BASE_RPC_API_VERSION = '1.0'
+
+    def __init__(self):
+        super(VolumeAPI, self).__init__(topic=FLAGS.volume_topic,
+            default_version=self.BASE_RPC_API_VERSION)
+
+    def create_volume(self, ctxt, volume, host,
+                      snapshot_id=None, image_id=None):
+        self.cast(ctxt, self.make_msg('create_volume',
+                                      volume_id=volume['id'],
+                                      snapshot_id=snapshot_id,
+                                      image_id=image_id),
+                topic=rpc.queue_get_for(ctxt, self.topic, host))
+
+    def delete_volume(self, ctxt, volume):
+        self.cast(ctxt, self.make_msg('delete_volume',
+                                      volume_id=volume['id']),
+                topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+    def create_snapshot(self, ctxt, volume, snapshot):
+        self.cast(ctxt, self.make_msg('create_snapshot',
+                                      volume_id=volume['id'],
+                                      snapshot_id=snapshot['id']),
+                topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+    def delete_snapshot(self, ctxt, snapshot, host):
+        self.cast(ctxt, self.make_msg('delete_snapshot',
+                                      snapshot_id=snapshot['id']),
+                topic=rpc.queue_get_for(ctxt, self.topic, host))
+
+    def attach_volume(self, ctxt, volume, instance_uuid, mountpoint):
+        return self.call(ctxt, self.make_msg('attach_volume',
+                                      volume_id=volume['id'],
+                                      instance_uuid=instance_uuid,
+                                      mountpoint=mountpoint),
+                    topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+    def detach_volume(self, ctxt, volume):
+        return self.call(ctxt, self.make_msg('detach_volume',
+                                      volume_id=volume['id']),
+                    topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+    def copy_volume_to_image(self, ctxt, volume, image_id):
+        self.cast(ctxt, self.make_msg('copy_volume_to_image',
+                                      volume_id=volume['id'],
+                                      image_id=image_id),
+                topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+    def initialize_connection(self, ctxt, volume, connector):
+        return self.call(ctxt, self.make_msg('initialize_connection',
+                                      volume_id=volume['id'],
+                                      connector=connector),
+                    topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+    def terminate_connection(self, ctxt, volume, connector, force=False):
+        return self.call(ctxt, self.make_msg('terminate_connection',
+                                      volume_id=volume['id'],
+                                      connector=connector,
+                                      force=force),
+                    topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))