review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Reprogram flows when ovs-vswitchd restarts
author: Kyle Mestery <kmestery@cisco.com>
Fri, 16 May 2014 04:21:32 +0000 (04:21 +0000)
committer: Thomas Goirand <thomas@goirand.fr>
Mon, 9 Jun 2014 15:06:55 +0000 (23:06 +0800)
When OVS is restarted, by default it does not restore the flows that were
previously programmed. For the OVS agent, this means a restart causes all
traffic to be switched using the NORMAL action. This is undesirable for a
number of reasons, the most obvious being security.

This change provides a way for the agent to check if a restart of ovs-vswitchd
has happened in the main agent loop. If a restart of ovs-vswitchd is detected,
the agent will run through the setup of the bridges on the host and reprogram
flows for all the ports connected.

DocImpact
This change adds a new table to the integration bridge (table 23 in the
original commit; CANARY_TABLE = 22 in this cherry-pick), with a single
'drop' flow. This is used to monitor OVS restarts and to reprogram flows
from the agent.

Conflicts:
neutron/plugins/openvswitch/common/constants.py

Change-Id: If9e07465c43115838de23e12a4e0087c9218cea2
Closes-Bug: #1290486
(cherry picked from commit 8e9f00a19dab98e5cfc7ca32beb9f17ebb5bc1bb)

neutron/agent/linux/ovs_lib.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/plugins/openvswitch/common/constants.py
neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
neutron/tests/unit/openvswitch/test_ovs_tunnel.py

index 9fc2c81622f2f3d320f92d2ea8f87853204161fb..a8bb12d250ec6c0bf883358ea592951cf91b3e7e 100644 (file)
@@ -201,6 +201,13 @@ class OVSBridge(BaseOVS):
         else:
             self.run_ofctl("del-flows", [flow_expr_str])
 
+    def dump_flows_for_table(self, table):
+        flow_str = "table=%s" % table
+        flows = self.run_ofctl("dump-flows", [flow_str])
+        retval = '\n'.join(item for item in flows.splitlines()
+                           if 'NXST' not in item)
+        return retval
+
     def defer_apply_on(self):
         LOG.debug(_('defer_apply_on'))
         self.defer_apply_flows = True
index 11ff682183b8722558234759d2ab91871f4f43e4..79c03f1de2ad9730fba81b3450586aaaf256c4ea 100644 (file)
@@ -195,7 +195,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.int_br = ovs_lib.OVSBridge(integ_br, self.root_helper)
         self.setup_rpc()
         self.setup_integration_br()
-        self.setup_physical_bridges(bridge_mappings)
+        self.bridge_mappings = bridge_mappings
+        self.setup_physical_bridges(self.bridge_mappings)
         self.local_vlan_map = {}
         self.tun_br_ofports = {p_const.TYPE_GRE: {},
                                p_const.TYPE_VXLAN: {}}
@@ -212,6 +213,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.tunnel_count = 0
         self.vxlan_udp_port = cfg.CONF.AGENT.vxlan_udp_port
         self._check_ovs_version()
+        self.tun_br = None
         if self.enable_tunneling:
             self.setup_tunnel_br(tun_br)
         # Collect additional bridges to monitor
@@ -434,16 +436,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
         '''
 
-        if not self.available_local_vlans:
-            LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
-            return
-        lvid = self.available_local_vlans.pop()
+        # On a restart or crash of OVS, the network associated with this VLAN
+        # will already be assigned, so check for that here before assigning a
+        # new one.
+        lvm = self.local_vlan_map.get(net_uuid)
+        if lvm:
+            lvid = lvm.vlan
+        else:
+            if not self.available_local_vlans:
+                LOG.error(_("No local VLAN available for net-id=%s"), net_uuid)
+                return
+            lvid = self.available_local_vlans.pop()
+            self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid,
+                                                             network_type,
+                                                             physical_network,
+                                                             segmentation_id)
+
         LOG.info(_("Assigning %(vlan_id)s as local vlan for "
                    "net-id=%(net_uuid)s"),
                  {'vlan_id': lvid, 'net_uuid': net_uuid})
-        self.local_vlan_map[net_uuid] = LocalVLANMapping(lvid, network_type,
-                                                         physical_network,
-                                                         segmentation_id)
 
         if network_type in constants.TUNNEL_NETWORK_TYPES:
             if self.enable_tunneling:
@@ -576,7 +587,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.available_local_vlans.add(lvm.vlan)
 
     def port_bound(self, port, net_uuid,
-                   network_type, physical_network, segmentation_id):
+                   network_type, physical_network, segmentation_id,
+                   ovs_restarted):
         '''Bind port to net_uuid/lsw_id and install flow for inbound traffic
         to vm.
 
