if not host or host == cfg.CONF.host:
self.fdb_remove(context, fdb_entries)
+ @log.log
+ def update_fdb_entries(self, context, fdb_entries, host=None):
+ if not host or host == cfg.CONF.host:
+ self.fdb_update(context, fdb_entries)
+
@abc.abstractmethod
def fdb_add(self, context, fdb_entries):
pass
@abc.abstractmethod
def fdb_remove(self, context, fdb_entries):
pass
+
+ @abc.abstractmethod
+ def fdb_update(self, context, fdb_entries):
+ pass
ports,
interface)
+ def _fdb_chg_ip(self, context, fdb_entries):
+ LOG.debug(_("update chg_ip received"))
+ for network_id, agent_ports in fdb_entries.items():
+ segment = self.agent.br_mgr.network_map.get(network_id)
+ if not segment:
+ return
+
+ if segment.network_type != lconst.TYPE_VXLAN:
+ return
+
+ interface = self.agent.br_mgr.get_vxlan_device_name(
+ segment.segmentation_id)
+
+ for agent_ip, state in agent_ports.items():
+ if agent_ip == self.agent.br_mgr.local_ip:
+ continue
+
+ after = state.get('after')
+ for mac, ip in after:
+ self.agent.br_mgr.add_fdb_ip_entry(mac, ip, interface)
+
+ before = state.get('before')
+ for mac, ip in before:
+ self.agent.br_mgr.remove_fdb_ip_entry(mac, ip, interface)
+
+ def fdb_update(self, context, fdb_entries):
+ LOG.debug(_("fdb_update received"))
+ for action, values in fdb_entries.items():
+ method = '_fdb_' + action
+ if not hasattr(self, method):
+ raise NotImplementedError()
+
+ getattr(self, method)(context, values)
+
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
def initialize(self):
LOG.debug(_("Experimental L2 population driver"))
+ self.rpc_ctx = n_context.get_admin_context_without_session()
def _get_port_fdb_entries(self, port):
return [[port['mac_address'],
self.remove_fdb_entries = self._update_port_down(context)
def delete_port_postcommit(self, context):
- self._notify_remove_fdb_entries(context,
- self.remove_fdb_entries)
-
- def _notify_remove_fdb_entries(self, context, fdb_entries):
- rpc_ctx = n_context.get_admin_context_without_session()
l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
- rpc_ctx, fdb_entries)
+ self.rpc_ctx, self.remove_fdb_entries)
+
+ def _get_diff_ips(self, orig, port):
+ orig_ips = set([ip['ip_address'] for ip in orig['fixed_ips']])
+ port_ips = set([ip['ip_address'] for ip in port['fixed_ips']])
+
+ # check if an ip has been added or removed
+ orig_chg_ips = orig_ips.difference(port_ips)
+ port_chg_ips = port_ips.difference(orig_ips)
+
+ if orig_chg_ips or port_chg_ips:
+ return orig_chg_ips, port_chg_ips
+
+ def _fixed_ips_changed(self, context, orig, port):
+ diff_ips = self._get_diff_ips(orig, port)
+ if not diff_ips:
+ return
+ orig_ips, port_ips = diff_ips
+
+ port_infos = self._get_port_infos(context, orig)
+ if not port_infos:
+ return
+ agent, agent_ip, segment, port_fdb_entries = port_infos
+
+ orig_mac_ip = [[port['mac_address'], ip] for ip in orig_ips]
+ port_mac_ip = [[port['mac_address'], ip] for ip in port_ips]
+
+ upd_fdb_entries = {port['network_id']: {agent_ip: {}}}
+
+ ports = upd_fdb_entries[port['network_id']][agent_ip]
+ if orig_mac_ip:
+ ports['before'] = orig_mac_ip
+
+ if port_mac_ip:
+ ports['after'] = port_mac_ip
+
+ l2pop_rpc.L2populationAgentNotify.update_fdb_entries(
+ self.rpc_ctx, {'chg_ip': upd_fdb_entries})
+
+ return True
def update_port_postcommit(self, context):
port = context.current
orig = context.original
if port['status'] == orig['status']:
- return
+ self._fixed_ips_changed(context, orig, port)
if port['status'] == const.PORT_STATUS_ACTIVE:
self._update_port_up(context)
elif port['status'] == const.PORT_STATUS_DOWN:
fdb_entries = self._update_port_down(context)
- self._notify_remove_fdb_entries(context, fdb_entries)
+ l2pop_rpc.L2populationAgentNotify.remove_fdb_entries(
+ self.rpc_ctx, fdb_entries)
- def _update_port_up(self, context):
- port_context = context.current
- network_id = port_context['network_id']
- agent_host = port_context['binding:host_id']
+ def _get_port_infos(self, context, port):
+ agent_host = port['binding:host_id']
if not agent_host:
return
agent_ip = self.get_agent_ip(agent)
if not agent_ip:
- LOG.warning(_("Unable to retrieve the tunelling ip of agent %s"),
- agent_host)
+ LOG.warning(_("Unable to retrieve the agent ip, check the agent "
+ "configuration."))
return
segment = context.bound_segment
if not segment:
LOG.warning(_("Port %(port)s updated by agent %(agent)s "
"isn't bound to any segment"),
- {'port': port_context['id'], 'agent': agent.host})
+ {'port': port['id'], 'agent': agent})
return
tunnel_types = self.get_agent_tunnel_types(agent)
if segment['network_type'] not in tunnel_types:
return
+ fdb_entries = self._get_port_fdb_entries(port)
+
+ return agent, agent_ip, segment, fdb_entries
+
+ def _update_port_up(self, context):
+ port_context = context.current
+ port_infos = self._get_port_infos(context, port_context)
+ if not port_infos:
+ return
+ agent, agent_ip, segment, port_fdb_entries = port_infos
+
+ agent_host = port_context['binding:host_id']
+ network_id = port_context['network_id']
+
+ session = db_api.get_session()
agent_ports = self.get_agent_network_port_count(session, agent_host,
network_id)
- rpc_ctx = n_context.get_admin_context_without_session()
-
other_fdb_entries = {network_id:
{'segment_id': segment['segmentation_id'],
'network_type': segment['network_type'],
if ports.keys():
l2pop_rpc.L2populationAgentNotify.add_fdb_entries(
- rpc_ctx, agent_fdb_entries, agent_host)
+ self.rpc_ctx, agent_fdb_entries, agent_host)
# Notify other agents to add fdb rule for current port
- fdb_entries = self._get_port_fdb_entries(port_context)
- other_fdb_entries[network_id]['ports'][agent_ip] += fdb_entries
+ other_fdb_entries[network_id]['ports'][agent_ip] += port_fdb_entries
- l2pop_rpc.L2populationAgentNotify.add_fdb_entries(rpc_ctx,
+ l2pop_rpc.L2populationAgentNotify.add_fdb_entries(self.rpc_ctx,
other_fdb_entries)
def _update_port_down(self, context):
port_context = context.current
- network_id = port_context['network_id']
+ port_infos = self._get_port_infos(context, port_context)
+ if not port_infos:
+ return
+ agent, agent_ip, segment, port_fdb_entries = port_infos
agent_host = port_context['binding:host_id']
- if not agent_host:
- return
+ network_id = port_context['network_id']
session = db_api.get_session()
- agent = self.get_agent_by_host(session, agent_host)
- if not agent:
- return
-
- agent_ip = self.get_agent_ip(agent)
- if not agent_ip:
- LOG.warning(_("Unable to retrieve the agent ip, check the agent "
- "configuration."))
- return
-
- segment = context.bound_segment
- if not segment:
- LOG.warning(_("Port %(port)s updated by agent %(agent)s "
- "isn't bound to any segment"),
- {'port': port_context['id'], 'agent': agent})
- return
-
- tunnel_types = self.get_agent_tunnel_types(agent)
- if segment['network_type'] not in tunnel_types:
- return
-
agent_ports = self.get_agent_network_port_count(session, agent_host,
network_id)
self._notification_fanout(context, 'remove_fdb_entries',
fdb_entries)
+ def update_fdb_entries(self, context, fdb_entries, host=None):
+ if fdb_entries:
+ if host:
+ self._notification_host(context, 'update_fdb_entries',
+ fdb_entries, host)
+ else:
+ self._notification_fanout(context, 'update_fdb_entries',
+ fdb_entries)
+
L2populationAgentNotify = L2populationAgentNotifyAPI()
dl_vlan=lvm.vlan,
dl_dst=port_info[0])
+ def fdb_update(self, context, fdb_entries):
+ LOG.debug(_("fdb_update received"))
+ for action, values in fdb_entries.items():
+ method = '_fdb_' + action
+ if not hasattr(self, method):
+ raise NotImplementedError()
+
+ getattr(self, method)(context, values)
+
def create_rpc_dispatcher(self):
'''Get the rpc dispatcher for this manager.
check_exit_code=False),
]
execute_fn.assert_has_calls(expected)
+
+ def test_fdb_update_chg_ip(self):
+ fdb_entries = {'chg_ip':
+ {'net_id':
+ {'agent_ip':
+ {'before': [['port_mac', 'port_ip_1']],
+ 'after': [['port_mac', 'port_ip_2']]}}}}
+
+ with mock.patch.object(utils, 'execute',
+ return_value='') as execute_fn:
+ self.lb_rpc.fdb_update(None, fdb_entries)
+
+ expected = [
+ mock.call(['ip', 'neigh', 'add', 'port_ip_2', 'lladdr',
+ 'port_mac', 'dev', 'vxlan-1', 'nud', 'permanent'],
+ root_helper=self.root_helper,
+ check_exit_code=False),
+ mock.call(['ip', 'neigh', 'del', 'port_ip_1', 'lladdr',
+ 'port_mac', 'dev', 'vxlan-1'],
+ root_helper=self.root_helper,
+ check_exit_code=False)
+ ]
+ execute_fn.assert_has_calls(expected)
self.mock_fanout.assert_any_call(
mock.ANY, expected, topic=self.fanout_topic)
+
+ def test_fixed_ips_changed(self):
+ self._register_ml2_agents()
+
+ with self.subnet(network=self._network) as subnet:
+ host_arg = {portbindings.HOST_ID: HOST}
+ with self.port(subnet=subnet, cidr='10.0.0.0/24',
+ arg_list=(portbindings.HOST_ID,),
+ **host_arg) as port1:
+ p1 = port1['port']
+
+ self.mock_fanout.reset_mock()
+
+ data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
+ {'ip_address': '10.0.0.10'}]}}
+ req = self.new_update_request('ports', data, p1['id'])
+ res = self.deserialize(self.fmt, req.get_response(self.api))
+ ips = res['port']['fixed_ips']
+ self.assertEqual(len(ips), 2)
+
+ add_expected = {'args':
+ {'fdb_entries':
+ {'chg_ip':
+ {p1['network_id']:
+ {'20.0.0.1':
+ {'after': [[p1['mac_address'],
+ '10.0.0.10']]}}}}},
+ 'namespace': None,
+ 'method': 'update_fdb_entries'}
+
+ self.mock_fanout.assert_any_call(
+ mock.ANY, add_expected, topic=self.fanout_topic)
+
+ self.mock_fanout.reset_mock()
+
+ data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.2'},
+ {'ip_address': '10.0.0.16'}]}}
+ req = self.new_update_request('ports', data, p1['id'])
+ res = self.deserialize(self.fmt, req.get_response(self.api))
+ ips = res['port']['fixed_ips']
+ self.assertEqual(len(ips), 2)
+
+ upd_expected = {'args':
+ {'fdb_entries':
+ {'chg_ip':
+ {p1['network_id']:
+ {'20.0.0.1':
+ {'before': [[p1['mac_address'],
+ '10.0.0.10']],
+ 'after': [[p1['mac_address'],
+ '10.0.0.16']]}}}}},
+ 'namespace': None,
+ 'method': 'update_fdb_entries'}
+
+ self.mock_fanout.assert_any_call(
+ mock.ANY, upd_expected, topic=self.fanout_topic)
+
+ self.mock_fanout.reset_mock()
+
+ data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.16'}]}}
+ req = self.new_update_request('ports', data, p1['id'])
+ res = self.deserialize(self.fmt, req.get_response(self.api))
+ ips = res['port']['fixed_ips']
+ self.assertEqual(len(ips), 1)
+
+ del_expected = {'args':
+ {'fdb_entries':
+ {'chg_ip':
+ {p1['network_id']:
+ {'20.0.0.1':
+ {'before': [[p1['mac_address'],
+ '10.0.0.2']]}}}}},
+ 'namespace': None,
+ 'method': 'update_fdb_entries'}
+
+ self.mock_fanout.assert_any_call(
+ mock.ANY, del_expected, topic=self.fanout_topic)