]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Move queue_get_for() from db to rpc.
authorRussell Bryant <rbryant@redhat.com>
Tue, 29 May 2012 20:35:35 +0000 (16:35 -0400)
committerJenkins <jenkins@review.openstack.org>
Wed, 18 Jul 2012 17:27:56 +0000 (17:27 +0000)
Part of blueprint common-rpc.

The function queue_get_for() is a utility function used by various
consumers of the rpc API.  This function lived in the db API, but never
ended up using anything from the database.  This patch moves it into the
rpc API so that it can be used by other users of rpc once it moves into
openstack-common.

Change-Id: If92675beecff5471b416a929c161b810e3c71939
Reviewed-on: https://review.openstack.org/9906
Reviewed-by: Vish Ishaya <vishvananda@gmail.com>
Approved: John Griffith <john.griffith@solidfire.com>
Tested-by: Jenkins
bin/cinder-manage
cinder/db/api.py
cinder/db/sqlalchemy/api.py
cinder/rpc/__init__.py
cinder/scheduler/driver.py
cinder/tests/scheduler/test_scheduler.py
cinder/volume/api.py

index 97bee2af60e5336a559d2e3a4e33f8f1a102a6c3..b7c42628e0dedbd97c51f288564a58565b85e3b4 100755 (executable)
@@ -259,7 +259,7 @@ class VolumeCommands(object):
             return
 
         rpc.cast(ctxt,
-                 db.queue_get_for(ctxt, FLAGS.volume_topic, host),
+                 rpc.queue_get_for(ctxt, FLAGS.volume_topic, host),
                  {"method": "delete_volume",
                   "args": {"volume_id": volume['id']}})
 
@@ -277,7 +277,7 @@ class VolumeCommands(object):
         instance = db.instance_get(ctxt, volume['instance_id'])
         host = instance['host']
         rpc.cast(ctxt,
-                 db.queue_get_for(ctxt, FLAGS.compute_topic, host),
+                 rpc.queue_get_for(ctxt, FLAGS.compute_topic, host),
                  {"method": "attach_volume",
                   "args": {"instance_id": instance['id'],
                            "volume_id": volume['id'],
index 8b736875c65122eb3088735885821935ed72a88c..5e49a5eddc4cbfc5c79a715ec59321c4381c7937 100644 (file)
@@ -184,14 +184,6 @@ def migration_get_all_unconfirmed(context, confirm_window):
 ###################
 
 
-def queue_get_for(context, topic, physical_node_id):
-    """Return a channel to send a message to a node with a topic."""
-    return IMPL.queue_get_for(context, topic, physical_node_id)
-
-
-###################
-
-
 def iscsi_target_count_by_host(context, host):
     """Return count of export devices."""
     return IMPL.iscsi_target_count_by_host(context, host)
index 60954de7c8d686b316f0676ad48d71adc0d77cf1..fbad55e2e07093f9e47b6ce136bc27c7520b82ab 100644 (file)
@@ -361,14 +361,6 @@ def _dict_with_extra_specs(inst_type_query):
 ###################
 
 
-def queue_get_for(context, topic, physical_node_id):
-    # FIXME(ja): this should be servername?
-    return "%s.%s" % (topic, physical_node_id)
-
-
-###################
-
-
 @require_admin_context
 def iscsi_target_count_by_host(context, host):
     return model_query(context, models.IscsiTarget).\
index becdefa9ed81e7ef3ba8914560bc7139995ad51b..4512e5ec162b8dc716b85a58e854f8030d4581e6 100644 (file)
@@ -224,6 +224,11 @@ def fanout_cast_to_server(context, server_params, topic, msg):
                                              topic, msg)
 
 
+def queue_get_for(context, topic, host):
+    """Get a queue name for a given topic + host."""
+    return '%s.%s' % (topic, host)
+
+
 _RPCIMPL = None
 
 
index 49420d84877b04fb3363a71d28a3365cc62def8f..b844e28013deaf074a219c94550edc6fe3c3cd49 100644 (file)
@@ -54,8 +54,8 @@ def cast_to_volume_host(context, host, method, update_db=True, **kwargs):
             db.volume_update(context, volume_id,
                     {'host': host, 'scheduled_at': now})
     rpc.cast(context,
-            db.queue_get_for(context, FLAGS.volume_topic, host),
-            {"method": method, "args": kwargs})
+             rpc.queue_get_for(context, FLAGS.volume_topic, host),
+             {"method": method, "args": kwargs})
     LOG.debug(_("Casted '%(method)s' to host '%(host)s'") % locals())
 
 
@@ -70,8 +70,8 @@ def cast_to_host(context, topic, host, method, update_db=True, **kwargs):
         func(context, host, method, update_db=update_db, **kwargs)
     else:
         rpc.cast(context,
-            db.queue_get_for(context, topic, host),
-                {"method": method, "args": kwargs})
+                 rpc.queue_get_for(context, topic, host),
+                 {"method": method, "args": kwargs})
         LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'")
                 % locals())
 
