"update_agent": "rule:admin_only",
"delete_agent": "rule:admin_only",
"get_agent": "rule:admin_only",
- "get_agents": "rule:admin_only"
+ "get_agents": "rule:admin_only",
+
+ "create_dhcp-network": "rule:admin_only",
+ "delete_dhcp-network": "rule:admin_only",
+ "get_dhcp-networks": "rule:admin_only",
+ "create_l3-router": "rule:admin_only",
+ "delete_l3-router": "rule:admin_only",
+ "get_l3-routers": "rule:admin_only",
+ "get_dhcp-agents": "rule:admin_only",
+ "get_l3-agents": "rule:admin_only"
}
# Maximum number of fixed ips per port
# max_fixed_ips_per_port = 5
+# =========== items for agent management extension =============
+# Seconds to regard the agent as down.
+# agent_down_time = 5
+# =========== end of items for agent management extension =====
+
+# =========== items for agent scheduler extension =============
+# Driver to use for scheduling network to DHCP agent
+# network_scheduler_driver = quantum.scheduler.dhcp_agent_scheduler.ChanceScheduler
+# Driver to use for scheduling router to a default L3 agent
+# router_scheduler_driver = quantum.scheduler.l3_agent_scheduler.ChanceScheduler
+
+# Allow auto scheduling networks to DHCP agent. It will schedule non-hosted
+# networks to first DHCP agent which sends get_active_networks message to
+# quantum server
+# network_auto_schedule = True
+
+# Allow auto scheduling routers to L3 agent. It will schedule non-hosted
+# routers to first L3 agent which sends sync_routers message to quantum server
+# router_auto_schedule = True
+# =========== end of items for agent scheduler extension =====
+
[QUOTAS]
# resource name(s) that are supported in quota features
# quota_items = network,subnet,port
# default driver to use for quota checks
# quota_driver = quantum.quota.ConfDriver
-# =========== items for agent management extension =============
-# Seconds to regard the agent as down.
-# agent_down_time = 5
-# =========== end of items for agent management extension =====
-
[DEFAULT_SERVICETYPE]
# Description of the default service type (optional)
# description = "default service type"
super(DhcpPluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.context = context
- self.host = socket.gethostname()
+ self.host = cfg.CONF.host
def get_active_networks(self):
"""Make a remote process call to retrieve the active networks."""
if self.agent_state.pop('start_flag', None):
self.run()
+ def agent_updated(self, context, payload):
+ """Handle the agent_updated notification event."""
+ self.needs_resync = True
+ LOG.info(_("agent_updated by server side %s!"), payload)
+
def after_start(self):
LOG.info(_("DHCP agent started"))
class RouterInfo(object):
- def __init__(self, router_id, root_helper, use_namespaces, router=None):
+ def __init__(self, router_id, root_helper, use_namespaces, router):
self.router_id = router_id
self.ex_gw_port = None
self.internal_ports = []
else:
raise
- def _router_added(self, router_id, router=None):
+ def _router_added(self, router_id, router):
ri = RouterInfo(router_id, self.root_helper,
self.conf.use_namespaces, router)
self.router_info[router_id] = ri
def _router_removed(self, router_id):
ri = self.router_info[router_id]
+ ri.router['gw_port'] = None
+ ri.router[l3_constants.INTERFACE_KEY] = []
+ ri.router[l3_constants.FLOATINGIP_KEY] = []
+ self.process_router(ri)
for c, r in self.metadata_filter_rules():
ri.iptables_manager.ipv4['filter'].remove_rule(c, r)
for c, r in self.metadata_nat_rules():
LOG.debug(msg)
self.fullsync = True
- def _process_routers(self, routers):
+ def router_removed_from_agent(self, context, payload):
+ self.router_deleted(context, payload['router_id'])
+
+ def router_added_to_agent(self, context, payload):
+ self.routers_updated(context, payload)
+
+ def _process_routers(self, routers, all_routers=False):
if (self.conf.external_network_bridge and
not ip_lib.device_exists(self.conf.external_network_bridge)):
LOG.error(_("The external network bridge '%s' does not exist"),
return
target_ex_net_id = self._fetch_external_net_id()
-
+ # if routers are all the routers we have (They are from router sync on
+ # starting or when error occurs during running), we seek the
+ # routers which should be removed.
+ # If routers are from server side notification, we seek them
+ # from subset of incoming routers and ones we have now.
+ if all_routers:
+ prev_router_ids = set(self.router_info)
+ else:
+ prev_router_ids = set(self.router_info) & set(
+ [router['id'] for router in routers])
+ cur_router_ids = set()
for r in routers:
if not r['admin_state_up']:
continue
if ex_net_id and ex_net_id != target_ex_net_id:
continue
-
+ cur_router_ids.add(r['id'])
if r['id'] not in self.router_info:
- self._router_added(r['id'])
-
+ self._router_added(r['id'], r)
ri = self.router_info[r['id']]
ri.router = r
self.process_router(ri)
+ # identify and remove routers that no longer exist
+ for router_id in prev_router_ids - cur_router_ids:
+ self._router_removed(router_id)
@periodic_task.periodic_task
def _sync_routers_task(self, context):
router_id = None
routers = self.plugin_rpc.get_routers(
context, router_id)
- self.router_info = {}
- self._process_routers(routers)
+ self._process_routers(routers, all_routers=True)
self.fullsync = False
except Exception:
LOG.exception(_("Failed synchronizing routers"))
except Exception:
LOG.exception(_("Failed reporting state!"))
+ def agent_updated(self, context, payload):
+ """Handle the agent_updated notification event."""
+ self.fullsync = True
+ LOG.info(_("agent_updated by server side %s!"), payload)
+
def main():
eventlet.monkey_patch()
# See the License for the specific language governing permissions and
# limitations under the License.
+from quantum.common import constants
from quantum.common import topics
+from quantum.common import utils
+from quantum import manager
from quantum.openstack.common import log as logging
from quantum.openstack.common.rpc import proxy
super(DhcpAgentNotifyAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
- def _notification(self, context, method, payload):
+ def _get_dhcp_agents(self, context, network_id):
+ plugin = manager.QuantumManager.get_plugin()
+ dhcp_agents = plugin.get_dhcp_agents_hosting_networks(
+ context, [network_id], active=True)
+ return [(dhcp_agent.host, dhcp_agent.topic) for
+ dhcp_agent in dhcp_agents]
+
+ def _notification_host(self, context, method, payload, host):
+ """Notify the agent on host"""
+ self.cast(
+ context, self.make_msg(method,
+ payload=payload),
+ topic='%s.%s' % (topics.DHCP_AGENT, host))
+
+ def _notification(self, context, method, payload, network_id):
"""Notify all the agents that are hosting the network"""
- # By now, we have no scheduling feature, so we fanout
- # to all of the DHCP agents
- self._notification_fanout(context, method, payload)
+ plugin = manager.QuantumManager.get_plugin()
+ if (method != 'network_delete_end' and utils.is_extension_supported(
+ plugin, constants.AGENT_SCHEDULER_EXT_ALIAS)):
+ for (host, topic) in self._get_dhcp_agents(context, network_id):
+ self.cast(
+ context, self.make_msg(method,
+ payload=payload),
+ topic='%s.%s' % (topic, host))
+ else:
+ # besides the non-agentscheduler plugin,
+ # There is no way to query who is hosting the network
+ # when the network is deleted, so we need to fanout
+ self._notification_fanout(context, method, payload)
def _notification_fanout(self, context, method, payload):
"""Fanout the payload to all dhcp agents"""
payload=payload),
topic=topics.DHCP_AGENT)
+ def network_removed_from_agent(self, context, network_id, host):
+ self._notification_host(context, 'network_delete_end',
+ {'network_id': network_id}, host)
+
+ def network_added_to_agent(self, context, network_id, host):
+ self._notification_host(context, 'network_create_end',
+ {'network': {'id': network_id}}, host)
+
+ def agent_updated(self, context, admin_state_up, host):
+ self._notification_host(context, 'agent_updated',
+ {'admin_state_up': admin_state_up},
+ host)
+
def notify(self, context, data, methodname):
# data is {'key' : 'value'} with only one key
if methodname not in self.VALID_METHOD_NAMES:
if obj_type not in self.VALID_RESOURCES:
return
obj_value = data[obj_type]
+ network_id = None
+ if obj_type == 'network' and 'id' in obj_value:
+ network_id = obj_value['id']
+ elif obj_type in ['port', 'subnet'] and 'network_id' in obj_value:
+ network_id = obj_value['network_id']
+ if not network_id:
+ return
methodname = methodname.replace(".", "_")
if methodname.endswith("_delete_end"):
if 'id' in obj_value:
self._notification(context, methodname,
- {obj_type + '_id': obj_value['id']})
+ {obj_type + '_id': obj_value['id']},
+ network_id)
else:
- self._notification(context, methodname, data)
+ self._notification(context, methodname, data, network_id)
--- /dev/null
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from quantum.common import constants
+from quantum.common import topics
+from quantum.common import utils
+from quantum import manager
+from quantum.openstack.common import log as logging
+from quantum.openstack.common.rpc import proxy
+
+
+LOG = logging.getLogger(__name__)
+
+
class L3AgentNotifyAPI(proxy.RpcProxy):
    """API for plugin to notify L3 agent."""
    BASE_RPC_API_VERSION = '1.0'

    def __init__(self, topic=topics.L3_AGENT):
        super(L3AgentNotifyAPI, self).__init__(
            topic=topic, default_version=self.BASE_RPC_API_VERSION)

    def _notification_host(self, context, method, payload, host):
        """Cast `method` with `payload` to the one agent running on `host`."""
        # fix: log message typo ('Nofity' -> 'Notify')
        LOG.debug(_('Notify agent at %(host)s the message '
                    '%(method)s'), {'host': host,
                                    'method': method})
        self.cast(
            context, self.make_msg(method,
                                   payload=payload),
            topic='%s.%s' % (topics.L3_AGENT, host))

    def _agent_notification(self, context, method, routers,
                            operation, data):
        """Notify changed routers to hosting l3 agents.

        Casts `method` (with the single router as payload) to every
        active, admin-up l3 agent hosting each router.
        """
        # `and/or` short-circuit replaced with a conditional expression;
        # the old form would misbehave for a falsy context object.
        admin_context = context if context.is_admin else context.elevated()
        plugin = manager.QuantumManager.get_plugin()
        for router in routers:
            l3_agents = plugin.get_l3_agents_hosting_routers(
                admin_context, [router['id']],
                admin_state_up=True,
                active=True)
            for l3_agent in l3_agents:
                LOG.debug(_('Notify agent at %(topic)s.%(host)s the message '
                            '%(method)s'),
                          {'topic': l3_agent.topic,
                           'host': l3_agent.host,
                           'method': method})
                self.cast(
                    context, self.make_msg(method,
                                           routers=[router]),
                    topic='%s.%s' % (l3_agent.topic, l3_agent.host))

    def _notification(self, context, method, routers, operation, data):
        """Notify all the agents that are hosting the routers.

        When the plugin supports the agent scheduler extension, the routers
        are (re)scheduled first and only their hosting agents are notified;
        otherwise the message is fanned out to every l3 agent.
        """
        plugin = manager.QuantumManager.get_plugin()
        if utils.is_extension_supported(
            plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
            admin_context = (context if context.is_admin
                             else context.elevated())
            plugin.schedule_routers(admin_context, routers)
            self._agent_notification(
                context, method, routers, operation, data)
        else:
            self.fanout_cast(
                context, self.make_msg(method,
                                       routers=routers),
                topic=topics.L3_AGENT)

    def _notification_fanout(self, context, method, router_id):
        """Fanout the deleted router to all L3 agents"""
        # fix: the log message reported topics.DHCP_AGENT although the
        # fanout_cast below goes to the L3 agent topic.
        LOG.debug(_('Fanout notify agent at %(topic)s the message '
                    '%(method)s on router %(router_id)s'),
                  {'topic': topics.L3_AGENT,
                   'method': method,
                   'router_id': router_id})
        self.fanout_cast(
            context, self.make_msg(method,
                                   router_id=router_id),
            topic=topics.L3_AGENT)

    def agent_updated(self, context, admin_state_up, host):
        """Tell the agent on `host` that its admin state changed."""
        self._notification_host(context, 'agent_updated',
                                {'admin_state_up': admin_state_up},
                                host)

    def router_deleted(self, context, router_id):
        """Fanout a router deletion to every l3 agent."""
        self._notification_fanout(context, 'router_deleted', router_id)

    def routers_updated(self, context, routers, operation=None, data=None):
        """Notify the hosting agents about updated routers (no-op if empty)."""
        if routers:
            self._notification(context, 'routers_updated', routers,
                               operation, data)

    def router_removed_from_agent(self, context, router_id, host):
        """Tell the agent on `host` to stop hosting `router_id`."""
        self._notification_host(context, 'router_removed_from_agent',
                                {'router_id': router_id}, host)

    def router_added_to_agent(self, context, routers, host):
        """Tell the agent on `host` it now hosts the given routers."""
        self._notification_host(context, 'router_added_to_agent',
                                routers, host)


L3AgentNotify = L3AgentNotifyAPI()
SORT_DIRECTION_ASC = 'asc'
SORT_DIRECTION_DESC = 'desc'
+
+AGENT_SCHEDULER_EXT_ALIAS = 'agent_scheduler'
added = new_set - old_set
removed = old_set - new_set
return [str2dict(a) for a in added], [str2dict(r) for r in removed]
+
+
def is_extension_supported(plugin, ext_alias):
    """Check whether a plugin advertises the given extension alias.

    Plugins expose a `supported_extension_aliases` attribute; a plugin
    without that attribute supports no extensions.
    """
    supported = getattr(plugin, "supported_extension_aliases", [])
    return ext_alias in supported
raise ext_agent.AgentNotFound(id=id)
return agent
- def _is_agent_down(self, heart_beat_time_str):
- return timeutils.is_older_than(heart_beat_time_str,
+ @classmethod
+ def is_agent_down(cls, heart_beat_time):
+ return timeutils.is_older_than(heart_beat_time,
cfg.CONF.agent_down_time)
+ def get_configuration_dict(self, agent_db):
+ try:
+ conf = jsonutils.loads(agent_db.configurations)
+ except Exception:
+ msg = _('Configuration for agent %(agent_type)s on host %(host)s'
+ ' is invalid.')
+ LOG.warn(msg, {'agent_type': agent_db.agent_type,
+ 'host': agent_db.host})
+ conf = {}
+ return conf
+
def _make_agent_dict(self, agent, fields=None):
attr = ext_agent.RESOURCE_ATTRIBUTE_MAP.get(
ext_agent.RESOURCE_NAME + 's')
res = dict((k, agent[k]) for k in attr
if k not in ['alive', 'configurations'])
- res['alive'] = not self._is_agent_down(res['heartbeat_timestamp'])
- try:
- res['configurations'] = jsonutils.loads(agent['configurations'])
- except Exception:
- msg = _('Configurations for agent %(agent_type)s on host %(host)s'
- ' are invalid.')
- LOG.warn(msg, {'agent_type': res['agent_type'],
- 'host': res['host']})
- res['configurations'] = {}
+ res['alive'] = not AgentDbMixin.is_agent_down(
+ res['heartbeat_timestamp'])
+ res['configurations'] = self.get_configuration_dict(agent)
return self._fields(res, fields)
def delete_agent(self, context, id):
agent.update(agent_data)
return self._make_agent_dict(agent)
+ def get_agents_db(self, context, filters=None):
+ query = self._get_collection_query(context, Agent, filters=filters)
+ return query.all()
+
def get_agents(self, context, filters=None, fields=None):
return self._get_collection(context, Agent,
self._make_agent_dict,
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+import sqlalchemy as sa
+from sqlalchemy import orm
+from sqlalchemy.orm import exc
+from sqlalchemy.orm import joinedload
+
+from quantum.api.v2 import attributes
+from quantum.common import constants
+from quantum.db import agents_db
+from quantum.db import model_base
+from quantum.db import models_v2
+from quantum.extensions import agentscheduler
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import uuidutils
+
+
+LOG = logging.getLogger(__name__)
+
+
class NetworkDhcpAgentBinding(model_base.BASEV2):
    """Represents binding between quantum networks and DHCP agents"""

    # Composite primary key: one row per (network, agent) pair.
    network_id = sa.Column(
        sa.String(36),
        sa.ForeignKey("networks.id", ondelete='CASCADE'),
        primary_key=True)
    dhcp_agent_id = sa.Column(
        sa.String(36),
        sa.ForeignKey("agents.id", ondelete='CASCADE'),
        primary_key=True)
    # ORM access to the bound agent row.
    dhcp_agent = orm.relation(agents_db.Agent)
+
+
class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId):
    """Represents binding between quantum routers and L3 agents"""

    router_id = sa.Column(
        sa.String(36),
        sa.ForeignKey("routers.id", ondelete='CASCADE'))
    l3_agent_id = sa.Column(
        sa.String(36),
        sa.ForeignKey("agents.id", ondelete='CASCADE'))
    # ORM access to the agent currently hosting the router.
    l3_agent = orm.relation(agents_db.Agent)
+
+
+class AgentSchedulerDbMixin(agentscheduler.AgentSchedulerPluginBase):
+ """Mixin class to add agent scheduler extension to db_plugin_base_v2."""
+
+ dhcp_agent_notifier = None
+ l3_agent_notifier = None
+ network_scheduler = None
+ router_scheduler = None
+
+ def get_dhcp_agents_hosting_networks(
+ self, context, network_ids, active=None):
+ if not network_ids:
+ return []
+ query = context.session.query(NetworkDhcpAgentBinding)
+ query = query.options(joinedload('dhcp_agent'))
+ if len(network_ids) == 1:
+ query = query.filter(
+ NetworkDhcpAgentBinding.network_id == network_ids[0])
+ elif network_ids:
+ query = query.filter(
+ NetworkDhcpAgentBinding.network_id in network_ids)
+ if active is not None:
+ query = (query.filter(agents_db.Agent.admin_state_up == active))
+ dhcp_agents = [binding.dhcp_agent for binding in query.all()]
+ if active is not None:
+ dhcp_agents = [dhcp_agent for dhcp_agent in
+ dhcp_agents if not
+ agents_db.AgentDbMixin.is_agent_down(
+ dhcp_agent['heartbeat_timestamp'])]
+ return dhcp_agents
+
+ def add_network_to_dhcp_agent(self, context, id, network_id):
+ self._get_network(context, network_id)
+ with context.session.begin(subtransactions=True):
+ agent_db = self._get_agent(context, id)
+ if (agent_db['agent_type'] != constants.AGENT_TYPE_DHCP or
+ not agent_db['admin_state_up']):
+ raise agentscheduler.InvalidDHCPAgent(id=id)
+ dhcp_agents = self.get_dhcp_agents_hosting_networks(
+ context, [network_id])
+ for dhcp_agent in dhcp_agents:
+ if id == dhcp_agent.id:
+ raise agentscheduler.NetworkHostedByDHCPAgent(
+ network_id=network_id, agent_id=id)
+ binding = NetworkDhcpAgentBinding()
+ binding.dhcp_agent_id = id
+ binding.network_id = network_id
+ context.session.add(binding)
+ if self.dhcp_agent_notifier:
+ self.dhcp_agent_notifier.network_added_to_agent(
+ context, network_id, agent_db.host)
+
+ def remove_network_from_dhcp_agent(self, context, id, network_id):
+ agent = self._get_agent(context, id)
+ with context.session.begin(subtransactions=True):
+ try:
+ query = context.session.query(NetworkDhcpAgentBinding)
+ binding = query.filter(
+ NetworkDhcpAgentBinding.network_id == network_id,
+ NetworkDhcpAgentBinding.dhcp_agent_id == id).one()
+ except exc.NoResultFound:
+ raise agentscheduler.NetworkNotHostedByDhcpAgent(
+ network_id=network_id, agent_id=id)
+ context.session.delete(binding)
+ if self.dhcp_agent_notifier:
+ self.dhcp_agent_notifier.network_removed_from_agent(
+ context, network_id, agent.host)
+
+ def list_networks_on_dhcp_agent(self, context, id):
+ query = context.session.query(NetworkDhcpAgentBinding.network_id)
+ net_ids = query.filter(
+ NetworkDhcpAgentBinding.dhcp_agent_id == id).all()
+ if net_ids:
+ _ids = [item[0] for item in net_ids]
+ return {'networks':
+ self.get_networks(context, filters={'id': _ids})}
+ else:
+ return {'networks': []}
+
+ def list_active_networks_on_active_dhcp_agent(self, context, host):
+ agent = self._get_agent_by_type_and_host(
+ context, constants.AGENT_TYPE_DHCP, host)
+ if not agent.admin_state_up:
+ return []
+ query = context.session.query(NetworkDhcpAgentBinding.network_id)
+ net_ids = query.filter(
+ NetworkDhcpAgentBinding.dhcp_agent_id == agent.id).all()
+ if net_ids:
+ _ids = [item[0] for item in net_ids]
+ return self.get_networks(
+ context, filters={'id': _ids, 'admin_state_up': [True]})
+ else:
+ return []
+
+ def list_dhcp_agents_hosting_network(self, context, network_id):
+ dhcp_agents = self.get_dhcp_agents_hosting_networks(
+ context, [network_id])
+ agent_ids = [dhcp_agent.id for dhcp_agent in dhcp_agents]
+ if agent_ids:
+ return {
+ 'agents':
+ self.get_agents(context, filters={'id': agent_ids})}
+ else:
+ return {'agents': []}
+
+ def add_router_to_l3_agent(self, context, id, router_id):
+ """Add a l3 agent to host a router.
+ """
+ router = self.get_router(context, router_id)
+ with context.session.begin(subtransactions=True):
+ agent_db = self._get_agent(context, id)
+ if (agent_db['agent_type'] != constants.AGENT_TYPE_L3 or
+ not agent_db['admin_state_up'] or
+ not self.get_l3_agent_candidates(router, [agent_db])):
+ raise agentscheduler.InvalidL3Agent(id=id)
+ query = context.session.query(RouterL3AgentBinding)
+ try:
+ binding = query.filter(
+ RouterL3AgentBinding.l3_agent_id == agent_db.id,
+ RouterL3AgentBinding.router_id == router_id).one()
+ if binding:
+ raise agentscheduler.RouterHostedByL3Agent(
+ router_id=router_id, agent_id=id)
+ except exc.NoResultFound:
+ pass
+
+ result = self.auto_schedule_routers(context,
+ agent_db.host,
+ router_id)
+ if not result:
+ raise agentscheduler.RouterSchedulingFailed(
+ router_id=router_id, agent_id=id)
+
+ if self.l3_agent_notifier:
+ routers = self.get_sync_data(context, [router_id])
+ self.l3_agent_notifier.router_added_to_agent(
+ context, routers, agent_db.host)
+
+ def remove_router_from_l3_agent(self, context, id, router_id):
+ """Remove the router from l3 agent. After it, the router
+ will be non-hosted until there is update which
+ lead to re schedule or be added to another agent manually."""
+ agent = self._get_agent(context, id)
+ with context.session.begin(subtransactions=True):
+ query = context.session.query(RouterL3AgentBinding)
+ query = query.filter(
+ RouterL3AgentBinding.router_id == router_id,
+ RouterL3AgentBinding.l3_agent_id == id)
+ try:
+ binding = query.one()
+ except exc.NoResultFound:
+ raise agentscheduler.RouterNotHostedByL3Agent(
+ router_id=router_id, agent_id=id)
+ context.session.delete(binding)
+ if self.l3_agent_notifier:
+ self.l3_agent_notifier.router_removed_from_agent(
+ context, router_id, agent.host)
+
+ def list_routers_on_l3_agent(self, context, id):
+ query = context.session.query(RouterL3AgentBinding.router_id)
+ router_ids = query.filter(
+ RouterL3AgentBinding.l3_agent_id == id).all()
+ if router_ids:
+ _ids = [item[0] for item in router_ids]
+ return {'routers':
+ self.get_routers(context, filters={'id': _ids})}
+ else:
+ return {'routers': []}
+
+ def list_active_sync_routers_on_active_l3_agent(
+ self, context, host, router_id):
+ agent = self._get_agent_by_type_and_host(
+ context, constants.AGENT_TYPE_L3, host)
+ if not agent.admin_state_up:
+ return []
+ query = context.session.query(RouterL3AgentBinding.router_id)
+ query = query.filter(
+ RouterL3AgentBinding.l3_agent_id == agent.id)
+ if router_id:
+ query = query.filter(RouterL3AgentBinding.router_id == router_id)
+ router_ids = query.all()
+ if router_ids:
+ _ids = [item[0] for item in router_ids]
+ routers = self.get_sync_data(context, router_ids=_ids,
+ active=True)
+ return routers
+ return []
+
+ def get_l3_agents_hosting_routers(self, context, router_ids,
+ admin_state_up=None,
+ active=None):
+ if not router_ids:
+ return []
+ query = context.session.query(RouterL3AgentBinding)
+ if len(router_ids) > 1:
+ query = query.options(joinedload('l3_agent')).filter(
+ RouterL3AgentBinding.router_id.in_(router_ids))
+ else:
+ query = query.options(joinedload('l3_agent')).filter(
+ RouterL3AgentBinding.router_id == router_ids[0])
+ if admin_state_up is not None:
+ query = (query.filter(agents_db.Agent.admin_state_up ==
+ admin_state_up))
+ l3_agents = [binding.l3_agent for binding in query.all()]
+ if active is not None:
+ l3_agents = [l3_agent for l3_agent in
+ l3_agents if not
+ agents_db.AgentDbMixin.is_agent_down(
+ l3_agent['heartbeat_timestamp'])]
+ return l3_agents
+
+ def _get_l3_bindings_hosting_routers(self, context, router_ids):
+ if not router_ids:
+ return []
+ query = context.session.query(RouterL3AgentBinding)
+ if len(router_ids) > 1:
+ query = query.options(joinedload('l3_agent')).filter(
+ RouterL3AgentBinding.router_id.in_(router_ids))
+ else:
+ query = query.options(joinedload('l3_agent')).filter(
+ RouterL3AgentBinding.router_id == router_ids[0])
+ return query.all()
+
+ def list_l3_agents_hosting_router(self, context, router_id):
+ with context.session.begin(subtransactions=True):
+ bindings = self._get_l3_bindings_hosting_routers(
+ context, [router_id])
+ results = []
+ for binding in bindings:
+ l3_agent_dict = self._make_agent_dict(binding.l3_agent)
+ results.append(l3_agent_dict)
+ if results:
+ return {'agents': results}
+ else:
+ return {'agents': []}
+
+ def schedule_network(self, context, request_network, created_network):
+ if self.network_scheduler:
+ result = self.network_scheduler.schedule(
+ self, context, request_network, created_network)
+ if not result:
+ LOG.warn(_('Fail scheduling network %s'), created_network)
+
+ def auto_schedule_networks(self, context, host):
+ if self.network_scheduler:
+ self.network_scheduler.auto_schedule_networks(self, context, host)
+
+ def get_l3_agents(self, context, active=None, filters=None):
+ query = context.session.query(agents_db.Agent)
+ query = query.filter(
+ agents_db.Agent.agent_type == constants.AGENT_TYPE_L3)
+ if active is not None:
+ query = (query.filter(agents_db.Agent.admin_state_up == active))
+ if filters:
+ for key, value in filters.iteritems():
+ column = getattr(agents_db.Agent, key, None)
+ if column:
+ query = query.filter(column.in_(value))
+ l3_agents = query.all()
+ if active is not None:
+ l3_agents = [l3_agent for l3_agent in
+ l3_agents if not
+ agents_db.AgentDbMixin.is_agent_down(
+ l3_agent['heartbeat_timestamp'])]
+ return l3_agents
+
+ def get_l3_agent_candidates(self, sync_router, l3_agents):
+ """Get the valid l3 agents for the router from a list of l3_agents"""
+ candidates = []
+ for l3_agent in l3_agents:
+ if not l3_agent.admin_state_up:
+ continue
+ agent_conf = self.get_configuration_dict(l3_agent)
+ router_id = agent_conf.get('router_id', None)
+ use_namespaces = agent_conf.get('use_namespaces', True)
+ handle_internal_only_routers = agent_conf.get(
+ 'handle_internal_only_routers', True)
+ gateway_external_network_id = agent_conf.get(
+ 'gateway_external_network_id', None)
+ if not use_namespaces and router_id != sync_router['id']:
+ continue
+ ex_net_id = (sync_router['external_gateway_info'] or {}).get(
+ 'network_id')
+ if ((not ex_net_id and not handle_internal_only_routers) or
+ (ex_net_id and gateway_external_network_id and
+ ex_net_id != gateway_external_network_id)):
+ continue
+ candidates.append(l3_agent)
+ return candidates
+
+ def auto_schedule_routers(self, context, host, router_id):
+ if self.router_scheduler:
+ return self.router_scheduler.auto_schedule_routers(
+ self, context, host, router_id)
+
+ def schedule_router(self, context, router):
+ if self.router_scheduler:
+ return self.router_scheduler.schedule(
+ self, context, router)
+
+ def schedule_routers(self, context, routers):
+ """Schedule the routers to l3 agents.
+ """
+ for router in routers:
+ self.schedule_router(context, router)
# See the License for the specific language governing permissions and
# limitations under the License.
+from oslo.config import cfg
from sqlalchemy.orm import exc
from quantum.api.v2 import attributes
+from quantum.common import constants
+from quantum.common import utils
from quantum import manager
from quantum.openstack.common import log as logging
host = kwargs.get('host')
LOG.debug(_('Network list requested from %s'), host)
plugin = manager.QuantumManager.get_plugin()
- filters = dict(admin_state_up=[True])
-
- return [net['id'] for net in
- plugin.get_networks(context, filters=filters)]
+ if utils.is_extension_supported(
+ plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
+ if cfg.CONF.network_auto_schedule:
+ plugin.auto_schedule_networks(context, host)
+ nets = plugin.list_active_networks_on_active_dhcp_agent(
+ context, host)
+ else:
+ filters = dict(admin_state_up=[True])
+ nets = plugin.get_networks(context, filters=filters)
+ return [net['id'] for net in nets]
def get_network_info(self, context, **kwargs):
"""Retrieve and return a extended information about a network."""
network_id = kwargs.get('network_id')
+ host = kwargs.get('host')
+ LOG.debug(_('Network %(network_id)s requested from '
+ '%(host)s'), {'network_id': network_id,
+ 'host': host})
plugin = manager.QuantumManager.get_plugin()
network = plugin.get_network(context, network_id)
# a device id that combines host and network ids
LOG.debug(_('Port %(device_id)s for %(network_id)s requested from '
- '%(host)s'), locals())
+ '%(host)s'), {'device_id': device_id,
+ 'network_id': network_id,
+ 'host': host})
plugin = manager.QuantumManager.get_plugin()
retval = None
context, router['id'])
return routers
- def get_sync_data(self, context, router_ids=None):
+ def get_sync_data(self, context, router_ids=None, active=None):
"""Query routers and their related floating_ips, interfaces."""
with context.session.begin(subtransactions=True):
routers = super(ExtraRoute_db_mixin,
- self).get_sync_data(context, router_ids)
+ self).get_sync_data(context, router_ids,
+ active=active)
for router in routers:
router['routes'] = self._get_extra_routes_by_router_id(
context, router['id'])
from sqlalchemy.orm import exc
from sqlalchemy.sql import expression as expr
+from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
from quantum.api.v2 import attributes
from quantum.common import constants as l3_constants
from quantum.common import exceptions as q_exc
from quantum.db import db_base_plugin_v2
-from quantum.db import l3_rpc_agent_api
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import l3
if len(fixed_ips) != 1:
msg = _('Router port must have exactly one fixed IP')
raise q_exc.BadRequest(resource='router', msg=msg)
- subnet = self._get_subnet(context, fixed_ips[0]['subnet_id'])
+ subnet_id = fixed_ips[0]['subnet_id']
+ subnet = self._get_subnet(context, subnet_id)
self._check_for_dup_router_subnet(context, router_id,
port['network_id'],
subnet['id'],
'name': ''}})
routers = self.get_sync_data(context.elevated(), [router_id])
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(
+ context, routers, 'add_router_interface',
+ {'network_id': port['network_id'],
+ 'subnet_id': subnet_id})
info = {'port_id': port['id'],
'subnet_id': port['fixed_ips'][0]['subnet_id']}
notifier_api.notify(context,
subnet_id = port_db['fixed_ips'][0]['subnet_id']
self._confirm_router_interface_not_in_use(
context, router_id, subnet_id)
+ _network_id = port_db['network_id']
self.delete_port(context, port_db['id'], l3_port_check=False)
elif 'subnet_id' in interface_info:
subnet_id = interface_info['subnet_id']
for p in ports:
if p['fixed_ips'][0]['subnet_id'] == subnet_id:
port_id = p['id']
+ _network_id = p['network_id']
self.delete_port(context, p['id'], l3_port_check=False)
found = True
break
raise l3.RouterInterfaceNotFoundForSubnet(router_id=router_id,
subnet_id=subnet_id)
routers = self.get_sync_data(context.elevated(), [router_id])
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(
+ context, routers, 'remove_router_interface',
+ {'network_id': _network_id,
+ 'subnet_id': subnet_id})
notifier_api.notify(context,
notifier_api.publisher_id('network'),
'router.interface.delete',
router_id = floatingip_db['router_id']
if router_id:
routers = self.get_sync_data(context.elevated(), [router_id])
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
+ 'create_floatingip')
return self._make_floatingip_dict(floatingip_db)
def update_floatingip(self, context, id, floatingip):
router_ids.append(router_id)
if router_ids:
routers = self.get_sync_data(context.elevated(), router_ids)
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
+ 'update_floatingip')
return self._make_floatingip_dict(floatingip_db)
def delete_floatingip(self, context, id):
l3_port_check=False)
if router_id:
routers = self.get_sync_data(context.elevated(), [router_id])
- l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers)
+ l3_rpc_agent_api.L3AgentNotify.routers_updated(context, routers,
+ 'delete_floatingip')
def get_floatingip(self, context, id, fields=None):
floatingip = self._get_floatingip(context, id)
else:
return [n for n in nets if n['id'] not in ext_nets]
- def _get_sync_routers(self, context, router_ids=None):
+ def _get_sync_routers(self, context, router_ids=None, active=None):
"""Query routers and their gw ports for l3 agent.
Query routers with the router_ids. The gateway ports, if any,
"""
router_query = context.session.query(Router)
if router_ids:
- router_query = router_query.filter(Router.id.in_(router_ids))
+ if 1 == len(router_ids):
+ router_query = router_query.filter(Router.id == router_ids[0])
+ else:
+ router_query = router_query.filter(Router.id.in_(router_ids))
+ if active is not None:
+ router_query = router_query.filter(Router.admin_state_up == active)
routers = router_query.all()
gw_port_ids = []
if not routers:
gw_port_ids.append(gw_port_id)
gw_ports = []
if gw_port_ids:
- gw_ports = self._get_sync_gw_ports(context, gw_port_ids)
+ gw_ports = self.get_sync_gw_ports(context, gw_port_ids)
gw_port_id_gw_port_dict = {}
for gw_port in gw_ports:
gw_port_id_gw_port_dict[gw_port['id']] = gw_port
return []
return self.get_floatingips(context, {'router_id': router_ids})
- def _get_sync_gw_ports(self, context, gw_port_ids):
+ def get_sync_gw_ports(self, context, gw_port_ids):
if not gw_port_ids:
return []
filters = {'id': gw_port_ids}
self._populate_subnet_for_ports(context, gw_ports)
return gw_ports
- def _get_sync_interfaces(self, context, router_ids):
+ def get_sync_interfaces(self, context, router_ids,
+ device_owner=DEVICE_OWNER_ROUTER_INTF):
"""Query router interfaces that relate to list of router_ids."""
if not router_ids:
return []
filters = {'device_id': router_ids,
- 'device_owner': [DEVICE_OWNER_ROUTER_INTF]}
+ 'device_owner': [device_owner]}
interfaces = self.get_ports(context, filters)
if interfaces:
self._populate_subnet_for_ports(context, interfaces)
router[l3_constants.INTERFACE_KEY] = router_interfaces
return routers_dict.values()
- def get_sync_data(self, context, router_ids=None):
+ def get_sync_data(self, context, router_ids=None, active=None):
"""Query routers and their related floating_ips, interfaces."""
with context.session.begin(subtransactions=True):
routers = self._get_sync_routers(context,
- router_ids)
+ router_ids=router_ids,
+ active=active)
router_ids = [router['id'] for router in routers]
floating_ips = self._get_sync_floating_ips(context, router_ids)
- interfaces = self._get_sync_interfaces(context, router_ids)
+ interfaces = self.get_sync_interfaces(context, router_ids)
return self._process_sync_data(routers, interfaces, floating_ips)
def get_external_network_id(self, context):
+++ /dev/null
-# Copyright (c) 2012 OpenStack, LLC.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-# implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from quantum.common import topics
-from quantum.openstack.common import jsonutils
-from quantum.openstack.common import log as logging
-from quantum.openstack.common.rpc import proxy
-
-
-LOG = logging.getLogger(__name__)
-
-
-class L3AgentNotifyAPI(proxy.RpcProxy):
- """API for plugin to notify L3 agent."""
- BASE_RPC_API_VERSION = '1.0'
-
- def __init__(self, topic=topics.L3_AGENT):
- super(L3AgentNotifyAPI, self).__init__(
- topic=topic, default_version=self.BASE_RPC_API_VERSION)
-
- def router_deleted(self, context, router_id):
- LOG.debug(_('Notify agent the router %s is deleted'), router_id)
- self.cast(context,
- self.make_msg('router_deleted',
- router_id=router_id),
- topic=self.topic)
-
- def routers_updated(self, context, routers):
- if routers:
- LOG.debug(_('Notify agent routers were updated:\n %s'),
- jsonutils.dumps(routers, indent=5))
- self.cast(context,
- self.make_msg('routers_updated',
- routers=routers),
- topic=self.topic)
-
-
-L3AgentNotify = L3AgentNotifyAPI()
# See the License for the specific language governing permissions and
# limitations under the License.
+from oslo.config import cfg
+
+from quantum.common import constants
+from quantum.common import utils
from quantum import context as quantum_context
from quantum import manager
from quantum.openstack.common import jsonutils
with their interfaces and floating_ips
"""
router_id = kwargs.get('router_id')
- # TODO(gongysh) we will use host in kwargs for multi host BP
+ host = kwargs.get('host')
context = quantum_context.get_admin_context()
plugin = manager.QuantumManager.get_plugin()
- routers = plugin.get_sync_data(context, router_id)
+ if utils.is_extension_supported(
+ plugin, constants.AGENT_SCHEDULER_EXT_ALIAS):
+ if cfg.CONF.router_auto_schedule:
+ plugin.auto_schedule_routers(context, host, router_id)
+ routers = plugin.list_active_sync_routers_on_active_l3_agent(
+ context, host, router_id)
+ else:
+ routers = plugin.get_sync_data(context, router_id)
LOG.debug(_("Routers returned to l3 agent:\n %s"),
jsonutils.dumps(routers, indent=5))
return routers
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""agent scheduler
+
+Revision ID: 4692d074d587
+Revises: 3b54bf9e29f7
+Create Date: 2013-02-21 23:01:50.370306
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '4692d074d587'
+down_revision = '3b54bf9e29f7'
+
+# Change to ['*'] if this migration applies to all plugins
+
+migration_for_plugins = [
+ 'quantum.plugins.openvswitch.ovs_quantum_plugin.OVSQuantumPluginV2'
+]
+
+from alembic import op
+import sqlalchemy as sa
+
+
+from quantum.db import migration
+
+
+def upgrade(active_plugin=None, options=None):
+    # Create the two agent-scheduler binding tables.  No-op unless the
+    # active plugin is listed in migration_for_plugins.
+    if not migration.should_run(active_plugin, migration_for_plugins):
+        return
+
+    ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        'networkdhcpagentbindings',
+        sa.Column('network_id', sa.String(length=36), nullable=False),
+        sa.Column('dhcp_agent_id', sa.String(length=36), nullable=False),
+        sa.ForeignKeyConstraint(['dhcp_agent_id'], ['agents.id'],
+                                ondelete='CASCADE'),
+        sa.ForeignKeyConstraint(['network_id'], ['networks.id'],
+                                ondelete='CASCADE'),
+        # Composite PK: a network may be hosted by several DHCP agents,
+        # but at most once per agent.
+        sa.PrimaryKeyConstraint('network_id', 'dhcp_agent_id')
+    )
+    op.create_table(
+        'routerl3agentbindings',
+        sa.Column('id', sa.String(length=36), nullable=False),
+        sa.Column('router_id', sa.String(length=36), nullable=True),
+        sa.Column('l3_agent_id', sa.String(length=36), nullable=True),
+        sa.ForeignKeyConstraint(['l3_agent_id'], ['agents.id'],
+                                ondelete='CASCADE'),
+        sa.ForeignKeyConstraint(['router_id'], ['routers.id'],
+                                ondelete='CASCADE'),
+        # NOTE(review): the l3 scheduler assigns binding.default on the
+        # ORM object; confirm whether this table needs a 'default' column.
+        sa.PrimaryKeyConstraint('id')
+    )
+    ### end Alembic commands ###
+
+
+def downgrade(active_plugin=None, options=None):
+    # Drop the binding tables in reverse creation order.
+    if not migration.should_run(active_plugin, migration_for_plugins):
+        return
+
+    ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table('routerl3agentbindings')
+    op.drop_table('networkdhcpagentbindings')
+    ### end Alembic commands ###
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack, LLC.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from abc import abstractmethod
+
+from quantum.api import extensions
+from quantum.api.v2 import base
+from quantum.api.v2 import resource
+from quantum.common import constants
+from quantum.common import exceptions
+from quantum.extensions import agent
+from quantum import manager
+from quantum import policy
+from quantum import wsgi
+
+DHCP_NET = 'dhcp-network'
+DHCP_NETS = DHCP_NET + 's'
+DHCP_AGENT = 'dhcp-agent'
+DHCP_AGENTS = DHCP_AGENT + 's'
+L3_ROUTER = 'l3-router'
+L3_ROUTERS = L3_ROUTER + 's'
+L3_AGENT = 'l3-agent'
+L3_AGENTS = L3_AGENT + 's'
+
+
+class NetworkSchedulerController(wsgi.Controller):
+    """REST controller for the dhcp-networks sub-resource of an agent.
+
+    Lists, adds and removes the networks hosted by a given DHCP agent.
+    Every operation is policy-checked (admin only by the default rules).
+    """
+
+    def index(self, request, **kwargs):
+        # GET /agents/<agent_id>/dhcp-networks
+        plugin = manager.QuantumManager.get_plugin()
+        policy.enforce(request.context,
+                       "get_%s" % DHCP_NETS,
+                       {},
+                       plugin=plugin)
+        return plugin.list_networks_on_dhcp_agent(
+            request.context, kwargs['agent_id'])
+
+    def create(self, request, body, **kwargs):
+        # POST /agents/<agent_id>/dhcp-networks with {'network_id': ...}
+        plugin = manager.QuantumManager.get_plugin()
+        policy.enforce(request.context,
+                       "create_%s" % DHCP_NET,
+                       {},
+                       plugin=plugin)
+        return plugin.add_network_to_dhcp_agent(
+            request.context, kwargs['agent_id'], body['network_id'])
+
+    def delete(self, request, id, **kwargs):
+        # DELETE /agents/<agent_id>/dhcp-networks/<network_id>
+        plugin = manager.QuantumManager.get_plugin()
+        policy.enforce(request.context,
+                       "delete_%s" % DHCP_NET,
+                       {},
+                       plugin=plugin)
+        return plugin.remove_network_from_dhcp_agent(
+            request.context, kwargs['agent_id'], id)
+
+
+class RouterSchedulerController(wsgi.Controller):
+    """REST controller for the l3-routers sub-resource of an agent.
+
+    Lists, adds and removes the routers hosted by a given L3 agent.
+    Every operation is policy-checked (admin only by the default rules).
+    """
+
+    def index(self, request, **kwargs):
+        # GET /agents/<agent_id>/l3-routers
+        plugin = manager.QuantumManager.get_plugin()
+        policy.enforce(request.context,
+                       "get_%s" % L3_ROUTERS,
+                       {},
+                       plugin=plugin)
+        return plugin.list_routers_on_l3_agent(
+            request.context, kwargs['agent_id'])
+
+    def create(self, request, body, **kwargs):
+        # POST /agents/<agent_id>/l3-routers with {'router_id': ...}
+        plugin = manager.QuantumManager.get_plugin()
+        policy.enforce(request.context,
+                       "create_%s" % L3_ROUTER,
+                       {},
+                       plugin=plugin)
+        return plugin.add_router_to_l3_agent(
+            request.context,
+            kwargs['agent_id'],
+            body['router_id'])
+
+    def delete(self, request, id, **kwargs):
+        # DELETE /agents/<agent_id>/l3-routers/<router_id>
+        plugin = manager.QuantumManager.get_plugin()
+        policy.enforce(request.context,
+                       "delete_%s" % L3_ROUTER,
+                       {},
+                       plugin=plugin)
+        return plugin.remove_router_from_l3_agent(
+            request.context, kwargs['agent_id'], id)
+
+
+class DhcpAgentsHostingNetworkController(wsgi.Controller):
+    """Read-only controller: DHCP agents hosting a given network."""
+
+    def index(self, request, **kwargs):
+        # GET /networks/<network_id>/dhcp-agents
+        plugin = manager.QuantumManager.get_plugin()
+        policy.enforce(request.context,
+                       "get_%s" % DHCP_AGENTS,
+                       {},
+                       plugin=plugin)
+        return plugin.list_dhcp_agents_hosting_network(
+            request.context, kwargs['network_id'])
+
+
+class L3AgentsHostingRouterController(wsgi.Controller):
+    """Read-only controller: L3 agents hosting a given router."""
+
+    def index(self, request, **kwargs):
+        # GET /routers/<router_id>/l3-agents
+        plugin = manager.QuantumManager.get_plugin()
+        policy.enforce(request.context,
+                       "get_%s" % L3_AGENTS,
+                       {},
+                       plugin=plugin)
+        return plugin.list_l3_agents_hosting_router(
+            request.context, kwargs['router_id'])
+
+
+class Agentscheduler(extensions.ExtensionDescriptor):
+    """Extension class supporting agent scheduler.
+
+    Wires the four scheduler controllers in as sub-resources of
+    agents, networks and routers.
+    """
+
+    @classmethod
+    def get_name(cls):
+        return "Agent Schedulers"
+
+    @classmethod
+    def get_alias(cls):
+        return constants.AGENT_SCHEDULER_EXT_ALIAS
+
+    @classmethod
+    def get_description(cls):
+        return "Schedule resources among agents"
+
+    @classmethod
+    def get_namespace(cls):
+        return "http://docs.openstack.org/ext/agent_scheduler/api/v1.0"
+
+    @classmethod
+    def get_updated(cls):
+        return "2013-02-03T10:00:00-00:00"
+
+    @classmethod
+    def get_resources(cls):
+        """Returns Ext Resources """
+        exts = []
+        # Sub-resources of an agent: the networks/routers it hosts.
+        parent = dict(member_name="agent",
+                      collection_name="agents")
+        controller = resource.Resource(NetworkSchedulerController(),
+                                       base.FAULT_MAP)
+        exts.append(extensions.ResourceExtension(
+            DHCP_NETS, controller, parent))
+
+        controller = resource.Resource(RouterSchedulerController(),
+                                       base.FAULT_MAP)
+        exts.append(extensions.ResourceExtension(
+            L3_ROUTERS, controller, parent))
+
+        # Sub-resource of a network: the DHCP agents hosting it.
+        parent = dict(member_name="network",
+                      collection_name="networks")
+
+        controller = resource.Resource(DhcpAgentsHostingNetworkController(),
+                                       base.FAULT_MAP)
+        exts.append(extensions.ResourceExtension(
+            DHCP_AGENTS, controller, parent))
+
+        # Sub-resource of a router: the L3 agents hosting it.
+        parent = dict(member_name="router",
+                      collection_name="routers")
+
+        controller = resource.Resource(L3AgentsHostingRouterController(),
+                                       base.FAULT_MAP)
+        exts.append(extensions.ResourceExtension(
+            L3_AGENTS, controller, parent))
+        return exts
+
+    def get_extended_resources(self, version):
+        # This extension adds no attributes to existing resources.
+        return {}
+
+
+# Exceptions raised by the agent-scheduler plugin operations.  The
+# *NotFound subclasses map to HTTP 404, the Conflict ones to HTTP 409.
+class InvalidDHCPAgent(agent.AgentNotFound):
+    message = _("Agent %(id)s is not a valid DHCP Agent or has been disabled")
+
+
+class NetworkHostedByDHCPAgent(exceptions.Conflict):
+    message = _("The network %(network_id)s has been already hosted"
+                " by the DHCP Agent %(agent_id)s.")
+
+
+class NetworkNotHostedByDhcpAgent(exceptions.Conflict):
+    message = _("The network %(network_id)s is not hosted"
+                " by the DHCP agent %(agent_id)s.")
+
+
+class InvalidL3Agent(agent.AgentNotFound):
+    message = _("Agent %(id)s is not a L3 Agent or has been disabled")
+
+
+class RouterHostedByL3Agent(exceptions.Conflict):
+    message = _("The router %(router_id)s has been already hosted"
+                " by the L3 Agent %(agent_id)s.")
+
+
+class RouterSchedulingFailed(exceptions.Conflict):
+    message = _("Failed scheduling router %(router_id)s to"
+                " the L3 Agent %(agent_id)s.")
+
+
+class RouterNotHostedByL3Agent(exceptions.Conflict):
+    message = _("The router %(router_id)s is not hosted"
+                " by L3 agent %(agent_id)s.")
+
+
+class AgentSchedulerPluginBase(object):
+    """REST API to operate the agent scheduler.
+
+    All of these methods must be executed in an admin context.
+    """
+    # NOTE(review): @abstractmethod only takes effect when the class uses
+    # abc.ABCMeta as its metaclass; as written, subclasses that omit these
+    # methods can still be instantiated.  Confirm whether enforcement was
+    # intended.
+
+    @abstractmethod
+    def add_network_to_dhcp_agent(self, context, id, network_id):
+        pass
+
+    @abstractmethod
+    def remove_network_from_dhcp_agent(self, context, id, network_id):
+        pass
+
+    @abstractmethod
+    def list_networks_on_dhcp_agent(self, context, id):
+        pass
+
+    @abstractmethod
+    def list_dhcp_agents_hosting_network(self, context, network_id):
+        pass
+
+    @abstractmethod
+    def add_router_to_l3_agent(self, context, id, router_id):
+        pass
+
+    @abstractmethod
+    def remove_router_from_l3_agent(self, context, id, router_id):
+        pass
+
+    @abstractmethod
+    def list_routers_on_l3_agent(self, context, id):
+        pass
+
+    @abstractmethod
+    def list_l3_agents_hosting_router(self, context, router_id):
+        pass
from oslo.config import cfg
from quantum.agent.common import config
+from quantum import scheduler
DEFAULT_BRIDGE_MAPPINGS = []
cfg.CONF.register_opts(agent_opts, "AGENT")
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
+cfg.CONF.register_opts(scheduler.AGENTS_SCHEDULER_OPTS)
from oslo.config import cfg
from quantum.agent import securitygroups_rpc as sg_rpc
+from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
+from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
from quantum.api.v2 import attributes
from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
from quantum.db import agents_db
+from quantum.db import agentschedulers_db
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db
from quantum.extensions import portbindings
from quantum.extensions import providernet as provider
from quantum.extensions import securitygroup as ext_sg
+from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
class OVSQuantumPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
extraroute_db.ExtraRoute_db_mixin,
sg_db_rpc.SecurityGroupServerRpcMixin,
- agents_db.AgentDbMixin):
+ agents_db.AgentDbMixin,
+ agentschedulers_db.AgentSchedulerDbMixin):
+
"""Implement the Quantum abstractions using Open vSwitch.
Depending on whether tunneling is enabled, either a GRE tunnel or
supported_extension_aliases = ["provider", "router",
"binding", "quotas", "security-group",
- "agent",
- "extraroute"]
+ "agent", "extraroute", "agent_scheduler"]
network_view = "extension:provider_network:view"
network_set = "extension:provider_network:set"
"Agent terminated!"))
sys.exit(1)
self.setup_rpc()
+ self.network_scheduler = importutils.import_object(
+ cfg.CONF.network_scheduler_driver)
+ self.router_scheduler = importutils.import_object(
+ cfg.CONF.router_scheduler_driver)
def setup_rpc(self):
# RPC support
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
+ self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+ self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
self.callbacks = OVSRpcCallbacks(self.notifier)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
self._extend_network_dict_l3(context, net)
# note - exception will rollback entire transaction
LOG.debug(_("Created network: %s"), net['id'])
+ self.schedule_network(context, network['network'], net)
return net
def update_network(self, context, id, network):
else:
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
+ net = self.get_network(context, port['network_id'])
+ self.schedule_network(context, None, net)
return self._extend_port_dict_binding(context, port)
def get_port(self, context, id, fields=None):
self.notifier.security_groups_member_updated(
context, port.get(ext_sg.SECURITYGROUPS))
+
+ def update_agent(self, context, id, agent):
+ original_agent = self.get_agent(context, id)
+ result = super(OVSQuantumPluginV2, self).update_agent(
+ context, id, agent)
+ agent_data = agent['agent']
+ if ('admin_state_up' in agent_data and
+ original_agent['admin_state_up'] != agent_data['admin_state_up']):
+ if original_agent['agent_type'] == q_const.AGENT_TYPE_DHCP:
+ self.dhcp_agent_notifier.agent_updated(
+ context, agent_data['admin_state_up'],
+ original_agent['host'])
+ elif original_agent['agent_type'] == q_const.AGENT_TYPE_L3:
+ self.l3_agent_notifier.agent_updated(
+ context, agent_data['admin_state_up'],
+ original_agent['host'])
+ return result
raise exceptions.PolicyNotFound(path=cfg.CONF.policy_file)
# pass _set_brain to read_cached_file so that the policy brain
# is reset only if the file has changed
+ LOG.debug(_("loading policy file at %s"), _POLICY_PATH)
utils.read_cached_file(_POLICY_PATH, _POLICY_CACHE,
reload_func=_set_rules)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo.config import cfg
+
+
+# Scheduler options registered by plugins that support the agent
+# scheduler extension; they mirror the commented defaults shipped in
+# quantum.conf.
+AGENTS_SCHEDULER_OPTS = [
+    cfg.StrOpt('network_scheduler_driver',
+               default='quantum.scheduler.'
+                       'dhcp_agent_scheduler.ChanceScheduler',
+               help=_('Driver to use for scheduling network to DHCP agent')),
+    cfg.StrOpt('router_scheduler_driver',
+               default='quantum.scheduler.l3_agent_scheduler.ChanceScheduler',
+               help=_('Driver to use for scheduling '
+                      'router to a default L3 agent')),
+    cfg.BoolOpt('network_auto_schedule', default=True,
+                help=_('Allow auto scheduling networks to DHCP agent.')),
+    cfg.BoolOpt('router_auto_schedule', default=True,
+                help=_('Allow auto scheduling routers to L3 agent.')),
+]
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import random
+
+from sqlalchemy.orm import exc
+from sqlalchemy.sql import exists
+
+from quantum.common import constants
+from quantum.db import models_v2
+from quantum.db import agents_db
+from quantum.db import agentschedulers_db
+from quantum.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class ChanceScheduler(object):
+    """Allocate a DHCP agent for a network in a random way.
+    More sophisticated scheduler (similar to filter scheduler in nova?)
+    can be introduced later."""
+
+    def schedule(self, plugin, context, request_network, network):
+        """Schedule the network to an active DHCP agent if there
+        is no active DHCP agent hosting it.
+
+        Returns True when a new binding was created, False otherwise.
+        """
+        #TODO(gongysh) don't schedule the networks with only
+        # subnets whose enable_dhcp is false
+        with context.session.begin(subtransactions=True):
+            dhcp_agents = plugin.get_dhcp_agents_hosting_networks(
+                context, [network['id']], active=True)
+            if dhcp_agents:
+                LOG.debug(_('Network %s is hosted already'),
+                          network['id'])
+                return False
+            enabled_dhcp_agents = plugin.get_agents_db(
+                context, filters={
+                    'agent_type': [constants.AGENT_TYPE_DHCP],
+                    'admin_state_up': [True]})
+            if not enabled_dhcp_agents:
+                LOG.warn(_('No enabled DHCP agents'))
+                return False
+            # Keep only agents whose heartbeat is recent enough.
+            active_dhcp_agents = [enabled_dhcp_agent for enabled_dhcp_agent in
+                                  enabled_dhcp_agents if not
+                                  agents_db.AgentDbMixin.is_agent_down(
+                                  enabled_dhcp_agent['heartbeat_timestamp'])]
+            if not active_dhcp_agents:
+                LOG.warn(_('No active DHCP agents'))
+                return False
+            # Random choice among active agents; binding is persisted in
+            # the same session transaction.
+            chosen_agent = random.choice(active_dhcp_agents)
+            binding = agentschedulers_db.NetworkDhcpAgentBinding()
+            binding.dhcp_agent = chosen_agent
+            binding.network_id = network['id']
+            context.session.add(binding)
+            LOG.debug(_('Network %(network_id)s is scheduled to be hosted by '
+                        'DHCP agent %(agent_id)s'),
+                      {'network_id': network['id'],
+                       'agent_id': chosen_agent['id']})
+            return True
+
+    def auto_schedule_networks(self, plugin, context, host):
+        """Schedule non-hosted networks to the DHCP agent on
+        the specified host."""
+        with context.session.begin(subtransactions=True):
+            query = context.session.query(agents_db.Agent)
+            query = query.filter(agents_db.Agent.agent_type ==
+                                 constants.AGENT_TYPE_DHCP,
+                                 agents_db.Agent.host == host,
+                                 agents_db.Agent.admin_state_up == True)
+            try:
+                dhcp_agent = query.one()
+            except (exc.MultipleResultsFound, exc.NoResultFound):
+                LOG.warn(_('No enabled DHCP agent on host %s'),
+                         host)
+                return False
+            if agents_db.AgentDbMixin.is_agent_down(
+                dhcp_agent.heartbeat_timestamp):
+                # Warn but still schedule: "down" may only be a late
+                # heartbeat.
+                LOG.warn(_('DHCP agent %s is not active'), dhcp_agent.id)
+            #TODO(gongysh) consider the disabled agent's network
+            # Networks with no row in NetworkDhcpAgentBinding are
+            # considered non-hosted.
+            net_stmt = ~exists().where(
+                models_v2.Network.id ==
+                agentschedulers_db.NetworkDhcpAgentBinding.network_id)
+            net_ids = context.session.query(
+                models_v2.Network.id).filter(net_stmt).all()
+            if not net_ids:
+                LOG.debug(_('No non-hosted networks'))
+                return False
+            for net_id in net_ids:
+                binding = agentschedulers_db.NetworkDhcpAgentBinding()
+                binding.dhcp_agent = dhcp_agent
+                binding.network_id = net_id[0]
+                context.session.add(binding)
+            return True
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import random
+
+from sqlalchemy.orm import exc
+from sqlalchemy.sql import exists
+
+from quantum.common import constants
+from quantum.db import l3_db
+from quantum.db import agents_db
+from quantum.db import agentschedulers_db
+from quantum.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class ChanceScheduler(object):
+ """Allocate a L3 agent for a router in a random way.
+ More sophisticated scheduler (similar to filter scheduler in nova?)
+ can be introduced later."""
+
+ def auto_schedule_routers(self, plugin, context, host, router_id):
+ """Schedule non-hosted routers to L3 Agent running on host.
+ If router_id is given, only this router is scheduled
+ if it is not hosted yet.
+ Don't schedule the routers which are hosted already
+ by active l3 agents.
+ """
+ with context.session.begin(subtransactions=True):
+ # query if we have valid l3 agent on the host
+ query = context.session.query(agents_db.Agent)
+ query = query.filter(agents_db.Agent.agent_type ==
+ constants.AGENT_TYPE_L3,
+ agents_db.Agent.host == host,
+ agents_db.Agent.admin_state_up == True)
+ try:
+ l3_agent = query.one()
+ except (exc.MultipleResultsFound, exc.NoResultFound):
+ LOG.debug(_('No enabled L3 agent on host %s'),
+ host)
+ return False
+ if agents_db.AgentDbMixin.is_agent_down(
+ l3_agent.heartbeat_timestamp):
+ LOG.warn(_('L3 agent %s is not active'), l3_agent.id)
+ # check if the specified router is hosted
+ if router_id:
+ l3_agents = plugin.get_l3_agents_hosting_routers(
+ context, [router_id], admin_state_up=True)
+ if l3_agents:
+ LOG.debug(_('Router %(router_id)s has already been hosted'
+ ' by L3 agent %(agent_id)s'),
+ {'router_id': router_id,
+ 'agent_id': l3_agents[0]['id']})
+ return False
+
+ # get the router ids
+ if router_id:
+ router_ids = [(router_id,)]
+ else:
+ # get all routers that are not hosted
+ #TODO(gongysh) consider the disabled agent's router
+ stmt = ~exists().where(
+ l3_db.Router.id ==
+ agentschedulers_db.RouterL3AgentBinding.router_id)
+ router_ids = context.session.query(
+ l3_db.Router.id).filter(stmt).all()
+ if not router_ids:
+ LOG.debug(_('No non-hosted routers'))
+ return False
+
+ # check if the configuration of l3 agent is compatible
+ # with the router
+ router_ids = [router_id[0] for router_id in router_ids]
+ routers = plugin.get_routers(context, filters={'id': router_ids})
+ to_removed_ids = []
+ for router in routers:
+ candidates = plugin.get_l3_agent_candidates(router, [l3_agent])
+ if not candidates:
+ to_removed_ids.append(router['id'])
+ router_ids = list(set(router_ids) - set(to_removed_ids))
+ if not router_ids:
+ LOG.warn(_('No routers compatible with L3 agent configuration'
+ ' on host %s', host))
+ return False
+
+ # binding
+ for router_id in router_ids:
+ binding = agentschedulers_db.RouterL3AgentBinding()
+ binding.l3_agent = l3_agent
+ binding.router_id = router_id
+ binding.default = True
+ context.session.add(binding)
+ return True
+
+ def schedule(self, plugin, context, sync_router):
+ """Schedule the router to an active L3 agent if there
+ is no enable L3 agent hosting it.
+ """
+ with context.session.begin(subtransactions=True):
+ # allow one router is hosted by just
+ # one enabled l3 agent hosting since active is just a
+ # timing problem. Non-active l3 agent can return to
+ # active any time
+ l3_agents = plugin.get_l3_agents_hosting_routers(
+ context, [sync_router['id']], admin_state_up=True)
+ if l3_agents:
+ LOG.debug(_('Router %(router_id)s has already been hosted'
+ ' by L3 agent %(agent_id)s'),
+ {'router_id': sync_router['id'],
+ 'agent_id': l3_agents[0]['id']})
+ return False
+
+ active_l3_agents = plugin.get_l3_agents(context, active=True)
+ if not active_l3_agents:
+ LOG.warn(_('No active L3 agents'))
+ return False
+ candidates = plugin.get_l3_agent_candidates(sync_router,
+ active_l3_agents)
+ if not candidates:
+ LOG.warn(_('No L3 agents can host the router %s'),
+ sync_router['id'])
+ return False
+
+ chosen_agent = random.choice(candidates)
+ binding = agentschedulers_db.RouterL3AgentBinding()
+ binding.l3_agent = chosen_agent
+ binding.router_id = sync_router['id']
+ context.session.add(binding)
+ LOG.debug(_('Router %(router_id)s is scheduled to '
+ 'L3 agent %(agent_id)s'),
+ {'router_id': sync_router['id'],
+ 'agent_id': chosen_agent['id']})
+ return True
--- /dev/null
+# Copyright (c) 2013 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import copy
+
+import mock
+from webob import exc
+
+from quantum.api import extensions
+from quantum.common import constants
+from quantum import context
+from quantum.db import agents_db
+from quantum.db import dhcp_rpc_base
+from quantum.db import l3_rpc_base
+from quantum.extensions import agentscheduler
+from quantum import manager
+from quantum.openstack.common import uuidutils
+from quantum.plugins.openvswitch.ovs_quantum_plugin import OVSQuantumPluginV2
+from quantum.tests.unit import test_agent_ext_plugin
+from quantum.tests.unit.testlib_api import create_request
+from quantum.tests.unit import test_db_plugin as test_plugin
+from quantum.tests.unit import test_extensions
+from quantum.tests.unit import test_l3_plugin
+from quantum.wsgi import Serializer
+
+L3_HOSTA = 'hosta'
+DHCP_HOSTA = 'hosta'
+L3_HOSTB = 'hostb'
+DHCP_HOSTC = 'hostc'
+
+
+class AgentSchedulerTestMixIn(object):
+
+    def _request_list(self, path, admin_context=True,
+                      expected_code=exc.HTTPOk.code):
+        # Issue a GET against the extension API, assert the HTTP status,
+        # and return the deserialized response body.
+        req = self._path_req(path, admin_context=admin_context)
+        res = req.get_response(self.ext_api)
+        self.assertEqual(res.status_int, expected_code)
+        return self.deserialize(self.fmt, res)
+
+    def _path_req(self, path, method='GET', data=None,
+                  query_string=None,
+                  admin_context=True):
+        # Build a webob request for the extension API.  With
+        # admin_context=False a plain tenant context is attached, so that
+        # policy (admin-only) failures can be exercised in tests.
+        content_type = 'application/%s' % self.fmt
+        body = None
+        if data is not None:  # empty dict is valid
+            body = Serializer().serialize(data, content_type)
+        if admin_context:
+            return create_request(
+                path, body, content_type, method, query_string=query_string)
+        else:
+            return create_request(
+                path, body, content_type, method, query_string=query_string,
+                context=context.Context('', 'tenant_id'))
+
+ def _path_create_request(self, path, data, admin_context=True):
+ return self._path_req(path, method='POST', data=data,
+ admin_context=admin_context)
+
+ def _path_show_request(self, path, admin_context=True):
+ return self._path_req(path, admin_context=admin_context)
+
+ def _path_delete_request(self, path, admin_context=True):
+ return self._path_req(path, method='DELETE',
+ admin_context=admin_context)
+
+ def _path_update_request(self, path, data, admin_context=True):
+ return self._path_req(path, method='PUT', data=data,
+ admin_context=admin_context)
+
+ def _list_routers_hosted_by_l3_agent(self, agent_id,
+ expected_code=exc.HTTPOk.code,
+ admin_context=True):
+ path = "/agents/%s/%s.%s" % (agent_id,
+ agentscheduler.L3_ROUTERS,
+ self.fmt)
+ return self._request_list(path, expected_code=expected_code,
+ admin_context=admin_context)
+
+ def _list_networks_hosted_by_dhcp_agent(self, agent_id,
+ expected_code=exc.HTTPOk.code,
+ admin_context=True):
+ path = "/agents/%s/%s.%s" % (agent_id,
+ agentscheduler.DHCP_NETS,
+ self.fmt)
+ return self._request_list(path, expected_code=expected_code,
+ admin_context=admin_context)
+
+ def _list_l3_agents_hosting_router(self, router_id,
+ expected_code=exc.HTTPOk.code,
+ admin_context=True):
+ path = "/routers/%s/%s.%s" % (router_id,
+ agentscheduler.L3_AGENTS,
+ self.fmt)
+ return self._request_list(path, expected_code=expected_code,
+ admin_context=admin_context)
+
+ def _list_dhcp_agents_hosting_network(self, network_id,
+ expected_code=exc.HTTPOk.code,
+ admin_context=True):
+ path = "/networks/%s/%s.%s" % (network_id,
+ agentscheduler.DHCP_AGENTS,
+ self.fmt)
+ return self._request_list(path, expected_code=expected_code,
+ admin_context=admin_context)
+
+ def _add_router_to_l3_agent(self, id, router_id,
+ expected_code=exc.HTTPCreated.code,
+ admin_context=True):
+ path = "/agents/%s/%s.%s" % (id,
+ agentscheduler.L3_ROUTERS,
+ self.fmt)
+ req = self._path_create_request(path,
+ {'router_id': router_id},
+ admin_context=admin_context)
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, expected_code)
+
+ def _add_network_to_dhcp_agent(self, id, network_id,
+ expected_code=exc.HTTPCreated.code,
+ admin_context=True):
+ path = "/agents/%s/%s.%s" % (id,
+ agentscheduler.DHCP_NETS,
+ self.fmt)
+ req = self._path_create_request(path,
+ {'network_id': network_id},
+ admin_context=admin_context)
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, expected_code)
+
+ def _remove_network_from_dhcp_agent(self, id, network_id,
+ expected_code=exc.HTTPNoContent.code,
+ admin_context=True):
+ path = "/agents/%s/%s/%s.%s" % (id,
+ agentscheduler.DHCP_NETS,
+ network_id,
+ self.fmt)
+ req = self._path_delete_request(path,
+ admin_context=admin_context)
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, expected_code)
+
+ def _remove_router_from_l3_agent(self, id, router_id,
+ expected_code=exc.HTTPNoContent.code,
+ admin_context=True):
+ path = "/agents/%s/%s/%s.%s" % (id,
+ agentscheduler.L3_ROUTERS,
+ router_id,
+ self.fmt)
+ req = self._path_delete_request(path, admin_context=admin_context)
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, expected_code)
+
+ def _register_one_agent_state(self, agent_state):
+ callback = agents_db.AgentExtRpcCallback()
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': agent_state})
+
+ def _disable_agent(self, agent_id, admin_state_up=False):
+ new_agent = {}
+ new_agent['agent'] = {}
+ new_agent['agent']['admin_state_up'] = admin_state_up
+ self._update('agents', agent_id, new_agent)
+
+ def _get_agent_id(self, agent_type, host):
+ agents = self._list_agents()
+ for agent in agents['agents']:
+ if (agent['agent_type'] == agent_type and
+ agent['host'] == host):
+ return agent['id']
+
+
+class AgentSchedulerTestCase(test_l3_plugin.L3NatTestCaseMixin,
+ test_agent_ext_plugin.AgentDBTestMixIn,
+ AgentSchedulerTestMixIn,
+ test_plugin.QuantumDbPluginV2TestCase):
+ fmt = 'json'
+
+ def setUp(self):
+ plugin = ('quantum.plugins.openvswitch.'
+ 'ovs_quantum_plugin.OVSQuantumPluginV2')
+ self.dhcp_notifier_cls_p = mock.patch(
+ 'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.'
+ 'DhcpAgentNotifyAPI')
+ self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
+ self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
+ self.dhcp_notifier_cls.return_value = self.dhcp_notifier
+ super(AgentSchedulerTestCase, self).setUp(plugin)
+ ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
+ self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+ self.adminContext = context.get_admin_context()
+ self.agentscheduler_dbMinxin = manager.QuantumManager.get_plugin()
+ self.addCleanup(self.dhcp_notifier_cls_p.stop)
+
+ def test_report_states(self):
+ self._register_agent_states()
+ agents = self._list_agents()
+ self.assertEqual(4, len(agents['agents']))
+
+ def test_network_scheduling_on_network_creation(self):
+ self._register_agent_states()
+ with self.network() as net:
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ net['network']['id'])
+ self.assertEqual(1, len(dhcp_agents['agents']))
+
+ def test_network_auto_schedule_with_disabled(self):
+ with contextlib.nested(self.network(),
+ self.network()):
+ dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTA)
+ hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTC)
+ self._disable_agent(hosta_id)
+ dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
+ # second agent will host all the networks since first is disabled.
+ dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC)
+ networks = self._list_networks_hosted_by_dhcp_agent(hostc_id)
+ num_hostc_nets = len(networks['networks'])
+ networks = self._list_networks_hosted_by_dhcp_agent(hosta_id)
+ num_hosta_nets = len(networks['networks'])
+ self.assertEqual(0, num_hosta_nets)
+ self.assertEqual(2, num_hostc_nets)
+
+ def test_network_auto_schedule_with_hosted(self):
+ # one agent hosts all the networks, other hosts none
+ with contextlib.nested(self.network(),
+ self.network()) as (net1, net2):
+ dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
+ self._register_agent_states()
+ dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
+ # second agent will not host the network since first has got it.
+ dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTC)
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ net1['network']['id'])
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTA)
+ hostc_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTC)
+ hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id)
+ num_hosta_nets = len(hosta_nets['networks'])
+ hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id)
+ num_hostc_nets = len(hostc_nets['networks'])
+
+ self.assertEqual(2, num_hosta_nets)
+ self.assertEqual(0, num_hostc_nets)
+ self.assertEqual(1, len(dhcp_agents['agents']))
+ self.assertEqual(DHCP_HOSTA, dhcp_agents['agents'][0]['host'])
+
+ def test_network_auto_schedule_with_hosted_2(self):
+ # one agent hosts one network
+ dhcp_rpc = dhcp_rpc_base.DhcpRpcCallbackMixin()
+ dhcp_hosta = {
+ 'binary': 'quantum-dhcp-agent',
+ 'host': DHCP_HOSTA,
+ 'topic': 'DHCP_AGENT',
+ 'configurations': {'dhcp_driver': 'dhcp_driver',
+ 'use_namespaces': True,
+ },
+ 'agent_type': constants.AGENT_TYPE_DHCP}
+ dhcp_hostc = copy.deepcopy(dhcp_hosta)
+ dhcp_hostc['host'] = DHCP_HOSTC
+ with self.network() as net1:
+ self._register_one_agent_state(dhcp_hosta)
+ dhcp_rpc.get_active_networks(self.adminContext, host=DHCP_HOSTA)
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTA)
+ self._disable_agent(hosta_id, admin_state_up=False)
+ with self.network() as net2:
+ self._register_one_agent_state(dhcp_hostc)
+ dhcp_rpc.get_active_networks(self.adminContext,
+ host=DHCP_HOSTC)
+ dhcp_agents_1 = self._list_dhcp_agents_hosting_network(
+ net1['network']['id'])
+ dhcp_agents_2 = self._list_dhcp_agents_hosting_network(
+ net2['network']['id'])
+ hosta_nets = self._list_networks_hosted_by_dhcp_agent(hosta_id)
+ num_hosta_nets = len(hosta_nets['networks'])
+ hostc_id = self._get_agent_id(
+ constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTC)
+ hostc_nets = self._list_networks_hosted_by_dhcp_agent(hostc_id)
+ num_hostc_nets = len(hostc_nets['networks'])
+
+ self.assertEqual(1, num_hosta_nets)
+ self.assertEqual(1, num_hostc_nets)
+ self.assertEqual(1, len(dhcp_agents_1['agents']))
+ self.assertEqual(1, len(dhcp_agents_2['agents']))
+ self.assertEqual(DHCP_HOSTA, dhcp_agents_1['agents'][0]['host'])
+ self.assertEqual(DHCP_HOSTC, dhcp_agents_2['agents'][0]['host'])
+
+ def test_network_scheduling_on_port_creation(self):
+ with self.subnet() as subnet:
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ subnet['subnet']['network_id'])
+ result0 = len(dhcp_agents['agents'])
+ self._register_agent_states()
+ with self.port(subnet=subnet,
+ device_owner="compute:test:" + DHCP_HOSTA) as port:
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ port['port']['network_id'])
+ result1 = len(dhcp_agents['agents'])
+ self.assertEqual(0, result0)
+ self.assertEqual(1, result1)
+
+ def test_network_scheduler_with_disabled_agent(self):
+ dhcp_hosta = {
+ 'binary': 'quantum-dhcp-agent',
+ 'host': DHCP_HOSTA,
+ 'topic': 'DHCP_AGENT',
+ 'configurations': {'dhcp_driver': 'dhcp_driver',
+ 'use_namespaces': True,
+ },
+ 'agent_type': constants.AGENT_TYPE_DHCP}
+ self._register_one_agent_state(dhcp_hosta)
+ with self.network() as net1:
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ net1['network']['id'])
+ self.assertEqual(1, len(dhcp_agents['agents']))
+ agents = self._list_agents()
+ self._disable_agent(agents['agents'][0]['id'])
+ with self.network() as net2:
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ net2['network']['id'])
+ self.assertEqual(0, len(dhcp_agents['agents']))
+
+ def test_network_scheduler_with_down_agent(self):
+ dhcp_hosta = {
+ 'binary': 'quantum-dhcp-agent',
+ 'host': DHCP_HOSTA,
+ 'topic': 'DHCP_AGENT',
+ 'configurations': {'dhcp_driver': 'dhcp_driver',
+ 'use_namespaces': True,
+ },
+ 'agent_type': constants.AGENT_TYPE_DHCP}
+ self._register_one_agent_state(dhcp_hosta)
+ is_agent_down_str = 'quantum.db.agents_db.AgentDbMixin.is_agent_down'
+ with mock.patch(is_agent_down_str) as mock_is_agent_down:
+ mock_is_agent_down.return_value = False
+ with self.network() as net:
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ net['network']['id'])
+ self.assertEqual(1, len(dhcp_agents['agents']))
+ with mock.patch(is_agent_down_str) as mock_is_agent_down:
+ mock_is_agent_down.return_value = True
+ with self.network() as net:
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ net['network']['id'])
+ self.assertEqual(0, len(dhcp_agents['agents']))
+
+ def test_network_scheduler_with_hosted_network(self):
+ dhcp_hosta = {
+ 'binary': 'quantum-dhcp-agent',
+ 'host': DHCP_HOSTA,
+ 'topic': 'DHCP_AGENT',
+ 'configurations': {'dhcp_driver': 'dhcp_driver',
+ 'use_namespaces': True,
+ },
+ 'agent_type': constants.AGENT_TYPE_DHCP}
+ self._register_one_agent_state(dhcp_hosta)
+ agents = self._list_agents()
+ with self.network() as net1:
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ net1['network']['id'])
+ self.assertEqual(1, len(dhcp_agents['agents']))
+ with mock.patch.object(OVSQuantumPluginV2,
+ 'get_dhcp_agents_hosting_networks',
+ autospec=True) as mock_hosting_agents:
+
+ mock_hosting_agents.return_value = agents['agents']
+ with self.network(do_delete=False) as net2:
+ pass
+ dhcp_agents = self._list_dhcp_agents_hosting_network(
+ net2['network']['id'])
+ self.assertEqual(0, len(dhcp_agents['agents']))
+
+ def test_network_policy(self):
+ with self.network() as net1:
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTA)
+ self._list_networks_hosted_by_dhcp_agent(
+ hosta_id, expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+ self._add_network_to_dhcp_agent(
+ hosta_id, net1['network']['id'],
+ expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+ self._add_network_to_dhcp_agent(hosta_id,
+ net1['network']['id'])
+ self._remove_network_from_dhcp_agent(
+ hosta_id, net1['network']['id'],
+ expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+ self._list_dhcp_agents_hosting_network(
+ net1['network']['id'],
+ expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+
+ def test_network_add_to_dhcp_agent(self):
+ with self.network() as net1:
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTA)
+ num_before_add = len(
+ self._list_networks_hosted_by_dhcp_agent(
+ hosta_id)['networks'])
+ self._add_network_to_dhcp_agent(hosta_id,
+ net1['network']['id'])
+ num_after_add = len(
+ self._list_networks_hosted_by_dhcp_agent(
+ hosta_id)['networks'])
+ self.assertEqual(0, num_before_add)
+ self.assertEqual(1, num_after_add)
+
+ def test_network_remove_from_dhcp_agent(self):
+ dhcp_hosta = {
+ 'binary': 'quantum-dhcp-agent',
+ 'host': DHCP_HOSTA,
+ 'topic': 'DHCP_AGENT',
+ 'configurations': {'dhcp_driver': 'dhcp_driver',
+ 'use_namespaces': True,
+ },
+ 'agent_type': constants.AGENT_TYPE_DHCP}
+ self._register_one_agent_state(dhcp_hosta)
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_DHCP,
+ DHCP_HOSTA)
+ with self.network() as net1:
+ num_before_remove = len(
+ self._list_networks_hosted_by_dhcp_agent(
+ hosta_id)['networks'])
+ self._remove_network_from_dhcp_agent(hosta_id,
+ net1['network']['id'])
+ num_after_remove = len(
+ self._list_networks_hosted_by_dhcp_agent(
+ hosta_id)['networks'])
+ self.assertEqual(1, num_before_remove)
+ self.assertEqual(0, num_after_remove)
+
+ def test_router_auto_schedule_with_hosted(self):
+ with self.router() as router:
+ l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
+ self._register_agent_states()
+ l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
+ l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
+ l3_agents = self._list_l3_agents_hosting_router(
+ router['router']['id'])
+ self.assertEqual(1, len(l3_agents['agents']))
+ self.assertEqual(L3_HOSTA, l3_agents['agents'][0]['host'])
+
+ def test_router_auto_schedule_with_hosted_2(self):
+ # one agent hosts one router
+ l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
+ l3_hosta = {
+ 'binary': 'quantum-l3-agent',
+ 'host': L3_HOSTA,
+ 'topic': 'L3_AGENT',
+ 'configurations': {'use_namespaces': True,
+ 'router_id': None,
+ 'handle_internal_only_routers':
+ True,
+ 'gateway_external_network_id':
+ None,
+ 'interface_driver': 'interface_driver',
+ },
+ 'agent_type': constants.AGENT_TYPE_L3}
+ l3_hostb = copy.deepcopy(l3_hosta)
+ l3_hostb['host'] = L3_HOSTB
+ with self.router() as router1:
+ self._register_one_agent_state(l3_hosta)
+ l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ self._disable_agent(hosta_id, admin_state_up=False)
+ with self.router() as router2:
+ self._register_one_agent_state(l3_hostb)
+ l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
+ l3_agents_1 = self._list_l3_agents_hosting_router(
+ router1['router']['id'])
+ l3_agents_2 = self._list_l3_agents_hosting_router(
+ router2['router']['id'])
+ hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
+ num_hosta_routers = len(hosta_routers['routers'])
+ hostb_id = self._get_agent_id(
+ constants.AGENT_TYPE_L3,
+ L3_HOSTB)
+ hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id)
+ num_hostc_routers = len(hostb_routers['routers'])
+
+ self.assertEqual(1, num_hosta_routers)
+ self.assertEqual(1, num_hostc_routers)
+ self.assertEqual(1, len(l3_agents_1['agents']))
+ self.assertEqual(1, len(l3_agents_2['agents']))
+ self.assertEqual(L3_HOSTA, l3_agents_1['agents'][0]['host'])
+ self.assertEqual(L3_HOSTB, l3_agents_2['agents'][0]['host'])
+
+ def test_router_auto_schedule_with_disabled(self):
+ with contextlib.nested(self.router(),
+ self.router()):
+ l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTB)
+ self._disable_agent(hosta_id)
+ # first agent will not host router since it is disabled
+ l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
+ # second agent will host all the routers since first is disabled.
+ l3_rpc.sync_routers(self.adminContext, host=L3_HOSTB)
+ hostb_routers = self._list_routers_hosted_by_l3_agent(hostb_id)
+ num_hostb_routers = len(hostb_routers['routers'])
+ hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
+ num_hosta_routers = len(hosta_routers['routers'])
+ self.assertEqual(2, num_hostb_routers)
+ self.assertEqual(0, num_hosta_routers)
+
+ def test_router_auto_schedule_with_candidates(self):
+ l3_hosta = {
+ 'binary': 'quantum-l3-agent',
+ 'host': L3_HOSTA,
+ 'topic': 'L3_AGENT',
+ 'configurations': {'use_namespaces': False,
+ 'router_id': None,
+ 'handle_internal_only_routers':
+ True,
+ 'gateway_external_network_id':
+ None,
+ 'interface_driver': 'interface_driver',
+ },
+ 'agent_type': constants.AGENT_TYPE_L3}
+ with contextlib.nested(self.router(),
+ self.router()) as (router1, router2):
+ l3_rpc = l3_rpc_base.L3RpcCallbackMixin()
+ l3_hosta['configurations']['router_id'] = router1['router']['id']
+ self._register_one_agent_state(l3_hosta)
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ l3_rpc.sync_routers(self.adminContext, host=L3_HOSTA)
+ hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
+ num_hosta_routers = len(hosta_routers['routers'])
+ l3_agents_1 = self._list_l3_agents_hosting_router(
+ router1['router']['id'])
+ l3_agents_2 = self._list_l3_agents_hosting_router(
+ router2['router']['id'])
+ # L3 agent will host only the compatible router.
+ self.assertEqual(1, num_hosta_routers)
+ self.assertEqual(1, len(l3_agents_1['agents']))
+ self.assertEqual(0, len(l3_agents_2['agents']))
+
+ def test_router_schedule_with_candidates(self):
+ l3_hosta = {
+ 'binary': 'quantum-l3-agent',
+ 'host': L3_HOSTA,
+ 'topic': 'L3_AGENT',
+ 'configurations': {'use_namespaces': False,
+ 'router_id': None,
+ 'handle_internal_only_routers':
+ True,
+ 'gateway_external_network_id':
+ None,
+ 'interface_driver': 'interface_driver',
+ },
+ 'agent_type': constants.AGENT_TYPE_L3}
+ with contextlib.nested(self.router(),
+ self.router(),
+ self.subnet(),
+ self.subnet(cidr='10.0.3.0/24')) as (router1,
+ router2,
+ subnet1,
+ subnet2):
+ l3_hosta['configurations']['router_id'] = router1['router']['id']
+ self._register_one_agent_state(l3_hosta)
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ self._router_interface_action('add',
+ router1['router']['id'],
+ subnet1['subnet']['id'],
+ None)
+ self._router_interface_action('add',
+ router2['router']['id'],
+ subnet2['subnet']['id'],
+ None)
+ hosta_routers = self._list_routers_hosted_by_l3_agent(hosta_id)
+ num_hosta_routers = len(hosta_routers['routers'])
+ l3_agents_1 = self._list_l3_agents_hosting_router(
+ router1['router']['id'])
+ l3_agents_2 = self._list_l3_agents_hosting_router(
+ router2['router']['id'])
+ # L3 agent will host only the compatible router.
+ self.assertEqual(1, num_hosta_routers)
+ self.assertEqual(1, len(l3_agents_1['agents']))
+ self.assertEqual(0, len(l3_agents_2['agents']))
+
+ def test_router_without_l3_agents(self):
+ with self.subnet() as s:
+ self._set_net_external(s['subnet']['network_id'])
+ data = {'router': {'tenant_id': uuidutils.generate_uuid()}}
+ data['router']['name'] = 'router1'
+ data['router']['external_gateway_info'] = {
+ 'network_id': s['subnet']['network_id']}
+ router_req = self.new_create_request('routers', data, self.fmt)
+ res = router_req.get_response(self.ext_api)
+ router = self.deserialize(self.fmt, res)
+ l3agents = (
+ self.agentscheduler_dbMinxin.get_l3_agents_hosting_routers(
+ self.adminContext, [router['router']['id']]))
+ self._delete('routers', router['router']['id'])
+ self.assertEqual(0, len(l3agents))
+
+ def test_router_sync_data(self):
+ with contextlib.nested(self.subnet(),
+ self.subnet(cidr='10.0.2.0/24'),
+ self.subnet(cidr='10.0.3.0/24')) as (
+ s1, s2, s3):
+ self._register_agent_states()
+ self._set_net_external(s1['subnet']['network_id'])
+ data = {'router': {'tenant_id': uuidutils.generate_uuid()}}
+ data['router']['name'] = 'router1'
+ data['router']['external_gateway_info'] = {
+ 'network_id': s1['subnet']['network_id']}
+ router_req = self.new_create_request('routers', data, self.fmt)
+ res = router_req.get_response(self.ext_api)
+ router = self.deserialize(self.fmt, res)
+ self._router_interface_action('add',
+ router['router']['id'],
+ s2['subnet']['id'],
+ None)
+ self._router_interface_action('add',
+ router['router']['id'],
+ s3['subnet']['id'],
+ None)
+ l3agents = self._list_l3_agents_hosting_router(
+ router['router']['id'])
+ self.assertEqual(1, len(l3agents['agents']))
+ agents = self._list_agents()
+ another_l3_agent_id = None
+ another_l3_agent_host = None
+ default = l3agents['agents'][0]['id']
+ for com in agents['agents']:
+ if (com['id'] != default and
+ com['agent_type'] == constants.AGENT_TYPE_L3):
+ another_l3_agent_id = com['id']
+ another_l3_agent_host = com['host']
+ break
+ self.assertTrue(another_l3_agent_id is not None)
+ self._add_router_to_l3_agent(another_l3_agent_id,
+ router['router']['id'],
+ expected_code=exc.HTTPConflict.code)
+ self._remove_router_from_l3_agent(default,
+ router['router']['id'])
+ self._add_router_to_l3_agent(another_l3_agent_id,
+ router['router']['id'])
+ l3agents = self._list_l3_agents_hosting_router(
+ router['router']['id'])
+ self.assertEqual(another_l3_agent_host,
+ l3agents['agents'][0]['host'])
+ self._remove_router_from_l3_agent(another_l3_agent_id,
+ router['router']['id'])
+ self._router_interface_action('remove',
+ router['router']['id'],
+ s2['subnet']['id'],
+ None)
+ l3agents = self._list_l3_agents_hosting_router(
+ router['router']['id'])
+ self.assertEqual(1,
+ len(l3agents['agents']))
+ self._router_interface_action('remove',
+ router['router']['id'],
+ s3['subnet']['id'],
+ None)
+ self._delete('routers', router['router']['id'])
+
+ def test_router_add_to_l3_agent(self):
+ with self.router() as router1:
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ num_before_add = len(
+ self._list_routers_hosted_by_l3_agent(
+ hosta_id)['routers'])
+ self._add_router_to_l3_agent(hosta_id,
+ router1['router']['id'])
+ hostb_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTB)
+ self._add_router_to_l3_agent(hostb_id,
+ router1['router']['id'],
+ expected_code=exc.HTTPConflict.code)
+ num_after_add = len(
+ self._list_routers_hosted_by_l3_agent(
+ hosta_id)['routers'])
+ self.assertEqual(0, num_before_add)
+ self.assertEqual(1, num_after_add)
+
+ def test_router_policy(self):
+ with self.router() as router1:
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ self._list_routers_hosted_by_l3_agent(
+ hosta_id, expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+ self._add_router_to_l3_agent(
+ hosta_id, router1['router']['id'],
+ expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+ self._add_router_to_l3_agent(
+ hosta_id, router1['router']['id'])
+ self._remove_router_from_l3_agent(
+ hosta_id, router1['router']['id'],
+ expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+ self._list_l3_agents_hosting_router(
+ router1['router']['id'],
+ expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+
+
+class L3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
+ test_agent_ext_plugin.AgentDBTestMixIn,
+ AgentSchedulerTestMixIn,
+ test_plugin.QuantumDbPluginV2TestCase):
+ def setUp(self):
+ plugin = ('quantum.plugins.openvswitch.'
+ 'ovs_quantum_plugin.OVSQuantumPluginV2')
+ self.dhcp_notifier_cls_p = mock.patch(
+ 'quantum.api.rpc.agentnotifiers.dhcp_rpc_agent_api.'
+ 'DhcpAgentNotifyAPI')
+ self.dhcp_notifier = mock.Mock(name='dhcp_notifier')
+ self.dhcp_notifier_cls = self.dhcp_notifier_cls_p.start()
+ self.dhcp_notifier_cls.return_value = self.dhcp_notifier
+ super(L3AgentNotifierTestCase, self).setUp(plugin)
+ ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
+ self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+ self.adminContext = context.get_admin_context()
+ self.addCleanup(self.dhcp_notifier_cls_p.stop)
+
+ def test_router_add_to_l3_agent_notification(self):
+ plugin = manager.QuantumManager.get_plugin()
+ with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+ with self.router() as router1:
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ self._add_router_to_l3_agent(hosta_id,
+ router1['router']['id'])
+ routers = plugin.get_sync_data(self.adminContext,
+ [router1['router']['id']])
+ mock_l3.assert_called_with(
+ mock.ANY,
+ plugin.l3_agent_notifier.make_msg(
+ 'router_added_to_agent',
+ payload=routers),
+ topic='l3_agent.hosta')
+
+ def test_router_remove_from_l3_agent_notification(self):
+ plugin = manager.QuantumManager.get_plugin()
+ with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+ with self.router() as router1:
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ self._add_router_to_l3_agent(hosta_id,
+ router1['router']['id'])
+ self._remove_router_from_l3_agent(hosta_id,
+ router1['router']['id'])
+ mock_l3.assert_called_with(
+ mock.ANY, plugin.l3_agent_notifier.make_msg(
+ 'router_removed_from_agent',
+ payload={'router_id': router1['router']['id']}),
+ topic='l3_agent.hosta')
+
+ def test_agent_updated_l3_agent_notification(self):
+ plugin = manager.QuantumManager.get_plugin()
+ with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+ self._register_agent_states()
+ hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
+ L3_HOSTA)
+ self._disable_agent(hosta_id, admin_state_up=False)
+ mock_l3.assert_called_with(
+ mock.ANY, plugin.l3_agent_notifier.make_msg(
+ 'agent_updated',
+ payload={'admin_state_up': False}),
+ topic='l3_agent.hosta')
+
+
+class AgentSchedulerTestCaseXML(AgentSchedulerTestCase):
+ fmt = 'xml'
supported_extension_aliases = ["agent"]
-class AgentDBTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
- fmt = 'json'
-
- def setUp(self):
- self.adminContext = context.get_admin_context()
- test_config['plugin_name_v2'] = (
- 'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
- # for these tests we need to enable overlapping ips
- cfg.CONF.set_default('allow_overlapping_ips', True)
- ext_mgr = AgentTestExtensionManager()
- test_config['extension_manager'] = ext_mgr
- super(AgentDBTestCase, self).setUp()
+class AgentDBTestMixIn(object):
def _list_agents(self, expected_res_status=None,
quantum_context=None,
query_string=None):
- comp_res = self._list('agents',
- quantum_context=quantum_context,
- query_params=query_string)
+ agent_res = self._list('agents',
+ quantum_context=quantum_context,
+ query_params=query_string)
if expected_res_status:
- self.assertEqual(comp_res.status_int, expected_res_status)
- return comp_res
+ self.assertEqual(agent_res.status_int, expected_res_status)
+ return agent_res
def _register_agent_states(self):
"""Register two L3 agents and two DHCP agents."""
agent_state={'agent_state': dhcp_hostc})
return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
+
+class AgentDBTestCase(AgentDBTestMixIn,
+ test_db_plugin.QuantumDbPluginV2TestCase):
+ fmt = 'json'
+
+ def setUp(self):
+ self.adminContext = context.get_admin_context()
+ test_config['plugin_name_v2'] = (
+ 'quantum.tests.unit.test_agent_ext_plugin.TestAgentPlugin')
+ # for these tests we need to enable overlapping ips
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ ext_mgr = AgentTestExtensionManager()
+ test_config['extension_manager'] = ext_mgr
+ super(AgentDBTestCase, self).setUp()
+
def test_create_agent(self):
data = {'agent': {}}
_req = self.new_create_request('agents', data, self.fmt)
super(TestDhcpRpcCallackMixin, self).setUp()
self.plugin_p = mock.patch('quantum.manager.QuantumManager.get_plugin')
get_plugin = self.plugin_p.start()
- self.plugin = mock.Mock()
+ self.plugin = mock.MagicMock()
get_plugin.return_value = self.plugin
self.callbacks = dhcp_rpc_base.DhcpRpcCallbackMixin()
self.log_p = mock.patch('quantum.db.dhcp_rpc_base.LOG')
def testRouterInfoCreate(self):
id = _uuid()
ri = l3_agent.RouterInfo(id, self.conf.root_helper,
- self.conf.use_namespaces)
+ self.conf.use_namespaces, None)
self.assertTrue(ri.ns_name().endswith(id))
router_id = _uuid()
network_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
- self.conf.use_namespaces)
+ self.conf.use_namespaces, None)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
cidr = '99.0.1.9/24'
mac = 'ca:fe:de:ad:be:ef'
def _test_external_gateway_action(self, action):
router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
- self.conf.use_namespaces)
+ self.conf.use_namespaces, None)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
internal_cidrs = ['100.0.1.0/24', '200.74.0.0/16']
ex_gw_port = {'fixed_ips': [{'ip_address': '20.0.0.30',
def _test_floating_ip_action(self, action):
router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
- self.conf.use_namespaces)
+ self.conf.use_namespaces, None)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
floating_ip = '20.0.0.100'
fixed_ip = '10.0.0.23'
router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
- self.conf.use_namespaces)
+ self.conf.use_namespaces,
+ None)
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
fake_route1 = {'destination': '135.207.0.0/16',
router_id = _uuid()
ri = l3_agent.RouterInfo(router_id, self.conf.root_helper,
- self.conf.use_namespaces)
+ self.conf.use_namespaces,
+ None)
ri.router = {}
fake_old_routes = []
import webtest
from quantum.api import extensions
+from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
from quantum.api.v2 import attributes
from quantum.common import config
from quantum.common import constants as l3_constants
from quantum import context
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
-from quantum.db import l3_rpc_agent_api
from quantum.db import models_v2
from quantum.extensions import l3
from quantum.manager import QuantumManager
return super(TestL3NatPlugin, self).delete_port(context, id)
-class L3NatTestCaseBase(test_db_plugin.QuantumDbPluginV2TestCase):
-
- def setUp(self):
- test_config['plugin_name_v2'] = (
- 'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
- # for these tests we need to enable overlapping ips
- cfg.CONF.set_default('allow_overlapping_ips', True)
- ext_mgr = L3TestExtensionManager()
- test_config['extension_manager'] = ext_mgr
- super(L3NatTestCaseBase, self).setUp()
-
- # Set to None to reload the drivers
- notifier_api._drivers = None
- cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
-
- def tearDown(self):
- test_notifier.NOTIFICATIONS = []
- super(L3NatTestCaseBase, self).tearDown()
+class L3NatTestCaseMixin(object):
def _create_network(self, fmt, name, admin_state_up, **kwargs):
""" Override the routine for allowing the router:external attribute """
kwargs),
kwargs.values()))
arg_list = new_args.pop('arg_list', ()) + (l3.EXTERNAL,)
- return super(L3NatTestCaseBase, self)._create_network(
+ return super(L3NatTestCaseMixin, self)._create_network(
fmt, name, admin_state_up, arg_list=arg_list, **new_args)
def _create_router(self, fmt, tenant_id, name=None,
public_sub['subnet']['network_id'])
+class L3NatTestCaseBase(L3NatTestCaseMixin,
+ test_db_plugin.QuantumDbPluginV2TestCase):
+
+ def setUp(self):
+ test_config['plugin_name_v2'] = (
+ 'quantum.tests.unit.test_l3_plugin.TestL3NatPlugin')
+ # for these tests we need to enable overlapping ips
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ ext_mgr = L3TestExtensionManager()
+ test_config['extension_manager'] = ext_mgr
+ super(L3NatTestCaseBase, self).setUp()
+
+ # Set to None to reload the drivers
+ notifier_api._drivers = None
+ cfg.CONF.set_override("notification_driver", [test_notifier.__name__])
+
+ def tearDown(self):
+ test_notifier.NOTIFICATIONS = []
+ super(L3NatTestCaseBase, self).tearDown()
+
+
class L3NatDBTestCase(L3NatTestCaseBase):
def test_router_create(self):
def _test_notify_op_agent(self, target_func, *args):
l3_rpc_agent_api_str = (
- 'quantum.db.l3_rpc_agent_api.L3AgentNotifyAPI')
+ 'quantum.api.rpc.agentnotifiers.l3_rpc_agent_api.L3AgentNotifyAPI')
oldNotify = l3_rpc_agent_api.L3AgentNotify
try:
with mock.patch(l3_rpc_agent_api_str) as notifyApi:
downloadcache = ~/cache/pip
[testenv:pep8]
+# E712 comparison to False should be 'if cond is False:' or 'if not cond:'
+# query = query.filter(Component.disabled == False)
+# E125 continuation line does not distinguish itself from next logical line
+
commands =
- pep8 --repeat --show-source --ignore=E125 --exclude=.venv,.tox,dist,doc,openstack,*egg .
+ pep8 --repeat --show-source --ignore=E125,E712 --exclude=.venv,.tox,dist,doc,openstack,*egg .
pep8 --repeat --show-source --ignore=E125 --filename=quantum* bin
[testenv:i18n]