]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Graceful ovs-agent restart
authorEugene Nikanorov <enikanorov@mirantis.com>
Sun, 10 May 2015 23:10:29 +0000 (03:10 +0400)
committerAnn Kamyshnikova <akamyshnikova@mirantis.com>
Thu, 20 Aug 2015 08:00:15 +0000 (11:00 +0300)
When agent is restarted it drops all existing flows. This
breaks all networking until the flows are re-created.

This change adds an ability to drop only old flows.
Agent_uuid_stamp is added for agents. This agent_uuid_stamp is set as
cookie for flows and then flows with stale cookies are deleted during
cleanup.

Co-Authored-By: Ann Kamyshnikova<akamyshnikova@mirantis.com>
Closes-bug: #1383674

DocImpact

Change-Id: I95070d8218859d4fff1d572c1792cdf6019dd7ea

15 files changed:
neutron/agent/common/ovs_lib.py
neutron/plugins/ml2/drivers/openvswitch/agent/common/config.py
neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_int.py
neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/br_tun.py
neutron/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/ofswitch.py
neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/common/net_helpers.py
neutron/tests/functional/agent/l2/base.py
neutron/tests/functional/agent/test_l2_ovs_agent.py
neutron/tests/unit/agent/common/test_ovs_lib.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/ovs_bridge_test_base.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_int.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/openflow/ovs_ofctl/test_br_tun.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py
neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py

index 9c23dd6ba618e771e4653d723fcd8abfd11c2d86..9c64f67d11e94eec6271b2c7fdf5bca930e57cca 100644 (file)
@@ -171,8 +171,12 @@ class OVSBridge(BaseOVS):
         self.set_db_attribute('Bridge', self.br_name, 'protocols', protocols,
                               check_error=True)
 
-    def create(self):
-        self.ovsdb.add_br(self.br_name).execute()
+    def create(self, secure_mode=False):
+        with self.ovsdb.transaction() as txn:
+            txn.add(self.ovsdb.add_br(self.br_name))
+            if secure_mode:
+                txn.add(self.ovsdb.set_fail_mode(self.br_name,
+                                                 FAILMODE_SECURE))
         # Don't return until vswitchd sets up the internal port
         self.get_port_ofport(self.br_name)
 
@@ -268,6 +272,10 @@ class OVSBridge(BaseOVS):
                                if 'NXST' not in item)
         return retval
 
+    def dump_all_flows(self):
+        return [f for f in self.run_ofctl("dump-flows", []).splitlines()
+                if 'NXST' not in f]
+
     def deferred(self, **kwargs):
         return DeferredOVSBridge(self, **kwargs)
 
index 98b6210f937d68fad6510aa6d136e04c0e56052c..7d866b6e852c391a7facb5b12c9ddd1ea3b0ac42 100644 (file)
@@ -97,7 +97,10 @@ agent_opts = [
     cfg.IntOpt('quitting_rpc_timeout', default=10,
                help=_("Set new timeout in seconds for new rpc calls after "
                       "agent receives SIGTERM. If value is set to 0, rpc "
-                      "timeout won't be changed"))
+                      "timeout won't be changed")),
+    cfg.BoolOpt('drop_flows_on_start', default=False,
+                help=_("Reset flow table on start. Setting this to True will "
+                       "cause brief traffic interruption."))
 ]
 
 
index c95a307634bbdd0b16ac134493ea7d516bbdcb36..952513e7176e0c934dde841c578de31d76813320 100644 (file)
@@ -29,7 +29,6 @@ class OVSIntegrationBridge(ovs_bridge.OVSAgentBridge):
     """openvswitch agent br-int specific logic."""
 
     def setup_default_table(self):
-        self.delete_flows()
         self.install_normal()
         self.setup_canary_table()
         self.install_drop(table_id=constants.ARP_SPOOF_TABLE)
