]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Retry on unassigned ofport instead of treating it as a failure
authorTerry Wilson <twilson@redhat.com>
Fri, 11 Jul 2014 23:55:30 +0000 (17:55 -0600)
committerTerry Wilson <twilson@redhat.com>
Mon, 29 Dec 2014 22:12:52 +0000 (15:12 -0700)
Open vSwitch will return '[]' when querying an interface's ofport
when the ofport has not yet been assigned. This doesn't signal a
failure, but the get_port_ofport code was treating it as such.
This patch uses a decorator from python-retrying which has been
added as a dependency of oslo_concurrency and therefore packaged
everywhere. The call to fetch the ofport is retried until the
vsctl_timeout is reached and, on failure, INVALID_OFPORT is
returned.

The add_port function will attempt to delete the port if
INVALID_OFPORT is returned from get_port_ofport. add_port is also
extended to take optional Interface options so that the
add_tunnel_port and add_patch_port functions can reuse it instead
of just duplicating its functionality.

Closes-Bug: #1341020
Change-Id: Ifc52d8589c7aafd360893cb9c1cdcbf43b04ee2c

neutron/agent/linux/ovs_lib.py
neutron/plugins/ofagent/agent/ofa_neutron_agent.py
neutron/plugins/openvswitch/agent/ovs_neutron_agent.py
neutron/tests/unit/agent/linux/test_ovs_lib.py
neutron/tests/unit/ofagent/test_ofa_neutron_agent.py
neutron/tests/unit/openvswitch/test_ovs_neutron_agent.py
requirements.txt

index 81c72b745c67e9f7876e44c7fcab1300db923283..5a34eeb2530b427d061c7ab6d6a706740e2ee101 100644 (file)
@@ -19,6 +19,8 @@ import operator
 from oslo.config import cfg
 from oslo.serialization import jsonutils
 from oslo.utils import excutils
+import retrying
+import six
 
 from neutron.agent.linux import ip_lib
 from neutron.agent.linux import utils
@@ -43,6 +45,35 @@ cfg.CONF.register_opts(OPTS)
 LOG = logging.getLogger(__name__)
 
 
+def _ofport_result_pending(result):
+    """Return True if ovs-vsctl indicates the result is still pending."""
+    # ovs-vsctl can return '[]' for an ofport that has not yet been assigned
+    try:
+        int(result)
+        return False
+    except (ValueError, TypeError):
+        return True
+
+
+def _ofport_retry(fn):
+    """Decorator for retrying when OVS has yet to assign an ofport.
+
+    The instance's vsctl_timeout is used as the max waiting time. This relies
+    on the fact that instance methods receive self as the first argument.
+    """
+    @six.wraps(fn)
+    def wrapped(*args, **kwargs):
+        self = args[0]
+        new_fn = retrying.retry(
+            retry_on_result=_ofport_result_pending,
+            stop_max_delay=self.vsctl_timeout * 1000,
+            wait_exponential_multiplier=10,
+            wait_exponential_max=1000,
+            retry_on_exception=lambda _: False)(fn)
+        return new_fn(*args, **kwargs)
+    return wrapped
+
+
 class VifPort:
     def __init__(self, port_name, ofport, vif_id, vif_mac, switch):
         self.port_name = port_name
@@ -145,10 +176,16 @@ class OVSBridge(BaseOVS):
         self.destroy()
         self.create()
 
-    def add_port(self, port_name):
-        self.run_vsctl(["--", "--may-exist", "add-port", self.br_name,
-                        port_name])
-        return self.get_port_ofport(port_name)
+    def add_port(self, port_name, *interface_options):
+        args = ["--", "--may-exist", "add-port", self.br_name, port_name]
+        if interface_options:
+            args += ['--', 'set', 'Interface', port_name]
+            args += ['%s=%s' % kv for kv in interface_options]
+        self.run_vsctl(args)
+        ofport = self.get_port_ofport(port_name)
+        if ofport == INVALID_OFPORT:
+            self.delete_port(port_name)
+        return ofport
 
     def replace_port(self, port_name, *interface_attr_tuples):
         """Replace existing port or create it, and configure port interface."""
@@ -188,15 +225,20 @@ class OVSBridge(BaseOVS):
     def remove_all_flows(self):
         self.run_ofctl("del-flows", [])
 
