raise SystemExit(msg)
self.context = context.get_admin_context_without_session()
- self.plugin_rpc = L3PluginApi(topics.PLUGIN, host)
+ self.plugin_rpc = L3PluginApi(topics.L3PLUGIN, host)
self.fullsync = True
self.updated_routers = set()
self.removed_routers = set()
if extended_attrs:
attrs.update(extended_attrs)
+ def get_alias_namespace_compatibility_map(self):
+ """Returns mappings between extension aliases and XML namespaces.
+
+ The mappings are XML namespaces that should, for backward compatibility
+ reasons, be added to the XML serialization of extended attributes.
+ This allows an established extended attribute to be provided by
+ another extension than the original one while keeping its old alias
+ in the name.
+ :return: A dictionary of extension_aliases and namespace strings.
+ """
+ return {}
+
class ActionExtensionController(wsgi.Controller):
except AttributeError:
LOG.exception(_("Error fetching extended attributes for "
"extension '%s'"), ext.get_name())
+ try:
+ comp_map = ext.get_alias_namespace_compatibility_map()
+ attributes.EXT_NSES_BC.update(comp_map)
+ except AttributeError:
+ LOG.info(_("Extension '%s' provides no backward "
+ "compatibility map for extended attributes"),
+ ext.get_name())
processed_exts.add(ext_name)
del exts_to_process[ext_name]
if len(processed_exts) == processed_ext_count:
from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import proxy
+from neutron.plugins.common import constants as service_constants
LOG = logging.getLogger(__name__)
operation, data):
"""Notify changed routers to hosting l3 agents."""
adminContext = context.is_admin and context or context.elevated()
- plugin = manager.NeutronManager.get_plugin()
+ plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
for router_id in router_ids:
l3_agents = plugin.get_l3_agents_hosting_routers(
adminContext, [router_id],
def _notification(self, context, method, router_ids, operation, data):
"""Notify all the agents that are hosting the routers."""
- plugin = manager.NeutronManager.get_plugin()
+ plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ if not plugin:
+ LOG.error(_('No plugin for L3 routing registered. Cannot notify '
+ 'agents with the message %s'), method)
+ return
if utils.is_extension_supported(
- plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
+ plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
adminContext = (context.is_admin and
context or context.elevated())
plugin.schedule_routers(adminContext, router_ids)
'extensions': 'extension'}
EXT_NSES = {}
+# Namespaces to be added for backward compatibility
+# when existing extended resource attributes are
+# provided by other extension than original one.
+EXT_NSES_BC = {}
+
def get_attr_metadata():
return {'plurals': PLURALS,
'xmlns': constants.XML_NS_V20,
- constants.EXT_NS: EXT_NSES}
+ constants.EXT_NS: EXT_NSES,
+ constants.EXT_NS_COMP: EXT_NSES_BC}
MIN_VLAN_TAG = 1
MAX_VLAN_TAG = 4094
+EXT_NS_COMP = '_backward_comp_e_ns'
EXT_NS = '_extension_ns'
XML_NS_V20 = 'http://openstack.org/quantum/api/v2.0'
XSI_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance"
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
+L3PLUGIN = 'q-l3-plugin'
DHCP = 'q-dhcp-notifer'
FIREWALL_PLUGIN = 'q-firewall-plugin'
METERING_PLUGIN = 'q-metering-plugin'
from neutron.common import constants
from neutron.db import agents_db
from neutron.db import model_base
-from neutron.db import models_v2
from neutron.extensions import dhcpagentscheduler
-from neutron.extensions import l3agentscheduler
from neutron.openstack.common import log as logging
default='neutron.scheduler.'
'dhcp_agent_scheduler.ChanceScheduler',
help=_('Driver to use for scheduling network to DHCP agent')),
- cfg.StrOpt('router_scheduler_driver',
- default='neutron.scheduler.l3_agent_scheduler.ChanceScheduler',
- help=_('Driver to use for scheduling '
- 'router to a default L3 agent')),
cfg.BoolOpt('network_auto_schedule', default=True,
help=_('Allow auto scheduling networks to DHCP agent.')),
- cfg.BoolOpt('router_auto_schedule', default=True,
- help=_('Allow auto scheduling routers to L3 agent.')),
cfg.IntOpt('dhcp_agents_per_network', default=1,
help=_('Number of DHCP agents scheduled to host a network.')),
]
primary_key=True)
-class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId):
- """Represents binding between neutron routers and L3 agents."""
-
- router_id = sa.Column(sa.String(36),
- sa.ForeignKey("routers.id", ondelete='CASCADE'))
- l3_agent = orm.relation(agents_db.Agent)
- l3_agent_id = sa.Column(sa.String(36),
- sa.ForeignKey("agents.id",
- ondelete='CASCADE'))
-
-
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
"""Common class for agent scheduler mixins."""
return result
-class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
- AgentSchedulerDbMixin):
- """Mixin class to add l3 agent scheduler extension to db_plugin_base_v2."""
-
- router_scheduler = None
-
- def add_router_to_l3_agent(self, context, id, router_id):
- """Add a l3 agent to host a router."""
- router = self.get_router(context, router_id)
- with context.session.begin(subtransactions=True):
- agent_db = self._get_agent(context, id)
- if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or
- not agent_db['admin_state_up'] or
- not self.get_l3_agent_candidates(router, [agent_db])):
- raise l3agentscheduler.InvalidL3Agent(id=id)
- query = context.session.query(RouterL3AgentBinding)
- try:
- binding = query.filter_by(router_id=router_id).one()
-
- raise l3agentscheduler.RouterHostedByL3Agent(
- router_id=router_id,
- agent_id=binding.l3_agent_id)
- except exc.NoResultFound:
- pass
-
- result = self.auto_schedule_routers(context,
- agent_db.host,
- [router_id])
- if not result:
- raise l3agentscheduler.RouterSchedulingFailed(
- router_id=router_id, agent_id=id)
-
- l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
- if l3_notifier:
- l3_notifier.router_added_to_agent(
- context, [router_id], agent_db.host)
-
- def remove_router_from_l3_agent(self, context, id, router_id):
- """Remove the router from l3 agent.
-
- After it, the router will be non-hosted until there is update which
- lead to re schedule or be added to another agent manually.
- """
- agent = self._get_agent(context, id)
- with context.session.begin(subtransactions=True):
- query = context.session.query(RouterL3AgentBinding)
- query = query.filter(
- RouterL3AgentBinding.router_id == router_id,
- RouterL3AgentBinding.l3_agent_id == id)
- try:
- binding = query.one()
- except exc.NoResultFound:
- raise l3agentscheduler.RouterNotHostedByL3Agent(
- router_id=router_id, agent_id=id)
- context.session.delete(binding)
- l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
- if l3_notifier:
- l3_notifier.router_removed_from_agent(
- context, router_id, agent.host)
-
- def list_routers_on_l3_agent(self, context, id):
- query = context.session.query(RouterL3AgentBinding.router_id)
- query = query.filter(RouterL3AgentBinding.l3_agent_id == id)
-
- router_ids = [item[0] for item in query]
- if router_ids:
- return {'routers':
- self.get_routers(context, filters={'id': router_ids})}
- else:
- return {'routers': []}
-
- def list_active_sync_routers_on_active_l3_agent(
- self, context, host, router_ids):
- agent = self._get_agent_by_type_and_host(
- context, constants.AGENT_TYPE_L3, host)
- if not agent.admin_state_up:
- return []
- query = context.session.query(RouterL3AgentBinding.router_id)
- query = query.filter(
- RouterL3AgentBinding.l3_agent_id == agent.id)
-
- if not router_ids:
- pass
- else:
- query = query.filter(
- RouterL3AgentBinding.router_id.in_(router_ids))
- router_ids = [item[0] for item in query]
- if router_ids:
- return self.get_sync_data(context, router_ids=router_ids,
- active=True)
- else:
- return []
-
- def get_l3_agents_hosting_routers(self, context, router_ids,
- admin_state_up=None,
- active=None):
- if not router_ids:
- return []
- query = context.session.query(RouterL3AgentBinding)
- if len(router_ids) > 1:
- query = query.options(joinedload('l3_agent')).filter(
- RouterL3AgentBinding.router_id.in_(router_ids))
- else:
- query = query.options(joinedload('l3_agent')).filter(
- RouterL3AgentBinding.router_id == router_ids[0])
- if admin_state_up is not None:
- query = (query.filter(agents_db.Agent.admin_state_up ==
- admin_state_up))
- l3_agents = [binding.l3_agent for binding in query]
- if active is not None:
- l3_agents = [l3_agent for l3_agent in
- l3_agents if not
- agents_db.AgentDbMixin.is_agent_down(
- l3_agent['heartbeat_timestamp'])]
- return l3_agents
-
- def _get_l3_bindings_hosting_routers(self, context, router_ids):
- if not router_ids:
- return []
- query = context.session.query(RouterL3AgentBinding)
- if len(router_ids) > 1:
- query = query.options(joinedload('l3_agent')).filter(
- RouterL3AgentBinding.router_id.in_(router_ids))
- else:
- query = query.options(joinedload('l3_agent')).filter(
- RouterL3AgentBinding.router_id == router_ids[0])
- return query.all()
-
- def list_l3_agents_hosting_router(self, context, router_id):
- with context.session.begin(subtransactions=True):
- bindings = self._get_l3_bindings_hosting_routers(
- context, [router_id])
- results = []
- for binding in bindings:
- l3_agent_dict = self._make_agent_dict(binding.l3_agent)
- results.append(l3_agent_dict)
- if results:
- return {'agents': results}
- else:
- return {'agents': []}
-
- def get_l3_agents(self, context, active=None, filters=None):
- query = context.session.query(agents_db.Agent)
- query = query.filter(
- agents_db.Agent.agent_type == constants.AGENT_TYPE_L3)
- if active is not None:
- query = (query.filter(agents_db.Agent.admin_state_up == active))
- if filters:
- for key, value in filters.iteritems():
- column = getattr(agents_db.Agent, key, None)
- if column:
- query = query.filter(column.in_(value))
-
- return [l3_agent
- for l3_agent in query
- if AgentSchedulerDbMixin.is_eligible_agent(active, l3_agent)]
-
- def get_l3_agent_candidates(self, sync_router, l3_agents):
- """Get the valid l3 agents for the router from a list of l3_agents."""
- candidates = []
- for l3_agent in l3_agents:
- if not l3_agent.admin_state_up:
- continue
- agent_conf = self.get_configuration_dict(l3_agent)
- router_id = agent_conf.get('router_id', None)
- use_namespaces = agent_conf.get('use_namespaces', True)
- handle_internal_only_routers = agent_conf.get(
- 'handle_internal_only_routers', True)
- gateway_external_network_id = agent_conf.get(
- 'gateway_external_network_id', None)
- if not use_namespaces and router_id != sync_router['id']:
- continue
- ex_net_id = (sync_router['external_gateway_info'] or {}).get(
- 'network_id')
- if ((not ex_net_id and not handle_internal_only_routers) or
- (ex_net_id and gateway_external_network_id and
- ex_net_id != gateway_external_network_id)):
- continue
- candidates.append(l3_agent)
- return candidates
-
- def auto_schedule_routers(self, context, host, router_ids):
- if self.router_scheduler:
- return self.router_scheduler.auto_schedule_routers(
- self, context, host, router_ids)
-
- def schedule_router(self, context, router):
- if self.router_scheduler:
- return self.router_scheduler.schedule(
- self, context, router)
-
- def schedule_routers(self, context, routers):
- """Schedule the routers to l3 agents."""
- for router in routers:
- self.schedule_router(context, router)
-
-
class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
.DhcpAgentSchedulerPluginBase,
AgentSchedulerDbMixin):
# from this class should be invoked
_model_query_hooks = {}
+ # This dictionary will store methods for extending attributes of
+ # api resources. Mixins can use this dict for adding their own methods
+ # TODO(salvatore-orlando): Avoid using class-level variables
+ _dict_extend_functions = {}
+
@classmethod
def register_model_query_hook(cls, model, name, query_hook, filter_hook,
result_filters=None):
__native_pagination_support = True
__native_sorting_support = True
- # This dictionary will store methods for extending attributes of
- # api resources. Mixins can use this dict for adding their own methods
- # TODO(salvatore-orlando): Avoid using class-level variables
- _dict_extend_functions = {}
-
def __init__(self):
# NOTE(jkoelker) This is an incomplete implementation. Subclasses
# must override __init__ and setup the database
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sqlalchemy as sa
+from sqlalchemy import orm
+from sqlalchemy.orm import exc
+from sqlalchemy.sql import expression as expr
+
+from neutron.api.v2 import attributes
+from neutron.common import constants as l3_constants
+from neutron.common import exceptions as q_exc
+from neutron.db import db_base_plugin_v2
+from neutron.db import model_base
+from neutron.db import models_v2
+from neutron.extensions import external_net
+
+
+DEVICE_OWNER_ROUTER_GW = l3_constants.DEVICE_OWNER_ROUTER_GW
+
+
+class ExternalNetwork(model_base.BASEV2):
+ network_id = sa.Column(sa.String(36),
+ sa.ForeignKey('networks.id', ondelete="CASCADE"),
+ primary_key=True)
+
+ # Add a relationship to the Network model in order to instruct
+ # SQLAlchemy to eagerly load this association
+ network = orm.relationship(
+ models_v2.Network,
+ backref=orm.backref("external", lazy='joined',
+ uselist=False, cascade='delete'))
+
+
+class External_net_db_mixin(object):
+ """Mixin class to add external network methods to db_plugin_base_v2."""
+
+ def _network_model_hook(self, context, original_model, query):
+ query = query.outerjoin(ExternalNetwork,
+ (original_model.id ==
+ ExternalNetwork.network_id))
+ return query
+
+ def _network_filter_hook(self, context, original_model, conditions):
+ if conditions is not None and not hasattr(conditions, '__iter__'):
+ conditions = (conditions, )
+ # Apply the external network filter only in non-admin context
+ if not context.is_admin and hasattr(original_model, 'tenant_id'):
+ conditions = expr.or_(ExternalNetwork.network_id != expr.null(),
+ *conditions)
+ return conditions
+
+ def _network_result_filter_hook(self, query, filters):
+ vals = filters and filters.get(external_net.EXTERNAL, [])
+ if not vals:
+ return query
+ if vals[0]:
+ return query.filter((ExternalNetwork.network_id != expr.null()))
+ return query.filter((ExternalNetwork.network_id == expr.null()))
+
+ # TODO(salvatore-orlando): Perform this operation without explicitly
+ # referring to db_base_plugin_v2, as plugins that do not extend from it
+ # might exist in the future
+ db_base_plugin_v2.NeutronDbPluginV2.register_model_query_hook(
+ models_v2.Network,
+ "external_net",
+ '_network_model_hook',
+ '_network_filter_hook',
+ '_network_result_filter_hook')
+
+ def _network_is_external(self, context, net_id):
+ try:
+ context.session.query(ExternalNetwork).filter_by(
+ network_id=net_id).one()
+ return True
+ except exc.NoResultFound:
+ return False
+
+ def _extend_network_dict_l3(self, network_res, network_db):
+ # Comparing with None for converting uuid into bool
+ network_res[external_net.EXTERNAL] = network_db.external is not None
+ return network_res
+
+ # Register dict extend functions for networks
+ db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
+ attributes.NETWORKS, ['_extend_network_dict_l3'])
+
+ def _process_l3_create(self, context, net_data, req_data):
+ external = req_data.get(external_net.EXTERNAL)
+ external_set = attributes.is_attr_set(external)
+
+ if not external_set:
+ return
+
+ if external:
+ # expects to be called within a plugin's session
+ context.session.add(ExternalNetwork(network_id=net_data['id']))
+ net_data[external_net.EXTERNAL] = external
+
+ def _process_l3_update(self, context, net_data, req_data):
+
+ new_value = req_data.get(external_net.EXTERNAL)
+ net_id = net_data['id']
+ if not attributes.is_attr_set(new_value):
+ return
+
+ if net_data.get(external_net.EXTERNAL) == new_value:
+ return
+
+ if new_value:
+ context.session.add(ExternalNetwork(network_id=net_id))
+ net_data[external_net.EXTERNAL] = True
+ else:
+ # must make sure we do not have any external gateway ports
+ # (and thus, possible floating IPs) on this network before
+ # allow it to be update to external=False
+ port = context.session.query(models_v2.Port).filter_by(
+ device_owner=DEVICE_OWNER_ROUTER_GW,
+ network_id=net_data['id']).first()
+ if port:
+ raise external_net.ExternalNetworkInUse(net_id=net_id)
+
+ context.session.query(ExternalNetwork).filter_by(
+ network_id=net_id).delete()
+ net_data[external_net.EXTERNAL] = False
+
+ def _filter_nets_l3(self, context, nets, filters):
+ vals = filters and filters.get(external_net.EXTERNAL, [])
+ if not vals:
+ return nets
+
+ ext_nets = set(en['network_id']
+ for en in context.session.query(ExternalNetwork))
+ if vals[0]:
+ return [n for n in nets if n['id'] in ext_nets]
+ else:
+ return [n for n in nets if n['id'] not in ext_nets]
+
+ def get_external_network_id(self, context):
+ nets = self.get_networks(context, {external_net.EXTERNAL: [True]})
+ if len(nets) > 1:
+ raise q_exc.TooManyExternalNetworks()
+ else:
+ return nets[0]['id'] if nets else None
# nexthop belongs to one of cidrs of the router ports
cidrs = []
for port in ports:
- cidrs += [self._get_subnet(context,
- ip['subnet_id'])['cidr']
+ cidrs += [self._core_plugin._get_subnet(context,
+ ip['subnet_id'])['cidr']
for ip in port['fixed_ips']]
if not netaddr.all_matching_cidrs(nexthop, cidrs):
raise extraroute.InvalidRoutes(
quota=cfg.CONF.max_routes)
filters = {'device_id': [router_id]}
- ports = self.get_ports(context, filters)
+ ports = self._core_plugin.get_ports(context, filters)
for route in routes:
self._validate_routes_nexthop(
context, ports, routes, route['nexthop'])
subnet_id):
super(ExtraRoute_db_mixin, self)._confirm_router_interface_not_in_use(
context, router_id, subnet_id)
- subnet_db = self._get_subnet(context, subnet_id)
+ subnet_db = self._core_plugin._get_subnet(context, subnet_id)
subnet_cidr = netaddr.IPNetwork(subnet_db['cidr'])
extra_routes = self._get_extra_routes_by_router_id(context, router_id)
for route in extra_routes:
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo.config import cfg
+import sqlalchemy as sa
+from sqlalchemy import orm
+from sqlalchemy.orm import exc
+from sqlalchemy.orm import joinedload
+
+from neutron.common import constants
+from neutron.db import agents_db
+from neutron.db.agentschedulers_db import AgentSchedulerDbMixin
+from neutron.db import model_base
+from neutron.db import models_v2
+from neutron.extensions import l3agentscheduler
+
+
+L3_AGENTS_SCHEDULER_OPTS = [
+ cfg.StrOpt('router_scheduler_driver',
+ default='neutron.scheduler.l3_agent_scheduler.ChanceScheduler',
+ help=_('Driver to use for scheduling '
+ 'router to a default L3 agent')),
+ cfg.BoolOpt('router_auto_schedule', default=True,
+ help=_('Allow auto scheduling of routers to L3 agent.')),
+]
+
+cfg.CONF.register_opts(L3_AGENTS_SCHEDULER_OPTS)
+
+
+class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId):
+ """Represents binding between neutron routers and L3 agents."""
+
+ router_id = sa.Column(sa.String(36),
+ sa.ForeignKey("routers.id", ondelete='CASCADE'))
+ l3_agent = orm.relation(agents_db.Agent)
+ l3_agent_id = sa.Column(sa.String(36),
+ sa.ForeignKey("agents.id",
+ ondelete='CASCADE'))
+
+
+class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
+ AgentSchedulerDbMixin):
+ """Mixin class to add l3 agent scheduler extension to plugins
+ using the l3 agent for routing.
+ """
+
+ router_scheduler = None
+
+ def add_router_to_l3_agent(self, context, agent_id, router_id):
+ """Add a l3 agent to host a router."""
+ router = self.get_router(context, router_id)
+ with context.session.begin(subtransactions=True):
+ agent_db = self._get_agent(context, agent_id)
+ if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or
+ not agent_db['admin_state_up'] or
+ not self.get_l3_agent_candidates(router, [agent_db])):
+ raise l3agentscheduler.InvalidL3Agent(id=agent_id)
+ query = context.session.query(RouterL3AgentBinding)
+ try:
+ binding = query.filter_by(router_id=router_id).one()
+
+ raise l3agentscheduler.RouterHostedByL3Agent(
+ router_id=router_id,
+ agent_id=binding.l3_agent_id)
+ except exc.NoResultFound:
+ pass
+
+ result = self.auto_schedule_routers(context,
+ agent_db.host,
+ [router_id])
+ if not result:
+ raise l3agentscheduler.RouterSchedulingFailed(
+ router_id=router_id, agent_id=agent_id)
+
+ l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
+ if l3_notifier:
+ l3_notifier.router_added_to_agent(
+ context, [router_id], agent_db.host)
+
+ def remove_router_from_l3_agent(self, context, agent_id, router_id):
+ """Remove the router from l3 agent.
+
+ After removal, the router will be non-hosted until there is update
+ which leads to re-schedule or be added to another agent manually.
+ """
+ agent = self._get_agent(context, agent_id)
+ with context.session.begin(subtransactions=True):
+ query = context.session.query(RouterL3AgentBinding)
+ query = query.filter(
+ RouterL3AgentBinding.router_id == router_id,
+ RouterL3AgentBinding.l3_agent_id == agent_id)
+ try:
+ binding = query.one()
+ except exc.NoResultFound:
+ raise l3agentscheduler.RouterNotHostedByL3Agent(
+ router_id=router_id, agent_id=agent_id)
+ context.session.delete(binding)
+ l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
+ if l3_notifier:
+ l3_notifier.router_removed_from_agent(
+ context, router_id, agent.host)
+
+ def list_routers_on_l3_agent(self, context, agent_id):
+ query = context.session.query(RouterL3AgentBinding.router_id)
+ query = query.filter(RouterL3AgentBinding.l3_agent_id == agent_id)
+
+ router_ids = [item[0] for item in query]
+ if router_ids:
+ return {'routers':
+ self.get_routers(context, filters={'id': router_ids})}
+ else:
+ return {'routers': []}
+
+ def list_active_sync_routers_on_active_l3_agent(
+ self, context, host, router_ids):
+ agent = self._get_agent_by_type_and_host(
+ context, constants.AGENT_TYPE_L3, host)
+ if not agent.admin_state_up:
+ return []
+ query = context.session.query(RouterL3AgentBinding.router_id)
+ query = query.filter(
+ RouterL3AgentBinding.l3_agent_id == agent.id)
+
+ if not router_ids:
+ pass
+ else:
+ query = query.filter(
+ RouterL3AgentBinding.router_id.in_(router_ids))
+ router_ids = [item[0] for item in query]
+ if router_ids:
+ return self.get_sync_data(context, router_ids=router_ids,
+ active=True)
+ else:
+ return []
+
+ def get_l3_agents_hosting_routers(self, context, router_ids,
+ admin_state_up=None,
+ active=None):
+ if not router_ids:
+ return []
+ query = context.session.query(RouterL3AgentBinding)
+ if len(router_ids) > 1:
+ query = query.options(joinedload('l3_agent')).filter(
+ RouterL3AgentBinding.router_id.in_(router_ids))
+ else:
+ query = query.options(joinedload('l3_agent')).filter(
+ RouterL3AgentBinding.router_id == router_ids[0])
+ if admin_state_up is not None:
+ query = (query.filter(agents_db.Agent.admin_state_up ==
+ admin_state_up))
+ l3_agents = [binding.l3_agent for binding in query]
+ if active is not None:
+ l3_agents = [l3_agent for l3_agent in
+ l3_agents if not
+ agents_db.AgentDbMixin.is_agent_down(
+ l3_agent['heartbeat_timestamp'])]
+ return l3_agents
+
+ def _get_l3_bindings_hosting_routers(self, context, router_ids):
+ if not router_ids:
+ return []
+ query = context.session.query(RouterL3AgentBinding)
+ if len(router_ids) > 1:
+ query = query.options(joinedload('l3_agent')).filter(
+ RouterL3AgentBinding.router_id.in_(router_ids))
+ else:
+ query = query.options(joinedload('l3_agent')).filter(
+ RouterL3AgentBinding.router_id == router_ids[0])
+ return query.all()
+
+ def list_l3_agents_hosting_router(self, context, router_id):
+ with context.session.begin(subtransactions=True):
+ bindings = self._get_l3_bindings_hosting_routers(
+ context, [router_id])
+ results = []
+ for binding in bindings:
+ l3_agent_dict = self._make_agent_dict(binding.l3_agent)
+ results.append(l3_agent_dict)
+ if results:
+ return {'agents': results}
+ else:
+ return {'agents': []}
+
+ def get_l3_agents(self, context, active=None, filters=None):
+ query = context.session.query(agents_db.Agent)
+ query = query.filter(
+ agents_db.Agent.agent_type == constants.AGENT_TYPE_L3)
+ if active is not None:
+ query = (query.filter(agents_db.Agent.admin_state_up == active))
+ if filters:
+ for key, value in filters.iteritems():
+ column = getattr(agents_db.Agent, key, None)
+ if column:
+ query = query.filter(column.in_(value))
+
+ return [l3_agent
+ for l3_agent in query
+ if AgentSchedulerDbMixin.is_eligible_agent(active, l3_agent)]
+
+ def get_l3_agent_candidates(self, sync_router, l3_agents):
+ """Get the valid l3 agents for the router from a list of l3_agents."""
+ candidates = []
+ for l3_agent in l3_agents:
+ if not l3_agent.admin_state_up:
+ continue
+ agent_conf = self.get_configuration_dict(l3_agent)
+ router_id = agent_conf.get('router_id', None)
+ use_namespaces = agent_conf.get('use_namespaces', True)
+ handle_internal_only_routers = agent_conf.get(
+ 'handle_internal_only_routers', True)
+ gateway_external_network_id = agent_conf.get(
+ 'gateway_external_network_id', None)
+ if not use_namespaces and router_id != sync_router['id']:
+ continue
+ ex_net_id = (sync_router['external_gateway_info'] or {}).get(
+ 'network_id')
+ if ((not ex_net_id and not handle_internal_only_routers) or
+ (ex_net_id and gateway_external_network_id and
+ ex_net_id != gateway_external_network_id)):
+ continue
+ candidates.append(l3_agent)
+ return candidates
+
+ def auto_schedule_routers(self, context, host, router_ids):
+ if self.router_scheduler:
+ return self.router_scheduler.auto_schedule_routers(
+ self, context, host, router_ids)
+
+ def schedule_router(self, context, router):
+ if self.router_scheduler:
+ return self.router_scheduler.schedule(
+ self, context, router)
+
+ def schedule_routers(self, context, routers):
+ """Schedule the routers to l3 agents."""
+ for router in routers:
+ self.schedule_router(context, router)
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
-from sqlalchemy.sql import expression as expr
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as l3_constants
from neutron.common import exceptions as q_exc
-from neutron.db import db_base_plugin_v2
from neutron.db import model_base
from neutron.db import models_v2
from neutron.extensions import l3
+from neutron import manager
from neutron.openstack.common import log as logging
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common import uuidutils
gw_port = orm.relationship(models_v2.Port)
-class ExternalNetwork(model_base.BASEV2):
- network_id = sa.Column(sa.String(36),
- sa.ForeignKey('networks.id', ondelete="CASCADE"),
- primary_key=True)
-
- # Add a relationship to the Network model in order to instruct
- # SQLAlchemy to eagerly load this association
- network = orm.relationship(
- models_v2.Network,
- backref=orm.backref("external", lazy='joined',
- uselist=False, cascade='delete'))
-
-
class FloatingIP(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
"""Represents a floating IP address.
l3_rpc_notifier = l3_rpc_agent_api.L3AgentNotify
- def _network_model_hook(self, context, original_model, query):
- query = query.outerjoin(ExternalNetwork,
- (original_model.id ==
- ExternalNetwork.network_id))
- return query
-
- def _network_filter_hook(self, context, original_model, conditions):
- if conditions is not None and not hasattr(conditions, '__iter__'):
- conditions = (conditions, )
- # Apply the external network filter only in non-admin context
- if not context.is_admin and hasattr(original_model, 'tenant_id'):
- conditions = expr.or_(ExternalNetwork.network_id != expr.null(),
- *conditions)
- return conditions
-
- def _network_result_filter_hook(self, query, filters):
- vals = filters and filters.get('router:external', [])
- if not vals:
- return query
- if vals[0]:
- return query.filter((ExternalNetwork.network_id != expr.null()))
- return query.filter((ExternalNetwork.network_id == expr.null()))
-
- # TODO(salvatore-orlando): Perform this operation without explicitly
- # referring to db_base_plugin_v2, as plugins that do not extend from it
- # might exist in the future
- db_base_plugin_v2.NeutronDbPluginV2.register_model_query_hook(
- models_v2.Network,
- "external_net",
- '_network_model_hook',
- '_network_filter_hook',
- '_network_result_filter_hook')
+ @property
+ def _core_plugin(self):
+ return manager.NeutronManager.get_plugin()
def _get_router(self, context, id):
try:
def _create_router_gw_port(self, context, router, network_id):
# Port has no 'tenant-id', as it is hidden from user
- gw_port = self.create_port(context.elevated(), {
+ gw_port = self._core_plugin.create_port(context.elevated(), {
'port': {'tenant_id': '', # intentionally not set
'network_id': network_id,
'mac_address': attributes.ATTR_NOT_SPECIFIED,
'name': ''}})
if not gw_port['fixed_ips']:
- self.delete_port(context.elevated(), gw_port['id'],
- l3_port_check=False)
+ self._core_plugin.delete_port(context.elevated(), gw_port['id'],
+ l3_port_check=False)
msg = (_('No IPs available for external network %s') %
network_id)
raise q_exc.BadRequest(resource='router', msg=msg)
with context.session.begin(subtransactions=True):
- router.gw_port = self._get_port(context.elevated(),
- gw_port['id'])
+ router.gw_port = self._core_plugin._get_port(context.elevated(),
+ gw_port['id'])
context.session.add(router)
def _update_router_gw_info(self, context, router_id, info, router=None):
# network_id attribute is required by API, so it must be present
network_id = info['network_id'] if info else None
if network_id:
- network_db = self._get_network(context, network_id)
+ network_db = self._core_plugin._get_network(context, network_id)
if not network_db.external:
msg = _("Network %s is not a valid external "
"network") % network_id
with context.session.begin(subtransactions=True):
router.gw_port = None
context.session.add(router)
- self.delete_port(context.elevated(), gw_port['id'],
- l3_port_check=False)
+ self._core_plugin.delete_port(context.elevated(),
+ gw_port['id'],
+ l3_port_check=False)
if network_id is not None and (gw_port is None or
gw_port['network_id'] != network_id):
- subnets = self._get_subnets_by_network(context,
- network_id)
+ subnets = self._core_plugin._get_subnets_by_network(context,
+ network_id)
for subnet in subnets:
self._check_for_dup_router_subnet(context, router_id,
network_id, subnet['id'],
device_filter = {'device_id': [id],
'device_owner': [DEVICE_OWNER_ROUTER_INTF]}
- ports = self.get_ports_count(context.elevated(),
- filters=device_filter)
+ ports = self._core_plugin.get_ports_count(context.elevated(),
+ filters=device_filter)
if ports:
raise l3.RouterInUse(router_id=id)
# delete any gw port
device_filter = {'device_id': [id],
'device_owner': [DEVICE_OWNER_ROUTER_GW]}
- ports = self.get_ports(context.elevated(), filters=device_filter)
+ ports = self._core_plugin.get_ports(context.elevated(),
+ filters=device_filter)
if ports:
- self._delete_port(context.elevated(), ports[0]['id'])
+ self._core_plugin._delete_port(context.elevated(),
+ ports[0]['id'])
context.session.delete(router)
self.l3_rpc_notifier.router_deleted(context, id)
% subnet_id)
raise q_exc.BadRequest(resource='router', msg=msg)
sub_id = ip['subnet_id']
- cidr = self._get_subnet(context.elevated(),
- sub_id)['cidr']
+ cidr = self._core_plugin._get_subnet(context.elevated(),
+ sub_id)['cidr']
ipnet = netaddr.IPNetwork(cidr)
match1 = netaddr.all_matching_cidrs(new_ipnet, [cidr])
match2 = netaddr.all_matching_cidrs(ipnet, [subnet_cidr])
msg = _("Cannot specify both subnet-id and port-id")
raise q_exc.BadRequest(resource='router', msg=msg)
- port = self._get_port(context, interface_info['port_id'])
+ port = self._core_plugin._get_port(context,
+ interface_info['port_id'])
if port['device_id']:
raise q_exc.PortInUse(net_id=port['network_id'],
port_id=port['id'],
msg = _('Router port must have exactly one fixed IP')
raise q_exc.BadRequest(resource='router', msg=msg)
subnet_id = fixed_ips[0]['subnet_id']
- subnet = self._get_subnet(context, subnet_id)
+ subnet = self._core_plugin._get_subnet(context, subnet_id)
self._check_for_dup_router_subnet(context, router_id,
port['network_id'],
subnet['id'],
'device_owner': DEVICE_OWNER_ROUTER_INTF})
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
- subnet = self._get_subnet(context, subnet_id)
+ subnet = self._core_plugin._get_subnet(context, subnet_id)
# Ensure the subnet has a gateway
if not subnet['gateway_ip']:
msg = _('Subnet for router interface must have a gateway IP')
subnet['cidr'])
fixed_ip = {'ip_address': subnet['gateway_ip'],
'subnet_id': subnet['id']}
- port = self.create_port(context, {
+ port = self._core_plugin.create_port(context, {
'port':
{'tenant_id': subnet['tenant_id'],
'network_id': subnet['network_id'],
def _confirm_router_interface_not_in_use(self, context, router_id,
subnet_id):
- subnet_db = self._get_subnet(context, subnet_id)
+ subnet_db = self._core_plugin._get_subnet(context, subnet_id)
subnet_cidr = netaddr.IPNetwork(subnet_db['cidr'])
fip_qry = context.session.query(FloatingIP)
for fip_db in fip_qry.filter_by(router_id=router_id):
raise q_exc.BadRequest(resource='router', msg=msg)
if 'port_id' in interface_info:
port_id = interface_info['port_id']
- port_db = self._get_port(context, port_id)
+ port_db = self._core_plugin._get_port(context, port_id)
if not (port_db['device_owner'] == DEVICE_OWNER_ROUTER_INTF and
port_db['device_id'] == router_id):
raise l3.RouterInterfaceNotFound(router_id=router_id,
port_id=port_id,
subnet_id=interface_info['subnet_id'])
subnet_id = port_db['fixed_ips'][0]['subnet_id']
- subnet = self._get_subnet(context, subnet_id)
+ subnet = self._core_plugin._get_subnet(context, subnet_id)
self._confirm_router_interface_not_in_use(
context, router_id, subnet_id)
- self.delete_port(context, port_db['id'], l3_port_check=False)
+ self._core_plugin.delete_port(context, port_db['id'],
+ l3_port_check=False)
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
self._confirm_router_interface_not_in_use(context, router_id,
subnet_id)
- subnet = self._get_subnet(context, subnet_id)
+ subnet = self._core_plugin._get_subnet(context, subnet_id)
found = False
try:
for p in ports:
if p['fixed_ips'][0]['subnet_id'] == subnet_id:
port_id = p['id']
- self.delete_port(context, p['id'], l3_port_check=False)
+ self._core_plugin.delete_port(context, p['id'],
+ l3_port_check=False)
found = True
break
except exc.NoResultFound:
def _get_router_for_floatingip(self, context, internal_port,
internal_subnet_id,
external_network_id):
- subnet_db = self._get_subnet(context, internal_subnet_id)
+ subnet_db = self._core_plugin._get_subnet(context,
+ internal_subnet_id)
if not subnet_db['gateway_ip']:
msg = (_('Cannot add floating IP to port on subnet %s '
'which has no gateway_ip') % internal_subnet_id)
Retrieve information concerning the internal port where
the floating IP should be associated to.
"""
- internal_port = self._get_port(context, fip['port_id'])
+ internal_port = self._core_plugin._get_port(context, fip['port_id'])
if not internal_port['tenant_id'] == fip['tenant_id']:
port_id = fip['port_id']
if 'id' in fip:
fip_id = uuidutils.generate_uuid()
f_net_id = fip['floating_network_id']
- if not self._network_is_external(context, f_net_id):
+ if not self._core_plugin._network_is_external(context, f_net_id):
msg = _("Network %s is not a valid external network") % f_net_id
raise q_exc.BadRequest(resource='floatingip', msg=msg)
# This external port is never exposed to the tenant.
# it is used purely for internal system and admin use when
# managing floating IPs.
- external_port = self.create_port(context.elevated(), {
+ external_port = self._core_plugin.create_port(context.elevated(), {
'port':
{'tenant_id': '', # tenant intentionally not set
'network_id': f_net_id,
fip_port_id = floatingip_db['floating_port_id']
before_router_id = floatingip_db['router_id']
self._update_fip_assoc(context, fip, floatingip_db,
- self.get_port(context.elevated(),
- fip_port_id))
+ self._core_plugin.get_port(
+ context.elevated(), fip_port_id))
router_ids = []
if before_router_id:
router_ids.append(before_router_id)
router_id = floatingip['router_id']
with context.session.begin(subtransactions=True):
context.session.delete(floatingip)
- self.delete_port(context.elevated(),
- floatingip['floating_port_id'],
- l3_port_check=False)
+ self._core_plugin.delete_port(context.elevated(),
+ floatingip['floating_port_id'],
+ l3_port_check=False)
if router_id:
self.l3_rpc_notifier.routers_updated(
context, [router_id],
to /ports, but rather via other API calls that perform the proper
deletion checks.
"""
- port_db = self._get_port(context, port_id)
+ port_db = self._core_plugin._get_port(context, port_id)
if port_db['device_owner'] in [DEVICE_OWNER_ROUTER_INTF,
DEVICE_OWNER_ROUTER_GW,
DEVICE_OWNER_FLOATINGIP]:
self.l3_rpc_notifier.routers_updated(
context, [router_id])
- def _network_is_external(self, context, net_id):
- try:
- context.session.query(ExternalNetwork).filter_by(
- network_id=net_id).one()
- return True
- except exc.NoResultFound:
- return False
-
- def _extend_network_dict_l3(self, network_res, network_db):
- # Comparing with None for converting uuid into bool
- network_res[l3.EXTERNAL] = network_db.external is not None
- return network_res
-
- # Register dict extend functions for networks
- db_base_plugin_v2.NeutronDbPluginV2.register_dict_extend_funcs(
- attributes.NETWORKS, ['_extend_network_dict_l3'])
-
- def _process_l3_create(self, context, net_data, req_data):
- external = req_data.get(l3.EXTERNAL)
- external_set = attributes.is_attr_set(external)
-
- if not external_set:
- return
-
- if external:
- # expects to be called within a plugin's session
- context.session.add(ExternalNetwork(network_id=net_data['id']))
- net_data[l3.EXTERNAL] = external
-
- def _process_l3_update(self, context, net_data, req_data):
-
- new_value = req_data.get(l3.EXTERNAL)
- net_id = net_data['id']
- if not attributes.is_attr_set(new_value):
- return
-
- if net_data.get(l3.EXTERNAL) == new_value:
- return
-
- if new_value:
- context.session.add(ExternalNetwork(network_id=net_id))
- net_data[l3.EXTERNAL] = True
- else:
- # must make sure we do not have any external gateway ports
- # (and thus, possible floating IPs) on this network before
- # allow it to be update to external=False
- port = context.session.query(models_v2.Port).filter_by(
- device_owner=DEVICE_OWNER_ROUTER_GW,
- network_id=net_data['id']).first()
- if port:
- raise l3.ExternalNetworkInUse(net_id=net_id)
-
- context.session.query(ExternalNetwork).filter_by(
- network_id=net_id).delete()
- net_data[l3.EXTERNAL] = False
-
- def _filter_nets_l3(self, context, nets, filters):
- vals = filters and filters.get('router:external', [])
- if not vals:
- return nets
-
- ext_nets = set(en['network_id']
- for en in context.session.query(ExternalNetwork))
- if vals[0]:
- return [n for n in nets if n['id'] in ext_nets]
- else:
- return [n for n in nets if n['id'] not in ext_nets]
-
def _build_routers_list(self, routers, gw_ports):
gw_port_id_gw_port_dict = dict((gw_port['id'], gw_port)
for gw_port in gw_ports)
if not gw_port_ids:
return []
filters = {'id': gw_port_ids}
- gw_ports = self.get_ports(context, filters)
+ gw_ports = self._core_plugin.get_ports(context, filters)
if gw_ports:
self._populate_subnet_for_ports(context, gw_ports)
return gw_ports
return []
filters = {'device_id': router_ids,
'device_owner': [device_owner]}
- interfaces = self.get_ports(context, filters)
+ interfaces = self._core_plugin.get_ports(context, filters)
if interfaces:
self._populate_subnet_for_ports(context, interfaces)
return interfaces
return
filters = {'id': subnet_id_ports_dict.keys()}
fields = ['id', 'cidr', 'gateway_ip']
- subnet_dicts = self.get_subnets(context, filters, fields)
+ subnet_dicts = self._core_plugin.get_subnets(context, filters, fields)
for subnet_dict in subnet_dicts:
ports = subnet_id_ports_dict.get(subnet_dict['id'], [])
for port in ports:
floating_ips = self._get_sync_floating_ips(context, router_ids)
interfaces = self.get_sync_interfaces(context, router_ids)
return self._process_sync_data(routers, interfaces, floating_ips)
-
- def get_external_network_id(self, context):
- nets = self.get_networks(context, {'router:external': [True]})
- if len(nets) > 1:
- raise q_exc.TooManyExternalNetworks()
- else:
- return nets[0]['id'] if nets else None
from neutron import manager
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
+from neutron.plugins.common import constants as plugin_constants
LOG = logging.getLogger(__name__)
router_ids = kwargs.get('router_ids')
host = kwargs.get('host')
context = neutron_context.get_admin_context()
- plugin = manager.NeutronManager.get_plugin()
- if utils.is_extension_supported(
- plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
+ l3plugin = manager.NeutronManager.get_service_plugins()[
+ plugin_constants.L3_ROUTER_NAT]
+ if not l3plugin:
+ routers = {}
+ LOG.error(_('No plugin for L3 routing registered! Will reply '
+ 'to l3 agent with empty router dictionary.'))
+ elif utils.is_extension_supported(
+ l3plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
if cfg.CONF.router_auto_schedule:
- plugin.auto_schedule_routers(context, host, router_ids)
- routers = plugin.list_active_sync_routers_on_active_l3_agent(
+ l3plugin.auto_schedule_routers(context, host, router_ids)
+ routers = l3plugin.list_active_sync_routers_on_active_l3_agent(
context, host, router_ids)
else:
- routers = plugin.get_sync_data(context, router_ids)
+ routers = l3plugin.get_sync_data(context, router_ids)
+ plugin = manager.NeutronManager.get_plugin()
if utils.is_extension_supported(
plugin, constants.PORT_BINDING_EXT_ALIAS):
self._ensure_host_set_on_ports(context, plugin, host, routers)
from sqlalchemy.orm import exc
from neutron.common import constants as n_constants
-from neutron.db import agentschedulers_db as agent_db
from neutron.db import api as qdbapi
from neutron.db import db_base_plugin_v2 as base_db
+from neutron.db import l3_agentschedulers_db as l3_agent_db
from neutron.db import l3_db
from neutron.db import model_base
from neutron.db import models_v2
query = query.join(IKEPolicy)
query = query.join(IPsecPolicy)
query = query.join(IPsecPeerCidr)
- query = query.join(agent_db.RouterL3AgentBinding,
- agent_db.RouterL3AgentBinding.router_id ==
+ query = query.join(l3_agent_db.RouterL3AgentBinding,
+ l3_agent_db.RouterL3AgentBinding.router_id ==
VPNService.router_id)
query = query.filter(
- agent_db.RouterL3AgentBinding.l3_agent_id == agent.id)
+ l3_agent_db.RouterL3AgentBinding.l3_agent_id == agent.id)
return query
def update_status_by_agent(self, context, service_status_info_list):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.api import extensions
+from neutron.api.v2 import attributes as attr
+from neutron.common import exceptions as qexception
+from neutron.extensions import l3
+
+
+class ExternalNetworkInUse(qexception.InUse):
+ message = _("External network %(net_id)s cannot be updated to be made "
+ "non-external, since it has existing gateway ports")
+
+
+# For backward compatibility the 'router' prefix is kept.
+EXTERNAL = 'router:external'
+EXTENDED_ATTRIBUTES_2_0 = {
+ 'networks': {EXTERNAL: {'allow_post': True,
+ 'allow_put': True,
+ 'default': attr.ATTR_NOT_SPECIFIED,
+ 'is_visible': True,
+ 'convert_to': attr.convert_to_boolean,
+ 'enforce_policy': True,
+ 'required_by_policy': True}}}
+
+
+class External_net(extensions.ExtensionDescriptor):
+
+ @classmethod
+ def get_name(cls):
+ return "Neutron external network"
+
+ @classmethod
+ def get_alias(cls):
+ return "external-net"
+
+ @classmethod
+ def get_description(cls):
+ return _("Adds external network attribute to network resource.")
+
+ @classmethod
+ def get_namespace(cls):
+ return "http://docs.openstack.org/ext/neutron/external_net/api/v1.0"
+
+ @classmethod
+ def get_updated(cls):
+ return "2013-01-14T10:00:00-00:00"
+
+ def get_extended_resources(self, version):
+ if version == "2.0":
+ return EXTENDED_ATTRIBUTES_2_0
+ else:
+ return {}
+
+ def get_alias_namespace_compatibility_map(self):
+ return {l3.L3.get_alias(): l3.L3.get_namespace()}
from neutron.api.v2 import base
from neutron.common import exceptions as qexception
from neutron import manager
+from neutron.plugins.common import constants
from neutron import quota
" cannot be deleted directly via the port API.")
-class ExternalNetworkInUse(qexception.InUse):
- message = _("External network %(net_id)s cannot be updated to be made "
- "non-external, since it has existing gateway ports")
-
-
class RouterExternalGatewayInUseByFloatingIp(qexception.InUse):
message = _("Gateway cannot be updated for router %(router_id)s, since a "
"gateway to external network %(net_id)s is required by one or "
},
}
-EXTERNAL = 'router:external'
-EXTENDED_ATTRIBUTES_2_0 = {
- 'networks': {EXTERNAL: {'allow_post': True,
- 'allow_put': True,
- 'default': attr.ATTR_NOT_SPECIFIED,
- 'is_visible': True,
- 'convert_to': attr.convert_to_boolean,
- 'enforce_policy': True,
- 'required_by_policy': True}}}
-
l3_quota_opts = [
cfg.IntOpt('quota_router',
default=10,
my_plurals = [(key, key[:-1]) for key in RESOURCE_ATTRIBUTE_MAP.keys()]
attr.PLURALS.update(dict(my_plurals))
exts = []
- plugin = manager.NeutronManager.get_plugin()
+ plugin = manager.NeutronManager.get_service_plugins()[
+ constants.L3_ROUTER_NAT]
for resource_name in ['router', 'floatingip']:
collection_name = resource_name + "s"
params = RESOURCE_ATTRIBUTE_MAP.get(collection_name, dict())
def get_extended_resources(self, version):
if version == "2.0":
- return dict(EXTENDED_ATTRIBUTES_2_0.items() +
- RESOURCE_ATTRIBUTE_MAP.items())
+ return RESOURCE_ATTRIBUTE_MAP
else:
return {}
from abc import abstractmethod
+import webob.exc
+
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.api.v2 import resource
from neutron.common import exceptions
from neutron.extensions import agent
from neutron import manager
+from neutron.openstack.common import log as logging
+from neutron.plugins.common import constants as service_constants
from neutron import policy
from neutron import wsgi
+
+LOG = logging.getLogger(__name__)
+
+
L3_ROUTER = 'l3-router'
L3_ROUTERS = L3_ROUTER + 's'
L3_AGENT = 'l3-agent'
class RouterSchedulerController(wsgi.Controller):
+ def get_plugin(self):
+ plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ if not plugin:
+ LOG.error(_('No plugin for L3 routing registered to handle '
+ 'router scheduling'))
+ msg = _('The resource could not be found.')
+ raise webob.exc.HTTPNotFound(msg)
+ return plugin
+
def index(self, request, **kwargs):
- plugin = manager.NeutronManager.get_plugin()
+ plugin = self.get_plugin()
policy.enforce(request.context,
"get_%s" % L3_ROUTERS,
{})
request.context, kwargs['agent_id'])
def create(self, request, body, **kwargs):
- plugin = manager.NeutronManager.get_plugin()
+ plugin = self.get_plugin()
policy.enforce(request.context,
"create_%s" % L3_ROUTER,
{})
body['router_id'])
def delete(self, request, id, **kwargs):
- plugin = manager.NeutronManager.get_plugin()
+ plugin = self.get_plugin()
policy.enforce(request.context,
"delete_%s" % L3_ROUTER,
{})
class L3AgentsHostingRouterController(wsgi.Controller):
+ def get_plugin(self):
+ plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ if not plugin:
+ LOG.error(_('No plugin for L3 routing registered to handle '
+ 'router scheduling'))
+ msg = _('The resource could not be found.')
+ raise webob.exc.HTTPNotFound(msg)
+ return plugin
+
def index(self, request, **kwargs):
- plugin = manager.NeutronManager.get_plugin()
+ plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
policy.enforce(request.context,
"get_%s" % L3_AGENTS,
{})
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
+from neutron.db import external_net_db
from neutron.db import extradhcpopt_db
from neutron.db import l3_db
+from neutron.extensions import external_net
from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.extensions import l3
from neutron.extensions import portbindings
class NeutronRestProxyV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
routerrule_db.RouterRule_db_mixin,
extradhcpopt_db.ExtraDhcpOptMixin):
- supported_extension_aliases = ["router", "binding", "router_rules",
- "extra_dhcp_opt"]
+ supported_extension_aliases = ["external-net", "router", "binding",
+ "router_rules", "extra_dhcp_opt"]
def __init__(self, server_timeout=None):
LOG.info(_('NeutronRestProxy: Starting plugin. Version=%s'),
break
else:
network['gateway'] = ''
- network[l3.EXTERNAL] = self._network_is_external(context,
- network['id'])
+ network[external_net.EXTERNAL] = self._network_is_external(
+ context, network['id'])
return network
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
+from neutron.db import external_net_db
from neutron.db import extraroute_db
+from neutron.db import l3_agentschedulers_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.openstack.common.rpc import proxy
from neutron.plugins.brocade.db import models as brocade_db
from neutron.plugins.brocade import vlanbm as vbm
+from neutron.plugins.common import constants as svc_constants
LOG = logging.getLogger(__name__)
class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
- agentschedulers_db.L3AgentSchedulerDbMixin,
+ l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_base.PortBindingBaseMixin):
"""BrocadePluginV2 is a Neutron plugin.
"""
self.supported_extension_aliases = ["binding", "security-group",
- "router", "extraroute",
- "agent", "l3_agent_scheduler",
+ "external-net", "router",
+ "extraroute", "agent",
+ "l3_agent_scheduler",
"dhcp_agent_scheduler"]
self.physical_interface = (cfg.CONF.PHYSICAL_INTERFACE.
def _setup_rpc(self):
# RPC support
- self.topic = topics.PLUGIN
+ self.service_topics = {svc_constants.CORE: topics.PLUGIN,
+ svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.rpc_context = context.RequestContext('neutron', 'neutron',
is_admin=False)
self.conn = rpc.create_connection(new=True)
self.callbacks = BridgeRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
- fanout=False)
+ for svc_topic in self.service_topics.values():
+ self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = AgentNotifierApi(topics.AGENT)
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
+from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import l3_rpc_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.plugins.cisco.db import network_db_v2
from neutron.plugins.cisco.extensions import n1kv_profile
from neutron.plugins.cisco.n1kv import n1kv_client
+from neutron.plugins.common import constants as svc_constants
LOG = logging.getLogger(__name__)
class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
l3_db.L3_NAT_db_mixin,
n1kv_db_v2.NetworkProfile_db_mixin,
n1kv_db_v2.PolicyProfile_db_mixin,
"policy_profile_binding",
"network_profile_binding",
"n1kv_profile", "network_profile",
- "policy_profile", "router", "credential"]
+ "policy_profile", "external-net", "router",
+ "credential"]
def __init__(self, configfile=None):
"""
def _setup_rpc(self):
# RPC support
- self.topic = topics.PLUGIN
+ self.service_topics = {svc_constants.CORE: topics.PLUGIN,
+ svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.callbacks = N1kvRpcCallbacks(self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
- fanout=False)
+ for svc_topic in self.service_topics.values():
+ self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
FIREWALL = "FIREWALL"
VPN = "VPN"
METERING = "METERING"
+L3_ROUTER_NAT = "L3_ROUTER_NAT"
+
#maps extension alias to service type
EXT_TO_SERVICE_MAPPING = {
'fwaas': FIREWALL,
'vpnaas': VPN,
'metering': METERING,
+ 'router': L3_ROUTER_NAT
}
# TODO(salvatore-orlando): Move these (or derive them) from conf file
-ALLOWED_SERVICES = [CORE, DUMMY, LOADBALANCER, FIREWALL, VPN, METERING]
+ALLOWED_SERVICES = [CORE, DUMMY, LOADBALANCER, FIREWALL, VPN, METERING,
+ L3_ROUTER_NAT]
COMMON_PREFIXES = {
CORE: "",
FIREWALL: "/fw",
VPN: "/vpn",
METERING: "/metering",
+ L3_ROUTER_NAT: "",
}
# Service operation status constants
from neutron.common import exceptions as q_exc
from neutron.common import topics
from neutron.db import db_base_plugin_v2
+from neutron.db import external_net_db
from neutron.db import l3_gwmode_db
from neutron.db import portbindings_base
from neutron.db import quota_db # noqa
from neutron.extensions import providernet as provider
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
+from neutron.plugins.common import constants as svc_constants
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.hyperv import agent_notifier_api
from neutron.plugins.hyperv.common import constants
class HyperVNeutronPlugin(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
portbindings_base.PortBindingBaseMixin):
# bulk operations. Name mangling is used in order to ensure it
# is qualified by class
__native_bulk_support = True
- supported_extension_aliases = ["provider", "router", "ext-gw-mode",
- "binding", "quotas"]
+ supported_extension_aliases = ["provider", "external-net", "router",
+ "ext-gw-mode", "binding", "quotas"]
def __init__(self, configfile=None):
self._db = hyperv_db.HyperVPluginDB()
def _setup_rpc(self):
# RPC support
- self.topic = topics.PLUGIN
+ self.service_topics = {svc_constants.CORE: topics.PLUGIN,
+ svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.notifier = agent_notifier_api.AgentNotifierApi(
topics.AGENT)
self.callbacks = rpc_callbacks.HyperVRpcCallbacks(self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
- fanout=False)
+ for svc_topic in self.service_topics.values():
+ self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
+from neutron.db import external_net_db
from neutron.db import extraroute_db
+from neutron.db import l3_agentschedulers_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_db
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
+from neutron.plugins.common import constants as svc_constants
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.linuxbridge.common import constants
from neutron.plugins.linuxbridge.db import l2network_db_v2 as db
class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
- agentschedulers_db.L3AgentSchedulerDbMixin,
+ l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_db.PortBindingMixin):
"""Implement the Neutron abstractions using Linux bridging.
__native_pagination_support = True
__native_sorting_support = True
- _supported_extension_aliases = ["provider", "router", "ext-gw-mode",
- "binding", "quotas", "security-group",
- "agent", "extraroute",
+ _supported_extension_aliases = ["provider", "external-net", "router",
+ "ext-gw-mode", "binding", "quotas",
+ "security-group", "agent", "extraroute",
"l3_agent_scheduler",
"dhcp_agent_scheduler"]
def _setup_rpc(self):
# RPC support
- self.topic = topics.PLUGIN
+ self.service_topics = {svc_constants.CORE: topics.PLUGIN,
+ svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.callbacks = LinuxBridgeRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
- fanout=False)
+ for svc_topic in self.service_topics.values():
+ self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = AgentNotifierApi(topics.AGENT)
from neutron.common import exceptions as exc
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
+from neutron.db import external_net_db
from neutron.db import extraroute_db
from neutron.db import l3_db
from neutron.db import models_v2
class MetaPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin):
def __init__(self, configfile=None):
LOG.debug(_("Start initializing metaplugin"))
self.supported_extension_aliases = \
cfg.CONF.META.supported_extension_aliases.split(',')
- self.supported_extension_aliases += ['flavor', 'router',
- 'ext-gw-mode', 'extraroute']
+ self.supported_extension_aliases += ['flavor', 'external-net',
+ 'router', 'ext-gw-mode',
+ 'extraroute']
# Ignore config option overapping
def _is_opt_registered(opts, opt):
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
+from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.openstack.common import log as logging
from neutronclient.common import exceptions
class ProxyPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
l3_db.L3_NAT_db_mixin):
- supported_extension_aliases = ["router"]
+ supported_extension_aliases = ["external-net", "router"]
def __init__(self, configfile=None):
db.configure_db()
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
+from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.db import securitygroups_db
class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
l3_db.L3_NAT_db_mixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
securitygroups_db.SecurityGroupDbMixin):
- supported_extension_aliases = ['router', 'security-group', 'agent',
- 'dhcp_agent_scheduler']
+ supported_extension_aliases = ['external-net', 'router', 'security-group',
+ 'agent' 'dhcp_agent_scheduler']
__native_bulk_support = False
def __init__(self):
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
-from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.common import constants as const
from neutron.common import exceptions as exc
from neutron.db import agentschedulers_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import db_base_plugin_v2
-from neutron.db import extraroute_db
-from neutron.db import l3_gwmode_db
+from neutron.db import external_net_db
from neutron.db import models_v2
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.extensions import multiprovidernet as mpnet
from neutron.extensions import portbindings
from neutron.extensions import providernet as provider
+from neutron import manager
from neutron.openstack.common import excutils
from neutron.openstack.common import importutils
from neutron.openstack.common import log
from neutron.openstack.common import rpc as c_rpc
+from neutron.plugins.common import constants as service_constants
from neutron.plugins.ml2.common import exceptions as ml2_exc
from neutron.plugins.ml2 import config # noqa
from neutron.plugins.ml2 import db
class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
- extraroute_db.ExtraRoute_db_mixin,
- l3_gwmode_db.L3_NAT_db_mixin,
+ external_net_db.External_net_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
- agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
addr_pair_db.AllowedAddressPairsMixin):
+
"""Implement the Neutron L2 abstractions using modules.
Ml2Plugin is a Neutron plugin based on separately extensible sets
__native_sorting_support = True
# List of supported extensions
- _supported_extension_aliases = ["provider", "router", "extraroute",
- "binding", "quotas", "security-group",
- "agent", "l3_agent_scheduler",
- "dhcp_agent_scheduler", "ext-gw-mode",
+ _supported_extension_aliases = ["provider", "external-net", "binding",
+ "quotas", "security-group", "agent",
+ "dhcp_agent_scheduler",
"multi-provider", "allowed-address-pairs"]
@property
self.network_scheduler = importutils.import_object(
cfg.CONF.network_scheduler_driver
)
- self.router_scheduler = importutils.import_object(
- cfg.CONF.router_scheduler_driver
- )
LOG.info(_("Modular L2 Plugin initialization complete"))
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
- self.agent_notifiers[const.AGENT_TYPE_L3] = (
- l3_rpc_agent_api.L3AgentNotify
- )
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
self.topic = topics.PLUGIN
self.conn = c_rpc.create_connection(new=True)
return updated_port
def delete_port(self, context, id, l3_port_check=True):
- if l3_port_check:
- self.prevent_l3_port_deletion(context, id)
+ l3plugin = manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ if l3plugin and l3_port_check:
+ l3plugin.prevent_l3_port_deletion(context, id)
session = context.session
with session.begin(subtransactions=True):
- self.disassociate_floatingips(context, id)
+ if l3plugin:
+ l3plugin.disassociate_floatingips(context, id)
port = self.get_port(context, id)
network = self.get_network(context, port['network_id'])
mech_context = driver_context.PortContext(self, context, port,
from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db import dhcp_rpc_base
-from neutron.db import l3_rpc_base
from neutron.db import securitygroups_rpc_base as sg_db_rpc
from neutron.openstack.common import log
from neutron.openstack.common.rpc import proxy
class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin,
- l3_rpc_base.L3RpcCallbackMixin,
sg_db_rpc.SecurityGroupServerRpcCallbackMixin,
type_tunnel.TunnelRpcCallbackMixin):
from neutron.common import utils
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
+from neutron.db import external_net_db
from neutron.db import extraroute_db
+from neutron.db import l3_agentschedulers_db
from neutron.db import l3_gwmode_db
from neutron.db import portbindings_db
from neutron.db import quota_db # noqa
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
+from neutron.plugins.common import constants as svc_constants
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.mlnx import agent_notify_api
from neutron.plugins.mlnx.common import constants
class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
- agentschedulers_db.L3AgentSchedulerDbMixin,
+ l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_db.PortBindingMixin):
"""Realization of Neutron API on Mellanox HCA embedded switch technology.
# is qualified by class
__native_bulk_support = True
- _supported_extension_aliases = ["provider", "router", "ext-gw-mode",
- "binding", "quotas", "security-group",
- "agent", "extraroute",
+ _supported_extension_aliases = ["provider", "external-net", "router",
+ "ext-gw-mode", "binding", "quotas",
+ "security-group", "agent", "extraroute",
"l3_agent_scheduler",
"dhcp_agent_scheduler"]
def _setup_rpc(self):
# RPC support
- self.topic = topics.PLUGIN
+ self.service_topics = {svc_constants.CORE: topics.PLUGIN,
+ svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.callbacks = rpc_callbacks.MlnxRpcCallbacks()
self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
- fanout=False)
+ for svc_topic in self.service_topics.values():
+ self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
+from neutron.db import external_net_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_base
from neutron.db import portbindings_db
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
from neutron.openstack.common import uuidutils
+from neutron.plugins.common import constants as svc_constants
from neutron.plugins.nec.common import config
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.db import api as ndb
class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
nec_router.RouterMixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
_supported_extension_aliases = ["agent",
"binding",
"dhcp_agent_scheduler",
+ "external-net",
"ext-gw-mode",
"extraroute",
"l3_agent_scheduler",
}
def setup_rpc(self):
- self.topic = topics.PLUGIN
+ self.service_topics = {svc_constants.CORE: topics.PLUGIN,
+ svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
self.callback_sg,
agents_db.AgentExtRpcCallback()]
self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks)
- self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
+ for svc_topic in self.service_topics.values():
+ self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes as attr
from neutron.common import exceptions as q_exc
-from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.db import extraroute_db
+from neutron.db import l3_agentschedulers_db
from neutron.db import l3_db
from neutron.db import l3_gwmode_db
from neutron.db import models_v2
l3.ROUTERS, [extend_router_dict_provider])
-class L3AgentSchedulerDbMixin(agentschedulers_db.L3AgentSchedulerDbMixin):
+class L3AgentSchedulerDbMixin(l3_agentschedulers_db.L3AgentSchedulerDbMixin):
def auto_schedule_routers(self, context, host, router_ids):
router_ids = rdb.get_routers_by_provider(
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
+from neutron.db import external_net_db
from neutron.db import extraroute_db
from neutron.db import l3_db
from neutron.db import l3_gwmode_db
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_db
from neutron.extensions import allowedaddresspairs as addr_pair
+from neutron.extensions import external_net as ext_net_extn
from neutron.extensions import extraroute
from neutron.extensions import l3
from neutron.extensions import multiprovidernet as mpnet
db_base_plugin_v2.NeutronDbPluginV2,
dhcpmeta_modes.DhcpMetadataAccess,
dist_rtr.DistributedRouter_mixin,
+ external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
mac_db.MacLearningDbMixin,
"port-security",
"provider",
"quotas",
+ "external-net",
"router",
"security-group"]
"network %s"), net_data.get('name', '<unknown>'))
transport_zone_config = self._convert_to_nvp_transport_zones(
self.cluster, net_data)
- external = net_data.get(l3.EXTERNAL)
+ external = net_data.get(ext_net_extn.EXTERNAL)
if (not attr.is_attr_set(external) or
attr.is_attr_set(external) and not external):
lswitch = nvplib.create_lswitch(
# being updated or not
old_mac_learning_state = ret_port.get(mac_ext.MAC_LEARNING)
# copy values over - except fixed_ips as
- # they've alreaby been processed
+ # they've already been processed
port['port'].pop('fixed_ips', None)
ret_port.update(port['port'])
tenant_id = self._get_tenant_id_for_create(context, ret_port)
from neutron.common import constants
from neutron.common import exceptions
from neutron import context
+from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.openstack.common import jsonutils
if not ext_networks:
ext_networks = [net['id'] for net in context.session.query(
models_v2.Network).join(
- l3_db.ExternalNetwork,
+ external_net_db.ExternalNetwork,
(models_v2.Network.id ==
- l3_db.ExternalNetwork.network_id))]
+ external_net_db.ExternalNetwork.network_id))]
if neutron_port_data['network_id'] in ext_networks:
with context.session.begin(subtransactions=True):
neutron_port_data['status'] = constants.PORT_STATUS_ACTIVE
# this query
ext_nets = [net['id'] for net in ctx.session.query(
models_v2.Network).join(
- l3_db.ExternalNetwork,
+ external_net_db.ExternalNetwork,
(models_v2.Network.id ==
- l3_db.ExternalNetwork.network_id))]
+ external_net_db.ExternalNetwork.network_id))]
for port in self._plugin._get_collection_query(
ctx, models_v2.Port, filters=filters):
lswitchport = neutron_port_mappings.get(port['id'])
from neutron.db import allowedaddresspairs_db as addr_pair_db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
+from neutron.db import external_net_db
from neutron.db import extradhcpopt_db
from neutron.db import extraroute_db
+from neutron.db import l3_agentschedulers_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
from neutron.db import portbindings_db
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
+from neutron.plugins.common import constants as svc_constants
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.openvswitch.common import config # noqa
from neutron.plugins.openvswitch.common import constants
class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
- agentschedulers_db.L3AgentSchedulerDbMixin,
+ l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin,
portbindings_db.PortBindingMixin,
extradhcpopt_db.ExtraDhcpOptMixin,
__native_pagination_support = True
__native_sorting_support = True
- _supported_extension_aliases = ["provider", "router", "ext-gw-mode",
- "binding", "quotas", "security-group",
- "agent", "extraroute",
+ _supported_extension_aliases = ["provider", "external-net", "router",
+ "ext-gw-mode", "binding", "quotas",
+ "security-group", "agent", "extraroute",
"l3_agent_scheduler",
"dhcp_agent_scheduler",
"extra_dhcp_opt",
def setup_rpc(self):
# RPC support
- self.topic = topics.PLUGIN
+ self.service_topics = {svc_constants.CORE: topics.PLUGIN,
+ svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN}
self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
)
self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
- self.conn.create_consumer(self.topic, self.dispatcher,
- fanout=False)
+ for svc_topic in self.service_topics.values():
+ self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
from neutron.api.v2 import attributes
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
+from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import portbindings_db
from neutron.extensions import portbindings
class NeutronPluginPLUMgridV2(db_base_plugin_v2.NeutronDbPluginV2,
portbindings_db.PortBindingMixin,
+ external_net_db.External_net_db_mixin,
l3_db.L3_NAT_db_mixin):
- supported_extension_aliases = ["router", "binding"]
+ supported_extension_aliases = ["external-net", "router", "binding"]
binding_view = "extension:port_binding:view"
binding_set = "extension:port_binding:set"
from neutron.db import api as db
from neutron.db import db_base_plugin_v2
from neutron.db import dhcp_rpc_base
+from neutron.db import external_net_db
from neutron.db import extraroute_db
from neutron.db import l3_gwmode_db
from neutron.db import l3_rpc_base
class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
extraroute_db.ExtraRoute_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
portbindings_base.PortBindingBaseMixin):
- _supported_extension_aliases = ["router", "ext-gw-mode",
+ _supported_extension_aliases = ["external-net", "router", "ext-gw-mode",
"extraroute", "security-group",
"binding"]
from neutron.common import constants
from neutron.db import agents_db
-from neutron.db import agentschedulers_db
+from neutron.db import l3_agentschedulers_db
from neutron.db import l3_db
from neutron.openstack.common import log as logging
#TODO(gongysh) consider the disabled agent's router
stmt = ~exists().where(
l3_db.Router.id ==
- agentschedulers_db.RouterL3AgentBinding.router_id)
+ l3_agentschedulers_db.RouterL3AgentBinding.router_id)
unscheduled_router_ids = [router_id_[0] for router_id_ in
context.session.query(
l3_db.Router.id).filter(stmt)]
# binding
for router_id in router_ids:
- binding = agentschedulers_db.RouterL3AgentBinding()
+ binding = l3_agentschedulers_db.RouterL3AgentBinding()
binding.l3_agent = l3_agent
binding.router_id = router_id
binding.default = True
return
chosen_agent = random.choice(candidates)
- binding = agentschedulers_db.RouterL3AgentBinding()
+ binding = l3_agentschedulers_db.RouterL3AgentBinding()
binding.l3_agent = chosen_agent
binding.router_id = sync_router['id']
context.session.add(binding)
--- /dev/null
+This service plugin implements the L3 routing functionality (resources router
+and floatingip) that in earlier releases before Havana was provided by core
+plugins (openvswitch, linuxbridge, ... etc).
+
+Core plugins can now choose not to implement L3 routing functionality and
+instead delegate that to the L3 routing service plugin.
+
+The required changes to a core plugin are in that case:
+- Do not inherit 'l3_db.L3_NAT_db_mixin' (or its descendants like extraroute)
+ anymore.
+- Remove "router" from 'supported_extension_aliases'.
+- Modify any 'self' references to members in L3_NAT_db_mixin to instead use
+ 'manager.NeutronManager.get_service_plugins().get(constants.L3_ROUTER_NAT)'
+ For example,
+ self.prevent_l3_port_deletion(...)
+ becomes something like
+ plugin = manager.NeutronManager.get_service_plugins().get(
+ constants.L3_ROUTER_NAT)
+ if plugin:
+ plugin.prevent_l3_port_deletion(...)
+
+If the core plugin has relied on the L3Agent the following must also be changed:
+- Do not inherit 'l3_rpc_base.L3RpcCallbackMixin' in any '*RpcCallbacks' class.
+- Do not be a consumer of the topics.L3PLUGIN topic for RPC.
+
+To use the L3 routing service plugin, add
+'neutron.services.l3_router.l3_router_plugin.L3RouterPlugin'
+to 'service_plugins' in '/etc/neutron/neutron.conf'.
+That is,
+service_plugins = neutron.services.l3_router.l3_router_plugin.L3RouterPlugin
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Bob Melander, Cisco Systems, Inc.
+
+from oslo.config import cfg
+
+from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
+from neutron.common import constants as q_const
+from neutron.common import rpc as q_rpc
+from neutron.common import topics
+from neutron.db import api as qdbapi
+from neutron.db import db_base_plugin_v2
+from neutron.db import extraroute_db
+from neutron.db import l3_agentschedulers_db
+from neutron.db import l3_gwmode_db
+from neutron.db import l3_rpc_base
+from neutron.db import model_base
+from neutron.openstack.common import importutils
+from neutron.openstack.common import rpc
+from neutron.plugins.common import constants
+
+
+class L3RouterPluginRpcCallbacks(l3_rpc_base.L3RpcCallbackMixin):
+
+ # Set RPC API version to 1.0 by default.
+ RPC_API_VERSION = '1.0'
+
+ def create_rpc_dispatcher(self):
+ """Get the rpc dispatcher for this manager.
+
+ If a manager would like to set an rpc API version, or support more than
+ one class as the target of rpc messages, override this method.
+ """
+ return q_rpc.PluginRpcDispatcher([self])
+
+
+class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
+ extraroute_db.ExtraRoute_db_mixin,
+ l3_gwmode_db.L3_NAT_db_mixin,
+ l3_agentschedulers_db.L3AgentSchedulerDbMixin):
+
+ """Implementation of the Neutron L3 Router Service Plugin.
+
+ This class implements a L3 service plugin that provides
+ router and floatingip resources and manages associated
+ request/response.
+ All DB related work is implemented in classes
+ l3_db.L3_NAT_db_mixin and extraroute_db.ExtraRoute_db_mixin.
+ """
+ supported_extension_aliases = ["router", "ext-gw-mode",
+ "extraroute", "l3_agent_scheduler"]
+
+ def __init__(self):
+ qdbapi.register_models(base=model_base.BASEV2)
+ self.setup_rpc()
+ self.router_scheduler = importutils.import_object(
+ cfg.CONF.router_scheduler_driver)
+
+ def setup_rpc(self):
+ # RPC support
+ self.topic = topics.L3PLUGIN
+ self.conn = rpc.create_connection(new=True)
+ self.agent_notifiers.update(
+ {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotify})
+ self.callbacks = L3RouterPluginRpcCallbacks()
+ self.dispatcher = self.callbacks.create_rpc_dispatcher()
+ self.conn.create_consumer(self.topic, self.dispatcher,
+ fanout=False)
+ self.conn.consume_in_thread()
+
+ def get_plugin_type(self):
+ return constants.L3_ROUTER_NAT
+
+ def get_plugin_description(self):
+ """returns string description of the plugin."""
+ return ("L3 Router Service Plugin for basic L3 forwarding"
+ " between (L2) Neutron networks and access to external"
+ " networks via a NAT gateway.")
cfg.CONF.set_default('allow_overlapping_ips', False)
ext_mgr = RouterRulesTestExtensionManager()
test_config['extension_manager'] = ext_mgr
- super(test_l3_plugin.L3NatTestCaseBase, self).setUp()
+ super(test_l3_plugin.L3BaseForIntTests, self).setUp()
# Set to None to reload the drivers
notifier_api._drivers = None
cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
-origSetUp = test_l3_plugin.L3NatDBTestCase.setUp
+origSetUp = test_l3_plugin.L3NatDBIntTestCase.setUp
class RouterRulesTestExtensionManager(object):
super(test_extradhcp.ExtraDhcpOptDBTestCase, self).setUp(plugin=p_path)
-class RouterDBTestCase(test_l3_plugin.L3NatDBTestCase):
+class RouterDBTestCase(test_l3_plugin.L3NatDBIntTestCase):
def setUp(self):
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=fake_server.HTTPConnectionMock)
self.httpPatch.start()
- test_l3_plugin.L3NatDBTestCase.setUp = new_L3_setUp
+ test_l3_plugin.L3NatDBIntTestCase.setUp = new_L3_setUp
super(RouterDBTestCase, self).setUp()
self.plugin_obj = NeutronManager.get_plugin()
del test_config['plugin_name_v2']
del test_config['config_files']
cfg.CONF.reset()
- test_l3_plugin.L3NatDBTestCase.setUp = origSetUp
+ test_l3_plugin.L3NatDBIntTestCase.setUp = origSetUp
def test_router_remove_router_interface_wrong_subnet_returns_400(self):
with self.router() as r:
from neutron.common import config
from neutron import context
from neutron.db import agentschedulers_db
+from neutron.db import l3_agentschedulers_db
from neutron.db.vpn import vpn_db
from neutron import extensions
from neutron.extensions import vpnaas
extensions_path = ':'.join(extensions.__path__)
-class TestVpnCorePlugin(test_l3_plugin.TestL3NatPlugin,
- agentschedulers_db.L3AgentSchedulerDbMixin,
+class TestVpnCorePlugin(test_l3_plugin.TestL3NatIntPlugin,
+ l3_agentschedulers_db.L3AgentSchedulerDbMixin,
agentschedulers_db.DhcpAgentSchedulerDbMixin):
def __init__(self, configfile=None):
super(TestVpnCorePlugin, self).__init__()
sys.modules["heleosapi"] = mock.Mock()
-class TestEmbraneL3NatDBTestCase(router_test.L3NatDBTestCase):
+class TestEmbraneL3NatDBTestCase(router_test.L3NatDBIntTestCase):
_plugin_name = PLUGIN_NAME
def setUp(self):
super(TestEmbraneL3NatDBTestCase, self).setUp()
-class ExtraRouteDBTestCase(extraroute_test.ExtraRouteDBTestCase):
+class ExtraRouteDBTestCase(extraroute_test.ExtraRouteDBIntTestCase):
_plugin_name = PLUGIN_NAME
class LbAgentSchedulerTestCase(
test_agent_scheduler.OvsAgentSchedulerTestCase):
plugin_str = test_linuxbridge_plugin.PLUGIN_NAME
+ l3_plugin = None
class LbL3AgentNotifierTestCase(
test_agent_scheduler.OvsL3AgentNotifierTestCase):
plugin_str = test_linuxbridge_plugin.PLUGIN_NAME
+ l3_plugin = None
class LbDhcpAgentNotifierTestCase(
# under the License.
from neutron.db import db_base_plugin_v2
+from neutron.db import external_net_db
from neutron.db import l3_gwmode_db
class Fake1(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin):
- supported_extension_aliases = ['router']
+ supported_extension_aliases = ['external-net', 'router']
def fake_func(self):
return 'fake1'
pass
-class TestMetaL3NatDBTestCase(test_l3_plugin.L3NatDBTestCase,
+class TestMetaL3NatDBTestCase(test_l3_plugin.L3NatDBIntTestCase,
MetaPluginV2DBTestCase):
pass
class Ml2AgentSchedulerTestCase(
test_agent_scheduler.OvsAgentSchedulerTestCase):
plugin_str = test_ml2_plugin.PLUGIN_NAME
+ l3_plugin = ('neutron.services.l3_router.'
+ 'l3_router_plugin.L3RouterPlugin')
class Ml2L3AgentNotifierTestCase(
test_agent_scheduler.OvsL3AgentNotifierTestCase):
plugin_str = test_ml2_plugin.PLUGIN_NAME
+ l3_plugin = ('neutron.services.l3_router.'
+ 'l3_router_plugin.L3RouterPlugin')
class Ml2DhcpAgentNotifierTestCase(
from neutron.plugins.ml2 import config
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit import test_db_plugin as test_plugin
-from neutron.tests.unit import test_extension_ext_gw_mode
from neutron.tests.unit import test_security_groups_rpc as test_sg_rpc
_plugin_name = PLUGIN_NAME
def setUp(self):
+ # We need a L3 service plugin
+ l3_plugin = ('neutron.tests.unit.test_l3_plugin.'
+ 'TestL3NatServicePlugin')
+ service_plugins = {'l3_plugin_name': l3_plugin}
# Enable the test mechanism driver to ensure that
# we can successfully call through to all mechanism
# driver apis.
config.cfg.CONF.set_override('network_vlan_ranges', [self.phys_vrange],
group='ml2_type_vlan')
self.addCleanup(config.cfg.CONF.reset)
- super(Ml2PluginV2TestCase, self).setUp(PLUGIN_NAME)
+ super(Ml2PluginV2TestCase, self).setUp(PLUGIN_NAME,
+ service_plugins=service_plugins)
self.port_create_status = 'DOWN'
pass
-class TestMl2ExtGwModeSupport(Ml2PluginV2TestCase,
- test_extension_ext_gw_mode.ExtGwModeTestCase):
- pass
-
-
class TestMultiSegmentNetworks(Ml2PluginV2TestCase):
def setUp(self, plugin=None):
class MlnxAgentSchedulerTestCase(
test_agent_scheduler.OvsAgentSchedulerTestCase):
plugin_str = test_mlnx_plugin.PLUGIN_NAME
+ l3_plugin = None
class MlnxL3AgentNotifierTestCase(
test_agent_scheduler.OvsL3AgentNotifierTestCase):
plugin_str = test_mlnx_plugin.PLUGIN_NAME
+ l3_plugin = None
class MlnxDhcpAgentNotifierTestCase(
test_nec_plugin.NecPluginV2TestCaseBase):
plugin_str = test_nec_plugin.PLUGIN_NAME
+ l3_plugin = None
def setUp(self):
self.setup_nec_plugin_base()
test_nec_plugin.NecPluginV2TestCaseBase):
plugin_str = test_nec_plugin.PLUGIN_NAME
+ l3_plugin = None
def setUp(self):
# OvsDhcpAgentNotifierTestCase uses stop() for each mock.
from neutron.tests.unit import test_extension_extraroute as test_ext_route
-class NecRouterL3AgentTestCase(test_ext_route.ExtraRouteDBTestCase):
+class NecRouterL3AgentTestCase(test_ext_route.ExtraRouteDBIntTestCase):
_plugin_name = test_nec_plugin.PLUGIN_NAME
from neutron.common import exceptions as ntn_exc
import neutron.common.test_lib as test_lib
from neutron import context
+from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.extensions import l3_ext_gw_mode
from neutron.extensions import multiprovidernet as mpnet
data = {'network': {'name': name,
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
+ # Fix to allow the router:external attribute and any other
+ # attributes containing a colon to be passed with
+ # a double underscore instead
+ kwargs = dict((k.replace('__', ':'), v) for k, v in kwargs.items())
+ if external_net.EXTERNAL in kwargs:
+ arg_list = (external_net.EXTERNAL, ) + (arg_list or ())
+
attrs = kwargs
if providernet_args:
attrs.update(providernet_args)
return []
-class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
+class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBIntTestCase,
NiciraPluginV2TestCase):
def _restore_l3_attribute_map(self):
net_type = NeutronPlugin.NetworkTypes.L3_EXT
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
- (l3.EXTERNAL, True),
+ (external_net.EXTERNAL, True),
(pnet.NETWORK_TYPE, net_type),
(pnet.PHYSICAL_NETWORK, 'l3_gw_uuid'),
(pnet.SEGMENTATION_ID, vlan_id)]
class NiciraExtGwModeTestCase(NiciraPluginV2TestCase,
- test_ext_gw_mode.ExtGwModeTestCase):
+ test_ext_gw_mode.ExtGwModeIntTestCase):
pass
class NiciraNeutronNVPOutOfSync(NiciraPluginV2TestCase,
- test_l3_plugin.L3NatTestCaseBase):
+ test_l3_plugin.L3NatTestCaseMixin):
def setUp(self):
+ ext_mgr = test_l3_plugin.L3TestExtensionManager()
+ test_lib.test_config['extension_manager'] = ext_mgr
super(NiciraNeutronNVPOutOfSync, self).setUp()
def test_delete_network_not_in_nvp(self):
net_id = net['network']['id']
if external:
self._update('networks', net_id,
- {'network': {l3.EXTERNAL: True}})
+ {'network': {external_net.EXTERNAL: True}})
sub_res = self._create_subnet('json', net_id, cidr)
sub = self.deserialize('json', sub_res)
return net_id, sub['subnet']['id']
'--config-file', get_fake_conf('nvp.ini.test')]
config.parse(args=args)
self._plugin = NeutronPlugin.NvpPluginV2()
+ mock_nm_get_plugin = mock.patch('neutron.manager.NeutronManager.'
+ 'get_plugin')
+ self.mock_nm_get_plugin = mock_nm_get_plugin.start()
+ self.mock_nm_get_plugin.return_value = self._plugin
super(NvpSyncTestCase, self).setUp()
self.addCleanup(self.fc.reset_all)
self.addCleanup(patch_sync.stop)
self.addCleanup(mock_nvpapi.stop)
+ self.addCleanup(mock_nm_get_plugin.stop)
def tearDown(self):
cfg.CONF.reset()
from neutron import manager
from neutron.openstack.common import timeutils
from neutron.openstack.common import uuidutils
+from neutron.plugins.common import constants as service_constants
from neutron.tests.unit import test_agent_ext_plugin
from neutron.tests.unit import test_db_plugin as test_plugin
from neutron.tests.unit import test_extensions
fmt = 'json'
plugin_str = ('neutron.plugins.openvswitch.'
'ovs_neutron_plugin.OVSNeutronPluginV2')
+ l3_plugin = None
def setUp(self):
# Save the global RESOURCE_ATTRIBUTE_MAP
self.saved_attr_map = {}
for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
self.saved_attr_map[resource] = attrs.copy()
- super(OvsAgentSchedulerTestCaseBase, self).setUp(self.plugin_str)
+ if self.l3_plugin:
+ service_plugins = {'l3_plugin_name': self.l3_plugin}
+ else:
+ service_plugins = None
+ super(OvsAgentSchedulerTestCaseBase, self).setUp(
+ self.plugin_str, service_plugins=service_plugins)
ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.adminContext = context.get_admin_context()
attributes.RESOURCE_ATTRIBUTE_MAP.update(
agent.RESOURCE_ATTRIBUTE_MAP)
self.addCleanup(self.restore_attribute_map)
- self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin()
+ self.l3agentscheduler_dbMinxin = (
+ manager.NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT))
def restore_attribute_map(self):
# Restore the original RESOURCE_ATTRIBUTE_MAP
res = router_req.get_response(self.ext_api)
router = self.deserialize(self.fmt, res)
l3agents = (
- self.agentscheduler_dbMinxin.get_l3_agents_hosting_routers(
+ self.l3agentscheduler_dbMinxin.get_l3_agents_hosting_routers(
self.adminContext, [router['router']['id']]))
self._delete('routers', router['router']['id'])
self.assertEqual(0, len(l3agents))
admin_context=False)
-class OvsDhcpAgentNotifierTestCase(OvsAgentSchedulerTestCaseBase):
+class OvsDhcpAgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
+ test_agent_ext_plugin.AgentDBTestMixIn,
+ AgentSchedulerTestMixIn,
+ test_plugin.NeutronDbPluginV2TestCase):
+ plugin_str = ('neutron.plugins.openvswitch.'
+ 'ovs_neutron_plugin.OVSNeutronPluginV2')
def setUp(self):
self.dhcp_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
'DhcpAgentNotifyAPI')
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
- super(OvsDhcpAgentNotifierTestCase, self).setUp()
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
+ super(OvsDhcpAgentNotifierTestCase, self).setUp(self.plugin_str)
+ ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
+ self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+ self.adminContext = context.get_admin_context()
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ agent.RESOURCE_ATTRIBUTE_MAP)
self.addCleanup(self.dhcp_notifier_cls_p.stop)
+ self.addCleanup(self.restore_attribute_map)
+
+ def restore_attribute_map(self):
+ # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_network_add_to_dhcp_agent_notification(self):
with mock.patch.object(self.dhcp_notifier, 'cast') as mock_dhcp:
self.assertIn(expected, mock_dhcp.call_args_list)
-class OvsL3AgentNotifierTestCase(OvsAgentSchedulerTestCaseBase):
+class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
+ test_agent_ext_plugin.AgentDBTestMixIn,
+ AgentSchedulerTestMixIn,
+ test_plugin.NeutronDbPluginV2TestCase):
+ plugin_str = ('neutron.plugins.openvswitch.'
+ 'ovs_neutron_plugin.OVSNeutronPluginV2')
+ l3_plugin = None
def setUp(self):
self.dhcp_notifier_cls_p = mock.patch(
self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
self.dhcp_notifier_cls.return_value = self.dhcp_notifier
- super(OvsL3AgentNotifierTestCase, self).setUp()
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
+ if self.l3_plugin:
+ service_plugins = {'l3_plugin_name': self.l3_plugin}
+ else:
+ service_plugins = None
+ super(OvsL3AgentNotifierTestCase, self).setUp(
+ self.plugin_str, service_plugins=service_plugins)
+ ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
+ self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+ self.adminContext = context.get_admin_context()
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ agent.RESOURCE_ATTRIBUTE_MAP)
self.addCleanup(self.dhcp_notifier_cls_p.stop)
+ self.addCleanup(self.restore_attribute_map)
+
+ def restore_attribute_map(self):
+ # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
def test_router_add_to_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()
from neutron.common.test_lib import test_config
from neutron import context
from neutron.db import agents_db
-from neutron.db import agentschedulers_db
+from neutron.db import l3_agentschedulers_db
from neutron.extensions import l3 as ext_l3
from neutron.extensions import metering as ext_metering
from neutron.openstack.common import uuidutils
def setUp(self):
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
test_config['plugin_name_v2'] = ('neutron.tests.unit.test_l3_plugin.'
- 'TestL3NatPlugin')
+ 'TestL3NatIntPlugin')
ext_mgr = MeteringTestExtensionManager()
test_config['extension_manager'] = ext_mgr
super(TestMeteringPlugin, self).setUp(service_plugins=service_plugins)
topic=self.topic)
-class TestRoutePlugin(agentschedulers_db.L3AgentSchedulerDbMixin,
- test_l3_plugin.TestL3NatPlugin):
+class TestRouteIntPlugin(l3_agentschedulers_db.L3AgentSchedulerDbMixin,
+ test_l3_plugin.TestL3NatIntPlugin):
supported_extension_aliases = ["router", "l3_agent_scheduler"]
service_plugins = {'metering_plugin_name': DB_METERING_PLUGIN_KLASS}
plugin_str = ('neutron.tests.unit.services.metering.'
- 'test_metering_plugin.TestRoutePlugin')
+ 'test_metering_plugin.TestRouteIntPlugin')
test_config['plugin_name_v2'] = plugin_str
ext_mgr = MeteringTestExtensionManager()
from neutron.common import constants
from neutron.db import api as db_api
+from neutron.db import external_net_db
from neutron.db import l3_db
from neutron.db import l3_gwmode_db
from neutron.db import models_v2
# A simple class for making a concrete class out of the mixin
-class TestDbPlugin(test_l3_plugin.TestL3NatPlugin,
- l3_gwmode_db.L3_NAT_db_mixin):
+# for the case of a plugin that integrates l3 routing.
+class TestDbIntPlugin(test_l3_plugin.TestL3NatIntPlugin,
+ l3_gwmode_db.L3_NAT_db_mixin):
+
+ supported_extension_aliases = ["external-net", "router", "ext-gw-mode"]
+
+
+# A simple class for making a concrete class out of the mixin
+# for the case of a l3 router service plugin
+class TestDbSepPlugin(test_l3_plugin.TestL3NatServicePlugin,
+ l3_gwmode_db.L3_NAT_db_mixin):
supported_extension_aliases = ["router", "ext-gw-mode"]
super(TestL3GwModeMixin, self).setUp()
stubout_fixture = self.useFixture(StuboutFixture())
self.stubs = stubout_fixture.stubs
- self.target_object = TestDbPlugin()
+ self.target_object = TestDbIntPlugin()
# Patch the context
ctx_patcher = mock.patch('neutron.context', autospec=True)
mock_context = ctx_patcher.start()
tenant_id=self.tenant_id,
admin_state_up=True,
status=constants.NET_STATUS_ACTIVE)
- self.net_ext = l3_db.ExternalNetwork(network_id=self.ext_net_id)
+ self.net_ext = external_net_db.ExternalNetwork(
+ network_id=self.ext_net_id)
self.context.session.add(self.network)
# The following is to avoid complains from sqlite on
# foreign key violations
self.assertFalse(router.get('enable_snat'))
-class ExtGwModeTestCase(test_db_plugin.NeutronDbPluginV2TestCase,
- test_l3_plugin.L3NatTestCaseMixin):
+class ExtGwModeIntTestCase(test_db_plugin.NeutronDbPluginV2TestCase,
+ test_l3_plugin.L3NatTestCaseMixin):
- def setUp(self, plugin=None, ext_mgr=None):
+ def setUp(self, plugin=None, svc_plugins=None, ext_mgr=None):
# Store l3 resource attribute map as it will be updated
self._l3_attribute_map_bk = {}
for item in l3.RESOURCE_ATTRIBUTE_MAP:
self._l3_attribute_map_bk[item] = (
l3.RESOURCE_ATTRIBUTE_MAP[item].copy())
plugin = plugin or (
- 'neutron.tests.unit.test_extension_ext_gw_mode.TestDbPlugin')
+ 'neutron.tests.unit.test_extension_ext_gw_mode.TestDbIntPlugin')
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
ext_mgr = ext_mgr or TestExtensionManager()
- super(ExtGwModeTestCase, self).setUp(plugin=plugin,
- ext_mgr=ext_mgr)
+ super(ExtGwModeIntTestCase, self).setUp(plugin=plugin,
+ ext_mgr=ext_mgr,
+ service_plugins=svc_plugins)
self.addCleanup(self.restore_l3_attribute_map)
def restore_l3_attribute_map(self):
l3.RESOURCE_ATTRIBUTE_MAP = self._l3_attribute_map_bk
def tearDown(self):
- super(ExtGwModeTestCase, self).tearDown()
+ super(ExtGwModeIntTestCase, self).tearDown()
def _set_router_external_gateway(self, router_id, network_id,
snat_enabled=None,
def test_router_update_ext_gwinfo_with_invalid_snat_setting(self):
self._test_router_update_ext_gwinfo(
'xxx', None, expected_http_code=exc.HTTPBadRequest.code)
+
+
+class ExtGwModeSepTestCase(ExtGwModeIntTestCase):
+
+ def setUp(self, plugin=None):
+ # Store l3 resource attribute map as it will be updated
+ self._l3_attribute_map_bk = {}
+ for item in l3.RESOURCE_ATTRIBUTE_MAP:
+ self._l3_attribute_map_bk[item] = (
+ l3.RESOURCE_ATTRIBUTE_MAP[item].copy())
+ plugin = plugin or (
+ 'neutron.tests.unit.test_l3_plugin.TestNoL3NatPlugin')
+ # the L3 service plugin
+ l3_plugin = ('neutron.tests.unit.test_extension_ext_gw_mode.'
+ 'TestDbSepPlugin')
+ svc_plugins = {'l3_plugin_name': l3_plugin}
+ # for these tests we need to enable overlapping ips
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ super(ExtGwModeSepTestCase, self).setUp(plugin=plugin,
+ svc_plugins=svc_plugins)
+ self.addCleanup(self.restore_l3_attribute_map)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import itertools
+
+import testtools
+from webob import exc
+
+from neutron.common.test_lib import test_config
+from neutron import context
+from neutron.db import models_v2
+from neutron.extensions import external_net as external_net
+from neutron.manager import NeutronManager
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import uuidutils
+from neutron.tests.unit import test_api_v2
+from neutron.tests.unit import test_db_plugin
+
+
+LOG = logging.getLogger(__name__)
+
+_uuid = uuidutils.generate_uuid
+_get_path = test_api_v2._get_path
+
+
+class ExtNetTestExtensionManager(object):
+
+ def get_resources(self):
+ return []
+
+ def get_actions(self):
+ return []
+
+ def get_request_extensions(self):
+ return []
+
+
+class ExtNetDBTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
+
+ def _create_network(self, fmt, name, admin_state_up, **kwargs):
+ """Override the routine for allowing the router:external attribute."""
+ # attributes containing a colon should be passed with
+ # a double underscore
+ new_args = dict(itertools.izip(map(lambda x: x.replace('__', ':'),
+ kwargs),
+ kwargs.values()))
+ arg_list = new_args.pop('arg_list', ()) + (external_net.EXTERNAL,)
+ return super(ExtNetDBTestCase, self)._create_network(
+ fmt, name, admin_state_up, arg_list=arg_list, **new_args)
+
+ def setUp(self):
+ test_config['plugin_name_v2'] = (
+ 'neutron.tests.unit.test_l3_plugin.TestNoL3NatPlugin')
+ ext_mgr = ExtNetTestExtensionManager()
+ test_config['extension_manager'] = ext_mgr
+ super(ExtNetDBTestCase, self).setUp()
+
+ def _set_net_external(self, net_id):
+ self._update('networks', net_id,
+ {'network': {external_net.EXTERNAL: True}})
+
+ def test_list_nets_external(self):
+ with self.network() as n1:
+ self._set_net_external(n1['network']['id'])
+ with self.network():
+ body = self._list('networks')
+ self.assertEqual(len(body['networks']), 2)
+
+ body = self._list('networks',
+ query_params="%s=True" %
+ external_net.EXTERNAL)
+ self.assertEqual(len(body['networks']), 1)
+
+ body = self._list('networks',
+ query_params="%s=False" %
+ external_net.EXTERNAL)
+ self.assertEqual(len(body['networks']), 1)
+
+ def test_list_nets_external_pagination(self):
+ if self._skip_native_pagination:
+ self.skipTest("Skip test for not implemented pagination feature")
+ with contextlib.nested(self.network(name='net1'),
+ self.network(name='net3')) as (n1, n3):
+ self._set_net_external(n1['network']['id'])
+ self._set_net_external(n3['network']['id'])
+ with self.network(name='net2') as n2:
+ self._test_list_with_pagination(
+ 'network', (n1, n3), ('name', 'asc'), 1, 3,
+ query_params='router:external=True')
+ self._test_list_with_pagination(
+ 'network', (n2, ), ('name', 'asc'), 1, 2,
+ query_params='router:external=False')
+
+ def test_get_network_succeeds_without_filter(self):
+ plugin = NeutronManager.get_plugin()
+ ctx = context.Context(None, None, is_admin=True)
+ result = plugin.get_networks(ctx, filters=None)
+ self.assertEqual(result, [])
+
+ def test_network_filter_hook_admin_context(self):
+ plugin = NeutronManager.get_plugin()
+ ctx = context.Context(None, None, is_admin=True)
+ model = models_v2.Network
+ conditions = plugin._network_filter_hook(ctx, model, [])
+ self.assertEqual(conditions, [])
+
+ def test_network_filter_hook_nonadmin_context(self):
+ plugin = NeutronManager.get_plugin()
+ ctx = context.Context('edinson', 'cavani')
+ model = models_v2.Network
+ txt = "externalnetworks.network_id IS NOT NULL"
+ conditions = plugin._network_filter_hook(ctx, model, [])
+ self.assertEqual(conditions.__str__(), txt)
+ # Try to concatenate conditions
+ conditions = plugin._network_filter_hook(ctx, model, conditions)
+ self.assertEqual(conditions.__str__(), "%s OR %s" % (txt, txt))
+
+ def test_create_port_external_network_non_admin_fails(self):
+ with self.network(router__external=True) as ext_net:
+ with self.subnet(network=ext_net) as ext_subnet:
+ with testtools.ExpectedException(
+ exc.HTTPClientError) as ctx_manager:
+ with self.port(subnet=ext_subnet,
+ set_context='True',
+ tenant_id='noadmin'):
+ pass
+ self.assertEqual(ctx_manager.exception.code, 403)
+
+ def test_create_port_external_network_admin_suceeds(self):
+ with self.network(router__external=True) as ext_net:
+ with self.subnet(network=ext_net) as ext_subnet:
+ with self.port(subnet=ext_subnet) as port:
+ self.assertEqual(port['port']['network_id'],
+ ext_net['network']['id'])
+
+ def test_create_external_network_non_admin_fails(self):
+ with testtools.ExpectedException(exc.HTTPClientError) as ctx_manager:
+ with self.network(router__external=True,
+ set_context='True',
+ tenant_id='noadmin'):
+ pass
+ self.assertEqual(ctx_manager.exception.code, 403)
+
+ def test_create_external_network_admin_suceeds(self):
+ with self.network(router__external=True) as ext_net:
+ self.assertEqual(ext_net['network'][external_net.EXTERNAL],
+ True)
+
+
+class ExtNetDBTestCaseXML(ExtNetDBTestCase):
+ fmt = 'xml'
return []
-# This plugin class is just for testing
-class TestExtraRoutePlugin(test_l3.TestL3NatPlugin,
- extraroute_db.ExtraRoute_db_mixin):
- supported_extension_aliases = ["router", "extraroute"]
+# This plugin class is for tests with plugin that integrates L3.
+class TestExtraRouteIntPlugin(test_l3.TestL3NatIntPlugin,
+ extraroute_db.ExtraRoute_db_mixin):
+ supported_extension_aliases = ["external-net", "router", "extraroute"]
-class ExtraRouteDBTestCase(test_l3.L3NatDBTestCase):
+# A fake l3 service plugin class with extra route capability for
+# plugins that delegate away L3 routing functionality
+class TestExtraRouteL3NatServicePlugin(test_l3.TestL3NatServicePlugin,
+ extraroute_db.ExtraRoute_db_mixin):
+ supported_extension_aliases = ["router", "extraroute"]
- def setUp(self, plugin=None):
- if not plugin:
- plugin = ('neutron.tests.unit.test_extension_extraroute.'
- 'TestExtraRoutePlugin')
- test_config['plugin_name_v2'] = plugin
- # for these tests we need to enable overlapping ips
- cfg.CONF.set_default('allow_overlapping_ips', True)
- cfg.CONF.set_default('max_routes', 3)
- ext_mgr = ExtraRouteTestExtensionManager()
- test_config['extension_manager'] = ext_mgr
- #L3NatDBTestCase will overwrite plugin_name_v2,
- #so we don't need to setUp on the class here
- super(test_l3.L3NatTestCaseBase, self).setUp()
-
- # Set to None to reload the drivers
- notifier_api._drivers = None
- cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
+class ExtraRouteDBTestCaseBase(object):
def _routes_update_prepare(self, router_id, subnet_id,
port_id, routes, skip_add=False):
if not skip_add:
('name', 'asc'), 2, 2)
-class ExtraRouteDBTestCaseXML(ExtraRouteDBTestCase):
+class ExtraRouteDBIntTestCase(test_l3.L3NatDBIntTestCase,
+ ExtraRouteDBTestCaseBase):
+
+ def setUp(self, plugin=None):
+ if not plugin:
+ plugin = ('neutron.tests.unit.test_extension_extraroute.'
+ 'TestExtraRouteIntPlugin')
+ test_config['plugin_name_v2'] = plugin
+ # for these tests we need to enable overlapping ips
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ cfg.CONF.set_default('max_routes', 3)
+ ext_mgr = ExtraRouteTestExtensionManager()
+ test_config['extension_manager'] = ext_mgr
+ # L3NatDBIntTestCase will overwrite plugin_name_v2,
+ # so we don't need to setUp on the class here
+ super(test_l3.L3BaseForIntTests, self).setUp()
+
+ # Set to None to reload the drivers
+ notifier_api._drivers = None
+ cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
+
+
+class ExtraRouteDBIntTestCaseXML(ExtraRouteDBIntTestCase):
+ fmt = 'xml'
+
+
+class ExtraRouteDBSepTestCase(test_l3.L3NatDBSepTestCase,
+ ExtraRouteDBTestCaseBase):
+ def setUp(self):
+ # the plugin without L3 support
+ test_config['plugin_name_v2'] = (
+ 'neutron.tests.unit.test_l3_plugin.TestNoL3NatPlugin')
+ # the L3 service plugin
+ l3_plugin = ('neutron.tests.unit.test_extension_extraroute.'
+ 'TestExtraRouteL3NatServicePlugin')
+ service_plugins = {'l3_plugin_name': l3_plugin}
+
+ # for these tests we need to enable overlapping ips
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ cfg.CONF.set_default('max_routes', 3)
+ ext_mgr = ExtraRouteTestExtensionManager()
+ test_config['extension_manager'] = ext_mgr
+ # L3NatDBSepTestCase will overwrite plugin_name_v2,
+ # so we don't need to setUp on the class here
+ super(test_l3.L3BaseForSepTests, self).setUp(
+ service_plugins=service_plugins)
+
+ # Set to None to reload the drivers
+ notifier_api._drivers = None
+ cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
+
+
+class ExtraRouteDBSepTestCaseXML(ExtraRouteDBSepTestCase):
fmt = 'xml'
import contextlib
import copy
-import itertools
import mock
from oslo.config import cfg
from neutron.common import exceptions as q_exc
from neutron.common.test_lib import test_config
from neutron import context
+from neutron.db import api as qdbapi
from neutron.db import db_base_plugin_v2
+from neutron.db import external_net_db
from neutron.db import l3_db
-from neutron.db import models_v2
+from neutron.db import model_base
+from neutron.extensions import external_net
from neutron.extensions import l3
from neutron.manager import NeutronManager
from neutron.openstack.common import log as logging
from neutron.openstack.common.notifier import api as notifier_api
from neutron.openstack.common.notifier import test_notifier
from neutron.openstack.common import uuidutils
+from neutron.plugins.common import constants as service_constants
from neutron.tests.unit import test_api_v2
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_extensions
instances = self.plugin.return_value
instances._RouterPluginBase__native_pagination_support = True
instances._RouterPluginBase__native_sorting_support = True
- # Instantiate mock plugin and enable the 'router' extension
- NeutronManager.get_plugin().supported_extension_aliases = (
- ["router"])
+ # Enable the 'router' extension
+ instances.supported_extension_aliases = ["router"]
ext_mgr = L3TestExtensionManager()
self.ext_mdw = test_extensions.setup_extensions_middleware(ext_mgr)
self.api = webtest.TestApp(self.ext_mdw)
fmt = 'xml'
-# This plugin class is just for testing
-class TestL3NatPlugin(db_base_plugin_v2.NeutronDbPluginV2,
- l3_db.L3_NAT_db_mixin):
+# This base plugin class is for tests.
+class TestL3NatBasePlugin(db_base_plugin_v2.NeutronDbPluginV2,
+ external_net_db.External_net_db_mixin):
__native_pagination_support = True
__native_sorting_support = True
- supported_extension_aliases = ["router"]
-
def create_network(self, context, network):
session = context.session
with session.begin(subtransactions=True):
- net = super(TestL3NatPlugin, self).create_network(context,
- network)
+ net = super(TestL3NatBasePlugin, self).create_network(context,
+ network)
self._process_l3_create(context, net, network['network'])
return net
session = context.session
with session.begin(subtransactions=True):
- net = super(TestL3NatPlugin, self).update_network(context, id,
- network)
+ net = super(TestL3NatBasePlugin, self).update_network(context, id,
+ network)
self._process_l3_update(context, net, network['network'])
return net
def delete_port(self, context, id, l3_port_check=True):
- if l3_port_check:
- self.prevent_l3_port_deletion(context, id)
- self.disassociate_floatingips(context, id)
- return super(TestL3NatPlugin, self).delete_port(context, id)
+ plugin = NeutronManager.get_service_plugins().get(
+ service_constants.L3_ROUTER_NAT)
+ if plugin:
+ if l3_port_check:
+ plugin.prevent_l3_port_deletion(context, id)
+ plugin.disassociate_floatingips(context, id)
+ return super(TestL3NatBasePlugin, self).delete_port(context, id)
-class L3NatTestCaseMixin(object):
+# This plugin class is for tests with plugin that integrates L3.
+class TestL3NatIntPlugin(TestL3NatBasePlugin,
+ l3_db.L3_NAT_db_mixin):
+
+ supported_extension_aliases = ["external-net", "router"]
+
+
+# This plugin class is for tests with plugin not supporting L3.
+class TestNoL3NatPlugin(TestL3NatBasePlugin):
+
+ __native_pagination_support = True
+ __native_sorting_support = True
+
+ supported_extension_aliases = ["external-net"]
+
- def _create_network(self, fmt, name, admin_state_up, **kwargs):
- """Override the routine for allowing the router:external attribute."""
- # attributes containing a colon should be passed with
- # a double underscore
- new_args = dict(itertools.izip(map(lambda x: x.replace('__', ':'),
- kwargs),
- kwargs.values()))
- arg_list = new_args.pop('arg_list', ()) + (l3.EXTERNAL,)
- return super(L3NatTestCaseMixin, self)._create_network(
- fmt, name, admin_state_up, arg_list=arg_list, **new_args)
+# A L3 routing service plugin class for tests with plugins that
+# delegate away L3 routing functionality
+class TestL3NatServicePlugin(db_base_plugin_v2.CommonDbMixin,
+ l3_db.L3_NAT_db_mixin):
+
+ supported_extension_aliases = ["router"]
+
+ def __init__(self):
+ qdbapi.register_models(base=model_base.BASEV2)
+
+ def get_plugin_type(self):
+ return service_constants.L3_ROUTER_NAT
+
+ def get_plugin_description(self):
+ return "L3 Routing Service Plugin for testing"
+
+
+class L3NatTestCaseMixin(object):
def _create_router(self, fmt, tenant_id, name=None,
admin_state_up=None, set_context=False,
def _set_net_external(self, net_id):
self._update('networks', net_id,
- {'network': {l3.EXTERNAL: True}})
+ {'network': {external_net.EXTERNAL: True}})
def _create_floatingip(self, fmt, network_id, port_id=None,
fixed_ip=None, set_context=False):
public_sub['subnet']['network_id'])
-class L3NatTestCaseBase(L3NatTestCaseMixin,
- test_db_plugin.NeutronDbPluginV2TestCase):
-
- def setUp(self, plugin=None, ext_mgr=None,
- service_plugins=None):
- test_config['plugin_name_v2'] = (
- 'neutron.tests.unit.test_l3_plugin.TestL3NatPlugin')
- # for these tests we need to enable overlapping ips
- cfg.CONF.set_default('allow_overlapping_ips', True)
- ext_mgr = ext_mgr or L3TestExtensionManager()
- super(L3NatTestCaseBase, self).setUp(
- plugin=plugin, ext_mgr=ext_mgr,
- service_plugins=service_plugins)
-
- # Set to None to reload the drivers
- notifier_api._drivers = None
- cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
-
- def tearDown(self):
- test_notifier.NOTIFICATIONS = []
- super(L3NatTestCaseBase, self).tearDown()
-
-
-class L3NatDBTestCase(L3NatTestCaseBase):
+class L3NatTestCaseBase(L3NatTestCaseMixin):
def test_router_create(self):
name = 'router1'
r['router']['id'],
s1['subnet']['network_id'])
self._update('networks', s1['subnet']['network_id'],
- {'network': {'router:external': False}},
+ {'network': {external_net.EXTERNAL: False}},
expected_code=exc.HTTPConflict.code)
self._remove_external_gateway_from_router(
r['router']['id'],
r['router']['id'],
s1['subnet']['network_id'])
self._update('networks', testnet['network']['id'],
- {'network': {'router:external': False}})
+ {'network': {external_net.EXTERNAL: False}})
self._remove_external_gateway_from_router(
r['router']['id'],
s1['subnet']['network_id'])
break
self.assertTrue(found)
- def test_list_nets_external(self):
- with self.network() as n1:
- self._set_net_external(n1['network']['id'])
- with self.network():
- body = self._list('networks')
- self.assertEqual(len(body['networks']), 2)
-
- body = self._list('networks',
- query_params="%s=True" % l3.EXTERNAL)
- self.assertEqual(len(body['networks']), 1)
-
- body = self._list('networks',
- query_params="%s=False" % l3.EXTERNAL)
- self.assertEqual(len(body['networks']), 1)
-
- def test_list_nets_external_pagination(self):
- if self._skip_native_pagination:
- self.skipTest("Skip test for not implemented pagination feature")
- with contextlib.nested(self.network(name='net1'),
- self.network(name='net3')) as (n1, n3):
- self._set_net_external(n1['network']['id'])
- self._set_net_external(n3['network']['id'])
- with self.network(name='net2') as n2:
- self._test_list_with_pagination(
- 'network', (n1, n3), ('name', 'asc'), 1, 3,
- query_params='router:external=True')
- self._test_list_with_pagination(
- 'network', (n2, ), ('name', 'asc'), 1, 2,
- query_params='router:external=False')
-
- def test_get_network_succeeds_without_filter(self):
- plugin = NeutronManager.get_plugin()
- ctx = context.Context(None, None, is_admin=True)
- result = plugin.get_networks(ctx, filters=None)
- self.assertEqual(result, [])
-
- def test_network_filter_hook_admin_context(self):
- plugin = NeutronManager.get_plugin()
- ctx = context.Context(None, None, is_admin=True)
- model = models_v2.Network
- conditions = plugin._network_filter_hook(ctx, model, [])
- self.assertEqual(conditions, [])
-
- def test_network_filter_hook_nonadmin_context(self):
- plugin = NeutronManager.get_plugin()
- ctx = context.Context('edinson', 'cavani')
- model = models_v2.Network
- txt = "externalnetworks.network_id IS NOT NULL"
- conditions = plugin._network_filter_hook(ctx, model, [])
- self.assertEqual(conditions.__str__(), txt)
- # Try to concatenate confitions
- conditions = plugin._network_filter_hook(ctx, model, conditions)
- self.assertEqual(conditions.__str__(), "%s OR %s" % (txt, txt))
-
- def test_create_port_external_network_non_admin_fails(self):
- with self.network(router__external=True) as ext_net:
- with self.subnet(network=ext_net) as ext_subnet:
- with testlib_api.ExpectedException(
- exc.HTTPClientError) as ctx_manager:
- with self.port(subnet=ext_subnet,
- set_context='True',
- tenant_id='noadmin'):
- pass
- self.assertEqual(ctx_manager.exception.code, 403)
-
- def test_create_port_external_network_admin_suceeds(self):
- with self.network(router__external=True) as ext_net:
- with self.subnet(network=ext_net) as ext_subnet:
- with self.port(subnet=ext_subnet) as port:
- self.assertEqual(port['port']['network_id'],
- ext_net['network']['id'])
-
- def test_create_external_network_non_admin_fails(self):
- with testlib_api.ExpectedException(exc.HTTPClientError) as ctx_manager:
- with self.network(router__external=True,
- set_context='True',
- tenant_id='noadmin'):
- pass
- self.assertEqual(ctx_manager.exception.code, 403)
-
- def test_create_external_network_admin_suceeds(self):
- with self.network(router__external=True) as ext_net:
- self.assertEqual(ext_net['network'][l3.EXTERNAL],
- True)
-
def test_router_delete_subnet_inuse_returns_409(self):
with self.router() as r:
with self.subnet() as s:
None)
-class L3AgentDbTestCase(L3NatTestCaseBase):
- """Unit tests for methods called by the L3 agent."""
+class L3AgentDbTestCaseBase(L3NatTestCaseMixin):
- def setUp(self):
- self.plugin = TestL3NatPlugin()
- super(L3AgentDbTestCase, self).setUp()
+ """Unit tests for methods called by the L3 agent."""
def test_l3_agent_routers_query_interfaces(self):
with self.router() as r:
{'ip_address': '9.0.1.5',
'subnet_id': subnet['subnet']['id']}]}}
ctx = context.get_admin_context()
- self.plugin.update_port(ctx, p['port']['id'], port)
+ self.core_plugin.update_port(ctx, p['port']['id'], port)
routers = self.plugin.get_sync_data(ctx, None)
self.assertEqual(1, len(routers))
interfaces = routers[0].get(l3_constants.INTERFACE_KEY, [])
def _test_notify_op_agent(self, target_func, *args):
l3_rpc_agent_api_str = (
'neutron.api.rpc.agentnotifiers.l3_rpc_agent_api.L3AgentNotifyAPI')
- plugin = NeutronManager.get_plugin()
+ plugin = NeutronManager.get_service_plugins()[
+ service_constants.L3_ROUTER_NAT]
oldNotify = plugin.l3_rpc_notifier
try:
with mock.patch(l3_rpc_agent_api_str) as notifyApi:
self._test_notify_op_agent(self._test_floatingips_op_agent)
-class L3NatDBTestCaseXML(L3NatDBTestCase):
+class L3BaseForIntTests(test_db_plugin.NeutronDbPluginV2TestCase):
+
+ def setUp(self, plugin=None, ext_mgr=None):
+ test_config['plugin_name_v2'] = (
+ 'neutron.tests.unit.test_l3_plugin.TestL3NatIntPlugin')
+ # for these tests we need to enable overlapping ips
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ ext_mgr = ext_mgr or L3TestExtensionManager()
+ test_config['extension_manager'] = ext_mgr
+ super(L3BaseForIntTests, self).setUp(plugin=plugin, ext_mgr=ext_mgr)
+
+ # Set to None to reload the drivers
+ notifier_api._drivers = None
+ cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
+
+ def tearDown(self):
+ test_notifier.NOTIFICATIONS = []
+ del test_config['extension_manager']
+ super(L3BaseForIntTests, self).tearDown()
+
+
+class L3BaseForSepTests(test_db_plugin.NeutronDbPluginV2TestCase):
+
+ def setUp(self):
+ # the plugin without L3 support
+ test_config['plugin_name_v2'] = (
+ 'neutron.tests.unit.test_l3_plugin.TestNoL3NatPlugin')
+ # the L3 service plugin
+ l3_plugin = ('neutron.tests.unit.test_l3_plugin.'
+ 'TestL3NatServicePlugin')
+ service_plugins = {'l3_plugin_name': l3_plugin}
+
+ # for these tests we need to enable overlapping ips
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ ext_mgr = L3TestExtensionManager()
+ test_config['extension_manager'] = ext_mgr
+ super(L3BaseForSepTests, self).setUp(service_plugins=service_plugins)
+
+ # Set to None to reload the drivers
+ notifier_api._drivers = None
+ cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
+
+ def tearDown(self):
+ test_notifier.NOTIFICATIONS = []
+ del test_config['extension_manager']
+ super(L3BaseForSepTests, self).tearDown()
+
+
+class L3AgentDbIntTestCase(L3BaseForIntTests, L3AgentDbTestCaseBase):
+
+ """Unit tests for methods called by the L3 agent for
+ the case where core plugin implements L3 routing.
+ """
+
+ def setUp(self):
+ self.core_plugin = TestL3NatIntPlugin()
+ # core plugin is also plugin providing L3 routing
+ self.plugin = self.core_plugin
+ super(L3AgentDbIntTestCase, self).setUp()
+
+
+class L3AgentDbSepTestCase(L3BaseForSepTests, L3AgentDbTestCaseBase):
+
+ """Unit tests for methods called by the L3 agent for the
+ case where separate service plugin implements L3 routing.
+ """
+
+ def setUp(self):
+ self.core_plugin = TestNoL3NatPlugin()
+ # core plugin is also plugin providing L3 routing
+ self.plugin = TestL3NatServicePlugin()
+ super(L3AgentDbSepTestCase, self).setUp()
+
+
+class L3NatDBIntTestCase(L3BaseForIntTests, L3NatTestCaseBase):
+
+ """Unit tests for core plugin with L3 routing integrated."""
+ pass
+
+
+class L3NatDBSepTestCase(L3BaseForSepTests, L3NatTestCaseBase):
+
+ """Unit tests for a separate L3 routing service plugin."""
+ pass
+
+
+class L3NatDBIntTestCaseXML(L3NatDBIntTestCase):
+ fmt = 'xml'
+
+
+class L3NatDBSepTestCaseXML(L3NatDBSepTestCase):
fmt = 'xml'
db_base_plugin_v2.NeutronDbPluginV2):
supported_extension_aliases = [
- "router", "router-service-type", "routed-service-insertion",
- "service-type", "lbaas"
+ "router", "router-service-type",
+ "routed-service-insertion", "service-type", "lbaas"
]
def create_router(self, context, router):
node.set(constants.ATOM_XMLNS, constants.ATOM_NAMESPACE)
node.set(constants.XSI_NIL_ATTR, constants.XSI_NAMESPACE)
ext_ns = self.metadata.get(constants.EXT_NS, {})
+ ext_ns_bc = self.metadata.get(constants.EXT_NS_COMP, {})
for prefix in used_prefixes:
if prefix in ext_ns:
node.set('xmlns:' + prefix, ext_ns[prefix])
+ if prefix in ext_ns_bc:
+ node.set('xmlns:' + prefix, ext_ns_bc[prefix])
def _to_xml_node(self, parent, metadata, nodename, data, used_prefixes):
"""Recursive method to convert data members to XML nodes."""
for prefix, _ns in ext_ns.items():
if ns == _ns:
return prefix + ":" + bare_tag
+ ext_ns_bc = self.metadata.get(constants.EXT_NS_COMP, {})
+ for prefix, _ns in ext_ns_bc.items():
+ if ns == _ns:
+ return prefix + ":" + bare_tag
else:
return tag