From: Russell Bryant Date: Tue, 29 May 2012 20:35:35 +0000 (-0400) Subject: Move queue_get_for() from db to rpc. X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=22a9da3cc4b0cc17d8d3483b95dc57ed43bf0ecb;p=openstack-build%2Fcinder-build.git Move queue_get_for() from db to rpc. Part of blueprint common-rpc. The function queue_get_for() is a utility function used by various consumers of the rpc API. This function lived in the db API, but never ended up using anything from the database. This patch moves it into the rpc API so that it can be used by other users of rpc once it moves into openstack-common. Change-Id: If92675beecff5471b416a929c161b810e3c71939 Reviewed-on: https://review.openstack.org/9906 Reviewed-by: Vish Ishaya Approved: John Griffith Tested-by: Jenkins --- diff --git a/bin/cinder-manage b/bin/cinder-manage index 97bee2af6..b7c42628e 100755 --- a/bin/cinder-manage +++ b/bin/cinder-manage @@ -259,7 +259,7 @@ class VolumeCommands(object): return rpc.cast(ctxt, - db.queue_get_for(ctxt, FLAGS.volume_topic, host), + rpc.queue_get_for(ctxt, FLAGS.volume_topic, host), {"method": "delete_volume", "args": {"volume_id": volume['id']}}) @@ -277,7 +277,7 @@ class VolumeCommands(object): instance = db.instance_get(ctxt, volume['instance_id']) host = instance['host'] rpc.cast(ctxt, - db.queue_get_for(ctxt, FLAGS.compute_topic, host), + rpc.queue_get_for(ctxt, FLAGS.compute_topic, host), {"method": "attach_volume", "args": {"instance_id": instance['id'], "volume_id": volume['id'], diff --git a/cinder/db/api.py b/cinder/db/api.py index 8b736875c..5e49a5edd 100644 --- a/cinder/db/api.py +++ b/cinder/db/api.py @@ -184,14 +184,6 @@ def migration_get_all_unconfirmed(context, confirm_window): ################### -def queue_get_for(context, topic, physical_node_id): - """Return a channel to send a message to a node with a topic.""" - return IMPL.queue_get_for(context, topic, physical_node_id) - - -################### - - def iscsi_target_count_by_host(context, host): """Return count of export devices.""" return IMPL.iscsi_target_count_by_host(context, host) diff --git a/cinder/db/sqlalchemy/api.py b/cinder/db/sqlalchemy/api.py index 60954de7c..fbad55e2e 100644 --- a/cinder/db/sqlalchemy/api.py +++ b/cinder/db/sqlalchemy/api.py @@ -361,14 +361,6 @@ def _dict_with_extra_specs(inst_type_query): ################### -def queue_get_for(context, topic, physical_node_id): - # FIXME(ja): this should be servername? - return "%s.%s" % (topic, physical_node_id) - - -################### - - @require_admin_context def iscsi_target_count_by_host(context, host): return model_query(context, models.IscsiTarget).\ diff --git a/cinder/rpc/__init__.py b/cinder/rpc/__init__.py index becdefa9e..4512e5ec1 100644 --- a/cinder/rpc/__init__.py +++ b/cinder/rpc/__init__.py @@ -224,6 +224,11 @@ def fanout_cast_to_server(context, server_params, topic, msg): topic, msg) +def queue_get_for(context, topic, host): + """Get a queue name for a given topic + host.""" + return '%s.%s' % (topic, host) + + _RPCIMPL = None diff --git a/cinder/scheduler/driver.py b/cinder/scheduler/driver.py index 49420d848..b844e2801 100644 --- a/cinder/scheduler/driver.py +++ b/cinder/scheduler/driver.py @@ -54,8 +54,8 @@ def cast_to_volume_host(context, host, method, update_db=True, **kwargs): db.volume_update(context, volume_id, {'host': host, 'scheduled_at': now}) rpc.cast(context, - db.queue_get_for(context, FLAGS.volume_topic, host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, FLAGS.volume_topic, host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to host '%(host)s'") % locals()) @@ -70,8 +70,8 @@ def cast_to_host(context, topic, host, method, update_db=True, **kwargs): func(context, host, method, update_db=update_db, **kwargs) else: rpc.cast(context, - db.queue_get_for(context, topic, host), - {"method": method, "args": kwargs}) + rpc.queue_get_for(context, topic, host), + {"method": method, "args": kwargs}) LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'") % locals()) @@ -145,8 +145,8 @@ class Scheduler(object): """ src = instance_ref['host'] - dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest) - src_t = db.queue_get_for(context, FLAGS.compute_topic, src) + dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest) + src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src) filename = rpc.call(context, dst_t, {"method": 'create_shared_storage_test_file'}) diff --git a/cinder/tests/scheduler/test_scheduler.py b/cinder/tests/scheduler/test_scheduler.py index b45413c66..0575d0c11 100644 --- a/cinder/tests/scheduler/test_scheduler.py +++ b/cinder/tests/scheduler/test_scheduler.py @@ -239,13 +239,13 @@ class SchedulerDriverModuleTestCase(test.TestCase): self.mox.StubOutWithMock(timeutils, 'utcnow') self.mox.StubOutWithMock(db, 'volume_update') - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') timeutils.utcnow().AndReturn('fake-now') db.volume_update(self.context, 31337, {'host': host, 'scheduled_at': 'fake-now'}) - db.queue_get_for(self.context, + rpc.queue_get_for(self.context, FLAGS.volume_topic, host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, @@ -261,10 +261,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, + rpc.queue_get_for(self.context, FLAGS.volume_topic, host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, @@ -280,10 +280,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): fake_kwargs = {'extra_arg': 'meow'} queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, + rpc.queue_get_for(self.context, FLAGS.volume_topic, host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, @@ -313,10 +313,10 @@ class SchedulerDriverModuleTestCase(test.TestCase): topic = 'unknown' queue = 'fake_queue' - self.mox.StubOutWithMock(db, 'queue_get_for') + self.mox.StubOutWithMock(rpc, 'queue_get_for') self.mox.StubOutWithMock(rpc, 'cast') - db.queue_get_for(self.context, topic, host).AndReturn(queue) + rpc.queue_get_for(self.context, topic, host).AndReturn(queue) rpc.cast(self.context, queue, {'method': method, 'args': fake_kwargs}) diff --git a/cinder/volume/api.py b/cinder/volume/api.py index 2dfe43679..47e330650 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -148,7 +148,7 @@ class API(base.Base): 'terminated_at': now}) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "delete_volume", "args": {"volume_id": volume_id}}) @@ -234,7 +234,7 @@ class API(base.Base): def remove_from_compute(self, context, volume, instance_id, host): """Remove volume from specified compute host.""" rpc.call(context, - self.db.queue_get_for(context, FLAGS.compute_topic, host), + rpc.queue_get_for(context, FLAGS.compute_topic, host), {"method": "remove_volume_connection", "args": {'instance_id': instance_id, 'volume_id': volume['id']}}) @@ -251,7 +251,7 @@ class API(base.Base): @wrap_check_policy def attach(self, context, volume, instance_uuid, mountpoint): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "attach_volume", "args": {"volume_id": volume['id'], @@ -261,7 +261,7 @@ class API(base.Base): @wrap_check_policy def detach(self, context, volume): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "detach_volume", "args": {"volume_id": volume['id']}}) @@ -269,7 +269,7 @@ class API(base.Base): @wrap_check_policy def initialize_connection(self, context, volume, connector): host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "initialize_connection", "args": {"volume_id": volume['id'], @@ -279,7 +279,7 @@ class API(base.Base): def terminate_connection(self, context, volume, connector): self.unreserve_volume(context, volume) host = volume['host'] - queue = self.db.queue_get_for(context, FLAGS.volume_topic, host) + queue = rpc.queue_get_for(context, FLAGS.volume_topic, host) return rpc.call(context, queue, {"method": "terminate_connection", "args": {"volume_id": volume['id'], @@ -306,7 +306,7 @@ class API(base.Base): snapshot = self.db.snapshot_create(context, options) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "create_snapshot", "args": {"volume_id": volume['id'], "snapshot_id": snapshot['id']}}) @@ -330,7 +330,7 @@ class API(base.Base): volume = self.db.volume_get(context, snapshot['volume_id']) host = volume['host'] rpc.cast(context, - self.db.queue_get_for(context, FLAGS.volume_topic, host), + rpc.queue_get_for(context, FLAGS.volume_topic, host), {"method": "delete_snapshot", "args": {"snapshot_id": snapshot['id']}})