NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
DEVICE_OWNER_COMPUTE = 'compute:None'
-FLOODING_ENTRY_AS_LIST = list(constants.FLOODING_ENTRY)
-
class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
_mechanism_drivers = ['openvswitch', 'linuxbridge',
self.fanout_topic = topics.get_topic_name(topics.AGENT,
topics.L2POPULATION,
topics.UPDATE)
- fanout = ('neutron.common.rpc.RpcProxy.fanout_cast')
+ fanout = ('neutron.plugins.ml2.drivers.l2pop.rpc.'
+ 'L2populationAgentNotifyAPI._notification_fanout')
fanout_patch = mock.patch(fanout)
self.mock_fanout = fanout_patch.start()
- cast = ('neutron.common.rpc.RpcProxy.cast')
+ cast = ('neutron.plugins.ml2.drivers.l2pop.rpc.'
+ 'L2populationAgentNotifyAPI._notification_host')
cast_patch = mock.patch(cast)
self.mock_cast = cast_patch.start()
device=device)
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
- expected = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.1': [FLOODING_ENTRY_AS_LIST,
- [p1['mac_address'],
- p1_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'add_fdb_entries'}
+ expected = {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p1['mac_address'],
+ p1_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_called_with(
- mock.ANY, expected, topic=self.fanout_topic)
+ mock.ANY, 'add_fdb_entries', expected)
def test_fdb_add_not_called_type_local(self):
self._register_ml2_agents()
device=device)
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
- expected = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.5': [FLOODING_ENTRY_AS_LIST,
- [p1['mac_address'],
- p1_ips[0]]]},
- 'network_type': 'vlan',
- 'segment_id': 2}}},
- 'namespace': None,
- 'method': 'add_fdb_entries'}
+ expected = {p1['network_id']:
+ {'ports':
+ {'20.0.0.5': [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p1['mac_address'],
+ p1_ips[0])]},
+ 'network_type': 'vlan',
+ 'segment_id': 2}}
self.mock_fanout.assert_called_with(
- mock.ANY, expected, topic=self.fanout_topic)
+ mock.ANY, 'add_fdb_entries', expected)
def test_fdb_add_two_agents(self):
self._register_ml2_agents()
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
p2_ips = [p['ip_address'] for p in p2['fixed_ips']]
- expected1 = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.2': [FLOODING_ENTRY_AS_LIST,
- [p2['mac_address'],
- p2_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'add_fdb_entries'}
-
- topic = topics.get_topic_name(topics.AGENT,
- topics.L2POPULATION,
- topics.UPDATE,
- HOST)
+ expected1 = {p1['network_id']:
+ {'ports':
+ {'20.0.0.2': [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p2['mac_address'],
+ p2_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_cast.assert_called_with(mock.ANY,
- expected1,
- topic=topic)
-
- expected2 = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.1': [FLOODING_ENTRY_AS_LIST,
- [p1['mac_address'],
- p1_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'add_fdb_entries'}
+ 'add_fdb_entries',
+ expected1, HOST)
+
+ expected2 = {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p1['mac_address'],
+ p1_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_called_with(
- mock.ANY, expected2, topic=self.fanout_topic)
+ mock.ANY, 'add_fdb_entries', expected2)
def test_fdb_add_called_two_networks(self):
self._register_ml2_agents()
p1_ips = [p['ip_address']
for p in p1['fixed_ips']]
- expected1 = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.2':
- [FLOODING_ENTRY_AS_LIST,
- [p1['mac_address'],
- p1_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'add_fdb_entries'}
-
- topic = topics.get_topic_name(topics.AGENT,
- topics.L2POPULATION,
- topics.UPDATE,
- HOST)
-
- self.mock_cast.assert_called_with(mock.ANY,
- expected1,
- topic=topic)
+ expected1 = {p1['network_id']:
+ {'ports':
+ {'20.0.0.2':
+ [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p1['mac_address'],
+ p1_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
+
+ self.mock_cast.assert_called_with(
+ mock.ANY, 'add_fdb_entries', expected1,
+ HOST)
p3_ips = [p['ip_address']
for p in p3['fixed_ips']]
- expected2 = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.1':
- [FLOODING_ENTRY_AS_LIST,
- [p3['mac_address'],
- p3_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'add_fdb_entries'}
+ expected2 = {p1['network_id']:
+ {'ports':
+ {'20.0.0.1':
+ [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p3['mac_address'],
+ p3_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_called_with(
- mock.ANY, expected2,
- topic=self.fanout_topic)
+ mock.ANY, 'add_fdb_entries', expected2)
def test_update_port_down(self):
self._register_ml2_agents()
device=device2)
p2_ips = [p['ip_address'] for p in p2['fixed_ips']]
- expected = {'args':
- {'fdb_entries':
- {p2['network_id']:
- {'ports':
- {'20.0.0.1': [[p2['mac_address'],
- p2_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'remove_fdb_entries'}
+ expected = {p2['network_id']:
+ {'ports':
+ {'20.0.0.1': [l2pop_rpc.PortInfo(
+ p2['mac_address'],
+ p2_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_called_with(
- mock.ANY, expected, topic=self.fanout_topic)
+ mock.ANY, 'remove_fdb_entries', expected)
def test_update_port_down_last_port_up(self):
self._register_ml2_agents()
device=device2)
p2_ips = [p['ip_address'] for p in p2['fixed_ips']]
- expected = {'args':
- {'fdb_entries':
- {p2['network_id']:
- {'ports':
- {'20.0.0.1': [FLOODING_ENTRY_AS_LIST,
- [p2['mac_address'],
- p2_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'remove_fdb_entries'}
+ expected = {p2['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p2['mac_address'],
+ p2_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_called_with(
- mock.ANY, expected, topic=self.fanout_topic)
+ mock.ANY, 'remove_fdb_entries', expected)
def test_delete_port(self):
self._register_ml2_agents()
device=device1)
self._delete('ports', port2['port']['id'])
p2_ips = [p['ip_address'] for p in p2['fixed_ips']]
- expected = {'args':
- {'fdb_entries':
- {p2['network_id']:
- {'ports':
- {'20.0.0.1': [[p2['mac_address'],
- p2_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'remove_fdb_entries'}
+ expected = {p2['network_id']:
+ {'ports':
+ {'20.0.0.1': [l2pop_rpc.PortInfo(
+ p2['mac_address'],
+ p2_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_any_call(
- mock.ANY, expected, topic=self.fanout_topic)
+ mock.ANY, 'remove_fdb_entries', expected)
def test_delete_port_last_port_up(self):
self._register_ml2_agents()
device=device)
self._delete('ports', port['port']['id'])
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
- expected = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.1': [FLOODING_ENTRY_AS_LIST,
- [p1['mac_address'],
- p1_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'remove_fdb_entries'}
+ expected = {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p1['mac_address'],
+ p1_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_any_call(
- mock.ANY, expected, topic=self.fanout_topic)
+ mock.ANY, 'remove_fdb_entries', expected)
def test_fixed_ips_changed(self):
self._register_ml2_agents()
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
- add_expected = {'args':
- {'fdb_entries':
- {'chg_ip':
- {p1['network_id']:
- {'20.0.0.1':
- {'after': [(p1['mac_address'],
- '10.0.0.10')]}}}}},
- 'namespace': None,
- 'method': 'update_fdb_entries'}
+ add_expected = {'chg_ip':
+ {p1['network_id']:
+ {'20.0.0.1':
+ {'after': [(p1['mac_address'],
+ '10.0.0.10')]}}}}
self.mock_fanout.assert_any_call(
- mock.ANY, add_expected, topic=self.fanout_topic)
+ mock.ANY, 'update_fdb_entries', add_expected)
self.mock_fanout.reset_mock()
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 2)
- upd_expected = {'args':
- {'fdb_entries':
- {'chg_ip':
- {p1['network_id']:
- {'20.0.0.1':
- {'before': [(p1['mac_address'],
- '10.0.0.10')],
- 'after': [(p1['mac_address'],
- '10.0.0.16')]}}}}},
- 'namespace': None,
- 'method': 'update_fdb_entries'}
+ upd_expected = {'chg_ip':
+ {p1['network_id']:
+ {'20.0.0.1':
+ {'before': [(p1['mac_address'],
+ '10.0.0.10')],
+ 'after': [(p1['mac_address'],
+ '10.0.0.16')]}}}}
self.mock_fanout.assert_any_call(
- mock.ANY, upd_expected, topic=self.fanout_topic)
+ mock.ANY, 'update_fdb_entries', upd_expected)
self.mock_fanout.reset_mock()
ips = res['port']['fixed_ips']
self.assertEqual(len(ips), 1)
- del_expected = {'args':
- {'fdb_entries':
- {'chg_ip':
- {p1['network_id']:
- {'20.0.0.1':
- {'before': [(p1['mac_address'],
- '10.0.0.2')]}}}}},
- 'namespace': None,
- 'method': 'update_fdb_entries'}
+ del_expected = {'chg_ip':
+ {p1['network_id']:
+ {'20.0.0.1':
+ {'before': [(p1['mac_address'],
+ '10.0.0.2')]}}}}
self.mock_fanout.assert_any_call(
- mock.ANY, del_expected, topic=self.fanout_topic)
+ mock.ANY, 'update_fdb_entries', del_expected)
def test_no_fdb_updates_without_port_updates(self):
self._register_ml2_agents()
device=device1,
agent_id=L2_AGENT_2['host'])
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
- expected = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.1': [FLOODING_ENTRY_AS_LIST,
- [p1['mac_address'],
- p1_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'remove_fdb_entries'}
+ expected = {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p1['mac_address'],
+ p1_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_called_with(
- mock.ANY, expected, topic=self.fanout_topic)
+ mock.ANY, 'remove_fdb_entries', expected)
def test_host_changed_twice(self):
self._register_ml2_agents()
device=device1,
agent_id=L2_AGENT_4['host'])
p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
- expected = {'args':
- {'fdb_entries':
- {p1['network_id']:
- {'ports':
- {'20.0.0.1': [FLOODING_ENTRY_AS_LIST,
- [p1['mac_address'],
- p1_ips[0]]]},
- 'network_type': 'vxlan',
- 'segment_id': 1}}},
- 'namespace': None,
- 'method': 'remove_fdb_entries'}
+ expected = {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ l2pop_rpc.PortInfo(
+ p1['mac_address'],
+ p1_ips[0])]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}
self.mock_fanout.assert_called_with(
- mock.ANY, expected, topic=self.fanout_topic)
+ mock.ANY, 'remove_fdb_entries', expected)
def test_delete_port_invokes_update_device_down(self):
l2pop_mech = l2pop_mech_driver.L2populationMechanismDriver()