+    @_ofport_retry
+    def _get_port_ofport(self, port_name):
+        return self.db_get_val("Interface", port_name, "ofport")
+
     def get_port_ofport(self, port_name):
-        ofport = self.db_get_val("Interface", port_name, "ofport")
-        # This can return a non-integer string, like '[]' so ensure a
-        # common failure case
+        """Get the port's assigned ofport, retrying if not yet assigned."""
+        ofport = INVALID_OFPORT
         try:
-            int(ofport)
-            return ofport
-        except (ValueError, TypeError):
-            return INVALID_OFPORT
+            ofport = self._get_port_ofport(port_name)
+        except retrying.RetryError as e:
+            LOG.exception(_LE("Timed out retrieving ofport on port %(pname)s. "
+                              "Exception: %(exception)s"),
+                          {'pname': port_name, 'exception': e})
+        return ofport
 
     def get_datapath_id(self):
         return self.db_get_val('Bridge',
@@ -231,34 +273,25 @@ class OVSBridge(BaseOVS):
                         tunnel_type=constants.TYPE_GRE,
                         vxlan_udp_port=constants.VXLAN_UDP_PORT,
                         dont_fragment=True):
-        vsctl_command = ["--", "--may-exist", "add-port", self.br_name,
-                         port_name]
-        vsctl_command.extend(["--", "set", "Interface", port_name,
-                              "type=%s" % tunnel_type])
-        if tunnel_type == constants.TYPE_VXLAN:
-            # Only set the VXLAN UDP port if it's not the default
-            if vxlan_udp_port != constants.VXLAN_UDP_PORT:
-                vsctl_command.append("options:dst_port=%s" % vxlan_udp_port)
-        vsctl_command.append(("options:df_default=%s" %
-                             bool(dont_fragment)).lower())
-        vsctl_command.extend(["options:remote_ip=%s" % remote_ip,
-                              "options:local_ip=%s" % local_ip,
-                              "options:in_key=flow",
-                              "options:out_key=flow"])
-        self.run_vsctl(vsctl_command)
-        ofport = self.get_port_ofport(port_name)
+        options = [('type', tunnel_type)]
         if (tunnel_type == constants.TYPE_VXLAN and
-                ofport == INVALID_OFPORT):
-            LOG.error(_LE('Unable to create VXLAN tunnel port. Please ensure '
-                          'that an openvswitch version that supports VXLAN is '
-                          'installed.'))
-        return ofport
+                vxlan_udp_port != constants.VXLAN_UDP_PORT):
+            # Only set the VXLAN UDP port if it's not the default
+            options.append(("options:dst_port", vxlan_udp_port))
+        options.extend([
+            ("options:df_default", str(bool(dont_fragment)).lower()),
+            ("options:remote_ip", remote_ip),
+            ("options:local_ip", local_ip),
+            ("options:in_key", "flow"),
+            ("options:out_key", "flow")])
+        return self.add_port(port_name, *options)
 
     def add_patch_port(self, local_name, remote_name):
-        self.run_vsctl(["add-port", self.br_name, local_name,
-                        "--", "set", "Interface", local_name,
-                        "type=patch", "options:peer=%s" % remote_name])
-        return self.get_port_ofport(local_name)
+        options = [
+            ('type', 'patch'),
+            ('options:peer', remote_name)
+        ]
+        return self.add_port(local_name, *options)
 
     def db_get_map(self, table, record, column, check_error=False):
         output = self.run_vsctl(["get", table, record, column], check_error)
index 3427d30c2f0030b522c0e3f31ca853e33203d611..d75dc3f18ce9036cfea7f83dcb10c258590f4fe4 100644 (file)
@@ -648,23 +648,17 @@ class OFANeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             LOG.debug("No VIF port for port %s defined on agent.", port_id)
 
     def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type):
-        ofport_str = br.add_tunnel_port(port_name,
-                                        remote_ip,
-                                        self.local_ip,
-                                        tunnel_type,
-                                        self.vxlan_udp_port,
-                                        self.dont_fragment)
-        ofport = -1
-        try:
-            ofport = int(ofport_str)
-        except (TypeError, ValueError):
-            LOG.exception(_LE("ofport should have a value that can be "
-                              "interpreted as an integer"))
-        if ofport < 0:
+        ofport = br.add_tunnel_port(port_name,
+                                    remote_ip,
+                                    self.local_ip,
+                                    tunnel_type,
+                                    self.vxlan_udp_port,
+                                    self.dont_fragment)
+        if ofport == ovs_lib.INVALID_OFPORT:
             LOG.error(_LE("Failed to set-up %(type)s tunnel port to %(ip)s"),
                       {'type': tunnel_type, 'ip': remote_ip})
             return 0
