.. automodule:: cinder.scheduler
:platform: Unix
- :synopsis: Module that picks a compute node to run a VM instance.
+ :synopsis: Module that picks a volume node to create a volume.
.. moduleauthor:: Sandy Walsh <sandy.walsh@rackspace.com>
.. moduleauthor:: Ed Leafe <ed@leafe.com>
.. moduleauthor:: Chris Behrens <cbehrens@codestud.com>
hosts = self._filter_hosts(request_spec, hosts, **kwargs)
if not hosts:
- msg = _("Could not find another compute")
+ msg = _("Could not find another host")
raise exception.NoValidHost(reason=msg)
return hosts[int(random.random() * len(hosts))]
host = self._schedule(context, topic, None, **kwargs)
driver.cast_to_host(context, topic, host, method, **kwargs)
-
- def schedule_run_instance(self, context, request_spec, *_args, **kwargs):
- """Create and run an instance or instances"""
- num_instances = request_spec.get('num_instances', 1)
- instances = []
- for num in xrange(num_instances):
- host = self._schedule(context, 'compute', request_spec, **kwargs)
- request_spec['instance_properties']['launch_index'] = num
- instance = self.create_instance_db_entry(context, request_spec)
- driver.cast_to_compute_host(context, host,
- 'run_instance', instance_uuid=instance['uuid'], **kwargs)
- instances.append(driver.encode_instance(instance))
- # So if we loop around, create_instance_db_entry will actually
- # create a new entry, instead of assume it's been created
- # already
- del request_spec['instance_properties']['uuid']
- return instances
-
- def schedule_prep_resize(self, context, request_spec, *args, **kwargs):
- """Select a target for resize."""
- host = self._schedule(context, 'compute', request_spec, **kwargs)
- driver.cast_to_compute_host(context, host, 'prep_resize', **kwargs)
% locals())
-def encode_instance(instance, local=True):
- """Encode locally created instance for return via RPC"""
- # TODO(comstud): I would love to be able to return the full
- # instance information here, but we'll need some modifications
- # to the RPC code to handle datetime conversions with the
- # json encoding/decoding. We should be able to set a default
- # json handler somehow to do it.
- #
- # For now, I'll just return the instance ID and let the caller
- # do a DB lookup :-/
- if local:
- return dict(id=instance['id'], _is_precooked=False)
- else:
- inst = dict(instance)
- inst['_is_precooked'] = True
- return inst
-
-
class Scheduler(object):
"""The base class that all Scheduler classes should inherit from."""
def schedule(self, context, topic, method, *_args, **_kwargs):
"""Must override schedule method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))
-
- def schedule_prep_resize(self, context, request_spec, *_args, **_kwargs):
- """Must override schedule_prep_resize method for scheduler to work."""
- msg = _("Driver must implement schedule_prep_resize")
- raise NotImplementedError(msg)
-
- def mounted_on_same_shared_storage(self, context, instance_ref, dest):
- """Check if the src and dest host mount same shared storage.
-
- At first, dest host creates temp file, and src host can see
- it if they mounts same shared storage. Then src host erase it.
-
- :param context: security context
- :param instance_ref: cinder.db.sqlalchemy.models.Instance object
- :param dest: destination host
-
- """
-
- src = instance_ref['host']
- dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest)
- src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src)
-
- filename = rpc.call(context, dst_t,
- {"method": 'create_shared_storage_test_file'})
-
- try:
- # make sure existence at src host.
- ret = rpc.call(context, src_t,
- {"method": 'check_shared_storage_test_file',
- "args": {'filename': filename}})
-
- finally:
- rpc.cast(context, dst_t,
- {"method": 'cleanup_shared_storage_test_file',
- "args": {'filename': filename}})
-
- return ret
import functools
from cinder import db
-from cinder import exception
from cinder import flags
from cinder.openstack.common import log as logging
from cinder import manager
-from cinder.notifier import api as notifier
from cinder.openstack.common import cfg
from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
class SchedulerManager(manager.Manager):
- """Chooses a host to run instances on."""
+ """Chooses a host to create volumes"""
RPC_API_VERSION = '1.0'
driver_method = self.driver.schedule
args = (context, topic, method) + args
- # Scheduler methods are responsible for casting.
try:
return driver_method(*args, **kwargs)
- except Exception as ex:
+ except Exception:
with excutils.save_and_reraise_exception():
- self._set_vm_state_and_notify(method,
- {'vm_state': vm_states.ERROR},
- context, ex, *args, **kwargs)
-
- def run_instance(self, context, topic, *args, **kwargs):
- """Tries to call schedule_run_instance on the driver.
- Sets instance vm_state to ERROR on exceptions
- """
- args = (context,) + args
- try:
- return self.driver.schedule_run_instance(*args, **kwargs)
- except exception.NoValidHost as ex:
- # don't reraise
- self._set_vm_state_and_notify('run_instance',
- {'vm_state': vm_states.ERROR},
- context, ex, *args, **kwargs)
- except Exception as ex:
- with excutils.save_and_reraise_exception():
- self._set_vm_state_and_notify('run_instance',
- {'vm_state': vm_states.ERROR},
- context, ex, *args, **kwargs)
-
- def prep_resize(self, context, topic, *args, **kwargs):
- """Tries to call schedule_prep_resize on the driver.
- Sets instance vm_state to ACTIVE on NoHostFound
- Sets vm_state to ERROR on other exceptions
- """
- args = (context,) + args
- try:
- return self.driver.schedule_prep_resize(*args, **kwargs)
- except exception.NoValidHost as ex:
- self._set_vm_state_and_notify('prep_resize',
- {'vm_state': vm_states.ACTIVE,
- 'task_state': None},
- context, ex, *args, **kwargs)
- except Exception as ex:
- with excutils.save_and_reraise_exception():
- self._set_vm_state_and_notify('prep_resize',
- {'vm_state': vm_states.ERROR},
- context, ex, *args, **kwargs)
-
- def _set_vm_state_and_notify(self, method, updates, context, ex,
- *args, **kwargs):
- """changes VM state and notifies"""
- # FIXME(comstud): Re-factor this somehow. Not sure this belongs in the
- # scheduler manager like this. We should make this easier.
- # run_instance only sends a request_spec, and an instance may or may
- # not have been created in the API (or scheduler) already. If it was
- # created, there's a 'uuid' set in the instance_properties of the
- # request_spec.
- # (littleidea): I refactored this a bit, and I agree
- # it should be easier :)
- # The refactoring could go further but trying to minimize changes
- # for essex timeframe
-
- LOG.warning(_("Failed to schedule_%(method)s: %(ex)s") % locals())
-
- vm_state = updates['vm_state']
- request_spec = kwargs.get('request_spec', {})
- properties = request_spec.get('instance_properties', {})
- instance_uuid = properties.get('uuid', {})
-
- if instance_uuid:
- state = vm_state.upper()
- LOG.warning(_('Setting instance to %(state)s state.'), locals(),
- instance_uuid=instance_uuid)
- db.instance_update(context, instance_uuid, updates)
-
- payload = dict(request_spec=request_spec,
- instance_properties=properties,
- instance_id=instance_uuid,
- state=vm_state,
- method=method,
- reason=ex)
-
- notifier.notify(notifier.publisher_id("scheduler"),
- 'scheduler.' + method, notifier.ERROR, payload)
-
- # NOTE (masumotok) : This method should be moved to cinder.api.ec2.admin.
- # Based on bexar design summit discussion,
- # just put this here for bexar release.
- def show_host_resources(self, context, host):
- """Shows the physical/usage resource given by hosts.
-
- :param context: security context
- :param host: hostname
- :returns:
- example format is below::
-
- {'resource':D, 'usage':{proj_id1:D, proj_id2:D}}
- D: {'vcpus': 3, 'memory_mb': 2048, 'local_gb': 2048,
- 'vcpus_used': 12, 'memory_mb_used': 10240,
- 'local_gb_used': 64}
-
- """
- # Getting compute node info and related instances info
- compute_ref = db.service_get_all_compute_by_host(context, host)
- compute_ref = compute_ref[0]
-
- # Getting total available/used resource
- compute_ref = compute_ref['compute_node'][0]
- resource = {'vcpus': compute_ref['vcpus'],
- 'memory_mb': compute_ref['memory_mb'],
- 'local_gb': compute_ref['local_gb'],
- 'vcpus_used': compute_ref['vcpus_used'],
- 'memory_mb_used': compute_ref['memory_mb_used'],
- 'local_gb_used': compute_ref['local_gb_used']}
- usage = dict()
-
- return {'resource': resource, 'usage': usage}
+ volume_id = kwargs.get('volume_id')
+ db.volume_update(context, volume_id, {'status': 'error'})
simple_scheduler_opts = [
- cfg.IntOpt("max_cores",
- default=16,
- help="maximum number of instance cores to allow per host"),
cfg.IntOpt("max_gigabytes",
default=10000,
help="maximum number of volume gigabytes to allow per host"),
- cfg.IntOpt("max_networks",
- default=1000,
- help="maximum number of networks to allow per host"),
- cfg.BoolOpt('skip_isolated_core_check',
- default=True,
- help='Allow overcommitting vcpus on isolated hosts'),
]
FLAGS = flags.FLAGS
class SimpleScheduler(chance.ChanceScheduler):
"""Implements Naive Scheduler that tries to find least loaded host."""
- def _schedule_instance(self, context, instance_opts, *_args, **_kwargs):
- """Picks a host that is up and has the fewest running instances."""
- elevated = context.elevated()
-
- availability_zone = instance_opts.get('availability_zone')
-
- zone, host = FLAGS.default_schedule_zone, None
- if availability_zone:
- zone, _x, host = availability_zone.partition(':')
-
- if host and context.is_admin:
- service = db.service_get_by_args(elevated, host, 'cinder-compute')
- if not utils.service_is_up(service):
- raise exception.WillNotSchedule(host=host)
- return host
-
- results = db.service_get_all_compute_sorted(elevated)
- in_isolation = instance_opts['image_ref'] in FLAGS.isolated_images
- check_cores = not in_isolation or not FLAGS.skip_isolated_core_check
- if zone:
- results = [(service, cores) for (service, cores) in results
- if service['availability_zone'] == zone]
- for result in results:
- (service, instance_cores) = result
- if in_isolation and service['host'] not in FLAGS.isolated_hosts:
- # isloated images run on isolated hosts
- continue
- if service['host'] in FLAGS.isolated_hosts and not in_isolation:
- # images that aren't isolated only run on general hosts
- continue
- if (check_cores and
- instance_cores + instance_opts['vcpus'] > FLAGS.max_cores):
- msg = _("Not enough allocatable CPU cores remaining")
- raise exception.NoValidHost(reason=msg)
- if utils.service_is_up(service) and not service['disabled']:
- return service['host']
- msg = _("Is the appropriate service running?")
- raise exception.NoValidHost(reason=msg)
-
- def schedule_run_instance(self, context, request_spec, *_args, **_kwargs):
- num_instances = request_spec.get('num_instances', 1)
- instances = []
- for num in xrange(num_instances):
- host = self._schedule_instance(context,
- request_spec['instance_properties'], *_args, **_kwargs)
- request_spec['instance_properties']['launch_index'] = num
- instance_ref = self.create_instance_db_entry(context,
- request_spec)
- driver.cast_to_compute_host(context, host, 'run_instance',
- instance_uuid=instance_ref['uuid'], **_kwargs)
- instances.append(driver.encode_instance(instance_ref))
- # So if we loop around, create_instance_db_entry will actually
- # create a new entry, instead of assume it's been created
- # already
- del request_spec['instance_properties']['uuid']
- return instances
-
def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
"""Picks a host that is up and has the fewest volumes."""
elevated = context.elevated()
expected_retval = 'foo' if method == 'call' else None
expected_msg = rpcapi.make_msg(method, **kwargs)
expected_msg['version'] = rpcapi.RPC_API_VERSION
- if rpc_method == 'cast' and method == 'run_instance':
- kwargs['call'] = False
self.fake_args = None
self.fake_kwargs = None
orig_rpc_call = rpc.call
def rpc_call_wrapper(context, topic, msg, timeout=None):
- """Stub out the scheduler creating the instance entry"""
- if (topic == FLAGS.scheduler_topic and
- msg['method'] == 'run_instance'):
- scheduler = scheduler_driver.Scheduler
- instance = scheduler().create_instance_db_entry(
- context,
- msg['args']['request_spec'])
- return [scheduler_driver.encode_instance(instance)]
- else:
- return orig_rpc_call(context, topic, msg)
+ return orig_rpc_call(context, topic, msg)
self.stubs.Set(rpc, 'call', rpc_call_wrapper)
######### defined in cinder.scheduler.simple #########
-###### (IntOpt) maximum number of instance cores to allow per host
-# max_cores=16
###### (IntOpt) maximum number of volume gigabytes to allow per host
# max_gigabytes=10000
-###### (IntOpt) maximum number of networks to allow per host
-# max_networks=1000
-###### (BoolOpt) Allow overcommitting vcpus on isolated hosts
-# skip_isolated_core_check=true
######### defined in cinder.volume.driver #########