volume_ref = volume_get(context, volume_id, session=session)
volume_ref.update(values)
volume_ref.save(session=session)
+ return volume_ref
####################
snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id']
- driver.cast_to_host(context, topic, host, 'create_volume',
- volume_id=volume_id,
- snapshot_id=snapshot_id,
- image_id=image_id)
+ updated_volume = driver.volume_update_db(context, volume_id, host)
+ self.volume_rpcapi.create_volume(context, updated_volume, host,
+ snapshot_id, image_id)
from cinder import flags
from cinder.openstack.common import cfg
from cinder.openstack.common import importutils
-from cinder.openstack.common import log as logging
-from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
from cinder import utils
+from cinder.volume import rpcapi as volume_rpcapi
-LOG = logging.getLogger(__name__)
-
scheduler_driver_opts = [
cfg.StrOpt('scheduler_host_manager',
default='cinder.scheduler.host_manager.HostManager',
FLAGS.register_opts(scheduler_driver_opts)
-def cast_to_volume_host(context, host, method, update_db=True, **kwargs):
- """Cast request to a volume host queue"""
-
- if update_db:
- volume_id = kwargs.get('volume_id', None)
- if volume_id is not None:
- now = timeutils.utcnow()
- db.volume_update(context, volume_id,
- {'host': host, 'scheduled_at': now})
- rpc.cast(context,
- rpc.queue_get_for(context, FLAGS.volume_topic, host),
- {"method": method, "args": kwargs})
- LOG.debug(_("Casted '%(method)s' to host '%(host)s'") % locals())
-
-
-def cast_to_host(context, topic, host, method, update_db=True, **kwargs):
- """Generic cast to host"""
-
- topic_mapping = {
- "volume": cast_to_volume_host}
+def volume_update_db(context, volume_id, host):
+ '''Set the host and set the scheduled_at field of a volume.
- func = topic_mapping.get(topic)
- if func:
- func(context, host, method, update_db=update_db, **kwargs)
- else:
- rpc.cast(context,
- rpc.queue_get_for(context, topic, host),
- {"method": method, "args": kwargs})
- LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'")
- % locals())
+ :returns: A Volume with the updated fields set properly.
+ '''
+ now = timeutils.utcnow()
+ values = {'host': host, 'scheduled_at': now}
+ return db.volume_update(context, volume_id, values)
class Scheduler(object):
def __init__(self):
self.host_manager = importutils.import_object(
FLAGS.scheduler_host_manager)
+ self.volume_rpcapi = volume_rpcapi.VolumeAPI()
def get_host_list(self):
"""Get a list of hosts from the HostManager."""
service = db.service_get_by_args(elevated, host, topic)
if not utils.service_is_up(service):
raise exception.WillNotSchedule(host=host)
- driver.cast_to_volume_host(context, host, 'create_volume',
- volume_id=volume_id, snapshot_id=snapshot_id,
- image_id=image_id)
+ updated_volume = driver.volume_update_db(context, volume_id, host)
+ self.volume_rpcapi.create_volume(context, updated_volume,
+ host,
+ snapshot_id,
+ image_id)
return None
results = db.service_get_all_volume_sorted(elevated)
msg = _("Not enough allocatable volume gigabytes remaining")
raise exception.NoValidHost(reason=msg)
if utils.service_is_up(service) and not service['disabled']:
- driver.cast_to_volume_host(context, service['host'],
- 'create_volume', volume_id=volume_id,
- snapshot_id=snapshot_id, image_id=image_id)
+ updated_volume = driver.volume_update_db(context, volume_id,
+ service['host'])
+ self.volume_rpcapi.create_volume(context, updated_volume,
+ service['host'],
+ snapshot_id,
+ image_id)
return None
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)
from cinder import db
from cinder import exception
from cinder import flags
-from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
from cinder.scheduler import driver
from cinder.scheduler import manager
from cinder import test
from cinder import utils
+
FLAGS = flags.FLAGS
super(SchedulerDriverModuleTestCase, self).setUp()
self.context = context.RequestContext('fake_user', 'fake_project')
- def test_cast_to_volume_host_update_db_with_volume_id(self):
- host = 'fake_host1'
- method = 'fake_method'
- fake_kwargs = {'volume_id': 31337,
- 'extra_arg': 'meow'}
- queue = 'fake_queue'
-
+ def test_volume_host_update_db(self):
self.mox.StubOutWithMock(timeutils, 'utcnow')
self.mox.StubOutWithMock(db, 'volume_update')
- self.mox.StubOutWithMock(rpc, 'queue_get_for')
- self.mox.StubOutWithMock(rpc, 'cast')
timeutils.utcnow().AndReturn('fake-now')
db.volume_update(self.context, 31337,
- {'host': host, 'scheduled_at': 'fake-now'})
- rpc.queue_get_for(self.context,
- FLAGS.volume_topic, host).AndReturn(queue)
- rpc.cast(self.context, queue,
- {'method': method,
- 'args': fake_kwargs})
-
- self.mox.ReplayAll()
- driver.cast_to_volume_host(self.context, host, method,
- update_db=True, **fake_kwargs)
-
- def test_cast_to_volume_host_update_db_without_volume_id(self):
- host = 'fake_host1'
- method = 'fake_method'
- fake_kwargs = {'extra_arg': 'meow'}
- queue = 'fake_queue'
-
- self.mox.StubOutWithMock(rpc, 'queue_get_for')
- self.mox.StubOutWithMock(rpc, 'cast')
-
- rpc.queue_get_for(self.context,
- FLAGS.volume_topic, host).AndReturn(queue)
- rpc.cast(self.context, queue,
- {'method': method,
- 'args': fake_kwargs})
-
- self.mox.ReplayAll()
- driver.cast_to_volume_host(self.context, host, method,
- update_db=True, **fake_kwargs)
-
- def test_cast_to_volume_host_no_update_db(self):
- host = 'fake_host1'
- method = 'fake_method'
- fake_kwargs = {'extra_arg': 'meow'}
- queue = 'fake_queue'
-
- self.mox.StubOutWithMock(rpc, 'queue_get_for')
- self.mox.StubOutWithMock(rpc, 'cast')
-
- rpc.queue_get_for(self.context,
- FLAGS.volume_topic, host).AndReturn(queue)
- rpc.cast(self.context, queue,
- {'method': method,
- 'args': fake_kwargs})
-
- self.mox.ReplayAll()
- driver.cast_to_volume_host(self.context, host, method,
- update_db=False, **fake_kwargs)
-
- def test_cast_to_host_volume_topic(self):
- host = 'fake_host1'
- method = 'fake_method'
- fake_kwargs = {'extra_arg': 'meow'}
-
- self.mox.StubOutWithMock(driver, 'cast_to_volume_host')
- driver.cast_to_volume_host(self.context, host, method,
- update_db=False, **fake_kwargs)
-
- self.mox.ReplayAll()
- driver.cast_to_host(self.context, 'volume', host, method,
- update_db=False, **fake_kwargs)
-
- def test_cast_to_host_unknown_topic(self):
- host = 'fake_host1'
- method = 'fake_method'
- fake_kwargs = {'extra_arg': 'meow'}
- topic = 'unknown'
- queue = 'fake_queue'
-
- self.mox.StubOutWithMock(rpc, 'queue_get_for')
- self.mox.StubOutWithMock(rpc, 'cast')
-
- rpc.queue_get_for(self.context, topic, host).AndReturn(queue)
- rpc.cast(self.context, queue,
- {'method': method,
- 'args': fake_kwargs})
+ {'host': 'fake_host', 'scheduled_at': 'fake-now'})
self.mox.ReplayAll()
- driver.cast_to_host(self.context, topic, host, method,
- update_db=False, **fake_kwargs)
+ driver.volume_update_db(self.context, 31337, 'fake_host')
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Intel, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit Tests for cinder.volume.rpcapi
+"""
+
+
+from cinder import context
+from cinder import db
+from cinder import flags
+from cinder.openstack.common import jsonutils
+from cinder.openstack.common import rpc
+from cinder import test
+from cinder.volume import rpcapi as volume_rpcapi
+
+
+FLAGS = flags.FLAGS
+
+
+class VolumeRpcAPITestCase(test.TestCase):
+
+ def setUp(self):
+ self.context = context.get_admin_context()
+ vol = {}
+ vol['host'] = 'fake_host'
+ vol['availability_zone'] = FLAGS.storage_availability_zone
+ vol['status'] = "available"
+ vol['attach_status'] = "detached"
+ volume = db.volume_create(self.context, vol)
+
+ snpshot = {
+ 'volume_id': 'fake_id',
+ 'status': "creating",
+ 'progress': '0%',
+ 'volume_size': 0,
+ 'display_name': 'fake_name',
+ 'display_description': 'fake_description'}
+ snapshot = db.snapshot_create(self.context, snpshot)
+ self.fake_volume = jsonutils.to_primitive(volume)
+ self.fake_snapshot = jsonutils.to_primitive(snapshot)
+ super(VolumeRpcAPITestCase, self).setUp()
+
+ def test_serialized_volume_has_id(self):
+ self.assertTrue('id' in self.fake_volume)
+
+ def _test_volume_api(self, method, rpc_method, **kwargs):
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+
+ if 'rpcapi_class' in kwargs:
+ rpcapi_class = kwargs['rpcapi_class']
+ del kwargs['rpcapi_class']
+ else:
+ rpcapi_class = volume_rpcapi.VolumeAPI
+ rpcapi = rpcapi_class()
+ expected_retval = 'foo' if method == 'call' else None
+
+ expected_version = kwargs.pop('version', rpcapi.BASE_RPC_API_VERSION)
+ expected_msg = rpcapi.make_msg(method, **kwargs)
+ if 'volume' in expected_msg['args']:
+ volume = expected_msg['args']['volume']
+ del expected_msg['args']['volume']
+ expected_msg['args']['volume_id'] = volume['id']
+ if 'snapshot' in expected_msg['args']:
+ snapshot = expected_msg['args']['snapshot']
+ del expected_msg['args']['snapshot']
+ expected_msg['args']['snapshot_id'] = snapshot['id']
+ if 'host' in expected_msg['args']:
+ del expected_msg['args']['host']
+
+ expected_msg['version'] = expected_version
+
+ if 'host' in kwargs:
+ host = kwargs['host']
+ else:
+ host = kwargs['volume']['host']
+ expected_topic = '%s.%s' % (FLAGS.volume_topic, host)
+
+ self.fake_args = None
+ self.fake_kwargs = None
+
+ def _fake_rpc_method(*args, **kwargs):
+ self.fake_args = args
+ self.fake_kwargs = kwargs
+ if expected_retval:
+ return expected_retval
+
+ self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+ retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, expected_topic, expected_msg]
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ def test_create_volume(self):
+ self._test_volume_api('create_volume',
+ rpc_method='cast',
+ volume=self.fake_volume,
+ host='fake_host1',
+ snapshot_id='fake_snapshot_id',
+ image_id='fake_image_id')
+
+ def test_delete_volume(self):
+ self._test_volume_api('delete_volume',
+ rpc_method='cast',
+ volume=self.fake_volume)
+
+ def test_create_snapshot(self):
+ self._test_volume_api('create_snapshot',
+ rpc_method='cast',
+ volume=self.fake_volume,
+ snapshot=self.fake_snapshot)
+
+ def test_delete_snapshot(self):
+ self._test_volume_api('delete_snapshot',
+ rpc_method='cast',
+ snapshot=self.fake_snapshot,
+ host='fake_host')
+
+ def test_attach_volume(self):
+ self._test_volume_api('attach_volume',
+ rpc_method='call',
+ volume=self.fake_volume,
+ instance_uuid='fake_uuid',
+ mountpoint='fake_mountpoint')
+
+ def test_detach_volume(self):
+ self._test_volume_api('detach_volume',
+ rpc_method='call',
+ volume=self.fake_volume)
+
+ def test_copy_volume_to_image(self):
+ self._test_volume_api('copy_volume_to_image',
+ rpc_method='cast',
+ volume=self.fake_volume,
+ image_id='fake_image_id')
+
+ def test_initialize_connection(self):
+ self._test_volume_api('initialize_connection',
+ rpc_method='call',
+ volume=self.fake_volume,
+ connector='fake_connector')
+
+ def test_terminate_connection(self):
+ self._test_volume_api('terminate_connection',
+ rpc_method='call',
+ volume=self.fake_volume,
+ connector='fake_connector',
+ force=False)
import cinder.policy
from cinder import quota
from cinder.scheduler import rpcapi as scheduler_rpcapi
-from cinder.volume import volume_types
+from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import volume_types
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
self.image_service = (image_service or
glance.get_default_image_service())
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
+ self.volume_rpcapi = volume_rpcapi.VolumeAPI()
super(API, self).__init__(db_driver)
def create(self, context, size, name, description, snapshot=None,
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
src_volume_ref = self.db.volume_get(context,
snapshot_ref['volume_id'])
- topic = rpc.queue_get_for(context,
- FLAGS.volume_topic,
- src_volume_ref['host'])
# bypass scheduler and send request directly to volume
- rpc.cast(context,
- topic,
- {"method": "create_volume",
- "args": {"volume_id": volume_id,
- "snapshot_id": snapshot_id,
- "image_id": image_id}})
+ self.volume_rpcapi.create_volume(context,
+ src_volume_ref,
+ src_volume_ref['host'],
+ snapshot_id,
+ image_id)
else:
self.scheduler_rpcapi.create_volume(context,
- FLAGS.volume_topic,
- volume_id,
- snapshot_id,
- image_id,
- request_spec=request_spec,
- filter_properties=filter_properties)
+ FLAGS.volume_topic,
+ volume_id,
+ snapshot_id,
+ image_id,
+ request_spec=request_spec,
+ filter_properties=filter_properties)
@wrap_check_policy
def delete(self, context, volume, force=False):
now = timeutils.utcnow()
self.db.volume_update(context, volume_id, {'status': 'deleting',
'terminated_at': now})
- host = volume['host']
- rpc.cast(context,
- rpc.queue_get_for(context, FLAGS.volume_topic, host),
- {"method": "delete_volume",
- "args": {"volume_id": volume_id}})
+
+ self.volume_rpcapi.delete_volume(context, volume)
@wrap_check_policy
def update(self, context, volume, fields):
@wrap_check_policy
def attach(self, context, volume, instance_uuid, mountpoint):
- host = volume['host']
- queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
- return rpc.call(context, queue,
- {"method": "attach_volume",
- "args": {"volume_id": volume['id'],
- "instance_uuid": instance_uuid,
- "mountpoint": mountpoint}})
+ return self.volume_rpcapi.attach_volume(context,
+ volume,
+ instance_uuid,
+ mountpoint)
@wrap_check_policy
def detach(self, context, volume):
- host = volume['host']
- queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
- return rpc.call(context, queue,
- {"method": "detach_volume",
- "args": {"volume_id": volume['id']}})
+ return self.volume_rpcapi.detach_volume(context, volume)
@wrap_check_policy
def initialize_connection(self, context, volume, connector):
- host = volume['host']
- queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
- return rpc.call(context, queue,
- {"method": "initialize_connection",
- "args": {"volume_id": volume['id'],
- "connector": connector}})
+ return self.volume_rpcapi.initialize_connection(context,
+ volume,
+ connector)
@wrap_check_policy
def terminate_connection(self, context, volume, connector, force=False):
self.unreserve_volume(context, volume)
- host = volume['host']
- queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
- return rpc.call(context, queue,
- {"method": "terminate_connection",
- "args": {"volume_id": volume['id'],
- "connector": connector, 'force': force}})
+ return self.volume_rpcapi.terminate_connection(context,
+ volume,
+ connector,
+ force)
def _create_snapshot(self, context, volume, name, description,
force=False):
'display_description': description}
snapshot = self.db.snapshot_create(context, options)
- host = volume['host']
- rpc.cast(context,
- rpc.queue_get_for(context, FLAGS.volume_topic, host),
- {"method": "create_snapshot",
- "args": {"volume_id": volume['id'],
- "snapshot_id": snapshot['id']}})
+ self.volume_rpcapi.create_snapshot(context, volume, snapshot)
+
return snapshot
def create_snapshot(self, context, volume, name, description):
self.db.snapshot_update(context, snapshot['id'],
{'status': 'deleting'})
volume = self.db.volume_get(context, snapshot['volume_id'])
- host = volume['host']
- rpc.cast(context,
- rpc.queue_get_for(context, FLAGS.volume_topic, host),
- {"method": "delete_snapshot",
- "args": {"snapshot_id": snapshot['id']}})
+ self.volume_rpcapi.delete_snapshot(context, snapshot, volume['host'])
@wrap_check_policy
def update_snapshot(self, context, snapshot, fields):
recv_metadata = self.image_service.create(context, metadata)
self.update(context, volume, {'status': 'uploading'})
- rpc.cast(context,
- rpc.queue_get_for(context,
- FLAGS.volume_topic,
- volume['host']),
- {"method": "copy_volume_to_image",
- "args": {"volume_id": volume['id'],
- "image_id": recv_metadata['id']}})
+ self.volume_rpcapi.copy_volume_to_image(context, volume,
+ recv_metadata['id'])
response = {"id": volume['id'],
"updated_at": volume['updated_at'],
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
+
+ RPC_API_VERSION = '1.0'
+
def __init__(self, volume_driver=None, *args, **kwargs):
"""Load the driver from the one specified in args, or from flags."""
if not volume_driver:
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Intel, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Client side of the volume RPC API.
+"""
+
+from cinder import exception
+from cinder import flags
+from cinder.openstack.common import rpc
+import cinder.openstack.common.rpc.proxy
+
+
+FLAGS = flags.FLAGS
+
+
+class VolumeAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
+ '''Client side of the volume rpc API.
+
+ API version history:
+
+ 1.0 - Initial version.
+ '''
+
+ BASE_RPC_API_VERSION = '1.0'
+
+ def __init__(self):
+ super(VolumeAPI, self).__init__(topic=FLAGS.volume_topic,
+ default_version=self.BASE_RPC_API_VERSION)
+
+ def create_volume(self, ctxt, volume, host,
+ snapshot_id=None, image_id=None):
+ self.cast(ctxt, self.make_msg('create_volume',
+ volume_id=volume['id'],
+ snapshot_id=snapshot_id,
+ image_id=image_id),
+ topic=rpc.queue_get_for(ctxt, self.topic, host))
+
+ def delete_volume(self, ctxt, volume):
+ self.cast(ctxt, self.make_msg('delete_volume',
+ volume_id=volume['id']),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+ def create_snapshot(self, ctxt, volume, snapshot):
+ self.cast(ctxt, self.make_msg('create_snapshot',
+ volume_id=volume['id'],
+ snapshot_id=snapshot['id']),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+ def delete_snapshot(self, ctxt, snapshot, host):
+ self.cast(ctxt, self.make_msg('delete_snapshot',
+ snapshot_id=snapshot['id']),
+ topic=rpc.queue_get_for(ctxt, self.topic, host))
+
+ def attach_volume(self, ctxt, volume, instance_uuid, mountpoint):
+ return self.call(ctxt, self.make_msg('attach_volume',
+ volume_id=volume['id'],
+ instance_uuid=instance_uuid,
+ mountpoint=mountpoint),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+ def detach_volume(self, ctxt, volume):
+ return self.call(ctxt, self.make_msg('detach_volume',
+ volume_id=volume['id']),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+ def copy_volume_to_image(self, ctxt, volume, image_id):
+ self.cast(ctxt, self.make_msg('copy_volume_to_image',
+ volume_id=volume['id'],
+ image_id=image_id),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+ def initialize_connection(self, ctxt, volume, connector):
+ return self.call(ctxt, self.make_msg('initialize_connection',
+ volume_id=volume['id'],
+ connector=connector),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))
+
+ def terminate_connection(self, ctxt, volume, connector, force=False):
+ return self.call(ctxt, self.make_msg('terminate_connection',
+ volume_id=volume['id'],
+ connector=connector,
+ force=force),
+ topic=rpc.queue_get_for(ctxt, self.topic, volume['host']))