-
+        ofport = int(ofport)
         self.tun_ofports[tunnel_type][remote_ip] = ofport
         br.check_in_port_add_tunnel_port(tunnel_type, ofport)
         return ofport
index ea64321e08ae679b3405262e896954669900404d..4fd77e37ea971be4bd10f7a73377a1db86215763 100644 (file)
@@ -763,11 +763,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
             cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
         self.patch_int_ofport = self.tun_br.add_patch_port(
             cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port)
-        if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0:
+        if ovs_lib.INVALID_OFPORT in (self.patch_tun_ofport,
+                                      self.patch_int_ofport):
             LOG.error(_LE("Failed to create OVS patch port. Cannot have "
                           "tunneling enabled on this agent, since this "
-                          "version of OVS does not support tunnels or "
-                          "patch ports. Agent terminated!"))
+                          "version of OVS does not support tunnels or patch "
+                          "ports. Agent terminated!"))
             exit(1)
         self.tun_br.remove_all_flows()
 
@@ -1042,13 +1043,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
                                     tunnel_type,
                                     self.vxlan_udp_port,
                                     self.dont_fragment)
-        ofport_int = -1
-        try:
-            ofport_int = int(ofport)
-        except (TypeError, ValueError):
-            LOG.exception(_LE("ofport should have a value that can be "
-                              "interpreted as an integer"))
-        if ofport_int < 0:
+        if ofport == ovs_lib.INVALID_OFPORT:
             LOG.error(_LE("Failed to set-up %(type)s tunnel port to %(ip)s"),
                       {'type': tunnel_type, 'ip': remote_ip})
             return 0
index b6780a968aa344996a0cb397da1f02e228cc98d6..e2db4904d758728f7ec82879dbf5a4d2e6846f25 100644 (file)
@@ -363,11 +363,16 @@ class OVS_Lib_Test(base.BaseTestCase):
                           "actions=normal",
             root_helper=self.root_helper)
 
+    def _set_timeout(self, val):
+        self.TO = '--timeout=%d' % val
+        self.br.vsctl_timeout = val
+
     def _test_get_port_ofport(self, ofport, expected_result):
         pname = "tap99"
+        self._set_timeout(0)  # Don't waste precious time retrying
         self.execute.return_value = ofport
         self.assertEqual(self.br.get_port_ofport(pname), expected_result)