index f71d7acd9d4d34b80316cc3b438064d69db881be..fb2df032ff49cde3494b6b93a21ea1426637dad8 100644 (file)
@@ -98,7 +98,8 @@ class OVSTunnelBridge(ovs_bridge.OVSAgentBridge,
             # to dynamically set-up flows in UCAST_TO_TUN corresponding to
             # remote mac addresses (assumes that lvid has already been set by
             # a previous flow)
-            learned_flow = ("table=%s,"
+            learned_flow = ("cookie=%(cookie)s,"
+                            "table=%(table)s,"
                             "priority=1,"
                             "hard_timeout=300,"
                             "NXM_OF_VLAN_TCI[0..11],"
@@ -106,7 +107,8 @@ class OVSTunnelBridge(ovs_bridge.OVSAgentBridge,
                             "load:0->NXM_OF_VLAN_TCI[],"
                             "load:NXM_NX_TUN_ID[]->NXM_NX_TUN_ID[],"
                             "output:NXM_OF_IN_PORT[]" %
-                            constants.UCAST_TO_TUN)
+                            {'cookie': self.agent_uuid_stamp,
+                             'table': constants.UCAST_TO_TUN})
             # Once remote mac addresses are learnt, output packet to patch_int
             deferred_br.add_flow(table=constants.LEARN_FROM_TUN,
                                  priority=1,
index 578e3e2196ae1fe7e94f0a2dbf38f25e4b55cf2f..e0d5154c39fd5a98168659a85af0731cab8eb50a 100644 (file)
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import re
+
+from oslo_log import log as logging
+
+from neutron.i18n import _LW
+
+LOG = logging.getLogger(__name__)
+
 # Field name mappings (from Ryu to ovs-ofctl)
 _keywords = {
     'eth_src': 'dl_src',
@@ -26,6 +34,10 @@ _keywords = {
 
 class OpenFlowSwitchMixin(object):
     """Mixin to provide common convenient routines for an openflow switch."""
+    agent_uuid_stamp = '0x0'
+
+    def set_agent_uuid_stamp(self, val):
+        self.agent_uuid_stamp = val
 
     @staticmethod
     def _conv_args(kwargs):
@@ -37,6 +49,9 @@ class OpenFlowSwitchMixin(object):
     def dump_flows(self, table_id):
         return self.dump_flows_for_table(table_id)
 
+    def dump_flows_all_tables(self):
+        return self.dump_all_flows()
+
     def install_goto_next(self, table_id):
         self.install_goto(table_id=table_id, dest_table_id=table_id + 1)
 
@@ -72,3 +87,36 @@ class OpenFlowSwitchMixin(object):
                 **self._conv_args(kwargs))
         else:
             super(OpenFlowSwitchMixin, self).remove_all_flows()
+
+    def add_flow(self, **kwargs):
+        kwargs['cookie'] = self.agent_uuid_stamp
+        super(OpenFlowSwitchMixin, self).add_flow(**self._conv_args(kwargs))
+
+    def mod_flow(self, **kwargs):
+        kwargs['cookie'] = self.agent_uuid_stamp
+        super(OpenFlowSwitchMixin, self).mod_flow(**self._conv_args(kwargs))
+
+    def _filter_flows(self, flows):
+        LOG.debug("Agent uuid stamp used to filter flows: %s",
+                  self.agent_uuid_stamp)
+        cookie_re = re.compile('cookie=(0x[A-Fa-f0-9]*)')
+        table_re = re.compile('table=([0-9]*)')
+        for flow in flows:
+            fl_cookie = cookie_re.search(flow)
+            if not fl_cookie:
+                continue
+            fl_cookie = fl_cookie.group(1)
+            if int(fl_cookie, 16) != self.agent_uuid_stamp:
+                fl_table = table_re.search(flow)
+                if not fl_table:
+                    continue
+                fl_table = fl_table.group(1)
+                yield flow, fl_cookie, fl_table
+
+    def cleanup_flows(self):
+        flows = self.dump_flows_all_tables()
+        for flow, cookie, table in self._filter_flows(flows):
+            # deleting a stale flow should be rare.
+            # it might deserve some attention
+            LOG.warning(_LW("Deleting flow %s"), flow)
+            self.delete_flows(cookie=cookie + '/-1', table=table)
index 190c54b3a7e4bd0c39ae33e2b9df764dad35c8af..f9f04803373cf39ec09f473b7243504f84468fe0 100644 (file)
@@ -17,6 +17,7 @@ import hashlib
 import signal
 import sys
 import time
+import uuid
 
 import netaddr
 from oslo_config import cfg
@@ -57,6 +58,7 @@ cfg.CONF.import_group('OVS', 'neutron.plugins.ml2.drivers.openvswitch.agent.'
 
 # A placeholder for dead vlans.
 DEAD_VLAN_TAG = p_const.MAX_VLAN_TAG + 1
+UINT64_BITMASK = (1 << 64) - 1
 
 
 class _mac_mydialect(netaddr.mac_unix):
@@ -216,6 +218,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         # Keep track of int_br's device count for use by _report_state()
         self.int_br_device_count = 0
 
+        self.agent_uuid_stamp = uuid.uuid4().int & UINT64_BITMASK
+
         self.int_br = self.br_int_cls(integ_br)
         self.setup_integration_br()
         # Stores port update notifications for processing in main rpc loop
@@ -244,8 +248,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.patch_tun_ofport = constants.OFPORT_INVALID
         if self.enable_tunneling:
             # The patch_int_ofport and patch_tun_ofport are updated
-            # here inside the call to reset_tunnel_br()
-            self.reset_tunnel_br(tun_br)
+            # here inside the call to setup_tunnel_br()
+            self.setup_tunnel_br(tun_br)
 
         self.dvr_agent = ovs_dvr_neutron_agent.OVSDVRNeutronAgent(
             self.context,
@@ -269,7 +273,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             heartbeat.start(interval=report_interval)
 
         if self.enable_tunneling:
-            self.setup_tunnel_br()
+            self.setup_tunnel_br_flows()
 
         self.dvr_agent.setup_dvr_flows()
 
@@ -872,8 +876,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
     def setup_integration_br(self):
         '''Setup the integration bridge.
 
-        Delete patch ports and remove all existing flows.
         '''
+        self.int_br.set_agent_uuid_stamp(self.agent_uuid_stamp)
         # Ensure the integration bridge is created.
         # ovs_lib.OVSBridge.create() will run
         #   ovs-vsctl -- --may-exist add-br BRIDGE_NAME
@@ -883,7 +887,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         self.int_br.setup_controllers(self.conf)
 
         self.int_br.delete_port(self.conf.OVS.int_peer_patch_port)
-
+        if self.conf.AGENT.drop_flows_on_start:
+            self.int_br.delete_flows()
         self.int_br.setup_default_table()
 
     def setup_ancillary_bridges(self, integ_br, tun_br):
@@ -912,7 +917,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             ancillary_bridges.append(br)
         return ancillary_bridges
 
-    def reset_tunnel_br(self, tun_br_name=None):
+    def setup_tunnel_br(self, tun_br_name=None):
         '''(re)initialize the tunnel bridge.
 
         Creates tunnel bridge, and links it to the integration bridge
@@ -922,15 +927,21 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
         '''
         if not self.tun_br:
             self.tun_br = self.br_tun_cls(tun_br_name)
+        self.tun_br.set_agent_uuid_stamp(self.agent_uuid_stamp)
 
-        self.tun_br.reset_bridge(secure_mode=True)
+        if not self.tun_br.bridge_exists('br-tun'):
+            self.tun_br.create(secure_mode=True)
         self.tun_br.setup_controllers(self.conf)
-        self.patch_tun_ofport = self.int_br.add_patch_port(
-            self.conf.OVS.int_peer_patch_port,
-            self.conf.OVS.tun_peer_patch_port)
-        self.patch_int_ofport = self.tun_br.add_patch_port(
-            self.conf.OVS.tun_peer_patch_port,
-            self.conf.OVS.int_peer_patch_port)
+        if (not self.int_br.port_exists(self.conf.OVS.int_peer_patch_port) or
+                self.patch_tun_ofport == ovs_lib.INVALID_OFPORT):
+            self.patch_tun_ofport = self.int_br.add_patch_port(
+                self.conf.OVS.int_peer_patch_port,
+                self.conf.OVS.tun_peer_patch_port)
+        if (not self.tun_br.port_exists(self.conf.OVS.tun_peer_patch_port) or
+                self.patch_int_ofport == ovs_lib.INVALID_OFPORT):
+            self.patch_int_ofport = self.tun_br.add_patch_port(
+                self.conf.OVS.tun_peer_patch_port,
+                self.conf.OVS.int_peer_patch_port)
         if ovs_lib.INVALID_OFPORT in (self.patch_tun_ofport,
                                       self.patch_int_ofport):
             LOG.error(_LE("Failed to create OVS patch port. Cannot have "
@@ -938,9 +949,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                           "version of OVS does not support tunnels or patch "
                           "ports. Agent terminated!"))
             exit(1)
-        self.tun_br.delete_flows()
+        if self.conf.AGENT.drop_flows_on_start:
+            self.tun_br.delete_flows()
 
-    def setup_tunnel_br(self):
+    def setup_tunnel_br_flows(self):
         '''Setup the tunnel bridge.
 
         Add all flows to the tunnel bridge.
@@ -1008,9 +1020,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                              bridge)
             phys_if_name = self.get_peer_name(constants.PEER_PHYSICAL_PREFIX,
                                               bridge)
-            self.int_br.delete_port(int_if_name)
-            br.delete_port(phys_if_name)
+            # Interface type of port for physical and integration bridges must
+            # be same, so check only one of them.
+            int_type = self.int_br.db_get_val("Interface", int_if_name, "type")
             if self.use_veth_interconnection:
+                # Drop ports if the interface types doesn't match the
+                # configuration value.
+                if int_type == 'patch':
+                    self.int_br.delete_port(int_if_name)
+                    br.delete_port(phys_if_name)
                 if ip_lib.device_exists(int_if_name):
                     ip_lib.IPDevice(int_if_name).link.delete()
                     # Give udev a chance to process its rules here, to avoid
@@ -1022,6 +1040,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 int_ofport = self.int_br.add_port(int_veth)
                 phys_ofport = br.add_port(phys_veth)
             else:
+                # Drop ports if the interface type doesn't match the
+                # configuration value
+                if int_type == 'veth':
+                    self.int_br.delete_port(int_if_name)
+                    br.delete_port(phys_if_name)
                 # Create patch ports without associating them in order to block
                 # untranslated traffic before association
                 int_ofport = self.int_br.add_patch_port(
@@ -1515,6 +1538,15 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 'removed': len(ancillary_port_info.get('removed', []))}
         return port_stats
 
+    def cleanup_stale_flows(self):
+        if self.iter_num == 0:
+            bridges = [self.int_br]
+            if self.enable_tunneling:
+                bridges.append(self.tun_br)
+            for bridge in bridges:
+                LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
+                bridge.cleanup_flows()
+
     def rpc_loop(self, polling_manager=None):
         if not polling_manager:
             polling_manager = polling.get_polling_manager(
@@ -1543,8 +1575,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                 self.setup_integration_br()
                 self.setup_physical_bridges(self.bridge_mappings)
                 if self.enable_tunneling:
-                    self.reset_tunnel_br()
                     self.setup_tunnel_br()
+                    self.setup_tunnel_br_flows()
                     tunnel_sync = True
                 if self.enable_distributed_routing:
                     self.dvr_agent.reset_ovs_parameters(self.int_br,
@@ -1613,6 +1645,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                         # If treat devices fails - must resync with plugin
                         sync = self.process_network_ports(port_info,
                                                           ovs_restarted)
+                        self.cleanup_stale_flows()
                         LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
                                   "ports processed. Elapsed:%(elapsed).3f",
                                   {'iter_num': self.iter_num,
index d4bfe3736b4f15bf3e0f29698f04c995c829cf24..0a281a167480953b75bf5c3384dde18797ae1bbc 100644 (file)
@@ -14,6 +14,8 @@
 #
 
 import abc
+from concurrent import futures
+import contextlib
 import functools
 import os
 import random
@@ -86,6 +88,17 @@ def assert_ping(src_namespace, dst_ip, timeout=1, count=1):
                                  dst_ip])
 
 
+@contextlib.contextmanager
+def async_ping(namespace, ips):
+    with futures.ThreadPoolExecutor(max_workers=len(ips)) as executor:
+        fs = [executor.submit(assert_ping, namespace, ip, count=10)
+              for ip in ips]
+        yield lambda: all(f.done() for f in fs)
+        futures.wait(fs)
+        for f in fs:
+            f.result()
+
+
 def assert_no_ping(src_namespace, dst_ip, timeout=1, count=1):
     try:
         assert_ping(src_namespace, dst_ip, timeout, count)
index 46706d7ddad9fc7f17e0dab63f0fee0cb157b898..66afcafb9a28595e22a22432f25ad41382e3e421 100644 (file)
@@ -43,6 +43,7 @@ from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl \
     import br_tun
 from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent \
     as ovs_agent
+from neutron.tests.common import net_helpers
 from neutron.tests.functional.agent.linux import base
 
 LOG = logging.getLogger(__name__)
@@ -66,6 +67,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
         self.ovs = ovs_lib.BaseOVS()
         self.config = self._configure_agent()
         self.driver = interface.OVSInterfaceDriver(self.config)
+        self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
 
     def _get_config_opts(self):
         config = cfg.ConfigOpts()
@@ -169,10 +171,11 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
             self.driver.plug(
                 network.get('id'), port.get('id'), port.get('vif_name'),
                 port.get('mac_address'),
-                agent.int_br.br_name, namespace=None)
+                agent.int_br.br_name, namespace=self.namespace)
             ip_cidrs = ["%s/%s" % (port.get('fixed_ips')[0][
                 'ip_address'], ip_len)]
-            self.driver.init_l3(port.get('vif_name'), ip_cidrs, namespace=None)
+            self.driver.init_l3(port.get('vif_name'), ip_cidrs,
+                                namespace=self.namespace)
 
     def _get_device_details(self, port, network):
         dev = {'device': port['id'],
@@ -276,8 +279,9 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
             lambda: self._expected_plugin_rpc_call(
                 self.agent.plugin_rpc.update_device_list, port_ids, up))
 
-    def setup_agent_and_ports(self, port_dicts, trigger_resync=False):
-        self.agent = self.create_agent()
+    def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
+                              trigger_resync=False):
+        self.agent = self.create_agent(create_tunnels=create_tunnels)
         self.start_agent(self.agent)
         self.network = self._create_test_network_dict()
         self.ports = port_dicts
index abc573ba729f47783cfe8c033d058a06b776097d..a18d4c5e2e5a0ac06c09932ecb1197f5b93bc279 100644 (file)
@@ -14,7 +14,9 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import time
 
+from neutron.tests.common import net_helpers
 from neutron.tests.functional.agent.l2 import base
 
 
@@ -54,3 +56,13 @@ class TestOVSAgent(base.OVSAgentTestFramework):
         self.create_agent(create_tunnels=False)
         self.assertTrue(self.ovs.bridge_exists(self.br_int))
         self.assertFalse(self.ovs.bridge_exists(self.br_tun))
+
+    def test_assert_pings_during_br_int_setup_not_lost(self):
+        self.setup_agent_and_ports(port_dicts=self.create_test_ports(),
+                                   create_tunnels=False)
+        self.wait_until_ports_state(self.ports, up=True)
+        ips = [port['fixed_ips'][0]['ip_address'] for port in self.ports]
+        with net_helpers.async_ping(self.namespace, ips) as running:
+            while running():
+                self.agent.setup_integration_br()
+                time.sleep(0.25)
index b0b8180c1b922e0d1eae47d57a277f2a9938b14c..cb9f71f506a887e308facb1292e10cf8d1beb85c 100644 (file)
@@ -182,29 +182,36 @@ class OVS_Lib_Test(base.BaseTestCase):
         cidr = '192.168.1.0/24'
 
         flow_dict_1 = collections.OrderedDict([
+            ('cookie', 1234),
             ('priority', 2),
             ('dl_src', 'ca:fe:de:ad:be:ef'),
             ('actions', 'strip_vlan,output:0')])
         flow_dict_2 = collections.OrderedDict([
+            ('cookie', 1254),
             ('priority', 1),
             ('actions', 'normal')])
         flow_dict_3 = collections.OrderedDict([
+            ('cookie', 1257),
             ('priority', 2),
             ('actions', 'drop')])
         flow_dict_4 = collections.OrderedDict([
+            ('cookie', 1274),
             ('priority', 2),
             ('in_port', ofport),
             ('actions', 'drop')])
         flow_dict_5 = collections.OrderedDict([
+            ('cookie', 1284),
             ('priority', 4),
             ('in_port', ofport),
             ('dl_vlan', vid),
             ('actions', "strip_vlan,set_tunnel:%s,normal" % (lsw_id))])
         flow_dict_6 = collections.OrderedDict([
+            ('cookie', 1754),
             ('priority', 3),
             ('tun_id', lsw_id),
             ('actions', "mod_vlan_vid:%s,output:%s" % (vid, ofport))])
         flow_dict_7 = collections.OrderedDict([
+            ('cookie', 1256),
             ('priority', 4),
             ('nw_src', cidr),
             ('proto', 'arp'),
@@ -220,36 +227,39 @@ class OVS_Lib_Test(base.BaseTestCase):
         expected_calls = [
             self._ofctl_mock("add-flows", self.BR_NAME, '-',
                              process_input=OFCTLParamListMatcher(
-                                 "hard_timeout=0,idle_timeout=0,"
+                                 "hard_timeout=0,idle_timeout=0,cookie=1234,"
                                  "priority=2,dl_src=ca:fe:de:ad:be:ef,"
                                  "actions=strip_vlan,output:0")),
             self._ofctl_mock("add-flows", self.BR_NAME, '-',
                              process_input=OFCTLParamListMatcher(
-                                 "hard_timeout=0,idle_timeout=0,"
+                                 "hard_timeout=0,idle_timeout=0,cookie=1254,"
                                  "priority=1,actions=normal")),
             self._ofctl_mock("add-flows", self.BR_NAME, '-',
                              process_input=OFCTLParamListMatcher(
-                                 "hard_timeout=0,idle_timeout=0,"
+                                 "hard_timeout=0,idle_timeout=0,cookie=1257,"
                                  "priority=2,actions=drop")),
             self._ofctl_mock("add-flows", self.BR_NAME, '-',
                              process_input=OFCTLParamListMatcher(
-                                 "hard_timeout=0,idle_timeout=0,priority=2,"
-                                 "in_port=%s,actions=drop" % ofport)),
+                                 "hard_timeout=0,idle_timeout=0,cookie=1274,"
+                                 "priority=2,in_port=%s,actions=drop" % ofport
+                             )),
             self._ofctl_mock("add-flows", self.BR_NAME, '-',
                              process_input=OFCTLParamListMatcher(
-                                 "hard_timeout=0,idle_timeout=0,"
+                                 "hard_timeout=0,idle_timeout=0,cookie=1284,"
                                  "priority=4,dl_vlan=%s,in_port=%s,"
                                  "actions=strip_vlan,set_tunnel:%s,normal" %
                                  (vid, ofport, lsw_id))),
             self._ofctl_mock("add-flows", self.BR_NAME, '-',
                              process_input=OFCTLParamListMatcher(
-                                 "hard_timeout=0,idle_timeout=0,priority=3,"
-                                 "tun_id=%s,actions=mod_vlan_vid:%s,"
-                                 "output:%s" % (lsw_id, vid, ofport))),
+                                 "hard_timeout=0,idle_timeout=0,cookie=1754,"
+                                 "priority=3,"
+                                 "tun_id=%s,actions=mod_vlan_vid:%s,output:%s"
+                                 % (lsw_id, vid, ofport))),
             self._ofctl_mock("add-flows", self.BR_NAME, '-',
                              process_input=OFCTLParamListMatcher(
-                                 "hard_timeout=0,idle_timeout=0,priority=4,"
-                                 "nw_src=%s,arp,actions=drop" % cidr)),
+                                 "hard_timeout=0,idle_timeout=0,cookie=1256,"
+                                 "priority=4,nw_src=%s,arp,actions=drop"
+                                 % cidr)),
         ]
         self.execute.assert_has_calls(expected_calls)
 
@@ -269,6 +279,7 @@ class OVS_Lib_Test(base.BaseTestCase):
 
     def test_add_flow_timeout_set(self):
         flow_dict = collections.OrderedDict([
+            ('cookie', 1234),
             ('priority', 1),
             ('hard_timeout', 1000),
             ('idle_timeout', 2000),
@@ -277,17 +288,18 @@ class OVS_Lib_Test(base.BaseTestCase):
         self.br.add_flow(**flow_dict)
         self._verify_ofctl_mock(
             "add-flows", self.BR_NAME, '-',
-            process_input="hard_timeout=1000,idle_timeout=2000,priority=1,"
-            "actions=normal")
+            process_input="hard_timeout=1000,idle_timeout=2000,"
+                          "priority=1,cookie=1234,actions=normal")
 
     def test_add_flow_default_priority(self):
-        flow_dict = collections.OrderedDict([('actions', 'normal')])
+        flow_dict = collections.OrderedDict([('actions', 'normal'),
+                                             ('cookie', 1234)])
 
         self.br.add_flow(**flow_dict)
         self._verify_ofctl_mock(
             "add-flows", self.BR_NAME, '-',
             process_input="hard_timeout=0,idle_timeout=0,priority=1,"
-                          "actions=normal")
+                          "cookie=1234,actions=normal")
 
     def _test_get_port_ofport(self, ofport, expected_result):
         pname = "tap99"
index ad9de289fc3041828c7ce124e7a52ada74d4897d..b396103003728aa5e3b87a4aec8634fb35be58f7 100644 (file)
@@ -80,6 +80,17 @@ class OVSBridgeTestBase(ovs_test_base.OVSOFCtlTestBase):
         ]
         self.assertEqual(expected, self.mock.mock_calls)
 
+    def test_dump_flows_for_table(self):
+        table = 23
+        with mock.patch.object(self.br, 'run_ofctl') as run_ofctl:
+            self.br.dump_flows(table)
+            run_ofctl.assert_has_calls([mock.call("dump-flows", mock.ANY)])
+
+    def test_dump_all_flows(self):
+        with mock.patch.object(self.br, 'run_ofctl') as run_ofctl:
+            self.br.dump_flows_all_tables()
+            run_ofctl.assert_has_calls([mock.call("dump-flows", [])])
+
 
 class OVSDVRProcessTestMixin(object):
     def test_install_dvr_process_ipv4(self):
index 005112762f1ba58eff56ada6d9dc78fb6f79d473..9bb3c8f2346e2dedc9d40f69944361cc699c1df8 100644 (file)
@@ -31,7 +31,6 @@ class OVSIntegrationBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase):
     def test_setup_default_table(self):
         self.br.setup_default_table()
         expected = [
-            call.delete_flows(),
             call.add_flow(priority=0, table=0, actions='normal'),
             call.add_flow(priority=0, table=23, actions='drop'),
             call.add_flow(priority=0, table=24, actions='drop'),
index 485523129e35b50cf57fb6bd0b257dccb0f37a22..6d04f230cc5a57a5faa1a5e4b7c71b8ce700efa5 100644 (file)
@@ -54,7 +54,7 @@ class OVSTunnelBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase,
                      {'priority': 0, 'table': 3, 'actions': 'drop'},
                      {'priority': 0, 'table': 4, 'actions': 'drop'},
                      {'priority': 1, 'table': 10,
-                      'actions': 'learn(table=20,priority=1,'
+                      'actions': 'learn(cookie=0x0,table=20,priority=1,'
                       'hard_timeout=300,NXM_OF_VLAN_TCI[0..11],'
                       'NXM_OF_ETH_DST[]=NXM_OF_ETH_SRC[],'
                       'load:0->NXM_OF_VLAN_TCI[],'
@@ -88,7 +88,7 @@ class OVSTunnelBridgeTest(ovs_bridge_test_base.OVSBridgeTestBase,
                      {'priority': 0, 'table': 3, 'actions': 'drop'},
                      {'priority': 0, 'table': 4, 'actions': 'drop'},
                      {'priority': 1, 'table': 10,
-                      'actions': 'learn(table=20,priority=1,'
+                      'actions': 'learn(cookie=0x0,table=20,priority=1,'
                       'hard_timeout=300,NXM_OF_VLAN_TCI[0..11],'
                       'NXM_OF_ETH_DST[]=NXM_OF_ETH_SRC[],'
                       'load:0->NXM_OF_VLAN_TCI[],'
index 72eb801e96aa4cde367dd3f23525ed53fde08d8d..d92035059e4d0941e292bae13045608747ee2539 100644 (file)
@@ -405,6 +405,9 @@ class TestOvsNeutronAgent(object):
                                                 'devices_down': details,
                                                 'failed_devices_up': [],
                                                 'failed_devices_down': []}),\
+                mock.patch.object(self.agent.int_br,
+                    'get_port_tag_dict',
+                    return_value={}),\
                 mock.patch.object(self.agent, func_name) as func:
             skip_devs, need_bound_devices = (
                 self.agent.treat_devices_added_or_updated([{}], False))
@@ -469,6 +472,9 @@ class TestOvsNeutronAgent(object):
                                'get_devices_details_list_and_failed_devices',
                                return_value={'devices': [dev_mock],
                                              'failed_devices': None}),\
+                mock.patch.object(self.agent.int_br,
+                    'get_port_tag_dict',
+                    return_value={}),\
                 mock.patch.object(self.agent.int_br,
                                   'get_vifs_by_ids',
                                   return_value={}),\
@@ -500,6 +506,8 @@ class TestOvsNeutronAgent(object):
                 mock.patch.object(self.agent.int_br,
                                   'get_vifs_by_ids',
                                   return_value={'xxx': mock.MagicMock()}),\
+                mock.patch.object(self.agent.int_br, 'get_port_tag_dict',
+                                  return_value={}),\
                 mock.patch.object(self.agent,
                                   'treat_vif_port') as treat_vif_port:
             skip_devs, need_bound_devices = (
@@ -655,8 +663,11 @@ class TestOvsNeutronAgent(object):
                 mock.call.phys_br_cls('br-eth'),
                 mock.call.phys_br.setup_controllers(mock.ANY),
                 mock.call.phys_br.setup_default_table(),
-                mock.call.int_br.delete_port('int-br-eth'),
-                mock.call.phys_br.delete_port('phy-br-eth'),
+                mock.call.int_br.db_get_val('Interface', 'int-br-eth',
+                                            'type'),
+                # Have to use __getattr__ here to avoid mock._Call.__eq__
+                # method being called
+                mock.call.int_br.db_get_val().__getattr__('__eq__')('veth'),
                 mock.call.int_br.add_patch_port('int-br-eth',
                                                 constants.NONEXISTENT_PEER),
                 mock.call.phys_br.add_patch_port('phy-br-eth',
@@ -713,6 +724,46 @@ class TestOvsNeutronAgent(object):
             self.assertEqual(self.agent.phys_ofports["physnet1"],
                              "phys_veth_ofport")
 
+    def test_setup_physical_bridges_change_from_veth_to_patch_conf(self):
+        with mock.patch.object(sys, "exit"),\
+                mock.patch.object(utils, "execute"),\
+                mock.patch.object(self.agent, 'br_phys_cls') as phys_br_cls,\
+                mock.patch.object(self.agent, 'int_br') as int_br,\
+                mock.patch.object(self.agent.int_br, 'db_get_val',
+                                  return_value='veth'):
+            phys_br = phys_br_cls()
+            parent = mock.MagicMock()
+            parent.attach_mock(phys_br_cls, 'phys_br_cls')
+            parent.attach_mock(phys_br, 'phys_br')
+            parent.attach_mock(int_br, 'int_br')
+            phys_br.add_patch_port.return_value = "phy_ofport"
+            int_br.add_patch_port.return_value = "int_ofport"
+            self.agent.setup_physical_bridges({"physnet1": "br-eth"})
+            expected_calls = [
+                mock.call.phys_br_cls('br-eth'),
+                mock.call.phys_br.setup_controllers(mock.ANY),
+                mock.call.phys_br.setup_default_table(),
+                mock.call.int_br.delete_port('int-br-eth'),
+                mock.call.phys_br.delete_port('phy-br-eth'),
+                mock.call.int_br.add_patch_port('int-br-eth',
+                                                constants.NONEXISTENT_PEER),
+                mock.call.phys_br.add_patch_port('phy-br-eth',
+                                                 constants.NONEXISTENT_PEER),
+                mock.call.int_br.drop_port(in_port='int_ofport'),
+                mock.call.phys_br.drop_port(in_port='phy_ofport'),
+                mock.call.int_br.set_db_attribute('Interface', 'int-br-eth',
+                                                  'options:peer',
+                                                  'phy-br-eth'),
+                mock.call.phys_br.set_db_attribute('Interface', 'phy-br-eth',
+                                                   'options:peer',
+                                                   'int-br-eth'),
+            ]
+            parent.assert_has_calls(expected_calls)
+            self.assertEqual(self.agent.int_ofports["physnet1"],
+                             "int_ofport")
+            self.assertEqual(self.agent.phys_ofports["physnet1"],
+                             "phy_ofport")
+
     def test_get_peer_name(self):
         bridge1 = "A_REALLY_LONG_BRIDGE_NAME1"
         bridge2 = "A_REALLY_LONG_BRIDGE_NAME2"
@@ -728,15 +779,49 @@ class TestOvsNeutronAgent(object):
         self.tun_br = mock.Mock()
         with mock.patch.object(self.agent.int_br,
                                "add_patch_port",
-                               return_value=1) as intbr_patch_fn,\
-                mock.patch.object(self.agent,
-                                  'tun_br',
-                                  autospec=True) as tun_br,\
+                               return_value=1) as int_patch_port,\
+                mock.patch.object(self.agent.tun_br,
+                                  "add_patch_port",
+                                  return_value=1) as tun_patch_port,\
+                mock.patch.object(self.agent.tun_br, 'bridge_exists',
+                                  return_value=False),\
+                mock.patch.object(self.agent.tun_br, 'create') as create_tun,\
+                mock.patch.object(self.agent.tun_br,
+                                  'setup_controllers') as setup_controllers,\
+                mock.patch.object(self.agent.tun_br, 'port_exists',
+                                  return_value=False),\
+                mock.patch.object(self.agent.int_br, 'port_exists',
+                                  return_value=False),\
                 mock.patch.object(sys, "exit"):
-            tun_br.add_patch_port.return_value = 2
-            self.agent.reset_tunnel_br(None)
+            self.agent.setup_tunnel_br(None)
             self.agent.setup_tunnel_br()
-            self.assertTrue(intbr_patch_fn.called)
+            self.assertTrue(create_tun.called)
+            self.assertTrue(setup_controllers.called)
+            self.assertTrue(int_patch_port.called)
+            self.assertTrue(tun_patch_port.called)
+
+    def test_setup_tunnel_br_ports_exits_drop_flows(self):
+        cfg.CONF.set_override('drop_flows_on_start', True, 'AGENT')
+        with mock.patch.object(self.agent.tun_br, 'port_exists',
+                               return_value=True),\
+                mock.patch.object(self.agent, 'tun_br'),\
+                mock.patch.object(self.agent.int_br, 'port_exists',
+                                  return_value=True),\
+                mock.patch.object(self.agent.tun_br, 'setup_controllers'),\
+                mock.patch.object(self.agent, 'patch_tun_ofport', new=2),\
+                mock.patch.object(self.agent, 'patch_int_ofport', new=2),\
+                mock.patch.object(self.agent.tun_br,
+                                  'delete_flows') as delete,\
+                mock.patch.object(self.agent.int_br,
+                                  "add_patch_port") as int_patch_port,\
+                mock.patch.object(self.agent.tun_br,
+                                  "add_patch_port") as tun_patch_port,\
+                mock.patch.object(sys, "exit"):
+            self.agent.setup_tunnel_br(None)
+            self.agent.setup_tunnel_br()
+            self.assertFalse(int_patch_port.called)
+            self.assertFalse(tun_patch_port.called)
+            self.assertTrue(delete.called)
 
     def test_setup_tunnel_port(self):
         self.agent.tun_br = mock.Mock()
@@ -999,12 +1084,15 @@ class TestOvsNeutronAgent(object):
                                return_value=fake_tunnel_details),\
                 mock.patch.object(
                     self.agent,
-                    '_setup_tunnel_port') as _setup_tunnel_port_fn:
+                    '_setup_tunnel_port') as _setup_tunnel_port_fn,\
+                mock.patch.object(self.agent,
+                                  'cleanup_stale_flows') as cleanup:
             self.agent.tunnel_types = ['vxlan']
             self.agent.tunnel_sync()
             expected_calls = [mock.call(self.agent.tun_br, 'vxlan-64651f0f',
                                         '100.101.31.15', 'vxlan')]
             _setup_tunnel_port_fn.assert_has_calls(expected_calls)
+            self.assertEqual([], cleanup.mock_calls)
 
     def test_tunnel_sync_invalid_ip_address(self):
         fake_tunnel_details = {'tunnels': [{'ip_address': '300.300.300.300'},
@@ -1014,13 +1102,16 @@ class TestOvsNeutronAgent(object):
                                return_value=fake_tunnel_details),\
                 mock.patch.object(
                     self.agent,
-                    '_setup_tunnel_port') as _setup_tunnel_port_fn:
+                    '_setup_tunnel_port') as _setup_tunnel_port_fn,\
+                mock.patch.object(self.agent,
+                                  'cleanup_stale_flows') as cleanup:
             self.agent.tunnel_types = ['vxlan']
             self.agent.tunnel_sync()
             _setup_tunnel_port_fn.assert_called_once_with(self.agent.tun_br,
                                                           'vxlan-64646464',
                                                           '100.100.100.100',
                                                           'vxlan')
+            self.assertEqual([], cleanup.mock_calls)
 
     def test_tunnel_update(self):
         kwargs = {'tunnel_ip': '10.10.10.10',
@@ -1070,8 +1161,11 @@ class TestOvsNeutronAgent(object):
                 mock.patch.object(self.mod_agent.OVSNeutronAgent,
                                   'setup_physical_bridges') as setup_phys_br,\
                 mock.patch.object(time, 'sleep'),\
+                mock.patch.object(
+                    self.mod_agent.OVSNeutronAgent,
+                    'update_stale_ofport_rules') as update_stale, \
                 mock.patch.object(self.mod_agent.OVSNeutronAgent,
-                                  'update_stale_ofport_rules') as update_stale:
+                                  'cleanup_stale_flows') as cleanup:
             log_exception.side_effect = Exception(
                 'Fake exception to get out of the loop')
             scan_ports.side_effect = [reply2, reply3]
@@ -1091,6 +1185,7 @@ class TestOvsNeutronAgent(object):
                 mock.call(reply2, False),
                 mock.call(reply3, True)
             ])
+            cleanup.assert_called_once_with()
             self.assertTrue(update_stale.called)
             # Verify the OVS restart we triggered in the loop
             # re-setup the bridges
@@ -1113,6 +1208,24 @@ class TestOvsNeutronAgent(object):
                            self.agent.state_rpc.client):
             self.assertEqual(10, rpc_client.timeout)
 
+    def test_cleanup_stale_flows_iter_0(self):
+        with mock.patch.object(self.agent, 'agent_uuid_stamp', new=1234),\
+            mock.patch.object(self.agent.int_br,
+                              'dump_flows_all_tables') as dump_flows,\
+                mock.patch.object(self.agent.int_br,
+                                  'delete_flows') as del_flow:
+            dump_flows.return_value = [
+                'cookie=0x4d2, duration=50.156s, table=0,actions=drop',
+                'cookie=0x4321, duration=54.143s, table=2, priority=0',
+                'cookie=0x2345, duration=50.125s, table=2, priority=0',
+                'cookie=0x4d2, duration=52.112s, table=3, actions=drop',
+            ]
+            self.agent.cleanup_stale_flows()
+            del_flow.assert_has_calls([mock.call(cookie='0x4321/-1',
+                                                 table='2'),
+                                       mock.call(cookie='0x2345/-1',
+                                                 table='2')])
+
     def test_set_rpc_timeout_no_value(self):
         self.agent.quitting_rpc_timeout = None
         with mock.patch.object(self.agent, 'set_rpc_timeout') as mock_set_rpc:
@@ -2164,7 +2277,7 @@ class TestOvsDvrNeutronAgent(object):
             # block RPC calls and bridge calls
             self.agent.setup_physical_bridges = mock.Mock()
             self.agent.setup_integration_br = mock.Mock()
-            self.agent.reset_tunnel_br = mock.Mock()
+            self.agent.setup_tunnel_br = mock.Mock()
             self.agent.state_rpc = mock.Mock()
             try:
                 self.agent.rpc_loop(polling_manager=mock.Mock())
index e6f7fadb0b7eabd9c9c7b9a0a1d9c38511dd47d7..315360b8a7353d8b5dd304a300ad40ac8b33595f 100644 (file)
@@ -167,6 +167,7 @@ class TunnelTest(object):
 
         self.mock_int_bridge = self.ovs_bridges[self.INT_BRIDGE]
         self.mock_int_bridge_expected = [
+            mock.call.set_agent_uuid_stamp(mock.ANY),
             mock.call.create(),
             mock.call.set_secure_mode(),
             mock.call.setup_controllers(mock.ANY),
@@ -177,11 +178,11 @@ class TunnelTest(object):
         self.mock_map_tun_bridge_expected = [
             mock.call.setup_controllers(mock.ANY),
             mock.call.setup_default_table(),
-            mock.call.delete_port('phy-%s' % self.MAP_TUN_BRIDGE),
             mock.call.add_patch_port('phy-%s' % self.MAP_TUN_BRIDGE,
                                      constants.NONEXISTENT_PEER), ]
         self.mock_int_bridge_expected += [
-            mock.call.delete_port('int-%s' % self.MAP_TUN_BRIDGE),
+            mock.call.db_get_val('Interface', 'int-%s' % self.MAP_TUN_BRIDGE,
+                                 'type'),
             mock.call.add_patch_port('int-%s' % self.MAP_TUN_BRIDGE,
                                      constants.NONEXISTENT_PEER),
         ]
@@ -200,11 +201,17 @@ class TunnelTest(object):
         ]
 
         self.mock_tun_bridge_expected = [
-            mock.call.reset_bridge(secure_mode=True),
+            mock.call.set_agent_uuid_stamp(mock.ANY),
+            mock.call.bridge_exists('br-tun'),
+            mock.call.bridge_exists().__nonzero__(),
             mock.call.setup_controllers(mock.ANY),
+            mock.call.port_exists('patch-int'),
+            mock.call.port_exists().__nonzero__(),
             mock.call.add_patch_port('patch-int', 'patch-tun'),
         ]
         self.mock_int_bridge_expected += [
+            mock.call.port_exists('patch-tun'),
+            mock.call.port_exists().__nonzero__(),
             mock.call.add_patch_port('patch-tun', 'patch-int'),
         ]
         self.mock_int_bridge_expected += [
@@ -214,7 +221,6 @@ class TunnelTest(object):
         ]
 
         self.mock_tun_bridge_expected += [
-            mock.call.delete_flows(),
             mock.call.setup_default_table(self.INT_OFPORT, arp_responder),
         ]
 
@@ -510,8 +516,12 @@ class TunnelTest(object):
                 mock.patch.object(self.mod_agent.OVSNeutronAgent,
                                   'tunnel_sync'),\
                 mock.patch.object(time, 'sleep'),\
-                mock.patch.object(self.mod_agent.OVSNeutronAgent,
-                                  'update_stale_ofport_rules') as update_stale:
+                mock.patch.object(
+                    self.mod_agent.OVSNeutronAgent,
+                    'update_stale_ofport_rules') as update_stale,\
+                mock.patch.object(
+                    self.mod_agent.OVSNeutronAgent,
+                    'cleanup_stale_flows') as cleanup:
             log_exception.side_effect = Exception(
                 'Fake exception to get out of the loop')
             scan_ports.side_effect = [reply2, reply3]
@@ -545,6 +555,8 @@ class TunnelTest(object):
                            'removed': set(['tap0']),
                            'added': set([])}, False)
             ])
+
+            cleanup.assert_called_once_with()
             self.assertTrue(update_stale.called)
             self._verify_mock_calls()
 
@@ -568,6 +580,7 @@ class TunnelTestUseVethInterco(TunnelTest):
         ]
 
         self.mock_int_bridge_expected = [
+            mock.call.set_agent_uuid_stamp(mock.ANY),
             mock.call.create(),
             mock.call.set_secure_mode(),
             mock.call.setup_controllers(mock.ANY),
@@ -578,11 +591,11 @@ class TunnelTestUseVethInterco(TunnelTest):
         self.mock_map_tun_bridge_expected = [
             mock.call.setup_controllers(mock.ANY),
             mock.call.setup_default_table(),
-            mock.call.delete_port('phy-%s' % self.MAP_TUN_BRIDGE),
             mock.call.add_port(self.intb),
         ]
         self.mock_int_bridge_expected += [
-            mock.call.delete_port('int-%s' % self.MAP_TUN_BRIDGE),
+            mock.call.db_get_val('Interface', 'int-%s' % self.MAP_TUN_BRIDGE,
+                                 'type'),
             mock.call.add_port(self.inta)
         ]
 
@@ -594,11 +607,17 @@ class TunnelTestUseVethInterco(TunnelTest):
         ]
 
         self.mock_tun_bridge_expected = [
-            mock.call.reset_bridge(secure_mode=True),
+            mock.call.set_agent_uuid_stamp(mock.ANY),
+            mock.call.bridge_exists('br-tun'),
+            mock.call.bridge_exists().__nonzero__(),
             mock.call.setup_controllers(mock.ANY),
+            mock.call.port_exists('patch-int'),
+            mock.call.port_exists().__nonzero__(),
             mock.call.add_patch_port('patch-int', 'patch-tun'),
         ]
         self.mock_int_bridge_expected += [
+            mock.call.port_exists('patch-tun'),
+            mock.call.port_exists().__nonzero__(),
             mock.call.add_patch_port('patch-tun', 'patch-int')
         ]
         self.mock_int_bridge_expected += [
@@ -607,7 +626,6 @@ class TunnelTestUseVethInterco(TunnelTest):
                 'Port', columns=['name', 'other_config', 'tag'], ports=[])
         ]
         self.mock_tun_bridge_expected += [
-            mock.call.delete_flows(),
             mock.call.setup_default_table(self.INT_OFPORT, arp_responder),
         ]