to_fip_interface_name = (
self._get_external_device_interface_name(ri, ex_gw_port))
ri.process_floating_ip_addresses(to_fip_interface_name)
+ snat_ports = self.get_snat_interfaces(ri)
for p in ri.internal_ports:
+ gateway = self._map_internal_interfaces(ri, p, snat_ports)
internal_interface = ri.get_internal_device_name(p['id'])
- self._snat_redirect_remove(ri, p, internal_interface)
+ self._snat_redirect_remove(ri, gateway['fixed_ips'][0]
+ ['ip_address'],
+ p, internal_interface)
if (self.conf.agent_mode == l3_constants.L3_AGENT_MODE_DVR_SNAT
and self.get_gw_port_host(ri.router) == self.host):
port_id = port['id']
interface_name = ri.get_internal_device_name(port_id)
if ri.router['distributed'] and ri.ex_gw_port:
- # DVR handling code for SNAT
- self._snat_redirect_remove(ri, port, interface_name)
- if (self.conf.agent_mode == l3_constants.L3_AGENT_MODE_DVR_SNAT
- and ri.ex_gw_port['binding:host_id'] == self.host):
- snat_port = self._map_internal_interfaces(ri, port,
- ri.snat_ports)
- if snat_port:
+ sn_port = self._map_internal_interfaces(ri, port, ri.snat_ports)
+ if sn_port:
+ self._snat_redirect_remove(ri, sn_port['fixed_ips'][0]
+ ['ip_address'], port,
+ interface_name)
+ if (self.conf.agent_mode == l3_constants.L3_AGENT_MODE_DVR_SNAT
+ and ri.ex_gw_port['binding:host_id'] == self.host):
snat_interface = (
- self.get_snat_int_device_name(snat_port['id'])
+ self.get_snat_int_device_name(sn_port['id'])
)
ns_name = ri.snat_namespace.name
prefix = dvr.SNAT_INT_DEV_PREFIX
try:
ip_cidr = sn_port['ip_cidr']
snat_idx = self._get_snat_idx(ip_cidr)
- ns_ipr = ip_lib.IpRule(namespace=ri.ns_name)
+ ns_ipr = ip_lib.IPRule(namespace=ri.ns_name)
ns_ipd = ip_lib.IPDevice(sn_int, namespace=ri.ns_name)
+ ns_ipwrapr = ip_lib.IPWrapper(namespace=ri.ns_name)
ns_ipd.route.add_gateway(gateway, table=snat_idx)
- ns_ipr.add(ip_cidr, snat_idx, snat_idx)
- ns_ipr.netns.execute(['sysctl', '-w', 'net.ipv4.conf.%s.'
- 'send_redirects=0' % sn_int])
+ ns_ipr.rule.add(ip_cidr, snat_idx, snat_idx)
+ ns_ipwrapr.netns.execute(['sysctl', '-w', 'net.ipv4.conf.%s.'
+ 'send_redirects=0' % sn_int])
except Exception:
LOG.exception(_LE('DVR: error adding redirection logic'))
- def _snat_redirect_remove(self, ri, sn_port, sn_int):
+ def _snat_redirect_remove(self, ri, gateway, sn_port, sn_int):
"""Removes rules and routes for SNAT redirection."""
try:
ip_cidr = sn_port['ip_cidr']
snat_idx = self._get_snat_idx(ip_cidr)
- ns_ipr = ip_lib.IpRule(namespace=ri.ns_name)
+ ns_ipr = ip_lib.IPRule(namespace=ri.ns_name)
ns_ipd = ip_lib.IPDevice(sn_int, namespace=ri.ns_name)
- ns_ipd.route.delete_gateway(table=snat_idx)
- ns_ipr.delete(ip_cidr, snat_idx, snat_idx)
+ ns_ipd.route.delete_gateway(gateway, table=snat_idx)
+ ns_ipr.rule.delete(ip_cidr, snat_idx, snat_idx)
except Exception:
LOG.exception(_LE('DVR: removed snat failed'))
# License for the specific language governing permissions and limitations
# under the License.
-import netaddr
from oslo_utils import excutils
from neutron.agent.l3 import dvr_fip_ns
rule_pr = self.fip_ns.allocate_rule_priority()
self.floating_ips_dict[floating_ip] = rule_pr
fip_2_rtr_name = self.fip_ns.get_int_device_name(self.router_id)
- ip_rule = ip_lib.IpRule(namespace=self.ns_name)
- ip_rule.add(fixed_ip, dvr_fip_ns.FIP_RT_TBL, rule_pr)
+ ip_rule = ip_lib.IPRule(namespace=self.ns_name)
+ ip_rule.rule.add(fixed_ip, dvr_fip_ns.FIP_RT_TBL, rule_pr)
#Add routing rule in fip namespace
fip_ns_name = self.fip_ns.get_name()
rtr_2_fip, _ = self.rtr_fip_subnet.get_pair()
fip_ns_name = self.fip_ns.get_name()
if floating_ip in self.floating_ips_dict:
rule_pr = self.floating_ips_dict[floating_ip]
- ip_rule = ip_lib.IpRule(namespace=self.ns_name)
- ip_rule.delete(floating_ip, dvr_fip_ns.FIP_RT_TBL, rule_pr)
+ ip_rule = ip_lib.IPRule(namespace=self.ns_name)
+ ip_rule.rule.delete(floating_ip, dvr_fip_ns.FIP_RT_TBL, rule_pr)
self.fip_ns.deallocate_rule_priority(rule_pr)
#TODO(rajeev): Handle else case - exception/log?
if not port:
return
- ip_cidr = str(ip) + '/32'
try:
# TODO(mrsmith): optimize the calls below for bulk calls
- net = netaddr.IPNetwork(ip_cidr)
interface_name = self.get_internal_device_name(port['id'])
device = ip_lib.IPDevice(interface_name, namespace=self.ns_name)
if operation == 'add':
- device.neigh.add(net.version, ip, mac)
+ device.neigh.add(ip, mac)
elif operation == 'delete':
- device.neigh.delete(net.version, ip, mac)
+ device.neigh.delete(ip, mac)
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("DVR: Failed updating arp entry"))
ipv6_lladdr = self._get_ipv6_lladdr(device.link.address)
if self._should_delete_ipv6_lladdr(ipv6_lladdr):
- device.addr.flush()
+ device.addr.flush(n_consts.IP_VERSION_6)
self._remove_vip(ipv6_lladdr)
self._add_vip(ipv6_lladdr, interface_name, scope='link')
# License for the specific language governing permissions and limitations
# under the License.
-import netaddr
-
from neutron.agent.l3 import namespaces
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
"""
try:
ip_cidr = common_utils.ip_to_cidr(fip['floating_ip_address'])
- net = netaddr.IPNetwork(ip_cidr)
- device.addr.add(net.version, ip_cidr, str(net.broadcast))
+ device.addr.add(ip_cidr)
return True
except RuntimeError:
# any exception occurred here should cause the floating IP
raise NotImplementedError()
def remove_floating_ip(self, device, ip_cidr):
- net = netaddr.IPNetwork(ip_cidr)
- device.addr.delete(net.version, ip_cidr)
+ device.addr.delete(ip_cidr)
self.driver.delete_conntrack_state(namespace=self.ns_name, ip=ip_cidr)
def get_router_cidrs(self, device):
LOG = logging.getLogger(__name__)
-IPV4 = 4
-IPV6 = 6
UDP = 'udp'
TCP = 'tcp'
DNS_PORT = 53
# The ports that need to be opened when security policies are active
# on the Neutron port used for DHCP. These are provided as a convenience
# for users of this class.
- PORTS = {IPV4: [(UDP, DNS_PORT), (TCP, DNS_PORT), (UDP, DHCPV4_PORT)],
- IPV6: [(UDP, DNS_PORT), (TCP, DNS_PORT), (UDP, DHCPV6_PORT)],
+ PORTS = {constants.IP_VERSION_4:
+ [(UDP, DNS_PORT), (TCP, DNS_PORT), (UDP, DHCPV4_PORT)],
+ constants.IP_VERSION_6:
+ [(UDP, DNS_PORT), (TCP, DNS_PORT), (UDP, DHCPV6_PORT)],
}
_TAG_PREFIX = 'tag%d'
"""
device = ip_lib.IPDevice(device_name, namespace=namespace)
- previous = {}
+ previous = set()
for address in device.addr.list(scope='global', filters=['permanent']):
- previous[address['cidr']] = address['ip_version']
+ previous.add(address['cidr'])
# add new addresses
for ip_cidr in ip_cidrs:
if net.version == 6:
ip_cidr = str(net)
if ip_cidr in previous:
- del previous[ip_cidr]
+ previous.remove(ip_cidr)
continue
- # Make sure the format of this network, if IPv6, is zero-filled.
- # The Linux netaddr library seems to do this by default (bug?),
- # and the test verifies it, but we should force it just in case
- # the behavior changes. It also makes sure that non-Linux-based
- # libraries also work correctly (e.g. OSX).
- device.addr.add(net.version, ip_cidr,
- str(net.broadcast.format(netaddr.ipv6_full)))
+ device.addr.add(ip_cidr)
# clean up any old addresses
- for ip_cidr, ip_version in previous.items():
+ for ip_cidr in previous:
if ip_cidr not in preserve_ips:
- device.addr.delete(ip_version, ip_cidr)
+ device.addr.delete(ip_cidr)
self.delete_conntrack_state(namespace=namespace, ip=ip_cidr)
if gateway:
device.route.add_gateway(gateway)
new_onlink_routes = set(s['cidr'] for s in extra_subnets)
- existing_onlink_routes = set(device.route.list_onlink_routes())
+ existing_onlink_routes = set(
+ device.route.list_onlink_routes(n_const.IP_VERSION_4) +
+ device.route.list_onlink_routes(n_const.IP_VERSION_6))
for route in new_onlink_routes - existing_onlink_routes:
device.route.add_onlink_route(route)
for route in existing_onlink_routes - new_onlink_routes:
return retval
def add_tuntap(self, name, mode='tap'):
- self._as_root('', 'tuntap', ('add', name, 'mode', mode))
+ self._as_root([], 'tuntap', ('add', name, 'mode', mode))
return IPDevice(name, namespace=self.namespace)
def add_veth(self, name1, name2, namespace2=None):
self.ensure_namespace(namespace2)
args += ['netns', namespace2]
- self._as_root('', 'link', tuple(args))
+ self._as_root([], 'link', tuple(args))
return (IPDevice(name1, namespace=self.namespace),
IPDevice(name2, namespace=namespace2))
def del_veth(self, name):
"""Delete a virtual interface between two namespaces."""
- self._as_root('', 'link', ('del', name))
+ self._as_root([], 'link', ('del', name))
def ensure_namespace(self, name):
if not self.netns.exists(name):
cmd.extend(['port', port[0], port[1]])
elif port:
raise exceptions.NetworkVxlanPortRangeError(vxlan_range=port)
- self._as_root('', 'link', cmd)
+ self._as_root([], 'link', cmd)
return (IPDevice(name, namespace=self.namespace))
@classmethod
def get_namespaces(cls):
- output = cls._execute('', 'netns', ('list',))
+ output = cls._execute([], 'netns', ('list',))
return [l.strip() for l in output.split('\n')]
-class IpRule(IPWrapper):
- def _exists(self, ip, ip_version, table, rule_pr):
- # Typical rule from 'ip rule show':
- # 4030201: from 1.2.3.4/24 lookup 10203040
-
- rule_pr = str(rule_pr) + ":"
- for line in self._as_root([ip_version], 'rule', ['show']).splitlines():
- parts = line.split()
- if parts and (parts[0] == rule_pr and
- parts[2] == str(ip) and
- parts[-1] == str(table)):
- return True
-
- return False
-
- def add(self, ip, table, rule_pr):
- ip_version = netaddr.IPNetwork(ip).version
- if not self._exists(ip, ip_version, table, rule_pr):
- args = ['add', 'from', ip, 'table', table, 'priority', rule_pr]
- self._as_root([ip_version], 'rule', tuple(args))
-
- def delete(self, ip, table, rule_pr):
- ip_version = netaddr.IPNetwork(ip).version
- args = ['del', 'table', table, 'priority', rule_pr]
- self._as_root([ip_version], 'rule', tuple(args))
-
-
class IPDevice(SubProcessBase):
def __init__(self, name, namespace=None):
super(IPDevice, self).__init__(namespace=namespace)
def __init__(self, parent):
self._parent = parent
- def _run(self, *args, **kwargs):
- return self._parent._run(kwargs.get('options', []), self.COMMAND, args)
+ def _run(self, options, args):
+ return self._parent._run(options, self.COMMAND, args)
- def _as_root(self, *args, **kwargs):
- return self._parent._as_root(kwargs.get('options', []),
+ def _as_root(self, options, args, use_root_namespace=False):
+ return self._parent._as_root(options,
self.COMMAND,
args,
- kwargs.get('use_root_namespace', False))
+ use_root_namespace=use_root_namespace)
+
+
+class IPRule(SubProcessBase):
+ def __init__(self, namespace=None):
+ super(IPRule, self).__init__(namespace=namespace)
+ self.rule = IpRuleCommand(self)
+
+
+class IpRuleCommand(IpCommandBase):
+ COMMAND = 'rule'
+
+ def _exists(self, ip, ip_version, table, rule_pr):
+ # Typical rule from 'ip rule show':
+ # 4030201: from 1.2.3.4/24 lookup 10203040
+
+ rule_pr = str(rule_pr) + ":"
+ for line in self._as_root([ip_version], ['show']).splitlines():
+ parts = line.split()
+ if parts and (parts[0] == rule_pr and
+ parts[2] == str(ip) and
+ parts[-1] == str(table)):
+ return True
+
+ return False
+
+ def add(self, ip, table, rule_pr):
+ ip_version = get_ip_version(ip)
+ if not self._exists(ip, ip_version, table, rule_pr):
+ args = ['add', 'from', ip, 'table', table, 'priority', rule_pr]
+ self._as_root([ip_version], tuple(args))
+
+ def delete(self, ip, table, rule_pr):
+ ip_version = get_ip_version(ip)
+ args = ['del', 'table', table, 'priority', rule_pr]
+ self._as_root([ip_version], tuple(args))
class IpDeviceCommandBase(IpCommandBase):
COMMAND = 'link'
def set_address(self, mac_address):
- self._as_root('set', self.name, 'address', mac_address)
+ self._as_root([], ('set', self.name, 'address', mac_address))
def set_mtu(self, mtu_size):
- self._as_root('set', self.name, 'mtu', mtu_size)
+ self._as_root([], ('set', self.name, 'mtu', mtu_size))
def set_up(self):
- self._as_root('set', self.name, 'up')
+ self._as_root([], ('set', self.name, 'up'))
def set_down(self):
- self._as_root('set', self.name, 'down')
+ self._as_root([], ('set', self.name, 'down'))
def set_netns(self, namespace):
- self._as_root('set', self.name, 'netns', namespace)
+ self._as_root([], ('set', self.name, 'netns', namespace))
self._parent.namespace = namespace
def set_name(self, name):
- self._as_root('set', self.name, 'name', name)
+ self._as_root([], ('set', self.name, 'name', name))
self._parent.name = name
def set_alias(self, alias_name):
- self._as_root('set', self.name, 'alias', alias_name)
+ self._as_root([], ('set', self.name, 'alias', alias_name))
def delete(self):
- self._as_root('delete', self.name)
+ self._as_root([], ('delete', self.name))
@property
def address(self):
@property
def attributes(self):
- return self._parse_line(self._run('show', self.name, options='o'))
+ return self._parse_line(self._run(['o'], ('show', self.name)))
def _parse_line(self, value):
if not value:
class IpAddrCommand(IpDeviceCommandBase):
COMMAND = 'addr'
- def add(self, ip_version, cidr, broadcast, scope='global'):
- self._as_root('add',
- cidr,
- 'brd',
- broadcast,
- 'scope',
- scope,
- 'dev',
- self.name,
- options=[ip_version])
-
- def delete(self, ip_version, cidr):
- self._as_root('del',
- cidr,
- 'dev',
- self.name,
- options=[ip_version])
-
- def flush(self):
- self._as_root('flush', self.name)
-
- def list(self, scope=None, to=None, filters=None):
- if filters is None:
- filters = []
+ def add(self, cidr, scope='global'):
+ net = netaddr.IPNetwork(cidr)
+ args = ['add', cidr,
+ 'scope', scope,
+ 'dev', self.name]
+ if net.version == 4:
+ args += ['brd', str(net.broadcast)]
+ self._as_root([net.version], tuple(args))
+
+ def delete(self, cidr):
+ ip_version = get_ip_version(cidr)
+ self._as_root([ip_version],
+ ('del', cidr,
+ 'dev', self.name))
+
+ def flush(self, ip_version):
+ self._as_root([ip_version], ('flush', self.name))
+
+ def list(self, scope=None, to=None, filters=None, ip_version=None):
+ options = [ip_version] if ip_version else []
+ args = ['show', self.name]
+ if filters:
+ args += filters
retval = []
if scope:
- filters += ['scope', scope]
+ args += ['scope', scope]
if to:
- filters += ['to', to]
+ args += ['to', to]
- for line in self._run('show', self.name, *filters).split('\n'):
+ for line in self._run(options, tuple(args)).split('\n'):
line = line.strip()
if not line.startswith('inet'):
continue
parts = line.split()
if parts[0] == 'inet6':
- version = 6
scope = parts[3]
- broadcast = '::'
else:
- version = 4
if parts[2] == 'brd':
- broadcast = parts[3]
scope = parts[5]
else:
- # sometimes output of 'ip a' might look like:
- # inet 192.168.100.100/24 scope global eth0
- # and broadcast needs to be calculated from CIDR
- broadcast = str(netaddr.IPNetwork(parts[1]).broadcast)
scope = parts[3]
retval.append(dict(cidr=parts[1],
- broadcast=broadcast,
scope=scope,
- ip_version=version,
dynamic=('dynamic' == parts[-1])))
return retval
COMMAND = 'route'
def add_gateway(self, gateway, metric=None, table=None):
+ ip_version = get_ip_version(gateway)
args = ['replace', 'default', 'via', gateway]
if metric:
args += ['metric', metric]
args += ['dev', self.name]
if table:
args += ['table', table]
- self._as_root(*args)
+ self._as_root([ip_version], tuple(args))
- def delete_gateway(self, gateway=None, table=None):
- args = ['del', 'default']
- if gateway:
- args += ['via', gateway]
- args += ['dev', self.name]
+ def delete_gateway(self, gateway, table=None):
+ ip_version = get_ip_version(gateway)
+ args = ['del', 'default',
+ 'via', gateway,
+ 'dev', self.name]
if table:
args += ['table', table]
- self._as_root(*args)
+ self._as_root([ip_version], tuple(args))
- def list_onlink_routes(self):
+ def list_onlink_routes(self, ip_version):
def iterate_routes():
- output = self._run('list', 'dev', self.name, 'scope', 'link')
+ output = self._run([ip_version],
+ ('list',
+ 'dev', self.name,
+ 'scope', 'link'))
for line in output.split('\n'):
line = line.strip()
if line and not line.count('src'):
return [x for x in iterate_routes()]
def add_onlink_route(self, cidr):
- self._as_root('replace', cidr, 'dev', self.name, 'scope', 'link')
+ ip_version = get_ip_version(cidr)
+ self._as_root([ip_version],
+ ('replace', cidr,
+ 'dev', self.name,
+ 'scope', 'link'))
def delete_onlink_route(self, cidr):
- self._as_root('del', cidr, 'dev', self.name, 'scope', 'link')
+ ip_version = get_ip_version(cidr)
+ self._as_root([ip_version],
+ ('del', cidr,
+ 'dev', self.name,
+ 'scope', 'link'))
+
+ def get_gateway(self, scope=None, filters=None, ip_version=None):
+ options = [ip_version] if ip_version else []
- def get_gateway(self, scope=None, filters=None):
- if filters is None:
- filters = []
+ args = ['list', 'dev', self.name]
+ if filters:
+ args += filters
retval = None
if scope:
- filters += ['scope', scope]
+ args += ['scope', scope]
- route_list_lines = self._run('list', 'dev', self.name,
- *filters).split('\n')
+ route_list_lines = self._run(options, tuple(args)).split('\n')
default_route_line = next((x.strip() for x in
route_list_lines if
x.strip().startswith('default')), None)
others on the same subnet.
"""
device_list = []
- device_route_list_lines = self._run('list', 'proto', 'kernel',
- 'dev', interface_name).split('\n')
+ device_route_list_lines = self._run([],
+ ('list',
+ 'proto', 'kernel',
+ 'dev', interface_name)
+ ).split('\n')
for device_route_line in device_route_list_lines:
try:
subnet = device_route_line.split()[0]
except Exception:
continue
- subnet_route_list_lines = self._run('list', 'proto', 'kernel',
- 'match', subnet).split('\n')
+ subnet_route_list_lines = self._run([],
+ ('list',
+ 'proto', 'kernel',
+ 'match', subnet)
+ ).split('\n')
for subnet_route_line in subnet_route_list_lines:
i = iter(subnet_route_line.split())
while(i.next() != 'dev'):
break
for (device, src) in device_list:
- self._as_root('del', subnet, 'dev', device)
+ self._as_root([], ('del', subnet, 'dev', device))
if (src != ''):
- self._as_root('append', subnet, 'proto', 'kernel',
- 'src', src, 'dev', device)
+ self._as_root([],
+ ('append', subnet,
+ 'proto', 'kernel',
+ 'src', src,
+ 'dev', device))
else:
- self._as_root('append', subnet, 'proto', 'kernel',
- 'dev', device)
+ self._as_root([],
+ ('append', subnet,
+ 'proto', 'kernel',
+ 'dev', device))
def add_route(self, cidr, ip, table=None):
+ ip_version = get_ip_version(cidr)
args = ['replace', cidr, 'via', ip, 'dev', self.name]
if table:
args += ['table', table]
- self._as_root(*args)
+ self._as_root([ip_version], tuple(args))
def delete_route(self, cidr, ip, table=None):
+ ip_version = get_ip_version(cidr)
args = ['del', cidr, 'via', ip, 'dev', self.name]
if table:
args += ['table', table]
- self._as_root(*args)
+ self._as_root([ip_version], tuple(args))
class IpNeighCommand(IpDeviceCommandBase):
COMMAND = 'neigh'
- def add(self, ip_version, ip_address, mac_address):
- self._as_root('replace',
- ip_address,
- 'lladdr',
- mac_address,
- 'nud',
- 'permanent',
- 'dev',
- self.name,
- options=[ip_version])
-
- def delete(self, ip_version, ip_address, mac_address):
- self._as_root('del',
- ip_address,
- 'lladdr',
- mac_address,
- 'dev',
- self.name,
- options=[ip_version])
+ def add(self, ip_address, mac_address):
+ ip_version = get_ip_version(ip_address)
+ self._as_root([ip_version],
+ ('replace', ip_address,
+ 'lladdr', mac_address,
+ 'nud', 'permanent',
+ 'dev', self.name))
+
+ def delete(self, ip_address, mac_address):
+ ip_version = get_ip_version(ip_address)
+ self._as_root([ip_version],
+ ('del', ip_address,
+ 'lladdr', mac_address,
+ 'dev', self.name))
class IpNetnsCommand(IpCommandBase):
COMMAND = 'netns'
def add(self, name):
- self._as_root('add', name, use_root_namespace=True)
+ self._as_root([], ('add', name), use_root_namespace=True)
wrapper = IPWrapper(namespace=name)
wrapper.netns.execute(['sysctl', '-w',
'net.ipv4.conf.all.promote_secondaries=1'])
return wrapper
def delete(self, name):
- self._as_root('delete', name, use_root_namespace=True)
+ self._as_root([], ('delete', name), use_root_namespace=True)
def execute(self, cmds, addl_env=None, check_exit_code=True,
extra_ok_codes=None):
def exists(self, name):
output = self._parent._execute(
- 'o', 'netns', ['list'],
+ ['o'], 'netns', ['list'],
run_as_root=cfg.CONF.AGENT.use_helper_for_ns_read)
for line in output.split('\n'):
if name == line.strip():
def send_gratuitous_arp(ns_name, iface_name, address, count):
- """Send a gratuitous arp using given namespace, interface, and address"""
+ """Send a gratuitous arp using given namespace, interface, and address."""
def arping():
_arping(ns_name, iface_name, address, count)
# Configure the address on the interface
device = IPDevice(iface_name, namespace=ns_name)
net = netaddr.IPNetwork(str(address))
- device.addr.add(net.version, str(net), str(net.broadcast))
+ device.addr.add(str(net))
_arping(ns_name, iface_name, address, count)
# Delete the address from the interface
- device.addr.delete(net.version, str(net))
+ device.addr.delete(str(net))
if count > 0:
eventlet.spawn_n(arping_with_temporary_address)
"""Add an optional namespace to the command."""
return ['ip', 'netns', 'exec', namespace] + cmd if namespace else cmd
+
+
+def get_ip_version(ip_or_cidr):
+ return netaddr.IPNetwork(ip_or_cidr).version
IPv4 = 'IPv4'
IPv6 = 'IPv6'
+IP_VERSION_4 = 4
+IP_VERSION_6 = 6
IPv4_BITS = 32
IPv6_BITS = 128
# Append IP's to bridge if necessary
if ips:
for ip in ips:
- dst_device.addr.add(ip_version=ip['ip_version'],
- cidr=ip['cidr'],
- broadcast=ip['broadcast'])
+ dst_device.addr.add(cidr=ip['cidr'])
if gateway:
# Ensure that the gateway can be updated by changing the metric
# Remove IP's from interface
if ips:
for ip in ips:
- src_device.addr.delete(ip_version=ip['ip_version'],
- cidr=ip['cidr'])
+ src_device.addr.delete(cidr=ip['cidr'])
def _bridge_exists_and_ensure_up(self, bridge_name):
"""Check if the bridge exists and make sure it is up."""
"""
net = netaddr.IPNetwork(ip_cidr)
port_dev = self.create_ovs_port_in_ns(br, namespace)
- port_dev.addr.add(net.version, str(net), net.broadcast)
+ port_dev.addr.add(str(net))
return port_dev
class BaseIPVethTestCase(BaseLinuxTestCase):
SRC_ADDRESS = '192.168.0.1'
DST_ADDRESS = '192.168.0.2'
- BROADCAST_ADDRESS = '192.168.0.255'
@staticmethod
- def _set_ip_up(device, cidr, broadcast, ip_version=4):
- device.addr.add(ip_version=ip_version, cidr=cidr, broadcast=broadcast)
+ def _set_ip_up(device, cidr):
+ device.addr.add(cidr)
device.link.set_up()
def prepare_veth_pairs(self):
dst_veth,
dst_ns.namespace)
- self._set_ip_up(src_veth, '%s/24' % src_addr, self.BROADCAST_ADDRESS)
- self._set_ip_up(dst_veth, '%s/24' % dst_addr, self.BROADCAST_ADDRESS)
+ self._set_ip_up(src_veth, '%s/24' % src_addr)
+ self._set_ip_up(dst_veth, '%s/24' % dst_addr)
return src_ns, dst_ns
self.monitor.start()
cidr = '169.254.128.1/24'
- self.device.addr.add(4, cidr, '169.254.128.255')
+ self.device.addr.add(cidr)
self._assert_event(expected_name=self.device.name,
expected_cidr=cidr,
expected_added=True,
event=ip_monitor.IPMonitorEvent.from_text(
next(self.monitor.iter_stdout(block=True))))
- self.device.addr.delete(4, cidr)
+ self.device.addr.delete(cidr)
self._assert_event(expected_name=self.device.name,
expected_cidr=cidr,
expected_added=False,
@mock.patch.object(ip_lib, 'send_garp_for_proxyarp')
@mock.patch.object(ip_lib, 'IPDevice')
- @mock.patch.object(ip_lib, 'IpRule')
- def test_floating_ip_added_dist(self, mIpRule, mIPDevice, mock_arp):
+ @mock.patch.object(ip_lib, 'IPRule')
+ def test_floating_ip_added_dist(self, mIPRule, mIPDevice, mock_arp):
router = mock.MagicMock()
ri = self._create_router(router)
ext_net_id = _uuid()
ri.dist_fip_count = 0
ip_cidr = common_utils.ip_to_cidr(fip['floating_ip_address'])
ri.floating_ip_added_dist(fip, ip_cidr)
- mIpRule().add.assert_called_with('192.168.0.1', 16, FIP_PRI)
+ mIPRule().rule.add.assert_called_with('192.168.0.1', 16, FIP_PRI)
self.assertEqual(1, ri.dist_fip_count)
# TODO(mrsmith): add more asserts
@mock.patch.object(ip_lib, 'IPWrapper')
@mock.patch.object(ip_lib, 'IPDevice')
- @mock.patch.object(ip_lib, 'IpRule')
- def test_floating_ip_removed_dist(self, mIpRule, mIPDevice, mIPWrapper):
+ @mock.patch.object(ip_lib, 'IPRule')
+ def test_floating_ip_removed_dist(self, mIPRule, mIPDevice, mIPWrapper):
router = mock.MagicMock()
ri = self._create_router(router)
s = lla.LinkLocalAddressPair('169.254.30.42/31')
ri.rtr_fip_subnet = s
ri.floating_ip_removed_dist(fip_cidr)
- mIpRule().delete.assert_called_with(
+ mIPRule().rule.delete.assert_called_with(
str(netaddr.IPNetwork(fip_cidr).ip), 16, FIP_PRI)
mIPDevice().route.delete_route.assert_called_with(fip_cidr, str(s.ip))
self.assertFalse(ri.fip_ns.unsubscribe.called)
result = ri._add_fip_addr_to_device(
{'id': mock.sentinel.id, 'floating_ip_address': ip}, device)
- device.addr.add.assert_called_with(4, ip + '/32', ip)
+ device.addr.add.assert_called_with(ip + '/32')
return result
def test__add_fip_addr_to_device(self):
ri.remove_floating_ip(device, cidr)
- device.addr.delete.assert_called_once_with(4, cidr)
+ device.addr.delete.assert_called_once_with(cidr)
self.driver.delete_conntrack_state.assert_called_once_with(
ip=cidr,
namespace=ri.ns_name)
self.mock_ip = mock.MagicMock()
ip_cls.return_value = self.mock_ip
- ip_rule = mock.patch('neutron.agent.linux.ip_lib.IpRule').start()
+ ip_rule = mock.patch('neutron.agent.linux.ip_lib.IPRule').start()
self.mock_rule = mock.MagicMock()
ip_rule.return_value = self.mock_rule
'network_id': _uuid(),
'mac_address': 'ca:fe:de:ad:be:ef',
'ip_cidr': '20.0.0.31/24'}
+ ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
+ 'subnet_id': _uuid()}],
+ 'subnet': {'gateway_ip': '20.0.0.1'},
+ 'extra_subnets': [{'cidr': '172.16.0.0/24'}],
+ 'id': _uuid(),
+ 'binding:host_id': HOSTNAME,
+ 'network_id': _uuid(),
+ 'mac_address': 'ca:fe:de:ad:be:ef',
+ 'ip_cidr': '20.0.0.30/24'}
+ ri.snat_ports = sn_port
+ ri.ex_gw_port = ex_gw_port
+ ri.snat_namespace = mock.Mock()
if action == 'add':
self.device_exists.return_value = False
sn_port['mac_address'],
agent.get_snat_int_device_name(sn_port['id']),
dvr_snat_ns.SNAT_INT_DEV_PREFIX)
+ elif action == 'remove':
+ self.device_exists.return_value = False
+ agent._map_internal_interfaces = mock.Mock(return_value=sn_port)
+ agent._snat_redirect_remove = mock.Mock()
+ agent.internal_network_removed(ri, port)
+ agent._snat_redirect_remove.assert_called_with(
+ ri,
+ sn_port['fixed_ips'][0]['ip_address'],
+ port,
+ ri.get_internal_device_name(port['id']))
def test_agent_add_internal_network(self):
self._test_internal_network_action('add')
def test_agent_remove_internal_network(self):
self._test_internal_network_action('remove')
+ def test_agent_remove_internal_network_dist(self):
+ self._test_internal_network_action_dist('remove')
+
def _test_external_gateway_action(self, action, router):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
ex_net_id = _uuid()
+ sn_port = self.snat_ports[1]
# Special setup for dvr routers
if router.get('distributed'):
agent.conf.agent_mode = 'dvr_snat'
**self.ri_kwargs)
ri.create_snat_namespace()
ri.fip_ns = agent.get_fip_ns(ex_net_id)
+ ri.internal_ports = self.snat_ports
else:
ri = l3router.RouterInfo(
router['id'], router,
elif action == 'remove':
self.device_exists.return_value = True
+ agent._map_internal_interfaces = mock.Mock(return_value=sn_port)
+ agent._snat_redirect_remove = mock.Mock()
agent.external_gateway_removed(ri, ex_gw_port, interface_name)
- self.mock_driver.unplug.assert_called_once_with(
- interface_name,
- bridge=agent.conf.external_network_bridge,
- namespace=mock.ANY,
- prefix=mock.ANY)
+ if not router.get('distributed'):
+ self.mock_driver.unplug.assert_called_once_with(
+ interface_name,
+ bridge=agent.conf.external_network_bridge,
+ namespace=mock.ANY,
+ prefix=mock.ANY)
+ else:
+ agent._snat_redirect_remove.assert_called_with(
+ ri,
+ sn_port['fixed_ips'][0]['ip_address'],
+ sn_port,
+ ri.get_internal_device_name(sn_port['id']))
else:
raise Exception("Invalid action %s" % action)
ports[0]['subnet']['id'] = _get_subnet_id(ports[0])
ri._set_subnet_arp_info(ports[0])
self.mock_ip_dev.neigh.add.assert_called_once_with(
- 4, '1.2.3.4', '00:11:22:33:44:55')
+ '1.2.3.4', '00:11:22:33:44:55')
# Test negative case
router['distributed'] = False
agent.add_arp_entry(None, payload)
agent.router_deleted(None, router['id'])
self.mock_ip_dev.neigh.add.assert_called_once_with(
- 4, '1.7.23.11', '00:11:22:33:44:55')
+ '1.7.23.11', '00:11:22:33:44:55')
def test_add_arp_entry_no_routerinfo(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
# now delete it
agent.del_arp_entry(None, payload)
self.mock_ip_dev.neigh.delete.assert_called_once_with(
- 4, '1.5.25.15', '00:44:33:22:11:55')
+ '1.5.25.15', '00:44:33:22:11:55')
agent.router_deleted(None, router['id'])
def test_process_cent_router(self):
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
+from neutron.common import constants
from neutron.extensions import flavor
from neutron.openstack.common import uuidutils
from neutron.tests import base
self.assertEqual('tapabcdef01-12', device_name)
def test_l3_init(self):
- addresses = [dict(ip_version=4, scope='global',
+ addresses = [dict(scope='global',
dynamic=False, cidr='172.16.77.240/24')]
self.ip_dev().addr.list = mock.Mock(return_value=addresses)
self.ip_dev().route.list_onlink_routes.return_value = []
self.ip_dev.assert_has_calls(
[mock.call('tap0', namespace=ns),
mock.call().addr.list(scope='global', filters=['permanent']),
- mock.call().addr.add(4, '192.168.1.2/24', '192.168.1.255'),
- mock.call().addr.delete(4, '172.16.77.240/24'),
- mock.call().route.list_onlink_routes(),
+ mock.call().addr.add('192.168.1.2/24'),
+ mock.call().addr.delete('172.16.77.240/24'),
+ mock.call().route.list_onlink_routes(constants.IP_VERSION_4),
+ mock.call().route.list_onlink_routes(constants.IP_VERSION_6),
mock.call().route.add_onlink_route('172.20.0.0/24')])
def test_l3_init_delete_onlink_routes(self):
- addresses = [dict(ip_version=4, scope='global',
+ addresses = [dict(scope='global',
dynamic=False, cidr='172.16.77.240/24')]
self.ip_dev().addr.list = mock.Mock(return_value=addresses)
self.ip_dev().route.list_onlink_routes.return_value = ['172.20.0.0/24']
ns = '12345678-1234-5678-90ab-ba0987654321'
bc.init_l3('tap0', ['192.168.1.2/24'], namespace=ns)
self.ip_dev.assert_has_calls(
- [mock.call().route.list_onlink_routes(),
+ [mock.call().route.list_onlink_routes(constants.IP_VERSION_4),
+ mock.call().route.list_onlink_routes(constants.IP_VERSION_6),
mock.call().route.delete_onlink_route('172.20.0.0/24')])
def test_l3_init_with_preserve(self):
- addresses = [dict(ip_version=4, scope='global',
+ addresses = [dict(scope='global',
dynamic=False, cidr='192.168.1.3/32')]
self.ip_dev().addr.list = mock.Mock(return_value=addresses)
self.ip_dev.assert_has_calls(
[mock.call('tap0', namespace=ns),
mock.call().addr.list(scope='global', filters=['permanent']),
- mock.call().addr.add(4, '192.168.1.2/24', '192.168.1.255')])
+ mock.call().addr.add('192.168.1.2/24')])
self.assertFalse(self.ip_dev().addr.delete.called)
def test_l3_init_with_ipv6(self):
- addresses = [dict(ip_version=6,
- scope='global',
+ addresses = [dict(scope='global',
dynamic=False,
cidr='2001:db8:a::123/64')]
self.ip_dev().addr.list = mock.Mock(return_value=addresses)
self.ip_dev.assert_has_calls(
[mock.call('tap0', namespace=ns),
mock.call().addr.list(scope='global', filters=['permanent']),
- mock.call().addr.add(6, '2001:db8:a::124/64',
- '2001:db8:a:0:ffff:ffff:ffff:ffff'),
- mock.call().addr.delete(6, '2001:db8:a::123/64'),
- mock.call().route.list_onlink_routes(),
+ mock.call().addr.add('2001:db8:a::124/64'),
+ mock.call().addr.delete('2001:db8:a::123/64'),
+ mock.call().route.list_onlink_routes(constants.IP_VERSION_4),
+ mock.call().route.list_onlink_routes(constants.IP_VERSION_6),
mock.call().route.add_onlink_route('2001:db8:b::/64')])
+ def test_l3_init_with_ipv6_delete_onlink_routes(self):
+ addresses = [dict(scope='global',
+ dynamic=False, cidr='2001:db8:a::123/64')]
+ route = '2001:db8:a::/64'
+ self.ip_dev().addr.list = mock.Mock(return_value=addresses)
+ self.ip_dev().route.list_onlink_routes.return_value = [route]
+
+ bc = BaseChild(self.conf)
+ ns = '12345678-1234-5678-90ab-ba0987654321'
+ bc.init_l3('tap0', ['2001:db8:a::124/64'], namespace=ns)
+ self.ip_dev.assert_has_calls(
+ [mock.call().route.list_onlink_routes(constants.IP_VERSION_4),
+ mock.call().route.list_onlink_routes(constants.IP_VERSION_6),
+ mock.call().route.delete_onlink_route(route)])
+
def test_l3_init_with_duplicated_ipv6(self):
- addresses = [dict(ip_version=6,
- scope='global',
+ addresses = [dict(scope='global',
dynamic=False,
cidr='2001:db8:a::123/64')]
self.ip_dev().addr.list = mock.Mock(return_value=addresses)
self.assertFalse(self.ip_dev().addr.add.called)
def test_l3_init_with_duplicated_ipv6_uncompact(self):
- addresses = [dict(ip_version=6,
- scope='global',
+ addresses = [dict(scope='global',
dynamic=False,
cidr='2001:db8:a::123/64')]
self.ip_dev().addr.list = mock.Mock(return_value=addresses)
default via 192.168.99.1 proto static metric 100
""")
+IPv6_GATEWAY_SAMPLE1 = ("""
+default via 2001:470:9:1224:4508:b885:5fb:740b metric 100
+2001:db8::/64 proto kernel scope link src 2001:470:9:1224:dfcc:aaff:feb9:76ce
+""")
+
+IPv6_GATEWAY_SAMPLE2 = ("""
+default via 2001:470:9:1224:4508:b885:5fb:740b metric 100
+""")
+
+IPv6_GATEWAY_SAMPLE3 = ("""
+2001:db8::/64 proto kernel scope link src 2001:470:9:1224:dfcc:aaff:feb9:76ce
+""")
+
+IPv6_GATEWAY_SAMPLE4 = ("""
+default via fe80::dfcc:aaff:feb9:76ce
+""")
+
+IPv6_GATEWAY_SAMPLE5 = ("""
+default via 2001:470:9:1224:4508:b885:5fb:740b metric 1024
+""")
+
DEVICE_ROUTE_SAMPLE = ("10.0.0.0/24 scope link src 10.0.0.2")
SUBNET_SAMPLE1 = ("10.0.0.0/24 dev qr-23380d11-d2 scope link src 10.0.0.1\n"
self.execute = self.execute_p.start()
def test_execute_wrapper(self):
- ip_lib.SubProcessBase._execute('o', 'link', ('list',),
+ ip_lib.SubProcessBase._execute(['o'], 'link', ('list',),
run_as_root=True)
self.execute.assert_called_once_with(['ip', '-o', 'link', 'list'],
'bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb',
'cccccccc-cccc-cccc-cccc-cccccccccccc'])
- self.execute.assert_called_once_with('', 'netns', ('list',))
+ self.execute.assert_called_once_with([], 'netns', ('list',))
def test_add_tuntap(self):
ip_lib.IPWrapper().add_tuntap('tap0')
- self.execute.assert_called_once_with('', 'tuntap',
+ self.execute.assert_called_once_with([], 'tuntap',
('add', 'tap0', 'mode', 'tap'),
run_as_root=True, namespace=None,
log_fail_as_error=True)
def test_add_veth(self):
ip_lib.IPWrapper().add_veth('tap0', 'tap1')
- self.execute.assert_called_once_with('', 'link',
+ self.execute.assert_called_once_with([], 'link',
('add', 'tap0', 'type', 'veth',
'peer', 'name', 'tap1'),
run_as_root=True, namespace=None,
def test_del_veth(self):
ip_lib.IPWrapper().del_veth('fpr-1234')
- self.execute.assert_called_once_with('', 'link',
+ self.execute.assert_called_once_with([], 'link',
('del', 'fpr-1234'),
run_as_root=True, namespace=None,
log_fail_as_error=True)
with mock.patch.object(ip_lib.IPWrapper, 'ensure_namespace') as en:
ip_lib.IPWrapper().add_veth('tap0', 'tap1', namespace2=ns2)
en.assert_has_calls([mock.call(ns2)])
- self.execute.assert_called_once_with('', 'link',
+ self.execute.assert_called_once_with([], 'link',
('add', 'tap0', 'type', 'veth',
'peer', 'name', 'tap1',
'netns', ns2),
port=('1', '2'))
self.assertIsInstance(retval, ip_lib.IPDevice)
self.assertEqual(retval.name, 'vxlan0')
- self.execute.assert_called_once_with('', 'link',
+ self.execute.assert_called_once_with([], 'link',
['add', 'vxlan0', 'type',
'vxlan', 'id', 'vni0', 'group',
'group0', 'dev', 'dev0',
self.assertEqual(dev.mock_calls, [])
-class TestIpRule(base.BaseTestCase):
- def setUp(self):
- super(TestIpRule, self).setUp()
- self.execute_p = mock.patch.object(ip_lib.IpRule, '_execute')
- self.execute = self.execute_p.start()
-
- def _test_add_rule(self, ip, table, priority):
- ip_version = netaddr.IPNetwork(ip).version
- ip_lib.IpRule().add(ip, table, priority)
- call_1 = mock.call([ip_version], 'rule', ['show'],
- run_as_root=True, namespace=None,
- log_fail_as_error=True)
- call_2 = mock.call().splitlines()
- # This is for call().splitlines().__iter__(), which can't be mocked
- call_3 = mock.ANY
- call_4 = mock.call([ip_version], 'rule',
- ('add', 'from', ip,
- 'table', table, 'priority', priority),
- run_as_root=True, namespace=None,
- log_fail_as_error=True)
- self.execute.assert_has_calls([call_1, call_2, call_3, call_4])
-
- def _test_add_rule_exists(self, ip, table, priority, output):
- self.execute.return_value = output
- ip_version = netaddr.IPNetwork(ip).version
- ip_lib.IpRule().add(ip, table, priority)
- self.execute.assert_called_once_with([ip_version], 'rule',
- ['show'],
- run_as_root=True, namespace=None,
- log_fail_as_error=True)
-
- def _test_delete_rule(self, ip, table, priority):
- ip_version = netaddr.IPNetwork(ip).version
- ip_lib.IpRule().delete(ip, table, priority)
- self.execute.assert_called_once_with([ip_version], 'rule',
- ('del', 'table', table,
- 'priority', priority),
- run_as_root=True, namespace=None,
- log_fail_as_error=True)
-
- def test_add_rule_v4(self):
- self._test_add_rule('192.168.45.100', 2, 100)
-
- def test_add_rule_v4_exists(self):
- self._test_add_rule_exists('192.168.45.100', 2, 101, RULE_V4_SAMPLE)
-
- def test_add_rule_v6(self):
- self._test_add_rule('2001:db8::1', 3, 200)
-
- def test_add_rule_v6_exists(self):
- self._test_add_rule_exists('2001:db8::1', 3, 201, RULE_V6_SAMPLE)
-
- def test_delete_rule_v4(self):
- self._test_delete_rule('192.168.45.100', 2, 100)
-
- def test_delete_rule_v6(self):
- self._test_delete_rule('2001:db8::1', 3, 200)
-
-
class TestIPDevice(base.BaseTestCase):
def test_eq_same_name(self):
dev1 = ip_lib.IPDevice('tap0')
self.ip_cmd.COMMAND = 'foo'
def test_run(self):
- self.ip_cmd._run('link', 'show')
+ self.ip_cmd._run([], ('link', 'show'))
self.ip.assert_has_calls([mock.call._run([], 'foo', ('link', 'show'))])
def test_run_with_options(self):
- self.ip_cmd._run('link', options='o')
- self.ip.assert_has_calls([mock.call._run('o', 'foo', ('link', ))])
+ self.ip_cmd._run(['o'], ('link'))
+ self.ip.assert_has_calls([mock.call._run(['o'], 'foo', ('link'))])
- def test_as_root(self):
- self.ip_cmd._as_root('link')
+ def test_as_root_namespace_false(self):
+ self.ip_cmd._as_root([], ('link'))
self.ip.assert_has_calls(
- [mock.call._as_root([], 'foo', ('link', ), False)])
+ [mock.call._as_root([],
+ 'foo',
+ ('link'),
+ use_root_namespace=False)])
- def test_as_root_with_options(self):
- self.ip_cmd._as_root('link', options='o')
+ def test_as_root_namespace_true(self):
+ self.ip_cmd._as_root([], ('link'), use_root_namespace=True)
self.ip.assert_has_calls(
- [mock.call._as_root('o', 'foo', ('link', ), False)])
+ [mock.call._as_root([],
+ 'foo',
+ ('link'),
+ use_root_namespace=True)])
+
+ def test_as_root_namespace_true_with_options(self):
+ self.ip_cmd._as_root('o', 'link', use_root_namespace=True)
+ self.ip.assert_has_calls(
+ [mock.call._as_root('o',
+ 'foo',
+ ('link'),
+ use_root_namespace=True)])
class TestIPDeviceCommandBase(base.BaseTestCase):
self.parent.assert_has_calls([
mock.call._run(options, self.command, args)])
- def _assert_sudo(self, options, args, force_root_namespace=False):
+ def _assert_sudo(self, options, args, use_root_namespace=False):
self.parent.assert_has_calls(
[mock.call._as_root(options, self.command, args,
- force_root_namespace)])
+ use_root_namespace=use_root_namespace)])
+
+
+class TestIpRuleCommand(TestIPCmdBase):
+ def setUp(self):
+ super(TestIpRuleCommand, self).setUp()
+ self.parent._as_root.return_value = ''
+ self.command = 'rule'
+ self.rule_cmd = ip_lib.IpRuleCommand(self.parent)
+
+ def _test_add_rule(self, ip, table, priority):
+ ip_version = netaddr.IPNetwork(ip).version
+ self.rule_cmd.add(ip, table, priority)
+ self._assert_sudo([ip_version], (['show']))
+ self._assert_sudo([ip_version], ('add', 'from', ip,
+ 'table', table, 'priority', priority))
+
+ def _test_add_rule_exists(self, ip, table, priority, output):
+ self.parent._as_root.return_value = output
+ ip_version = netaddr.IPNetwork(ip).version
+ self.rule_cmd.add(ip, table, priority)
+ self._assert_sudo([ip_version], (['show']))
+
+ def _test_delete_rule(self, ip, table, priority):
+ ip_version = netaddr.IPNetwork(ip).version
+ self.rule_cmd.delete(ip, table, priority)
+ self._assert_sudo([ip_version],
+ ('del', 'table', table,
+ 'priority', priority))
+
+ def test_add_rule_v4(self):
+ self._test_add_rule('192.168.45.100', 2, 100)
+
+ def test_add_rule_v4_exists(self):
+ self._test_add_rule_exists('192.168.45.100', 2, 101, RULE_V4_SAMPLE)
+
+ def test_add_rule_v6(self):
+ self._test_add_rule('2001:db8::1', 3, 200)
+
+ def test_add_rule_v6_exists(self):
+ self._test_add_rule_exists('2001:db8::1', 3, 201, RULE_V6_SAMPLE)
+
+ def test_delete_rule_v4(self):
+ self._test_delete_rule('192.168.45.100', 2, 100)
+
+ def test_delete_rule_v6(self):
+ self._test_delete_rule('2001:db8::1', 3, 200)
class TestIpLinkCommand(TestIPCmdBase):
'alias': 'openvswitch'}
self.parent._execute = mock.Mock(return_value=LINK_SAMPLE[1])
self.assertEqual(self.link_cmd.attributes, expected)
- self._assert_call('o', ('show', 'eth0'))
+ self._assert_call(['o'], ('show', 'eth0'))
class TestIpAddrCommand(TestIPCmdBase):
self.addr_cmd = ip_lib.IpAddrCommand(self.parent)
def test_add_address(self):
- self.addr_cmd.add(4, '192.168.45.100/24', '192.168.45.255')
+ self.addr_cmd.add('192.168.45.100/24')
self._assert_sudo([4],
- ('add', '192.168.45.100/24', 'brd', '192.168.45.255',
- 'scope', 'global', 'dev', 'tap0'))
+ ('add', '192.168.45.100/24',
+ 'scope', 'global',
+ 'dev', 'tap0',
+ 'brd', '192.168.45.255'))
def test_add_address_scoped(self):
- self.addr_cmd.add(4, '192.168.45.100/24', '192.168.45.255',
- scope='link')
+ self.addr_cmd.add('192.168.45.100/24', scope='link')
self._assert_sudo([4],
- ('add', '192.168.45.100/24', 'brd', '192.168.45.255',
- 'scope', 'link', 'dev', 'tap0'))
+ ('add', '192.168.45.100/24',
+ 'scope', 'link',
+ 'dev', 'tap0',
+ 'brd', '192.168.45.255'))
def test_del_address(self):
- self.addr_cmd.delete(4, '192.168.45.100/24')
+ self.addr_cmd.delete('192.168.45.100/24')
self._assert_sudo([4],
('del', '192.168.45.100/24', 'dev', 'tap0'))
def test_flush(self):
- self.addr_cmd.flush()
- self._assert_sudo([], ('flush', 'tap0'))
+ self.addr_cmd.flush(6)
+ self._assert_sudo([6], ('flush', 'tap0'))
def test_list(self):
expected = [
- dict(ip_version=4, scope='global',
- dynamic=False, cidr='172.16.77.240/24',
- broadcast='172.16.77.255'),
- dict(ip_version=6, scope='global',
- dynamic=True, cidr='2001:470:9:1224:5595:dd51:6ba2:e788/64',
- broadcast='::'),
- dict(ip_version=6, scope='global',
- dynamic=True, cidr='2001:470:9:1224:fd91:272:581e:3a32/64',
- broadcast='::'),
- dict(ip_version=6, scope='global',
- dynamic=True, cidr='2001:470:9:1224:4508:b885:5fb:740b/64',
- broadcast='::'),
- dict(ip_version=6, scope='global',
- dynamic=True, cidr='2001:470:9:1224:dfcc:aaff:feb9:76ce/64',
- broadcast='::'),
- dict(ip_version=6, scope='link',
- dynamic=False, cidr='fe80::dfcc:aaff:feb9:76ce/64',
- broadcast='::')]
+ dict(scope='global',
+ dynamic=False, cidr='172.16.77.240/24'),
+ dict(scope='global',
+ dynamic=True, cidr='2001:470:9:1224:5595:dd51:6ba2:e788/64'),
+ dict(scope='global',
+ dynamic=True, cidr='2001:470:9:1224:fd91:272:581e:3a32/64'),
+ dict(scope='global',
+ dynamic=True, cidr='2001:470:9:1224:4508:b885:5fb:740b/64'),
+ dict(scope='global',
+ dynamic=True, cidr='2001:470:9:1224:dfcc:aaff:feb9:76ce/64'),
+ dict(scope='link',
+ dynamic=False, cidr='fe80::dfcc:aaff:feb9:76ce/64')]
test_cases = [ADDR_SAMPLE, ADDR_SAMPLE2]
def test_list_filtered(self):
expected = [
- dict(ip_version=4, scope='global',
- dynamic=False, cidr='172.16.77.240/24',
- broadcast='172.16.77.255')]
+ dict(scope='global',
+ dynamic=False, cidr='172.16.77.240/24')]
test_cases = [ADDR_SAMPLE, ADDR_SAMPLE2]
self.parent.name = 'eth0'
self.command = 'route'
self.route_cmd = ip_lib.IpRouteCommand(self.parent)
+ self.ip_version = 4
+ self.table = 14
+ self.metric = 100
+ self.cidr = '192.168.45.100/24'
+ self.ip = '10.0.0.1'
+ self.gateway = '192.168.45.100'
+ self.test_cases = [{'sample': GATEWAY_SAMPLE1,
+ 'expected': {'gateway': '10.35.19.254',
+ 'metric': 100}},
+ {'sample': GATEWAY_SAMPLE2,
+ 'expected': {'gateway': '10.35.19.254',
+ 'metric': 100}},
+ {'sample': GATEWAY_SAMPLE3,
+ 'expected': None},
+ {'sample': GATEWAY_SAMPLE4,
+ 'expected': {'gateway': '10.35.19.254'}},
+ {'sample': GATEWAY_SAMPLE5,
+ 'expected': {'gateway': '192.168.99.1'}},
+ {'sample': GATEWAY_SAMPLE6,
+ 'expected': {'gateway': '192.168.99.1',
+ 'metric': 100}}]
def test_add_gateway(self):
- gateway = '192.168.45.100'
- metric = 100
- table = 14
- self.route_cmd.add_gateway(gateway, metric, table)
- self._assert_sudo([],
- ('replace', 'default', 'via', gateway,
- 'metric', metric,
- 'dev', self.parent.name, 'table', table))
+ self.route_cmd.add_gateway(self.gateway, self.metric, self.table)
+ self._assert_sudo([self.ip_version],
+ ('replace', 'default',
+ 'via', self.gateway,
+ 'metric', self.metric,
+ 'dev', self.parent.name,
+ 'table', self.table))
def test_del_gateway(self):
- gateway = '192.168.45.100'
- table = 14
- self.route_cmd.delete_gateway(gateway, table)
- self._assert_sudo([],
- ('del', 'default', 'via', gateway,
- 'dev', self.parent.name, 'table', table))
+ self.route_cmd.delete_gateway(self.gateway, table=self.table)
+ self._assert_sudo([self.ip_version],
+ ('del', 'default',
+ 'via', self.gateway,
+ 'dev', self.parent.name,
+ 'table', self.table))
def test_get_gateway(self):
- test_cases = [{'sample': GATEWAY_SAMPLE1,
- 'expected': {'gateway': '10.35.19.254',
- 'metric': 100}},
- {'sample': GATEWAY_SAMPLE2,
- 'expected': {'gateway': '10.35.19.254',
- 'metric': 100}},
- {'sample': GATEWAY_SAMPLE3,
- 'expected': None},
- {'sample': GATEWAY_SAMPLE4,
- 'expected': {'gateway': '10.35.19.254'}},
- {'sample': GATEWAY_SAMPLE5,
- 'expected': {'gateway': '192.168.99.1'}},
- {'sample': GATEWAY_SAMPLE6,
- 'expected': {'gateway': '192.168.99.1',
- 'metric': 100}}]
- for test_case in test_cases:
+ for test_case in self.test_cases:
self.parent._run = mock.Mock(return_value=test_case['sample'])
self.assertEqual(self.route_cmd.get_gateway(),
test_case['expected'])
def test_pullup_route(self):
+ # NOTE(brian-haley) Currently we do not have any IPv6-specific usecase
+ # for pullup_route, hence skipping. Revisit, if required, in future.
+ if self.ip_version == 6:
+ return
# interface is not the first in the list - requires
# deleting and creating existing entries
output = [DEVICE_ROUTE_SAMPLE, SUBNET_SAMPLE1]
'src', '10.0.0.1', 'dev', 'qr-23380d11-d2'))
def test_pullup_route_first(self):
+ # NOTE(brian-haley) Currently we do not have any IPv6-specific usecase
+ # for pullup_route, hence skipping. Revisit, if required, in future.
+ if self.ip_version == 6:
+ return
# interface is first in the list - no changes
output = [DEVICE_ROUTE_SAMPLE, SUBNET_SAMPLE2]
self.assertEqual(len(self.parent._run.mock_calls), 2)
def test_add_route(self):
- cidr = '192.168.45.100/24'
- ip = '10.0.0.1'
- table = 14
- self.route_cmd.add_route(cidr, ip, table)
- self._assert_sudo([],
- ('replace', cidr, 'via', ip,
- 'dev', self.parent.name, 'table', table))
+ self.route_cmd.add_route(self.cidr, self.ip, self.table)
+ self._assert_sudo([self.ip_version],
+ ('replace', self.cidr,
+ 'via', self.ip,
+ 'dev', self.parent.name,
+ 'table', self.table))
def test_delete_route(self):
- cidr = '192.168.45.100/24'
- ip = '10.0.0.1'
- table = 14
- self.route_cmd.delete_route(cidr, ip, table)
- self._assert_sudo([],
- ('del', cidr, 'via', ip,
- 'dev', self.parent.name, 'table', table))
+ self.route_cmd.delete_route(self.cidr, self.ip, self.table)
+ self._assert_sudo([self.ip_version],
+ ('del', self.cidr,
+ 'via', self.ip,
+ 'dev', self.parent.name,
+ 'table', self.table))
+
+
+class TestIPv6IpRouteCommand(TestIpRouteCommand):
+ def setUp(self):
+ super(TestIPv6IpRouteCommand, self).setUp()
+ self.ip_version = 6
+ self.cidr = '2001:db8::/64'
+ self.ip = '2001:db8::100'
+ self.gateway = '2001:db8::1'
+ self.test_cases = [{'sample': IPv6_GATEWAY_SAMPLE1,
+ 'expected':
+ {'gateway': '2001:470:9:1224:4508:b885:5fb:740b',
+ 'metric': 100}},
+ {'sample': IPv6_GATEWAY_SAMPLE2,
+ 'expected':
+ {'gateway': '2001:470:9:1224:4508:b885:5fb:740b',
+ 'metric': 100}},
+ {'sample': IPv6_GATEWAY_SAMPLE3,
+ 'expected': None},
+ {'sample': IPv6_GATEWAY_SAMPLE4,
+ 'expected':
+ {'gateway': 'fe80::dfcc:aaff:feb9:76ce'}},
+ {'sample': IPv6_GATEWAY_SAMPLE5,
+ 'expected':
+ {'gateway': '2001:470:9:1224:4508:b885:5fb:740b',
+ 'metric': 1024}}]
class TestIpNetnsCommand(TestIPCmdBase):
def test_add_namespace(self):
with mock.patch('neutron.agent.linux.utils.execute') as execute:
ns = self.netns_cmd.add('ns')
- self._assert_sudo([], ('add', 'ns'), force_root_namespace=True)
+ self._assert_sudo([], ('add', 'ns'), use_root_namespace=True)
self.assertEqual(ns.namespace, 'ns')
execute.assert_called_once_with(
['ip', 'netns', 'exec', 'ns',
def test_delete_namespace(self):
with mock.patch('neutron.agent.linux.utils.execute'):
self.netns_cmd.delete('ns')
- self._assert_sudo([], ('delete', 'ns'), force_root_namespace=True)
+ self._assert_sudo([], ('delete', 'ns'), use_root_namespace=True)
def test_namespace_exists_use_helper(self):
self.config(group='AGENT', use_helper_for_ns_read=True)
with mock.patch.object(ip_lib.IPDevice, '_execute') as _execute:
_execute.return_value = LINK_SAMPLE[1]
self.assertTrue(ip_lib.device_exists('eth0'))
- _execute.assert_called_once_with('o', 'link', ('show', 'eth0'),
+ _execute.assert_called_once_with(['o'], 'link', ('show', 'eth0'),
log_fail_as_error=False)
def test_device_does_not_exist(self):
self.neigh_cmd = ip_lib.IpNeighCommand(self.parent)
def test_add_entry(self):
- self.neigh_cmd.add(4, '192.168.45.100', 'cc:dd:ee:ff:ab:cd')
- self._assert_sudo([4], ('replace', '192.168.45.100', 'lladdr',
- 'cc:dd:ee:ff:ab:cd', 'nud', 'permanent',
- 'dev', 'tap0'))
+ self.neigh_cmd.add('192.168.45.100', 'cc:dd:ee:ff:ab:cd')
+ self._assert_sudo([4],
+ ('replace', '192.168.45.100',
+ 'lladdr', 'cc:dd:ee:ff:ab:cd',
+ 'nud', 'permanent',
+ 'dev', 'tap0'))
def test_delete_entry(self):
- self.neigh_cmd.delete(4, '192.168.45.100', 'cc:dd:ee:ff:ab:cd')
- self._assert_sudo([4], ('del', '192.168.45.100', 'lladdr',
- 'cc:dd:ee:ff:ab:cd', 'dev', 'tap0'))
+ self.neigh_cmd.delete('192.168.45.100', 'cc:dd:ee:ff:ab:cd')
+ self._assert_sudo([4],
+ ('del', '192.168.45.100',
+ 'lladdr', 'cc:dd:ee:ff:ab:cd',
+ 'dev', 'tap0'))
class TestArpPing(TestIPCmdBase):
def check_added_address(*args, **kwargs):
mIPDevice.assert_called_once_with(mock.sentinel.iface_name,
namespace=mock.sentinel.ns_name)
- device.addr.add.assert_called_once_with(4, addr + '/32', addr)
+ device.addr.add.assert_called_once_with(addr + '/32')
self.assertFalse(device.addr.delete.called)
device.addr.reset_mock()
# Test that the address was removed after arping
device = mIPDevice(mock.sentinel.iface_name,
namespace=mock.sentinel.ns_name)
- device.addr.delete.assert_called_once_with(4, addr + '/32')
+ device.addr.delete.assert_called_once_with(addr + '/32')
# If this was called then check_added_address probably had a assert
self.assertFalse(device.addr.add.called)