# License for the specific language governing permissions and limitations
# under the License.
+from oslo import messaging
+
from neutron.common import constants
from neutron.common import rpc as n_rpc
from neutron.common import topics
LOG = logging.getLogger(__name__)
-class MeteringAgentNotifyAPI(n_rpc.RpcProxy):
+class MeteringAgentNotifyAPI(object):
"""API for plugin to notify L3 metering agent."""
- BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic=topics.METERING_AGENT):
- super(MeteringAgentNotifyAPI, self).__init__(
- topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ self.topic = topic
+ target = messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(target)
def _agent_notification(self, context, method, routers):
"""Notify l3 metering agents hosted by l3 agent hosts."""
l3_routers[l3_agent.host] = l3_router
for host, routers in l3_routers.iteritems():
- self.cast(context, self.make_msg(method, routers=routers),
- topic='%s.%s' % (self.topic, host))
+ cctxt = self.client.prepare(server=host)
+ cctxt.cast(context, method, routers=routers)
def _notification_fanout(self, context, method, router_id):
LOG.debug('Fanout notify metering agent at %(topic)s the message '
{'topic': self.topic,
'method': method,
'router_id': router_id})
- self.fanout_cast(
- context, self.make_msg(method,
- router_id=router_id))
+ cctxt = self.client.prepare(fanout=True)
+ cctxt.cast(context, method, router_id=router_id)
def _notification(self, context, method, routers):
"""Notify all the agents that are hosting the routers."""
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
self._agent_notification(context, method, routers)
else:
- self.fanout_cast(context, self.make_msg(method, routers=routers))
+ cctxt = self.client.prepare(fanout=True)
+ cctxt.cast(context, method, routers=routers)
def router_deleted(self, context, router_id):
self._notification_fanout(context, 'router_deleted', router_id)
self.uuid_patch = mock.patch(uuid, return_value=self.uuid)
self.mock_uuid = self.uuid_patch.start()
- fanout = ('neutron.common.rpc.RpcProxy.fanout_cast')
- self.fanout_patch = mock.patch(fanout)
- self.mock_fanout = self.fanout_patch.start()
-
self.tenant_id = 'a7e61382-47b8-4d40-bae3-f95981b5637b'
self.ctx = context.Context('', self.tenant_id, is_admin=True)
self.context_patch = mock.patch('neutron.context.Context',
self.topic = 'metering_agent'
+ add = ('neutron.api.rpc.agentnotifiers.' +
+ 'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
+ '.add_metering_label')
+ self.add_patch = mock.patch(add)
+ self.mock_add = self.add_patch.start()
+
+ remove = ('neutron.api.rpc.agentnotifiers.' +
+ 'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
+ '.remove_metering_label')
+ self.remove_patch = mock.patch(remove)
+ self.mock_remove = self.remove_patch.start()
+
+ update = ('neutron.api.rpc.agentnotifiers.' +
+ 'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
+ '.update_metering_label_rules')
+ self.update_patch = mock.patch(update)
+ self.mock_update = self.update_patch.start()
+
def test_add_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
- expected = {'args': {'routers': [{'status': 'ACTIVE',
- 'name': 'router1',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [],
- 'id': self.uuid}],
- 'id': self.uuid}]},
- 'namespace': None,
- 'method': 'add_metering_label'}
+ expected = [{'status': 'ACTIVE',
+ 'name': 'router1',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [],
+ 'id': self.uuid}],
+ 'id': self.uuid}]
tenant_id_2 = '8a268a58-1610-4890-87e0-07abb8231206'
self.mock_uuid.return_value = second_uuid
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
- self.mock_fanout.assert_called_with(self.ctx, expected)
+ self.mock_add.assert_called_with(self.ctx, expected)
def test_add_metering_label_shared_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
- expected = {'args': {'routers': [{'status': 'ACTIVE',
- 'name': 'router1',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [],
- 'id': self.uuid},
- {'rules': [],
- 'id': second_uuid}],
- 'id': self.uuid}]},
- 'namespace': None,
- 'method': 'add_metering_label'}
+ expected = [{'status': 'ACTIVE',
+ 'name': 'router1',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [],
+ 'id': self.uuid},
+ {'rules': [],
+ 'id': second_uuid}],
+ 'id': self.uuid}]
tenant_id_2 = '8a268a58-1610-4890-87e0-07abb8231206'
with self.router(name='router1', tenant_id=self.tenant_id,
self.mock_uuid.return_value = second_uuid
with self.metering_label(tenant_id=tenant_id_2, shared=True,
set_context=True):
- self.mock_fanout.assert_called_with(self.ctx, expected)
+ self.mock_add.assert_called_with(self.ctx, expected)
def test_remove_metering_label_rpc_call(self):
- expected = {'args':
- {'routers': [{'status': 'ACTIVE',
- 'name': 'router1',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [],
- 'id': self.uuid}],
- 'id': self.uuid}]},
- 'namespace': None,
- 'method': 'add_metering_label'}
+ expected = [{'status': 'ACTIVE',
+ 'name': 'router1',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [],
+ 'id': self.uuid}],
+ 'id': self.uuid}]
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
- self.mock_fanout.assert_called_with(self.ctx, expected)
- expected['method'] = 'remove_metering_label'
- self.mock_fanout.assert_called_with(self.ctx, expected)
+ self.mock_add.assert_called_with(self.ctx, expected)
+ self.mock_remove.assert_called_with(self.ctx, expected)
def test_remove_one_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
- expected_add = {'args':
- {'routers': [{'status': 'ACTIVE',
- 'name': 'router1',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [],
- 'id': self.uuid},
- {'rules': [],
- 'id': second_uuid}],
- 'id': self.uuid}]},
- 'namespace': None,
- 'method': 'add_metering_label'}
- expected_remove = {'args':
- {'routers': [{'status': 'ACTIVE',
- 'name': 'router1',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [],
- 'id': second_uuid}],
- 'id': self.uuid}]},
- 'namespace': None,
- 'method': 'remove_metering_label'}
+ expected_add = [{'status': 'ACTIVE',
+ 'name': 'router1',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [],
+ 'id': self.uuid},
+ {'rules': [],
+ 'id': second_uuid}],
+ 'id': self.uuid}]
+ expected_remove = [{'status': 'ACTIVE',
+ 'name': 'router1',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [],
+ 'id': second_uuid}],
+ 'id': self.uuid}]
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
self.mock_uuid.return_value = second_uuid
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
- self.mock_fanout.assert_called_with(self.ctx, expected_add)
- self.mock_fanout.assert_called_with(self.ctx, expected_remove)
+ self.mock_add.assert_called_with(self.ctx, expected_add)
+ self.mock_remove.assert_called_with(self.ctx, expected_remove)
def test_update_metering_label_rules_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
- expected_add = {'args':
- {'routers': [
- {'status': 'ACTIVE',
- 'name': 'router1',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [
- {'remote_ip_prefix': '10.0.0.0/24',
- 'direction': 'ingress',
- 'metering_label_id': self.uuid,
- 'excluded': False,
- 'id': self.uuid},
- {'remote_ip_prefix': '10.0.0.0/24',
- 'direction': 'egress',
- 'metering_label_id': self.uuid,
- 'excluded': False,
- 'id': second_uuid}],
- 'id': self.uuid}],
- 'id': self.uuid}]},
- 'namespace': None,
- 'method': 'update_metering_label_rules'}
-
- expected_del = {'args':
- {'routers': [
- {'status': 'ACTIVE',
- 'name': 'router1',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [
- {'remote_ip_prefix': '10.0.0.0/24',
- 'direction': 'ingress',
- 'metering_label_id': self.uuid,
- 'excluded': False,
- 'id': self.uuid}],
+ expected_add = [{'status': 'ACTIVE',
+ 'name': 'router1',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [
+ {'remote_ip_prefix': '10.0.0.0/24',
+ 'direction': 'ingress',
+ 'metering_label_id': self.uuid,
+ 'excluded': False,
+ 'id': self.uuid},
+ {'remote_ip_prefix': '10.0.0.0/24',
+ 'direction': 'egress',
+ 'metering_label_id': self.uuid,
+ 'excluded': False,
+ 'id': second_uuid}],
+ 'id': self.uuid}],
+ 'id': self.uuid}]
+
+ expected_del = [{'status': 'ACTIVE',
+ 'name': 'router1',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [
+ {'remote_ip_prefix': '10.0.0.0/24',
+ 'direction': 'ingress',
+ 'metering_label_id': self.uuid,
+ 'excluded': False,
'id': self.uuid}],
- 'id': self.uuid}]},
- 'namespace': None,
- 'method': 'update_metering_label_rules'}
+ 'id': self.uuid}],
+ 'id': self.uuid}]
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
with self.metering_label_rule(l['id']):
self.mock_uuid.return_value = second_uuid
with self.metering_label_rule(l['id'], direction='egress'):
- self.mock_fanout.assert_called_with(self.ctx,
+ self.mock_update.assert_called_with(self.ctx,
expected_add)
- self.mock_fanout.assert_called_with(self.ctx,
+ self.mock_update.assert_called_with(self.ctx,
expected_del)
def test_delete_metering_label_does_not_clear_router_tenant_id(self):
self.uuid_patch = mock.patch(uuid, return_value=self.uuid)
self.mock_uuid = self.uuid_patch.start()
- cast = 'neutron.common.rpc.RpcProxy.cast'
- self.cast_patch = mock.patch(cast)
- self.mock_cast = self.cast_patch.start()
-
self.tenant_id = 'a7e61382-47b8-4d40-bae3-f95981b5637b'
self.ctx = context.Context('', self.tenant_id, is_admin=True)
self.context_patch = mock.patch('neutron.context.Context',
self.topic = 'metering_agent'
+ add = ('neutron.api.rpc.agentnotifiers.' +
+ 'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
+ '.add_metering_label')
+ self.add_patch = mock.patch(add)
+ self.mock_add = self.add_patch.start()
+
+ remove = ('neutron.api.rpc.agentnotifiers.' +
+ 'metering_rpc_agent_api.MeteringAgentNotifyAPI' +
+ '.remove_metering_label')
+ self.remove_patch = mock.patch(remove)
+ self.mock_remove = self.remove_patch.start()
+
def test_add_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
- expected1 = {'args': {'routers': [{'status': 'ACTIVE',
- 'name': 'router1',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [],
- 'id': second_uuid}],
- 'id': self.uuid}]},
- 'namespace': None,
- 'method': 'add_metering_label'}
- expected2 = {'args': {'routers': [{'status': 'ACTIVE',
- 'name': 'router2',
- 'gw_port_id': None,
- 'admin_state_up': True,
- 'tenant_id': self.tenant_id,
- '_metering_labels': [
- {'rules': [],
- 'id': second_uuid}],
- 'id': second_uuid}]},
- 'namespace': None,
- 'method': 'add_metering_label'}
+ expected = [{'status': 'ACTIVE',
+ 'name': 'router1',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [],
+ 'id': second_uuid}],
+ 'id': self.uuid},
+ {'status': 'ACTIVE',
+ 'name': 'router2',
+ 'gw_port_id': None,
+ 'admin_state_up': True,
+ 'tenant_id': self.tenant_id,
+ '_metering_labels': [
+ {'rules': [],
+ 'id': second_uuid}],
+ 'id': second_uuid}]
# bind each router to a specific agent
agent1 = agents_db.Agent(host='agent1')
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
-
- topic1 = "%s.%s" % (self.topic, 'agent1')
- topic2 = "%s.%s" % (self.topic, 'agent2')
-
- # check if there is a call per agent
- expected = [mock.call(self.ctx, expected1, topic=topic1),
- mock.call(self.ctx, expected2, topic=topic2)]
-
- self.mock_cast.assert_has_calls(expected, any_order=True)
+ self.mock_add.assert_called_with(self.ctx, expected)
class TestMeteringPluginL3AgentSchedulerServicePlugin(