:returns: A common Connection.
"""
- connection = n_rpc.create_connection(new=True)
+ connection = n_rpc.create_connection()
for details in topic_details:
topic, operation, node_name = itertools.islice(
itertools.chain(details, [None]), 3)
# License for the specific language governing permissions and limitations
# under the License.
+from debtcollector import removals
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
def start(self):
super(Service, self).start()
- self.conn = create_connection(new=True)
+ self.conn = create_connection()
LOG.debug("Creating Consumer connection for Service %s",
self.topic)
# functions
+@removals.removed_kwarg('new')
def create_connection(new=True):
# NOTE(salv-orlando): This is a clever interpreation of the factory design
# patter aimed at preventing plugins from initializing RPC servers upon
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.rpc_context = n_context.ContextBase('neutron', 'neutron',
is_admin=False)
- self.conn = n_rpc.create_connection(new=True)
+ self.conn = n_rpc.create_connection()
self.endpoints = [BridgeRpcCallbacks(),
securitygroups_rpc.SecurityGroupServerRpcCallback(),
dhcp_rpc.DhcpRpcCallback(),
"""Start the RPC loop to let the plugin communicate with agents."""
self._setup_rpc()
self.topic = topics.PLUGIN
- self.conn = n_rpc.create_connection(new=True)
+ self.conn = n_rpc.create_connection()
self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
def start_rpc_listeners(self):
# RPC support
self.topic = topics.L3PLUGIN
- self.conn = n_rpc.create_connection(new=True)
+ self.conn = n_rpc.create_connection()
self.agent_notifiers.update(
{n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
self.endpoints = [l3_rpc.L3RpcCallback()]
def start_rpc_listeners(self):
self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)]
- self.conn = n_rpc.create_connection(new=True)
+ self.conn = n_rpc.create_connection()
self.conn.create_consumer(
topics.METERING_PLUGIN, self.endpoints, fanout=False)
return self.conn.consume_in_threads()
def test_create_consumers_start_listening(self):
endpoints = [mock.Mock()]
expected = [
- mock.call(new=True),
+ mock.call(),
mock.call().create_consumer('foo-topic-op', endpoints,
fanout=True),
mock.call().consume_in_threads()
def test_create_consumers_do_not_listen(self):
endpoints = [mock.Mock()]
expected = [
- mock.call(new=True),
+ mock.call(),
mock.call().create_consumer('foo-topic-op', endpoints,
fanout=True),
]
def test_create_consumers_with_node_name(self):
endpoints = [mock.Mock()]
expected = [
- mock.call(new=True),
+ mock.call(),
mock.call().create_consumer('foo-topic-op', endpoints,
fanout=True),
mock.call().create_consumer('foo-topic-op.node1', endpoints,