LOG = logging.getLogger(__name__)
-def create_consumers(endpoints, prefix, topic_details):
+def create_consumers(endpoints, prefix, topic_details, start_listening=True):
"""Create agent RPC consumers.
:param endpoints: The list of endpoints to process the incoming messages.
:param topic_details: A list of topics. Each topic has a name, an
operation, and an optional host param keying the
subscription to topic.host for plugin calls.
+ :param start_listening: if True, it starts the processing loop
:returns: A common Connection.
"""
connection.create_consumer(node_topic_name,
endpoints,
fanout=False)
- connection.consume_in_threads()
+ if start_listening:
+ connection.consume_in_threads()
return connection
self.iter_num = 0
self.run_daemon_loop = True
+ # The initialization is complete; we can start receiving messages
+ self.connection.consume_in_threads()
+
def _report_state(self):
# How many devices are likely used by a VM
self.agent_state.get('configurations')['devices'] = (
topics.UPDATE, cfg.CONF.host])
self.connection = agent_rpc.create_consumers(self.endpoints,
self.topic,
- consumers)
+ consumers,
+ start_listening=False)
def get_net_uuid(self, vif_id):
for network_id, vlan_mapping in self.local_vlan_map.iteritems():
class AgentRPCMethods(base.BaseTestCase):
- def test_create_consumers(self):
+
+ def _test_create_consumers(
+ self, endpoints, method, expected, topics, listen):
+ call_to_patch = 'neutron.common.rpc.create_connection'
+ with mock.patch(call_to_patch) as create_connection:
+ rpc.create_consumers(
+ endpoints, method, topics, start_listening=listen)
+ create_connection.assert_has_calls(expected)
+
+ def test_create_consumers_start_listening(self):
endpoints = [mock.Mock()]
expected = [
mock.call(new=True),
fanout=True),
mock.call().consume_in_threads()
]
+ method = 'foo'
+ topics = [('topic', 'op')]
+ self._test_create_consumers(
+ endpoints, method, expected, topics, True)
- call_to_patch = 'neutron.common.rpc.create_connection'
- with mock.patch(call_to_patch) as create_connection:
- rpc.create_consumers(endpoints, 'foo', [('topic', 'op')])
- create_connection.assert_has_calls(expected)
+ def test_create_consumers_do_not_listen(self):
+ endpoints = [mock.Mock()]
+ expected = [
+ mock.call(new=True),
+ mock.call().create_consumer('foo-topic-op', endpoints,
+ fanout=True),
+ ]
+ method = 'foo'
+ topics = [('topic', 'op')]
+ self._test_create_consumers(
+ endpoints, method, expected, topics, False)
def test_create_consumers_with_node_name(self):
endpoints = [mock.Mock()]