@@ -585,8 +597,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         :param network_type: the network type ('gre', 'vlan', 'flat', 'local')
         :param physical_network: the physical network for 'vlan' or 'flat'
         :param segmentation_id: the VID for 'vlan' or tunnel ID for 'tunnel'
+        :param ovs_restarted: indicates if this is called for an OVS restart.
         '''
-        if net_uuid not in self.local_vlan_map:
+        if net_uuid not in self.local_vlan_map or ovs_restarted:
             self.provision_local_vlan(net_uuid, network_type,
                                       physical_network, segmentation_id)
         lvm = self.local_vlan_map[net_uuid]
@@ -647,6 +660,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.int_br.remove_all_flows()
         # switch all traffic using L2 learning
         self.int_br.add_flow(priority=1, actions="normal")
+        # Add a canary flow to int_br to track OVS restarts
+        self.int_br.add_flow(table=constants.CANARY_TABLE, priority=0,
+                             actions="drop")
 
     def setup_ancillary_bridges(self, integ_br, tun_br):
         '''Setup ancillary bridges - for example br-ex.'''
@@ -674,7 +690,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             ancillary_bridges.append(br)
         return ancillary_bridges
 
-    def setup_tunnel_br(self, tun_br):
+    def setup_tunnel_br(self, tun_br=None):
         '''Setup the tunnel bridge.
 
         Creates tunnel bridge, and links it to the integration bridge
