else:
self.run_ofctl("del-flows", [flow_expr_str])
+ def dump_flows_for_table(self, table):
+ flow_str = "table=%s" % table
+ flows = self.run_ofctl("dump-flows", [flow_str])
+ retval = '\n'.join(item for item in flows.splitlines()
+ if 'NXST' not in item)
+ return retval
+
def defer_apply_on(self):
LOG.debug(_('defer_apply_on'))
self.defer_apply_flows = True
self.int_br = ovs_lib.OVSBridge(integ_br, self.root_helper)
self.setup_rpc()
self.setup_integration_br()
- self.setup_physical_bridges(bridge_mappings)
+ self.bridge_mappings = bridge_mappings
+ self.setup_physical_bridges(self.bridge_mappings)
self.local_vlan_map = {}
self.tun_br_ofports = {p_const.TYPE_GRE: {},
p_const.TYPE_VXLAN: {}}
self.tunnel_count = 0
self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
self._check_ovs_version()
+ self.tun_br = None
if self.enable_tunneling:
self.setup_tunnel_br(tun_br)
# Collect additional bridges to monitor
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
'''
- if not self.available_local_vlans:
- LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
- return
- lvid = self.available_local_vlans.pop()
+ # On a restart or crash of OVS, the network associated with this VLAN
+ # will already be assigned, so check for that here before assigning a
+ # new one.
+ lvm = self.local_vlan_map.get(net_uuid)
+ if lvm:
+ lvid = lvm.vlan
+ else:
+ if not self.available_local_vlans:
+ LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
+ return
+ lvid = self.available_local_vlans.pop()
+ self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid,
+ network_type,
+ physical_network,
+ segmentation_id)
+
LOG.info(_("Assigning %(vlan_id)s as local vlan for "
"net-id=%(net_uuid)s"),
{'vlan_id': lvid, 'net_uuid': net_uuid})
- self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type,
- physical_network,
- segmentation_id)
if network_type in constants.TUNNEL_NETWORK_TYPES:
if self.enable_tunneling:
self.available_local_vlans.add(lvm.vlan)
def port_bound(self, port, net_uuid,
- network_type, physical_network, segmentation_id):
+ network_type, physical_network, segmentation_id,
+ ovs_restarted):
'''Bind port to net_uuid/lsw_id and install flow for inbound traffic
to vm.
:param network_type: the network type ('gre', 'vlan', 'flat', 'local')
:param physical_network: the physical network for 'vlan' or 'flat'
:param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
+ :param ovs_restarted: indicates if this is called for an OVS restart.
'''
- if net_uuid not in self.local_vlan_map:
+ if net_uuid not in self.local_vlan_map or ovs_restarted:
self.provision_local_vlan(net_uuid, network_type,
physical_network, segmentation_id)
lvm = self.local_vlan_map[net_uuid]
self.int_br.remove_all_flows()
# switch all traffic using L2 learning
self.int_br.add_flow(priority=1, actions="normal")
+ # Add a canary flow to int_br to track OVS restarts
+ self.int_br.add_flow(table=constants.CANARY_TABLE, priority=0,
+ actions="drop")
def setup_ancillary_bridges(self, integ_br, tun_br):
'''Setup ancillary bridges - for example br-ex.'''
ancillary_bridges.append(br)
return ancillary_bridges
- def setup_tunnel_br(self, tun_br):
+ def setup_tunnel_br(self, tun_br=None):
'''Setup the tunnel bridge.
Creates tunnel bridge, and links it to the integration bridge
:param tun_br: the name of the tunnel bridge.
'''
- self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
+ if not self.tun_br:
+ self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
+
self.tun_br.reset_bridge()
self.patch_tun_ofport = self.int_br.add_patch_port(
cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
'removed': removed}
def treat_vif_port(self, vif_port, port_id, network_id, network_type,
- physical_network, segmentation_id, admin_state_up):
+ physical_network, segmentation_id, admin_state_up,
+ ovs_restarted):
# When this function is called for a port, the port should have
# an OVS ofport configured, as only these ports were considered
# for being treated. If that does not happen, it is a potential
if vif_port:
if admin_state_up:
self.port_bound(vif_port, network_id, network_type,
- physical_network, segmentation_id)
+ physical_network, segmentation_id,
+ ovs_restarted)
else:
self.port_dead(vif_port)
else:
self.tun_br.delete_port(port_name)
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
- def treat_devices_added_or_updated(self, devices):
+ def treat_devices_added_or_updated(self, devices, ovs_restarted):
resync = False
for device in devices:
LOG.debug(_("Processing port %s"), device)
details['network_type'],
details['physical_network'],
details['segmentation_id'],
- details['admin_state_up'])
+ details['admin_state_up'],
+ ovs_restarted)
# update plugin about port status
if details.get('admin_state_up'):
LOG.debug(_("Setting status for %s to UP"), device)
LOG.debug(_("Device %s not defined on plugin"), device)
return resync
- def process_network_ports(self, port_info):
+ def process_network_ports(self, port_info, ovs_restarted):
resync_a = False
resync_b = False
# TODO(salv-orlando): consider a solution for ensuring notifications
if devices_added_updated:
start = time.time()
resync_a = self.treat_devices_added_or_updated(
- devices_added_updated)
+ devices_added_updated, ovs_restarted)
LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
"treat_devices_added_or_updated completed "
"in %(elapsed).3f"),
port_info.get('removed') or
port_info.get('updated'))
+ def check_ovs_restart(self):
+ # Check for the canary flow
+ canary_flow = self.int_br.dump_flows_for_table(constants.CANARY_TABLE)
+ return not canary_flow
+
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.AlwaysPoll()
updated_ports_copy = set()
ancillary_ports = set()
tunnel_sync = True
+ ovs_restarted = False
while True:
start = time.time()
port_stats = {'regular': {'added': 0,
except Exception:
LOG.exception(_("Error while synchronizing tunnels"))
tunnel_sync = True
- if self._agent_has_updates(polling_manager):
+ ovs_restarted = self.check_ovs_restart()
+ if ovs_restarted:
+ self.setup_integration_br()
+ self.setup_physical_bridges(self.bridge_mappings)
+ if self.enable_tunneling:
+ self.setup_tunnel_br()
+ if self._agent_has_updates(polling_manager) or ovs_restarted:
try:
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f"),
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
- port_info = self.scan_ports(ports, updated_ports_copy)
+ reg_ports = (set() if ovs_restarted else ports)
+ port_info = self.scan_ports(reg_ports, updated_ports_copy)
ports = port_info['current']
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
"port information retrieved. "
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
- self.sg_agent.firewall_refresh_needed()):
+ self.sg_agent.firewall_refresh_needed() or
+ ovs_restarted):
LOG.debug(_("Starting to process devices in:%s"),
port_info)
# If treat devices fails - must resync with plugin
- sync = self.process_network_ports(port_info)
+ sync = self.process_network_ports(port_info,
+ ovs_restarted)
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
"ports processed. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
LEARN_FROM_TUN = 10
UCAST_TO_TUN = 20
FLOOD_TO_TUN = 21
+CANARY_TABLE = 22
+
# Map tunnel types to tables number
TUN_TABLE = {p_const.TYPE_GRE: GRE_TUN_TO_LV,
p_const.TYPE_VXLAN: VXLAN_TUN_TO_LV}
from oslo.config import cfg
import testtools
+from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const
+from neutron.openstack.common import log
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
from neutron.plugins.openvswitch.common import constants
'db_get_val', return_value=str(old_local_vlan)),
mock.patch.object(self.agent.int_br, 'delete_flows')
) as (set_ovs_db_func, get_ovs_db_func, delete_flows_func):
- self.agent.port_bound(port, net_uuid, 'local', None, None)
+ self.agent.port_bound(port, net_uuid, 'local', None, None, False)
get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag")
if new_local_vlan != old_local_vlan:
set_ovs_db_func.assert_called_once_with(
side_effect=Exception()),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=mock.Mock())):
- self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
+ self.assertTrue(self.agent.treat_devices_added_or_updated([{}],
+ False))
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
- self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+ self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
+ False))
return func.called
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
mock.patch.object(self.agent, 'treat_vif_port')
) as (get_dev_fn, get_vif_func, upd_dev_up,
upd_dev_down, treat_vif_port):
- self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+ self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
+ False))
self.assertTrue(treat_vif_port.called)
self.assertTrue(upd_dev_down.called)
mock.patch.object(self.agent, "treat_devices_removed",
return_value=False)
) as (setup_port_filters, device_added_updated, device_removed):
- self.assertFalse(self.agent.process_network_ports(port_info))
+ self.assertFalse(self.agent.process_network_ports(port_info,
+ False))
setup_port_filters.assert_called_once_with(
port_info['added'], port_info.get('updated', set()))
device_added_updated.assert_called_once_with(
- port_info['added'] | port_info.get('updated', set()))
+ port_info['added'] | port_info.get('updated', set()), False)
device_removed.assert_called_once_with(port_info['removed'])
def test_process_network_ports(self):
expected_calls = [mock.call('gre-0a0a0a0a', '10.10.10.10', 'gre')]
self.agent.setup_tunnel_port.assert_has_calls(expected_calls)
+ def test_ovs_restart(self):
+ reply2 = {'current': set(['tap0']),
+ 'added': set(['tap2']),
+ 'removed': set([])}
+
+ reply3 = {'current': set(['tap2']),
+ 'added': set([]),
+ 'removed': set(['tap0'])}
+
+ with contextlib.nested(
+ mock.patch.object(async_process.AsyncProcess, "_spawn"),
+ mock.patch.object(log.ContextAdapter, 'exception'),
+ mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+ 'scan_ports'),
+ mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+ 'process_network_ports'),
+ mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+ 'check_ovs_restart'),
+ mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+ 'setup_integration_br'),
+ mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+ 'setup_physical_bridges')
+ ) as (spawn_fn, log_exception, scan_ports, process_network_ports,
+ check_ovs_restart, setup_int_br, setup_phys_br):
+ log_exception.side_effect = Exception(
+ 'Fake exception to get out of the loop')
+ scan_ports.side_effect = [reply2, reply3]
+ process_network_ports.side_effect = [
+ False, Exception('Fake exception to get out of the loop')]
+ check_ovs_restart.side_effect = [False, True]
+
+ # This will exit after the second loop
+ try:
+ self.agent.daemon_loop()
+ except Exception:
+ pass
+
+ scan_ports.assert_has_calls([
+ mock.call(set(), set()),
+ mock.call(set(), set())
+ ])
+ process_network_ports.assert_has_calls([
+ mock.call({'current': set(['tap0']),
+ 'removed': set([]),
+ 'added': set(['tap2'])}, False),
+ mock.call({'current': set(['tap2']),
+ 'removed': set(['tap0']),
+ 'added': set([])}, True)
+ ])
+
+ # Verify the second time through the loop we triggered an
+ # OVS restart and re-setup the bridges
+ setup_int_br.assert_has_calls([mock.call()])
+ setup_phys_br.assert_has_calls([mock.call({})])
+
class AncillaryBridgesTest(base.BaseTestCase):
mock.call.delete_port('patch-tun'),
mock.call.remove_all_flows(),
mock.call.add_flow(priority=1, actions='normal'),
+ mock.call.add_flow(priority=0, table=constants.CANARY_TABLE,
+ actions='drop')
]
self.mock_map_tun_bridge = self.ovs_bridges[self.MAP_TUN_BRIDGE]
'sudo', 2, ['gre'],
self.VETH_MTU)
a.local_vlan_map[NET_UUID] = LVM
- a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID)
+ a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID, False)
self._verify_mock_calls()
def test_port_unbound(self):
'added': set([]),
'removed': set(['tap0'])}
+ self.mock_int_bridge_expected += [
+ mock.call.dump_flows_for_table(constants.CANARY_TABLE),
+ mock.call.dump_flows_for_table(constants.CANARY_TABLE)
+ ]
+
with contextlib.nested(
mock.patch.object(log.ContextAdapter, 'exception'),
mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
- 'added': set(['tap2'])}),
+ 'added': set(['tap2'])}, False),
mock.call({'current': set(['tap2']),
'removed': set(['tap0']),
- 'added': set([])})
+ 'added': set([])}, False)
])
self._verify_mock_calls()