# no additional setup of the DHCP server.
# dhcp_driver = neutron.agent.linux.dhcp.Dnsmasq
-# Allow overlapping IP (Must have kernel build with CONFIG_NET_NS=y and
-# iproute2 package that supports namespaces). This option is deprecated and
-# will be removed in a future release, at which point the old behavior of
-# use_namespaces = True will be enforced.
-# use_namespaces = True
-
# In some cases the neutron router is not present to provide the metadata
# IP but the DHCP server can be used to provide this info. Setting this
# value will force the DHCP server to append specific host routes to the
# Example of interface_driver option for LinuxBridge
# interface_driver = linuxbridge
-# Allow overlapping IP (Must have kernel build with CONFIG_NET_NS=y and
-# iproute2 package that supports namespaces). This option is deprecated and
-# will be removed in a future release, at which point the old behavior of
-# use_namespaces = True will be enforced.
-# use_namespaces = True
-
-# If use_namespaces is set as False then the agent can only configure one router.
-
-# This is done by setting the specific router_id.
+# If non-empty, the l3 agent can only configure a router that has the matching
+# router ID.
# router_id =
# When external_network_bridge is set, each L3 agent can be associated
# report_interval = 300
# interface_driver = neutron.agent.linux.interface.OVSInterfaceDriver
-
-# This option is deprecated and will be removed in a future release,
-# at which point the old behavior of use_namespaces = True will be enforced.
-# use_namespaces = True
help=_("The driver used to manage the virtual interface.")),
]
-USE_NAMESPACES_OPTS = [
- cfg.BoolOpt('use_namespaces', default=True,
- help=_("Allow overlapping IP. This option is deprecated and "
- "will be removed in a future release."),
- deprecated_for_removal=True),
-]
-
IPTABLES_OPTS = [
cfg.BoolOpt('comment_iptables_rules', default=True,
help=_("Add comments to iptables rules.")),
conf.register_opts(INTERFACE_DRIVER_OPTS)
-def register_use_namespaces_opts_helper(conf):
- conf.register_opts(USE_NAMESPACES_OPTS)
-
-
def register_iptables_opts(conf):
conf.register_opts(IPTABLES_OPTS, 'AGENT')
self.cache = NetworkCache()
self.dhcp_driver_cls = importutils.import_class(self.conf.dhcp_driver)
ctx = context.get_admin_context_without_session()
- self.plugin_rpc = DhcpPluginApi(topics.PLUGIN,
- ctx, self.conf.use_namespaces,
- self.conf.host)
+ self.plugin_rpc = DhcpPluginApi(topics.PLUGIN, ctx, self.conf.host)
# create dhcp dir to store dhcp info
dhcp_dir = os.path.dirname("/%s/dhcp/" % self.conf.state_path)
utils.ensure_dir(dhcp_dir)
self.conf
)
for net_id in existing_networks:
- net = dhcp.NetModel(self.conf.use_namespaces,
- {"id": net_id,
- "subnets": [],
- "ports": []})
+ net = dhcp.NetModel({"id": net_id, "subnets": [], "ports": []})
self.cache.put(net)
except NotImplementedError:
# just go ahead with an empty networks cache
"""Disable DHCP for a network known to the agent."""
network = self.cache.get_network_by_id(network_id)
if network:
- if (self.conf.use_namespaces and
- self.conf.enable_isolated_metadata):
+ if self.conf.enable_isolated_metadata:
# NOTE(jschwarz): In the case where a network is deleted, all
# the subnets and ports are deleted before this function is
# called, so checking if 'should_enable_metadata' is True
"""
- def __init__(self, topic, context, use_namespaces, host):
+ def __init__(self, topic, context, host):
self.context = context
self.host = host
- self.use_namespaces = use_namespaces
target = oslo_messaging.Target(
topic=topic,
namespace=constants.RPC_NAMESPACE_DHCP_PLUGIN,
cctxt = self.client.prepare(version='1.1')
networks = cctxt.call(self.context, 'get_active_networks_info',
host=self.host)
- return [dhcp.NetModel(self.use_namespaces, n) for n in networks]
+ return [dhcp.NetModel(n) for n in networks]
def get_network_info(self, network_id):
"""Make a remote process call to retrieve network info."""
network = cctxt.call(self.context, 'get_network_info',
network_id=network_id, host=self.host)
if network:
- return dhcp.NetModel(self.use_namespaces, network)
+ return dhcp.NetModel(network)
def create_dhcp_port(self, port):
"""Make a remote process call to create the dhcp port."""
'topic': topics.DHCP_AGENT,
'configurations': {
'dhcp_driver': self.conf.dhcp_driver,
- 'use_namespaces': self.conf.use_namespaces,
'dhcp_lease_duration': self.conf.dhcp_lease_duration,
'log_agent_heartbeats': self.conf.AGENT.log_agent_heartbeats},
'start_flag': True,
def register_options(conf):
config.register_interface_driver_opts_helper(conf)
- config.register_use_namespaces_opts_helper(conf)
config.register_agent_state_opts_helper(conf)
config.register_availability_zone_opts_helper(conf)
conf.register_opts(dhcp_config.DHCP_AGENT_OPTS)
self.namespaces_manager = namespace_manager.NamespaceManager(
self.conf,
self.driver,
- self.conf.use_namespaces,
self.metadata_driver)
self._queue = queue.RouterProcessingQueue()
LOG.error(msg)
raise SystemExit(1)
- if not self.conf.use_namespaces and not self.conf.router_id:
- msg = _LE('Router id is required if not using namespaces.')
- LOG.error(msg)
- raise SystemExit(1)
-
if self.conf.ipv6_gateway:
# ipv6_gateway configured. Check for valid v6 link-local address.
try:
self.conf.external_network_bridge)
return
- # If namespaces are disabled, only process the router associated
- # with the configured agent id.
- if (not self.conf.use_namespaces and
- router['id'] != self.conf.router_id):
+ if self.conf.router_id and router['id'] != self.conf.router_id:
raise n_exc.RouterNotCompatibleWithAgent(router_id=router['id'])
# Either ex_net_id or handle_internal_only_routers must be set
timestamp = timeutils.utcnow()
try:
- if self.conf.use_namespaces:
- routers = self.plugin_rpc.get_routers(context)
- else:
+ if self.conf.router_id:
routers = self.plugin_rpc.get_routers(context,
[self.conf.router_id])
-
+ else:
+ routers = self.plugin_rpc.get_routers(context)
except oslo_messaging.MessagingException:
LOG.exception(_LE("Failed synchronizing routers due to RPC error"))
raise n_exc.AbortSyncRouters()
'topic': topics.L3_AGENT,
'configurations': {
'agent_mode': self.conf.agent_mode,
- 'use_namespaces': self.conf.use_namespaces,
'router_id': self.conf.router_id,
'handle_internal_only_routers':
self.conf.handle_internal_only_routers,
help=_("Send this many gratuitous ARPs for HA setup, if "
"less than or equal to 0, the feature is disabled")),
cfg.StrOpt('router_id', default='',
- help=_("If namespaces is disabled, the l3 agent can only "
- "configure a router that has the matching router ID.")),
+ help=_("If non-empty, the l3 agent can only configure a router "
+ "that has the matching router ID.")),
cfg.BoolOpt('handle_internal_only_routers',
default=True,
help=_("Agent should implement routers with no gateway")),
return [i for i in floating_ips if i['host'] == self.host]
def _handle_fip_nat_rules(self, interface_name):
- """Configures NAT rules for Floating IPs for DVR.
-
- Remove all the rules. This is safe because if
- use_namespaces is set as False then the agent can
- only configure one router, otherwise each router's
- NAT rules will be in their own namespace.
- """
+ """Configures NAT rules for Floating IPs for DVR."""
self.iptables_manager.ipv4['nat'].empty_chain('POSTROUTING')
self.iptables_manager.ipv4['nat'].empty_chain('snat')
dvr_fip_ns.FIP_NS_PREFIX: dvr_fip_ns.FipNamespace,
}
- def __init__(self, agent_conf, driver, clean_stale, metadata_driver=None):
+ def __init__(self, agent_conf, driver, metadata_driver=None):
"""Initialize the NamespaceManager.
:param agent_conf: configuration from l3 agent
:param driver: to perform operations on devices
- :param clean_stale: Whether to try to clean stale namespaces
:param metadata_driver: used to cleanup stale metadata proxy processes
"""
self.agent_conf = agent_conf
self.driver = driver
- self._clean_stale = clean_stale
+ self._clean_stale = True
self.metadata_driver = metadata_driver
if metadata_driver:
self.process_monitor = external_process.ProcessMonitor(
# Invoke the setter for establishing initial SNAT action
self.router = router
self.use_ipv6 = use_ipv6
- self.ns_name = None
- self.router_namespace = None
- if agent_conf.use_namespaces:
- ns = namespaces.RouterNamespace(
- router_id, agent_conf, interface_driver, use_ipv6)
- self.router_namespace = ns
- self.ns_name = ns.name
+ ns = namespaces.RouterNamespace(
+ router_id, agent_conf, interface_driver, use_ipv6)
+ self.router_namespace = ns
+ self.ns_name = ns.name
self.iptables_manager = iptables_manager.IptablesManager(
use_ipv6=use_ipv6,
namespace=self.ns_name)
process_monitor,
self.get_internal_device_name)
- if self.router_namespace:
- self.router_namespace.create()
+ self.router_namespace.create()
@property
def router(self):
self.router[l3_constants.FLOATINGIP_KEY] = []
self.process(agent)
self.disable_radvd()
- if self.router_namespace:
- self.router_namespace.delete()
+ self.router_namespace.delete()
def _internal_network_updated(self, port, subnet_id, prefix, old_prefix,
updated_cidrs):
conf.register_opts(metadata_config.SHARED_OPTS)
conf.register_opts(ha.OPTS)
config.register_interface_driver_opts_helper(conf)
- config.register_use_namespaces_opts_helper(conf)
config.register_agent_state_opts_helper(conf)
conf.register_opts(interface.OPTS)
conf.register_opts(external_process.OPTS)
class NetModel(DictModel):
- def __init__(self, use_namespaces, d):
+ def __init__(self, d):
super(NetModel, self).__init__(d)
- self._ns_name = (use_namespaces and
- "%s%s" % (NS_PREFIX, self.id) or None)
+ self._ns_name = "%s%s" % (NS_PREFIX, self.id)
@property
def namespace(self):
LOG.warning(_LW('Failed trying to delete interface: %s'),
self.interface_name)
- if self.network.namespace:
- ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
- try:
- ns_ip.netns.delete(self.network.namespace)
- except RuntimeError:
- LOG.warning(_LW('Failed trying to delete namespace: %s'),
- self.network.namespace)
+ ns_ip = ip_lib.IPWrapper(namespace=self.network.namespace)
+ try:
+ ns_ip.netns.delete(self.network.namespace)
+ except RuntimeError:
+ LOG.warning(_LW('Failed trying to delete namespace: %s'),
+ self.network.namespace)
def _get_value_from_conf_file(self, kind, converter=None):
"""A helper function to read a value from one of the state files."""
for s in network.subnets):
return True
- if not conf.use_namespaces or not conf.enable_isolated_metadata:
+ if not conf.enable_isolated_metadata:
return False
isolated_subnets = cls.get_isolated_subnets(network)
net = netaddr.IPNetwork(subnet.cidr)
ip_cidrs.append('%s/%s' % (gateway, net.prefixlen))
- if (self.conf.enable_isolated_metadata and
- self.conf.use_namespaces):
+ if self.conf.enable_isolated_metadata:
ip_cidrs.append(METADATA_DEFAULT_CIDR)
self.driver.init_l3(interface_name, ip_cidrs,
namespace=network.namespace)
- # ensure that the dhcp interface is first in the list
- if network.namespace is None:
- device = ip_lib.IPDevice(interface_name)
- device.route.pullup_route(interface_name,
- ip_version=constants.IP_VERSION_4)
-
- if self.conf.use_namespaces:
- self._set_default_route(network, interface_name)
- try:
- self._cleanup_stale_devices(network, port)
- except Exception:
- # catch everything as we don't want to fail because of
- # cleanup step
- LOG.error(_LE("Exception during stale dhcp device cleanup"))
+ self._set_default_route(network, interface_name)
+ try:
+ self._cleanup_stale_devices(network, port)
+ except Exception:
+ # catch everything as we don't want to fail because of
+ # cleanup step
+ LOG.error(_LE("Exception during stale dhcp device cleanup"))
return interface_name
def update(self, network, device_name):
"""Update device settings for the network's DHCP on this host."""
- if self.conf.use_namespaces:
- self._set_default_route(network, device_name)
+ self._set_default_route(network, device_name)
def destroy(self, network, device_name):
"""Destroy the device used for the network's DHCP on this host."""
# License for the specific language governing permissions and limitations
# under the License.
+import debtcollector
import eventlet
import netaddr
import os
return retval
+ @debtcollector.removals.remove(message="Will be removed in the N cycle.")
def pullup_route(self, interface_name, ip_version):
"""Ensures that the route entry for the interface is before all
others on the same subnet.
conf = cfg.CONF
conf.register_cli_opts(cli_opts)
agent_config.register_interface_driver_opts_helper(conf)
- agent_config.register_use_namespaces_opts_helper(conf)
conf.register_opts(dhcp_config.DHCP_AGENT_OPTS)
conf.register_opts(dhcp_config.DHCP_OPTS)
conf.register_opts(dhcp_config.DNSMASQ_OPTS)
conf.dhcp_driver,
conf=conf,
process_monitor=_get_dhcp_process_monitor(conf),
- network=dhcp.NetModel(conf.use_namespaces, {'id': network_id}),
+ network=dhcp.NetModel({'id': network_id}),
plugin=FakeDhcpPlugin())
if dhcp_driver.active:
conf.register_opts(l3_config.OPTS)
conf.register_opts(interface.OPTS)
agent_config.register_interface_driver_opts_helper(conf)
- agent_config.register_use_namespaces_opts_helper(conf)
return conf
continue
agent_conf = self.get_configuration_dict(l3_agent)
router_id = agent_conf.get('router_id', None)
- use_namespaces = agent_conf.get('use_namespaces', True)
handle_internal_only_routers = agent_conf.get(
'handle_internal_only_routers', True)
gateway_external_network_id = agent_conf.get(
'gateway_external_network_id', None)
agent_mode = agent_conf.get(constants.L3_AGENT_MODE,
constants.L3_AGENT_MODE_LEGACY)
- if not use_namespaces and router_id != sync_router['id']:
+ if router_id and router_id != sync_router['id']:
continue
ex_net_id = (sync_router['external_gateway_info'] or {}).get(
'network_id')
continue
router_id = agent_conf.get('router_id', None)
- use_namespaces = agent_conf.get('use_namespaces', True)
- if not use_namespaces and router_id != sync_router['id']:
+ if router_id and router_id != sync_router['id']:
continue
handle_internal_only_routers = agent_conf.get(
from neutron.agent.linux import dhcp
from neutron.agent.linux import ip_lib
-from neutron.agent.linux import utils
from neutron.common import constants
from neutron.i18n import _LW
port = self._create_port(network, device_owner)
interface_name = self.driver.get_device_name(port)
- namespace = None
- if self.conf.use_namespaces:
- namespace = self._get_namespace(port)
+ namespace = self._get_namespace(port)
if ip_lib.device_exists(interface_name, namespace=namespace):
LOG.debug('Reusing existing device: %s.', interface_name)
bridge = self.conf.external_network_bridge
ip = ip_lib.IPWrapper()
namespace = self._get_namespace(port)
- if self.conf.use_namespaces and ip.netns.exists(namespace):
+ if ip.netns.exists(namespace):
self.driver.unplug(self.driver.get_device_name(port),
bridge=bridge,
namespace=namespace)
port = dhcp.DictModel(self.client.show_port(port_id)['port'])
ip = ip_lib.IPWrapper()
namespace = self._get_namespace(port)
- if self.conf.use_namespaces:
- if not command:
- return "sudo ip netns exec %s" % self._get_namespace(port)
- namespace = ip.ensure_namespace(namespace)
- return namespace.netns.execute(shlex.split(command))
- else:
- return utils.execute(shlex.split(command))
+ if not command:
+ return "sudo ip netns exec %s" % self._get_namespace(port)
+ namespace = ip.ensure_namespace(namespace)
+ return namespace.netns.execute(shlex.split(command))
def ensure_probe(self, network_id):
ports = self.client.list_ports(network_id=network_id,
cfg.CONF.register_opts(interface.OPTS)
cfg.CONF.register_opts(debug_agent.NeutronDebugAgent.OPTS)
config.register_interface_driver_opts_helper(cfg.CONF)
- config.register_use_namespaces_opts_helper(cfg.CONF)
cfg.CONF(['--config-file', self.options.config_file])
config.setup_logging()
driver = utils.load_interface_driver(cfg.CONF)
router['id']
for router in routers
if router['tenant_id'] == tenant_id]
- local_ns_list = (root_ip.get_namespaces()
- if self.conf.use_namespaces else [])
+ local_ns_list = root_ip.get_namespaces()
router_info_list = []
# Pick up namespaces for Tenant Routers
# the router - but this is not yet populated in router_info
if rid not in self.router_info:
continue
- if self.conf.use_namespaces:
- router_ns = self.router_info[rid].ns_name
- if router_ns in local_ns_list:
- router_info_list.append(self.router_info[rid])
- else:
+ router_ns = self.router_info[rid].ns_name
+ if router_ns in local_ns_list:
router_info_list.append(self.router_info[rid])
return router_info_list
LABEL = '-l-'
config.register_interface_driver_opts_helper(cfg.CONF)
-config.register_use_namespaces_opts_helper(cfg.CONF)
cfg.CONF.register_opts(interface.OPTS)
self.conf = conf
self.id = router['id']
self.router = router
- self.ns_name = NS_PREFIX + self.id if conf.use_namespaces else None
+ # TODO(cbrandily): deduplicate ns_name generation in metering/l3
+ self.ns_name = NS_PREFIX + self.id
self.iptables_manager = iptables_manager.IptablesManager(
namespace=self.ns_name,
binary_name=WRAP_NAME,
'handle_internal_only_routers': internal_only,
'external_network_bridge': ext_bridge,
'gateway_external_network_id': ext_net_id,
- 'router_id': router_id,
- 'use_namespaces': router_id is None}}
+ 'router_id': router_id}}
def _register_agent(agent):
'agent_type': constants.AGENT_TYPE_DHCP,
'availability_zone': az,
'configurations': {'dhcp_driver': 'dhcp_driver',
- 'use_namespaces': True,
'networks': networks}}
return agent
self.agent_conf = mock.MagicMock()
self.metadata_driver_mock = mock.Mock()
self.namespace_manager = namespace_manager.NamespaceManager(
- self.agent_conf, driver=None, clean_stale=True,
+ self.agent_conf, driver=None,
metadata_driver=self.metadata_driver_mock)
def _create_namespace(self, router_id, ns_class):
super(TestDhcp, self).setUp()
conf = cfg.ConfigOpts()
conf.register_opts(config.INTERFACE_DRIVER_OPTS)
- conf.register_opts(config.USE_NAMESPACES_OPTS)
conf.register_opts(interface.OPTS)
conf.register_opts(common_conf.core_opts)
conf.register_opts(dhcp_conf.DHCP_AGENT_OPTS)
conf.set_override('interface_driver', 'openvswitch')
conf.set_override('host', 'foo_host')
- conf.set_override('use_namespaces', True)
self.conf = conf
br_int = self.useFixture(net_helpers.OVSBridgeFixture()).bridge
self.conf.set_override('ovs_integration_bridge', br_int.br_name)
def create_network_dict(self, net_id, subnets=None, ports=None):
subnets = [] if not subnets else subnets
ports = [] if not ports else ports
- net_dict = dhcp.NetModel(use_namespaces=True, d={
+ net_dict = dhcp.NetModel(d={
"id": net_id,
"subnets": subnets,
"ports": ports,
FAKE_NETWORK_UUID = '12345678-1234-5678-1234567890ab'
FAKE_NETWORK_DHCP_NS = "qdhcp-%s" % FAKE_NETWORK_UUID
-fake_network = dhcp.NetModel(True, dict(id=FAKE_NETWORK_UUID,
+fake_network = dhcp.NetModel(dict(id=FAKE_NETWORK_UUID,
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
subnets=[fake_subnet1, fake_subnet2],
ports=[fake_port1]))
-fake_network_ipv6 = dhcp.NetModel(True, dict(
+fake_network_ipv6 = dhcp.NetModel(dict(
id='12345678-1234-5678-1234567890ab',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
subnets=[fake_ipv6_subnet],
ports=[fake_ipv6_port]))
-fake_network_ipv6_ipv4 = dhcp.NetModel(True, dict(
+fake_network_ipv6_ipv4 = dhcp.NetModel(dict(
id='12345678-1234-5678-1234567890ab',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
ports=[fake_port1]))
isolated_network = dhcp.NetModel(
- True, dict(
+ dict(
id='12345678-1234-5678-1234567890ab',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
ports=[fake_port1]))
nonisolated_dist_network = dhcp.NetModel(
- True, dict(
+ dict(
id='12345678-1234-5678-1234567890ab',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
ports=[fake_port1, fake_port2]))
empty_network = dhcp.NetModel(
- True, dict(
+ dict(
id='12345678-1234-5678-1234567890ab',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
ports=[]))
fake_meta_network = dhcp.NetModel(
- True, dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- admin_state_up=True,
- subnets=[fake_meta_subnet],
- ports=[fake_meta_port]))
+ dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ admin_state_up=True,
+ subnets=[fake_meta_subnet],
+ ports=[fake_meta_port]))
-fake_meta_dvr_network = dhcp.NetModel(True, fake_meta_network.copy())
+fake_meta_dvr_network = dhcp.NetModel(fake_meta_network.copy())
fake_meta_dvr_network.ports = [fake_meta_dvr_port]
fake_dist_network = dhcp.NetModel(
- True, dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- admin_state_up=True,
- subnets=[fake_meta_subnet],
- ports=[fake_meta_port, fake_dist_port]))
+ dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ admin_state_up=True,
+ subnets=[fake_meta_subnet],
+ ports=[fake_meta_port, fake_dist_port]))
fake_down_network = dhcp.NetModel(
- True, dict(id='12345678-dddd-dddd-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- admin_state_up=False,
- subnets=[],
- ports=[]))
+ dict(id='12345678-dddd-dddd-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ admin_state_up=False,
+ subnets=[],
+ ports=[]))
class TestDhcpAgent(base.BaseTestCase):
disable.assert_called_once_with(fake_network.id)
def test_refresh_dhcp_helper_no_dhcp_enabled_networks(self):
- network = dhcp.NetModel(True, dict(id='net-id',
+ network = dhcp.NetModel(dict(id='net-id',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
subnets=[],
[mock.call.get_network_by_id('net-id')])
def test_refresh_dhcp_helper_exception_during_rpc(self):
- network = dhcp.NetModel(True, dict(id='net-id',
+ network = dhcp.NetModel(dict(id='net-id',
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
admin_state_up=True,
subnets=[],
fake_network)
def test_subnet_update_end_restart(self):
- new_state = dhcp.NetModel(True, dict(id=fake_network.id,
+ new_state = dhcp.NetModel(dict(id=fake_network.id,
tenant_id=fake_network.tenant_id,
admin_state_up=True,
subnets=[fake_subnet1, fake_subnet3],
new_state)
def test_subnet_update_end_delete_payload(self):
- prev_state = dhcp.NetModel(True, dict(id=fake_network.id,
+ prev_state = dhcp.NetModel(dict(id=fake_network.id,
tenant_id=fake_network.tenant_id,
admin_state_up=True,
subnets=[fake_subnet1, fake_subnet3],
class TestDhcpPluginApiProxy(base.BaseTestCase):
def _test_dhcp_api(self, method, **kwargs):
ctxt = context.get_admin_context()
- proxy = dhcp_agent.DhcpPluginApi('foo', ctxt, None, host='foo')
+ proxy = dhcp_agent.DhcpPluginApi('foo', ctxt, host='foo')
with mock.patch.object(proxy.client, 'call') as rpc_mock,\
mock.patch.object(proxy.client, 'prepare') as prepare_mock:
def test_put_port(self):
fake_net = dhcp.NetModel(
- True, dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- subnets=[fake_subnet1],
- ports=[fake_port1]))
+ dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ subnets=[fake_subnet1],
+ ports=[fake_port1]))
nc = dhcp_agent.NetworkCache()
nc.put(fake_net)
nc.put_port(fake_port2)
def test_put_port_existing(self):
fake_net = dhcp.NetModel(
- True, dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- subnets=[fake_subnet1],
- ports=[fake_port1, fake_port2]))
+ dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ subnets=[fake_subnet1],
+ ports=[fake_port1, fake_port2]))
nc = dhcp_agent.NetworkCache()
nc.put(fake_net)
nc.put_port(fake_port2)
def test_remove_port_existing(self):
fake_net = dhcp.NetModel(
- True, dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
- subnets=[fake_subnet1],
- ports=[fake_port1, fake_port2]))
+ dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa',
+ subnets=[fake_subnet1],
+ ports=[fake_port1, fake_port2]))
nc = dhcp_agent.NetworkCache()
nc.put(fake_net)
nc.remove_port(fake_port2)
def setUp(self):
super(TestDeviceManager, self).setUp()
config.register_interface_driver_opts_helper(cfg.CONF)
- config.register_use_namespaces_opts_helper(cfg.CONF)
cfg.CONF.register_opts(dhcp_config.DHCP_AGENT_OPTS)
cfg.CONF.set_override('interface_driver',
'neutron.agent.linux.interface.NullDriver')
- cfg.CONF.set_override('use_namespaces', True)
cfg.CONF.set_override('enable_isolated_metadata', True)
self.ensure_device_is_ready_p = mock.patch(
def test_destroy(self):
fake_net = dhcp.NetModel(
- True, dict(id=FAKE_NETWORK_UUID,
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
+ dict(id=FAKE_NETWORK_UUID,
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
with mock.patch('neutron.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
def test_destroy_with_none(self):
fake_net = dhcp.NetModel(
- True, dict(id=FAKE_NETWORK_UUID,
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
+ dict(id=FAKE_NETWORK_UUID,
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
with mock.patch('neutron.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
def test_get_interface_name(self):
fake_net = dhcp.NetModel(
- True, dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
+ dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
fake_port = dhcp.DictModel(
dict(id='12345678-1234-aaaa-1234567890ab',
def test_get_device_id(self):
fake_net = dhcp.NetModel(
- True, dict(id='12345678-1234-5678-1234567890ab',
- tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
+ dict(id='12345678-1234-5678-1234567890ab',
+ tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
expected = ('dhcp1ae5f96c-c527-5079-82ea-371a01645457-12345678-1234-'
'5678-1234567890ab')
def test_update(self):
# Try with namespaces and no metadata network
- cfg.CONF.set_override('use_namespaces', True)
cfg.CONF.set_override('enable_metadata_network', False)
dh = dhcp.DeviceManager(cfg.CONF, None)
dh._set_default_route = mock.Mock()
dh._set_default_route.assert_called_once_with(network,
'ns-12345678-12')
- # No namespaces, shouldn't set default route.
- cfg.CONF.set_override('use_namespaces', False)
- cfg.CONF.set_override('enable_metadata_network', False)
- dh = dhcp.DeviceManager(cfg.CONF, None)
- dh._set_default_route = mock.Mock()
-
- dh.update(FakeV4Network(), 'tap12345678-12')
-
- self.assertFalse(dh._set_default_route.called)
-
# Meta data network enabled, don't interfere with its gateway.
- cfg.CONF.set_override('use_namespaces', True)
cfg.CONF.set_override('enable_metadata_network', True)
dh = dhcp.DeviceManager(cfg.CONF, None)
dh._set_default_route = mock.Mock()
self.assertTrue(dh._set_default_route.called)
- # For completeness
- cfg.CONF.set_override('use_namespaces', False)
- cfg.CONF.set_override('enable_metadata_network', True)
- dh = dhcp.DeviceManager(cfg.CONF, None)
- dh._set_default_route = mock.Mock()
-
- dh.update(FakeV4Network(), 'ns-12345678-12')
-
- self.assertFalse(dh._set_default_route.called)
-
def test_set_default_route(self):
dh = dhcp.DeviceManager(cfg.CONF, None)
with mock.patch.object(dhcp.ip_lib, 'IPDevice') as mock_IPDevice:
m = dhcp.DictModel(d)
self.assertEqual(m.a[0].b, 2)
self.assertEqual(m.a[1].c, 3)
-
-
-class TestNetModel(base.BaseTestCase):
- def test_ns_name(self):
- network = dhcp.NetModel(True, {'id': 'foo'})
- self.assertEqual(network.namespace, 'qdhcp-foo')
-
- def test_ns_name_false_namespace(self):
- network = dhcp.NetModel(False, {'id': 'foo'})
- self.assertIsNone(network.namespace)
-
- def test_ns_name_none_namespace(self):
- network = dhcp.NetModel(None, {'id': 'foo'})
- self.assertIsNone(network.namespace)
self.conf.register_opts(l3_config.OPTS)
self.conf.register_opts(ha.OPTS)
agent_config.register_interface_driver_opts_helper(self.conf)
- agent_config.register_use_namespaces_opts_helper(self.conf)
agent_config.register_process_monitor_opts(self.conf)
agent_config.register_availability_zone_opts_helper(self.conf)
self.conf.register_opts(interface.OPTS)
self.conf.register_opts(external_process.OPTS)
- self.conf.set_override('router_id', 'fake_id')
self.conf.set_override('interface_driver',
'neutron.agent.linux.interface.NullDriver')
self.conf.set_override('send_arp_for_ha', 1)
self._configure_metadata_proxy(enableflag=False)
def test_router_id_specified_in_conf(self):
- self.conf.set_override('use_namespaces', False)
- self.conf.set_override('router_id', '')
- self.assertRaises(SystemExit, l3_agent.L3NATAgent,
- HOSTNAME, self.conf)
-
self.conf.set_override('router_id', '1234')
- agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
- self.assertEqual('1234', agent.conf.router_id)
- self.assertFalse(agent.namespaces_manager._clean_stale)
+ self._configure_metadata_proxy()
def test_process_routers_update_rpc_timeout_on_get_routers(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self.conf.register_opts(l3_config.OPTS)
self.conf.register_opts(ha.OPTS)
agent_config.register_interface_driver_opts_helper(self.conf)
- agent_config.register_use_namespaces_opts_helper(self.conf)
agent_config.register_process_monitor_opts(self.conf)
self.conf.register_opts(interface.OPTS)
self.conf.register_opts(external_process.OPTS)
if not router:
router = mock.MagicMock()
self.agent_conf = mock.Mock()
- # NOTE The use_namespaces config will soon be deprecated
- self.agent_conf.use_namespaces = True
self.router_id = _uuid()
return ha_router.HaRouter(mock.sentinel.enqueue_state,
self.router_id,
def _create_namespace_manager(self):
self.agent_conf = mock.Mock()
self.driver = mock.Mock()
- return namespace_manager.NamespaceManager(self.agent_conf,
- self.driver, True)
+ return namespace_manager.NamespaceManager(self.agent_conf, self.driver)
class TestNamespaceManager(NamespaceManagerTestCaseFramework):
super(TestRouterInfo, self).setUp()
conf = agent_config.setup_conf()
- conf.use_namespaces = True
self.ip_cls_p = mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
ip_cls = self.ip_cls_p.start()
if not router:
router = mock.MagicMock()
self.agent_conf = mock.Mock()
- # NOTE The use_namespaces config will soon be deprecated
- self.agent_conf.use_namespaces = True
self.router_id = _uuid()
return router_info.RouterInfo(self.router_id,
router,
self.conf.register_opts(dhcp_config.DNSMASQ_OPTS)
self.conf.register_opts(external_process.OPTS)
config.register_interface_driver_opts_helper(self.conf)
- config.register_use_namespaces_opts_helper(self.conf)
class TestBase(TestConfBase):
self.safe.assert_has_calls([mock.call(exp_host_name, exp_host_data),
mock.call(exp_opt_name, exp_opt_data)])
- def test_should_enable_metadata_namespaces_disabled_returns_false(self):
- self.conf.set_override('use_namespaces', False)
- self.assertFalse(dhcp.Dnsmasq.should_enable_metadata(self.conf,
- mock.ANY))
-
def test_should_enable_metadata_isolated_network_returns_true(self):
self.assertTrue(dhcp.Dnsmasq.should_enable_metadata(
self.conf, FakeV4NetworkNoRouter()))
agent_config.register_interface_driver_opts_helper(cfg.CONF)
cfg.CONF.set_override('interface_driver',
'neutron.agent.linux.interface.NullDriver')
- agent_config.register_use_namespaces_opts_helper(cfg.CONF)
mock.patch('neutron.agent.l3.agent.L3PluginApi').start()
mock.patch('neutron.agent.l3.ha.AgentMixin'
cfg.CONF.register_opts(debug_agent.NeutronDebugAgent.OPTS)
common_config.init([])
config.register_interface_driver_opts_helper(cfg.CONF)
- config.register_use_namespaces_opts_helper(cfg.CONF)
- cfg.CONF.set_override('use_namespaces', True)
device_exists_p = mock.patch(
'neutron.agent.linux.ip_lib.device_exists', return_value=False)
namespace=namespace,
bridge='br-ex')])
- def test_delete_probe_without_namespace(self):
- cfg.CONF.set_override('use_namespaces', False)
- cmd = commands.DeleteProbe(self.app, None)
- cmd_parser = cmd.get_parser('delete_probe')
- args = ['fake_port']
- parsed_args = cmd_parser.parse_args(args)
- cmd.run(parsed_args)
- self.client.assert_has_calls([mock.call.show_port('fake_port'),
- mock.call.show_network('fake_net'),
- mock.call.show_subnet('fake_subnet'),
- mock.call.delete_port('fake_port')])
- self.driver.assert_has_calls([mock.call.get_device_name(mock.ANY),
- mock.call.unplug('tap12345678-12',
- bridge=None)])
-
def test_list_probe(self):
cmd = commands.ListProbe(self.app, None)
cmd_parser = cmd.get_parser('list_probe')
ns.assert_has_calls([mock.call.execute(mock.ANY)])
self.client.assert_has_calls([mock.call.show_port('fake_port')])
- def test_exec_command_without_namespace(self):
- cfg.CONF.set_override('use_namespaces', False)
- cmd = commands.ExecProbe(self.app, None)
- cmd_parser = cmd.get_parser('exec_command')
- args = ['fake_port', 'fake_command']
- parsed_args = cmd_parser.parse_args(args)
- with mock.patch('neutron.agent.linux.utils.execute') as exe:
- cmd.run(parsed_args)
- exe.assert_has_calls([mock.call.execute(mock.ANY)])
- self.client.assert_has_calls([mock.call.show_port('fake_port')])
-
def test_clear_probe(self):
cmd = commands.ClearProbe(self.app, None)
cmd_parser = cmd.get_parser('clear_probe')