From c3664b5e171a4d30b8a6f0ba89670d39ea3bc34e Mon Sep 17 00:00:00 2001 From: Kyle Mestery Date: Fri, 16 May 2014 04:21:32 +0000 Subject: [PATCH] Reprogram flows when ovs-vswitchd restarts When OVS is restarted, by default it will not reprogram flows which were programmed. For the case of the OVS agent, this means a restart will cause all traffic to be switched using the NORMAL action. This is undesirable for a number of reasons, including obvious security reasons. This change provides a way for the agent to check if a restart of ovs-vswitchd has happened in the main agent loop. If a restart of ovs-vswitchd is detected, the agent will run through the setup of the bridges on the host and reprogram flows for all the ports connected. DocImpact This changes adds a new table (table 23) to the integration bridge, with a single 'drop' flow. This is used to monitor OVS restarts and to reprogram flows from the agent. Conflicts: neutron/plugins/openvswitch/common/constants.py Change-Id: If9e07465c43115838de23e12a4e0087c9218cea2 Closes-Bug: #1290486 (cherry picked from commit 8e9f00a19dab98e5cfc7ca32beb9f17ebb5bc1bb) --- neutron/agent/linux/ovs_lib.py | 7 ++ .../openvswitch/agent/ovs_neutron_agent.py | 80 ++++++++++++++----- .../plugins/openvswitch/common/constants.py | 2 + .../openvswitch/test_ovs_neutron_agent.py | 73 +++++++++++++++-- .../tests/unit/openvswitch/test_ovs_tunnel.py | 13 ++- 5 files changed, 144 insertions(+), 31 deletions(-) diff --git a/neutron/agent/linux/ovs_lib.py b/neutron/agent/linux/ovs_lib.py index 9fc2c8162..a8bb12d25 100644 --- a/neutron/agent/linux/ovs_lib.py +++ b/neutron/agent/linux/ovs_lib.py @@ -201,6 +201,13 @@ class OVSBridge(BaseOVS): else: self.run_ofctl("del-flows", [flow_expr_str]) + def dump_flows_for_table(self, table): + flow_str = "table=%s" % table + flows = self.run_ofctl("dump-flows", [flow_str]) + retval = '\n'.join(item for item in flows.splitlines() + if 'NXST' not in item) + return retval + def defer_apply_on(self): LOG.debug(_('defer_apply_on')) self.defer_apply_flows = True diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index 11ff68218..79c03f1de 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -195,7 +195,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.int_br = ovs_lib.OVSBridge(integ_br, self.root_helper) self.setup_rpc() self.setup_integration_br() - self.setup_physical_bridges(bridge_mappings) + self.bridge_mappings = bridge_mappings + self.setup_physical_bridges(self.bridge_mappings) self.local_vlan_map = {} self.tun_br_ofports = {p_const.TYPE_GRE: {}, p_const.TYPE_VXLAN: {}} @@ -212,6 +213,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.tunnel_count = 0 self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port self._check_ovs_version() + self.tun_br = None if self.enable_tunneling: self.setup_tunnel_br(tun_br) # Collect additional bridges to monitor @@ -434,16 +436,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel' ''' - if not self.available_local_vlans: - LOG.error(_("No local VLAN available for net-id=%s"), net_uuid) - return - lvid = self.available_local_vlans.pop() + # On a restart or crash of OVS, the network associated with this VLAN + # will already be assigned, so check for that here before assigning a + # new one. + lvm = self.local_vlan_map.get(net_uuid) + if lvm: + lvid = lvm.vlan + else: + if not self.available_local_vlans: + LOG.error(_("No local VLAN available for net-id=%s"), net_uuid) + return + lvid = self.available_local_vlans.pop() + self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, + network_type, + physical_network, + segmentation_id) + LOG.info(_("Assigning %(vlan_id)s as local vlan for " "net-id=%(net_uuid)s"), {'vlan_id': lvid, 'net_uuid': net_uuid}) - self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type, - physical_network, - segmentation_id) if network_type in constants.TUNNEL_NETWORK_TYPES: if self.enable_tunneling: @@ -576,7 +587,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.available_local_vlans.add(lvm.vlan) def port_bound(self, port, net_uuid, - network_type, physical_network, segmentation_id): + network_type, physical_network, segmentation_id, + ovs_restarted): '''Bind port to net_uuid/lsw_id and install flow for inbound traffic to vm. @@ -585,8 +597,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, :param network_type: the network type ('gre', 'vlan', 'flat', 'local') :param physical_network: the physical network for 'vlan' or 'flat' :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel' + :param ovs_restarted: indicates if this is called for an OVS restart. ''' - if net_uuid not in self.local_vlan_map: + if net_uuid not in self.local_vlan_map or ovs_restarted: self.provision_local_vlan(net_uuid, network_type, physical_network, segmentation_id) lvm = self.local_vlan_map[net_uuid] @@ -647,6 +660,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.int_br.remove_all_flows() # switch all traffic using L2 learning self.int_br.add_flow(priority=1, actions="normal") + # Add a canary flow to int_br to track OVS restarts + self.int_br.add_flow(table=constants.CANARY_TABLE, priority=0, + actions="drop") def setup_ancillary_bridges(self, integ_br, tun_br): '''Setup ancillary bridges - for example br-ex.''' @@ -674,7 +690,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, ancillary_bridges.append(br) return ancillary_bridges - def setup_tunnel_br(self, tun_br): + def setup_tunnel_br(self, tun_br=None): '''Setup the tunnel bridge. Creates tunnel bridge, and links it to the integration bridge @@ -682,7 +698,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, :param tun_br: the name of the tunnel bridge. ''' - self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper) + if not self.tun_br: + self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper) + self.tun_br.reset_bridge() self.patch_tun_ofport = self.int_br.add_patch_port( cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port) @@ -877,7 +895,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, 'removed': removed} def treat_vif_port(self, vif_port, port_id, network_id, network_type, - physical_network, segmentation_id, admin_state_up): + physical_network, segmentation_id, admin_state_up, + ovs_restarted): # When this function is called for a port, the port should have # an OVS ofport configured, as only these ports were considered # for being treated. If that does not happen, it is a potential @@ -888,7 +907,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, if vif_port: if admin_state_up: self.port_bound(vif_port, network_id, network_type, - physical_network, segmentation_id) + physical_network, segmentation_id, + ovs_restarted) else: self.port_dead(vif_port) else: @@ -946,7 +966,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.tun_br.delete_port(port_name) self.tun_br_ofports[tunnel_type].pop(remote_ip, None) - def treat_devices_added_or_updated(self, devices): + def treat_devices_added_or_updated(self, devices, ovs_restarted): resync = False for device in devices: LOG.debug(_("Processing port %s"), device) @@ -978,7 +998,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, details['network_type'], details['physical_network'], details['segmentation_id'], - details['admin_state_up']) + details['admin_state_up'], + ovs_restarted) # update plugin about port status if details.get('admin_state_up'): LOG.debug(_("Setting status for %s to UP"), device) @@ -1055,7 +1076,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.debug(_("Device %s not defined on plugin"), device) return resync - def process_network_ports(self, port_info): + def process_network_ports(self, port_info, ovs_restarted): resync_a = False resync_b = False # TODO(salv-orlando): consider a solution for ensuring notifications @@ -1078,7 +1099,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, if devices_added_updated: start = time.time() resync_a = self.treat_devices_added_or_updated( - devices_added_updated) + devices_added_updated, ovs_restarted) LOG.debug(_("process_network_ports - iteration:%(iter_num)d -" "treat_devices_added_or_updated completed " "in %(elapsed).3f"), @@ -1166,6 +1187,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, port_info.get('removed') or port_info.get('updated')) + def check_ovs_restart(self): + # Check for the canary flow + canary_flow = self.int_br.dump_flows_for_table(constants.CANARY_TABLE) + return not canary_flow + def rpc_loop(self, polling_manager=None): if not polling_manager: polling_manager = polling.AlwaysPoll() @@ -1175,6 +1201,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, updated_ports_copy = set() ancillary_ports = set() tunnel_sync = True + ovs_restarted = False while True: start = time.time() port_stats = {'regular': {'added': 0, @@ -1198,7 +1225,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, except Exception: LOG.exception(_("Error while synchronizing tunnels")) tunnel_sync = True - if self._agent_has_updates(polling_manager): + ovs_restarted = self.check_ovs_restart() + if ovs_restarted: + self.setup_integration_br() + self.setup_physical_bridges(self.bridge_mappings) + if self.enable_tunneling: + self.setup_tunnel_br() + if self._agent_has_updates(polling_manager) or ovs_restarted: try: LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - " "starting polling. Elapsed:%(elapsed).3f"), @@ -1210,7 +1243,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # between these two statements, this will be thread-safe updated_ports_copy = self.updated_ports self.updated_ports = set() - port_info = self.scan_ports(ports, updated_ports_copy) + reg_ports = (set() if ovs_restarted else ports) + port_info = self.scan_ports(reg_ports, updated_ports_copy) ports = port_info['current'] LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - " "port information retrieved. " @@ -1220,11 +1254,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # Secure and wire/unwire VIFs and update their status # on Neutron server if (self._port_info_has_changes(port_info) or - self.sg_agent.firewall_refresh_needed()): + self.sg_agent.firewall_refresh_needed() or + ovs_restarted): LOG.debug(_("Starting to process devices in:%s"), port_info) # If treat devices fails - must resync with plugin - sync = self.process_network_ports(port_info) + sync = self.process_network_ports(port_info, + ovs_restarted) LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -" "ports processed. Elapsed:%(elapsed).3f"), {'iter_num': self.iter_num, diff --git a/neutron/plugins/openvswitch/common/constants.py b/neutron/plugins/openvswitch/common/constants.py index 3a5b4aaae..5e57b72cd 100644 --- a/neutron/plugins/openvswitch/common/constants.py +++ b/neutron/plugins/openvswitch/common/constants.py @@ -46,6 +46,8 @@ VXLAN_TUN_TO_LV = 3 LEARN_FROM_TUN = 10 UCAST_TO_TUN = 20 FLOOD_TO_TUN = 21 +CANARY_TABLE = 22 + # Map tunnel types to tables number TUN_TABLE = {p_const.TYPE_GRE: GRE_TUN_TO_LV, p_const.TYPE_VXLAN: VXLAN_TUN_TO_LV} diff --git a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py index e6eebeffa..94d3c9371 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py @@ -21,10 +21,12 @@ import mock from oslo.config import cfg import testtools +from neutron.agent.linux import async_process from neutron.agent.linux import ip_lib from neutron.agent.linux import ovs_lib from neutron.agent.linux import utils from neutron.common import constants as n_const +from neutron.openstack.common import log from neutron.plugins.common import constants as p_const from neutron.plugins.openvswitch.agent import ovs_neutron_agent from neutron.plugins.openvswitch.common import constants @@ -139,7 +141,7 @@ class TestOvsNeutronAgent(base.BaseTestCase): 'db_get_val', return_value=str(old_local_vlan)), mock.patch.object(self.agent.int_br, 'delete_flows') ) as (set_ovs_db_func, get_ovs_db_func, delete_flows_func): - self.agent.port_bound(port, net_uuid, 'local', None, None) + self.agent.port_bound(port, net_uuid, 'local', None, None, False) get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag") if new_local_vlan != old_local_vlan: set_ovs_db_func.assert_called_once_with( @@ -275,7 +277,8 @@ class TestOvsNeutronAgent(base.BaseTestCase): side_effect=Exception()), mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=mock.Mock())): - self.assertTrue(self.agent.treat_devices_added_or_updated([{}])) + self.assertTrue(self.agent.treat_devices_added_or_updated([{}], + False)) def _mock_treat_devices_added_updated(self, details, port, func_name): """Mock treat devices added or updated. @@ -294,7 +297,8 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, func_name) ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func): - self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) + self.assertFalse(self.agent.treat_devices_added_or_updated([{}], + False)) return func.called def test_treat_devices_added_updated_ignores_invalid_ofport(self): @@ -341,7 +345,8 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent, 'treat_vif_port') ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, treat_vif_port): - self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) + self.assertFalse(self.agent.treat_devices_added_or_updated([{}], + False)) self.assertTrue(treat_vif_port.called) self.assertTrue(upd_dev_down.called) @@ -372,11 +377,12 @@ class TestOvsNeutronAgent(base.BaseTestCase): mock.patch.object(self.agent, "treat_devices_removed", return_value=False) ) as (setup_port_filters, device_added_updated, device_removed): - self.assertFalse(self.agent.process_network_ports(port_info)) + self.assertFalse(self.agent.process_network_ports(port_info, + False)) setup_port_filters.assert_called_once_with( port_info['added'], port_info.get('updated', set())) device_added_updated.assert_called_once_with( - port_info['added'] | port_info.get('updated', set())) + port_info['added'] | port_info.get('updated', set()), False) device_removed.assert_called_once_with(port_info['removed']) def test_process_network_ports(self): @@ -789,6 +795,61 @@ class TestOvsNeutronAgent(base.BaseTestCase): expected_calls = [mock.call('gre-0a0a0a0a', '10.10.10.10', 'gre')] self.agent.setup_tunnel_port.assert_has_calls(expected_calls) + def test_ovs_restart(self): + reply2 = {'current': set(['tap0']), + 'added': set(['tap2']), + 'removed': set([])} + + reply3 = {'current': set(['tap2']), + 'added': set([]), + 'removed': set(['tap0'])} + + with contextlib.nested( + mock.patch.object(async_process.AsyncProcess, "_spawn"), + mock.patch.object(log.ContextAdapter, 'exception'), + mock.patch.object(ovs_neutron_agent.OVSNeutronAgent, + 'scan_ports'), + mock.patch.object(ovs_neutron_agent.OVSNeutronAgent, + 'process_network_ports'), + mock.patch.object(ovs_neutron_agent.OVSNeutronAgent, + 'check_ovs_restart'), + mock.patch.object(ovs_neutron_agent.OVSNeutronAgent, + 'setup_integration_br'), + mock.patch.object(ovs_neutron_agent.OVSNeutronAgent, + 'setup_physical_bridges') + ) as (spawn_fn, log_exception, scan_ports, process_network_ports, + check_ovs_restart, setup_int_br, setup_phys_br): + log_exception.side_effect = Exception( + 'Fake exception to get out of the loop') + scan_ports.side_effect = [reply2, reply3] + process_network_ports.side_effect = [ + False, Exception('Fake exception to get out of the loop')] + check_ovs_restart.side_effect = [False, True] + + # This will exit after the second loop + try: + self.agent.daemon_loop() + except Exception: + pass + + scan_ports.assert_has_calls([ + mock.call(set(), set()), + mock.call(set(), set()) + ]) + process_network_ports.assert_has_calls([ + mock.call({'current': set(['tap0']), + 'removed': set([]), + 'added': set(['tap2'])}, False), + mock.call({'current': set(['tap2']), + 'removed': set(['tap0']), + 'added': set([])}, True) + ]) + + # Verify the second time through the loop we triggered an + # OVS restart and re-setup the bridges + setup_int_br.assert_has_calls([mock.call()]) + setup_phys_br.assert_has_calls([mock.call({})]) + class AncillaryBridgesTest(base.BaseTestCase): diff --git a/neutron/tests/unit/openvswitch/test_ovs_tunnel.py b/neutron/tests/unit/openvswitch/test_ovs_tunnel.py index 1940730fa..126b9ccb1 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_tunnel.py +++ b/neutron/tests/unit/openvswitch/test_ovs_tunnel.py @@ -105,6 +105,8 @@ class TunnelTest(base.BaseTestCase): mock.call.delete_port('patch-tun'), mock.call.remove_all_flows(), mock.call.add_flow(priority=1, actions='normal'), + mock.call.add_flow(priority=0, table=constants.CANARY_TABLE, + actions='drop') ] self.mock_map_tun_bridge = self.ovs_bridges[self.MAP_TUN_BRIDGE] @@ -435,7 +437,7 @@ class TunnelTest(base.BaseTestCase): 'sudo', 2, ['gre'], self.VETH_MTU) a.local_vlan_map[NET_UUID] = LVM - a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID) + a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID, False) self._verify_mock_calls() def test_port_unbound(self): @@ -511,6 +513,11 @@ class TunnelTest(base.BaseTestCase): 'added': set([]), 'removed': set(['tap0'])} + self.mock_int_bridge_expected += [ + mock.call.dump_flows_for_table(constants.CANARY_TABLE), + mock.call.dump_flows_for_table(constants.CANARY_TABLE) + ] + with contextlib.nested( mock.patch.object(log.ContextAdapter, 'exception'), mock.patch.object(ovs_neutron_agent.OVSNeutronAgent, @@ -549,10 +556,10 @@ class TunnelTest(base.BaseTestCase): process_network_ports.assert_has_calls([ mock.call({'current': set(['tap0']), 'removed': set([]), - 'added': set(['tap2'])}), + 'added': set(['tap2'])}, False), mock.call({'current': set(['tap2']), 'removed': set(['tap0']), - 'added': set([])}) + 'added': set([])}, False) ]) self._verify_mock_calls() -- 2.45.2