Part of blueprint versioned-rpc-apis.
One side effect of this change was that cinder.scheduler.api was removed
in favor of cinder.scheduler.rpcapi. In this case, the api was just a
direct wrapper around rpc usage. For other APIs, I've been following
the pattern that the rpcapi module provides the rpc client wrapper, and
if any other client-side logic is needed, that's where an api module is
used.
Change-Id: Ibd0292936f9afc77aeb5d040660bfa857861eed1
from cinder import flags
from cinder import log as logging
from cinder.rpc import dispatcher as rpc_dispatcher
-from cinder.scheduler import api
+from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import version
def __init__(self, host=None, db_driver=None, service_name='undefined'):
self.last_capabilities = None
self.service_name = service_name
+ self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
super(SchedulerDependentManager, self).__init__(host, db_driver)
def update_service_capabilities(self, capabilities):
"""Pass data back to the scheduler at a periodic interval."""
if self.last_capabilities:
LOG.debug(_('Notifying Schedulers of capabilities ...'))
- api.update_service_capabilities(context, self.service_name,
- self.host, self.last_capabilities)
+ self.scheduler_rpcapi.update_service_capabilities(context,
+ self.service_name, self.host, self.last_capabilities)
+++ /dev/null
-# Copyright (c) 2011 OpenStack, LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""
-Handles all requests relating to schedulers.
-"""
-
-from cinder import flags
-from cinder import log as logging
-from cinder import rpc
-
-FLAGS = flags.FLAGS
-LOG = logging.getLogger(__name__)
-
-
-def _call_scheduler(method, context, params=None):
- """Generic handler for RPC calls to the scheduler.
-
- :param params: Optional dictionary of arguments to be passed to the
- scheduler worker
-
- :retval: Result returned by scheduler worker
- """
- if not params:
- params = {}
- queue = FLAGS.scheduler_topic
- kwargs = {'method': method, 'args': params}
- return rpc.call(context, queue, kwargs)
-
-
-def get_host_list(context):
- """Return a list of hosts associated with this zone."""
- return _call_scheduler('get_host_list', context)
-
-
-def get_service_capabilities(context):
- """Return aggregated capabilities for all services."""
- return _call_scheduler('get_service_capabilities', context)
-
-
-def update_service_capabilities(context, service_name, host, capabilities):
- """Send an update to all the scheduler services informing them
- of the capabilities of this service."""
- kwargs = dict(method='update_service_capabilities',
- args=dict(service_name=service_name, host=host,
- capabilities=capabilities))
- return rpc.fanout_cast(context, 'scheduler', kwargs)
-
-
-def live_migration(context, block_migration, disk_over_commit,
- instance_id, dest, topic):
- """Migrate a server to a new host"""
- params = {"instance_id": instance_id,
- "dest": dest,
- "topic": topic,
- "block_migration": block_migration,
- "disk_over_commit": disk_over_commit}
- # NOTE(comstud): Call vs cast so we can get exceptions back, otherwise
- # this call in the scheduler driver doesn't return anything.
- _call_scheduler("live_migration", context=context, params=params)
class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on."""
+ RPC_API_VERSION = '1.0'
+
def __init__(self, scheduler_driver=None, *args, **kwargs):
if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver
def __getattr__(self, key):
"""Converts all method calls to use the schedule method"""
+ # NOTE(russellb) Because of what this is doing, we must be careful
+ # when changing the API of the scheduler drivers, as that changes
+ # the rpc API as well, and the version should be updated accordingly.
return functools.partial(self._schedule, key)
def get_host_list(self, context):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Client side of the scheduler manager RPC API.
+"""
+
+from cinder import flags
+import cinder.rpc.proxy
+
+
+FLAGS = flags.FLAGS
+
+
+class SchedulerAPI(cinder.rpc.proxy.RpcProxy):
+ '''Client side of the scheduler rpc API.
+
+ API version history:
+
+ 1.0 - Initial version.
+ '''
+
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self):
+ super(SchedulerAPI, self).__init__(topic=FLAGS.scheduler_topic,
+ default_version=self.RPC_API_VERSION)
+
+ def run_instance(self, ctxt, topic, request_spec, admin_password,
+ injected_files, requested_networks, is_first_time,
+ filter_properties, call=True):
+ rpc_method = self.call if call else self.cast
+ return rpc_method(ctxt, self.make_msg('run_instance', topic=topic,
+ request_spec=request_spec, admin_password=admin_password,
+ injected_files=injected_files,
+ requested_networks=requested_networks,
+ is_first_time=is_first_time,
+ filter_properties=filter_properties))
+
+ def prep_resize(self, ctxt, topic, instance_uuid, instance_type_id, image,
+ update_db, request_spec, filter_properties):
+ self.cast(ctxt, self.make_msg('prep_resize', topic=topic,
+ instance_uuid=instance_uuid, instance_type_id=instance_type_id,
+ image=image, update_db=update_db, request_spec=request_spec,
+ filter_properties=filter_properties))
+
+ def show_host_resources(self, ctxt, host):
+ return self.call(ctxt, self.make_msg('show_host_resources', host=host))
+
+ def live_migration(self, ctxt, block_migration, disk_over_commit,
+ instance_id, dest, topic):
+ # NOTE(comstud): Call vs cast so we can get exceptions back, otherwise
+ # this call in the scheduler driver doesn't return anything.
+ return self.call(ctxt, self.make_msg('live_migration',
+ block_migration=block_migration,
+ disk_over_commit=disk_over_commit, instance_id=instance_id,
+ dest=dest, topic=topic))
+
+ def update_service_capabilities(self, ctxt, service_name, host,
+ capabilities):
+ self.fanout_cast(ctxt, self.make_msg('update_service_capabilities',
+ service_name=service_name, host=host,
+ capabilities=capabilities))
+
+ def get_host_list(self, ctxt):
+ return self.call(ctxt, self.make_msg('get_host_list'))
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012, Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Unit Tests for cinder.scheduler.rpcapi
+"""
+
+from cinder import context
+from cinder import flags
+from cinder import rpc
+from cinder.scheduler import rpcapi as scheduler_rpcapi
+from cinder import test
+
+
+FLAGS = flags.FLAGS
+
+
+class SchedulerRpcAPITestCase(test.TestCase):
+
+ def setUp(self):
+ super(SchedulerRpcAPITestCase, self).setUp()
+
+ def tearDown(self):
+ super(SchedulerRpcAPITestCase, self).tearDown()
+
+ def _test_scheduler_api(self, method, rpc_method, **kwargs):
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ rpcapi = scheduler_rpcapi.SchedulerAPI()
+ expected_retval = 'foo' if method == 'call' else None
+ expected_msg = rpcapi.make_msg(method, **kwargs)
+ expected_msg['version'] = rpcapi.RPC_API_VERSION
+ if rpc_method == 'cast' and method == 'run_instance':
+ kwargs['call'] = False
+
+ self.fake_args = None
+ self.fake_kwargs = None
+
+ def _fake_rpc_method(*args, **kwargs):
+ self.fake_args = args
+ self.fake_kwargs = kwargs
+ if expected_retval:
+ return expected_retval
+
+ self.stubs.Set(rpc, rpc_method, _fake_rpc_method)
+
+ retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+ self.assertEqual(retval, expected_retval)
+ expected_args = [ctxt, FLAGS.scheduler_topic, expected_msg]
+ for arg, expected_arg in zip(self.fake_args, expected_args):
+ self.assertEqual(arg, expected_arg)
+
+ def test_run_instance_call(self):
+ self._test_scheduler_api('run_instance', rpc_method='call',
+ topic='fake_topic', request_spec='fake_request_spec',
+ admin_password='pw', injected_files='fake_injected_files',
+ requested_networks='fake_requested_networks',
+ is_first_time=True, filter_properties='fake_filter_properties')
+
+ def test_run_instance_cast(self):
+ self._test_scheduler_api('run_instance', rpc_method='cast',
+ topic='fake_topic', request_spec='fake_request_spec',
+ admin_password='pw', injected_files='fake_injected_files',
+ requested_networks='fake_requested_networks',
+ is_first_time=True, filter_properties='fake_filter_properties')
+
+ def test_prep_resize(self):
+ self._test_scheduler_api('prep_resize', rpc_method='cast',
+ topic='fake_topic', instance_uuid='fake_uuid',
+ instance_type_id='fake_type_id', image='fake_image',
+ update_db='fake_update_db', request_spec='fake_request_spec',
+ filter_properties='fake_props')
+
+ def test_show_host_resources(self):
+ self._test_scheduler_api('show_host_resources', rpc_method='call',
+ host='fake_host')
+
+ def test_live_migration(self):
+ self._test_scheduler_api('live_migration', rpc_method='call',
+ block_migration='fake_block_migration',
+ disk_over_commit='fake_disk_over_commit',
+ instance_id='fake_id', dest='fake_dest', topic='fake_topic')
+
+ def test_update_service_capabilities(self):
+ self._test_scheduler_api('update_service_capabilities',
+ rpc_method='fanout_cast', service_name='fake_name',
+ host='fake_host', capabilities='fake_capabilities')
+
+ def test_get_host_list(self):
+ self._test_scheduler_api('get_host_list', rpc_method='call')
is_admin=True)
orig_rpc_call = rpc.call
- def rpc_call_wrapper(context, topic, msg):
+ def rpc_call_wrapper(context, topic, msg, timeout=None):
"""Stub out the scheduler creating the instance entry"""
if (topic == FLAGS.scheduler_topic and
msg['method'] == 'run_instance'):