@@ -682,7 +698,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
 
         :param tun_br: the name of the tunnel bridge.
         '''
-        self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
+        if not self.tun_br:
+            self.tun_br = ovs_lib.OVSBridge(tun_br, self.root_helper)
+
         self.tun_br.reset_bridge()
         self.patch_tun_ofport = self.int_br.add_patch_port(
             cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
@@ -877,7 +895,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 'removed': removed}
 
     def treat_vif_port(self, vif_port, port_id, network_id, network_type,
-                       physical_network, segmentation_id, admin_state_up):
+                       physical_network, segmentation_id, admin_state_up,
+                       ovs_restarted):
         # When this function is called for a port, the port should have
         # an OVS ofport configured, as only these ports were considered
         # for being treated. If that does not happen, it is a potential
@@ -888,7 +907,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         if vif_port:
             if admin_state_up:
                 self.port_bound(vif_port, network_id, network_type,
-                                physical_network, segmentation_id)
+                                physical_network, segmentation_id,
+                                ovs_restarted)
             else:
                 self.port_dead(vif_port)
         else:
@@ -946,7 +966,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     self.tun_br.delete_port(port_name)
                     self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
 
-    def treat_devices_added_or_updated(self, devices):
+    def treat_devices_added_or_updated(self, devices, ovs_restarted):
         resync = False
         for device in devices:
             LOG.debug(_("Processing port %s"), device)
@@ -978,7 +998,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                     details['network_type'],
                                     details['physical_network'],
                                     details['segmentation_id'],
-                                    details['admin_state_up'])
+                                    details['admin_state_up'],
+                                    ovs_restarted)
                 # update plugin about port status
                 if details.get('admin_state_up'):
                     LOG.debug(_("Setting status for %s to UP"), device)
@@ -1055,7 +1076,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 LOG.debug(_("Device %s not defined on plugin"), device)
         return resync
 
-    def process_network_ports(self, port_info):
+    def process_network_ports(self, port_info, ovs_restarted):
         resync_a = False
         resync_b = False
         # TODO(salv-orlando): consider a solution for ensuring notifications
@@ -1078,7 +1099,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         if devices_added_updated:
             start = time.time()
             resync_a = self.treat_devices_added_or_updated(
-                devices_added_updated)
+                devices_added_updated, ovs_restarted)
             LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
                         "treat_devices_added_or_updated completed "
                         "in %(elapsed).3f"),
@@ -1166,6 +1187,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 port_info.get('removed') or
                 port_info.get('updated'))
 
+    def check_ovs_restart(self):
+        # Check for the canary flow
+        canary_flow = self.int_br.dump_flows_for_table(constants.CANARY_TABLE)
+        return not canary_flow
+
     def rpc_loop(self, polling_manager=None):
         if not polling_manager:
             polling_manager = polling.AlwaysPoll()
@@ -1175,6 +1201,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         updated_ports_copy = set()
         ancillary_ports = set()
         tunnel_sync = True
+        ovs_restarted = False
         while True:
             start = time.time()
             port_stats = {'regular': {'added': 0,
@@ -1198,7 +1225,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 except Exception:
                     LOG.exception(_("Error while synchronizing tunnels"))
                     tunnel_sync = True
-            if self._agent_has_updates(polling_manager):
+            ovs_restarted = self.check_ovs_restart()
+            if ovs_restarted:
+                self.setup_integration_br()
+                self.setup_physical_bridges(self.bridge_mappings)
+                if self.enable_tunneling:
+                    self.setup_tunnel_br()
+            if self._agent_has_updates(polling_manager) or ovs_restarted:
                 try:
                     LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
                                 "starting polling. Elapsed:%(elapsed).3f"),
@@ -1210,7 +1243,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     # between these two statements, this will be thread-safe
                     updated_ports_copy = self.updated_ports
                     self.updated_ports = set()
-                    port_info = self.scan_ports(ports, updated_ports_copy)
+                    reg_ports = (set() if ovs_restarted else ports)
+                    port_info = self.scan_ports(reg_ports, updated_ports_copy)
                     ports = port_info['current']
                     LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
                                 "port information retrieved. "
@@ -1220,11 +1254,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                     # Secure and wire/unwire VIFs and update their status
                     # on Neutron server
                     if (self._port_info_has_changes(port_info) or
-                        self.sg_agent.firewall_refresh_needed()):
+                        self.sg_agent.firewall_refresh_needed() or
+                        ovs_restarted):
                         LOG.debug(_("Starting to process devices in:%s"),
                                   port_info)
                         # If treat devices fails - must resync with plugin
-                        sync = self.process_network_ports(port_info)
+                        sync = self.process_network_ports(port_info,
+                                                          ovs_restarted)
                         LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
                                     "ports processed. Elapsed:%(elapsed).3f"),
                                   {'iter_num': self.iter_num,
index 3a5b4aaae93e811fe9942da84d05ea38b1f154d0..5e57b72cd74b5e70489ff279b9a05f7bbf66c7b3 100644 (file)
@@ -46,6 +46,8 @@ VXLAN_TUN_TO_LV = 3
 LEARN_FROM_TUN = 10
 UCAST_TO_TUN = 20
 FLOOD_TO_TUN = 21
+CANARY_TABLE = 22
+
 # Map tunnel types to tables number
 TUN_TABLE = {p_const.TYPE_GRE: GRE_TUN_TO_LV,
              p_const.TYPE_VXLAN: VXLAN_TUN_TO_LV}
index e6eebeffa1820c0dc7974dd37ea0b65d0f23d464..94d3c9371085e4a65071357abb999741bf30f2ca 100644 (file)
@@ -21,10 +21,12 @@ import mock
 from oslo.config import cfg
 import testtools
 
+from neutron.agent.linux import async_process
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import ovs_lib
 from neutron.agent.linux import utils
 from neutron.common import constants as n_const
+from neutron.openstack.common import log
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.openvswitch.agent import ovs_neutron_agent
 from neutron.plugins.openvswitch.common import constants
@@ -139,7 +141,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                        'db_get_val', return_value=str(old_local_vlan)),
             mock.patch.object(self.agent.int_br, 'delete_flows')
         ) as (set_ovs_db_func, get_ovs_db_func, delete_flows_func):
-            self.agent.port_bound(port, net_uuid, 'local', None, None)
+            self.agent.port_bound(port, net_uuid, 'local', None, None, False)
         get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag")
         if new_local_vlan != old_local_vlan:
             set_ovs_db_func.assert_called_once_with(
@@ -275,7 +277,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                               side_effect=Exception()),
             mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
                               return_value=mock.Mock())):
-            self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
+            self.assertTrue(self.agent.treat_devices_added_or_updated([{}],
+                                                                      False))
 
     def _mock_treat_devices_added_updated(self, details, port, func_name):
         """Mock treat devices added or updated.
@@ -294,7 +297,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
             mock.patch.object(self.agent, func_name)
         ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
-            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+            self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
+                                                                       False))
         return func.called
 
     def test_treat_devices_added_updated_ignores_invalid_ofport(self):
@@ -341,7 +345,8 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             mock.patch.object(self.agent, 'treat_vif_port')
         ) as (get_dev_fn, get_vif_func, upd_dev_up,
               upd_dev_down, treat_vif_port):
-            self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+            self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
+                                                                       False))
             self.assertTrue(treat_vif_port.called)
             self.assertTrue(upd_dev_down.called)
 
@@ -372,11 +377,12 @@ class TestOvsNeutronAgent(base.BaseTestCase):
             mock.patch.object(self.agent, "treat_devices_removed",
                               return_value=False)
         ) as (setup_port_filters, device_added_updated, device_removed):
-            self.assertFalse(self.agent.process_network_ports(port_info))
+            self.assertFalse(self.agent.process_network_ports(port_info,
+                                                              False))
             setup_port_filters.assert_called_once_with(
                 port_info['added'], port_info.get('updated', set()))
             device_added_updated.assert_called_once_with(
-                port_info['added'] | port_info.get('updated', set()))
+                port_info['added'] | port_info.get('updated', set()), False)
             device_removed.assert_called_once_with(port_info['removed'])
 
     def test_process_network_ports(self):
@@ -789,6 +795,61 @@ class TestOvsNeutronAgent(base.BaseTestCase):
         expected_calls = [mock.call('gre-0a0a0a0a', '10.10.10.10', 'gre')]
         self.agent.setup_tunnel_port.assert_has_calls(expected_calls)
 
+    def test_ovs_restart(self):
+        reply2 = {'current': set(['tap0']),
+                  'added': set(['tap2']),
+                  'removed': set([])}
+
+        reply3 = {'current': set(['tap2']),
+                  'added': set([]),
+                  'removed': set(['tap0'])}
+
+        with contextlib.nested(
+            mock.patch.object(async_process.AsyncProcess, "_spawn"),
+            mock.patch.object(log.ContextAdapter, 'exception'),
+            mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+                              'scan_ports'),
+            mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+                              'process_network_ports'),
+            mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+                              'check_ovs_restart'),
+            mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+                              'setup_integration_br'),
+            mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
+                              'setup_physical_bridges')
+        ) as (spawn_fn, log_exception, scan_ports, process_network_ports,
+              check_ovs_restart, setup_int_br, setup_phys_br):
+            log_exception.side_effect = Exception(
+                'Fake exception to get out of the loop')
+            scan_ports.side_effect = [reply2, reply3]
+            process_network_ports.side_effect = [
+                False, Exception('Fake exception to get out of the loop')]
+            check_ovs_restart.side_effect = [False, True]
+
+            # This will exit after the second loop
+            try:
+                self.agent.daemon_loop()
+            except Exception:
+                pass
+
+        scan_ports.assert_has_calls([
+            mock.call(set(), set()),
+            mock.call(set(), set())
+        ])
+        process_network_ports.assert_has_calls([
+            mock.call({'current': set(['tap0']),
+                       'removed': set([]),
+                       'added': set(['tap2'])}, False),
+            mock.call({'current': set(['tap2']),
+                       'removed': set(['tap0']),
+                       'added': set([])}, True)
+        ])
+
+        # Verify the second time through the loop we triggered an
+        # OVS restart and re-setup the bridges
+        setup_int_br.assert_has_calls([mock.call()])
+        setup_phys_br.assert_has_calls([mock.call({})])
+
 
 class AncillaryBridgesTest(base.BaseTestCase):
 
index 1940730fad152fce3f7c3ce8c917803dda05bcf9..126b9ccb1efaddd0325e490ccfd9fc89db16ca2c 100644 (file)
@@ -105,6 +105,8 @@ class TunnelTest(base.BaseTestCase):
             mock.call.delete_port('patch-tun'),
             mock.call.remove_all_flows(),
             mock.call.add_flow(priority=1, actions='normal'),
+            mock.call.add_flow(priority=0, table=constants.CANARY_TABLE,
+                               actions='drop')
         ]
 
         self.mock_map_tun_bridge = self.ovs_bridges[self.MAP_TUN_BRIDGE]
@@ -435,7 +437,7 @@ class TunnelTest(base.BaseTestCase):
                                               'sudo', 2, ['gre'],
                                               self.VETH_MTU)
         a.local_vlan_map[NET_UUID] = LVM
-        a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID)
+        a.port_bound(VIF_PORT, NET_UUID, 'gre', None, LS_ID, False)
         self._verify_mock_calls()
 
     def test_port_unbound(self):
@@ -511,6 +513,11 @@ class TunnelTest(base.BaseTestCase):
                   'added': set([]),
                   'removed': set(['tap0'])}
 
+        self.mock_int_bridge_expected += [
+            mock.call.dump_flows_for_table(constants.CANARY_TABLE),
+            mock.call.dump_flows_for_table(constants.CANARY_TABLE)
+        ]
+
         with contextlib.nested(
             mock.patch.object(log.ContextAdapter, 'exception'),
             mock.patch.object(ovs_neutron_agent.OVSNeutronAgent,
@@ -549,10 +556,10 @@ class TunnelTest(base.BaseTestCase):
         process_network_ports.assert_has_calls([
             mock.call({'current': set(['tap0']),
                        'removed': set([]),
-                       'added': set(['tap2'])}),
+                       'added': set(['tap2'])}, False),
             mock.call({'current': set(['tap2']),
                        'removed': set(['tap0']),
-                       'added': set([])})
+                       'added': set([])}, False)
         ])
         self._verify_mock_calls()