Part of blueprint common-rpc.
The function queue_get_for() is a utility function used by various
consumers of the rpc API. This function lived in the db API, but never
ended up using anything from the database. This patch moves it into the
rpc API so that it can be used by other users of rpc once it moves into
openstack-common.
Change-Id: If92675beecff5471b416a929c161b810e3c71939
Reviewed-on: https://review.openstack.org/9906
Reviewed-by: Vish Ishaya <vishvananda@gmail.com>
Approved: John Griffith <john.griffith@solidfire.com>
Tested-by: Jenkins
return
rpc.cast(ctxt,
- db.queue_get_for(ctxt, FLAGS.volume_topic, host),
+ rpc.queue_get_for(ctxt, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume['id']}})
instance = db.instance_get(ctxt, volume['instance_id'])
host = instance['host']
rpc.cast(ctxt,
- db.queue_get_for(ctxt, FLAGS.compute_topic, host),
+ rpc.queue_get_for(ctxt, FLAGS.compute_topic, host),
{"method": "attach_volume",
"args": {"instance_id": instance['id'],
"volume_id": volume['id'],
###################
-def queue_get_for(context, topic, physical_node_id):
- """Return a channel to send a message to a node with a topic."""
- return IMPL.queue_get_for(context, topic, physical_node_id)
-
-
-###################
-
-
def iscsi_target_count_by_host(context, host):
"""Return count of export devices."""
return IMPL.iscsi_target_count_by_host(context, host)
###################
-def queue_get_for(context, topic, physical_node_id):
- # FIXME(ja): this should be servername?
- return "%s.%s" % (topic, physical_node_id)
-
-
-###################
-
-
@require_admin_context
def iscsi_target_count_by_host(context, host):
return model_query(context, models.IscsiTarget).\
topic, msg)
+def queue_get_for(context, topic, host):
+ """Get a queue name for a given topic + host."""
+ return '%s.%s' % (topic, host)
+
+
_RPCIMPL = None
db.volume_update(context, volume_id,
{'host': host, 'scheduled_at': now})
rpc.cast(context,
- db.queue_get_for(context, FLAGS.volume_topic, host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to host '%(host)s'") % locals())
func(context, host, method, update_db=update_db, **kwargs)
else:
rpc.cast(context,
- db.queue_get_for(context, topic, host),
- {"method": method, "args": kwargs})
+ rpc.queue_get_for(context, topic, host),
+ {"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'")
% locals())
"""
src = instance_ref['host']
- dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest)
- src_t = db.queue_get_for(context, FLAGS.compute_topic, src)
+ dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest)
+ src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src)
filename = rpc.call(context, dst_t,
{"method": 'create_shared_storage_test_file'})
self.mox.StubOutWithMock(timeutils, 'utcnow')
self.mox.StubOutWithMock(db, 'volume_update')
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
timeutils.utcnow().AndReturn('fake-now')
db.volume_update(self.context, 31337,
{'host': host, 'scheduled_at': 'fake-now'})
- db.queue_get_for(self.context,
+ rpc.queue_get_for(self.context,
FLAGS.volume_topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context,
+ rpc.queue_get_for(self.context,
FLAGS.volume_topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context,
+ rpc.queue_get_for(self.context,
FLAGS.volume_topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
topic = 'unknown'
queue = 'fake_queue'
- self.mox.StubOutWithMock(db, 'queue_get_for')
+ self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
- db.queue_get_for(self.context, topic, host).AndReturn(queue)
+ rpc.queue_get_for(self.context, topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
'terminated_at': now})
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume_id}})
def remove_from_compute(self, context, volume, instance_id, host):
"""Remove volume from specified compute host."""
rpc.call(context,
- self.db.queue_get_for(context, FLAGS.compute_topic, host),
+ rpc.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "remove_volume_connection",
"args": {'instance_id': instance_id,
'volume_id': volume['id']}})
@wrap_check_policy
def attach(self, context, volume, instance_uuid, mountpoint):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "attach_volume",
"args": {"volume_id": volume['id'],
@wrap_check_policy
def detach(self, context, volume):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "detach_volume",
"args": {"volume_id": volume['id']}})
@wrap_check_policy
def initialize_connection(self, context, volume, connector):
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "initialize_connection",
"args": {"volume_id": volume['id'],
def terminate_connection(self, context, volume, connector):
self.unreserve_volume(context, volume)
host = volume['host']
- queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
+ queue = rpc.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "terminate_connection",
"args": {"volume_id": volume['id'],
snapshot = self.db.snapshot_create(context, options)
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "create_snapshot",
"args": {"volume_id": volume['id'],
"snapshot_id": snapshot['id']}})
volume = self.db.volume_get(context, snapshot['volume_id'])
host = volume['host']
rpc.cast(context,
- self.db.queue_get_for(context, FLAGS.volume_topic, host),
+ rpc.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_snapshot",
"args": {"snapshot_id": snapshot['id']}})