class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes"""
- RPC_API_VERSION = '1.0'
+ RPC_API_VERSION = '1.1'
def __init__(self, scheduler_driver=None, *args, **kwargs):
if not scheduler_driver:
API version history:
1.0 - Initial version.
+ 1.1 - Add create_volume() method
'''
RPC_API_VERSION = '1.0'
super(SchedulerAPI, self).__init__(topic=FLAGS.scheduler_topic,
default_version=self.RPC_API_VERSION)
+ def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
+ image_id=None):
+ return self.cast(ctxt, self.make_msg('create_volume',
+ topic=topic,
+ volume_id=volume_id,
+ snapshot_id=snapshot_id,
+ image_id=image_id),
+ topic=None,
+ version='1.1')
+
def update_service_capabilities(self, ctxt, service_name, host,
capabilities):
self.fanout_cast(ctxt, self.make_msg('update_service_capabilities',
ctxt = context.RequestContext('fake_user', 'fake_project')
rpcapi = scheduler_rpcapi.SchedulerAPI()
expected_retval = 'foo' if method == 'call' else None
+ expected_version = kwargs.pop('version', rpcapi.RPC_API_VERSION)
expected_msg = rpcapi.make_msg(method, **kwargs)
- expected_msg['version'] = rpcapi.RPC_API_VERSION
+ expected_msg['version'] = expected_version
self.fake_args = None
self.fake_kwargs = None
self._test_scheduler_api('update_service_capabilities',
rpc_method='fanout_cast', service_name='fake_name',
host='fake_host', capabilities='fake_capabilities')
+
+ def test_create_volume(self):
+ self._test_scheduler_api('create_volume',
+ rpc_method='cast', topic='fake_topic',
+ volume_id='fake_volume_id',
+ snapshot_id='fake_snapshot_id',
+ image_id='fake_image_id',
+ version='1.1')
from cinder.volume import volume_types
import cinder.policy
from cinder import quota
-
+from cinder.scheduler import rpcapi as scheduler_rpcapi
+from cinder.volume import volume_types
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
default=True,
def __init__(self, db_driver=None, image_service=None):
self.image_service = (image_service or
glance.get_default_image_service())
+ self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
super(API, self).__init__(db_driver)
def create(self, context, size, name, description, snapshot=None,
topic = rpc.queue_get_for(context,
FLAGS.volume_topic,
src_volume_ref['host'])
+ # bypass scheduler and send request directly to volume
rpc.cast(context,
topic,
{"method": "create_volume",
"snapshot_id": snapshot_id,
"image_id": image_id}})
else:
- rpc.cast(context,
- FLAGS.scheduler_topic,
- {"method": "create_volume",
- "args": {"topic": FLAGS.volume_topic,
- "volume_id": volume_id,
- "snapshot_id": snapshot_id,
- "image_id": image_id}})
+ self.scheduler_rpcapi.create_volume(context,
+ FLAGS.volume_topic,
+ volume_id,
+ snapshot_id=snapshot_id,
+ image_id=image_id)
@wrap_check_policy
def delete(self, context, volume, force=False):