-        self.execute.assert_called_once_with(
+        self.execute.assert_called_with(
             ["ovs-vsctl", self.TO, "get", "Interface", pname, "ofport"],
             root_helper=self.root_helper)
 
@@ -380,6 +385,10 @@ class OVS_Lib_Test(base.BaseTestCase):
     def test_get_port_ofport_returns_invalid_ofport_for_none(self):
         self._test_get_port_ofport(None, ovs_lib.INVALID_OFPORT)
 
+    def test_get_port_ofport_returns_invalid_for_invalid(self):
+        self._test_get_port_ofport(ovs_lib.INVALID_OFPORT,
+                                   ovs_lib.INVALID_OFPORT)
+
     def test_get_datapath_id(self):
         datapath_id = '"0000b67f4fbcc149"'
         self.execute.return_value = datapath_id
@@ -539,7 +548,8 @@ class OVS_Lib_Test(base.BaseTestCase):
         ofport = "6"
 
         # Each element is a tuple of (expected mock call, return_value)
-        command = ["ovs-vsctl", self.TO, "add-port", self.BR_NAME, pname]
+        command = ["ovs-vsctl", self.TO, "--", "--may-exist", "add-port",
+                   self.BR_NAME, pname]
         command.extend(["--", "set", "Interface", pname])
         command.extend(["type=patch", "options:peer=" + peer])
         expected_calls_and_values = [
index b13196510999fff6e8a27c2d1ed438bc84f30ffd..81af685fef0d17a6ec82fae7ea55ecba53bad211 100644 (file)
@@ -29,6 +29,7 @@ from oslo.config import cfg
 from oslo.utils import importutils
 import testtools
 
+from neutron.agent.linux import ovs_lib
 from neutron.common import constants as n_const
 from neutron.plugins.common import constants as p_const
 from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
@@ -748,25 +749,13 @@ class TestOFANeutronAgent(ofa_test_base.OFAAgentTestBase):
                 {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
             self.assertEqual(ofport, 0)
 
-    def test__setup_tunnel_port_error_not_int(self):
-        with contextlib.nested(
-            mock.patch.object(self.agent.int_br, 'add_tunnel_port',
-                              return_value=None),
-            mock.patch.object(self.mod_agent.LOG, 'exception'),
-            mock.patch.object(self.mod_agent.LOG, 'error')
-        ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
-            ofport = self.agent._setup_tunnel_port(
-                self.agent.int_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
-            add_tunnel_port_fn.assert_called_once_with(
-                'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
-                self.agent.vxlan_udp_port, self.agent.dont_fragment)
-            log_exc_fn.assert_called_once_with(
-                _("ofport should have a value that can be "
-                  "interpreted as an integer"))
-            log_error_fn.assert_called_once_with(
-                _("Failed to set-up %(type)s tunnel port to %(ip)s"),
-                {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
-            self.assertEqual(ofport, 0)
+    def test_setup_tunnel_port_returns_zero_for_failed_port_add(self):
+        with mock.patch.object(self.agent.int_br, 'add_tunnel_port',
+                               return_value=ovs_lib.INVALID_OFPORT):
+            result = self.agent._setup_tunnel_port(self.agent.int_br, 'gre-1',
+                                                  'remote_ip',
+                                                  p_const.TYPE_GRE)
+        self.assertEqual(0, result)
 
     def test_tunnel_sync(self):
         self.agent.local_ip = 'agent_ip'
index b098b710b0152447e5719326c222fd3465c97f43..8cc66c44bbbca5d0526e02f92751f329a9d88038 100644 (file)
@@ -1316,10 +1316,10 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                                        constants.DEFAULT_OVSDBMON_RESPAWN)
         mock_loop.assert_called_once_with(polling_manager=mock.ANY)
 
-    def test__setup_tunnel_port_error_negative(self):
+    def test_setup_tunnel_port_invalid_ofport(self):
         with contextlib.nested(
             mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
-                              return_value='-1'),
+                              return_value=ovs_lib.INVALID_OFPORT),
             mock.patch.object(ovs_neutron_agent.LOG, 'error')
         ) as (add_tunnel_port_fn, log_error_fn):
             ofport = self.agent._setup_tunnel_port(
@@ -1332,27 +1332,7 @@ class TestOvsNeutronAgent(base.BaseTestCase):
                 {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
             self.assertEqual(ofport, 0)
 
-    def test__setup_tunnel_port_error_not_int(self):
-        with contextlib.nested(
-            mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
-                              return_value=None),
-            mock.patch.object(ovs_neutron_agent.LOG, 'exception'),
-            mock.patch.object(ovs_neutron_agent.LOG, 'error')
-        ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
-            ofport = self.agent._setup_tunnel_port(
-                self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
-            add_tunnel_port_fn.assert_called_once_with(
-                'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
-                self.agent.vxlan_udp_port, self.agent.dont_fragment)
-            log_exc_fn.assert_called_once_with(
-                _("ofport should have a value that can be "
-                  "interpreted as an integer"))
-            log_error_fn.assert_called_once_with(
-                _("Failed to set-up %(type)s tunnel port to %(ip)s"),
-                {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
-            self.assertEqual(ofport, 0)
-
-    def test__setup_tunnel_port_error_negative_df_disabled(self):
+    def test_setup_tunnel_port_error_negative_df_disabled(self):
         with contextlib.nested(
             mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
                               return_value='-1'),
index 6ec11414eccaae3d01d959a4fa0681826bdffd19..cc6689f499026005fb73a69b4460317b4c5e219e 100644 (file)
@@ -15,6 +15,7 @@ Jinja2>=2.6  # BSD License (3 clause)
 keystonemiddleware>=1.0.0
 netaddr>=0.7.12
 python-neutronclient>=2.3.6,<3
+retrying>=1.2.3,!=1.3.0 # Apache-2.0
 SQLAlchemy>=0.9.7,<=0.9.99
 WebOb>=1.2.3
 python-keystoneclient>=0.11.1