@@ -145,8 +145,8 @@ class Scheduler(object):
         """
 
         src = instance_ref['host']
-        dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest)
-        src_t = db.queue_get_for(context, FLAGS.compute_topic, src)
+        dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest)
+        src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src)
 
         filename = rpc.call(context, dst_t,
                             {"method": 'create_shared_storage_test_file'})
index b45413c6627b954bec8c3974ecc18bcd11d3de03..0575d0c11e23accfd2da578782b4d66b5df75010 100644 (file)
@@ -239,13 +239,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
 
         self.mox.StubOutWithMock(timeutils, 'utcnow')
         self.mox.StubOutWithMock(db, 'volume_update')
-        self.mox.StubOutWithMock(db, 'queue_get_for')
+        self.mox.StubOutWithMock(rpc, 'queue_get_for')
         self.mox.StubOutWithMock(rpc, 'cast')
 
         timeutils.utcnow().AndReturn('fake-now')
         db.volume_update(self.context, 31337,
                 {'host': host, 'scheduled_at': 'fake-now'})
-        db.queue_get_for(self.context,
+        rpc.queue_get_for(self.context,
                          FLAGS.volume_topic, host).AndReturn(queue)
         rpc.cast(self.context, queue,
                 {'method': method,
@@ -261,10 +261,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
         fake_kwargs = {'extra_arg': 'meow'}
         queue = 'fake_queue'
 
-        self.mox.StubOutWithMock(db, 'queue_get_for')
+        self.mox.StubOutWithMock(rpc, 'queue_get_for')
         self.mox.StubOutWithMock(rpc, 'cast')
 
-        db.queue_get_for(self.context,
+        rpc.queue_get_for(self.context,
                          FLAGS.volume_topic, host).AndReturn(queue)
         rpc.cast(self.context, queue,
                 {'method': method,
@@ -280,10 +280,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
         fake_kwargs = {'extra_arg': 'meow'}
         queue = 'fake_queue'
 
-        self.mox.StubOutWithMock(db, 'queue_get_for')
+        self.mox.StubOutWithMock(rpc, 'queue_get_for')
         self.mox.StubOutWithMock(rpc, 'cast')
 
-        db.queue_get_for(self.context,
+        rpc.queue_get_for(self.context,
                          FLAGS.volume_topic, host).AndReturn(queue)
         rpc.cast(self.context, queue,
                 {'method': method,
@@ -313,10 +313,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
         topic = 'unknown'
         queue = 'fake_queue'
 
-        self.mox.StubOutWithMock(db, 'queue_get_for')
+        self.mox.StubOutWithMock(rpc, 'queue_get_for')
         self.mox.StubOutWithMock(rpc, 'cast')
 
-        db.queue_get_for(self.context, topic, host).AndReturn(queue)
+        rpc.queue_get_for(self.context, topic, host).AndReturn(queue)
         rpc.cast(self.context, queue,
                 {'method': method,
                  'args': fake_kwargs})
index 2dfe436795d3f976fbfa33ea0d21cd1e2f2cb1e1..47e3306509151bf377c9c952f4434048b2bafc09 100644 (file)
@@ -148,7 +148,7 @@ class API(base.Base):
                                                    'terminated_at': now})
         host = volume['host']
         rpc.cast(context,
-                 self.db.queue_get_for(context, FLAGS.volume_topic, host),
+                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
                  {"method": "delete_volume",
                   "args": {"volume_id": volume_id}})
 
@@ -234,7 +234,7 @@ class API(base.Base):
     def remove_from_compute(self, context, volume, instance_id, host):
         """Remove volume from specified compute host."""
         rpc.call(context,
-                 self.db.queue_get_for(context, FLAGS.compute_topic, host),
+                 rpc.queue_get_for(context, FLAGS.compute_topic, host),
                  {"method": "remove_volume_connection",
                   "args": {'instance_id': instance_id,
                            'volume_id': volume['id']}})
@@ -251,7 +251,7 @@ class API(base.Base):
     @wrap_check_policy
     def attach(self, context, volume, instance_uuid, mountpoint):
         host = volume['host']
-        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
         return rpc.call(context, queue,
                         {"method": "attach_volume",
                          "args": {"volume_id": volume['id'],
@@ -261,7 +261,7 @@ class API(base.Base):
     @wrap_check_policy
     def detach(self, context, volume):
         host = volume['host']
-        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
         return rpc.call(context, queue,
                  {"method": "detach_volume",
                   "args": {"volume_id": volume['id']}})
@@ -269,7 +269,7 @@ class API(base.Base):
     @wrap_check_policy
     def initialize_connection(self, context, volume, connector):
         host = volume['host']
-        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
         return rpc.call(context, queue,
                         {"method": "initialize_connection",
                          "args": {"volume_id": volume['id'],
@@ -279,7 +279,7 @@ class API(base.Base):
     def terminate_connection(self, context, volume, connector):
         self.unreserve_volume(context, volume)
         host = volume['host']
-        queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+        queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
         return rpc.call(context, queue,
                         {"method": "terminate_connection",
                          "args": {"volume_id": volume['id'],
@@ -306,7 +306,7 @@ class API(base.Base):
         snapshot = self.db.snapshot_create(context, options)
         host = volume['host']
         rpc.cast(context,
-                 self.db.queue_get_for(context, FLAGS.volume_topic, host),
+                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
                  {"method": "create_snapshot",
                   "args": {"volume_id": volume['id'],
                            "snapshot_id": snapshot['id']}})
@@ -330,7 +330,7 @@ class API(base.Base):
         volume = self.db.volume_get(context, snapshot['volume_id'])
         host = volume['host']
         rpc.cast(context,
-                 self.db.queue_get_for(context, FLAGS.volume_topic, host),
+                 rpc.queue_get_for(context, FLAGS.volume_topic, host),
                  {"method": "delete_snapshot",
                   "args": {"snapshot_id": snapshot['id']}})