from oslo.config import cfg
from oslo.serialization import jsonutils
from oslo.utils import excutils
+import retrying
+import six
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
LOG = logging.getLogger(__name__)
+def _ofport_result_pending(result):
+ """Return True if ovs-vsctl indicates the result is still pending."""
+ # ovs-vsctl can return '[]' for an ofport that has not yet been assigned
+ try:
+ int(result)
+ return False
+ except (ValueError, TypeError):
+ return True
+
+
+def _ofport_retry(fn):
+ """Decorator for retrying when OVS has yet to assign an ofport.
+
+ The instance's vsctl_timeout is used as the max waiting time. This relies
+ on the fact that instance methods receive self as the first argument.
+ """
+ @six.wraps(fn)
+ def wrapped(*args, **kwargs):
+ self = args[0]
+ new_fn = retrying.retry(
+ retry_on_result=_ofport_result_pending,
+ stop_max_delay=self.vsctl_timeout * 1000,
+ wait_exponential_multiplier=10,
+ wait_exponential_max=1000,
+ retry_on_exception=lambda _: False)(fn)
+ return new_fn(*args, **kwargs)
+ return wrapped
+
+
class VifPort:
def __init__(self, port_name, ofport, vif_id, vif_mac, switch):
self.port_name = port_name
self.destroy()
self.create()
- def add_port(self, port_name):
- self.run_vsctl(["--", "--may-exist", "add-port", self.br_name,
- port_name])
- return self.get_port_ofport(port_name)
+ def add_port(self, port_name, *interface_options):
+ args = ["--", "--may-exist", "add-port", self.br_name, port_name]
+ if interface_options:
+ args += ['--', 'set', 'Interface', port_name]
+ args += ['%s=%s' % kv for kv in interface_options]
+ self.run_vsctl(args)
+ ofport = self.get_port_ofport(port_name)
+ if ofport == INVALID_OFPORT:
+ self.delete_port(port_name)
+ return ofport
def replace_port(self, port_name, *interface_attr_tuples):
"""Replace existing port or create it, and configure port interface."""
def remove_all_flows(self):
self.run_ofctl("del-flows", [])
+ @_ofport_retry
+ def _get_port_ofport(self, port_name):
+ return self.db_get_val("Interface", port_name, "ofport")
+
def get_port_ofport(self, port_name):
- ofport = self.db_get_val("Interface", port_name, "ofport")
- # This can return a non-integer string, like '[]' so ensure a
- # common failure case
+ """Get the port's assigned ofport, retrying if not yet assigned."""
+ ofport = INVALID_OFPORT
try:
- int(ofport)
- return ofport
- except (ValueError, TypeError):
- return INVALID_OFPORT
+ ofport = self._get_port_ofport(port_name)
+ except retrying.RetryError as e:
+ LOG.exception(_LE("Timed out retrieving ofport on port %(pname)s. "
+ "Exception: %(exception)s"),
+ {'pname': port_name, 'exception': e})
+ return ofport
def get_datapath_id(self):
return self.db_get_val('Bridge',
tunnel_type=constants.TYPE_GRE,
vxlan_udp_port=constants.VXLAN_UDP_PORT,
dont_fragment=True):
- vsctl_command = ["--", "--may-exist", "add-port", self.br_name,
- port_name]
- vsctl_command.extend(["--", "set", "Interface", port_name,
- "type=%s" % tunnel_type])
- if tunnel_type == constants.TYPE_VXLAN:
- # Only set the VXLAN UDP port if it's not the default
- if vxlan_udp_port != constants.VXLAN_UDP_PORT:
- vsctl_command.append("options:dst_port=%s" % vxlan_udp_port)
- vsctl_command.append(("options:df_default=%s" %
- bool(dont_fragment)).lower())
- vsctl_command.extend(["options:remote_ip=%s" % remote_ip,
- "options:local_ip=%s" % local_ip,
- "options:in_key=flow",
- "options:out_key=flow"])
- self.run_vsctl(vsctl_command)
- ofport = self.get_port_ofport(port_name)
+ options = [('type', tunnel_type)]
if (tunnel_type == constants.TYPE_VXLAN and
- ofport == INVALID_OFPORT):
- LOG.error(_LE('Unable to create VXLAN tunnel port. Please ensure '
- 'that an openvswitch version that supports VXLAN is '
- 'installed.'))
- return ofport
+ vxlan_udp_port != constants.VXLAN_UDP_PORT):
+ # Only set the VXLAN UDP port if it's not the default
+ options.append(("options:dst_port", vxlan_udp_port))
+ options.extend([
+ ("options:df_default", str(bool(dont_fragment)).lower()),
+ ("options:remote_ip", remote_ip),
+ ("options:local_ip", local_ip),
+ ("options:in_key", "flow"),
+ ("options:out_key", "flow")])
+ return self.add_port(port_name, *options)
def add_patch_port(self, local_name, remote_name):
- self.run_vsctl(["add-port", self.br_name, local_name,
- "--", "set", "Interface", local_name,
- "type=patch", "options:peer=%s" % remote_name])
- return self.get_port_ofport(local_name)
+ options = [
+ ('type', 'patch'),
+ ('options:peer', remote_name)
+ ]
+ return self.add_port(local_name, *options)
def db_get_map(self, table, record, column, check_error=False):
output = self.run_vsctl(["get", table, record, column], check_error)
LOG.debug("No VIF port for port %s defined on agent.", port_id)
def _setup_tunnel_port(self, br, port_name, remote_ip, tunnel_type):
- ofport_str = br.add_tunnel_port(port_name,
- remote_ip,
- self.local_ip,
- tunnel_type,
- self.vxlan_udp_port,
- self.dont_fragment)
- ofport = -1
- try:
- ofport = int(ofport_str)
- except (TypeError, ValueError):
- LOG.exception(_LE("ofport should have a value that can be "
- "interpreted as an integer"))
- if ofport < 0:
+ ofport = br.add_tunnel_port(port_name,
+ remote_ip,
+ self.local_ip,
+ tunnel_type,
+ self.vxlan_udp_port,
+ self.dont_fragment)
+ if ofport == ovs_lib.INVALID_OFPORT:
LOG.error(_LE("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': tunnel_type, 'ip': remote_ip})
return 0
-
+ ofport = int(ofport)
self.tun_ofports[tunnel_type][remote_ip] = ofport
br.check_in_port_add_tunnel_port(tunnel_type, ofport)
return ofport
cfg.CONF.OVS.int_peer_patch_port, cfg.CONF.OVS.tun_peer_patch_port)
self.patch_int_ofport = self.tun_br.add_patch_port(
cfg.CONF.OVS.tun_peer_patch_port, cfg.CONF.OVS.int_peer_patch_port)
- if int(self.patch_tun_ofport) < 0 or int(self.patch_int_ofport) < 0:
+ if ovs_lib.INVALID_OFPORT in (self.patch_tun_ofport,
+ self.patch_int_ofport):
LOG.error(_LE("Failed to create OVS patch port. Cannot have "
"tunneling enabled on this agent, since this "
- "version of OVS does not support tunnels or "
- "patch ports. Agent terminated!"))
+ "version of OVS does not support tunnels or patch "
+ "ports. Agent terminated!"))
exit(1)
self.tun_br.remove_all_flows()
tunnel_type,
self.vxlan_udp_port,
self.dont_fragment)
- ofport_int = -1
- try:
- ofport_int = int(ofport)
- except (TypeError, ValueError):
- LOG.exception(_LE("ofport should have a value that can be "
- "interpreted as an integer"))
- if ofport_int < 0:
+ if ofport == ovs_lib.INVALID_OFPORT:
LOG.error(_LE("Failed to set-up %(type)s tunnel port to %(ip)s"),
{'type': tunnel_type, 'ip': remote_ip})
return 0
"actions=normal",
root_helper=self.root_helper)
+ def _set_timeout(self, val):
+ self.TO = '--timeout=%d' % val
+ self.br.vsctl_timeout = val
+
def _test_get_port_ofport(self, ofport, expected_result):
pname = "tap99"
+ self._set_timeout(0) # Don't waste precious time retrying
self.execute.return_value = ofport
self.assertEqual(self.br.get_port_ofport(pname), expected_result)
- self.execute.assert_called_once_with(
+ self.execute.assert_called_with(
["ovs-vsctl", self.TO, "get", "Interface", pname, "ofport"],
root_helper=self.root_helper)
def test_get_port_ofport_returns_invalid_ofport_for_none(self):
self._test_get_port_ofport(None, ovs_lib.INVALID_OFPORT)
+ def test_get_port_ofport_returns_invalid_for_invalid(self):
+ self._test_get_port_ofport(ovs_lib.INVALID_OFPORT,
+ ovs_lib.INVALID_OFPORT)
+
def test_get_datapath_id(self):
datapath_id = '"0000b67f4fbcc149"'
self.execute.return_value = datapath_id
ofport = "6"
# Each element is a tuple of (expected mock call, return_value)
- command = ["ovs-vsctl", self.TO, "add-port", self.BR_NAME, pname]
+ command = ["ovs-vsctl", self.TO, "--", "--may-exist", "add-port",
+ self.BR_NAME, pname]
command.extend(["--", "set", "Interface", pname])
command.extend(["type=patch", "options:peer=" + peer])
expected_calls_and_values = [
from oslo.utils import importutils
import testtools
+from neutron.agent.linux import ovs_lib
from neutron.common import constants as n_const
from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2.drivers.l2pop import rpc as l2pop_rpc
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
- def test__setup_tunnel_port_error_not_int(self):
- with contextlib.nested(
- mock.patch.object(self.agent.int_br, 'add_tunnel_port',
- return_value=None),
- mock.patch.object(self.mod_agent.LOG, 'exception'),
- mock.patch.object(self.mod_agent.LOG, 'error')
- ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
- ofport = self.agent._setup_tunnel_port(
- self.agent.int_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
- add_tunnel_port_fn.assert_called_once_with(
- 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
- self.agent.vxlan_udp_port, self.agent.dont_fragment)
- log_exc_fn.assert_called_once_with(
- _("ofport should have a value that can be "
- "interpreted as an integer"))
- log_error_fn.assert_called_once_with(
- _("Failed to set-up %(type)s tunnel port to %(ip)s"),
- {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
- self.assertEqual(ofport, 0)
+ def test_setup_tunnel_port_returns_zero_for_failed_port_add(self):
+ with mock.patch.object(self.agent.int_br, 'add_tunnel_port',
+ return_value=ovs_lib.INVALID_OFPORT):
+ result = self.agent._setup_tunnel_port(self.agent.int_br, 'gre-1',
+ 'remote_ip',
+ p_const.TYPE_GRE)
+ self.assertEqual(0, result)
def test_tunnel_sync(self):
self.agent.local_ip = 'agent_ip'
constants.DEFAULT_OVSDBMON_RESPAWN)
mock_loop.assert_called_once_with(polling_manager=mock.ANY)
- def test__setup_tunnel_port_error_negative(self):
+ def test_setup_tunnel_port_invalid_ofport(self):
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
- return_value='-1'),
+ return_value=ovs_lib.INVALID_OFPORT),
mock.patch.object(ovs_neutron_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_error_fn):
ofport = self.agent._setup_tunnel_port(
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
- def test__setup_tunnel_port_error_not_int(self):
- with contextlib.nested(
- mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
- return_value=None),
- mock.patch.object(ovs_neutron_agent.LOG, 'exception'),
- mock.patch.object(ovs_neutron_agent.LOG, 'error')
- ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
- ofport = self.agent._setup_tunnel_port(
- self.agent.tun_br, 'gre-1', 'remote_ip', p_const.TYPE_GRE)
- add_tunnel_port_fn.assert_called_once_with(
- 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
- self.agent.vxlan_udp_port, self.agent.dont_fragment)
- log_exc_fn.assert_called_once_with(
- _("ofport should have a value that can be "
- "interpreted as an integer"))
- log_error_fn.assert_called_once_with(
- _("Failed to set-up %(type)s tunnel port to %(ip)s"),
- {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
- self.assertEqual(ofport, 0)
-
- def test__setup_tunnel_port_error_negative_df_disabled(self):
+ def test_setup_tunnel_port_error_negative_df_disabled(self):
with contextlib.nested(
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
return_value='-1'),
keystonemiddleware>=1.0.0
netaddr>=0.7.12
python-neutronclient>=2.3.6,<3
+retrying>=1.2.3,!=1.3.0 # Apache-2.0
SQLAlchemy>=0.9.7,<=0.9.99
WebOb>=1.2.3
python-keystoneclient>=0.11.1