The function gets a list of all the devices that exists on the machine,
and then iterates on them one at a time in order to find the correct
device which holds the ip specified. However, if one of the devices was
in the mean time deleted, the code will raise an Exception. In the ovs
agent's case, this will cause it to not run at all (requiring a
restart).
Also, changes to a few tests of LinuxBridge were made because
linuxbridge doesn't check that cfg.CONF.VXLAN.local_ip is not empty
before using it (a bug, surely). Since it's out of scope of this patch
to fix this, workarounds were implemented to make sure the tests ignore
the option instead.
Closes-Bug: #
1506503
Change-Id: Iad285d7c763b0e8e8f877c6892aadb0043e9a186
SYS_NET_PATH = '/sys/class/net'
DEFAULT_GW_PATTERN = re.compile(r"via (\S+)")
METRIC_PATTERN = re.compile(r"metric (\S+)")
+DEVICE_NAME_PATTERN = re.compile(r"(\d+?): (\S+?):.*")
class AddressNotReady(exceptions.NeutronException):
return retval
def get_device_by_ip(self, ip):
- """Get the IPDevice from system which has ip configured."""
- for device in self.get_devices():
- if device.addr.list(to=ip):
- return device
+ """Get the IPDevice from system which has ip configured.
+
+ @param ip: look for the device holding this ip. If this is None,
+ None is returned.
+ @type ip: str.
+ """
+ if not ip:
+ return None
+
+ addr = IpAddrCommand(self)
+ devices = addr.get_devices_with_ip(to=ip)
+ if devices:
+ return IPDevice(devices[0]['name'], namespace=self.namespace)
def add_tuntap(self, name, mode='tap'):
self._as_root([], 'tuntap', ('add', name, 'mode', mode))
def flush(self, ip_version):
self._as_root([ip_version], ('flush', self.name))
- def list(self, scope=None, to=None, filters=None, ip_version=None):
+ def get_devices_with_ip(self, name=None, scope=None, to=None,
+ filters=None, ip_version=None):
+ """Get a list of all the devices with an IP attached in the namespace.
+
+ @param name: if it's not None, only a device with that matching name
+ will be returned.
+ """
options = [ip_version] if ip_version else []
- args = ['show', self.name]
+
+ args = ['show']
+ if name:
+ args += [name]
if filters:
args += filters
-
- retval = []
-
if scope:
args += ['scope', scope]
if to:
args += ['to', to]
+ retval = []
+
for line in self._run(options, tuple(args)).split('\n'):
line = line.strip()
- if not line.startswith('inet'):
+
+ match = DEVICE_NAME_PATTERN.search(line)
+ if match:
+ # Found a match for a device name, but its' addresses will
+ # only appear in following lines, so we may as well continue.
+ device_name = match.group(2)
continue
- parts = line.split()
+ elif not line.startswith('inet'):
+ continue
+
+ parts = line.split(" ")
if parts[0] == 'inet6':
scope = parts[3]
else:
else:
scope = parts[3]
- retval.append(dict(cidr=parts[1],
+ retval.append(dict(name=device_name,
+ cidr=parts[1],
scope=scope,
dynamic=('dynamic' == parts[-1]),
tentative=('tentative' in line),
dadfailed=('dadfailed' == parts[-1])))
return retval
+ def list(self, scope=None, to=None, filters=None, ip_version=None):
+ """Get device details of a device named <self.name>."""
+ return self.get_devices_with_ip(
+ self.name, scope, to, filters, ip_version)
+
def wait_until_address_ready(self, address, wait_time=30):
"""Wait until an address is no longer marked 'tentative'
device.link.delete()
self.assertFalse(ip_lib.vxlan_in_use(9999, namespace=attr.namespace))
+ def test_ipwrapper_get_device_by_ip_None(self):
+ ip_wrapper = ip_lib.IPWrapper(namespace=None)
+ self.assertIsNone(ip_wrapper.get_device_by_ip(ip=None))
+
def test_ipwrapper_get_device_by_ip(self):
attr = self.generate_device_details()
self.manage_device(attr)
# under the License.
import mock
+from oslo_config import cfg
from oslo_log import log as logging
import testtools
agent_rpc = ('neutron.agent.rpc.PluginApi')
mock.patch(agent_rpc).start()
mock.patch('neutron.agent.rpc.PluginReportStateAPI').start()
+ cfg.CONF.set_override('enable_vxlan', False, 'VXLAN')
def test_validate_interface_mappings(self):
mappings = {'physnet1': 'int1', 'physnet2': 'int2'}
prefix=lb_agent.BRIDGE_NAME_PREFIX))).fixture
config = callback(br_fixture)
+ config.update({'VXLAN': {'enable_vxlan': 'False'}})
temp_dir = self.useFixture(fixtures.TempDir()).path
conf = self.useFixture(config_fixtures.ConfigFileFixture(
def test_list(self):
expected = [
- dict(scope='global', dadfailed=False, tentative=False,
+ dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=False, cidr='172.16.77.240/24'),
- dict(scope='global', dadfailed=False, tentative=False,
+ dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=True, cidr='2001:470:9:1224:5595:dd51:6ba2:e788/64'),
- dict(scope='link', dadfailed=False, tentative=True,
+ dict(name='eth0', scope='link', dadfailed=False, tentative=True,
dynamic=False, cidr='fe80::3023:39ff:febc:22ae/64'),
- dict(scope='link', dadfailed=True, tentative=True,
+ dict(name='eth0', scope='link', dadfailed=True, tentative=True,
dynamic=False, cidr='fe80::3023:39ff:febc:22af/64'),
- dict(scope='global', dadfailed=False, tentative=False,
+ dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=True, cidr='2001:470:9:1224:fd91:272:581e:3a32/64'),
- dict(scope='global', dadfailed=False, tentative=False,
+ dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=True, cidr='2001:470:9:1224:4508:b885:5fb:740b/64'),
- dict(scope='global', dadfailed=False, tentative=False,
+ dict(name='eth0', scope='global', dadfailed=False, tentative=False,
dynamic=True, cidr='2001:470:9:1224:dfcc:aaff:feb9:76ce/64'),
- dict(scope='link', dadfailed=False, tentative=False,
+ dict(name='eth0', scope='link', dadfailed=False, tentative=False,
dynamic=False, cidr='fe80::dfcc:aaff:feb9:76ce/64')]
test_cases = [ADDR_SAMPLE, ADDR_SAMPLE2]
def test_list_filtered(self):
expected = [
- dict(scope='global', tentative=False, dadfailed=False,
+ dict(name='eth0', scope='global', tentative=False, dadfailed=False,
dynamic=False, cidr='172.16.77.240/24')]
test_cases = [ADDR_SAMPLE, ADDR_SAMPLE2]