from oslo_utils import excutils
import six
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
LOG = logging.getLogger(__name__)
if they are required for DVR or any service directly or
indirectly associated with DVR.
"""
- dvr_serviced_device_owners = (q_const.DEVICE_OWNER_LOADBALANCER,
- q_const.DEVICE_OWNER_DHCP)
+ dvr_serviced_device_owners = (n_const.DEVICE_OWNER_LOADBALANCER,
+ n_const.DEVICE_OWNER_DHCP)
return (device_owner.startswith('compute:') or
device_owner in dvr_serviced_device_owners)
raise ValueError("cidr doesn't contain a '/'")
net = netaddr.IPNetwork(cidr)
if net.version == 4:
- return net.prefixlen == q_const.IPv4_BITS
- return net.prefixlen == q_const.IPv6_BITS
+ return net.prefixlen == n_const.IPv4_BITS
+ return net.prefixlen == n_const.IPv6_BITS
def ip_version_from_int(ip_version_int):
if ip_version_int == 4:
- return q_const.IPv4
+ return n_const.IPv4
if ip_version_int == 6:
- return q_const.IPv6
+ return n_const.IPv6
raise ValueError(_('Illegal IP version number'))
import sqlalchemy as sa
from sqlalchemy.orm import exc
-from neutron.common import exceptions as q_exc
+from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.db import model_base
from neutron.extensions import dvr as ext_dvr
def get_subnet_for_dvr(self, context, subnet):
try:
subnet_info = self.plugin.get_subnet(context, subnet)
- except q_exc.SubnetNotFound:
+ except n_exc.SubnetNotFound:
return {}
else:
# retrieve the gateway port on this subnet
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
from neutron.common import utils as n_utils
from neutron.db import agents_db
from neutron.db import l3_agentschedulers_db as l3agent_sch_db
subnet = ip['subnet_id']
filter_sub = {'fixed_ips': {'subnet_id': [subnet]},
'device_owner':
- [q_const.DEVICE_OWNER_DVR_INTERFACE]}
+ [n_const.DEVICE_OWNER_DVR_INTERFACE]}
router_id = None
ports = self._core_plugin.get_ports(context, filters=filter_sub)
for port in ports:
vm_subnet = fixedip['subnet_id']
filter_sub = {'fixed_ips': {'subnet_id': [vm_subnet]},
'device_owner':
- [q_const.DEVICE_OWNER_DVR_INTERFACE]}
+ [n_const.DEVICE_OWNER_DVR_INTERFACE]}
subnet_ports = self._core_plugin.get_ports(
context, filters=filter_sub)
for subnet_port in subnet_ports:
continue
filter_rtr = {'device_id': [router_id],
'device_owner':
- [q_const.DEVICE_OWNER_DVR_INTERFACE]}
+ [n_const.DEVICE_OWNER_DVR_INTERFACE]}
int_ports = self._core_plugin.get_ports(
admin_context, filters=filter_rtr)
for prt in int_ports:
dvr_binding['router_id'] = None
dvr_binding.update(dvr_binding)
agent = self._get_agent_by_type_and_host(context,
- q_const.AGENT_TYPE_L3,
+ n_const.AGENT_TYPE_L3,
port_host)
info = {'router_id': router_id, 'host': port_host,
'agent_id': str(agent.id)}
def _get_active_l3_agent_routers_sync_data(self, context, host, agent,
router_ids):
- if n_utils.is_extension_supported(self, q_const.L3_HA_MODE_EXT_ALIAS):
+ if n_utils.is_extension_supported(self, n_const.L3_HA_MODE_EXT_ALIAS):
return self.get_ha_sync_data_for_host(context, host,
router_ids=router_ids,
active=True)
from oslo_log import log as logging
from sqlalchemy.orm import exc
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
from neutron.common import ipv6_utils as ipv6
from neutron.common import utils
from neutron.db import allowedaddresspairs_db as addr_pair
DIRECTION_IP_PREFIX = {'ingress': 'source_ip_prefix',
'egress': 'dest_ip_prefix'}
-DHCP_RULE_PORT = {4: (67, 68, q_const.IPv4), 6: (547, 546, q_const.IPv6)}
+DHCP_RULE_PORT = {4: (67, 68, n_const.IPv4), 6: (547, 546, n_const.IPv6)}
class SecurityGroupServerRpcMixin(sg_db.SecurityGroupDbMixin):
sg_provider_updated_networks = set()
sec_groups = set()
for port in ports:
- if port['device_owner'] == q_const.DEVICE_OWNER_DHCP:
+ if port['device_owner'] == n_const.DEVICE_OWNER_DHCP:
sg_provider_updated_networks.add(
port['network_id'])
# For IPv6, provider rule need to be updated in case router
# interface is created or updated after VM port is created.
- elif port['device_owner'] == q_const.DEVICE_OWNER_ROUTER_INTF:
+ elif port['device_owner'] == n_const.DEVICE_OWNER_ROUTER_INTF:
if any(netaddr.IPAddress(fixed_ip['ip_address']).version == 6
for fixed_ip in port['fixed_ips']):
sg_provider_updated_networks.add(
models_v2.IPAllocation.ip_address)
query = query.join(models_v2.IPAllocation)
query = query.filter(models_v2.Port.network_id.in_(network_ids))
- owner = q_const.DEVICE_OWNER_DHCP
+ owner = n_const.DEVICE_OWNER_DHCP
query = query.filter(models_v2.Port.device_owner == owner)
ips = {}
for mac_address, network_id, ip in query:
if (netaddr.IPAddress(ip).version == 6
and not netaddr.IPAddress(ip).is_link_local()):
- ip = str(ipv6.get_ipv6_addr_by_EUI64(q_const.IPV6_LLA_PREFIX,
+ ip = str(ipv6.get_ipv6_addr_by_EUI64(n_const.IPV6_LLA_PREFIX,
mac_address))
if ip not in ips[network_id]:
ips[network_id].append(ip)
query = query.filter(
models_v2.IPAllocation.ip_address == subnet['gateway_ip'])
query = query.filter(
- models_v2.Port.device_owner.in_(q_const.ROUTER_INTERFACE_OWNERS))
+ models_v2.Port.device_owner.in_(n_const.ROUTER_INTERFACE_OWNERS))
try:
mac_address = query.one()[0]
except (exc.NoResultFound, exc.MultipleResultsFound):
'found for IPv6 RA'), subnet['id'])
return
lla_ip = str(ipv6.get_ipv6_addr_by_EUI64(
- q_const.IPV6_LLA_PREFIX,
+ n_const.IPV6_LLA_PREFIX,
mac_address))
return lla_ip
ra_ips = ips.get(port['network_id'])
for ra_ip in ra_ips:
ra_rule = {'direction': 'ingress',
- 'ethertype': q_const.IPv6,
- 'protocol': q_const.PROTO_NAME_ICMP_V6,
+ 'ethertype': n_const.IPv6,
+ 'protocol': n_const.PROTO_NAME_ICMP_V6,
'source_ip_prefix': ra_ip,
- 'source_port_range_min': q_const.ICMPV6_TYPE_RA}
+ 'source_port_range_min': n_const.ICMPV6_TYPE_RA}
port['security_group_rules'].append(ra_rule)
def _apply_provider_rule(self, context, ports):
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
LOG.debug("Device %(device)s details requested from %(agent_id)s",
{'device': device, 'agent_id': agent_id})
port = brocade_db.get_port(rpc_context,
- device[len(q_const.TAP_DEVICE_PREFIX):])
+ device[len(n_const.TAP_DEVICE_PREFIX):])
if port:
entry = {'device': device,
'vlan_id': port.vlan_id,
# Doing what other plugins are doing
session = db.get_session()
port = brocade_db.get_port_from_device(
- session, device[len(q_const.TAP_DEVICE_PREFIX):])
+ session, device[len(n_const.TAP_DEVICE_PREFIX):])
# TODO(shiv): need to extend the db model to include device owners
# make it appears that the device owner is of type network
# Consume from all consumers in threads
self.conn.consume_in_threads()
self.notifier = AgentNotifierApi(topics.AGENT)
- self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+ self.agent_notifiers[n_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
- self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+ self.agent_notifiers[n_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotifyAPI()
)
# under the License.
import eventlet
-from oslo_config import cfg as q_conf
+from oslo_config import cfg as o_conf
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
self._setup_vsm()
self._setup_rpc()
self.network_scheduler = importutils.import_object(
- q_conf.CONF.network_scheduler_driver
+ o_conf.CONF.network_scheduler_driver
)
self.start_periodic_dhcp_agent_status_check()
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import topics
-from neutron.common import utils as q_utils
+from neutron.common import utils as n_utils
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron.plugins.common import constants as p_const
common_config.setup_logging()
try:
- interface_mappings = q_utils.parse_mappings(
+ interface_mappings = n_utils.parse_mappings(
cfg.CONF.LINUX_BRIDGE.physical_interface_mappings)
except ValueError as e:
LOG.error(_LE("Parsing physical_interface_mappings failed: %s. "
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
-from neutron.common import constants as q_constants
+from neutron.common import constants as n_constants
from neutron.common import topics
-from neutron.common import utils as q_utils
+from neutron.common import utils as n_utils
from neutron import context
from neutron.i18n import _LE, _LI
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config # noqa
self.agent_state = {
'binary': 'neutron-sriov-nic-agent',
'host': cfg.CONF.host,
- 'topic': q_constants.L2_AGENT_TOPIC,
+ 'topic': n_constants.L2_AGENT_TOPIC,
'configurations': configurations,
- 'agent_type': q_constants.AGENT_TYPE_NIC_SWITCH,
+ 'agent_type': n_constants.AGENT_TYPE_NIC_SWITCH,
'start_flag': True}
# Stores port update notifications for processing in the main loop
Parse and validate the consistency in both mappings
"""
- self.device_mappings = q_utils.parse_mappings(
+ self.device_mappings = n_utils.parse_mappings(
cfg.CONF.SRIOV_NIC.physical_device_mappings)
self.exclude_devices = config.parse_exclude_devices(
cfg.CONF.SRIOV_NIC.exclude_devices)
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.common import config
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.common import topics
-from neutron.common import utils as q_utils
+from neutron.common import utils as n_utils
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron.plugins.common import constants as p_const
self.agent_state = {
'binary': 'neutron-openvswitch-agent',
'host': self.conf.host,
- 'topic': q_const.L2_AGENT_TOPIC,
+ 'topic': n_const.L2_AGENT_TOPIC,
'configurations': {'bridge_mappings': bridge_mappings,
'tunnel_types': self.tunnel_types,
'tunneling_ip': local_ip,
self.enable_distributed_routing,
'log_agent_heartbeats':
self.conf.AGENT.log_agent_heartbeats},
- 'agent_type': q_const.AGENT_TYPE_OVS,
+ 'agent_type': n_const.AGENT_TYPE_OVS,
'start_flag': True}
if tunnel_types:
agent_ports, self._tunnel_port_lookup)
def add_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
- if port_info == q_const.FLOODING_ENTRY:
+ if port_info == n_const.FLOODING_ENTRY:
lvm.tun_ofports.add(ofport)
br.install_flood_to_tun(lvm.vlan, lvm.segmentation_id,
lvm.tun_ofports)
port_info.mac_address)
def del_fdb_flow(self, br, port_info, remote_ip, lvm, ofport):
- if port_info == q_const.FLOODING_ENTRY:
+ if port_info == n_const.FLOODING_ENTRY:
if ofport not in lvm.tun_ofports:
LOG.debug("attempt to remove a non-existent port %s", ofport)
return
The peer name can not exceed the maximum length allowed for a linux
device. Longer names are hashed to help ensure uniqueness.
"""
- if len(prefix + name) <= q_const.DEVICE_NAME_MAX_LEN:
+ if len(prefix + name) <= n_const.DEVICE_NAME_MAX_LEN:
return prefix + name
# We can't just truncate because bridges may be distinguished
# by an ident at the end. A hash over the name should be unique.
# Leave part of the bridge name on for easier identification
hashlen = 6
- namelen = q_const.DEVICE_NAME_MAX_LEN - len(prefix) - hashlen
+ namelen = n_const.DEVICE_NAME_MAX_LEN - len(prefix) - hashlen
new_name = ('%(prefix)s%(truncated)s%(hash)s' %
{'prefix': prefix, 'truncated': name[0:namelen],
'hash': hashlib.sha1(name).hexdigest()[0:hashlen]})
LOG.warning(_LW("Creating an interface named %(name)s exceeds the "
"%(limit)d character limitation. It was shortened to "
"%(new_name)s to fit."),
- {'name': name, 'limit': q_const.DEVICE_NAME_MAX_LEN,
+ {'name': name, 'limit': n_const.DEVICE_NAME_MAX_LEN,
'new_name': new_name})
return new_name
:returns: a map of agent configuration parameters
"""
try:
- bridge_mappings = q_utils.parse_mappings(config.OVS.bridge_mappings)
+ bridge_mappings = n_utils.parse_mappings(config.OVS.bridge_mappings)
except ValueError as e:
raise ValueError(_("Parsing bridge_mappings failed: %s.") % e)
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
return {'device': device}
if (not host or host == port_context.host):
- new_status = (q_const.PORT_STATUS_BUILD if port['admin_state_up']
- else q_const.PORT_STATUS_DOWN)
+ new_status = (n_const.PORT_STATUS_BUILD if port['admin_state_up']
+ else n_const.PORT_STATUS_DOWN)
if port['status'] != new_status:
plugin.update_port_status(rpc_context,
port_id,
try:
port_exists = bool(plugin.update_port_status(
- rpc_context, port_id, q_const.PORT_STATUS_DOWN, host))
+ rpc_context, port_id, n_const.PORT_STATUS_DOWN, host))
except exc.StaleDataError:
port_exists = False
LOG.debug("delete_port and update_device_down are being executed "
return
port_id = plugin.update_port_status(rpc_context, port_id,
- q_const.PORT_STATUS_ACTIVE,
+ n_const.PORT_STATUS_ACTIVE,
host)
try:
# NOTE(armax): it's best to remove all objects from the
from neutron.api.rpc.handlers import l3_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import securitygroups_rpc
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
from neutron.common import exceptions as nexception
from neutron.common import rpc as n_rpc
from neutron.common import topics
svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = n_rpc.create_connection(new=True)
self.notifier = NVSDPluginV2AgentNotifierApi(topics.AGENT)
- self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+ self.agent_notifiers[n_const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
- self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+ self.agent_notifiers[n_const.AGENT_TYPE_L3] = (
l3_rpc_agent_api.L3AgentNotifyAPI()
)
self.endpoints = [securitygroups_rpc.SecurityGroupServerRpcCallback(),
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
-from neutron.common import constants as q_const
-from neutron.common import rpc as q_rpc
+from neutron.common import constants as n_const
+from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron import context as nctx
from neutron.db import db_base_plugin_v2
def setup_rpc(self):
# RPC support
self.topic = topics.L3PLUGIN
- self.conn = q_rpc.create_connection(new=True)
+ self.conn = n_rpc.create_connection(new=True)
self.agent_notifiers.update(
- {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
+ {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.rpc.handlers import l3_rpc
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
from neutron.common import log as neutron_log
from neutron.common import rpc as n_rpc
from neutron.common import topics
self.topic = topics.L3PLUGIN
self.conn = n_rpc.create_connection(new=True)
self.agent_notifiers.update(
- {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
+ {n_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()})
self.endpoints = [l3_rpc.L3RpcCallback()]
self.conn.create_consumer(self.topic, self.endpoints,
fanout=False)
"""
return super(L3RouterPlugin, self).create_floatingip(
context, floatingip,
- initial_status=q_const.FLOATINGIP_STATUS_DOWN)
+ initial_status=n_const.FLOATINGIP_STATUS_DOWN)
from oslo_log import log as logging
from oslo_utils import excutils
-from neutron.common import constants as q_const
+from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
from neutron.db import db_base_plugin_v2
from neutron.db import extraroute_db
subnet = super(SdnveL3ServicePlugin, self).\
get_subnet(context, subnet_id)
device_filter = {'device_id': [router_id],
- 'device_owner': [q_const.DEVICE_OWNER_ROUTER_INTF],
+ 'device_owner': [n_const.DEVICE_OWNER_ROUTER_INTF],
'network_id': [subnet['network_id']]}
ports = super(SdnveL3ServicePlugin, self).get_ports(context,
filters=device_filter)
import fixtures
import six
-from neutron.common import exceptions as q_exc
+from neutron.common import exceptions as n_exc
from neutron import context
from neutron import manager
from neutron.tests import base
@property
def NotFound(self):
- return q_exc.NetworkNotFound
+ return n_exc.NetworkNotFound
def create_network(self, **kwargs):
# Supply defaults that are expected to be set by the api
# Admin request - must return both ports
self._test_list_resources('port', [port1, port2])
# Tenant_1 request - must return single port
- q_context = context.Context('', 'tenant_1')
+ n_context = context.Context('', 'tenant_1')
self._test_list_resources('port', [port1],
- neutron_context=q_context)
+ neutron_context=n_context)
# Tenant_2 request - must return single port
- q_context = context.Context('', 'tenant_2')
+ n_context = context.Context('', 'tenant_2')
self._test_list_resources('port', [port2],
- neutron_context=q_context)
+ neutron_context=n_context)
def test_list_ports_with_sort_native(self):
if self._skip_native_sorting:
@classmethod
def get_resources(cls):
"""Returns Extended Resource for dummy management."""
- q_mgr = manager.NeutronManager.get_instance()
- dummy_inst = q_mgr.get_service_plugins()['DUMMY']
+ n_mgr = manager.NeutronManager.get_instance()
+ dummy_inst = n_mgr.get_service_plugins()['DUMMY']
controller = base.create_resource(
COLLECTION_NAME, RESOURCE_NAME, dummy_inst,
RESOURCE_ATTRIBUTE_MAP[COLLECTION_NAME])
from oslo_config import cfg
-from neutron.common import utils as q_utils
+from neutron.common import utils as n_utils
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config
from neutron.plugins.ml2.drivers.mech_sriov.agent \
import sriov_nic_agent as agent
cfg.CONF.set_override('physical_device_mappings',
self.DEVICE_MAPPING_LIST,
'SRIOV_NIC')
- device_mappings = q_utils.parse_mappings(
+ device_mappings = n_utils.parse_mappings(
cfg.CONF.SRIOV_NIC.physical_device_mappings)
self.assertEqual(device_mappings, self.DEVICE_MAPPING)
cfg.CONF.set_override('physical_device_mappings',
self.DEVICE_MAPPING_WITH_ERROR_LIST,
'SRIOV_NIC')
- self.assertRaises(ValueError, q_utils.parse_mappings,
+ self.assertRaises(ValueError, n_utils.parse_mappings,
cfg.CONF.SRIOV_NIC.physical_device_mappings)
def test_device_mappings_with_spaces(self):
cfg.CONF.set_override('physical_device_mappings',
self.DEVICE_MAPPING_WITH_SPACES_LIST,
'SRIOV_NIC')
- device_mappings = q_utils.parse_mappings(
+ device_mappings = n_utils.parse_mappings(
cfg.CONF.SRIOV_NIC.physical_device_mappings)
self.assertEqual(device_mappings, self.DEVICE_MAPPING)
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
- q_agent = self._build_agent()
+ n_agent = self._build_agent()
# Hack to test loop
# We start method and expect it will raise after 2nd loop
# If something goes wrong, assert_has_calls below will catch it
try:
- q_agent.daemon_loop()
+ n_agent.daemon_loop()
except Exception:
pass
from sqlalchemy.orm import query
from neutron.common import constants
-from neutron import context as q_context
+from neutron import context as n_context
from neutron.db import agents_db
from neutron.db import common_db_mixin
from neutron.db import db_base_plugin_v2 as db_v2
super(L3SchedulerTestCaseMixin, self).setUp(plugin=plugin_str,
ext_mgr=ext_mgr)
- self.adminContext = q_context.get_admin_context()
+ self.adminContext = n_context.get_admin_context()
self.plugin = manager.NeutronManager.get_plugin()
self.plugin.router_scheduler = importutils.import_object(
'neutron.scheduler.l3_agent_scheduler.ChanceScheduler'
plugin = 'neutron.plugins.ml2.plugin.Ml2Plugin'
self.setup_coreplugin(plugin)
super(L3DvrSchedulerTestCase, self).setUp()
- self.adminContext = q_context.get_admin_context()
+ self.adminContext = n_context.get_admin_context()
self.dut = L3DvrScheduler()
def test__notify_port_delete(self):
def test_dvr_deletens_if_no_port_no_routers(self):
# Delete a vm port, the port subnet has no router interface.
vm_tenant_id = 'tenant-1'
- my_context = q_context.Context('user-1', vm_tenant_id, is_admin=False)
+ my_context = n_context.Context('user-1', vm_tenant_id, is_admin=False)
vm_port_host = 'compute-node-1'
vm_port = self._create_port(
# A VM port is deleted, but the router can't be unscheduled from the
# compute node because there is another VM port present.
vm_tenant_id = 'tenant-1'
- my_context = q_context.Context('user-1', vm_tenant_id, is_admin=False)
+ my_context = n_context.Context('user-1', vm_tenant_id, is_admin=False)
shared_subnet_id = '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
vm_port_host = 'compute-node-1'
self.host = host
self.agent_type = agent_type
- my_context = q_context.Context('user-1', vm_tenant, is_admin=False)
+ my_context = n_context.Context('user-1', vm_tenant, is_admin=False)
shared_subnet_id = '80947d4a-fbc8-484b-9f92-623a6bfcf3e0',
vm_port_host = 'compute-node-1'
def setUp(self):
super(L3HATestCaseMixin, self).setUp()
- self.adminContext = q_context.get_admin_context()
+ self.adminContext = n_context.get_admin_context()
self.plugin = L3HAPlugin()
self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')
super(TestGetL3AgentsWithAgentModeFilter, self).setUp()
self.plugin = L3HAPlugin()
self.setup_coreplugin('neutron.plugins.ml2.plugin.Ml2Plugin')
- self.adminContext = q_context.get_admin_context()
+ self.adminContext = n_context.get_admin_context()
hosts = ['host_1', 'host_2', 'host_3', 'host_4', 'host_5']
agent_modes = ['legacy', 'dvr_snat', 'dvr', 'fake_mode', 'legacy']
for host, agent_mode in zip(hosts, agent_modes):