# network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler
# Driver to use for scheduling router to a default L3 agent
# router_scheduler_driver = neutron.scheduler.l3_agent_scheduler.ChanceScheduler
+# Driver to use for scheduling a loadbalancer pool to an lbaas agent
+# loadbalancer_pool_scheduler_driver = neutron.services.loadbalancer.agent_scheduler.ChanceScheduler
# Allow auto scheduling networks to DHCP agent. It will schedule non-hosted
# networks to first DHCP agent which sends get_active_networks message to
"get_l3-routers": "rule:admin_only",
"get_dhcp-agents": "rule:admin_only",
"get_l3-agents": "rule:admin_only",
+ "get_loadbalancer-agent": "rule:admin_only",
+ "get_loadbalancer-pools": "rule:admin_only",
"create_router": "rule:regular_user",
"get_router": "rule:admin_or_owner",
AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
AGENT_TYPE_NEC = 'NEC plugin agent'
AGENT_TYPE_L3 = 'L3 agent'
+AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
L2_AGENT_TOPIC = 'N/A'
PAGINATION_INFINITE = 'infinite'
L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
+LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'
RPC_API_VERSION = '1.0'
START_TIME = timeutils.utcnow()
+ def __init__(self, plugin=None):
+ self.plugin = plugin
+
def report_state(self, context, **kwargs):
"""Report state from agent to server."""
time = kwargs['time']
LOG.debug(_("Message with invalid timestamp received"))
return
agent_state = kwargs['agent_state']['agent_state']
- plugin = manager.NeutronManager.get_plugin()
- plugin.create_or_update_agent(context, agent_state)
+ if not self.plugin:
+ self.plugin = manager.NeutronManager.get_plugin()
+ self.plugin.create_or_update_agent(context, agent_state)
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
"""Common class for agent scheduler mixins."""
- dhcp_agent_notifier = None
- l3_agent_notifier = None
+ # agent notifiers to handle agent update operations;
+ # should be updated by plugins;
+ agent_notifiers = {
+ constants.AGENT_TYPE_DHCP: None,
+ constants.AGENT_TYPE_L3: None,
+ constants.AGENT_TYPE_LOADBALANCER: None,
+ }
@staticmethod
def is_eligible_agent(active, agent):
result = super(AgentSchedulerDbMixin, self).update_agent(
context, id, agent)
agent_data = agent['agent']
- if ('admin_state_up' in agent_data and
+ agent_notifier = self.agent_notifiers.get(original_agent['agent_type'])
+ if (agent_notifier and
+ 'admin_state_up' in agent_data and
original_agent['admin_state_up'] != agent_data['admin_state_up']):
- if (original_agent['agent_type'] == constants.AGENT_TYPE_DHCP and
- self.dhcp_agent_notifier):
- self.dhcp_agent_notifier.agent_updated(
- context, agent_data['admin_state_up'],
- original_agent['host'])
- elif (original_agent['agent_type'] == constants.AGENT_TYPE_L3 and
- self.l3_agent_notifier):
- self.l3_agent_notifier.agent_updated(
- context, agent_data['admin_state_up'],
- original_agent['host'])
+ agent_notifier.agent_updated(context,
+ agent_data['admin_state_up'],
+ original_agent['host'])
return result
raise l3agentscheduler.RouterSchedulingFailed(
router_id=router_id, agent_id=id)
- if self.l3_agent_notifier:
- self.l3_agent_notifier.router_added_to_agent(
+ l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
+ if l3_notifier:
+ l3_notifier.router_added_to_agent(
context, [router_id], agent_db.host)
def remove_router_from_l3_agent(self, context, id, router_id):
raise l3agentscheduler.RouterNotHostedByL3Agent(
router_id=router_id, agent_id=id)
context.session.delete(binding)
- if self.l3_agent_notifier:
- self.l3_agent_notifier.router_removed_from_agent(
+ l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
+ if l3_notifier:
+ l3_notifier.router_removed_from_agent(
context, router_id, agent.host)
def list_routers_on_l3_agent(self, context, id):
binding.dhcp_agent_id = id
binding.network_id = network_id
context.session.add(binding)
- if self.dhcp_agent_notifier:
- self.dhcp_agent_notifier.network_added_to_agent(
+ dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
+ if dhcp_notifier:
+ dhcp_notifier.network_added_to_agent(
context, network_id, agent_db.host)
def remove_network_from_dhcp_agent(self, context, id, network_id):
raise dhcpagentscheduler.NetworkNotHostedByDhcpAgent(
network_id=network_id, agent_id=id)
context.session.delete(binding)
- if self.dhcp_agent_notifier:
- self.dhcp_agent_notifier.network_removed_from_agent(
+ dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
+ if dhcp_notifier:
+ dhcp_notifier.network_removed_from_agent(
context, network_id, agent.host)
def list_networks_on_dhcp_agent(self, context, id):
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+"""LBaaS Pool scheduler
+
+Revision ID: 52c5e4a18807
+Revises: 2032abe8edac
+Create Date: 2013-06-14 03:23:47.815865
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '52c5e4a18807'
+down_revision = '2032abe8edac'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade(active_plugin=None, options=None):
+ ### commands auto generated by Alembic - please adjust! ###
+ op.create_table(
+ 'poolloadbalanceragentbindings',
+ sa.Column('pool_id', sa.String(length=36), nullable=False),
+ sa.Column('loadbalancer_agent_id', sa.String(length=36),
+ nullable=False),
+ sa.ForeignKeyConstraint(['loadbalancer_agent_id'], ['agents.id'],
+ ondelete='CASCADE'),
+ sa.ForeignKeyConstraint(['pool_id'], ['pools.id'],
+ ondelete='CASCADE'),
+ sa.PrimaryKeyConstraint('pool_id')
+ )
+ ### end Alembic commands ###
+
+
+def downgrade(active_plugin=None, options=None):
+ ### commands auto generated by Alembic - please adjust! ###
+ op.drop_table('poolloadbalanceragentbindings')
+ ### end Alembic commands ###
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from abc import abstractmethod
+
+from neutron.api import extensions
+from neutron.api.v2 import base
+from neutron.api.v2 import resource
+from neutron.common import constants
+from neutron.extensions import agent
+from neutron import manager
+from neutron.plugins.common import constants as plugin_const
+from neutron import policy
+from neutron import wsgi
+
+LOADBALANCER_POOL = 'loadbalancer-pool'
+LOADBALANCER_POOLS = LOADBALANCER_POOL + 's'
+LOADBALANCER_AGENT = 'loadbalancer-agent'
+
+
+class PoolSchedulerController(wsgi.Controller):
+ def index(self, request, **kwargs):
+ lbaas_plugin = manager.NeutronManager.get_service_plugins().get(
+ plugin_const.LOADBALANCER)
+ if not lbaas_plugin:
+ return {'pools': []}
+
+ policy.enforce(request.context,
+ "get_%s" % LOADBALANCER_POOLS,
+ {},
+ plugin=lbaas_plugin)
+ return lbaas_plugin.list_pools_on_lbaas_agent(
+ request.context, kwargs['agent_id'])
+
+
+class LbaasAgentHostingPoolController(wsgi.Controller):
+ def index(self, request, **kwargs):
+ lbaas_plugin = manager.NeutronManager.get_service_plugins().get(
+ plugin_const.LOADBALANCER)
+ if not lbaas_plugin:
+ return
+
+ policy.enforce(request.context,
+ "get_%s" % LOADBALANCER_AGENT,
+ {},
+ plugin=lbaas_plugin)
+ return lbaas_plugin.get_lbaas_agent_hosting_pool(
+ request.context, kwargs['pool_id'])
+
+
+class Lbaas_agentscheduler(extensions.ExtensionDescriptor):
+ """Extension class supporting l3 agent scheduler.
+ """
+
+ @classmethod
+ def get_name(cls):
+ return "Loadbalancer Agent Scheduler"
+
+ @classmethod
+ def get_alias(cls):
+ return constants.LBAAS_AGENT_SCHEDULER_EXT_ALIAS
+
+ @classmethod
+ def get_description(cls):
+ return "Schedule pools among lbaas agents"
+
+ @classmethod
+ def get_namespace(cls):
+ return "http://docs.openstack.org/ext/lbaas_agent_scheduler/api/v1.0"
+
+ @classmethod
+ def get_updated(cls):
+ return "2013-02-07T10:00:00-00:00"
+
+ @classmethod
+ def get_resources(cls):
+ """Returns Ext Resources."""
+ exts = []
+ parent = dict(member_name="agent",
+ collection_name="agents")
+
+ controller = resource.Resource(PoolSchedulerController(),
+ base.FAULT_MAP)
+ exts.append(extensions.ResourceExtension(
+ LOADBALANCER_POOLS, controller, parent))
+
+ parent = dict(member_name="pool",
+ collection_name="pools")
+
+ controller = resource.Resource(LbaasAgentHostingPoolController(),
+ base.FAULT_MAP)
+ exts.append(extensions.ResourceExtension(
+ LOADBALANCER_AGENT, controller, parent,
+ path_prefix=plugin_const.
+ COMMON_PREFIXES[plugin_const.LOADBALANCER]))
+ return exts
+
+ def get_extended_resources(self, version):
+ return {}
+
+
+class NoEligibleLbaasAgent(agent.AgentNotFound):
+ message = _("No eligible loadbalancer agent found "
+ "for pool %(pool_id)s.")
+
+
+class NoActiveLbaasAgent(agent.AgentNotFound):
+ message = _("No active loadbalancer agent found "
+ "for pool %(pool_id)s.")
+
+
+class LbaasAgentSchedulerPluginBase(object):
+ """REST API to operate the lbaas agent scheduler.
+
+ All of method must be in an admin context.
+ """
+
+ @abstractmethod
+ def list_pools_on_lbaas_agent(self, context, id):
+ pass
+
+ @abstractmethod
+ def get_lbaas_agent_hosting_pool(self, context, pool_id):
+ pass
self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst
+ # search for possible agent notifiers declared in service plugin
+ # (needed by agent management extension)
+ if (hasattr(self.plugin, 'agent_notifiers') and
+ hasattr(plugin_inst, 'agent_notifiers')):
+ self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
+
LOG.debug(_("Successfully loaded %(type)s plugin. "
"Description: %(desc)s"),
{"type": plugin_inst.get_plugin_type(),
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
+from neutron.common import constants as q_const
from neutron.common import rpc as q_rpc
from neutron.common import topics
from neutron.common import utils
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = AgentNotifierApi(topics.AGENT)
- self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
- self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+ self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+ dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+ )
+ self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+ l3_rpc_agent_api.L3AgentNotify
+ )
def create_network(self, context, network):
"""Create network.
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.notifier = AgentNotifierApi(topics.AGENT)
- self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
- self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+ self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+ dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+ )
+ self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+ l3_rpc_agent_api.L3AgentNotify
+ )
def _parse_network_vlan_ranges(self):
try:
def _setup_rpc(self):
self.notifier = rpc.AgentNotifierApi(topics.AGENT)
- self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
- self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+ self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
+ dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+ )
+ self.agent_notifiers[const.AGENT_TYPE_L3] = (
+ l3_rpc_agent_api.L3AgentNotify
+ )
self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
self.topic = topics.PLUGIN
self.conn = c_rpc.create_connection(new=True)
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
+from neutron.common import constants as q_const
from neutron.common import exceptions as q_exc
from neutron.common import rpc as q_rpc
from neutron.common import topics
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
- self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
- self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+ self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+ dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+ )
+ self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+ l3_rpc_agent_api.L3AgentNotify
+ )
# NOTE: callback_sg is referred to from the sg unit test.
self.callback_sg = SecurityGroupServerRpcCallback()
self.dispatcher = NVPRpcCallbacks().create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
fanout=False)
- self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+ self.agent_notifiers[constants.AGENT_TYPE_DHCP] = (
+ dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
# Consume from all consumers in a thread
self.conn.consume_in_thread()
self.topic = topics.PLUGIN
self.conn = rpc.create_connection(new=True)
self.notifier = AgentNotifierApi(topics.AGENT)
- self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
- self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+ self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+ dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+ )
+ self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+ l3_rpc_agent_api.L3AgentNotify
+ )
self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
self.dispatcher = self.callbacks.create_rpc_dispatcher()
self.conn.create_consumer(self.topic, self.dispatcher,
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import random
+
+import sqlalchemy as sa
+from sqlalchemy import orm
+from sqlalchemy.orm import joinedload
+
+from neutron.common import constants
+from neutron.db import agents_db
+from neutron.db import agentschedulers_db
+from neutron.db import model_base
+from neutron.extensions import lbaas_agentscheduler
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class PoolLoadbalancerAgentBinding(model_base.BASEV2):
+ """Represents binding between neutron loadbalancer pools and agents."""
+
+ pool_id = sa.Column(sa.String(36),
+ sa.ForeignKey("pools.id", ondelete='CASCADE'),
+ primary_key=True)
+ agent = orm.relation(agents_db.Agent)
+ agent_id = sa.Column(sa.String(36), sa.ForeignKey("agents.id",
+ ondelete='CASCADE'))
+
+
+class LbaasAgentSchedulerDbMixin(agentschedulers_db.AgentSchedulerDbMixin,
+ lbaas_agentscheduler
+ .LbaasAgentSchedulerPluginBase):
+
+ def get_lbaas_agent_hosting_pool(self, context, pool_id, active=None):
+ query = context.session.query(PoolLoadbalancerAgentBinding)
+ query = query.options(joinedload('agent'))
+ binding = query.get(pool_id)
+
+ if (binding and self.is_eligible_agent(
+ active, binding.agent)):
+ return {'agent': self._make_agent_dict(binding.agent)}
+
+ def get_lbaas_agents(self, context, active=None, filters=None):
+ query = context.session.query(agents_db.Agent)
+ query = query.filter_by(agent_type=constants.AGENT_TYPE_LOADBALANCER)
+ if active is not None:
+ query = query.filter_by(admin_state_up=active)
+ if filters:
+ for key, value in filters.iteritems():
+ column = getattr(agents_db.Agent, key, None)
+ if column:
+ query = query.filter(column.in_(value))
+
+ return [agent
+ for agent in query
+ if self.is_eligible_agent(active, agent)]
+
+ def list_pools_on_lbaas_agent(self, context, id):
+ query = context.session.query(PoolLoadbalancerAgentBinding.pool_id)
+ query = query.filter_by(agent_id=id)
+ pool_ids = [item[0] for item in query]
+ if pool_ids:
+ return {'pools': self.get_pools(context, filters={'id': pool_ids})}
+ else:
+ return {'pools': []}
+
+
+class ChanceScheduler(object):
+ """Allocate a loadbalancer agent for a vip in a random way."""
+
+ def schedule(self, plugin, context, pool):
+ """Schedule the pool to an active loadbalancer agent if there
+ is no enabled agent hosting it.
+ """
+ with context.session.begin(subtransactions=True):
+ lbaas_agent = plugin.get_lbaas_agent_hosting_pool(
+ context, pool['id'])
+ if lbaas_agent:
+ LOG.debug(_('Pool %(pool_id)s has already been hosted'
+ ' by lbaas agent %(agent_id)s'),
+ {'pool_id': pool['id'],
+ 'agent_id': lbaas_agent['id']})
+ return
+
+ candidates = plugin.get_lbaas_agents(context, active=True)
+ if not candidates:
+ LOG.warn(_('No active lbaas agents for pool %s') % pool['id'])
+ return
+
+ chosen_agent = random.choice(candidates)
+ binding = PoolLoadbalancerAgentBinding()
+ binding.agent = chosen_agent
+ binding.pool_id = pool['id']
+ context.session.add(binding)
+ LOG.debug(_('Pool %(pool_id)s is scheduled to '
+ 'lbaas agent %(agent_id)s'),
+ {'pool_id': pool['id'],
+ 'agent_id': chosen_agent['id']})
+ return chosen_agent
cfg.CONF.register_opts(manager.OPTS)
# import interface options just in case the driver uses namespaces
cfg.CONF.register_opts(interface.OPTS)
+ config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
cfg.CONF(project='neutron')
from oslo.config import cfg
from neutron.agent.common import config
+from neutron.agent import rpc as agent_rpc
+from neutron.common import constants
from neutron import context
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.services.loadbalancer.drivers.haproxy import (
agent_api,
class LbaasAgentManager(periodic_task.PeriodicTasks):
+
+ # history
+ # 1.0 Initial version
+ # 1.1 Support agent_updated call
+ RPC_API_VERSION = '1.1'
+
def __init__(self, conf):
self.conf = conf
try:
except ImportError:
msg = _('Error importing loadbalancer device driver: %s')
raise SystemExit(msg % conf.device_driver)
- ctx = context.get_admin_context_without_session()
+
+ self.agent_state = {
+ 'binary': 'neutron-loadbalancer-agent',
+ 'host': conf.host,
+ 'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
+ 'configurations': {'device_driver': conf.device_driver,
+ 'interface_driver': conf.interface_driver},
+ 'agent_type': constants.AGENT_TYPE_LOADBALANCER,
+ 'start_flag': True}
+ self.admin_state_up = True
+
+ self.context = context.get_admin_context_without_session()
+ self._setup_rpc()
+ self.needs_resync = False
+ self.cache = LogicalDeviceCache()
+
+ def _setup_rpc(self):
self.plugin_rpc = agent_api.LbaasAgentApi(
plugin_driver.TOPIC_PROCESS_ON_HOST,
- ctx,
- conf.host
+ self.context,
+ self.conf.host
)
- self.needs_resync = False
- self.cache = LogicalDeviceCache()
+ self.state_rpc = agent_rpc.PluginReportStateAPI(
+ plugin_driver.TOPIC_PROCESS_ON_HOST)
+ report_interval = self.conf.AGENT.report_interval
+ if report_interval:
+ heartbeat = loopingcall.FixedIntervalLoopingCall(
+ self._report_state)
+ heartbeat.start(interval=report_interval)
+
+ def _report_state(self):
+ try:
+ device_count = len(self.cache.devices)
+ self.agent_state['configurations']['devices'] = device_count
+ self.state_rpc.report_state(self.context,
+ self.agent_state)
+ self.agent_state.pop('start_flag', None)
+ except Exception:
+ LOG.exception("Failed reporting state!")
def initialize_service_hook(self, started_by):
self.sync_state()
"""Handle RPC cast from plugin to destroy a pool if known to agent."""
if self.cache.get_by_pool_id(pool_id):
self.destroy_device(pool_id)
+
+ def agent_updated(self, context, payload):
+ """Handle the agent_updated notification event."""
+ if payload['admin_state_up'] != self.admin_state_up:
+ self.admin_state_up = payload['admin_state_up']
+ if self.admin_state_up:
+ self.needs_resync = True
+ else:
+ for pool_id in self.cache.get_pool_ids():
+ self.destroy_device(pool_id)
+ LOG.info(_("agent_updated by server side %s!"), payload)
from oslo.config import cfg
+from neutron.common import constants as q_const
from neutron.common import exceptions as q_exc
from neutron.common import rpc as q_rpc
+from neutron.db import agents_db
from neutron.db.loadbalancer import loadbalancer_db
+from neutron.extensions import lbaas_agentscheduler
+from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import proxy
constants.PENDING_UPDATE
)
+AGENT_SCHEDULER_OPTS = [
+ cfg.StrOpt('loadbalancer_pool_scheduler_driver',
+ default='neutron.services.loadbalancer.agent_scheduler'
+ '.ChanceScheduler',
+ help=_('Driver to use for scheduling '
+ 'pool to a default loadbalancer agent')),
+]
+
+cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
+
# topic name for this particular agent implementation
TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
class LoadBalancerCallbacks(object):
+
RPC_API_VERSION = '1.0'
def __init__(self, plugin):
self.plugin = plugin
def create_rpc_dispatcher(self):
- return q_rpc.PluginRpcDispatcher([self])
+ return q_rpc.PluginRpcDispatcher(
+ [self, agents_db.AgentExtRpcCallback(self.plugin)])
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
up = True # makes pep8 and sqlalchemy happy
qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
+ agents = self.plugin.get_lbaas_agents(context,
+ filters={'host': [host]})
+ if not agents:
+ return []
+ elif len(agents) > 1:
+ LOG.warning(_('Multiple lbaas agents found on host %s') % host)
+
+ pools = self.plugin.list_pools_on_lbaas_agent(context,
+ agents[0].id)
+ pool_ids = [pool['id'] for pool in pools['pools']]
+ qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
return [id for id, in qry]
def get_logical_device(self, context, pool_id=None, activate=True,
class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API."""
- API_VERSION = '1.0'
+ BASE_RPC_API_VERSION = '1.0'
+ # history
+ # 1.0 Initial version
+ # 1.1 Support agent_updated call
- def __init__(self, topic, host):
- super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
- self.host = host
+ def __init__(self, topic):
+ super(LoadBalancerAgentApi, self).__init__(
+ topic, default_version=self.BASE_RPC_API_VERSION)
+
+ def reload_pool(self, context, pool_id, host):
+ return self.cast(
+ context,
+ self.make_msg('reload_pool', pool_id=pool_id, host=host),
+ topic='%s.%s' % (self.topic, host)
+ )
- def reload_pool(self, context, pool_id):
+ def destroy_pool(self, context, pool_id, host):
return self.cast(
context,
- self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
- topic=self.topic
+ self.make_msg('destroy_pool', pool_id=pool_id, host=host),
+ topic='%s.%s' % (self.topic, host)
)
- def destroy_pool(self, context, pool_id):
+ def modify_pool(self, context, pool_id, host):
return self.cast(
context,
- self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
- topic=self.topic
+ self.make_msg('modify_pool', pool_id=pool_id, host=host),
+ topic='%s.%s' % (self.topic, host)
)
- def modify_pool(self, context, pool_id):
+ def agent_updated(self, context, admin_state_up, host):
return self.cast(
context,
- self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
- topic=self.topic
+ self.make_msg('agent_updated',
+ payload={'admin_state_up': admin_state_up}),
+ topic='%s.%s' % (self.topic, host),
+ version='1.1'
)
class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
+
def __init__(self, plugin):
- self.agent_rpc = LoadBalancerAgentApi(
- TOPIC_LOADBALANCER_AGENT,
- cfg.CONF.host
- )
+ self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
self.callbacks = LoadBalancerCallbacks(plugin)
self.conn = rpc.create_connection(new=True)
fanout=False)
self.conn.consume_in_thread()
self.plugin = plugin
+ self.plugin.agent_notifiers.update(
+ {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
+
+ self.pool_scheduler = importutils.import_object(
+ cfg.CONF.loadbalancer_pool_scheduler_driver)
+
+ def get_pool_agent(self, context, pool_id):
+ agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
+ if not agent:
+ raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
+ return agent['agent']
def create_vip(self, context, vip):
- self.agent_rpc.reload_pool(context, vip['pool_id'])
+ agent = self.get_pool_agent(context, vip['pool_id'])
+ self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
def update_vip(self, context, old_vip, vip):
+ agent = self.get_pool_agent(context, vip['pool_id'])
if vip['status'] in ACTIVE_PENDING:
- self.agent_rpc.reload_pool(context, vip['pool_id'])
+ self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
else:
- self.agent_rpc.destroy_pool(context, vip['pool_id'])
+ self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
def delete_vip(self, context, vip):
self.plugin._delete_db_vip(context, vip['id'])
- self.agent_rpc.destroy_pool(context, vip['pool_id'])
+ agent = self.get_pool_agent(context, vip['pool_id'])
+ self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
def create_pool(self, context, pool):
+ if not self.pool_scheduler.schedule(self.plugin, context, pool):
+ raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
# don't notify here because a pool needs a vip to be useful
- pass
def update_pool(self, context, old_pool, pool):
+ agent = self.get_pool_agent(context, pool['id'])
if pool['status'] in ACTIVE_PENDING:
if pool['vip_id'] is not None:
- self.agent_rpc.reload_pool(context, pool['id'])
+ self.agent_rpc.reload_pool(context, pool['id'], agent['host'])
else:
- self.agent_rpc.destroy_pool(context, pool['id'])
+ self.agent_rpc.destroy_pool(context, pool['id'], agent['host'])
def delete_pool(self, context, pool):
+ agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
+ if agent:
+ self.agent_rpc.destroy_pool(context, pool['id'],
+ agent['agent']['host'])
self.plugin._delete_db_pool(context, pool['id'])
- self.agent_rpc.destroy_pool(context, pool['id'])
def create_member(self, context, member):
- self.agent_rpc.modify_pool(context, member['pool_id'])
+ agent = self.get_pool_agent(context, member['pool_id'])
+ self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
def update_member(self, context, old_member, member):
# member may change pool id
if member['pool_id'] != old_member['pool_id']:
- self.agent_rpc.modify_pool(context, old_member['pool_id'])
- self.agent_rpc.modify_pool(context, member['pool_id'])
+ agent = self.plugin.get_lbaas_agent_hosting_pool(
+ context, old_member['pool_id'])
+ if agent:
+ self.agent_rpc.modify_pool(context,
+ old_member['pool_id'],
+ agent['agent']['host'])
+ agent = self.get_pool_agent(context, member['pool_id'])
+ self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
def delete_member(self, context, member):
self.plugin._delete_db_member(context, member['id'])
- self.agent_rpc.modify_pool(context, member['pool_id'])
+ agent = self.get_pool_agent(context, member['pool_id'])
+ self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
def update_health_monitor(self, context, old_health_monitor,
health_monitor, pool_id):
# monitors are unused here because agent will fetch what is necessary
- self.agent_rpc.modify_pool(context, pool_id)
+ agent = self.get_pool_agent(context, pool_id)
+ self.agent_rpc.modify_pool(context, pool_id, agent['host'])
def create_pool_health_monitor(self, context, healthmon, pool_id):
# healthmon is not used here
- self.agent_rpc.modify_pool(context, pool_id)
+ agent = self.get_pool_agent(context, pool_id)
+ self.agent_rpc.modify_pool(context, pool_id, agent['host'])
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
self.plugin._delete_db_pool_health_monitor(
)
# healthmon_id is not used here
- self.agent_rpc.modify_pool(context, pool_id)
+ agent = self.get_pool_agent(context, pool_id)
+ self.agent_rpc.modify_pool(context, pool_id, agent['host'])
def create_health_monitor(self, context, health_monitor):
pass
@log.log
def delete_pool(self, context, pool):
- pass
+ self.plugin._delete_db_pool(context, pool["id"])
@log.log
def stats(self, context, pool_id):
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
+from neutron.services.loadbalancer import agent_scheduler
LOG = logging.getLogger(__name__)
legacy.override_config(cfg.CONF, [('LBAAS', 'driver_fqn')])
-class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
+class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb,
+ agent_scheduler.LbaasAgentSchedulerDbMixin):
"""Implementation of the Neutron Loadbalancer Service Plugin.
Most DB related works are implemented in class
loadbalancer_db.LoadBalancerPluginDb.
"""
- supported_extension_aliases = ["lbaas"]
+ supported_extension_aliases = ["lbaas", "lbaas_agent_scheduler"]
+
+ # lbaas agent notifiers to handle agent update operations;
+ # can be updated by plugin drivers while loading;
+ # will be extracted by neutron manager when loading service plugins;
+ agent_notifiers = {}
def __init__(self):
"""Initialization for the loadbalancer service plugin."""
# update the db and return the value from db
# else - return what we have in db
if stats_data:
- super(LoadBalancerPlugin, self)._update_pool_stats(
+ super(LoadBalancerPlugin, self).update_pool_stats(
context,
pool_id,
stats_data
import contextlib
import logging
import os
-import testtools
+import mock
+from oslo.config import cfg
+import testtools
import webob.exc
from neutron.api.extensions import ExtensionMiddleware
from neutron.api.extensions import PluginAwareExtensionManager
from neutron.common import config
from neutron import context
+import neutron.db.l3_db # noqa
from neutron.db.loadbalancer import loadbalancer_db as ldb
import neutron.extensions
from neutron.extensions import loadbalancer
extensions_path = ':'.join(neutron.extensions.__path__)
+_subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
+
def etcdir(*p):
return os.path.join(ETCDIR, *p)
-class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
+class LoadBalancerTestMixin(object):
resource_prefix_map = dict(
(k, constants.COMMON_PREFIXES[constants.LOADBALANCER])
for k in loadbalancer.RESOURCE_ATTRIBUTE_MAP.keys()
)
- def setUp(self, core_plugin=None, lb_plugin=None):
- service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
-
- super(LoadBalancerPluginDbTestCase, self).setUp(
- service_plugins=service_plugins
- )
-
- self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
-
- self.plugin = loadbalancer_plugin.LoadBalancerPlugin()
- ext_mgr = PluginAwareExtensionManager(
- extensions_path,
- {constants.LOADBALANCER: self.plugin}
- )
- app = config.load_paste_app('extensions_test_app')
- self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
-
def _create_vip(self, fmt, name, pool_id, protocol, protocol_port,
admin_state_up, expected_res_status=None, **kwargs):
data = {'vip': {'name': name,
def _create_pool(self, fmt, name, lb_method, protocol, admin_state_up,
expected_res_status=None, **kwargs):
data = {'pool': {'name': name,
- 'subnet_id': self._subnet_id,
+ 'subnet_id': _subnet_id,
'lb_method': lb_method,
'protocol': protocol,
'admin_state_up': admin_state_up,
return res
- def _api_for_resource(self, resource):
- if resource in ['networks', 'subnets', 'ports']:
- return self.api
- else:
- return self.ext_api
-
@contextlib.contextmanager
def vip(self, fmt=None, name='vip1', pool=None, subnet=None,
protocol='HTTP', protocol_port=80, admin_state_up=True,
self._delete('health_monitors', the_health_monitor['id'])
+class LoadBalancerPluginDbTestCase(LoadBalancerTestMixin,
+ test_db_plugin.NeutronDbPluginV2TestCase):
+ def setUp(self, core_plugin=None, lb_plugin=None):
+ service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
+ super(LoadBalancerPluginDbTestCase, self).setUp(
+ service_plugins=service_plugins
+ )
+
+ self._subnet_id = _subnet_id
+
+ self.plugin = loadbalancer_plugin.LoadBalancerPlugin()
+
+ get_lbaas_agent_patcher = mock.patch(
+ 'neutron.services.loadbalancer.agent_scheduler'
+ '.LbaasAgentSchedulerDbMixin.get_lbaas_agent_hosting_pool')
+ mock_lbaas_agent = mock.MagicMock()
+ get_lbaas_agent_patcher.start().return_value = mock_lbaas_agent
+ mock_lbaas_agent.__getitem__.return_value = {'host': 'host'}
+ self.addCleanup(mock.patch.stopall)
+
+ ext_mgr = PluginAwareExtensionManager(
+ extensions_path,
+ {constants.LOADBALANCER: self.plugin}
+ )
+ app = config.load_paste_app('extensions_test_app')
+ self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
+
+
class TestLoadBalancer(LoadBalancerPluginDbTestCase):
+ def setUp(self):
+ cfg.CONF.set_override('driver_fqn',
+ 'neutron.services.loadbalancer.drivers.noop'
+ '.noop_driver.NoopLbaaSDriver',
+ group='LBAAS')
+ self.addCleanup(cfg.CONF.reset)
+ super(TestLoadBalancer, self).setUp()
+
def test_create_vip(self, **extras):
expected = {
'name': 'vip1',
"""
supported_extension_aliases = ['dummy', servicetype.EXT_ALIAS]
+ agent_notifiers = {'dummy': 'dummy_agent_notifier'}
def __init__(self):
self.svctype_mgr = servicetype_db.ServiceTypeManager.get_instance()
def test_router_add_to_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()
- with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+ l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+ with mock.patch.object(l3_notifier, 'cast') as mock_l3:
with self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
routers = [router1['router']['id']]
mock_l3.assert_called_with(
mock.ANY,
- plugin.l3_agent_notifier.make_msg(
+ l3_notifier.make_msg(
'router_added_to_agent',
payload=routers),
topic='l3_agent.hosta')
def test_router_remove_from_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()
- with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+ l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+ with mock.patch.object(l3_notifier, 'cast') as mock_l3:
with self.router() as router1:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
self._remove_router_from_l3_agent(hosta_id,
router1['router']['id'])
mock_l3.assert_called_with(
- mock.ANY, plugin.l3_agent_notifier.make_msg(
+ mock.ANY, l3_notifier.make_msg(
'router_removed_from_agent',
payload={'router_id': router1['router']['id']}),
topic='l3_agent.hosta')
def test_agent_updated_l3_agent_notification(self):
plugin = manager.NeutronManager.get_plugin()
- with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+ l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+ with mock.patch.object(l3_notifier, 'cast') as mock_l3:
self._register_agent_states()
hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
L3_HOSTA)
self._disable_agent(hosta_id, admin_state_up=False)
mock_l3.assert_called_with(
- mock.ANY, plugin.l3_agent_notifier.make_msg(
- 'agent_updated',
- payload={'admin_state_up': False}),
+ mock.ANY, l3_notifier.make_msg(
+ 'agent_updated', payload={'admin_state_up': False}),
topic='l3_agent.hosta')
self.callbacks = plugin_driver.LoadBalancerCallbacks(
self.plugin_instance
)
+ get_lbaas_agents_patcher = mock.patch(
+ 'neutron.services.loadbalancer.agent_scheduler'
+ '.LbaasAgentSchedulerDbMixin.get_lbaas_agents')
+ get_lbaas_agents_patcher.start()
+
+ # mocking plugin_driver create_pool() as it does nothing more than
+ # pool scheduling which is beyond the scope of this test case
+ mock.patch('neutron.services.loadbalancer.drivers.haproxy'
+ '.plugin_driver.HaproxyOnHostPluginDriver'
+ '.create_pool').start()
+
+ self.addCleanup(mock.patch.stopall)
def test_get_ready_devices(self):
with self.vip() as vip:
- ready = self.callbacks.get_ready_devices(
- context.get_admin_context(),
- )
- self.assertEqual(ready, [vip['vip']['pool_id']])
+ with mock.patch('neutron.services.loadbalancer.agent_scheduler'
+ '.LbaasAgentSchedulerDbMixin.'
+ 'list_pools_on_lbaas_agent') as mock_agent_pools:
+ mock_agent_pools.return_value = {
+ 'pools': [{'id': vip['vip']['pool_id']}]}
+ ready = self.callbacks.get_ready_devices(
+ context.get_admin_context(),
+ )
+ self.assertEqual(ready, [vip['vip']['pool_id']])
def test_get_ready_devices_multiple_vips_and_pools(self):
ctx = context.get_admin_context()
self.assertEqual(ctx.session.query(ldb.Pool).count(), 3)
self.assertEqual(ctx.session.query(ldb.Vip).count(), 2)
- ready = self.callbacks.get_ready_devices(ctx)
- self.assertEqual(len(ready), 2)
- self.assertIn(pools[0].id, ready)
- self.assertIn(pools[1].id, ready)
- self.assertNotIn(pools[2].id, ready)
+ with mock.patch('neutron.services.loadbalancer.agent_scheduler'
+ '.LbaasAgentSchedulerDbMixin'
+ '.list_pools_on_lbaas_agent') as mock_agent_pools:
+ mock_agent_pools.return_value = {'pools': [{'id': pools[0].id},
+ {'id': pools[1].id},
+ {'id': pools[2].id}]}
+ ready = self.callbacks.get_ready_devices(ctx)
+ self.assertEqual(len(ready), 2)
+ self.assertIn(pools[0].id, ready)
+ self.assertIn(pools[1].id, ready)
+ self.assertNotIn(pools[2].id, ready)
# cleanup
ctx.session.query(ldb.Pool).delete()
ctx.session.query(ldb.Vip).delete()
vip['vip']['id'],
{'vip': {'status': constants.INACTIVE}}
)
-
- ready = self.callbacks.get_ready_devices(
- context.get_admin_context(),
- )
- self.assertFalse(ready)
+ with mock.patch('neutron.services.loadbalancer.agent_scheduler'
+ '.LbaasAgentSchedulerDbMixin.'
+ 'list_pools_on_lbaas_agent') as mock_agent_pools:
+ mock_agent_pools.return_value = {
+ 'pools': [{'id': vip['vip']['pool_id']}]}
+ ready = self.callbacks.get_ready_devices(
+ context.get_admin_context(),
+ )
+ self.assertFalse(ready)
def test_get_ready_devices_inactive_pool(self):
with self.vip() as vip:
vip['vip']['pool_id'],
{'pool': {'status': constants.INACTIVE}}
)
-
- ready = self.callbacks.get_ready_devices(
- context.get_admin_context(),
- )
- self.assertFalse(ready)
+ with mock.patch('neutron.services.loadbalancer.agent_scheduler'
+ '.LbaasAgentSchedulerDbMixin.'
+ 'list_pools_on_lbaas_agent') as mock_agent_pools:
+ mock_agent_pools.return_value = {
+ 'pools': [{'id': vip['vip']['pool_id']}]}
+ ready = self.callbacks.get_ready_devices(
+ context.get_admin_context(),
+ )
+ self.assertFalse(ready)
def test_get_logical_device_inactive(self):
with self.pool() as pool:
super(TestLoadBalancerAgentApi, self).setUp()
self.addCleanup(mock.patch.stopall)
- self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host')
+ self.api = plugin_driver.LoadBalancerAgentApi('topic')
self.mock_cast = mock.patch.object(self.api, 'cast').start()
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
def test_init(self):
self.assertEqual(self.api.topic, 'topic')
- self.assertEqual(self.api.host, 'host')
def _call_test_helper(self, method_name):
- rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id')
+ rv = getattr(self.api, method_name)(mock.sentinel.context, 'test',
+ 'host')
self.assertEqual(rv, self.mock_cast.return_value)
self.mock_cast.assert_called_once_with(
mock.sentinel.context,
self.mock_msg.return_value,
- topic='topic'
+ topic='topic.host'
)
self.mock_msg.assert_called_once_with(
method_name,
- pool_id='the_id',
+ pool_id='test',
host='host'
)
def test_modify_pool(self):
self._call_test_helper('modify_pool')
+ def test_agent_updated(self):
+ rv = self.api.agent_updated(mock.sentinel.context, True, 'host')
+ self.assertEqual(rv, self.mock_cast.return_value)
+ self.mock_cast.assert_called_once_with(
+ mock.sentinel.context,
+ self.mock_msg.return_value,
+ topic='topic.host',
+ version='1.1'
+ )
+
+ self.mock_msg.assert_called_once_with(
+ 'agent_updated',
+ payload={'admin_state_up': True}
+ )
+
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
def setUp(self):
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
self.mock_api = api_cls.return_value
+ # mocking plugin_driver create_pool() as it does nothing more than
+ # pool scheduling which is beyond the scope of this test case
+ mock.patch('neutron.services.loadbalancer.drivers.haproxy'
+ '.plugin_driver.HaproxyOnHostPluginDriver'
+ '.create_pool').start()
+
self.addCleanup(mock.patch.stopall)
def test_create_vip(self):
with self.vip(pool=pool, subnet=subnet) as vip:
self.mock_api.reload_pool.assert_called_once_with(
mock.ANY,
- vip['vip']['pool_id']
+ vip['vip']['pool_id'],
+ 'host'
)
def test_update_vip(self):
self.mock_api.reload_pool.assert_called_once_with(
mock.ANY,
- vip['vip']['pool_id']
+ vip['vip']['pool_id'],
+ 'host'
)
self.assertEqual(
self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
self.mock_api.destroy_pool.assert_called_once_with(
mock.ANY,
- vip['vip']['pool_id']
+ vip['vip']['pool_id'],
+ 'host'
)
def test_create_pool(self):
ctx = context.get_admin_context()
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
self.mock_api.destroy_pool.assert_called_once_with(
- mock.ANY, pool['pool']['id'])
+ mock.ANY, pool['pool']['id'], 'host')
self.assertFalse(self.mock_api.reload_pool.called)
self.assertFalse(self.mock_api.modify_pool.called)
ctx = context.get_admin_context()
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
self.mock_api.reload_pool.assert_called_once_with(
- mock.ANY, pool['pool']['id'])
+ mock.ANY, pool['pool']['id'], 'host')
self.assertFalse(self.mock_api.destroy_pool.called)
self.assertFalse(self.mock_api.modify_pool.called)
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
self.mock_api.destroy_pool.assert_called_once_with(
- mock.ANY, pool['pool']['id'])
+ mock.ANY, pool['pool']['id'], 'host')
def test_create_member(self):
with self.pool() as pool:
pool_id = pool['pool']['id']
with self.member(pool_id=pool_id):
self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id)
+ mock.ANY, pool_id, 'host')
def test_update_member(self):
with self.pool() as pool:
self.plugin_instance.update_member(
ctx, member['member']['id'], member)
self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id)
+ mock.ANY, pool_id, 'host')
def test_update_member_new_pool(self):
with self.pool() as pool1:
member)
self.assertEqual(2, self.mock_api.modify_pool.call_count)
self.mock_api.modify_pool.assert_has_calls(
- [mock.call(mock.ANY, pool1_id),
- mock.call(mock.ANY, pool2_id)])
+ [mock.call(mock.ANY, pool1_id, 'host'),
+ mock.call(mock.ANY, pool2_id, 'host')])
def test_delete_member(self):
with self.pool() as pool:
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, 204)
self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id)
+ mock.ANY, pool_id, 'host')
def test_create_pool_health_monitor(self):
with self.pool() as pool:
hm,
pool_id)
self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id)
+ mock.ANY, pool_id, 'host')
def test_delete_pool_health_monitor(self):
with self.pool() as pool:
self.plugin_instance.delete_pool_health_monitor(
ctx, hm['health_monitor']['id'], pool_id)
self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id)
+ mock.ANY, pool_id, 'host')
def test_update_health_monitor_associated_with_pool(self):
with self.health_monitor(type='HTTP') as monitor:
self.assertEqual(res.status_int, 201)
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY,
- pool['pool']['id']
+ pool['pool']['id'],
+ 'host'
)
self.mock_api.reset_mock()
req.get_response(self.ext_api)
self.mock_api.modify_pool.assert_called_once_with(
mock.ANY,
- pool['pool']['id']
+ pool['pool']['id'],
+ 'host'
)
--- /dev/null
+# Copyright (c) 2013 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from webob import exc
+
+from neutron.api import extensions
+from neutron.api.v2 import attributes
+from neutron.common import constants
+from neutron import context
+from neutron.extensions import agent
+from neutron.extensions import lbaas_agentscheduler
+from neutron import manager
+from neutron.plugins.common import constants as plugin_const
+from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
+from neutron.tests.unit.openvswitch import test_agent_scheduler
+from neutron.tests.unit import test_agent_ext_plugin
+from neutron.tests.unit import test_db_plugin as test_plugin
+from neutron.tests.unit import test_extensions
+
+LBAAS_HOSTA = 'hosta'
+
+
+class AgentSchedulerTestMixIn(test_agent_scheduler.AgentSchedulerTestMixIn):
+ def _list_pools_hosted_by_lbaas_agent(self, agent_id,
+ expected_code=exc.HTTPOk.code,
+ admin_context=True):
+ path = "/agents/%s/%s.%s" % (agent_id,
+ lbaas_agentscheduler.LOADBALANCER_POOLS,
+ self.fmt)
+ return self._request_list(path, expected_code=expected_code,
+ admin_context=admin_context)
+
+ def _get_lbaas_agent_hosting_pool(self, pool_id,
+ expected_code=exc.HTTPOk.code,
+ admin_context=True):
+ path = "/lb/pools/%s/%s.%s" % (pool_id,
+ lbaas_agentscheduler.LOADBALANCER_AGENT,
+ self.fmt)
+ return self._request_list(path, expected_code=expected_code,
+ admin_context=admin_context)
+
+
+class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
+ AgentSchedulerTestMixIn,
+ test_db_loadbalancer.LoadBalancerTestMixin,
+ test_plugin.NeutronDbPluginV2TestCase):
+ fmt = 'json'
+ plugin_str = ('neutron.plugins.openvswitch.'
+ 'ovs_neutron_plugin.OVSNeutronPluginV2')
+
+ def setUp(self):
+ # Save the global RESOURCE_ATTRIBUTE_MAP
+ self.saved_attr_map = {}
+ for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+ self.saved_attr_map[resource] = attrs.copy()
+ service_plugins = {
+ 'lb_plugin_name': test_db_loadbalancer.DB_LB_PLUGIN_KLASS}
+ super(LBaaSAgentSchedulerTestCase, self).setUp(
+ self.plugin_str, service_plugins=service_plugins)
+ ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
+ self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+ self.adminContext = context.get_admin_context()
+ # Add the resources to the global attribute map
+ # This is done here as the setup process won't
+ # initialize the main API router which extends
+ # the global attribute map
+ attributes.RESOURCE_ATTRIBUTE_MAP.update(
+ agent.RESOURCE_ATTRIBUTE_MAP)
+ self.addCleanup(self.restore_attribute_map)
+
+ def restore_attribute_map(self):
+ # Restore the original RESOURCE_ATTRIBUTE_MAP
+ attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+
+ def test_report_states(self):
+ self._register_agent_states(lbaas_agents=True)
+ agents = self._list_agents()
+ self.assertEqual(6, len(agents['agents']))
+
+ def test_pool_scheduling_on_pool_creation(self):
+ self._register_agent_states(lbaas_agents=True)
+ with self.pool() as pool:
+ lbaas_agent = self._get_lbaas_agent_hosting_pool(
+ pool['pool']['id'])
+ self.assertIsNotNone(lbaas_agent)
+ self.assertEqual(lbaas_agent['agent']['agent_type'],
+ constants.AGENT_TYPE_LOADBALANCER)
+ pools = self._list_pools_hosted_by_lbaas_agent(
+ lbaas_agent['agent']['id'])
+ self.assertEqual(1, len(pools['pools']))
+ self.assertEqual(pool['pool'], pools['pools'][0])
+
+ def test_schedule_poll_with_disabled_agent(self):
+ lbaas_hosta = {
+ 'binary': 'neutron-loadbalancer-agent',
+ 'host': LBAAS_HOSTA,
+ 'topic': 'LOADBALANCER_AGENT',
+ 'configurations': {'device_driver': 'device_driver',
+ 'interface_driver': 'interface_driver'},
+ 'agent_type': constants.AGENT_TYPE_LOADBALANCER}
+ self._register_one_agent_state(lbaas_hosta)
+ with self.pool() as pool:
+ lbaas_agent = self._get_lbaas_agent_hosting_pool(
+ pool['pool']['id'])
+ self.assertIsNotNone(lbaas_agent)
+
+ agents = self._list_agents()
+ self._disable_agent(agents['agents'][0]['id'])
+ pool = {'pool': {'name': 'test',
+ 'subnet_id': 'test',
+ 'lb_method': 'ROUND_ROBIN',
+ 'protocol': 'HTTP',
+ 'admin_state_up': True,
+ 'tenant_id': 'test',
+ 'description': 'test'}}
+ lbaas_plugin = manager.NeutronManager.get_service_plugins()[
+ plugin_const.LOADBALANCER]
+ self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent,
+ lbaas_plugin.create_pool, self.adminContext, pool)
+
+ def test_schedule_poll_with_down_agent(self):
+ lbaas_hosta = {
+ 'binary': 'neutron-loadbalancer-agent',
+ 'host': LBAAS_HOSTA,
+ 'topic': 'LOADBALANCER_AGENT',
+ 'configurations': {'device_driver': 'device_driver',
+ 'interface_driver': 'interface_driver'},
+ 'agent_type': constants.AGENT_TYPE_LOADBALANCER}
+ self._register_one_agent_state(lbaas_hosta)
+ is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down'
+ with mock.patch(is_agent_down_str) as mock_is_agent_down:
+ mock_is_agent_down.return_value = False
+ with self.pool() as pool:
+ lbaas_agent = self._get_lbaas_agent_hosting_pool(
+ pool['pool']['id'])
+ self.assertIsNotNone(lbaas_agent)
+ with mock.patch(is_agent_down_str) as mock_is_agent_down:
+ mock_is_agent_down.return_value = True
+ pool = {'pool': {'name': 'test',
+ 'subnet_id': 'test',
+ 'lb_method': 'ROUND_ROBIN',
+ 'protocol': 'HTTP',
+ 'admin_state_up': True,
+ 'tenant_id': 'test',
+ 'description': 'test'}}
+ lbaas_plugin = manager.NeutronManager.get_service_plugins()[
+ plugin_const.LOADBALANCER]
+ self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent,
+ lbaas_plugin.create_pool,
+ self.adminContext, pool)
+
+ def test_pool_unscheduling_on_pool_deletion(self):
+ self._register_agent_states(lbaas_agents=True)
+ with self.pool(no_delete=True) as pool:
+ lbaas_agent = self._get_lbaas_agent_hosting_pool(
+ pool['pool']['id'])
+ self.assertIsNotNone(lbaas_agent)
+ self.assertEqual(lbaas_agent['agent']['agent_type'],
+ constants.AGENT_TYPE_LOADBALANCER)
+ pools = self._list_pools_hosted_by_lbaas_agent(
+ lbaas_agent['agent']['id'])
+ self.assertEqual(1, len(pools['pools']))
+ self.assertEqual(pool['pool'], pools['pools'][0])
+
+ req = self.new_delete_request('pools',
+ pool['pool']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+ pools = self._list_pools_hosted_by_lbaas_agent(
+ lbaas_agent['agent']['id'])
+ self.assertEqual(0, len(pools['pools']))
+
+ def test_pool_scheduling_non_admin_access(self):
+ self._register_agent_states(lbaas_agents=True)
+ with self.pool() as pool:
+ self._get_lbaas_agent_hosting_pool(
+ pool['pool']['id'],
+ expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+ self._list_pools_hosted_by_lbaas_agent(
+ 'fake_id',
+ expected_code=exc.HTTPForbidden.code,
+ admin_context=False)
+
+
+class LBaaSAgentSchedulerTestCaseXML(LBaaSAgentSchedulerTestCase):
+ fmt = 'xml'
L3_HOSTB = 'hostb'
DHCP_HOSTC = 'hostc'
DHCP_HOST1 = 'host1'
+LBAAS_HOSTA = 'hosta'
+LBAAS_HOSTB = 'hostb'
class AgentTestExtensionManager(object):
self.assertEqual(agent_res.status_int, expected_res_status)
return agent_res
- def _register_agent_states(self):
+ def _register_agent_states(self, lbaas_agents=False):
"""Register two L3 agents and two DHCP agents."""
l3_hosta = {
'binary': 'neutron-l3-agent',
'agent_type': constants.AGENT_TYPE_DHCP}
dhcp_hostc = copy.deepcopy(dhcp_hosta)
dhcp_hostc['host'] = DHCP_HOSTC
+ lbaas_hosta = {
+ 'binary': 'neutron-loadbalancer-agent',
+ 'host': LBAAS_HOSTA,
+ 'topic': 'LOADBALANCER_AGENT',
+ 'configurations': {'device_driver': 'device_driver',
+ 'interface_driver': 'interface_driver',
+ },
+ 'agent_type': constants.AGENT_TYPE_LOADBALANCER}
+ lbaas_hostb = copy.deepcopy(lbaas_hosta)
+ lbaas_hostb['host'] = LBAAS_HOSTB
callback = agents_db.AgentExtRpcCallback()
callback.report_state(self.adminContext,
agent_state={'agent_state': l3_hosta},
callback.report_state(self.adminContext,
agent_state={'agent_state': dhcp_hostc},
time=timeutils.strtime())
- return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
+
+ res = [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
+ if lbaas_agents:
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': lbaas_hosta},
+ time=timeutils.strtime())
+ callback.report_state(self.adminContext,
+ agent_state={'agent_state': lbaas_hostb},
+ time=timeutils.strtime())
+ res += [lbaas_hosta, lbaas_hostb]
+
+ return res
def _register_one_dhcp_agent(self):
"""Register one DHCP agent."""
supported_extension_aliases = ['lbaas', 'dummy']
+class CorePluginWithAgentNotifiers(object):
+ agent_notifiers = {'l3': 'l3_agent_notifier',
+ 'dhcp': 'dhcp_agent_notifier'}
+
+
class NeutronManagerTestCase(base.BaseTestCase):
def setUp(self):
self.assertIsNotNone(validate_pre_plugin_load())
cfg.CONF.set_override('core_plugin', 'dummy.plugin')
self.assertIsNone(validate_pre_plugin_load())
+
+ def test_manager_gathers_agent_notifiers_from_service_plugins(self):
+ cfg.CONF.set_override("service_plugins",
+ ["neutron.tests.unit.dummy_plugin."
+ "DummyServicePlugin"])
+ cfg.CONF.set_override("core_plugin",
+ "neutron.tests.unit.test_neutron_manager."
+ "CorePluginWithAgentNotifiers")
+ expected = {'l3': 'l3_agent_notifier',
+ 'dhcp': 'dhcp_agent_notifier',
+ 'dummy': 'dummy_agent_notifier'}
+ core_plugin = NeutronManager.get_plugin()
+ self.assertEqual(expected, core_plugin.agent_notifiers)