DHCP = 'q-dhcp-notifer'
FIREWALL_PLUGIN = 'q-firewall-plugin'
METERING_PLUGIN = 'q-metering-plugin'
+LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
METERING_AGENT = 'metering_agent'
+LOADBALANCER_AGENT = 'n-lbaas_agent'
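Note: these topic values are carried over verbatim from the TOPIC_LOADBALANCER_PLUGIN and TOPIC_LOADBALANCER_AGENT constants removed from plugin_driver.py later in this patch, so existing RPC queues keep their names (including the hyphen/underscore mismatch between 'n-lbaas-plugin' and 'n-lbaas_agent'). A quick sketch of how the agent topic gets addressed per host, mirroring the '%s.%s' % (self.topic, host) pattern in LoadBalancerAgentApi._cast below (the host value is hypothetical):

    from neutron.common import topics

    host = 'lbaas-host-1'  # hypothetical agent host name
    per_host_topic = '%s.%s' % (topics.LOADBALANCER_AGENT, host)
    assert per_host_topic == 'n-lbaas_agent.lbaas-host-1'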
def get_topic_name(prefix, table, operation, host=None):
from neutron.agent.common import config
from neutron.agent.linux import interface
from neutron.common import legacy
+from neutron.common import topics
from neutron.openstack.common.rpc import service as rpc_service
from neutron.openstack.common import service
-from neutron.services.loadbalancer.drivers.haproxy import (
- agent_manager as manager,
- plugin_driver
-)
+from neutron.services.loadbalancer.agent import agent_manager as manager
OPTS = [
cfg.IntOpt(
mgr = manager.LbaasAgentManager(cfg.CONF)
svc = LbaasAgentService(
host=cfg.CONF.host,
- topic=plugin_driver.TOPIC_LOADBALANCER_AGENT,
+ topic=topics.LOADBALANCER_AGENT,
manager=mgr
)
service.launch(svc).wait()
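Assuming the usual oslo rpc Service behavior of creating a shared, a host-scoped, and a fanout consumer on start, launching the agent this way yields roughly the following subscription set, which is why the plugin side can direct a cast at a single agent. Illustrative sketch only (host name made up):

    from neutron.common import topics

    base = topics.LOADBALANCER_AGENT      # 'n-lbaas_agent'
    host = 'lbaas-host-1'                 # hypothetical cfg.CONF.host value
    consumers = [
        base,                    # shared queue, round-robin casts
        '%s.%s' % (base, host),  # host-directed casts from the plugin
        base,                    # also consumed as a fanout exchange
    ]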
from neutron.agent import rpc as agent_rpc
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
+from neutron.common import topics
from neutron import context
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
from neutron.plugins.common import constants
-from neutron.services.loadbalancer.drivers.haproxy import (
- agent_api,
- plugin_driver
-)
+from neutron.services.loadbalancer.agent import agent_api
LOG = logging.getLogger(__name__)
self.conf = conf
self.context = context.get_admin_context_without_session()
self.plugin_rpc = agent_api.LbaasAgentApi(
- plugin_driver.TOPIC_LOADBALANCER_PLUGIN,
+ topics.LOADBALANCER_PLUGIN,
self.context,
self.conf.host
)
self.agent_state = {
'binary': 'neutron-lbaas-agent',
'host': conf.host,
- 'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
+ 'topic': topics.LOADBALANCER_AGENT,
'configurations': {'device_drivers': self.device_drivers.keys()},
'agent_type': n_const.AGENT_TYPE_LOADBALANCER,
'start_flag': True}
def _setup_state_rpc(self):
self.state_rpc = agent_rpc.PluginReportStateAPI(
- plugin_driver.TOPIC_LOADBALANCER_PLUGIN)
+ topics.LOADBALANCER_PLUGIN)
report_interval = self.conf.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import uuid
+
+from oslo.config import cfg
+
+from neutron.common import constants as q_const
+from neutron.common import exceptions as q_exc
+from neutron.common import rpc as q_rpc
+from neutron.common import topics
+from neutron.db import agents_db
+from neutron.db.loadbalancer import loadbalancer_db
+from neutron.extensions import lbaas_agentscheduler
+from neutron.extensions import portbindings
+from neutron.openstack.common import importutils
+from neutron.openstack.common import log as logging
+from neutron.openstack.common import rpc
+from neutron.openstack.common.rpc import proxy
+from neutron.plugins.common import constants
+from neutron.services.loadbalancer.drivers import abstract_driver
+
+LOG = logging.getLogger(__name__)
+
+AGENT_SCHEDULER_OPTS = [
+ cfg.StrOpt('loadbalancer_pool_scheduler_driver',
+ default='neutron.services.loadbalancer.agent_scheduler'
+ '.ChanceScheduler',
+ help=_('Driver to use for scheduling '
+                      'a pool to a default loadbalancer agent')),
+]
+
+cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
+
+
+class DriverNotSpecified(q_exc.NeutronException):
+    message = _("Device driver for the agent should be specified "
+                "in the plugin driver.")
+
+
+class LoadBalancerCallbacks(object):
+
+ RPC_API_VERSION = '2.0'
+ # history
+ # 1.0 Initial version
+ # 2.0 Generic API for agent based drivers
+ # - get_logical_device() handling changed;
+ # - pool_deployed() and update_status() methods added;
+
+ def __init__(self, plugin):
+ self.plugin = plugin
+
+ def create_rpc_dispatcher(self):
+ return q_rpc.PluginRpcDispatcher(
+ [self, agents_db.AgentExtRpcCallback(self.plugin)])
+
+ def get_ready_devices(self, context, host=None):
+ with context.session.begin(subtransactions=True):
+ agents = self.plugin.get_lbaas_agents(context,
+ filters={'host': [host]})
+ if not agents:
+ return []
+ elif len(agents) > 1:
+ LOG.warning(_('Multiple lbaas agents found on host %s'), host)
+ pools = self.plugin.list_pools_on_lbaas_agent(context,
+ agents[0].id)
+ pool_ids = [pool['id'] for pool in pools['pools']]
+
+ qry = context.session.query(loadbalancer_db.Pool.id)
+ qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
+ qry = qry.filter(
+ loadbalancer_db.Pool.status.in_(constants.ACTIVE_PENDING))
+ up = True # makes pep8 and sqlalchemy happy
+ qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
+ return [id for id, in qry]
+
+ def get_logical_device(self, context, pool_id=None):
+ with context.session.begin(subtransactions=True):
+ qry = context.session.query(loadbalancer_db.Pool)
+ qry = qry.filter_by(id=pool_id)
+ pool = qry.one()
+
+ if pool.status != constants.ACTIVE:
+ raise q_exc.Invalid(_('Expected active pool'))
+
+ retval = {}
+ retval['pool'] = self.plugin._make_pool_dict(pool)
+
+ if pool.vip:
+ retval['vip'] = self.plugin._make_vip_dict(pool.vip)
+ retval['vip']['port'] = (
+ self.plugin._core_plugin._make_port_dict(pool.vip.port)
+ )
+ for fixed_ip in retval['vip']['port']['fixed_ips']:
+ fixed_ip['subnet'] = (
+ self.plugin._core_plugin.get_subnet(
+ context,
+ fixed_ip['subnet_id']
+ )
+ )
+ retval['members'] = [
+ self.plugin._make_member_dict(m)
+ for m in pool.members if (
+ m.status in constants.ACTIVE_PENDING or
+ m.status == constants.INACTIVE)
+ ]
+ retval['healthmonitors'] = [
+ self.plugin._make_health_monitor_dict(hm.healthmonitor)
+ for hm in pool.monitors
+ if hm.status in constants.ACTIVE_PENDING
+ ]
+ retval['driver'] = (
+ self.plugin.drivers[pool.provider.provider_name].device_driver)
+
+ return retval
+
+ def pool_deployed(self, context, pool_id):
+ with context.session.begin(subtransactions=True):
+ qry = context.session.query(loadbalancer_db.Pool)
+ qry = qry.filter_by(id=pool_id)
+ pool = qry.one()
+
+ # set all resources to active
+ if pool.status in constants.ACTIVE_PENDING:
+ pool.status = constants.ACTIVE
+
+ if pool.vip and pool.vip.status in constants.ACTIVE_PENDING:
+ pool.vip.status = constants.ACTIVE
+
+ for m in pool.members:
+ if m.status in constants.ACTIVE_PENDING:
+ m.status = constants.ACTIVE
+
+ for hm in pool.monitors:
+ if hm.status in constants.ACTIVE_PENDING:
+ hm.status = constants.ACTIVE
+
+ def update_status(self, context, obj_type, obj_id, status):
+ model_mapping = {
+ 'pool': loadbalancer_db.Pool,
+ 'vip': loadbalancer_db.Vip,
+ 'member': loadbalancer_db.Member,
+ 'health_monitor': loadbalancer_db.PoolMonitorAssociation
+ }
+ if obj_type not in model_mapping:
+ raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
+ try:
+ if obj_type == 'health_monitor':
+ self.plugin.update_pool_health_monitor(
+ context, obj_id['monitor_id'], obj_id['pool_id'], status)
+ else:
+ self.plugin.update_status(
+ context, model_mapping[obj_type], obj_id, status)
+ except q_exc.NotFound:
+            # update_status may arrive from an agent for an object that
+            # was already deleted from the DB by another request
+            LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s '
+                          'not found in the DB; it was probably deleted '
+                          'concurrently'),
+ {'obj_type': obj_type, 'obj_id': obj_id})
+
+ def pool_destroyed(self, context, pool_id=None):
+ """Agent confirmation hook that a pool has been destroyed.
+
+ This method exists for subclasses to change the deletion
+ behavior.
+ """
+ pass
+
+ def plug_vip_port(self, context, port_id=None, host=None):
+ if not port_id:
+ return
+
+ try:
+ port = self.plugin._core_plugin.get_port(
+ context,
+ port_id
+ )
+ except q_exc.PortNotFound:
+ msg = _('Unable to find port %s to plug.')
+ LOG.debug(msg, port_id)
+ return
+
+ port['admin_state_up'] = True
+ port['device_owner'] = 'neutron:' + constants.LOADBALANCER
+ port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
+ port[portbindings.HOST_ID] = host
+ self.plugin._core_plugin.update_port(
+ context,
+ port_id,
+ {'port': port}
+ )
+
+ def unplug_vip_port(self, context, port_id=None, host=None):
+ if not port_id:
+ return
+
+ try:
+ port = self.plugin._core_plugin.get_port(
+ context,
+ port_id
+ )
+ except q_exc.PortNotFound:
+ msg = _('Unable to find port %s to unplug. This can occur when '
+ 'the Vip has been deleted first.')
+ LOG.debug(msg, port_id)
+ return
+
+ port['admin_state_up'] = False
+ port['device_owner'] = ''
+ port['device_id'] = ''
+
+ try:
+ self.plugin._core_plugin.update_port(
+ context,
+ port_id,
+ {'port': port}
+ )
+
+ except q_exc.PortNotFound:
+ msg = _('Unable to find port %s to unplug. This can occur when '
+ 'the Vip has been deleted first.')
+ LOG.debug(msg, port_id)
+
+ def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
+ self.plugin.update_pool_stats(context, pool_id, data=stats)
+
+
+class LoadBalancerAgentApi(proxy.RpcProxy):
+ """Plugin side of plugin to agent RPC API."""
+
+ BASE_RPC_API_VERSION = '2.0'
+ # history
+ # 1.0 Initial version
+ # 1.1 Support agent_updated call
+ # 2.0 Generic API for agent based drivers
+ # - modify/reload/destroy_pool methods were removed;
+ # - added methods to handle create/update/delete for every lbaas
+ # object individually;
+
+ def __init__(self, topic):
+ super(LoadBalancerAgentApi, self).__init__(
+ topic, default_version=self.BASE_RPC_API_VERSION)
+
+ def _cast(self, context, method_name, method_args, host, version=None):
+ return self.cast(
+ context,
+ self.make_msg(method_name, **method_args),
+ topic='%s.%s' % (self.topic, host),
+ version=version
+ )
+
+ def create_vip(self, context, vip, host):
+ return self._cast(context, 'create_vip', {'vip': vip}, host)
+
+ def update_vip(self, context, old_vip, vip, host):
+ return self._cast(context, 'update_vip',
+ {'old_vip': old_vip, 'vip': vip}, host)
+
+ def delete_vip(self, context, vip, host):
+ return self._cast(context, 'delete_vip', {'vip': vip}, host)
+
+ def create_pool(self, context, pool, host, driver_name):
+ return self._cast(context, 'create_pool',
+ {'pool': pool, 'driver_name': driver_name}, host)
+
+ def update_pool(self, context, old_pool, pool, host):
+ return self._cast(context, 'update_pool',
+ {'old_pool': old_pool, 'pool': pool}, host)
+
+ def delete_pool(self, context, pool, host):
+ return self._cast(context, 'delete_pool', {'pool': pool}, host)
+
+ def create_member(self, context, member, host):
+ return self._cast(context, 'create_member', {'member': member}, host)
+
+ def update_member(self, context, old_member, member, host):
+ return self._cast(context, 'update_member',
+ {'old_member': old_member, 'member': member}, host)
+
+ def delete_member(self, context, member, host):
+ return self._cast(context, 'delete_member', {'member': member}, host)
+
+ def create_pool_health_monitor(self, context, health_monitor, pool_id,
+ host):
+ return self._cast(context, 'create_pool_health_monitor',
+ {'health_monitor': health_monitor,
+ 'pool_id': pool_id}, host)
+
+ def update_pool_health_monitor(self, context, old_health_monitor,
+ health_monitor, pool_id, host):
+ return self._cast(context, 'update_pool_health_monitor',
+ {'old_health_monitor': old_health_monitor,
+ 'health_monitor': health_monitor,
+ 'pool_id': pool_id}, host)
+
+ def delete_pool_health_monitor(self, context, health_monitor, pool_id,
+ host):
+ return self._cast(context, 'delete_pool_health_monitor',
+ {'health_monitor': health_monitor,
+ 'pool_id': pool_id}, host)
+
+ def agent_updated(self, context, admin_state_up, host):
+ return self._cast(context, 'agent_updated',
+ {'payload': {'admin_state_up': admin_state_up}},
+ host)
+
+
+class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
+
+    # name of the device driver that the agent should use;
+    # vendor-specific plugin drivers must override it
+ device_driver = None
+
+ def __init__(self, plugin):
+ if not self.device_driver:
+ raise DriverNotSpecified()
+
+ self.agent_rpc = LoadBalancerAgentApi(topics.LOADBALANCER_AGENT)
+
+ self.plugin = plugin
+ self._set_callbacks_on_plugin()
+ self.plugin.agent_notifiers.update(
+ {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
+
+ self.pool_scheduler = importutils.import_object(
+ cfg.CONF.loadbalancer_pool_scheduler_driver)
+
+ def _set_callbacks_on_plugin(self):
+        # another agent-based plugin driver might have already set
+        # callbacks on the plugin
+ if hasattr(self.plugin, 'agent_callbacks'):
+ return
+
+ self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
+ self.plugin.conn = rpc.create_connection(new=True)
+ self.plugin.conn.create_consumer(
+ topics.LOADBALANCER_PLUGIN,
+ self.plugin.agent_callbacks.create_rpc_dispatcher(),
+ fanout=False)
+ self.plugin.conn.consume_in_thread()
+
+ def get_pool_agent(self, context, pool_id):
+ agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
+ if not agent:
+ raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
+ return agent['agent']
+
+ def create_vip(self, context, vip):
+ agent = self.get_pool_agent(context, vip['pool_id'])
+ self.agent_rpc.create_vip(context, vip, agent['host'])
+
+ def update_vip(self, context, old_vip, vip):
+ agent = self.get_pool_agent(context, vip['pool_id'])
+ if vip['status'] in constants.ACTIVE_PENDING:
+ self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
+ else:
+ self.agent_rpc.delete_vip(context, vip, agent['host'])
+
+ def delete_vip(self, context, vip):
+ self.plugin._delete_db_vip(context, vip['id'])
+ agent = self.get_pool_agent(context, vip['pool_id'])
+ self.agent_rpc.delete_vip(context, vip, agent['host'])
+
+ def create_pool(self, context, pool):
+ agent = self.pool_scheduler.schedule(self.plugin, context, pool,
+ self.device_driver)
+ if not agent:
+ raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
+ self.agent_rpc.create_pool(context, pool, agent['host'],
+ self.device_driver)
+
+ def update_pool(self, context, old_pool, pool):
+ agent = self.get_pool_agent(context, pool['id'])
+ if pool['status'] in constants.ACTIVE_PENDING:
+ self.agent_rpc.update_pool(context, old_pool, pool,
+ agent['host'])
+ else:
+ self.agent_rpc.delete_pool(context, pool, agent['host'])
+
+ def delete_pool(self, context, pool):
+        # fetch the agent first to learn its host: the agent binding is
+        # removed once the pool is deleted from the DB
+ agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
+ self.plugin._delete_db_pool(context, pool['id'])
+ if agent:
+ self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])
+
+ def create_member(self, context, member):
+ agent = self.get_pool_agent(context, member['pool_id'])
+ self.agent_rpc.create_member(context, member, agent['host'])
+
+ def update_member(self, context, old_member, member):
+ agent = self.get_pool_agent(context, member['pool_id'])
+        # the member may have been moved to a different pool
+ if member['pool_id'] != old_member['pool_id']:
+ old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
+ context, old_member['pool_id'])
+ if old_pool_agent:
+ self.agent_rpc.delete_member(context, old_member,
+ old_pool_agent['agent']['host'])
+ self.agent_rpc.create_member(context, member, agent['host'])
+ else:
+ self.agent_rpc.update_member(context, old_member, member,
+ agent['host'])
+
+ def delete_member(self, context, member):
+ self.plugin._delete_db_member(context, member['id'])
+ agent = self.get_pool_agent(context, member['pool_id'])
+ self.agent_rpc.delete_member(context, member, agent['host'])
+
+ def create_pool_health_monitor(self, context, healthmon, pool_id):
+ # healthmon is not used here
+ agent = self.get_pool_agent(context, pool_id)
+ self.agent_rpc.create_pool_health_monitor(context, healthmon,
+ pool_id, agent['host'])
+
+ def update_pool_health_monitor(self, context, old_health_monitor,
+ health_monitor, pool_id):
+ agent = self.get_pool_agent(context, pool_id)
+ self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
+ health_monitor, pool_id,
+ agent['host'])
+
+ def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+ self.plugin._delete_db_pool_health_monitor(
+ context, health_monitor['id'], pool_id
+ )
+
+ agent = self.get_pool_agent(context, pool_id)
+ self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
+ pool_id, agent['host'])
+
+ def stats(self, context, pool_id):
+ pass
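With the base class extracted, a vendor plugin driver reduces to naming its device driver; scheduling, agent RPC, and DB callbacks are all inherited from AgentDriverBase. A minimal sketch, with a made-up class and driver name:

    from neutron.services.loadbalancer.drivers.common import agent_driver_base

    class AcmeOnHostPluginDriver(agent_driver_base.AgentDriverBase):
        # must match the DRIVER_NAME registered by the vendor's device
        # driver on the agent side; 'acme' is hypothetical
        device_driver = 'acme'

Leaving device_driver unset raises DriverNotSpecified as soon as the plugin instantiates the driver (enforced in __init__ above); the reworked haproxy driver below follows exactly this pattern.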
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
+from neutron.services.loadbalancer.agent import agent_device_driver
from neutron.services.loadbalancer import constants as lb_const
-from neutron.services.loadbalancer.drivers import agent_device_driver
from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg
LOG = logging.getLogger(__name__)
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2013 New Dream Network, LLC (DreamHost)
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-#
-# @author: Mark McClain, DreamHost
-
-import uuid
-
-from oslo.config import cfg
-
-from neutron.common import constants as q_const
-from neutron.common import exceptions as q_exc
-from neutron.common import rpc as q_rpc
-from neutron.db import agents_db
-from neutron.db.loadbalancer import loadbalancer_db
-from neutron.extensions import lbaas_agentscheduler
-from neutron.extensions import portbindings
-from neutron.openstack.common import importutils
-from neutron.openstack.common import log as logging
-from neutron.openstack.common import rpc
-from neutron.openstack.common.rpc import proxy
-from neutron.plugins.common import constants
-from neutron.services.loadbalancer.drivers import abstract_driver
-
-LOG = logging.getLogger(__name__)
-
-AGENT_SCHEDULER_OPTS = [
- cfg.StrOpt('loadbalancer_pool_scheduler_driver',
- default='neutron.services.loadbalancer.agent_scheduler'
- '.ChanceScheduler',
- help=_('Driver to use for scheduling '
- 'pool to a default loadbalancer agent')),
-]
-
-cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
-
-# topic name for this particular agent implementation
-TOPIC_LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
-TOPIC_LOADBALANCER_AGENT = 'n-lbaas_agent'
-
-
-class DriverNotSpecified(q_exc.NeutronException):
- message = _("Device driver for agent should be specified "
- "in plugin driver.")
-
-
-class LoadBalancerCallbacks(object):
-
- RPC_API_VERSION = '2.0'
- # history
- # 1.0 Initial version
- # 2.0 Generic API for agent based drivers
- # - get_logical_device() handling changed;
- # - pool_deployed() and update_status() methods added;
-
- def __init__(self, plugin):
- self.plugin = plugin
-
- def create_rpc_dispatcher(self):
- return q_rpc.PluginRpcDispatcher(
- [self, agents_db.AgentExtRpcCallback(self.plugin)])
-
- def get_ready_devices(self, context, host=None):
- with context.session.begin(subtransactions=True):
- agents = self.plugin.get_lbaas_agents(context,
- filters={'host': [host]})
- if not agents:
- return []
- elif len(agents) > 1:
- LOG.warning(_('Multiple lbaas agents found on host %s'), host)
- pools = self.plugin.list_pools_on_lbaas_agent(context,
- agents[0].id)
- pool_ids = [pool['id'] for pool in pools['pools']]
-
- qry = context.session.query(loadbalancer_db.Pool.id)
- qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
- qry = qry.filter(
- loadbalancer_db.Pool.status.in_(constants.ACTIVE_PENDING))
- up = True # makes pep8 and sqlalchemy happy
- qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
- return [id for id, in qry]
-
- def get_logical_device(self, context, pool_id=None):
- with context.session.begin(subtransactions=True):
- qry = context.session.query(loadbalancer_db.Pool)
- qry = qry.filter_by(id=pool_id)
- pool = qry.one()
-
- if pool.status != constants.ACTIVE:
- raise q_exc.Invalid(_('Expected active pool'))
-
- retval = {}
- retval['pool'] = self.plugin._make_pool_dict(pool)
-
- if pool.vip:
- retval['vip'] = self.plugin._make_vip_dict(pool.vip)
- retval['vip']['port'] = (
- self.plugin._core_plugin._make_port_dict(pool.vip.port)
- )
- for fixed_ip in retval['vip']['port']['fixed_ips']:
- fixed_ip['subnet'] = (
- self.plugin._core_plugin.get_subnet(
- context,
- fixed_ip['subnet_id']
- )
- )
- retval['members'] = [
- self.plugin._make_member_dict(m)
- for m in pool.members if (
- m.status in constants.ACTIVE_PENDING or
- m.status == constants.INACTIVE)
- ]
- retval['healthmonitors'] = [
- self.plugin._make_health_monitor_dict(hm.healthmonitor)
- for hm in pool.monitors
- if hm.status in constants.ACTIVE_PENDING
- ]
- retval['driver'] = (
- self.plugin.drivers[pool.provider.provider_name].device_driver)
-
- return retval
-
- def pool_deployed(self, context, pool_id):
- with context.session.begin(subtransactions=True):
- qry = context.session.query(loadbalancer_db.Pool)
- qry = qry.filter_by(id=pool_id)
- pool = qry.one()
-
- # set all resources to active
- if pool.status in constants.ACTIVE_PENDING:
- pool.status = constants.ACTIVE
-
- if pool.vip and pool.vip.status in constants.ACTIVE_PENDING:
- pool.vip.status = constants.ACTIVE
-
- for m in pool.members:
- if m.status in constants.ACTIVE_PENDING:
- m.status = constants.ACTIVE
-
- for hm in pool.monitors:
- if hm.status in constants.ACTIVE_PENDING:
- hm.status = constants.ACTIVE
-
- def update_status(self, context, obj_type, obj_id, status):
- model_mapping = {
- 'pool': loadbalancer_db.Pool,
- 'vip': loadbalancer_db.Vip,
- 'member': loadbalancer_db.Member,
- 'health_monitor': loadbalancer_db.PoolMonitorAssociation
- }
- if obj_type not in model_mapping:
- raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
- try:
- if obj_type == 'health_monitor':
- self.plugin.update_pool_health_monitor(
- context, obj_id['monitor_id'], obj_id['pool_id'], status)
- else:
- self.plugin.update_status(
- context, model_mapping[obj_type], obj_id, status)
- except q_exc.NotFound:
- # update_status may come from agent on an object which was
- # already deleted from db with other request
- LOG.warning(_('Cannot update status: %(obj_type)s %(obj_id)s '
- 'not found in the DB, it was probably deleted '
- 'concurrently'),
- {'obj_type': obj_type, 'obj_id': obj_id})
-
- def pool_destroyed(self, context, pool_id=None):
- """Agent confirmation hook that a pool has been destroyed.
-
- This method exists for subclasses to change the deletion
- behavior.
- """
- pass
-
- def plug_vip_port(self, context, port_id=None, host=None):
- if not port_id:
- return
-
- try:
- port = self.plugin._core_plugin.get_port(
- context,
- port_id
- )
- except q_exc.PortNotFound:
- msg = _('Unable to find port %s to plug.')
- LOG.debug(msg, port_id)
- return
-
- port['admin_state_up'] = True
- port['device_owner'] = 'neutron:' + constants.LOADBALANCER
- port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
- port[portbindings.HOST_ID] = host
- self.plugin._core_plugin.update_port(
- context,
- port_id,
- {'port': port}
- )
-
- def unplug_vip_port(self, context, port_id=None, host=None):
- if not port_id:
- return
-
- try:
- port = self.plugin._core_plugin.get_port(
- context,
- port_id
- )
- except q_exc.PortNotFound:
- msg = _('Unable to find port %s to unplug. This can occur when '
- 'the Vip has been deleted first.')
- LOG.debug(msg, port_id)
- return
-
- port['admin_state_up'] = False
- port['device_owner'] = ''
- port['device_id'] = ''
-
- try:
- self.plugin._core_plugin.update_port(
- context,
- port_id,
- {'port': port}
- )
-
- except q_exc.PortNotFound:
- msg = _('Unable to find port %s to unplug. This can occur when '
- 'the Vip has been deleted first.')
- LOG.debug(msg, port_id)
-
- def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
- self.plugin.update_pool_stats(context, pool_id, data=stats)
-
-
-class LoadBalancerAgentApi(proxy.RpcProxy):
- """Plugin side of plugin to agent RPC API."""
-
- BASE_RPC_API_VERSION = '2.0'
- # history
- # 1.0 Initial version
- # 1.1 Support agent_updated call
- # 2.0 Generic API for agent based drivers
- # - modify/reload/destroy_pool methods were removed;
- # - added methods to handle create/update/delete for every lbaas
- # object individually;
-
- def __init__(self, topic):
- super(LoadBalancerAgentApi, self).__init__(
- topic, default_version=self.BASE_RPC_API_VERSION)
-
- def _cast(self, context, method_name, method_args, host, version=None):
- return self.cast(
- context,
- self.make_msg(method_name, **method_args),
- topic='%s.%s' % (self.topic, host),
- version=version
- )
-
- def create_vip(self, context, vip, host):
- return self._cast(context, 'create_vip', {'vip': vip}, host)
-
- def update_vip(self, context, old_vip, vip, host):
- return self._cast(context, 'update_vip',
- {'old_vip': old_vip, 'vip': vip}, host)
-
- def delete_vip(self, context, vip, host):
- return self._cast(context, 'delete_vip', {'vip': vip}, host)
-
- def create_pool(self, context, pool, host, driver_name):
- return self._cast(context, 'create_pool',
- {'pool': pool, 'driver_name': driver_name}, host)
-
- def update_pool(self, context, old_pool, pool, host):
- return self._cast(context, 'update_pool',
- {'old_pool': old_pool, 'pool': pool}, host)
-
- def delete_pool(self, context, pool, host):
- return self._cast(context, 'delete_pool', {'pool': pool}, host)
-
- def create_member(self, context, member, host):
- return self._cast(context, 'create_member', {'member': member}, host)
-
- def update_member(self, context, old_member, member, host):
- return self._cast(context, 'update_member',
- {'old_member': old_member, 'member': member}, host)
-
- def delete_member(self, context, member, host):
- return self._cast(context, 'delete_member', {'member': member}, host)
-
- def create_pool_health_monitor(self, context, health_monitor, pool_id,
- host):
- return self._cast(context, 'create_pool_health_monitor',
- {'health_monitor': health_monitor,
- 'pool_id': pool_id}, host)
-
- def update_pool_health_monitor(self, context, old_health_monitor,
- health_monitor, pool_id, host):
- return self._cast(context, 'update_pool_health_monitor',
- {'old_health_monitor': old_health_monitor,
- 'health_monitor': health_monitor,
- 'pool_id': pool_id}, host)
-
- def delete_pool_health_monitor(self, context, health_monitor, pool_id,
- host):
- return self._cast(context, 'delete_pool_health_monitor',
- {'health_monitor': health_monitor,
- 'pool_id': pool_id}, host)
-
- def agent_updated(self, context, admin_state_up, host):
- return self._cast(context, 'agent_updated',
- {'payload': {'admin_state_up': admin_state_up}},
- host)
-
-
-class AgentBasedPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
-
- # name of device driver that should be used by the agent;
- # vendor specific plugin drivers must override it;
- device_driver = None
-
- def __init__(self, plugin):
- if not self.device_driver:
- raise DriverNotSpecified()
-
- self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
-
- self.plugin = plugin
- self._set_callbacks_on_plugin()
- self.plugin.agent_notifiers.update(
- {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
-
- self.pool_scheduler = importutils.import_object(
- cfg.CONF.loadbalancer_pool_scheduler_driver)
-
- def _set_callbacks_on_plugin(self):
- # other agent based plugin driver might already set callbacks on plugin
- if hasattr(self.plugin, 'agent_callbacks'):
- return
-
- self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
- self.plugin.conn = rpc.create_connection(new=True)
- self.plugin.conn.create_consumer(
- TOPIC_LOADBALANCER_PLUGIN,
- self.plugin.agent_callbacks.create_rpc_dispatcher(),
- fanout=False)
- self.plugin.conn.consume_in_thread()
-
- def get_pool_agent(self, context, pool_id):
- agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
- if not agent:
- raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
- return agent['agent']
-
- def create_vip(self, context, vip):
- agent = self.get_pool_agent(context, vip['pool_id'])
- self.agent_rpc.create_vip(context, vip, agent['host'])
-
- def update_vip(self, context, old_vip, vip):
- agent = self.get_pool_agent(context, vip['pool_id'])
- if vip['status'] in constants.ACTIVE_PENDING:
- self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
- else:
- self.agent_rpc.delete_vip(context, vip, agent['host'])
-
- def delete_vip(self, context, vip):
- self.plugin._delete_db_vip(context, vip['id'])
- agent = self.get_pool_agent(context, vip['pool_id'])
- self.agent_rpc.delete_vip(context, vip, agent['host'])
-
- def create_pool(self, context, pool):
- agent = self.pool_scheduler.schedule(self.plugin, context, pool,
- self.device_driver)
- if not agent:
- raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
- self.agent_rpc.create_pool(context, pool, agent['host'],
- self.device_driver)
-
- def update_pool(self, context, old_pool, pool):
- agent = self.get_pool_agent(context, pool['id'])
- if pool['status'] in constants.ACTIVE_PENDING:
- self.agent_rpc.update_pool(context, old_pool, pool,
- agent['host'])
- else:
- self.agent_rpc.delete_pool(context, pool, agent['host'])
-
- def delete_pool(self, context, pool):
- # get agent first to know host as binding will be deleted
- # after pool is deleted from db
- agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
- self.plugin._delete_db_pool(context, pool['id'])
- if agent:
- self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])
-
- def create_member(self, context, member):
- agent = self.get_pool_agent(context, member['pool_id'])
- self.agent_rpc.create_member(context, member, agent['host'])
-
- def update_member(self, context, old_member, member):
- agent = self.get_pool_agent(context, member['pool_id'])
- # member may change pool id
- if member['pool_id'] != old_member['pool_id']:
- old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
- context, old_member['pool_id'])
- if old_pool_agent:
- self.agent_rpc.delete_member(context, old_member,
- old_pool_agent['agent']['host'])
- self.agent_rpc.create_member(context, member, agent['host'])
- else:
- self.agent_rpc.update_member(context, old_member, member,
- agent['host'])
-
- def delete_member(self, context, member):
- self.plugin._delete_db_member(context, member['id'])
- agent = self.get_pool_agent(context, member['pool_id'])
- self.agent_rpc.delete_member(context, member, agent['host'])
-
- def create_pool_health_monitor(self, context, healthmon, pool_id):
- # healthmon is not used here
- agent = self.get_pool_agent(context, pool_id)
- self.agent_rpc.create_pool_health_monitor(context, healthmon,
- pool_id, agent['host'])
-
- def update_pool_health_monitor(self, context, old_health_monitor,
- health_monitor, pool_id):
- agent = self.get_pool_agent(context, pool_id)
- self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
- health_monitor, pool_id,
- agent['host'])
-
- def delete_pool_health_monitor(self, context, health_monitor, pool_id):
- self.plugin._delete_db_pool_health_monitor(
- context, health_monitor['id'], pool_id
- )
-
- agent = self.get_pool_agent(context, pool_id)
- self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
- pool_id, agent['host'])
- def stats(self, context, pool_id):
- pass
+from neutron.services.loadbalancer.drivers.common import agent_driver_base
+from neutron.services.loadbalancer.drivers.haproxy import namespace_driver
-class HaproxyOnHostPluginDriver(AgentBasedPluginDriver):
- #TODO(obondarev): change hardcoded driver name
- # to namespace_driver.DRIVER_NAME after moving HaproxyOnHostPluginDriver
- # to a separate file (follow-up patch)
- device_driver = 'haproxy_ns'
+class HaproxyOnHostPluginDriver(agent_driver_base.AgentDriverBase):
+ device_driver = namespace_driver.DRIVER_NAME
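Judging from the provider string used in the unit tests below, a driver like this would be enabled through an LBaaS service provider entry of the form LOADBALANCER:<label>:<class path>:default. An illustrative neutron.conf fragment (the 'haproxy' label is an assumption):

    [service_providers]
    service_provider = LOADBALANCER:haproxy:neutron.services.loadbalancer.drivers.haproxy.plugin_driver.HaproxyOnHostPluginDriver:default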
import mock
from oslo.config import cfg
-from neutron.services.loadbalancer.drivers.haproxy import agent
+from neutron.services.loadbalancer.agent import agent
from neutron.tests import base
import mock
from neutron.plugins.common import constants
-from neutron.services.loadbalancer.drivers.haproxy import (
- agent_manager as manager
-)
+from neutron.services.loadbalancer.agent import agent_manager as manager
from neutron.tests import base
self.mock_importer = mock.patch.object(manager, 'importutils').start()
rpc_mock_cls = mock.patch(
- 'neutron.services.loadbalancer.drivers'
- '.haproxy.agent_api.LbaasAgentApi'
+ 'neutron.services.loadbalancer.agent.agent_api.LbaasAgentApi'
).start()
self.mgr = manager.LbaasAgentManager(mock_conf)
import mock
-from neutron.services.loadbalancer.drivers.haproxy import (
- agent_api as api
-)
+from neutron.services.loadbalancer.agent import agent_api as api
from neutron.tests import base
from neutron import manager
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants
-from neutron.services.loadbalancer.drivers.haproxy import (
- plugin_driver
-)
+from neutron.services.loadbalancer.drivers.common import agent_driver_base
from neutron.tests import base
from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
from neutron.tests.unit import testlib_api
def setUp(self):
def reset_device_driver():
- plugin_driver.AgentBasedPluginDriver.device_driver = None
+ agent_driver_base.AgentDriverBase.device_driver = None
self.addCleanup(reset_device_driver)
self.mock_importer = mock.patch.object(
- plugin_driver, 'importutils').start()
+ agent_driver_base, 'importutils').start()
self.addCleanup(mock.patch.stopall)
# needed to reload provider configuration
st_db.ServiceTypeManager._instance = None
- plugin_driver.AgentBasedPluginDriver.device_driver = 'dummy'
+ agent_driver_base.AgentDriverBase.device_driver = 'dummy'
super(TestLoadBalancerPluginBase, self).setUp(
lbaas_provider=('LOADBALANCER:lbaas:neutron.services.'
- 'loadbalancer.drivers.haproxy.plugin_driver.'
- 'AgentBasedPluginDriver:default'))
+ 'loadbalancer.drivers.common.agent_driver_base.'
+ 'AgentDriverBase:default'))
# we need access to loaded plugins to modify models
loaded_plugins = manager.NeutronManager().get_service_plugins()
def setUp(self):
super(TestLoadBalancerCallbacks, self).setUp()
- self.callbacks = plugin_driver.LoadBalancerCallbacks(
+ self.callbacks = agent_driver_base.LoadBalancerCallbacks(
self.plugin_instance
)
get_lbaas_agents_patcher = mock.patch(
self.assertEqual('ACTIVE', p['status'])
def test_update_status_pool_deleted_already(self):
- with mock.patch.object(plugin_driver, 'LOG') as mock_log:
+ with mock.patch.object(agent_driver_base, 'LOG') as mock_log:
pool_id = 'deleted_pool'
ctx = context.get_admin_context()
self.assertRaises(loadbalancer.PoolNotFound,
super(TestLoadBalancerAgentApi, self).setUp()
self.addCleanup(mock.patch.stopall)
- self.api = plugin_driver.LoadBalancerAgentApi('topic')
+ self.api = agent_driver_base.LoadBalancerAgentApi('topic')
self.mock_cast = mock.patch.object(self.api, 'cast').start()
self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
def setUp(self):
- self.log = mock.patch.object(plugin_driver, 'LOG')
- api_cls = mock.patch.object(plugin_driver,
+ self.log = mock.patch.object(agent_driver_base, 'LOG')
+ api_cls = mock.patch.object(agent_driver_base,
'LoadBalancerAgentApi').start()
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
self.mock_api = api_cls.return_value
self.mock_get_driver = mock.patch.object(self.plugin_instance,
'_get_driver')
- self.mock_get_driver.return_value = (plugin_driver.
- AgentBasedPluginDriver(
+ self.mock_get_driver.return_value = (agent_driver_base.
+ AgentDriverBase(
self.plugin_instance
))
neutron-dhcp-agent = neutron.agent.dhcp_agent:main
neutron-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
neutron-l3-agent = neutron.agent.l3_agent:main
- neutron-lbaas-agent = neutron.services.loadbalancer.drivers.haproxy.agent:main
+ neutron-lbaas-agent = neutron.services.loadbalancer.agent.agent:main
neutron-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
neutron-metadata-agent = neutron.agent.metadata.agent:main
neutron-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main
quantum-dhcp-agent = neutron.agent.dhcp_agent:main
quantum-hyperv-agent = neutron.plugins.hyperv.agent.hyperv_neutron_agent:main
quantum-l3-agent = neutron.agent.l3_agent:main
- quantum-lbaas-agent = neutron.services.loadbalancer.drivers.haproxy.agent:main
+ quantum-lbaas-agent = neutron.services.loadbalancer.agent.agent:main
quantum-linuxbridge-agent = neutron.plugins.linuxbridge.agent.linuxbridge_neutron_agent:main
quantum-metadata-agent = neutron.agent.metadata.agent:main
quantum-mlnx-agent = neutron.plugins.mlnx.agent.eswitch_neutron_agent:main