def initialize(self):
LOG.debug(_("Experimental L2 population driver"))
self.rpc_ctx = n_context.get_admin_context_without_session()
+ self.migrated_ports = {}
+ self.deleted_ports = {}
def _get_port_fdb_entries(self, port):
return [[port['mac_address'],
# available in delete_port_postcommit. in delete_port_postcommit
# agent_active_ports will be equal to 0, and the _update_port_down
# won't need agent_active_ports_count_for_flooding anymore
- self.remove_fdb_entries = self._update_port_down(context, 1)
+ port_context = context.current
+ fdb_entries = self._update_port_down(context, port_context, 1)
+ self.deleted_ports[context.current['id']] = fdb_entries
def delete_port_postcommit(self, context):
- l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
- self.rpc_ctx, self.remove_fdb_entries)
+ fanout_msg = self.deleted_ports.pop(context.current['id'], None)
+ if fanout_msg:
+ l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
+ self.rpc_ctx, fanout_msg)
def _get_diff_ips(self, orig, port):
orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
if orig_chg_ips or port_chg_ips:
return orig_chg_ips, port_chg_ips
- def _fixed_ips_changed(self, context, orig, port):
- diff_ips = self._get_diff_ips(orig, port)
- if not diff_ips:
- return
+ def _fixed_ips_changed(self, context, orig, port, diff_ips):
orig_ips, port_ips = diff_ips
port_infos = self._get_port_infos(context, orig)
port = context.current
orig = context.original
- if port['status'] == orig['status']:
- self._fixed_ips_changed(context, orig, port)
- elif port['status'] == const.PORT_STATUS_ACTIVE:
- self._update_port_up(context)
- elif port['status'] == const.PORT_STATUS_DOWN:
- fdb_entries = self._update_port_down(context)
- l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
- self.rpc_ctx, fdb_entries)
+ diff_ips = self._get_diff_ips(orig, port)
+ if diff_ips:
+ self._fixed_ips_changed(context, orig, port, diff_ips)
+ if (port['binding:host_id'] != orig['binding:host_id']
+ and port['status'] == const.PORT_STATUS_ACTIVE
+ and not self.migrated_ports.get(orig['id'])):
+ # The port has been migrated. We have to store the original
+ # binding to send appropriate fdb once the port will be set
+ # on the destination host
+ self.migrated_ports[orig['id']] = orig
+ elif port['status'] != orig['status']:
+ if port['status'] == const.PORT_STATUS_ACTIVE:
+ self._update_port_up(context)
+ elif port['status'] == const.PORT_STATUS_DOWN:
+ fdb_entries = self._update_port_down(context, port)
+ l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
+ self.rpc_ctx, fdb_entries)
+ elif port['status'] == const.PORT_STATUS_BUILD:
+ orig = self.migrated_ports.pop(port['id'], None)
+ if orig:
+ # this port has been migrated : remove its entries from fdb
+ fdb_entries = self._update_port_down(context, orig)
+ l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
+ self.rpc_ctx, fdb_entries)
def _get_port_infos(self, context, port):
agent_host = port['binding:host_id']
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
other_fdb_entries)
- def _update_port_down(self, context,
+ def _update_port_down(self, context, port_context,
agent_active_ports_count_for_flooding=0):
- port_context = context.current
port_infos = self._get_port_infos(context, port_context)
if not port_infos:
return
{'segment_id': segment['segmentation_id'],
'network_type': segment['network_type'],
'ports': {agent_ip: []}}}
-
if agent_active_ports == agent_active_ports_count_for_flooding:
# Agent is removing its last activated port in this network,
# other agents needs to be notified to delete their flooding entry.
other_fdb_entries[network_id]['ports'][agent_ip].append(
const.FLOODING_ENTRY)
-
- # Notify other agents to remove fdb rule for current port
- fdb_entries = self._get_port_fdb_entries(port_context)
- other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
+ # Notify other agents to remove fdb rules for current port
+ other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries
return other_fdb_entries
new_status = (q_const.PORT_STATUS_BUILD if port.admin_state_up
else q_const.PORT_STATUS_DOWN)
if port.status != new_status:
+ plugin = manager.NeutronManager.get_plugin()
+ plugin.update_port_status(rpc_context,
+ port_id,
+ new_status)
port.status = new_status
entry = {'device': device,
'network_id': port.network_id,
'binary': 'neutron-openvswitch-agent',
'host': HOST + '_3',
'topic': constants.L2_AGENT_TOPIC,
- 'configurations': {'tunneling_ip': '20.0.0.2',
+ 'configurations': {'tunneling_ip': '20.0.0.3',
'tunnel_types': []},
'agent_type': constants.AGENT_TYPE_OVS,
'tunnel_type': [],
'start_flag': True
}
+L2_AGENT_4 = {
+ 'binary': 'neutron-openvswitch-agent',
+ 'host': HOST + '_4',
+ 'topic': constants.L2_AGENT_TOPIC,
+ 'configurations': {'tunneling_ip': '20.0.0.4',
+ 'tunnel_types': ['vxlan']},
+ 'agent_type': constants.AGENT_TYPE_OVS,
+ 'tunnel_type': [],
+ 'start_flag': True
+}
+
PLUGIN_NAME = 'neutron.plugins.ml2.plugin.Ml2Plugin'
NOTIFIER = 'neutron.plugins.ml2.rpc.AgentNotifierApi'
callback.report_state(self.adminContext,
agent_state={'agent_state': L2_AGENT_3},
time=timeutils.strtime())
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': L2_AGENT_4},
+ time=timeutils.strtime())
def test_fdb_add_called(self):
self._register_ml2_agents()
self.assertFalse(mock_fanout.called)
fanout_patch.stop()
+
+ def test_host_changed(self):
+ self._register_ml2_agents()
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: L2_AGENT['host']}
+ host2_arg = {portbindings.HOST_ID: L2_AGENT_2['host']}
+ with self.port(subnet=subnet, cidr='10.0.0.0/24',
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1:
+ with self.port(subnet=subnet, cidr='10.0.0.0/24',
+ arg_list=(portbindings.HOST_ID,),
+ **host2_arg) as port2:
+ p1 = port1['port']
+ device1 = 'tap' + p1['id']
+ self.callbacks.update_device_up(
+ self.adminContext,
+ agent_id=L2_AGENT['host'],
+ device=device1)
+ p2 = port2['port']
+ device2 = 'tap' + p2['id']
+ self.callbacks.update_device_up(
+ self.adminContext,
+ agent_id=L2_AGENT_2['host'],
+ device=device2)
+ data2 = {'port': {'binding:host_id': L2_AGENT_2['host']}}
+ req = self.new_update_request('ports', data2, p1['id'])
+ res = self.deserialize(self.fmt,
+ req.get_response(self.api))
+ self.assertEqual(res['port']['binding:host_id'],
+ L2_AGENT_2['host'])
+ self.mock_fanout.reset_mock()
+ self.callbacks.get_device_details(
+ self.adminContext,
+ device=device1,
+ agent_id=L2_AGENT_2['host'])
+ p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+ expected = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ [p1['mac_address'],
+ p1_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'remove_fdb_entries'}
+
+ self.mock_fanout.assert_called_with(
+ mock.ANY, expected, topic=self.fanout_topic)
+
+ def test_host_changed_twice(self):
+ self._register_ml2_agents()
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: L2_AGENT['host']}
+ host2_arg = {portbindings.HOST_ID: L2_AGENT_2['host']}
+ with self.port(subnet=subnet, cidr='10.0.0.0/24',
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1:
+ with self.port(subnet=subnet, cidr='10.0.0.0/24',
+ arg_list=(portbindings.HOST_ID,),
+ **host2_arg) as port2:
+ p1 = port1['port']
+ device1 = 'tap' + p1['id']
+ self.callbacks.update_device_up(
+ self.adminContext,
+ agent_id=L2_AGENT['host'],
+ device=device1)
+ p2 = port2['port']
+ device2 = 'tap' + p2['id']
+ self.callbacks.update_device_up(
+ self.adminContext,
+ agent_id=L2_AGENT_2['host'],
+ device=device2)
+ data2 = {'port': {'binding:host_id': L2_AGENT_2['host']}}
+ req = self.new_update_request('ports', data2, p1['id'])
+ res = self.deserialize(self.fmt,
+ req.get_response(self.api))
+ self.assertEqual(res['port']['binding:host_id'],
+ L2_AGENT_2['host'])
+ data4 = {'port': {'binding:host_id': L2_AGENT_4['host']}}
+ req = self.new_update_request('ports', data4, p1['id'])
+ res = self.deserialize(self.fmt,
+ req.get_response(self.api))
+ self.assertEqual(res['port']['binding:host_id'],
+ L2_AGENT_4['host'])
+ self.mock_fanout.reset_mock()
+ self.callbacks.get_device_details(
+ self.adminContext,
+ device=device1,
+ agent_id=L2_AGENT_4['host'])
+ p1_ips = [p['ip_address'] for p in p1['fixed_ips']]
+ expected = {'args':
+ {'fdb_entries':
+ {p1['network_id']:
+ {'ports':
+ {'20.0.0.1': [constants.FLOODING_ENTRY,
+ [p1['mac_address'],
+ p1_ips[0]]]},
+ 'network_type': 'vxlan',
+ 'segment_id': 1}}},
+ 'namespace': None,
+ 'method': 'remove_fdb_entries'}
+
+ self.mock_fanout.assert_called_with(
+ mock.ANY, expected, topic=self.fanout_topic)
self._check_response(port['port'], vif_type, has_port_filter,
bound)
port_id = port['port']['id']
+ neutron_context = context.get_admin_context()
details = self.plugin.callbacks.get_device_details(
- None, agent_id="theAgentId", device=port_id)
+ neutron_context, agent_id="theAgentId", device=port_id)
if bound:
self.assertEqual(details['network_type'], 'local')
else: