import random
from cinder import exception
+from cinder import flags
from cinder.scheduler import driver
+FLAGS = flags.FLAGS
+
+
class ChanceScheduler(driver.Scheduler):
"""Implements Scheduler as a random node selector."""
return hosts[int(random.random() * len(hosts))]
- def schedule(self, context, topic, method, *_args, **kwargs):
+ def schedule_create_volume(self, context, request_spec, filter_properties):
"""Picks a host that is up at random."""
+ topic = FLAGS.volume_topic
+ host = self._schedule(context, topic, request_spec,
+ filter_properties=filter_properties)
+ volume_id = request_spec['volume_id']
+ snapshot_id = request_spec['snapshot_id']
+ image_id = request_spec['image_id']
- host = self._schedule(context, topic, None, **kwargs)
- driver.cast_to_host(context, topic, host, method, **kwargs)
+ driver.cast_to_host(context, topic, host, 'create_volume',
+ volume_id=volume_id,
+ snapshot_id=snapshot_id,
+ image_id=image_id)
def schedule(self, context, topic, method, *_args, **_kwargs):
"""Must override schedule method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))
+
+ def schedule_create_volume(self, context, request_spec, filter_properties):
+ """Must override schedule method for scheduler to work."""
+ raise NotImplementedError(_("Must implement schedule_create_volume"))
import functools
from cinder import db
+from cinder import exception
from cinder import flags
-from cinder.openstack.common import log as logging
from cinder import manager
from cinder.openstack.common import cfg
from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.notifier import api as notifier
LOG = logging.getLogger(__name__)
class SchedulerManager(manager.Manager):
"""Chooses a host to create volumes"""
- RPC_API_VERSION = '1.1'
+ RPC_API_VERSION = '1.2'
def __init__(self, scheduler_driver=None, *args, **kwargs):
if not scheduler_driver:
self.driver = importutils.import_object(scheduler_driver)
super(SchedulerManager, self).__init__(*args, **kwargs)
- def __getattr__(self, key):
- """Converts all method calls to use the schedule method"""
- # NOTE(russellb) Because of what this is doing, we must be careful
- # when changing the API of the scheduler drivers, as that changes
- # the rpc API as well, and the version should be updated accordingly.
- return functools.partial(self._schedule, key)
-
def get_host_list(self, context):
"""Get a list of hosts from the HostManager."""
return self.driver.get_host_list()
self.driver.update_service_capabilities(service_name, host,
capabilities)
- def _schedule(self, method, context, topic, *args, **kwargs):
- """Tries to call schedule_* method on the driver to retrieve host.
- Falls back to schedule(context, topic) if method doesn't exist.
- """
- driver_method_name = 'schedule_%s' % method
+ def create_volume(self, context, topic, volume_id, snapshot_id=None,
+ image_id=None, request_spec=None,
+ filter_properties=None):
try:
- driver_method = getattr(self.driver, driver_method_name)
- args = (context,) + args
- except AttributeError, e:
- LOG.warning(_("Driver Method %(driver_method_name)s missing: "
- "%(e)s. Reverting to schedule()") % locals())
- driver_method = self.driver.schedule
- args = (context, topic, method) + args
-
- try:
- return driver_method(*args, **kwargs)
- except Exception:
+ if request_spec is None:
+ # For RPC version < 1.2 backward compatibility
+ request_spec = {}
+ volume_ref = db.volume_get(context, volume_id)
+ size = volume_ref.get('size')
+ availability_zone = volume_ref.get('availability_zone')
+ volume_type_id = volume_ref.get('volume_type_id')
+ vol_type = db.volume_type_get(context, volume_type_id)
+ volume_properties = {'size': size,
+ 'availability_zone': availability_zone,
+ 'volume_type_id': volume_type_id}
+ request_spec.update({'volume_id': volume_id,
+ 'snapshot_id': snapshot_id,
+ 'image_id': image_id,
+ 'volume_properties': volume_properties,
+ 'volume_type': dict(vol_type).iteritems()})
+
+ self.driver.schedule_create_volume(context, request_spec,
+ filter_properties)
+ except exception.NoValidHost as ex:
+ volume_state = {'volume_state': {'status': 'error'}}
+ self._set_volume_state_and_notify('create_volume',
+ volume_state,
+ context, ex, request_spec)
+ except Exception as ex:
with excutils.save_and_reraise_exception():
- volume_id = kwargs.get('volume_id')
- db.volume_update(context, volume_id, {'status': 'error'})
+ volume_state = {'volume_state': {'status': 'error'}}
+ self._set_volume_state_and_notify('create_volume',
+ volume_state,
+ context, ex, request_spec)
+
+ def _set_volume_state_and_notify(self, method, updates, context, ex,
+ request_spec):
+ LOG.warning(_("Failed to schedule_%(method)s: %(ex)s") % locals())
+
+ volume_state = updates['volume_state']
+ properties = request_spec.get('volume_properties', {})
+
+ volume_id = request_spec.get('volume_id', None)
+
+ if volume_id:
+ db.volume_update(context, volume_id, volume_state)
+
+ payload = dict(request_spec=request_spec,
+ volume_properties=properties,
+ volume_id=volume_id,
+ state=volume_state,
+ method=method,
+ reason=ex)
+
+ notifier.notify(context, notifier.publisher_id("scheduler"),
+ 'scheduler.' + method, notifier.ERROR, payload)
1.0 - Initial version.
1.1 - Add create_volume() method
+ 1.2 - Add request_spec, filter_properties arguments
+ to create_volume()
'''
RPC_API_VERSION = '1.0'
default_version=self.RPC_API_VERSION)
def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
- image_id=None):
+ image_id=None, request_spec=None,
+ filter_properties=None):
return self.cast(ctxt, self.make_msg('create_volume',
- topic=topic,
- volume_id=volume_id,
- snapshot_id=snapshot_id,
- image_id=image_id),
- topic=None,
- version='1.1')
+ topic=topic,
+ volume_id=volume_id,
+ snapshot_id=snapshot_id,
+ image_id=image_id,
+ request_spec=request_spec,
+ filter_properties=filter_properties),
+ version='1.2')
def update_service_capabilities(self, ctxt, service_name, host,
capabilities):
class SimpleScheduler(chance.ChanceScheduler):
"""Implements Naive Scheduler that tries to find least loaded host."""
- def schedule_create_volume(self, context, volume_id, **_kwargs):
+ def schedule_create_volume(self, context, request_spec, filter_properties):
"""Picks a host that is up and has the fewest volumes."""
elevated = context.elevated()
- volume_ref = db.volume_get(context, volume_id)
- availability_zone = volume_ref.get('availability_zone')
+ volume_id = request_spec.get('volume_id')
+ snapshot_id = request_spec.get('snapshot_id')
+ image_id = request_spec.get('image_id')
+ volume_properties = request_spec.get('volume_properties')
+ volume_size = volume_properties.get('size')
+ availability_zone = volume_properties.get('availability_zone')
zone, host = None, None
if availability_zone:
zone, _x, host = availability_zone.partition(':')
if host and context.is_admin:
- service = db.service_get_by_args(elevated, host, 'cinder-volume')
+ topic = FLAGS.volume_topic
+ service = db.service_get_by_args(elevated, host, topic)
if not utils.service_is_up(service):
raise exception.WillNotSchedule(host=host)
driver.cast_to_volume_host(context, host, 'create_volume',
- volume_id=volume_id, **_kwargs)
+ volume_id=volume_id, snapshot_id=snapshot_id,
+ image_id=image_id)
return None
results = db.service_get_all_volume_sorted(elevated)
if service['availability_zone'] == zone]
for result in results:
(service, volume_gigabytes) = result
- if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes:
+ if volume_gigabytes + volume_size > FLAGS.max_gigabytes:
msg = _("Not enough allocatable volume gigabytes remaining")
raise exception.NoValidHost(reason=msg)
if utils.service_is_up(service) and not service['disabled']:
driver.cast_to_volume_host(context, service['host'],
- 'create_volume', volume_id=volume_id, **_kwargs)
+ 'create_volume', volume_id=volume_id,
+ snapshot_id=snapshot_id, image_id=image_id)
return None
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)
def test_create_volume(self):
self._test_scheduler_api('create_volume',
- rpc_method='cast', topic='fake_topic',
- volume_id='fake_volume_id',
- snapshot_id='fake_snapshot_id',
- image_id='fake_image_id',
- version='1.1')
+ rpc_method='cast', topic='topic', volume_id='volume_id',
+ snapshot_id='snapshot_id', image_id='image_id',
+ request_spec='fake_request_spec',
+ filter_properties='filter_properties',
+ version='1.2')
from cinder import context
from cinder import db
+from cinder import exception
from cinder import flags
from cinder.openstack.common import rpc
from cinder.openstack.common import timeutils
manager = self.manager
self.assertTrue(isinstance(manager.driver, self.driver_cls))
- def test_get_host_list(self):
- expected = 'fake_hosts'
-
- self.mox.StubOutWithMock(self.manager.driver, 'get_host_list')
- self.manager.driver.get_host_list().AndReturn(expected)
-
- self.mox.ReplayAll()
- result = self.manager.get_host_list(self.context)
- self.assertEqual(result, expected)
-
- def test_get_service_capabilities(self):
- expected = 'fake_service_capabs'
-
- self.mox.StubOutWithMock(self.manager.driver,
- 'get_service_capabilities')
- self.manager.driver.get_service_capabilities().AndReturn(
- expected)
-
- self.mox.ReplayAll()
- result = self.manager.get_service_capabilities(self.context)
- self.assertEqual(result, expected)
-
def test_update_service_capabilities(self):
service_name = 'fake_service'
host = 'fake_host'
service_name=service_name, host=host,
capabilities=capabilities)
- def test_existing_method(self):
- def stub_method(self, *args, **kwargs):
- pass
- setattr(self.manager.driver, 'schedule_stub_method', stub_method)
-
- self.mox.StubOutWithMock(self.manager.driver,
- 'schedule_stub_method')
- self.manager.driver.schedule_stub_method(self.context,
- *self.fake_args, **self.fake_kwargs)
+ def test_create_volume_exception_puts_volume_in_error_state(self):
+ """ Test that a NoValideHost exception for create_volume puts
+ the volume in 'error' state and eats the exception.
+ """
+ fake_volume_id = 1
+ self._mox_schedule_method_helper('schedule_create_volume')
+ self.mox.StubOutWithMock(db, 'volume_update')
- self.mox.ReplayAll()
- self.manager.stub_method(self.context, self.topic,
- *self.fake_args, **self.fake_kwargs)
+ topic = 'fake_topic'
+ volume_id = fake_volume_id
+ request_spec = {'volume_id': fake_volume_id}
- def test_missing_method_fallback(self):
- self.mox.StubOutWithMock(self.manager.driver, 'schedule')
- self.manager.driver.schedule(self.context, self.topic,
- 'noexist', *self.fake_args, **self.fake_kwargs)
+ self.manager.driver.schedule_create_volume(self.context,
+ request_spec, {}).AndRaise(exception.NoValidHost(reason=""))
+ db.volume_update(self.context, fake_volume_id, {'status': 'error'})
self.mox.ReplayAll()
- self.manager.noexist(self.context, self.topic,
- *self.fake_args, **self.fake_kwargs)
+ self.manager.create_volume(self.context, topic, volume_id,
+ request_spec=request_spec,
+ filter_properties={})
def _mox_schedule_method_helper(self, method_name):
# Make sure the method exists that we're going to test call
self.context = context.RequestContext('fake_user', 'fake_project')
self.topic = 'fake_topic'
- def test_get_host_list(self):
- expected = 'fake_hosts'
-
- self.mox.StubOutWithMock(self.driver.host_manager, 'get_host_list')
- self.driver.host_manager.get_host_list().AndReturn(expected)
-
- self.mox.ReplayAll()
- result = self.driver.get_host_list()
- self.assertEqual(result, expected)
-
- def test_get_service_capabilities(self):
- expected = 'fake_service_capabs'
-
- self.mox.StubOutWithMock(self.driver.host_manager,
- 'get_service_capabilities')
- self.driver.host_manager.get_service_capabilities().AndReturn(
- expected)
-
- self.mox.ReplayAll()
- result = self.driver.get_service_capabilities()
- self.assertEqual(result, expected)
-
def test_update_service_capabilities(self):
service_name = 'fake_service'
host = 'fake_host'
finally:
QUOTAS.rollback(context, reservations)
- self._cast_create_volume(context, volume['id'], snapshot_id,
- image_id)
+ request_spec = {
+ 'volume_properties': options,
+ 'volume_type': volume_type,
+ 'volume_id': volume['id'],
+ 'snapshot_id': volume['snapshot_id'],
+ 'image_id': image_id
+ }
+
+ filter_properties = {}
+
+ self._cast_create_volume(context, request_spec, filter_properties)
+
return volume
- def _cast_create_volume(self, context, volume_id, snapshot_id,
- image_id):
+ def _cast_create_volume(self, context, request_spec, filter_properties):
# NOTE(Rongze Zhu): It is a simple solution for bug 1008866
# If snapshot_id is set, make the call create volume directly to
# the volume host where the snapshot resides instead of passing it
- # through the scheduer. So snapshot can be copy to new volume.
+ # through the scheduler. So snapshot can be copy to new volume.
+
+ volume_id = request_spec['volume_id']
+ snapshot_id = request_spec['snapshot_id']
+ image_id = request_spec['image_id']
if snapshot_id and FLAGS.snapshot_same_host:
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
self.scheduler_rpcapi.create_volume(context,
FLAGS.volume_topic,
volume_id,
- snapshot_id=snapshot_id,
- image_id=image_id)
+ snapshot_id,
+ image_id,
+ request_spec=request_spec,
+ filter_properties=filter_properties)
@wrap_check_policy
def delete(self, context, volume, force=False):