Unifies haproxy reference implementation to make common agent based plugin driver
which is suitable for all vendors who wants to use async mechanism.
- Agent API as well as device driver API changed to handle
loadbalancer objects individually;
- Agent loads device drivers according to config;
- LogicalDeviceCache class was removed from agent as it was used only
as a list - to put and remove entries ant check whether entry is in or not.
It was replaced with instance_mapping dict in agent to store known instances and
corresponding device_drivers;
- Agent reports which device drivers are supported (needs for scheduling on plugin side);
- Agent-to-plugin API was extended to provide an ability for agent to update
statuses of pools/vips/members/health_monitors;
- Vendor should only implement device driver; plugin driver just needs
to inherit AgentBasedPluginDriver and override device_driver member;
- This patch doesn't move files to make review easier;
all rename/replace will be done in a subsequent patch;
DocImpact
NOTE: Since the change in the agent RPC API is backward-incompatible
(major RPC version change), LBaaS server-agent communications will be
completely broken until both sides are upgraded so users will be unable to
create new or update existing HAProxy loadbalancer instances during upgrade
Implements blueprint lbaas-common-agent-driver
Change-Id: I9fd90a1321611d202ef838681273081fa6c1686a
# Example of interface_driver option for LinuxBridge
# interface_driver = neutron.agent.linux.interface.BridgeInterfaceDriver
-# The agent requires a driver to manage the loadbalancer. HAProxy is the
-# opensource version.
+# The agent requires drivers to manage the loadbalancer. HAProxy is the opensource version.
+# Multiple device drivers reflecting different service providers could be specified:
+# device_driver = path.to.provider1.driver.Driver
+# device_driver = path.to.provider2.driver.Driver
+# Default is:
# device_driver = neutron.services.loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver
+[haproxy]
+# Location to store config and state files
+# loadbalancer_state_path = $state_path/lbaas
+
# The user group
# user_group = nogroup
else:
return {'pools': []}
+ def get_lbaas_agent_candidates(self, device_driver, active_agents):
+ candidates = []
+ for agent in active_agents:
+ agent_conf = self.get_configuration_dict(agent)
+ if device_driver in agent_conf['device_drivers']:
+ candidates.append(agent)
+ return candidates
+
class ChanceScheduler(object):
"""Allocate a loadbalancer agent for a vip in a random way."""
- def schedule(self, plugin, context, pool):
+ def schedule(self, plugin, context, pool, device_driver):
"""Schedule the pool to an active loadbalancer agent if there
is no enabled agent hosting it.
"""
'agent_id': lbaas_agent['id']})
return
- candidates = plugin.get_lbaas_agents(context, active=True)
- if not candidates:
+ active_agents = plugin.get_lbaas_agents(context, active=True)
+ if not active_agents:
LOG.warn(_('No active lbaas agents for pool %s'), pool['id'])
return
+ candidates = plugin.get_lbaas_agent_candidates(device_driver,
+ active_agents)
+ if not candidates:
+ LOG.warn(_('No lbaas agent supporting device driver %s'),
+ device_driver)
+ return
+
chosen_agent = random.choice(candidates)
binding = PoolLoadbalancerAgentBinding()
binding.agent = chosen_agent
pass
@abc.abstractmethod
- def update_health_monitor(self, context,
- old_health_monitor,
- health_monitor,
- pool_id):
+ def update_pool_health_monitor(self, context,
+ old_health_monitor,
+ health_monitor,
+ pool_id):
pass
@abc.abstractmethod
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation. All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class AgentDeviceDriver(object):
+ """Abstract device driver that defines the API required by LBaaS agent."""
+
+ @abc.abstractmethod
+ def get_name(cls):
+ """Returns unique name across all LBaaS device drivers."""
+ pass
+
+ @abc.abstractmethod
+ def deploy_instance(self, logical_config):
+ """Fully deploys a loadbalancer instance from a given config."""
+ pass
+
+ @abc.abstractmethod
+ def undeploy_instance(self, pool_id):
+ """Fully undeploys the loadbalancer instance."""
+ pass
+
+ @abc.abstractmethod
+ def get_stats(self, pool_id):
+ pass
+
+ def remove_orphans(self, known_pool_ids):
+ # Not all drivers will support this
+ raise NotImplementedError()
+
+ @abc.abstractmethod
+ def create_vip(self, vip):
+ pass
+
+ @abc.abstractmethod
+ def update_vip(self, old_vip, vip):
+ pass
+
+ @abc.abstractmethod
+ def delete_vip(self, vip):
+ pass
+
+ @abc.abstractmethod
+ def create_pool(self, pool):
+ pass
+
+ @abc.abstractmethod
+ def update_pool(self, old_pool, pool):
+ pass
+
+ @abc.abstractmethod
+ def delete_pool(self, pool):
+ pass
+
+ @abc.abstractmethod
+ def create_member(self, member):
+ pass
+
+ @abc.abstractmethod
+ def update_member(self, old_member, member):
+ pass
+
+ @abc.abstractmethod
+ def delete_member(self, context, member):
+ pass
+
+ @abc.abstractmethod
+ def create_pool_health_monitor(self, health_monitor, pool_id):
+ pass
+
+ @abc.abstractmethod
+ def update_pool_health_monitor(self,
+ old_health_monitor,
+ health_monitor,
+ pool_id):
+ pass
+
+ @abc.abstractmethod
+ def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+ pass
class LbaasAgentApi(proxy.RpcProxy):
"""Agent side of the Agent to Plugin RPC API."""
- API_VERSION = '1.0'
+ API_VERSION = '2.0'
+ # history
+ # 1.0 Initial version
+ # 2.0 Generic API for agent based drivers
+ # - get_logical_device() handling changed on plugin side;
+ # - pool_deployed() and update_status() methods added;
def __init__(self, topic, context, host):
super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
topic=self.topic
)
+ def pool_destroyed(self, pool_id):
+ return self.call(
+ self.context,
+ self.make_msg('pool_destroyed', pool_id=pool_id),
+ topic=self.topic
+ )
+
+ def pool_deployed(self, pool_id):
+ return self.call(
+ self.context,
+ self.make_msg('pool_deployed', pool_id=pool_id),
+ topic=self.topic
+ )
+
def get_logical_device(self, pool_id):
return self.call(
self.context,
self.make_msg(
'get_logical_device',
- pool_id=pool_id,
- host=self.host
+ pool_id=pool_id
),
topic=self.topic
)
- def pool_destroyed(self, pool_id):
+ def update_status(self, obj_type, obj_id, status):
return self.call(
self.context,
- self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host),
+ self.make_msg('update_status', obj_type=obj_type, obj_id=obj_id,
+ status=status),
topic=self.topic
)
#
# @author: Mark McClain, DreamHost
-import weakref
-
from oslo.config import cfg
-from neutron.agent.common import config
from neutron.agent import rpc as agent_rpc
-from neutron.common import constants
+from neutron.common import constants as n_const
+from neutron.common import exceptions as n_exc
from neutron import context
from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common import periodic_task
+from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.haproxy import (
agent_api,
plugin_driver
)
LOG = logging.getLogger(__name__)
-NS_PREFIX = 'qlbaas-'
OPTS = [
- cfg.StrOpt(
+ cfg.MultiStrOpt(
'device_driver',
- default=('neutron.services.loadbalancer.drivers'
- '.haproxy.namespace_driver.HaproxyNSDriver'),
- help=_('The driver used to manage the loadbalancing device'),
- ),
- cfg.StrOpt(
- 'loadbalancer_state_path',
- default='$state_path/lbaas',
- help=_('Location to store config and state files'),
+ default=['neutron.services.loadbalancer.drivers'
+ '.haproxy.namespace_driver.HaproxyNSDriver'],
+ help=_('Drivers used to manage loadbalancing devices'),
),
cfg.StrOpt(
'interface_driver',
help=_('The driver used to manage the virtual interface')
),
- cfg.StrOpt(
- 'user_group',
- default='nogroup',
- help=_('The user group'),
- ),
]
-class LogicalDeviceCache(object):
- """Manage a cache of known devices."""
-
- class Device(object):
- """Inner classes used to hold values for weakref lookups."""
- def __init__(self, port_id, pool_id):
- self.port_id = port_id
- self.pool_id = pool_id
-
- def __eq__(self, other):
- return self.__dict__ == other.__dict__
-
- def __hash__(self):
- return hash((self.port_id, self.pool_id))
-
- def __init__(self):
- self.devices = set()
- self.port_lookup = weakref.WeakValueDictionary()
- self.pool_lookup = weakref.WeakValueDictionary()
-
- def put(self, device):
- port_id = device['vip']['port_id']
- pool_id = device['pool']['id']
- d = self.Device(device['vip']['port_id'], device['pool']['id'])
- if d not in self.devices:
- self.devices.add(d)
- self.port_lookup[port_id] = d
- self.pool_lookup[pool_id] = d
-
- def remove(self, device):
- if not isinstance(device, self.Device):
- device = self.Device(
- device['vip']['port_id'], device['pool']['id']
- )
- if device in self.devices:
- self.devices.remove(device)
-
- def remove_by_pool_id(self, pool_id):
- d = self.pool_lookup.get(pool_id)
- if d:
- self.devices.remove(d)
-
- def get_by_pool_id(self, pool_id):
- return self.pool_lookup.get(pool_id)
-
- def get_by_port_id(self, port_id):
- return self.port_lookup.get(port_id)
-
- def get_pool_ids(self):
- return self.pool_lookup.keys()
+class DeviceNotFoundOnAgent(n_exc.NotFound):
+ msg = _('Unknown device with pool_id %(pool_id)s')
class LbaasAgentManager(periodic_task.PeriodicTasks):
+ RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
- RPC_API_VERSION = '1.1'
+ # 2.0 Generic API for agent based drivers
+ # - modify/reload/destroy_pool methods were removed;
+ # - added methods to handle create/update/delete for every lbaas
+ # object individually;
def __init__(self, conf):
self.conf = conf
- try:
- vif_driver = importutils.import_object(conf.interface_driver, conf)
- except ImportError:
- msg = _('Error importing interface driver: %s')
- raise SystemExit(msg % conf.interface_driver)
-
- try:
- self.driver = importutils.import_object(
- conf.device_driver,
- config.get_root_helper(self.conf),
- conf.loadbalancer_state_path,
- vif_driver,
- self._vip_plug_callback
- )
- except ImportError:
- msg = _('Error importing loadbalancer device driver: %s')
- raise SystemExit(msg % conf.device_driver)
+ self.context = context.get_admin_context_without_session()
+ self.plugin_rpc = agent_api.LbaasAgentApi(
+ plugin_driver.TOPIC_LOADBALANCER_PLUGIN,
+ self.context,
+ self.conf.host
+ )
+ self._load_drivers()
self.agent_state = {
'binary': 'neutron-lbaas-agent',
'host': conf.host,
'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
- 'configurations': {'device_driver': conf.device_driver,
- 'interface_driver': conf.interface_driver},
- 'agent_type': constants.AGENT_TYPE_LOADBALANCER,
+ 'configurations': {'device_drivers': self.device_drivers.keys()},
+ 'agent_type': n_const.AGENT_TYPE_LOADBALANCER,
'start_flag': True}
self.admin_state_up = True
- self.context = context.get_admin_context_without_session()
- self._setup_rpc()
+ self._setup_state_rpc()
self.needs_resync = False
- self.cache = LogicalDeviceCache()
+ # pool_id->device_driver_name mapping used to store known instances
+ self.instance_mapping = {}
- def _setup_rpc(self):
- self.plugin_rpc = agent_api.LbaasAgentApi(
- plugin_driver.TOPIC_PROCESS_ON_HOST,
- self.context,
- self.conf.host
- )
+ def _load_drivers(self):
+ self.device_drivers = {}
+ for driver in self.conf.device_driver:
+ try:
+ driver_inst = importutils.import_object(
+ driver,
+ self.conf,
+ self.plugin_rpc
+ )
+ except ImportError:
+ msg = _('Error importing loadbalancer device driver: %s')
+ raise SystemExit(msg % driver)
+
+ driver_name = driver_inst.get_name()
+ if driver_name not in self.device_drivers:
+ self.device_drivers[driver_name] = driver_inst
+ else:
+ msg = _('Multiple device drivers with the same name found: %s')
+ raise SystemExit(msg % driver_name)
+
+ def _setup_state_rpc(self):
self.state_rpc = agent_rpc.PluginReportStateAPI(
- plugin_driver.TOPIC_PROCESS_ON_HOST)
+ plugin_driver.TOPIC_LOADBALANCER_PLUGIN)
report_interval = self.conf.AGENT.report_interval
if report_interval:
heartbeat = loopingcall.FixedIntervalLoopingCall(
def _report_state(self):
try:
- device_count = len(self.cache.devices)
- self.agent_state['configurations']['devices'] = device_count
+ instance_count = len(self.instance_mapping)
+ self.agent_state['configurations']['instances'] = instance_count
self.state_rpc.report_state(self.context,
self.agent_state)
self.agent_state.pop('start_flag', None)
@periodic_task.periodic_task(spacing=6)
def collect_stats(self, context):
- for pool_id in self.cache.get_pool_ids():
+ for pool_id, driver_name in self.instance_mapping.items():
+ driver = self.device_drivers[driver_name]
try:
- stats = self.driver.get_stats(pool_id)
+ stats = driver.get_stats(pool_id)
if stats:
self.plugin_rpc.update_pool_stats(pool_id, stats)
except Exception:
LOG.exception(_('Error upating stats'))
self.needs_resync = True
- def _vip_plug_callback(self, action, port):
- if action == 'plug':
- self.plugin_rpc.plug_vip_port(port['id'])
- elif action == 'unplug':
- self.plugin_rpc.unplug_vip_port(port['id'])
-
def sync_state(self):
- known_devices = set(self.cache.get_pool_ids())
+ known_instances = set(self.instance_mapping.keys())
try:
- ready_logical_devices = set(self.plugin_rpc.get_ready_devices())
+ ready_instances = set(self.plugin_rpc.get_ready_devices())
- for deleted_id in known_devices - ready_logical_devices:
- self.destroy_device(deleted_id)
+ for deleted_id in known_instances - ready_instances:
+ self._destroy_pool(deleted_id)
- for pool_id in ready_logical_devices:
- self.refresh_device(pool_id)
+ for pool_id in ready_instances:
+ self._reload_pool(pool_id)
except Exception:
LOG.exception(_('Unable to retrieve ready devices'))
self.remove_orphans()
- def refresh_device(self, pool_id):
+ def _get_driver(self, pool_id):
+ if pool_id not in self.instance_mapping:
+ raise DeviceNotFoundOnAgent(pool_id=pool_id)
+
+ driver_name = self.instance_mapping[pool_id]
+ return self.device_drivers[driver_name]
+
+ def _reload_pool(self, pool_id):
try:
logical_config = self.plugin_rpc.get_logical_device(pool_id)
-
- if self.driver.exists(pool_id):
- self.driver.update(logical_config)
- else:
- self.driver.create(logical_config)
- self.cache.put(logical_config)
+ driver_name = logical_config['driver']
+ if driver_name not in self.device_drivers:
+ LOG.error(_('No device driver '
+ 'on agent: %s.'), driver_name)
+ self.plugin_rpc.update_status(
+ 'pool', pool_id, constants.ERROR)
+ return
+
+ self.device_drivers[driver_name].deploy_instance(logical_config)
+ self.instance_mapping[pool_id] = driver_name
+ self.plugin_rpc.pool_deployed(pool_id)
except Exception:
- LOG.exception(_('Unable to refresh device for pool: %s'), pool_id)
+ LOG.exception(_('Unable to deploy instance for pool: %s'), pool_id)
self.needs_resync = True
- def destroy_device(self, pool_id):
- device = self.cache.get_by_pool_id(pool_id)
- if not device:
- return
+ def _destroy_pool(self, pool_id):
+ driver = self._get_driver(pool_id)
try:
- self.driver.destroy(pool_id)
+ driver.undeploy_instance(pool_id)
+ del self.instance_mapping[pool_id]
self.plugin_rpc.pool_destroyed(pool_id)
except Exception:
LOG.exception(_('Unable to destroy device for pool: %s'), pool_id)
self.needs_resync = True
- self.cache.remove(device)
def remove_orphans(self):
+ for driver_name in self.device_drivers:
+ pool_ids = [pool_id for pool_id in self.instance_mapping
+ if self.instance_mapping[pool_id] == driver_name]
+ try:
+ self.device_drivers[driver_name].remove_orphans(pool_ids)
+ except NotImplementedError:
+ pass # Not all drivers will support this
+
+ def _handle_failed_driver_call(self, operation, obj_type, obj_id, driver):
+ LOG.exception(_('%(operation)s %(obj)s %(id)s failed on device driver '
+ '%(driver)s'),
+ {'operation': operation.capitalize(), 'obj': obj_type,
+ 'id': obj_id, 'driver': driver})
+ self.plugin_rpc.update_status(obj_type, obj_id, constants.ERROR)
+
+ def create_vip(self, context, vip):
+ driver = self._get_driver(vip['pool_id'])
try:
- self.driver.remove_orphans(self.cache.get_pool_ids())
- except NotImplementedError:
- pass # Not all drivers will support this
+ driver.create_vip(vip)
+ except Exception:
+ self._handle_failed_driver_call('create', 'vip', vip['id'],
+ driver.get_name())
+ else:
+ self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE)
- def reload_pool(self, context, pool_id=None, host=None):
- """Handle RPC cast from plugin to reload a pool."""
- if pool_id:
- self.refresh_device(pool_id)
+ def update_vip(self, context, old_vip, vip):
+ driver = self._get_driver(vip['pool_id'])
+ try:
+ driver.update_vip(old_vip, vip)
+ except Exception:
+ self._handle_failed_driver_call('update', 'vip', vip['id'],
+ driver.get_name())
+ else:
+ self.plugin_rpc.update_status('vip', vip['id'], constants.ACTIVE)
+
+ def delete_vip(self, context, vip):
+ driver = self._get_driver(vip['pool_id'])
+ driver.delete_vip(vip)
+
+ def create_pool(self, context, pool, driver_name):
+ if driver_name not in self.device_drivers:
+ LOG.error(_('No device driver on agent: %s.'), driver_name)
+ self.plugin_rpc.update_status('pool', pool['id'], constants.ERROR)
+ return
- def modify_pool(self, context, pool_id=None, host=None):
- """Handle RPC cast from plugin to modify a pool if known to agent."""
- if self.cache.get_by_pool_id(pool_id):
- self.refresh_device(pool_id)
+ driver = self.device_drivers[driver_name]
+ try:
+ driver.create_pool(pool)
+ except Exception:
+ self._handle_failed_driver_call('create', 'pool', pool['id'],
+ driver.get_name())
+ else:
+ self.instance_mapping[pool['id']] = driver_name
+ self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE)
+
+ def update_pool(self, context, old_pool, pool):
+ driver = self._get_driver(pool['id'])
+ try:
+ driver.update_pool(old_pool, pool)
+ except Exception:
+ self._handle_failed_driver_call('update', 'pool', pool['id'],
+ driver.get_name())
+ else:
+ self.plugin_rpc.update_status('pool', pool['id'], constants.ACTIVE)
+
+ def delete_pool(self, context, pool):
+ driver = self._get_driver(pool['id'])
+ driver.delete_pool(pool)
+ del self.instance_mapping[pool['id']]
+
+ def create_member(self, context, member):
+ driver = self._get_driver(member['pool_id'])
+ try:
+ driver.create_member(member)
+ except Exception:
+ self._handle_failed_driver_call('create', 'member', member['id'],
+ driver.get_name())
+ else:
+ self.plugin_rpc.update_status('member', member['id'],
+ constants.ACTIVE)
+
+ def update_member(self, context, old_member, member):
+ driver = self._get_driver(member['pool_id'])
+ try:
+ driver.update_member(old_member, member)
+ except Exception:
+ self._handle_failed_driver_call('update', 'member', member['id'],
+ driver.get_name())
+ else:
+ self.plugin_rpc.update_status('member', member['id'],
+ constants.ACTIVE)
+
+ def delete_member(self, context, member):
+ driver = self._get_driver(member['pool_id'])
+ driver.delete_member(member)
+
+ def create_pool_health_monitor(self, context, health_monitor, pool_id):
+ driver = self._get_driver(pool_id)
+ assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']}
+ try:
+ driver.create_pool_health_monitor(health_monitor, pool_id)
+ except Exception:
+ self._handle_failed_driver_call(
+ 'create', 'health_monitor', assoc_id, driver.get_name())
+ else:
+ self.plugin_rpc.update_status(
+ 'health_monitor', assoc_id, constants.ACTIVE)
+
+ def update_pool_health_monitor(self, context, old_health_monitor,
+ health_monitor, pool_id):
+ driver = self._get_driver(pool_id)
+ assoc_id = {'pool_id': pool_id, 'monitor_id': health_monitor['id']}
+ try:
+ driver.update_pool_health_monitor(old_health_monitor,
+ health_monitor,
+ pool_id)
+ except Exception:
+ self._handle_failed_driver_call(
+ 'update', 'health_monitor', assoc_id, driver.get_name())
+ else:
+ self.plugin_rpc.update_status(
+ 'health_monitor', assoc_id, constants.ACTIVE)
- def destroy_pool(self, context, pool_id=None, host=None):
- """Handle RPC cast from plugin to destroy a pool if known to agent."""
- if self.cache.get_by_pool_id(pool_id):
- self.destroy_device(pool_id)
+ def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+ driver = self._get_driver(pool_id)
+ driver.delete_pool_health_monitor(health_monitor, pool_id)
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
if self.admin_state_up:
self.needs_resync = True
else:
- for pool_id in self.cache.get_pool_ids():
- self.destroy_device(pool_id)
- LOG.info(_("agent_updated by server side %s!"), payload)
+ for pool_id in self.instance_mapping.keys():
+ LOG.info(_("Destroying pool %s due to agent disabling"),
+ pool_id)
+ self._destroy_pool(pool_id)
+ LOG.info(_("Agent_updated by server side %s!"), payload)
import itertools
-from oslo.config import cfg
-
from neutron.agent.linux import utils
from neutron.plugins.common import constants as qconstants
from neutron.services.loadbalancer import constants
INACTIVE = qconstants.INACTIVE
-def save_config(conf_path, logical_config, socket_path=None):
+def save_config(conf_path, logical_config, socket_path=None,
+ user_group='nogroup'):
"""Convert a logical configuration to the HAProxy version."""
data = []
- data.extend(_build_global(logical_config, socket_path=socket_path))
+ data.extend(_build_global(logical_config, socket_path=socket_path,
+ user_group=user_group))
data.extend(_build_defaults(logical_config))
data.extend(_build_frontend(logical_config))
data.extend(_build_backend(logical_config))
utils.replace_file(conf_path, '\n'.join(data))
-def _build_global(config, socket_path=None):
+def _build_global(config, socket_path=None, user_group='nogroup'):
opts = [
'daemon',
'user nobody',
- 'group %s' % cfg.CONF.user_group,
+ 'group %s' % user_group,
'log /dev/log local0',
'log /dev/log local1 notice'
]
import socket
import netaddr
+from oslo.config import cfg
+from neutron.agent.common import config
from neutron.agent.linux import ip_lib
from neutron.agent.linux import utils
from neutron.common import exceptions
+from neutron.openstack.common import importutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services.loadbalancer import constants as lb_const
+from neutron.services.loadbalancer.drivers import agent_device_driver
from neutron.services.loadbalancer.drivers.haproxy import cfg as hacfg
LOG = logging.getLogger(__name__)
NS_PREFIX = 'qlbaas-'
+DRIVER_NAME = 'haproxy_ns'
+
+ACTIVE_PENDING = (
+ constants.ACTIVE,
+ constants.PENDING_CREATE,
+ constants.PENDING_UPDATE
+)
+
+STATE_PATH_DEFAULT = '$state_path/lbaas'
+USER_GROUP_DEFAULT = 'nogroup'
+OPTS = [
+ cfg.StrOpt(
+ 'loadbalancer_state_path',
+ default=STATE_PATH_DEFAULT,
+ help=_('Location to store config and state files'),
+ deprecated_opts=[cfg.DeprecatedOpt('loadbalancer_state_path')],
+ ),
+ cfg.StrOpt(
+ 'user_group',
+ default=USER_GROUP_DEFAULT,
+ help=_('The user group'),
+ deprecated_opts=[cfg.DeprecatedOpt('user_group')],
+ )
+]
+cfg.CONF.register_opts(OPTS, 'haproxy')
+
+
+class HaproxyNSDriver(agent_device_driver.AgentDeviceDriver):
+ def __init__(self, conf, plugin_rpc):
+ self.conf = conf
+ self.root_helper = config.get_root_helper(conf)
+ self.state_path = conf.haproxy.loadbalancer_state_path
+ try:
+ vif_driver = importutils.import_object(conf.interface_driver, conf)
+ except ImportError:
+ msg = (_('Error importing interface driver: %s')
+ % conf.haproxy.interface_driver)
+ LOG.error(msg)
+ raise
-
-class HaproxyNSDriver(object):
- def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback):
- self.root_helper = root_helper
- self.state_path = state_path
self.vif_driver = vif_driver
- self.vip_plug_callback = vip_plug_callback
+ self.plugin_rpc = plugin_rpc
self.pool_to_port_id = {}
+ @classmethod
+ def get_name(cls):
+ return DRIVER_NAME
+
def create(self, logical_config):
pool_id = logical_config['pool']['id']
namespace = get_ns_name(pool_id)
conf_path = self._get_state_file_path(pool_id, 'conf')
pid_path = self._get_state_file_path(pool_id, 'pid')
sock_path = self._get_state_file_path(pool_id, 'sock')
+ user_group = self.conf.haproxy.user_group
- hacfg.save_config(conf_path, logical_config, sock_path)
+ hacfg.save_config(conf_path, logical_config, sock_path, user_group)
cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
cmd.extend(extra_cmd_args)
# remember the pool<>port mapping
self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id']
- def destroy(self, pool_id):
+ def undeploy_instance(self, pool_id):
namespace = get_ns_name(pool_id)
ns = ip_lib.IPWrapper(self.root_helper, namespace)
pid_path = self._get_state_file_path(pool_id, 'pid')
return res_stats
- def remove_orphans(self, known_pool_ids):
- raise NotImplementedError()
-
def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True):
"""Returns the file name for a given kind of config file."""
confs_dir = os.path.abspath(os.path.normpath(self.state_path))
return os.path.join(conf_dir, kind)
def _plug(self, namespace, port, reuse_existing=True):
- self.vip_plug_callback('plug', port)
+ self.plugin_rpc.plug_vip_port(port['id'])
interface_name = self.vif_driver.get_device_name(Wrap(port))
if ip_lib.device_exists(interface_name, self.root_helper, namespace):
def _unplug(self, namespace, port_id):
port_stub = {'id': port_id}
- self.vip_plug_callback('unplug', port_stub)
+ self.plugin_rpc.unplug_vip_port(port_id)
interface_name = self.vif_driver.get_device_name(Wrap(port_stub))
self.vif_driver.unplug(interface_name, namespace=namespace)
+ def deploy_instance(self, logical_config):
+ # do actual deploy only if vip is configured and active
+ if ('vip' not in logical_config or
+ logical_config['vip']['status'] not in ACTIVE_PENDING or
+ not logical_config['vip']['admin_state_up']):
+ return
+
+ if self.exists(logical_config['pool']['id']):
+ self.update(logical_config)
+ else:
+ self.create(logical_config)
+
+ def _refresh_device(self, pool_id):
+ logical_config = self.plugin_rpc.get_logical_device(pool_id)
+ self.deploy_instance(logical_config)
+
+ def create_vip(self, vip):
+ self._refresh_device(vip['pool_id'])
+
+ def update_vip(self, old_vip, vip):
+ self._refresh_device(vip['pool_id'])
+
+ def delete_vip(self, vip):
+ self.undeploy_instance(vip['pool_id'])
+
+ def create_pool(self, pool):
+ # nothing to do here because a pool needs a vip to be useful
+ pass
+
+ def update_pool(self, old_pool, pool):
+ self._refresh_device(pool['id'])
+
+ def delete_pool(self, pool):
+ # delete_pool may be called before vip deletion in case
+ # pool's admin state set to down
+ if self.exists(pool['id']):
+ self.undeploy_instance(pool['id'])
+
+ def create_member(self, member):
+ self._refresh_device(member['pool_id'])
+
+ def update_member(self, old_member, member):
+ self._refresh_device(member['pool_id'])
+
+ def delete_member(self, member):
+ self._refresh_device(member['pool_id'])
+
+ def create_pool_health_monitor(self, health_monitor, pool_id):
+ self._refresh_device(pool_id)
+
+ def update_pool_health_monitor(self, old_health_monitor, health_monitor,
+ pool_id):
+ self._refresh_device(pool_id)
+
+ def delete_pool_health_monitor(self, health_monitor, pool_id):
+ self._refresh_device(pool_id)
+
# NOTE (markmcclain) For compliance with interface.py which expects objects
class Wrap(object):
cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
# topic name for this particular agent implementation
-TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
-TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
+TOPIC_LOADBALANCER_PLUGIN = 'n-lbaas-plugin'
+TOPIC_LOADBALANCER_AGENT = 'n-lbaas_agent'
+
+
+class DriverNotSpecified(q_exc.NeutronException):
+ message = _("Device driver for agent should be specified "
+ "in plugin driver.")
class LoadBalancerCallbacks(object):
- RPC_API_VERSION = '1.0'
+ RPC_API_VERSION = '2.0'
+ # history
+ # 1.0 Initial version
+ # 2.0 Generic API for agent based drivers
+ # - get_logical_device() handling changed;
+ # - pool_deployed() and update_status() methods added;
def __init__(self, plugin):
self.plugin = plugin
def get_ready_devices(self, context, host=None):
with context.session.begin(subtransactions=True):
- qry = (context.session.query(loadbalancer_db.Pool.id).
- join(loadbalancer_db.Vip))
-
- qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
- qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
- up = True # makes pep8 and sqlalchemy happy
- qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
- qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
agents = self.plugin.get_lbaas_agents(context,
filters={'host': [host]})
if not agents:
return []
elif len(agents) > 1:
LOG.warning(_('Multiple lbaas agents found on host %s'), host)
-
pools = self.plugin.list_pools_on_lbaas_agent(context,
agents[0].id)
pool_ids = [pool['id'] for pool in pools['pools']]
+
+ qry = context.session.query(loadbalancer_db.Pool.id)
qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
+ qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
+ up = True # makes pep8 and sqlalchemy happy
+ qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
return [id for id, in qry]
- def get_logical_device(self, context, pool_id=None, activate=True,
- **kwargs):
+ def get_logical_device(self, context, pool_id=None):
with context.session.begin(subtransactions=True):
qry = context.session.query(loadbalancer_db.Pool)
qry = qry.filter_by(id=pool_id)
pool = qry.one()
- if activate:
- # set all resources to active
- if pool.status in ACTIVE_PENDING:
- pool.status = constants.ACTIVE
-
- if pool.vip.status in ACTIVE_PENDING:
- pool.vip.status = constants.ACTIVE
-
- for m in pool.members:
- if m.status in ACTIVE_PENDING:
- m.status = constants.ACTIVE
-
- for hm in pool.monitors:
- if hm.status in ACTIVE_PENDING:
- hm.status = constants.ACTIVE
-
- if (pool.status != constants.ACTIVE
- or pool.vip.status != constants.ACTIVE):
- raise q_exc.Invalid(_('Expected active pool and vip'))
+ if pool.status != constants.ACTIVE:
+ raise q_exc.Invalid(_('Expected active pool'))
retval = {}
retval['pool'] = self.plugin._make_pool_dict(pool)
- retval['vip'] = self.plugin._make_vip_dict(pool.vip)
- retval['vip']['port'] = (
- self.plugin._core_plugin._make_port_dict(pool.vip.port)
- )
- for fixed_ip in retval['vip']['port']['fixed_ips']:
- fixed_ip['subnet'] = (
- self.plugin._core_plugin.get_subnet(
- context,
- fixed_ip['subnet_id']
- )
+
+ if pool.vip:
+ retval['vip'] = self.plugin._make_vip_dict(pool.vip)
+ retval['vip']['port'] = (
+ self.plugin._core_plugin._make_port_dict(pool.vip.port)
)
+ for fixed_ip in retval['vip']['port']['fixed_ips']:
+ fixed_ip['subnet'] = (
+ self.plugin._core_plugin.get_subnet(
+ context,
+ fixed_ip['subnet_id']
+ )
+ )
retval['members'] = [
self.plugin._make_member_dict(m)
for m in pool.members if m.status in (constants.ACTIVE,
for hm in pool.monitors
if hm.status == constants.ACTIVE
]
+ retval['driver'] = (
+ self.plugin.drivers[pool.provider.provider_name].device_driver)
return retval
- def pool_destroyed(self, context, pool_id=None, host=None):
+ def pool_deployed(self, context, pool_id):
+ with context.session.begin(subtransactions=True):
+ qry = context.session.query(loadbalancer_db.Pool)
+ qry = qry.filter_by(id=pool_id)
+ pool = qry.one()
+
+ # set all resources to active
+ if pool.status in ACTIVE_PENDING:
+ pool.status = constants.ACTIVE
+
+ if pool.vip and pool.vip.status in ACTIVE_PENDING:
+ pool.vip.status = constants.ACTIVE
+
+ for m in pool.members:
+ if m.status in ACTIVE_PENDING:
+ m.status = constants.ACTIVE
+
+ for hm in pool.monitors:
+ if hm.status in ACTIVE_PENDING:
+ hm.status = constants.ACTIVE
+
+ def update_status(self, context, obj_type, obj_id, status):
+ model_mapping = {
+ 'pool': loadbalancer_db.Pool,
+ 'vip': loadbalancer_db.Vip,
+ 'member': loadbalancer_db.Member,
+ 'health_monitor': loadbalancer_db.PoolMonitorAssociation
+ }
+ if obj_type not in model_mapping:
+ raise q_exc.Invalid(_('Unknown object type: %s') % obj_type)
+ elif obj_type == 'health_monitor':
+ self.plugin.update_pool_health_monitor(
+ context, obj_id['monitor_id'], obj_id['pool_id'], status)
+ else:
+ self.plugin.update_status(
+ context, model_mapping[obj_type], obj_id, status)
+
+ def pool_destroyed(self, context, pool_id=None):
"""Agent confirmation hook that a pool has been destroyed.
This method exists for subclasses to change the deletion
class LoadBalancerAgentApi(proxy.RpcProxy):
"""Plugin side of plugin to agent RPC API."""
- BASE_RPC_API_VERSION = '1.0'
+ BASE_RPC_API_VERSION = '2.0'
# history
# 1.0 Initial version
# 1.1 Support agent_updated call
+ # 2.0 Generic API for agent based drivers
+ # - modify/reload/destroy_pool methods were removed;
+ # - added methods to handle create/update/delete for every lbaas
+ # object individually;
def __init__(self, topic):
super(LoadBalancerAgentApi, self).__init__(
topic, default_version=self.BASE_RPC_API_VERSION)
- def reload_pool(self, context, pool_id, host):
+ def _cast(self, context, method_name, method_args, host, version=None):
return self.cast(
context,
- self.make_msg('reload_pool', pool_id=pool_id, host=host),
- topic='%s.%s' % (self.topic, host)
+ self.make_msg(method_name, **method_args),
+ topic='%s.%s' % (self.topic, host),
+ version=version
)
- def destroy_pool(self, context, pool_id, host):
- return self.cast(
- context,
- self.make_msg('destroy_pool', pool_id=pool_id, host=host),
- topic='%s.%s' % (self.topic, host)
- )
+ def create_vip(self, context, vip, host):
+ return self._cast(context, 'create_vip', {'vip': vip}, host)
- def modify_pool(self, context, pool_id, host):
- return self.cast(
- context,
- self.make_msg('modify_pool', pool_id=pool_id, host=host),
- topic='%s.%s' % (self.topic, host)
- )
+ def update_vip(self, context, old_vip, vip, host):
+ return self._cast(context, 'update_vip',
+ {'old_vip': old_vip, 'vip': vip}, host)
+
+ def delete_vip(self, context, vip, host):
+ return self._cast(context, 'delete_vip', {'vip': vip}, host)
+
+ def create_pool(self, context, pool, host, driver_name):
+ return self._cast(context, 'create_pool',
+ {'pool': pool, 'driver_name': driver_name}, host)
+
+ def update_pool(self, context, old_pool, pool, host):
+ return self._cast(context, 'update_pool',
+ {'old_pool': old_pool, 'pool': pool}, host)
+
+ def delete_pool(self, context, pool, host):
+ return self._cast(context, 'delete_pool', {'pool': pool}, host)
+
+ def create_member(self, context, member, host):
+ return self._cast(context, 'create_member', {'member': member}, host)
+
+ def update_member(self, context, old_member, member, host):
+ return self._cast(context, 'update_member',
+ {'old_member': old_member, 'member': member}, host)
+
+ def delete_member(self, context, member, host):
+ return self._cast(context, 'delete_member', {'member': member}, host)
+
+ def create_pool_health_monitor(self, context, health_monitor, pool_id,
+ host):
+ return self._cast(context, 'create_pool_health_monitor',
+ {'health_monitor': health_monitor,
+ 'pool_id': pool_id}, host)
+
+ def update_pool_health_monitor(self, context, old_health_monitor,
+ health_monitor, pool_id, host):
+ return self._cast(context, 'update_pool_health_monitor',
+ {'old_health_monitor': old_health_monitor,
+ 'health_monitor': health_monitor,
+ 'pool_id': pool_id}, host)
+
+ def delete_pool_health_monitor(self, context, health_monitor, pool_id,
+ host):
+ return self._cast(context, 'delete_pool_health_monitor',
+ {'health_monitor': health_monitor,
+ 'pool_id': pool_id}, host)
def agent_updated(self, context, admin_state_up, host):
- return self.cast(
- context,
- self.make_msg('agent_updated',
- payload={'admin_state_up': admin_state_up}),
- topic='%s.%s' % (self.topic, host),
- version='1.1'
- )
+ return self._cast(context, 'agent_updated',
+ {'payload': {'admin_state_up': admin_state_up}},
+ host)
-class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
+class AgentBasedPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
+
+ # name of device driver that should be used by the agent;
+ # vendor specific plugin drivers must override it;
+ device_driver = None
def __init__(self, plugin):
+ if not self.device_driver:
+ raise DriverNotSpecified()
+
self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
- self.callbacks = LoadBalancerCallbacks(plugin)
- self.conn = rpc.create_connection(new=True)
- self.conn.create_consumer(
- TOPIC_PROCESS_ON_HOST,
- self.callbacks.create_rpc_dispatcher(),
- fanout=False)
- self.conn.consume_in_thread()
self.plugin = plugin
+ self._set_callbacks_on_plugin()
self.plugin.agent_notifiers.update(
{q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
self.pool_scheduler = importutils.import_object(
cfg.CONF.loadbalancer_pool_scheduler_driver)
+ def _set_callbacks_on_plugin(self):
+ # other agent based plugin driver might already set callbacks on plugin
+ if hasattr(self.plugin, 'agent_callbacks'):
+ return
+
+ self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin)
+ self.plugin.conn = rpc.create_connection(new=True)
+ self.plugin.conn.create_consumer(
+ TOPIC_LOADBALANCER_PLUGIN,
+ self.plugin.agent_callbacks.create_rpc_dispatcher(),
+ fanout=False)
+ self.plugin.conn.consume_in_thread()
+
def get_pool_agent(self, context, pool_id):
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
if not agent:
def create_vip(self, context, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
- self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
+ self.agent_rpc.create_vip(context, vip, agent['host'])
def update_vip(self, context, old_vip, vip):
agent = self.get_pool_agent(context, vip['pool_id'])
if vip['status'] in ACTIVE_PENDING:
- self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
+ self.agent_rpc.update_vip(context, old_vip, vip, agent['host'])
else:
- self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
+ self.agent_rpc.delete_vip(context, vip, agent['host'])
def delete_vip(self, context, vip):
self.plugin._delete_db_vip(context, vip['id'])
agent = self.get_pool_agent(context, vip['pool_id'])
- self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
+ self.agent_rpc.delete_vip(context, vip, agent['host'])
def create_pool(self, context, pool):
- if not self.pool_scheduler.schedule(self.plugin, context, pool):
+ agent = self.pool_scheduler.schedule(self.plugin, context, pool,
+ self.device_driver)
+ if not agent:
raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
- # don't notify here because a pool needs a vip to be useful
+ self.agent_rpc.create_pool(context, pool, agent['host'],
+ self.device_driver)
def update_pool(self, context, old_pool, pool):
agent = self.get_pool_agent(context, pool['id'])
if pool['status'] in ACTIVE_PENDING:
- if pool['vip_id'] is not None:
- self.agent_rpc.reload_pool(context, pool['id'], agent['host'])
+ self.agent_rpc.update_pool(context, old_pool, pool,
+ agent['host'])
else:
- self.agent_rpc.destroy_pool(context, pool['id'], agent['host'])
+ self.agent_rpc.delete_pool(context, pool, agent['host'])
def delete_pool(self, context, pool):
+ # get agent first to know host as binding will be deleted
+ # after pool is deleted from db
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
- if agent:
- self.agent_rpc.destroy_pool(context, pool['id'],
- agent['agent']['host'])
self.plugin._delete_db_pool(context, pool['id'])
+ if agent:
+ self.agent_rpc.delete_pool(context, pool, agent['agent']['host'])
def create_member(self, context, member):
agent = self.get_pool_agent(context, member['pool_id'])
- self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
+ self.agent_rpc.create_member(context, member, agent['host'])
def update_member(self, context, old_member, member):
+ agent = self.get_pool_agent(context, member['pool_id'])
# member may change pool id
if member['pool_id'] != old_member['pool_id']:
- agent = self.plugin.get_lbaas_agent_hosting_pool(
+ old_pool_agent = self.plugin.get_lbaas_agent_hosting_pool(
context, old_member['pool_id'])
- if agent:
- self.agent_rpc.modify_pool(context,
- old_member['pool_id'],
- agent['agent']['host'])
- agent = self.get_pool_agent(context, member['pool_id'])
- self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
+ if old_pool_agent:
+ self.agent_rpc.delete_member(context, old_member,
+ old_pool_agent['agent']['host'])
+ self.agent_rpc.create_member(context, member, agent['host'])
+ else:
+ self.agent_rpc.update_member(context, old_member, member,
+ agent['host'])
def delete_member(self, context, member):
self.plugin._delete_db_member(context, member['id'])
agent = self.get_pool_agent(context, member['pool_id'])
- self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
-
- def update_health_monitor(self, context, old_health_monitor,
- health_monitor, pool_id):
- # monitors are unused here because agent will fetch what is necessary
- agent = self.get_pool_agent(context, pool_id)
- self.agent_rpc.modify_pool(context, pool_id, agent['host'])
+ self.agent_rpc.delete_member(context, member, agent['host'])
def create_pool_health_monitor(self, context, healthmon, pool_id):
# healthmon is not used here
agent = self.get_pool_agent(context, pool_id)
- self.agent_rpc.modify_pool(context, pool_id, agent['host'])
+ self.agent_rpc.create_pool_health_monitor(context, healthmon,
+ pool_id, agent['host'])
+
+ def update_pool_health_monitor(self, context, old_health_monitor,
+ health_monitor, pool_id):
+ agent = self.get_pool_agent(context, pool_id)
+ self.agent_rpc.update_pool_health_monitor(context, old_health_monitor,
+ health_monitor, pool_id,
+ agent['host'])
def delete_pool_health_monitor(self, context, health_monitor, pool_id):
self.plugin._delete_db_pool_health_monitor(
context, health_monitor['id'], pool_id
)
- # healthmon_id is not used here
agent = self.get_pool_agent(context, pool_id)
- self.agent_rpc.modify_pool(context, pool_id, agent['host'])
+ self.agent_rpc.delete_pool_health_monitor(context, health_monitor,
+ pool_id, agent['host'])
def stats(self, context, pool_id):
pass
+
+
+class HaproxyOnHostPluginDriver(AgentBasedPluginDriver):
+ #TODO(obondarev): change hardcoded driver name
+ # to namespace_driver.DRIVER_NAME after moving HaproxyOnHostPluginDriver
+ # to a separate file (follow-up patch)
+ device_driver = 'haproxy_ns'
# Anything to do here? the hm is not connected to the graph yet
pass
- def update_health_monitor(self, context, old_health_monitor,
- health_monitor,
- pool_id):
+ def update_pool_health_monitor(self, context, old_health_monitor,
+ health_monitor,
+ pool_id):
self._handle_pool_health_monitor(context, health_monitor, pool_id)
def create_pool_health_monitor(self, context,
).filter_by(monitor_id=hm['id']).join(ldb.Pool)
for assoc in qry:
driver = self._get_driver_for_pool(context, assoc['pool_id'])
- driver.update_health_monitor(context, old_hm,
- hm, assoc['pool_id'])
+ driver.update_pool_health_monitor(context, old_hm,
+ hm, assoc['pool_id'])
return hm
def _delete_db_pool_health_monitor(self, context, hm_id, pool_id):
def delete_member(self, context, member):
self.plugin._delete_db_member(context, member["id"])
- def update_health_monitor(self, context, old_health_monitor,
- health_monitor,
- pool_association):
+ def update_pool_health_monitor(self, context, old_health_monitor,
+ health_monitor,
+ pool_association):
pass
def create_pool_health_monitor(self, context,
import mock
+from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.haproxy import (
agent_manager as manager
)
from neutron.tests import base
-class TestLogicalDeviceCache(base.BaseTestCase):
- def setUp(self):
- super(TestLogicalDeviceCache, self).setUp()
- self.cache = manager.LogicalDeviceCache()
-
- def test_put(self):
- fake_device = {
- 'vip': {'port_id': 'port_id'},
- 'pool': {'id': 'pool_id'}
- }
- self.cache.put(fake_device)
-
- self.assertEqual(len(self.cache.devices), 1)
- self.assertEqual(len(self.cache.port_lookup), 1)
- self.assertEqual(len(self.cache.pool_lookup), 1)
-
- def test_double_put(self):
- fake_device = {
- 'vip': {'port_id': 'port_id'},
- 'pool': {'id': 'pool_id'}
- }
- self.cache.put(fake_device)
- self.cache.put(fake_device)
-
- self.assertEqual(len(self.cache.devices), 1)
- self.assertEqual(len(self.cache.port_lookup), 1)
- self.assertEqual(len(self.cache.pool_lookup), 1)
-
- def test_remove_in_cache(self):
- fake_device = {
- 'vip': {'port_id': 'port_id'},
- 'pool': {'id': 'pool_id'}
- }
- self.cache.put(fake_device)
-
- self.assertEqual(len(self.cache.devices), 1)
-
- self.cache.remove(fake_device)
-
- self.assertFalse(len(self.cache.devices))
- self.assertFalse(self.cache.port_lookup)
- self.assertFalse(self.cache.pool_lookup)
-
- def test_remove_in_cache_same_object(self):
- fake_device = {
- 'vip': {'port_id': 'port_id'},
- 'pool': {'id': 'pool_id'}
- }
- self.cache.put(fake_device)
-
- self.assertEqual(len(self.cache.devices), 1)
-
- self.cache.remove(set(self.cache.devices).pop())
-
- self.assertFalse(len(self.cache.devices))
- self.assertFalse(self.cache.port_lookup)
- self.assertFalse(self.cache.pool_lookup)
-
- def test_remove_by_pool_id(self):
- fake_device = {
- 'vip': {'port_id': 'port_id'},
- 'pool': {'id': 'pool_id'}
- }
- self.cache.put(fake_device)
-
- self.assertEqual(len(self.cache.devices), 1)
-
- self.cache.remove_by_pool_id('pool_id')
-
- self.assertFalse(len(self.cache.devices))
- self.assertFalse(self.cache.port_lookup)
- self.assertFalse(self.cache.pool_lookup)
-
- def test_get_by_pool_id(self):
- fake_device = {
- 'vip': {'port_id': 'port_id'},
- 'pool': {'id': 'pool_id'}
- }
- self.cache.put(fake_device)
-
- dev = self.cache.get_by_pool_id('pool_id')
-
- self.assertEqual(dev.pool_id, 'pool_id')
- self.assertEqual(dev.port_id, 'port_id')
-
- def test_get_by_port_id(self):
- fake_device = {
- 'vip': {'port_id': 'port_id'},
- 'pool': {'id': 'pool_id'}
- }
- self.cache.put(fake_device)
-
- dev = self.cache.get_by_port_id('port_id')
-
- self.assertEqual(dev.pool_id, 'pool_id')
- self.assertEqual(dev.port_id, 'port_id')
-
- def test_get_pool_ids(self):
- fake_device = {
- 'vip': {'port_id': 'port_id'},
- 'pool': {'id': 'pool_id'}
- }
- self.cache.put(fake_device)
-
- self.assertEqual(self.cache.get_pool_ids(), ['pool_id'])
-
-
class TestManager(base.BaseTestCase):
def setUp(self):
super(TestManager, self).setUp()
self.addCleanup(mock.patch.stopall)
mock_conf = mock.Mock()
- mock_conf.interface_driver = 'intdriver'
- mock_conf.device_driver = 'devdriver'
- mock_conf.AGENT.root_helper = 'sudo'
- mock_conf.loadbalancer_state_path = '/the/path'
+ mock_conf.device_driver = ['devdriver']
self.mock_importer = mock.patch.object(manager, 'importutils').start()
self.mgr = manager.LbaasAgentManager(mock_conf)
self.rpc_mock = rpc_mock_cls.return_value
self.log = mock.patch.object(manager, 'LOG').start()
+ self.driver_mock = mock.Mock()
+ self.mgr.device_drivers = {'devdriver': self.driver_mock}
+ self.mgr.instance_mapping = {'1': 'devdriver', '2': 'devdriver'}
self.mgr.needs_resync = False
def test_initialize_service_hook(self):
self.assertFalse(sync.called)
def test_collect_stats(self):
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_pool_ids.return_value = ['1', '2']
- self.mgr.collect_stats(mock.Mock())
- self.rpc_mock.update_pool_stats.assert_has_calls([
- mock.call('1', mock.ANY),
- mock.call('2', mock.ANY)
- ])
+ self.mgr.collect_stats(mock.Mock())
+ self.rpc_mock.update_pool_stats.assert_has_calls([
+ mock.call('1', mock.ANY),
+ mock.call('2', mock.ANY)
+ ])
def test_collect_stats_exception(self):
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_pool_ids.return_value = ['1', '2']
- with mock.patch.object(self.mgr, 'driver') as driver:
- driver.get_stats.side_effect = Exception
-
- self.mgr.collect_stats(mock.Mock())
+ self.driver_mock.get_stats.side_effect = Exception
- self.assertFalse(self.rpc_mock.called)
- self.assertTrue(self.mgr.needs_resync)
- self.assertTrue(self.log.exception.called)
+ self.mgr.collect_stats(mock.Mock())
- def test_vip_plug_callback(self):
- self.mgr._vip_plug_callback('plug', {'id': 'id'})
- self.rpc_mock.plug_vip_port.assert_called_once_with('id')
-
- def test_vip_unplug_callback(self):
- self.mgr._vip_plug_callback('unplug', {'id': 'id'})
- self.rpc_mock.unplug_vip_port.assert_called_once_with('id')
+ self.assertFalse(self.rpc_mock.called)
+ self.assertTrue(self.mgr.needs_resync)
+ self.assertTrue(self.log.exception.called)
- def _sync_state_helper(self, cache, ready, refreshed, destroyed):
+ def _sync_state_helper(self, ready, reloaded, destroyed):
with contextlib.nested(
- mock.patch.object(self.mgr, 'cache'),
- mock.patch.object(self.mgr, 'refresh_device'),
- mock.patch.object(self.mgr, 'destroy_device')
- ) as (mock_cache, refresh, destroy):
+ mock.patch.object(self.mgr, '_reload_pool'),
+ mock.patch.object(self.mgr, '_destroy_pool')
+ ) as (reload, destroy):
- mock_cache.get_pool_ids.return_value = cache
self.rpc_mock.get_ready_devices.return_value = ready
self.mgr.sync_state()
- self.assertEqual(len(refreshed), len(refresh.mock_calls))
+ self.assertEqual(len(reloaded), len(reload.mock_calls))
self.assertEqual(len(destroyed), len(destroy.mock_calls))
- refresh.assert_has_calls([mock.call(i) for i in refreshed])
+ reload.assert_has_calls([mock.call(i) for i in reloaded])
destroy.assert_has_calls([mock.call(i) for i in destroyed])
self.assertFalse(self.mgr.needs_resync)
def test_sync_state_all_known(self):
- self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], [])
+ self._sync_state_helper(['1', '2'], ['1', '2'], [])
def test_sync_state_all_unknown(self):
- self._sync_state_helper([], ['1', '2'], ['1', '2'], [])
+ self.mgr.instance_mapping = {}
+ self._sync_state_helper(['1', '2'], ['1', '2'], [])
def test_sync_state_destroy_all(self):
- self._sync_state_helper(['1', '2'], [], [], ['1', '2'])
+ self._sync_state_helper([], [], ['1', '2'])
def test_sync_state_both(self):
- self._sync_state_helper(['1'], ['2'], ['2'], ['1'])
+ self.mgr.instance_mapping = {'1': 'devdriver'}
+ self._sync_state_helper(['2'], ['2'], ['1'])
def test_sync_state_exception(self):
self.rpc_mock.get_ready_devices.side_effect = Exception
self.assertTrue(self.log.exception.called)
self.assertTrue(self.mgr.needs_resync)
- def test_refresh_device_exists(self):
- config = self.rpc_mock.get_logical_device.return_value
-
- with mock.patch.object(self.mgr, 'driver') as driver:
- with mock.patch.object(self.mgr, 'cache') as cache:
- driver.exists.return_value = True
-
- self.mgr.refresh_device(config)
-
- driver.exists.assert_called_once_with(config)
- driver.update.assert_called_once_with(config)
- cache.put.assert_called_once_with(config)
- self.assertFalse(self.mgr.needs_resync)
-
- def test_refresh_device_new(self):
- config = self.rpc_mock.get_logical_device.return_value
-
- with mock.patch.object(self.mgr, 'driver') as driver:
- with mock.patch.object(self.mgr, 'cache') as cache:
- driver.exists.return_value = False
-
- self.mgr.refresh_device(config)
-
- driver.exists.assert_called_once_with(config)
- driver.create.assert_called_once_with(config)
- cache.put.assert_called_once_with(config)
- self.assertFalse(self.mgr.needs_resync)
-
- def test_refresh_device_exception(self):
- config = self.rpc_mock.get_logical_device.return_value
-
- with mock.patch.object(self.mgr, 'driver') as driver:
- with mock.patch.object(self.mgr, 'cache') as cache:
- driver.exists.side_effect = Exception
- self.mgr.refresh_device(config)
-
- driver.exists.assert_called_once_with(config)
- self.assertTrue(self.mgr.needs_resync)
- self.assertTrue(self.log.exception.called)
- self.assertFalse(cache.put.called)
-
- def test_destroy_device_known(self):
- with mock.patch.object(self.mgr, 'driver') as driver:
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_by_pool_id.return_value = True
-
- self.mgr.destroy_device('pool_id')
- cache.get_by_pool_id.assert_called_once_with('pool_id')
- driver.destroy.assert_called_once_with('pool_id')
- self.rpc_mock.pool_destroyed.assert_called_once_with(
- 'pool_id'
- )
- cache.remove.assert_called_once_with(True)
- self.assertFalse(self.mgr.needs_resync)
-
- def test_destroy_device_unknown(self):
- with mock.patch.object(self.mgr, 'driver') as driver:
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_by_pool_id.return_value = None
-
- self.mgr.destroy_device('pool_id')
- cache.get_by_pool_id.assert_called_once_with('pool_id')
- self.assertFalse(driver.destroy.called)
-
- def test_destroy_device_exception(self):
- with mock.patch.object(self.mgr, 'driver') as driver:
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_by_pool_id.return_value = True
- driver.destroy.side_effect = Exception
-
- self.mgr.destroy_device('pool_id')
- cache.get_by_pool_id.assert_called_once_with('pool_id')
-
- self.assertTrue(self.log.exception.called)
- self.assertTrue(self.mgr.needs_resync)
-
- def test_remove_orphans(self):
- with mock.patch.object(self.mgr, 'driver') as driver:
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_pool_ids.return_value = ['1', '2']
- self.mgr.remove_orphans()
-
- driver.remove_orphans.assert_called_once_with(['1', '2'])
-
def test_reload_pool(self):
- with mock.patch.object(self.mgr, 'refresh_device') as refresh:
- self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
- refresh.assert_called_once_with('pool_id')
-
- def test_modify_pool_known(self):
- with mock.patch.object(self.mgr, 'refresh_device') as refresh:
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_by_pool_id.return_value = True
-
- self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
-
- refresh.assert_called_once_with('pool_id')
-
- def test_modify_pool_unknown(self):
- with mock.patch.object(self.mgr, 'refresh_device') as refresh:
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_by_pool_id.return_value = False
+ config = {'driver': 'devdriver'}
+ self.rpc_mock.get_logical_device.return_value = config
+ pool_id = 'new_id'
+ self.assertNotIn(pool_id, self.mgr.instance_mapping)
+
+ self.mgr._reload_pool(pool_id)
+
+ self.driver_mock.deploy_instance.assert_called_once_with(config)
+ self.assertIn(pool_id, self.mgr.instance_mapping)
+ self.rpc_mock.pool_deployed.assert_called_once_with(pool_id)
+
+ def test_reload_pool_driver_not_found(self):
+ config = {'driver': 'unknown_driver'}
+ self.rpc_mock.get_logical_device.return_value = config
+ pool_id = 'new_id'
+ self.assertNotIn(pool_id, self.mgr.instance_mapping)
+
+ self.mgr._reload_pool(pool_id)
+
+ self.assertTrue(self.log.error.called)
+ self.assertFalse(self.driver_mock.deploy_instance.called)
+ self.assertNotIn(pool_id, self.mgr.instance_mapping)
+ self.assertFalse(self.rpc_mock.pool_deployed.called)
+
+ def test_reload_pool_exception_on_driver(self):
+ config = {'driver': 'devdriver'}
+ self.rpc_mock.get_logical_device.return_value = config
+ self.driver_mock.deploy_instance.side_effect = Exception
+ pool_id = 'new_id'
+ self.assertNotIn(pool_id, self.mgr.instance_mapping)
+
+ self.mgr._reload_pool(pool_id)
+
+ self.driver_mock.deploy_instance.assert_called_once_with(config)
+ self.assertNotIn(pool_id, self.mgr.instance_mapping)
+ self.assertFalse(self.rpc_mock.pool_deployed.called)
+ self.assertTrue(self.log.exception.called)
+ self.assertTrue(self.mgr.needs_resync)
- self.mgr.modify_pool(mock.Mock(), pool_id='pool_id')
+ def test_destroy_pool(self):
+ pool_id = '1'
+ self.assertIn(pool_id, self.mgr.instance_mapping)
- self.assertFalse(refresh.called)
+ self.mgr._destroy_pool(pool_id)
- def test_destroy_pool_known(self):
- with mock.patch.object(self.mgr, 'destroy_device') as destroy:
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_by_pool_id.return_value = True
+ self.driver_mock.undeploy_instance.assert_called_once_with(pool_id)
+ self.assertNotIn(pool_id, self.mgr.instance_mapping)
+ self.rpc_mock.pool_destroyed.assert_called_once_with(pool_id)
+ self.assertFalse(self.mgr.needs_resync)
- self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
+ def test_destroy_pool_exception_on_driver(self):
+ pool_id = '1'
+ self.assertIn(pool_id, self.mgr.instance_mapping)
+ self.driver_mock.undeploy_instance.side_effect = Exception
- destroy.assert_called_once_with('pool_id')
+ self.mgr._destroy_pool(pool_id)
- def test_destroy_pool_unknown(self):
- with mock.patch.object(self.mgr, 'destroy_device') as destroy:
- with mock.patch.object(self.mgr, 'cache') as cache:
- cache.get_by_pool_id.return_value = False
+ self.driver_mock.undeploy_instance.assert_called_once_with(pool_id)
+ self.assertIn(pool_id, self.mgr.instance_mapping)
+ self.assertFalse(self.rpc_mock.pool_destroyed.called)
+ self.assertTrue(self.log.exception.called)
+ self.assertTrue(self.mgr.needs_resync)
- self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
+ def test_get_driver_unknown_device(self):
+ self.assertRaises(manager.DeviceNotFoundOnAgent,
+ self.mgr._get_driver, 'unknown')
- self.assertFalse(destroy.called)
+ def test_remove_orphans(self):
+ self.mgr.remove_orphans()
+ self.driver_mock.remove_orphans.assert_called_once_with(['1', '2'])
+
+ def test_create_vip(self):
+ vip = {'id': 'id1', 'pool_id': '1'}
+ self.mgr.create_vip(mock.Mock(), vip)
+ self.driver_mock.create_vip.assert_called_once_with(vip)
+ self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'],
+ constants.ACTIVE)
+
+ def test_create_vip_failed(self):
+ vip = {'id': 'id1', 'pool_id': '1'}
+ self.driver_mock.create_vip.side_effect = Exception
+ self.mgr.create_vip(mock.Mock(), vip)
+ self.driver_mock.create_vip.assert_called_once_with(vip)
+ self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'],
+ constants.ERROR)
+
+ def test_update_vip(self):
+ old_vip = {'id': 'id1'}
+ vip = {'id': 'id1', 'pool_id': '1'}
+ self.mgr.update_vip(mock.Mock(), old_vip, vip)
+ self.driver_mock.update_vip.assert_called_once_with(old_vip, vip)
+ self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'],
+ constants.ACTIVE)
+
+ def test_update_vip_failed(self):
+ old_vip = {'id': 'id1'}
+ vip = {'id': 'id1', 'pool_id': '1'}
+ self.driver_mock.update_vip.side_effect = Exception
+ self.mgr.update_vip(mock.Mock(), old_vip, vip)
+ self.driver_mock.update_vip.assert_called_once_with(old_vip, vip)
+ self.rpc_mock.update_status.assert_called_once_with('vip', vip['id'],
+ constants.ERROR)
+
+ def test_delete_vip(self):
+ vip = {'id': 'id1', 'pool_id': '1'}
+ self.mgr.delete_vip(mock.Mock(), vip)
+ self.driver_mock.delete_vip.assert_called_once_with(vip)
+
+ def test_create_pool(self):
+ pool = {'id': 'id1'}
+ self.assertNotIn(pool['id'], self.mgr.instance_mapping)
+ self.mgr.create_pool(mock.Mock(), pool, 'devdriver')
+ self.driver_mock.create_pool.assert_called_once_with(pool)
+ self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'],
+ constants.ACTIVE)
+ self.assertIn(pool['id'], self.mgr.instance_mapping)
+
+ def test_create_pool_failed(self):
+ pool = {'id': 'id1'}
+ self.assertNotIn(pool['id'], self.mgr.instance_mapping)
+ self.driver_mock.create_pool.side_effect = Exception
+ self.mgr.create_pool(mock.Mock(), pool, 'devdriver')
+ self.driver_mock.create_pool.assert_called_once_with(pool)
+ self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'],
+ constants.ERROR)
+ self.assertNotIn(pool['id'], self.mgr.instance_mapping)
+
+ def test_update_pool(self):
+ old_pool = {'id': '1'}
+ pool = {'id': '1'}
+ self.mgr.update_pool(mock.Mock(), old_pool, pool)
+ self.driver_mock.update_pool.assert_called_once_with(old_pool, pool)
+ self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'],
+ constants.ACTIVE)
+
+ def test_update_pool_failed(self):
+ old_pool = {'id': '1'}
+ pool = {'id': '1'}
+ self.driver_mock.update_pool.side_effect = Exception
+ self.mgr.update_pool(mock.Mock(), old_pool, pool)
+ self.driver_mock.update_pool.assert_called_once_with(old_pool, pool)
+ self.rpc_mock.update_status.assert_called_once_with('pool', pool['id'],
+ constants.ERROR)
+
+ def test_delete_pool(self):
+ pool = {'id': '1'}
+ self.assertIn(pool['id'], self.mgr.instance_mapping)
+ self.mgr.delete_pool(mock.Mock(), pool)
+ self.driver_mock.delete_pool.assert_called_once_with(pool)
+ self.assertNotIn(pool['id'], self.mgr.instance_mapping)
+
+ def test_create_member(self):
+ member = {'id': 'id1', 'pool_id': '1'}
+ self.mgr.create_member(mock.Mock(), member)
+ self.driver_mock.create_member.assert_called_once_with(member)
+ self.rpc_mock.update_status.assert_called_once_with('member',
+ member['id'],
+ constants.ACTIVE)
+
+ def test_create_member_failed(self):
+ member = {'id': 'id1', 'pool_id': '1'}
+ self.driver_mock.create_member.side_effect = Exception
+ self.mgr.create_member(mock.Mock(), member)
+ self.driver_mock.create_member.assert_called_once_with(member)
+ self.rpc_mock.update_status.assert_called_once_with('member',
+ member['id'],
+ constants.ERROR)
+
+ def test_update_member(self):
+ old_member = {'id': 'id1'}
+ member = {'id': 'id1', 'pool_id': '1'}
+ self.mgr.update_member(mock.Mock(), old_member, member)
+ self.driver_mock.update_member.assert_called_once_with(old_member,
+ member)
+ self.rpc_mock.update_status.assert_called_once_with('member',
+ member['id'],
+ constants.ACTIVE)
+
+ def test_update_member_failed(self):
+ old_member = {'id': 'id1'}
+ member = {'id': 'id1', 'pool_id': '1'}
+ self.driver_mock.update_member.side_effect = Exception
+ self.mgr.update_member(mock.Mock(), old_member, member)
+ self.driver_mock.update_member.assert_called_once_with(old_member,
+ member)
+ self.rpc_mock.update_status.assert_called_once_with('member',
+ member['id'],
+ constants.ERROR)
+
+ def test_delete_member(self):
+ member = {'id': 'id1', 'pool_id': '1'}
+ self.mgr.delete_member(mock.Mock(), member)
+ self.driver_mock.delete_member.assert_called_once_with(member)
+
+ def test_create_monitor(self):
+ monitor = {'id': 'id1'}
+ assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'}
+ self.mgr.create_pool_health_monitor(mock.Mock(), monitor, '1')
+ self.driver_mock.create_pool_health_monitor.assert_called_once_with(
+ monitor, '1')
+ self.rpc_mock.update_status.assert_called_once_with('health_monitor',
+ assoc_id,
+ constants.ACTIVE)
+
+ def test_create_monitor_failed(self):
+ monitor = {'id': 'id1'}
+ assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'}
+ self.driver_mock.create_pool_health_monitor.side_effect = Exception
+ self.mgr.create_pool_health_monitor(mock.Mock(), monitor, '1')
+ self.driver_mock.create_pool_health_monitor.assert_called_once_with(
+ monitor, '1')
+ self.rpc_mock.update_status.assert_called_once_with('health_monitor',
+ assoc_id,
+ constants.ERROR)
+
+ def test_update_monitor(self):
+ monitor = {'id': 'id1'}
+ assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'}
+ self.mgr.update_pool_health_monitor(mock.Mock(), monitor, monitor, '1')
+ self.driver_mock.update_pool_health_monitor.assert_called_once_with(
+ monitor, monitor, '1')
+ self.rpc_mock.update_status.assert_called_once_with('health_monitor',
+ assoc_id,
+ constants.ACTIVE)
+
+ def test_update_monitor_failed(self):
+ monitor = {'id': 'id1'}
+ assoc_id = {'monitor_id': monitor['id'], 'pool_id': '1'}
+ self.driver_mock.update_pool_health_monitor.side_effect = Exception
+ self.mgr.update_pool_health_monitor(mock.Mock(), monitor, monitor, '1')
+ self.driver_mock.update_pool_health_monitor.assert_called_once_with(
+ monitor, monitor, '1')
+ self.rpc_mock.update_status.assert_called_once_with('health_monitor',
+ assoc_id,
+ constants.ERROR)
+
+ def test_delete_monitor(self):
+ monitor = {'id': 'id1'}
+ self.mgr.delete_pool_health_monitor(mock.Mock(), monitor, '1')
+ self.driver_mock.delete_pool_health_monitor.assert_called_once_with(
+ monitor, '1')
+
+ def test_agent_disabled(self):
+ payload = {'admin_state_up': False}
+ self.mgr.agent_updated(mock.Mock(), payload)
+ self.driver_mock.undeploy_instance.assert_has_calls(
+ [mock.call('1'), mock.call('2')])
self.make_msg.assert_called_once_with(
'get_logical_device',
- pool_id='pool_id',
- host='host')
+ pool_id='pool_id')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.assert_called_once_with(
'pool_destroyed',
- pool_id='pool_id',
- host='host')
+ pool_id='pool_id')
+
+ self.mock_call.assert_called_once_with(
+ mock.sentinel.context,
+ self.make_msg.return_value,
+ topic='topic'
+ )
+
+ def test_pool_deployed(self):
+ self.assertEqual(
+ self.api.pool_deployed('pool_id'),
+ self.mock_call.return_value
+ )
+
+ self.make_msg.assert_called_once_with(
+ 'pool_deployed',
+ pool_id='pool_id')
+
+ self.mock_call.assert_called_once_with(
+ mock.sentinel.context,
+ self.make_msg.return_value,
+ topic='topic'
+ )
+
+ def test_update_status(self):
+ self.assertEqual(
+ self.api.update_status('pool', 'pool_id', 'ACTIVE'),
+ self.mock_call.return_value
+ )
+
+ self.make_msg.assert_called_once_with(
+ 'update_status',
+ obj_type='pool',
+ obj_id='pool_id',
+ status='ACTIVE')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
# @author: Oleg Bondarev (obondarev@mirantis.com)
import contextlib
-import mock
-from oslo.config import cfg as config
+import mock
from neutron.services.loadbalancer.drivers.haproxy import cfg
from neutron.tests import base
'\n'.join(test_config))
def test_build_global(self):
- if not hasattr(config.CONF, 'user_group'):
- config.CONF.register_opt(config.StrOpt('user_group'))
- config.CONF.set_override('user_group', 'test_group')
expected_opts = ['global',
'\tdaemon',
'\tuser nobody',
'\tlog /dev/log local0',
'\tlog /dev/log local1 notice',
'\tstats socket test_path mode 0666 level user']
- opts = cfg._build_global(mock.Mock(), 'test_path')
+ opts = cfg._build_global(mock.Mock(), 'test_path', 'test_group')
self.assertEqual(expected_opts, list(opts))
- config.CONF.reset()
def test_build_defaults(self):
expected_opts = ['defaults',
'\ttimeout server 50000']
opts = cfg._build_defaults(mock.Mock())
self.assertEqual(expected_opts, list(opts))
- config.CONF.reset()
def test_build_frontend(self):
test_config = {'vip': {'id': 'vip_id',
# @author: Mark McClain, DreamHost
import contextlib
+
import mock
from neutron.common import exceptions
class TestHaproxyNSDriver(base.BaseTestCase):
def setUp(self):
super(TestHaproxyNSDriver, self).setUp()
+ self.addCleanup(mock.patch.stopall)
- self.vif_driver = mock.Mock()
- self.vip_plug_callback = mock.Mock()
+ conf = mock.Mock()
+ conf.haproxy.loadbalancer_state_path = '/the/path'
+ conf.interface_driver = 'intdriver'
+ conf.haproxy.user_group = 'test_group'
+ conf.AGENT.root_helper = 'sudo_test'
+ self.mock_importer = mock.patch.object(namespace_driver,
+ 'importutils').start()
+ self.rpc_mock = mock.Mock()
self.driver = namespace_driver.HaproxyNSDriver(
- 'sudo',
- '/the/path',
- self.vif_driver,
- self.vip_plug_callback
+ conf,
+ self.rpc_mock
)
+ self.vif_driver = mock.Mock()
+ self.driver.vif_driver = self.vif_driver
self.fake_config = {
'pool': {'id': 'pool_id'},
- 'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}}
+ 'vip': {'id': 'vip_id', 'port': {'id': 'port_id'},
+ 'status': 'ACTIVE', 'admin_state_up': True}
}
+ def test_get_name(self):
+ self.assertEqual(self.driver.get_name(), namespace_driver.DRIVER_NAME)
+
def test_create(self):
with mock.patch.object(self.driver, '_plug') as plug:
with mock.patch.object(self.driver, '_spawn') as spawn:
self.driver._spawn(self.fake_config)
- mock_save.assert_called_once_with('conf', self.fake_config, 'sock')
+ mock_save.assert_called_once_with('conf', self.fake_config,
+ 'sock', 'test_group')
cmd = ['haproxy', '-f', 'conf', '-p', 'pid']
ip_wrap.assert_has_calls([
- mock.call('sudo', 'qlbaas-pool_id'),
+ mock.call('sudo_test', 'qlbaas-pool_id'),
mock.call().netns.execute(cmd)
])
- def test_destroy(self):
+ def test_undeploy_instance(self):
with contextlib.nested(
mock.patch.object(self.driver, '_get_state_file_path'),
mock.patch.object(namespace_driver, 'kill_pids_in_file'),
self.driver.pool_to_port_id['pool_id'] = 'port_id'
isdir.return_value = True
- self.driver.destroy('pool_id')
+ self.driver.undeploy_instance('pool_id')
- kill.assert_called_once_with('sudo', '/pool/pid')
+ kill.assert_called_once_with('sudo_test', '/pool/pid')
unplug.assert_called_once_with('qlbaas-pool_id', 'port_id')
- isdir.called_once_with('/pool')
+ isdir.assert_called_once_with('/pool')
rmtree.assert_called_once_with('/pool')
ip_wrap.assert_has_calls([
- mock.call('sudo', 'qlbaas-pool_id'),
+ mock.call('sudo_test', 'qlbaas-pool_id'),
mock.call().garbage_collect_namespace()
])
self.driver.exists('pool_id')
ip_wrap.assert_has_calls([
- mock.call('sudo'),
+ mock.call('sudo_test'),
mock.call().netns.exists('qlbaas-pool_id')
])
ip_net.prefixlen = 24
self.driver._plug('test_ns', test_port)
- self.vip_plug_callback.assert_called_once_with('plug', test_port)
+ self.rpc_mock.plug_vip_port.assert_called_once_with(
+ test_port['id'])
self.assertTrue(dev_exists.called)
self.vif_driver.plug.assert_called_once_with('net_id', 'port_id',
'test_interface',
'test_ns')
cmd = ['route', 'add', 'default', 'gw', '10.0.0.1']
ip_wrap.assert_has_calls([
- mock.call('sudo', namespace='test_ns'),
+ mock.call('sudo_test', namespace='test_ns'),
mock.call().netns.execute(cmd, check_exit_code=False),
])
ip_net.prefixlen = 24
self.driver._plug('test_ns', test_port)
- self.vip_plug_callback.assert_called_once_with('plug', test_port)
+ self.rpc_mock.plug_vip_port.assert_called_once_with(
+ test_port['id'])
self.assertTrue(dev_exists.called)
self.vif_driver.plug.assert_called_once_with('net_id', 'port_id',
'test_interface',
self.vif_driver.get_device_name.return_value = 'test_interface'
self.driver._unplug('test_ns', 'port_id')
- self.vip_plug_callback.assert_called_once_with('unplug',
- {'id': 'port_id'})
+ self.rpc_mock.unplug_vip_port.assert_called_once_with('port_id')
self.vif_driver.unplug('test_interface', namespace='test_ns')
def test_kill_pids_in_file(self):
file_mock.__iter__.return_value = iter(['123'])
path_exists.return_value = False
- namespace_driver.kill_pids_in_file('sudo', 'test_path')
+ namespace_driver.kill_pids_in_file('sudo_test', 'test_path')
path_exists.assert_called_once_with('test_path')
self.assertFalse(mock_open.called)
self.assertFalse(mock_execute.called)
path_exists.return_value = True
mock_execute.side_effect = RuntimeError
- namespace_driver.kill_pids_in_file('sudo', 'test_path')
+ namespace_driver.kill_pids_in_file('sudo_test', 'test_path')
self.assertTrue(mock_log.called)
mock_execute.assert_called_once_with(
- ['kill', '-9', '123'], 'sudo')
+ ['kill', '-9', '123'], 'sudo_test')
def test_get_state_file_path(self):
with mock.patch('os.makedirs') as mkdir:
path = self.driver._get_state_file_path('pool_id', 'conf')
self.assertEqual('/the/path/pool_id/conf', path)
mkdir.assert_called_once_with('/the/path/pool_id', 0o755)
+
+ def test_deploy_instance(self):
+ with mock.patch.object(self.driver, 'exists') as exists:
+ with mock.patch.object(self.driver, 'update') as update:
+ self.driver.deploy_instance(self.fake_config)
+ exists.assert_called_once_with(self.fake_config['pool']['id'])
+ update.assert_called_once_with(self.fake_config)
+
+ def test_deploy_instance_non_existing(self):
+ with mock.patch.object(self.driver, 'exists') as exists:
+ with mock.patch.object(self.driver, 'create') as create:
+ exists.return_value = False
+ self.driver.deploy_instance(self.fake_config)
+ exists.assert_called_once_with(self.fake_config['pool']['id'])
+ create.assert_called_once_with(self.fake_config)
+
+ def test_deploy_instance_vip_status_non_active(self):
+ with mock.patch.object(self.driver, 'exists') as exists:
+ self.fake_config['vip']['status'] = 'NON_ACTIVE'
+ self.driver.deploy_instance(self.fake_config)
+ self.assertFalse(exists.called)
+
+ def test_deploy_instance_vip_admin_state_down(self):
+ with mock.patch.object(self.driver, 'exists') as exists:
+ self.fake_config['vip']['admin_state_up'] = False
+ self.driver.deploy_instance(self.fake_config)
+ self.assertFalse(exists.called)
+
+ def test_deploy_instance_no_vip(self):
+ with mock.patch.object(self.driver, 'exists') as exists:
+ del self.fake_config['vip']
+ self.driver.deploy_instance(self.fake_config)
+ self.assertFalse(exists.called)
+
+ def test_refresh_device(self):
+ with mock.patch.object(self.driver, 'deploy_instance') as deploy:
+ pool_id = 'pool_id1'
+ self.driver._refresh_device(pool_id)
+ self.rpc_mock.get_logical_device.assert_called_once_with(pool_id)
+ deploy.assert_called_once_with(
+ self.rpc_mock.get_logical_device.return_value)
+
+ def test_create_vip(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.create_vip({'pool_id': '1'})
+ refresh.assert_called_once_with('1')
+
+ def test_update_vip(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.update_vip({}, {'pool_id': '1'})
+ refresh.assert_called_once_with('1')
+
+ def test_delete_vip(self):
+ with mock.patch.object(self.driver, 'undeploy_instance') as undeploy:
+ self.driver.delete_vip({'pool_id': '1'})
+ undeploy.assert_called_once_with('1')
+
+ def test_create_pool(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.create_pool({'id': '1'})
+ self.assertFalse(refresh.called)
+
+ def test_update_pool(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.update_pool({}, {'id': '1'})
+ refresh.assert_called_once_with('1')
+
+ def test_delete_pool_existing(self):
+ with mock.patch.object(self.driver, 'undeploy_instance') as undeploy:
+ with mock.patch.object(self.driver, 'exists') as exists:
+ exists.return_value = True
+ self.driver.delete_pool({'id': '1'})
+ undeploy.assert_called_once_with('1')
+
+ def test_delete_pool_non_existing(self):
+ with mock.patch.object(self.driver, 'undeploy_instance') as undeploy:
+ with mock.patch.object(self.driver, 'exists') as exists:
+ exists.return_value = False
+ self.driver.delete_pool({'id': '1'})
+ self.assertFalse(undeploy.called)
+
+ def test_create_member(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.create_member({'pool_id': '1'})
+ refresh.assert_called_once_with('1')
+
+ def test_update_member(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.update_member({}, {'pool_id': '1'})
+ refresh.assert_called_once_with('1')
+
+ def test_delete_member(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.delete_member({'pool_id': '1'})
+ refresh.assert_called_once_with('1')
+
+ def test_create_pool_health_monitor(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.create_pool_health_monitor('', '1')
+ refresh.assert_called_once_with('1')
+
+ def test_update_pool_health_monitor(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.update_pool_health_monitor('', '', '1')
+ refresh.assert_called_once_with('1')
+
+ def test_delete_pool_health_monitor(self):
+ with mock.patch.object(self.driver, '_refresh_device') as refresh:
+ self.driver.delete_pool_health_monitor('', '1')
+ refresh.assert_called_once_with('1')
#
# @author: Mark McClain, DreamHost
+import contextlib
+
import mock
from webob import exc
test_db_loadbalancer.LoadBalancerPluginDbTestCase):
def setUp(self):
+ def reset_device_driver():
+ plugin_driver.AgentBasedPluginDriver.device_driver = None
+ self.addCleanup(reset_device_driver)
+
+ self.mock_importer = mock.patch.object(
+ plugin_driver, 'importutils').start()
+ self.addCleanup(mock.patch.stopall)
+
# needed to reload provider configuration
st_db.ServiceTypeManager._instance = None
+ plugin_driver.AgentBasedPluginDriver.device_driver = 'dummy'
super(TestLoadBalancerPluginBase, self).setUp(
lbaas_provider=('LOADBALANCER:lbaas:neutron.services.'
'loadbalancer.drivers.haproxy.plugin_driver.'
- 'HaproxyOnHostPluginDriver:default'))
- # create another API instance to make testing easier
- # pass a mock to our API instance
+ 'AgentBasedPluginDriver:default'))
# we need access to loaded plugins to modify models
loaded_plugins = manager.NeutronManager().get_service_plugins()
'.LbaasAgentSchedulerDbMixin.get_lbaas_agents')
get_lbaas_agents_patcher.start()
- # mocking plugin_driver create_pool() as it does nothing more than
- # pool scheduling which is beyond the scope of this test case
- mock.patch('neutron.services.loadbalancer.drivers.haproxy'
- '.plugin_driver.HaproxyOnHostPluginDriver'
- '.create_pool').start()
-
self.addCleanup(mock.patch.stopall)
def test_get_ready_devices(self):
{'id': pools[1].id},
{'id': pools[2].id}]}
ready = self.callbacks.get_ready_devices(ctx)
- self.assertEqual(len(ready), 2)
+ self.assertEqual(len(ready), 3)
self.assertIn(pools[0].id, ready)
self.assertIn(pools[1].id, ready)
- self.assertNotIn(pools[2].id, ready)
+ self.assertIn(pools[2].id, ready)
# cleanup
ctx.session.query(ldb.Pool).delete()
ctx.session.query(ldb.Vip).delete()
ready = self.callbacks.get_ready_devices(
context.get_admin_context(),
)
- self.assertFalse(ready)
+ self.assertEqual([vip['vip']['pool_id']], ready)
def test_get_ready_devices_inactive_pool(self):
with self.vip() as vip:
exceptions.Invalid,
self.callbacks.get_logical_device,
context.get_admin_context(),
- pool['pool']['id'],
- activate=False
- )
+ pool['pool']['id'])
- def test_get_logical_device_activate(self):
+ def test_get_logical_device_active(self):
with self.pool() as pool:
with self.vip(pool=pool) as vip:
with self.member(pool_id=vip['vip']['pool_id']) as member:
ctx = context.get_admin_context()
+ # activate objects
+ self.plugin_instance.update_status(
+ ctx, ldb.Pool, pool['pool']['id'], 'ACTIVE')
+ self.plugin_instance.update_status(
+ ctx, ldb.Member, member['member']['id'], 'ACTIVE')
+ self.plugin_instance.update_status(
+ ctx, ldb.Vip, vip['vip']['id'], 'ACTIVE')
# build the expected
port = self.plugin_instance._core_plugin.get_port(
'pool': pool,
'vip': vip['vip'],
'members': [member['member']],
- 'healthmonitors': []
+ 'healthmonitors': [],
+ 'driver': 'dummy'
}
logical_config = self.callbacks.get_logical_device(
- ctx, pool['id'], activate=True
+ ctx, pool['id']
)
self.assertEqual(logical_config, expected)
'INACTIVE')
logical_config = self.callbacks.get_logical_device(
- ctx, pool['pool']['id'], activate=False)
+ ctx, pool['pool']['id'])
member['member']['status'] = constants.INACTIVE
self.assertEqual([member['member']],
host='host'
)
+ def test_pool_deployed(self):
+ with self.pool() as pool:
+ with self.vip(pool=pool) as vip:
+ with self.member(pool_id=vip['vip']['pool_id']) as member:
+ ctx = context.get_admin_context()
+ p = self.plugin_instance.get_pool(ctx, pool['pool']['id'])
+ self.assertEqual('PENDING_CREATE', p['status'])
+ v = self.plugin_instance.get_vip(ctx, vip['vip']['id'])
+ self.assertEqual('PENDING_CREATE', v['status'])
+ m = self.plugin_instance.get_member(
+ ctx, member['member']['id'])
+ self.assertEqual('PENDING_CREATE', m['status'])
+
+ self.callbacks.pool_deployed(ctx, pool['pool']['id'])
+
+ p = self.plugin_instance.get_pool(ctx, pool['pool']['id'])
+ self.assertEqual('ACTIVE', p['status'])
+ v = self.plugin_instance.get_vip(ctx, vip['vip']['id'])
+ self.assertEqual('ACTIVE', v['status'])
+ m = self.plugin_instance.get_member(
+ ctx, member['member']['id'])
+ self.assertEqual('ACTIVE', m['status'])
+
+ def test_update_status_pool(self):
+ with self.pool() as pool:
+ pool_id = pool['pool']['id']
+ ctx = context.get_admin_context()
+ p = self.plugin_instance.get_pool(ctx, pool_id)
+ self.assertEqual('PENDING_CREATE', p['status'])
+ self.callbacks.update_status(ctx, 'pool', pool_id, 'ACTIVE')
+ p = self.plugin_instance.get_pool(ctx, pool_id)
+ self.assertEqual('ACTIVE', p['status'])
+
+ def test_update_status_health_monitor(self):
+ with contextlib.nested(
+ self.pool(),
+ self.health_monitor()
+ ) as (pool, hm):
+ pool_id = pool['pool']['id']
+ ctx = context.get_admin_context()
+ self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
+ hm_id = hm['health_monitor']['id']
+ h = self.plugin_instance.get_pool_health_monitor(ctx, hm_id,
+ pool_id)
+ self.assertEqual('PENDING_CREATE', h['status'])
+ self.callbacks.update_status(
+ ctx, 'health_monitor',
+ {'monitor_id': hm_id, 'pool_id': pool_id}, 'ACTIVE')
+ h = self.plugin_instance.get_pool_health_monitor(ctx, hm_id,
+ pool_id)
+ self.assertEqual('ACTIVE', h['status'])
+
class TestLoadBalancerAgentApi(base.BaseTestCase):
def setUp(self):
def test_init(self):
self.assertEqual(self.api.topic, 'topic')
- def _call_test_helper(self, method_name):
- rv = getattr(self.api, method_name)(mock.sentinel.context, 'test',
- 'host')
+ def _call_test_helper(self, method_name, method_args):
+ rv = getattr(self.api, method_name)(mock.sentinel.context,
+ host='host',
+ **method_args)
self.assertEqual(rv, self.mock_cast.return_value)
self.mock_cast.assert_called_once_with(
mock.sentinel.context,
self.mock_msg.return_value,
- topic='topic.host'
+ topic='topic.host',
+ version=None
)
+ if method_name == 'agent_updated':
+ method_args = {'payload': method_args}
self.mock_msg.assert_called_once_with(
method_name,
- pool_id='test',
- host='host'
+ **method_args
)
- def test_reload_pool(self):
- self._call_test_helper('reload_pool')
+ def test_agent_updated(self):
+ self._call_test_helper('agent_updated', {'admin_state_up': 'test'})
- def test_destroy_pool(self):
- self._call_test_helper('destroy_pool')
+ def test_create_pool(self):
+ self._call_test_helper('create_pool', {'pool': 'test',
+ 'driver_name': 'dummy'})
- def test_modify_pool(self):
- self._call_test_helper('modify_pool')
+ def test_update_pool(self):
+ self._call_test_helper('update_pool', {'old_pool': 'test',
+ 'pool': 'test'})
- def test_agent_updated(self):
- rv = self.api.agent_updated(mock.sentinel.context, True, 'host')
- self.assertEqual(rv, self.mock_cast.return_value)
- self.mock_cast.assert_called_once_with(
- mock.sentinel.context,
- self.mock_msg.return_value,
- topic='topic.host',
- version='1.1'
- )
+ def test_delete_pool(self):
+ self._call_test_helper('delete_pool', {'pool': 'test'})
- self.mock_msg.assert_called_once_with(
- 'agent_updated',
- payload={'admin_state_up': True}
- )
+ def test_create_vip(self):
+ self._call_test_helper('create_vip', {'vip': 'test'})
+
+ def test_update_vip(self):
+ self._call_test_helper('update_vip', {'old_vip': 'test',
+ 'vip': 'test'})
+
+ def test_delete_vip(self):
+ self._call_test_helper('delete_vip', {'vip': 'test'})
+
+ def test_create_member(self):
+ self._call_test_helper('create_member', {'member': 'test'})
+
+ def test_update_member(self):
+ self._call_test_helper('update_member', {'old_member': 'test',
+ 'member': 'test'})
+
+ def test_delete_member(self):
+ self._call_test_helper('delete_member', {'member': 'test'})
+
+ def test_create_monitor(self):
+ self._call_test_helper('create_pool_health_monitor',
+ {'health_monitor': 'test', 'pool_id': 'test'})
+
+ def test_update_monitor(self):
+ self._call_test_helper('update_pool_health_monitor',
+ {'old_health_monitor': 'test',
+ 'health_monitor': 'test',
+ 'pool_id': 'test'})
+
+ def test_delete_monitor(self):
+ self._call_test_helper('delete_pool_health_monitor',
+ {'health_monitor': 'test', 'pool_id': 'test'})
class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
self.mock_api = api_cls.return_value
- # mocking plugin_driver create_pool() as it does nothing more than
- # pool scheduling which is beyond the scope of this test case
- mock.patch('neutron.services.loadbalancer.drivers.haproxy'
- '.plugin_driver.HaproxyOnHostPluginDriver'
- '.create_pool').start()
-
self.mock_get_driver = mock.patch.object(self.plugin_instance,
'_get_driver')
self.mock_get_driver.return_value = (plugin_driver.
- HaproxyOnHostPluginDriver(
+ AgentBasedPluginDriver(
self.plugin_instance
))
with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet) as vip:
- self.mock_api.reload_pool.assert_called_once_with(
+ self.mock_api.create_vip.assert_called_once_with(
mock.ANY,
- vip['vip']['pool_id'],
+ vip['vip'],
'host'
)
with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet) as vip:
- self.mock_api.reset_mock()
ctx = context.get_admin_context()
+ old_vip = vip['vip'].copy()
vip['vip'].pop('status')
new_vip = self.plugin_instance.update_vip(
ctx,
vip
)
- self.mock_api.reload_pool.assert_called_once_with(
+ self.mock_api.update_vip.assert_called_once_with(
mock.ANY,
- vip['vip']['pool_id'],
+ old_vip,
+ new_vip,
'host'
)
with self.subnet() as subnet:
with self.pool(subnet=subnet) as pool:
with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
- self.mock_api.reset_mock()
ctx = context.get_admin_context()
self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
- self.mock_api.destroy_pool.assert_called_once_with(
+ vip['vip']['status'] = 'PENDING_DELETE'
+ self.mock_api.delete_vip.assert_called_once_with(
mock.ANY,
- vip['vip']['pool_id'],
+ vip['vip'],
'host'
)
def test_create_pool(self):
- with self.pool():
- self.assertFalse(self.mock_api.reload_pool.called)
- self.assertFalse(self.mock_api.modify_pool.called)
- self.assertFalse(self.mock_api.destroy_pool.called)
+ with self.pool() as pool:
+ self.mock_api.create_pool.assert_called_once_with(
+ mock.ANY,
+ pool['pool'],
+ mock.ANY,
+ 'dummy'
+ )
def test_update_pool_non_active(self):
with self.pool() as pool:
pool['pool']['status'] = 'INACTIVE'
ctx = context.get_admin_context()
+ orig_pool = pool['pool'].copy()
del pool['pool']['provider']
self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
- self.mock_api.destroy_pool.assert_called_once_with(
- mock.ANY, pool['pool']['id'], 'host')
- self.assertFalse(self.mock_api.reload_pool.called)
- self.assertFalse(self.mock_api.modify_pool.called)
+ self.mock_api.delete_pool.assert_called_once_with(
+ mock.ANY, orig_pool, 'host')
def test_update_pool_no_vip_id(self):
with self.pool() as pool:
ctx = context.get_admin_context()
+ orig_pool = pool['pool'].copy()
del pool['pool']['provider']
- self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
- self.assertFalse(self.mock_api.destroy_pool.called)
- self.assertFalse(self.mock_api.reload_pool.called)
- self.assertFalse(self.mock_api.modify_pool.called)
+ updated = self.plugin_instance.update_pool(
+ ctx, pool['pool']['id'], pool)
+ self.mock_api.update_pool.assert_called_once_with(
+ mock.ANY, orig_pool, updated, 'host')
def test_update_pool_with_vip_id(self):
with self.pool() as pool:
- with self.vip(pool=pool):
+ with self.vip(pool=pool) as vip:
ctx = context.get_admin_context()
+ old_pool = pool['pool'].copy()
+ old_pool['vip_id'] = vip['vip']['id']
del pool['pool']['provider']
- self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
- self.mock_api.reload_pool.assert_called_once_with(
- mock.ANY, pool['pool']['id'], 'host')
- self.assertFalse(self.mock_api.destroy_pool.called)
- self.assertFalse(self.mock_api.modify_pool.called)
+ updated = self.plugin_instance.update_pool(
+ ctx, pool['pool']['id'], pool)
+ self.mock_api.update_pool.assert_called_once_with(
+ mock.ANY, old_pool, updated, 'host')
def test_delete_pool(self):
with self.pool(no_delete=True) as pool:
pool['pool']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
- self.mock_api.destroy_pool.assert_called_once_with(
- mock.ANY, pool['pool']['id'], 'host')
+ pool['pool']['status'] = 'PENDING_DELETE'
+ self.mock_api.delete_pool.assert_called_once_with(
+ mock.ANY, pool['pool'], 'host')
def test_create_member(self):
with self.pool() as pool:
pool_id = pool['pool']['id']
- with self.member(pool_id=pool_id):
- self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id, 'host')
+ with self.member(pool_id=pool_id) as member:
+ self.mock_api.create_member.assert_called_once_with(
+ mock.ANY, member['member'], 'host')
def test_update_member(self):
with self.pool() as pool:
pool_id = pool['pool']['id']
with self.member(pool_id=pool_id) as member:
ctx = context.get_admin_context()
- self.mock_api.modify_pool.reset_mock()
- self.plugin_instance.update_member(
+ updated = self.plugin_instance.update_member(
ctx, member['member']['id'], member)
- self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id, 'host')
+ self.mock_api.update_member.assert_called_once_with(
+ mock.ANY, member['member'], updated, 'host')
def test_update_member_new_pool(self):
with self.pool() as pool1:
with self.pool() as pool2:
pool2_id = pool2['pool']['id']
with self.member(pool_id=pool1_id) as member:
+ self.mock_api.create_member.reset_mock()
ctx = context.get_admin_context()
- self.mock_api.modify_pool.reset_mock()
+ old_member = member['member'].copy()
member['member']['pool_id'] = pool2_id
- self.plugin_instance.update_member(ctx,
- member['member']['id'],
- member)
- self.assertEqual(2, self.mock_api.modify_pool.call_count)
- self.mock_api.modify_pool.assert_has_calls(
- [mock.call(mock.ANY, pool1_id, 'host'),
- mock.call(mock.ANY, pool2_id, 'host')])
+ updated = self.plugin_instance.update_member(
+ ctx, member['member']['id'], member)
+ self.mock_api.delete_member.assert_called_once_with(
+ mock.ANY, old_member, 'host')
+ self.mock_api.create_member.assert_called_once_with(
+ mock.ANY, updated, 'host')
def test_delete_member(self):
with self.pool() as pool:
pool_id = pool['pool']['id']
with self.member(pool_id=pool_id,
no_delete=True) as member:
- self.mock_api.modify_pool.reset_mock()
req = self.new_delete_request('members',
member['member']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(res.status_int, exc.HTTPNoContent.code)
- self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id, 'host')
+ member['member']['status'] = 'PENDING_DELETE'
+ self.mock_api.delete_member.assert_called_once_with(
+ mock.ANY, member['member'], 'host')
def test_create_pool_health_monitor(self):
- with self.pool() as pool:
+ with contextlib.nested(
+ self.pool(),
+ self.health_monitor()
+ ) as (pool, hm):
pool_id = pool['pool']['id']
- with self.health_monitor() as hm:
- ctx = context.get_admin_context()
- self.plugin_instance.create_pool_health_monitor(ctx,
- hm,
- pool_id)
- self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id, 'host')
+ ctx = context.get_admin_context()
+ self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
+ # hm now has a ref to the pool with which it is associated
+ hm = self.plugin.get_health_monitor(
+ ctx, hm['health_monitor']['id'])
+ self.mock_api.create_pool_health_monitor.assert_called_once_with(
+ mock.ANY, hm, pool_id, 'host')
def test_delete_pool_health_monitor(self):
- with self.pool() as pool:
+ with contextlib.nested(
+ self.pool(),
+ self.health_monitor()
+ ) as (pool, hm):
pool_id = pool['pool']['id']
- with self.health_monitor() as hm:
- ctx = context.get_admin_context()
- self.plugin_instance.create_pool_health_monitor(ctx,
- hm,
- pool_id)
- self.mock_api.modify_pool.reset_mock()
- self.plugin_instance.delete_pool_health_monitor(
- ctx, hm['health_monitor']['id'], pool_id)
- self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY, pool_id, 'host')
+ ctx = context.get_admin_context()
+ self.plugin_instance.create_pool_health_monitor(ctx, hm, pool_id)
+ # hm now has a ref to the pool with which it is associated
+ hm = self.plugin.get_health_monitor(
+ ctx, hm['health_monitor']['id'])
+ hm['pools'][0]['status'] = 'PENDING_DELETE'
+ self.plugin_instance.delete_pool_health_monitor(
+ ctx, hm['id'], pool_id)
+ self.mock_api.delete_pool_health_monitor.assert_called_once_with(
+ mock.ANY, hm, pool_id, 'host')
def test_update_health_monitor_associated_with_pool(self):
- with self.health_monitor(type='HTTP') as monitor:
- with self.pool() as pool:
- data = {
- 'health_monitor': {
- 'id': monitor['health_monitor']['id'],
- 'tenant_id': self._tenant_id
- }
+ with contextlib.nested(
+ self.health_monitor(type='HTTP'),
+ self.pool()
+ ) as (monitor, pool):
+ data = {
+ 'health_monitor': {
+ 'id': monitor['health_monitor']['id'],
+ 'tenant_id': self._tenant_id
}
- req = self.new_create_request(
- 'pools',
- data,
- fmt=self.fmt,
- id=pool['pool']['id'],
- subresource='health_monitors')
- res = req.get_response(self.ext_api)
- self.assertEqual(res.status_int, exc.HTTPCreated.code)
- self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY,
- pool['pool']['id'],
- 'host'
- )
+ }
+ req = self.new_create_request(
+ 'pools',
+ data,
+ fmt=self.fmt,
+ id=pool['pool']['id'],
+ subresource='health_monitors')
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, exc.HTTPCreated.code)
+ # hm now has a ref to the pool with which it is associated
+ ctx = context.get_admin_context()
+ hm = self.plugin.get_health_monitor(
+ ctx, monitor['health_monitor']['id'])
+ self.mock_api.create_pool_health_monitor.assert_called_once_with(
+ mock.ANY,
+ hm,
+ pool['pool']['id'],
+ 'host'
+ )
- self.mock_api.reset_mock()
- data = {'health_monitor': {'delay': 20,
- 'timeout': 20,
- 'max_retries': 2,
- 'admin_state_up': False}}
- req = self.new_update_request("health_monitors",
- data,
- monitor['health_monitor']['id'])
- req.get_response(self.ext_api)
- self.mock_api.modify_pool.assert_called_once_with(
- mock.ANY,
- pool['pool']['id'],
- 'host'
- )
+ self.mock_api.reset_mock()
+ data = {'health_monitor': {'delay': 20,
+ 'timeout': 20,
+ 'max_retries': 2,
+ 'admin_state_up': False}}
+ updated = hm.copy()
+ updated.update(data['health_monitor'])
+ req = self.new_update_request("health_monitors",
+ data,
+ monitor['health_monitor']['id'])
+ req.get_response(self.ext_api)
+ self.mock_api.update_pool_health_monitor.assert_called_once_with(
+ mock.ANY,
+ hm,
+ updated,
+ pool['pool']['id'],
+ 'host')
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron import context
-from neutron.db import servicetype_db as sdb
+from neutron.db import servicetype_db as st_db
from neutron.extensions import agent
from neutron.extensions import lbaas_agentscheduler
from neutron import manager
'HaproxyOnHostPluginDriver:default')],
'service_providers')
- #force service type manager to reload configuration:
- sdb.ServiceTypeManager._instance = None
+ # need to reload provider configuration
+ st_db.ServiceTypeManager._instance = None
super(LBaaSAgentSchedulerTestCase, self).setUp(
self.plugin_str, service_plugins=service_plugins)
'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA,
'topic': 'LOADBALANCER_AGENT',
- 'configurations': {'device_driver': 'device_driver',
- 'interface_driver': 'interface_driver'},
+ 'configurations': {'device_drivers': ['haproxy_ns']},
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
self._register_one_agent_state(lbaas_hosta)
with self.pool() as pool:
'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA,
'topic': 'LOADBALANCER_AGENT',
- 'configurations': {'device_driver': 'device_driver',
- 'interface_driver': 'interface_driver'},
+ 'configurations': {'device_drivers': ['haproxy_ns']},
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
self._register_one_agent_state(lbaas_hosta)
is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down'
'binary': 'neutron-loadbalancer-agent',
'host': LBAAS_HOSTA,
'topic': 'LOADBALANCER_AGENT',
- 'configurations': {'device_driver': 'device_driver',
- 'interface_driver': 'interface_driver',
- },
+ 'configurations': {'device_drivers': ['haproxy_ns']},
'agent_type': constants.AGENT_TYPE_LOADBALANCER}
lbaas_hostb = copy.deepcopy(lbaas_hosta)
lbaas_hostb['host'] = LBAAS_HOSTB