--- /dev/null
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack, LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import sys
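+# allow running from a source checkout: prefer modules in the current
+# working directory over installed ones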
+sys.path.insert(0, os.getcwd())
+
+from quantum.plugins.services.agent_loadbalancer.agent import main
+
+
+main()
--- /dev/null
+[DEFAULT]
+# Show debugging output in log (sets DEBUG log level output)
+# debug = true
+
+# The LBaaS agent will resync its state with Quantum to recover from any
+# transient notification or RPC errors. The interval is the number of
+# seconds between attempts.
+# periodic_interval = 10
+
+# OVS-based plugins (OVS, Ryu, NEC, NVP, BigSwitch/Floodlight)
+interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver
+# OVS-based plugins (Ryu, NEC, NVP, BigSwitch/Floodlight) that use OVS
+# as an OpenFlow switch and check the port status should also set:
+# ovs_use_veth = True
+# LinuxBridge
+# interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver
+
+# The agent requires a driver to manage the load balancer. HAProxy is the
+# open source reference implementation.
+device_driver = quantum.plugins.services.agent_loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver
+
+# Allow overlapping IPs (the kernel must be built with CONFIG_NET_NS=y and
+# the iproute2 package must support namespaces).
+# use_namespaces = True
--- /dev/null
+# quantum-rootwrap command filters for nodes on which quantum is
+# expected to control the network
+#
+# This file should be owned by (and only writable by) the root user
+
+# The format is:
+# cmd-name: filter-name, raw-command, user, args
+
+[Filters]
+
+# haproxy
+haproxy: CommandFilter, /usr/sbin/haproxy, root
+
+# lbaas-agent also uses kill; that is handled by the generic KillFilter
+kill_haproxy_usr: KillFilter, root, /usr/sbin/haproxy, -9, -HUP
+
+# lbaas-agent uses cat
+cat: RegExpFilter, /bin/cat, root, cat, /proc/\d+/cmdline
+
+ovs-vsctl: CommandFilter, /bin/ovs-vsctl, root
+ovs-vsctl_usr: CommandFilter, /usr/bin/ovs-vsctl, root
+ovs-vsctl_sbin: CommandFilter, /sbin/ovs-vsctl, root
+ovs-vsctl_sbin_usr: CommandFilter, /usr/sbin/ovs-vsctl, root
+
+# ip_lib
+ip: IpFilter, /sbin/ip, root
+ip_usr: IpFilter, /usr/sbin/ip, root
+ip_exec: IpNetnsExecFilter, /sbin/ip, root
+ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
import socket
import StringIO
import sys
-import tempfile
import netaddr
from oslo.config import cfg
def interface_name(self, value):
interface_file_path = self.get_conf_file_name('interface',
ensure_conf_dir=True)
- replace_file(interface_file_path, value)
+ utils.replace_file(interface_file_path, value)
@abc.abstractmethod
def spawn_process(self):
(port.mac_address, name, alloc.ip_address))
name = self.get_conf_file_name('host')
- replace_file(name, buf.getvalue())
+ utils.replace_file(name, buf.getvalue())
return name
def _output_opts_file(self):
options.append(self._format_option(i, 'router'))
name = self.get_conf_file_name('opts')
- replace_file(name, '\n'.join(options))
+ utils.replace_file(name, '\n'.join(options))
return name
def _make_subnet_interface_ip_map(self):
sock.connect(dhcp_relay_socket)
sock.send(jsonutils.dumps(data))
sock.close()
-
-
-def replace_file(file_name, data):
- """Replaces the contents of file_name with data in a safe manner.
-
- First write to a temp file and then rename. Since POSIX renames are
- atomic, the file is unlikely to be corrupted by competing writes.
-
- We create the tempfile on the same device to ensure that it can be renamed.
- """
-
- base_dir = os.path.dirname(os.path.abspath(file_name))
- tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
- tmp_file.write(data)
- tmp_file.close()
- os.chmod(tmp_file.name, 0644)
- os.rename(tmp_file.name, file_name)
import shlex
import socket
import struct
+import tempfile
from eventlet.green import subprocess
struct.pack('256s', interface[:DEVICE_NAME_LEN]))
return ''.join(['%02x:' % ord(char)
for char in info[MAC_START:MAC_END]])[:-1]
+
+
+def replace_file(file_name, data):
+ """Replaces the contents of file_name with data in a safe manner.
+
+ First write to a temp file and then rename. Since POSIX renames are
+ atomic, the file is unlikely to be corrupted by competing writes.
+
+ We create the tempfile on the same device to ensure that it can be renamed.
+ """
+
+ base_dir = os.path.dirname(os.path.abspath(file_name))
+ tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
+ tmp_file.write(data)
+ tmp_file.close()
+ os.chmod(tmp_file.name, 0644)
+ os.rename(tmp_file.name, file_name)
AGENT = 'q-agent-notifier'
PLUGIN = 'q-plugin'
DHCP = 'q-dhcp-notifer'
+LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin'
L3_AGENT = 'l3_agent'
DHCP_AGENT = 'dhcp_agent'
+LOADBALANCER_AGENT = 'loadbalancer_agent'
def get_topic_name(prefix, table, operation):
protocol_port = sa.Column(sa.Integer, nullable=False)
protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"),
nullable=False)
- pool_id = sa.Column(sa.String(36), nullable=False)
+ pool_id = sa.Column(sa.String(36), nullable=False, unique=True)
session_persistence = orm.relationship(SessionPersistence,
uselist=False,
backref="vips",
cascade="all, delete-orphan")
monitors = orm.relationship("PoolMonitorAssociation", backref="pools",
cascade="all, delete-orphan")
+ vip = orm.relationship(Vip, backref='pool')
class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
raise
return r
+ def assert_modification_allowed(self, obj):
+ status = getattr(obj, 'status', None)
+
+ if status == constants.PENDING_DELETE:
+            raise loadbalancer.StateInvalid(id=obj.id, state=status)
+
########################################################
# VIP DB access
def _make_vip_dict(self, vip, fields=None):
return self._fields(res, fields)
- def _update_pool_vip_info(self, context, pool_id, vip_id):
- pool_db = self._get_resource(context, Pool, pool_id)
- with context.session.begin(subtransactions=True):
- pool_db.update({'vip_id': vip_id})
-
def _check_session_persistence_info(self, info):
""" Performs sanity check on session persistence info.
:param info: Session persistence info
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
+            # validate that the pool belongs to the same tenant
+ if v['pool_id']:
+ pool = self._get_resource(context, Pool, v['pool_id'])
+ if pool['tenant_id'] != tenant_id:
+ raise q_exc.NotAuthorized()
+ else:
+ pool = None
+
vip_db = Vip(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=v['name'],
admin_state_up=v['admin_state_up'],
status=constants.PENDING_CREATE)
- vip_id = vip_db['id']
session_info = v['session_persistence']
if session_info:
- s_p = self._create_session_persistence_db(session_info, vip_id)
+ s_p = self._create_session_persistence_db(
+ session_info,
+ vip_db['id'])
vip_db.session_persistence = s_p
context.session.add(vip_db)
context.session.flush()
+            # create a port to reserve an address for IPAM
self._create_port_for_vip(
context,
vip_db,
v.get('address')
)
- self._update_pool_vip_info(context, v['pool_id'], vip_id)
+ if pool:
+ pool['vip_id'] = vip_db['id']
+
return self._make_vip_dict(vip_db)
def update_vip(self, context, id, vip):
sess_persist = v.pop('session_persistence', None)
with context.session.begin(subtransactions=True):
+ vip_db = self._get_resource(context, Vip, id)
+
+ self.assert_modification_allowed(vip_db)
+
if sess_persist:
self._update_vip_session_persistence(context, id, sess_persist)
else:
self._delete_session_persistence(context, id)
- vip_db = self._get_resource(context, Vip, id)
- old_pool_id = vip_db['pool_id']
if v:
vip_db.update(v)
# If the pool_id is changed, we need to update
# the associated pools
if 'pool_id' in v:
- self._update_pool_vip_info(context, old_pool_id, None)
- self._update_pool_vip_info(context, v['pool_id'], id)
+ new_pool = self._get_resource(context, Pool, v['pool_id'])
+ self.assert_modification_allowed(new_pool)
+
+ # check that the pool matches the tenant_id
+ if new_pool['tenant_id'] != vip_db['tenant_id']:
+ raise q_exc.NotAuthorized()
+
+ if vip_db['pool_id']:
+ old_pool = self._get_resource(
+ context,
+ Pool,
+ vip_db['pool_id']
+ )
+ old_pool['vip_id'] = None
+
+ new_pool['vip_id'] = vip_db['id']
return self._make_vip_dict(vip_db)
########################################################
# Pool DB access
- def _make_pool_dict(self, context, pool, fields=None):
+ def _make_pool_dict(self, pool, fields=None):
res = {'id': pool['id'],
'tenant_id': pool['tenant_id'],
'name': pool['name'],
return self._fields(res, fields)
- def _update_pool_member_info(self, context, pool_id, membersInfo):
- with context.session.begin(subtransactions=True):
- member_qry = context.session.query(Member)
- for member_id in membersInfo:
- try:
- member = member_qry.filter_by(id=member_id).one()
- member.update({'pool_id': pool_id})
- except exc.NoResultFound:
- raise loadbalancer.MemberNotFound(member_id=member_id)
-
def _create_pool_stats(self, context, pool_id):
# This is internal method to add pool statistics. It won't
# be exposed to API
context.session.add(pool_db)
pool_db = self._get_resource(context, Pool, pool_db['id'])
- return self._make_pool_dict(context, pool_db)
+ return self._make_pool_dict(pool_db)
def update_pool(self, context, id, pool):
- v = pool['pool']
+ p = pool['pool']
with context.session.begin(subtransactions=True):
pool_db = self._get_resource(context, Pool, id)
- if v:
- pool_db.update(v)
+ if p:
+ pool_db.update(p)
- return self._make_pool_dict(context, pool_db)
+ return self._make_pool_dict(pool_db)
def delete_pool(self, context, id):
# Check if the pool is in use
def get_pool(self, context, id, fields=None):
pool = self._get_resource(context, Pool, id)
- return self._make_pool_dict(context, pool, fields)
+ return self._make_pool_dict(pool, fields)
def get_pools(self, context, filters=None, fields=None):
collection = self._model_query(context, Pool)
collection = self._apply_filters_to_query(collection, Pool, filters)
- return [self._make_pool_dict(context, c, fields)
+ return [self._make_pool_dict(c, fields)
for c in collection.all()]
- def get_stats(self, context, pool_id):
+ def stats(self, context, pool_id):
with context.session.begin(subtransactions=True):
pool_qry = context.session.query(Pool)
try:
raise loadbalancer.HealthMonitorNotFound(monitor_id=id)
def get_pool_health_monitor(self, context, id, pool_id, fields=None):
+ # TODO(markmcclain) look into why pool_id is ignored
healthmonitor = self._get_resource(context, HealthMonitor, id)
return self._make_health_monitor_dict(healthmonitor, fields)
v = member['member']
with context.session.begin(subtransactions=True):
member_db = self._get_resource(context, Member, id)
- old_pool_id = member_db['pool_id']
if v:
member_db.update(v)
sa.Column(u'admin_state_up', sa.Boolean(), nullable=False),
sa.Column(u'connection_limit', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ),
+ sa.UniqueConstraint('pool_id'),
sa.PrimaryKeyConstraint(u'id')
)
op.create_table(
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
+ 'default': '',
'is_visible': True},
'description': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'is_visible': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
+ 'default': '',
'is_visible': True},
'description': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import eventlet
+from oslo.config import cfg
+
+from quantum.agent.common import config
+from quantum.agent.linux import interface
+from quantum.common import topics
+from quantum.openstack.common.rpc import service as rpc_service
+from quantum.openstack.common import service
+from quantum.plugins.services.agent_loadbalancer.agent import manager
+
+
+OPTS = [
+ cfg.IntOpt(
+ 'periodic_interval',
+ default=10,
+ help=_('Seconds between periodic task runs')
+ )
+]
+
+
+class LbaasAgentService(rpc_service.Service):
+ def start(self):
+ super(LbaasAgentService, self).start()
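+        # drive the manager's @periodic_task methods (resync, stats
+        # collection) every periodic_interval seconds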
+ self.tg.add_timer(
+ cfg.CONF.periodic_interval,
+ self.manager.run_periodic_tasks,
+ None,
+ None
+ )
+
+
+def main():
+ eventlet.monkey_patch()
+ cfg.CONF.register_opts(OPTS)
+ cfg.CONF.register_opts(manager.OPTS)
+ # import interface options just in case the driver uses namespaces
+ cfg.CONF.register_opts(interface.OPTS)
+ config.register_root_helper(cfg.CONF)
+
+ cfg.CONF(project='quantum')
+ config.setup_logging(cfg.CONF)
+
+ mgr = manager.LbaasAgentManager(cfg.CONF)
+ svc = LbaasAgentService(
+ host=cfg.CONF.host,
+ topic=topics.LOADBALANCER_AGENT,
+ manager=mgr
+ )
+ service.launch(svc).wait()
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+from quantum.openstack.common.rpc import proxy
+
+
+class LbaasAgentApi(proxy.RpcProxy):
+ """Agent side of the Agent to Plugin RPC API."""
+
+ API_VERSION = '1.0'
+
+ def __init__(self, topic, context, host):
+ super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
+ self.context = context
+ self.host = host
+
+ def get_ready_devices(self):
+ return self.call(
+ self.context,
+ self.make_msg('get_ready_devices', host=self.host),
+ topic=self.topic
+ )
+
+ def get_logical_device(self, pool_id):
+ return self.call(
+ self.context,
+ self.make_msg(
+ 'get_logical_device',
+ pool_id=pool_id,
+ host=self.host
+ ),
+ topic=self.topic
+ )
+
+ def pool_destroyed(self, pool_id):
+ return self.call(
+ self.context,
+ self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host),
+ topic=self.topic
+ )
+
+ def plug_vip_port(self, port_id):
+ return self.call(
+ self.context,
+ self.make_msg('plug_vip_port', port_id=port_id, host=self.host),
+ topic=self.topic
+ )
+
+ def unplug_vip_port(self, port_id):
+ return self.call(
+ self.context,
+ self.make_msg('unplug_vip_port', port_id=port_id, host=self.host),
+ topic=self.topic
+ )
+
+ def update_pool_stats(self, pool_id, stats):
+ return self.call(
+ self.context,
+ self.make_msg(
+ 'update_pool_stats',
+ pool_id=pool_id,
+ stats=stats,
+ host=self.host
+ ),
+ topic=self.topic
+ )
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import weakref
+
+from oslo.config import cfg
+
+from quantum.agent.common import config
+from quantum.common import topics
+from quantum import context
+from quantum.openstack.common import importutils
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import periodic_task
+from quantum.plugins.services.agent_loadbalancer.agent import api
+
+LOG = logging.getLogger(__name__)
+NS_PREFIX = 'qlbaas-'
+
+OPTS = [
+ cfg.StrOpt(
+ 'device_driver',
+ help=_('The driver used to manage the loadbalancing device'),
+ ),
+ cfg.StrOpt(
+ 'loadbalancer_state_path',
+ default='$state_path/lbaas',
+ help=_('Location to store config and state files'),
+ ),
+ cfg.StrOpt(
+ 'interface_driver',
+ help=_('The driver used to manage the virtual interface')
+ )
+]
+
+
+class LogicalDeviceCache(object):
+ """Manage a cache of known devices."""
+
+ class Device(object):
+ """Inner classes used to hold values for weakref lookups"""
+ def __init__(self, port_id, pool_id):
+ self.port_id = port_id
+ self.pool_id = pool_id
+
+ def __eq__(self, other):
+ return self.__dict__ == other.__dict__
+
+ def __hash__(self):
+ return hash((self.port_id, self.pool_id))
+
+ def __init__(self):
+ self.devices = set()
+ self.port_lookup = weakref.WeakValueDictionary()
+ self.pool_lookup = weakref.WeakValueDictionary()
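+        # strong references live in self.devices; the lookup tables hold
+        # weak references, so dropping a Device from self.devices also
+        # expires its entries in both lookup tables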
+
+ def put(self, device):
+ port_id = device['vip']['port_id']
+ pool_id = device['pool']['id']
+        d = self.Device(port_id, pool_id)
+ if d not in self.devices:
+ self.devices.add(d)
+ self.port_lookup[port_id] = d
+ self.pool_lookup[pool_id] = d
+
+ def remove(self, device):
+ if not isinstance(device, self.Device):
+ device = self.Device(
+ device['vip']['port_id'], device['pool']['id']
+ )
+ if device in self.devices:
+ self.devices.remove(device)
+
+ def remove_by_pool_id(self, pool_id):
+ d = self.pool_lookup.get(pool_id)
+ if d:
+ self.devices.remove(d)
+
+ def get_by_pool_id(self, pool_id):
+ return self.pool_lookup.get(pool_id)
+
+ def get_by_port_id(self, port_id):
+ return self.port_lookup.get(port_id)
+
+ def get_pool_ids(self):
+ return self.pool_lookup.keys()
+
+
+class LbaasAgentManager(periodic_task.PeriodicTasks):
+ def __init__(self, conf):
+ self.conf = conf
+ try:
+ vif_driver = importutils.import_object(conf.interface_driver, conf)
+ except ImportError:
+            msg = _('Error importing interface driver: %s')
+            raise SystemExit(msg % conf.interface_driver)
+
+ try:
+ self.driver = importutils.import_object(
+ conf.device_driver,
+ config.get_root_helper(self.conf),
+ conf.loadbalancer_state_path,
+ vif_driver,
+ self._vip_plug_callback
+ )
+ except ImportError:
+ msg = _('Error importing loadbalancer device driver: %s')
+ raise SystemExit(msg % conf.device_driver)
+ ctx = context.get_admin_context_without_session()
+ self.plugin_rpc = api.LbaasAgentApi(
+ topics.LOADBALANCER_PLUGIN,
+ ctx,
+ conf.host
+ )
+ self.needs_resync = False
+ self.cache = LogicalDeviceCache()
+
+ def initialize_service_hook(self, started_by):
+ self.sync_state()
+
+ @periodic_task.periodic_task
+ def periodic_resync(self, context):
+ if self.needs_resync:
+ self.needs_resync = False
+ self.sync_state()
+
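+    # with the default 10 second periodic_interval, ticks_between_runs=6
+    # means stats are collected roughly once a minute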
+ @periodic_task.periodic_task(ticks_between_runs=6)
+ def collect_stats(self, context):
+ for pool_id in self.cache.get_pool_ids():
+ try:
+ stats = self.driver.get_stats(pool_id)
+ if stats:
+ self.plugin_rpc.update_pool_stats(pool_id, stats)
+ except Exception:
+                LOG.exception(_('Error updating stats'))
+ self.needs_resync = True
+
+ def _vip_plug_callback(self, action, port):
+ if action == 'plug':
+ self.plugin_rpc.plug_vip_port(port['id'])
+ elif action == 'unplug':
+ self.plugin_rpc.unplug_vip_port(port['id'])
+
+ def sync_state(self):
+ known_devices = set(self.cache.get_pool_ids())
+ try:
+ ready_logical_devices = set(self.plugin_rpc.get_ready_devices())
+
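+            # remove devices the plugin no longer reports as ready, then
+            # (re)build everything that is ready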
+ for deleted_id in known_devices - ready_logical_devices:
+ self.destroy_device(deleted_id)
+
+ for pool_id in ready_logical_devices:
+ self.refresh_device(pool_id)
+
+ except Exception:
+ LOG.exception(_('Unable to retrieve ready devices'))
+ self.needs_resync = True
+
+ self.remove_orphans()
+
+ def refresh_device(self, pool_id):
+ try:
+ logical_config = self.plugin_rpc.get_logical_device(pool_id)
+
+ if self.driver.exists(pool_id):
+ self.driver.update(logical_config)
+ else:
+ self.driver.create(logical_config)
+ self.cache.put(logical_config)
+ except Exception:
+ LOG.exception(_('Unable to refresh device for pool: %s'), pool_id)
+ self.needs_resync = True
+
+ def destroy_device(self, pool_id):
+ device = self.cache.get_by_pool_id(pool_id)
+ if not device:
+ return
+ try:
+ self.driver.destroy(pool_id)
+ self.plugin_rpc.pool_destroyed(pool_id)
+ except Exception:
+ LOG.exception(_('Unable to destroy device for pool: %s'), pool_id)
+ self.needs_resync = True
+ self.cache.remove(device)
+
+ def remove_orphans(self):
+ try:
+ self.driver.remove_orphans(self.cache.get_pool_ids())
+ except NotImplementedError:
+ pass # Not all drivers will support this
+
+ def reload_pool(self, context, pool_id=None, host=None):
+ """Handle RPC cast from plugin to reload a pool."""
+ if pool_id:
+ self.refresh_device(pool_id)
+
+ def modify_pool(self, context, pool_id=None, host=None):
+ """Handle RPC cast from plugin to modify a pool if known to agent."""
+ if self.cache.get_by_pool_id(pool_id):
+ self.refresh_device(pool_id)
+
+ def destroy_pool(self, context, pool_id=None, host=None):
+ """Handle RPC cast from plugin to destroy a pool if known to agent."""
+ if self.cache.get_by_pool_id(pool_id):
+ self.destroy_device(pool_id)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Mirantis, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+LB_METHOD_ROUND_ROBIN = 'ROUND_ROBIN'
+LB_METHOD_LEAST_CONNECTIONS = 'LEAST_CONNECTIONS'
+LB_METHOD_SOURCE_IP = 'SOURCE_IP'
+
+PROTOCOL_TCP = 'TCP'
+PROTOCOL_HTTP = 'HTTP'
+PROTOCOL_HTTPS = 'HTTPS'
+
+HEALTH_MONITOR_PING = 'PING'
+HEALTH_MONITOR_TCP = 'TCP'
+HEALTH_MONITOR_HTTP = 'HTTP'
+HEALTH_MONITOR_HTTPS = 'HTTPS'
+
+SESSION_PERSISTENCE_SOURCE_IP = 'SOURCE_IP'
+SESSION_PERSISTENCE_HTTP_COOKIE = 'HTTP_COOKIE'
+SESSION_PERSISTENCE_APP_COOKIE = 'APP_COOKIE'
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import itertools
+
+from quantum.agent.linux import utils
+from quantum.plugins.common import constants as qconstants
+from quantum.plugins.services.agent_loadbalancer import constants
+
+
+PROTOCOL_MAP = {
+ constants.PROTOCOL_TCP: 'tcp',
+ constants.PROTOCOL_HTTP: 'http',
+ constants.PROTOCOL_HTTPS: 'tcp',
+}
+
+BALANCE_MAP = {
+ constants.LB_METHOD_ROUND_ROBIN: 'roundrobin',
+ constants.LB_METHOD_LEAST_CONNECTIONS: 'leastconn',
+ constants.LB_METHOD_SOURCE_IP: 'source'
+}
+
+ACTIVE = qconstants.ACTIVE
+
+
+def save_config(conf_path, logical_config, socket_path=None):
+ """Convert a logical configuration to the HAProxy version"""
+ data = []
+ data.extend(_build_global(logical_config, socket_path=socket_path))
+ data.extend(_build_defaults(logical_config))
+ data.extend(_build_frontend(logical_config))
+ data.extend(_build_backend(logical_config))
+ utils.replace_file(conf_path, '\n'.join(data))
+
+
+def _build_global(config, socket_path=None):
+ opts = [
+ 'daemon',
+ 'user nobody',
+ 'group nogroup',
+ 'log /dev/log local0',
+ 'log /dev/log local1 notice'
+ ]
+
+ if socket_path:
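+        # the stats socket is also used by the namespace driver's exists()
+        # check to verify that the haproxy process is alive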
+ opts.append('stats socket %s mode 0666 level user' % socket_path)
+
+ return itertools.chain(['global'], ('\t' + o for o in opts))
+
+
+def _build_defaults(config):
+ opts = [
+ 'log global',
+ 'retries 3',
+ 'option redispatch',
+ 'timeout connect 5000',
+ 'timeout client 50000',
+ 'timeout server 50000',
+ ]
+
+ return itertools.chain(['defaults'], ('\t' + o for o in opts))
+
+
+def _build_frontend(config):
+ protocol = config['vip']['protocol']
+
+ opts = [
+ 'option tcplog',
+ 'bind %s:%d' % (
+ _get_first_ip_from_port(config['vip']['port']),
+ config['vip']['protocol_port']
+ ),
+ 'mode %s' % PROTOCOL_MAP[protocol],
+ 'default_backend %s' % config['pool']['id'],
+ ]
+
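+    # in the LBaaS API a negative connection_limit means unlimited, so
+    # maxconn is only emitted when a limit was actually set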
+ if config['vip']['connection_limit'] >= 0:
+ opts.append('maxconn %s' % config['vip']['connection_limit'])
+
+ if protocol == constants.PROTOCOL_HTTP:
+ opts.append('option forwardfor')
+
+ return itertools.chain(
+ ['frontend %s' % config['vip']['id']],
+ ('\t' + o for o in opts)
+ )
+
+
+def _build_backend(config):
+ protocol = config['pool']['protocol']
+ lb_method = config['pool']['lb_method']
+
+ opts = [
+ 'mode %s' % PROTOCOL_MAP[protocol],
+ 'balance %s' % BALANCE_MAP.get(lb_method, 'roundrobin')
+ ]
+
+ if protocol == constants.PROTOCOL_HTTP:
+ opts.append('option forwardfor')
+
+ # add the first health_monitor (if available)
+ server_addon, health_opts = _get_server_health_option(config)
+ opts.extend(health_opts)
+
+ # add the members
+ opts.extend(
+ (('server %(id)s %(address)s:%(protocol_port)s '
+ 'weight %(weight)s') % member) + server_addon
+ for member in config['members']
+ if (member['status'] == ACTIVE and member['admin_state_up'])
+ )
+
+ return itertools.chain(
+ ['backend %s' % config['pool']['id']],
+ ('\t' + o for o in opts)
+ )
+
+
+def _get_first_ip_from_port(port):
+ for fixed_ip in port['fixed_ips']:
+ return fixed_ip['ip_address']
+
+
+def _get_server_health_option(config):
+ """return the first active health option"""
+ for monitor in config['healthmonitors']:
+ if monitor['status'] == ACTIVE and monitor['admin_state_up']:
+ break
+ else:
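+        # for/else: no active monitor was found, so skip health checking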
+ return '', []
+
+ server_addon = ' check inter %(delay)ds fall %(max_retries)d' % monitor
+ opts = [
+ 'timeout check %ds' % monitor['timeout']
+ ]
+
+ if monitor['type'] in (constants.HEALTH_MONITOR_HTTP,
+ constants.HEALTH_MONITOR_HTTPS):
+ opts.append('option httpchk %(http_method)s %(url_path)s' % monitor)
+ opts.append(
+ 'http-check expect rstatus %s' %
+ '|'.join(_expand_expected_codes(monitor['expected_codes']))
+ )
+
+ if monitor['type'] == constants.HEALTH_MONITOR_HTTPS:
+ opts.append('option ssl-hello-chk')
+
+ return server_addon, opts
+
+
+def _expand_expected_codes(codes):
+ """Expand the expected code string in set of codes.
+
+    200-204 -> 200, 201, 202, 203, 204
+ 200, 203 -> 200, 203
+ """
+
+ retval = set()
+ for code in codes.replace(',', ' ').split(' '):
+ code = code.strip()
+
+ if not code:
+ continue
+ elif '-' in code:
+ low, hi = code.split('-')[:2]
+            retval.update(str(i) for i in xrange(int(low), int(hi) + 1))
+ else:
+ retval.add(code)
+ return retval
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+import os
+import shutil
+import socket
+
+import netaddr
+
+from quantum.agent.linux import ip_lib
+from quantum.common import exceptions
+from quantum.openstack.common import log as logging
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+ cfg as hacfg
+)
+
+LOG = logging.getLogger(__name__)
+NS_PREFIX = 'qlbaas-'
+
+
+class HaproxyNSDriver(object):
+ def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback):
+ self.root_helper = root_helper
+ self.state_path = state_path
+ self.vif_driver = vif_driver
+ self.vip_plug_callback = vip_plug_callback
+ self.pool_to_port_id = {}
+
+ def create(self, logical_config):
+ pool_id = logical_config['pool']['id']
+ namespace = get_ns_name(pool_id)
+
+ self._plug(namespace, logical_config['vip']['port'])
+ self._spawn(logical_config)
+
+ def update(self, logical_config):
+ pool_id = logical_config['pool']['id']
+ pid_path = self._get_state_file_path(pool_id, 'pid')
+
+ extra_args = ['-sf']
+        with open(pid_path, 'r') as pid_file:
+            extra_args.extend(p.strip() for p in pid_file)
+ self._spawn(logical_config, extra_args)
+
+ def _spawn(self, logical_config, extra_cmd_args=()):
+ pool_id = logical_config['pool']['id']
+ namespace = get_ns_name(pool_id)
+ conf_path = self._get_state_file_path(pool_id, 'conf')
+ pid_path = self._get_state_file_path(pool_id, 'pid')
+ sock_path = self._get_state_file_path(pool_id, 'sock')
+
+ hacfg.save_config(conf_path, logical_config, sock_path)
+ cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
+ cmd.extend(extra_cmd_args)
+
+ ns = ip_lib.IPWrapper(self.root_helper, namespace)
+ ns.netns.execute(cmd)
+
+ # remember the pool<>port mapping
+ self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id']
+
+ def destroy(self, pool_id):
+ namespace = get_ns_name(pool_id)
+ ns = ip_lib.IPWrapper(self.root_helper, namespace)
+ pid_path = self._get_state_file_path(pool_id, 'pid')
+ sock_path = self._get_state_file_path(pool_id, 'sock')
+
+ # kill the process
+ kill_pids_in_file(ns, pid_path)
+
+ # unplug the ports
+ if pool_id in self.pool_to_port_id:
+ self._unplug(namespace, self.pool_to_port_id[pool_id])
+
+ # remove the configuration directory
+ conf_dir = os.path.dirname(self._get_state_file_path(pool_id, ''))
+ if os.path.isdir(conf_dir):
+ shutil.rmtree(conf_dir)
+ ns.garbage_collect_namespace()
+
+ def exists(self, pool_id):
+ namespace = get_ns_name(pool_id)
+ root_ns = ip_lib.IPWrapper(self.root_helper)
+
+ socket_path = self._get_state_file_path(pool_id, 'sock')
+ if root_ns.netns.exists(namespace) and os.path.exists(socket_path):
+ try:
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ s.connect(socket_path)
+ return True
+ except socket.error:
+ pass
+ return False
+
+ def get_stats(self, pool_id):
+ pass
+
+ def remove_orphans(self, known_pool_ids):
+ raise NotImplementedError()
+
+ def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True):
+ """Returns the file name for a given kind of config file."""
+ confs_dir = os.path.abspath(os.path.normpath(self.state_path))
+ conf_dir = os.path.join(confs_dir, pool_id)
+ if ensure_state_dir:
+ if not os.path.isdir(conf_dir):
+ os.makedirs(conf_dir, 0755)
+ return os.path.join(conf_dir, kind)
+
+ def _plug(self, namespace, port, reuse_existing=True):
+ self.vip_plug_callback('plug', port)
+ interface_name = self.vif_driver.get_device_name(Wrap(port))
+
+ if ip_lib.device_exists(interface_name, self.root_helper, namespace):
+ if not reuse_existing:
+ raise exceptions.PreexistingDeviceFailure(
+ dev_name=interface_name
+ )
+ else:
+ self.vif_driver.plug(
+ port['network_id'],
+ port['id'],
+ interface_name,
+ port['mac_address'],
+ namespace=namespace
+ )
+
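+        # configure the vip address(es) on the device inside the namespace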
+ cidrs = [
+ '%s/%s' % (ip['ip_address'],
+ netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen)
+ for ip in port['fixed_ips']
+ ]
+ self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace)
+
+ def _unplug(self, namespace, port_id):
+ port_stub = {'id': port_id}
+ self.vip_plug_callback('unplug', port_stub)
+ interface_name = self.vif_driver.get_device_name(Wrap(port_stub))
+ self.vif_driver.unplug(interface_name, namespace=namespace)
+
+
+# NOTE(markmcclain): for compliance with interface.py which expects objects
+class Wrap(object):
+ """A light attribute wrapper for compatibility with the interface lib."""
+ def __init__(self, d):
+ self.__dict__.update(d)
+
+ def __getitem__(self, key):
+ return self.__dict__[key]
+
+
+def get_ns_name(namespace_id):
+ return NS_PREFIX + namespace_id
+
+
+def kill_pids_in_file(namespace_wrapper, pid_path):
+ if os.path.exists(pid_path):
+ with open(pid_path, 'r') as pids:
+ for pid in pids:
+ pid = pid.strip()
+ try:
+                    namespace_wrapper.netns.execute(['kill', '-9', pid])
+ except RuntimeError:
+ LOG.exception(
+ _('Unable to kill haproxy process: %s'),
+ pid
+ )
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from oslo.config import cfg
+
+from quantum.common import exceptions as q_exc
+from quantum.common import rpc as q_rpc
+from quantum.common import topics
+from quantum.db import api as qdbapi
+from quantum.db.loadbalancer import loadbalancer_db
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import proxy
+from quantum.plugins.common import constants
+
+LOG = logging.getLogger(__name__)
+
+ACTIVE_PENDING = (
+ constants.ACTIVE,
+ constants.PENDING_CREATE,
+ constants.PENDING_UPDATE
+)
+
+
+class LoadBalancerCallbacks(object):
+ RPC_API_VERSION = '1.0'
+
+ def __init__(self, plugin):
+ self.plugin = plugin
+
+ def create_rpc_dispatcher(self):
+ return q_rpc.PluginRpcDispatcher([self])
+
+ def get_ready_devices(self, context, host=None):
+ with context.session.begin(subtransactions=True):
+ qry = context.session.query(
+ loadbalancer_db.Vip, loadbalancer_db.Pool
+ )
+            # join each vip to its pool; without the join the query would
+            # produce a cross product
+            qry = qry.filter(
+                loadbalancer_db.Vip.pool_id == loadbalancer_db.Pool.id
+            )
+            qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
+ qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
+ up = True # makes pep8 and sqlalchemy happy
+ qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
+ qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
+ return [p.id for v, p in qry.all()]
+
+ def get_logical_device(self, context, pool_id=None, activate=True,
+ **kwargs):
+ with context.session.begin(subtransactions=True):
+ qry = context.session.query(loadbalancer_db.Pool)
+ qry = qry.filter_by(id=pool_id)
+ pool = qry.one()
+
+ if activate:
+ # set all resources to active
+ if pool.status in ACTIVE_PENDING:
+ pool.status = constants.ACTIVE
+
+ if pool.vip.status in ACTIVE_PENDING:
+ pool.vip.status = constants.ACTIVE
+
+ for m in pool.members:
+ if m.status in ACTIVE_PENDING:
+ m.status = constants.ACTIVE
+
+ for hm in pool.monitors:
+ if hm.monitor.status in ACTIVE_PENDING:
+ hm.monitor.status = constants.ACTIVE
+
+ if (pool.status != constants.ACTIVE
+ or pool.vip.status != constants.ACTIVE):
+ raise Exception(_('Expected active pool and vip'))
+
+ retval = {}
+ retval['pool'] = self.plugin._make_pool_dict(pool)
+ retval['vip'] = self.plugin._make_vip_dict(pool.vip)
+ retval['vip']['port'] = (
+ self.plugin._core_plugin._make_port_dict(pool.vip.port)
+ )
+ for fixed_ip in retval['vip']['port']['fixed_ips']:
+ fixed_ip['subnet'] = (
+ self.plugin._core_plugin.get_subnet(
+ context,
+ fixed_ip['subnet_id']
+ )
+ )
+ retval['members'] = [
+ self.plugin._make_member_dict(m)
+ for m in pool.members if m.status == constants.ACTIVE
+ ]
+ retval['healthmonitors'] = [
+ self.plugin._make_health_monitor_dict(hm.monitor)
+ for hm in pool.monitors
+ if hm.monitor.status == constants.ACTIVE
+ ]
+
+ return retval
+
+ def pool_destroyed(self, context, pool_id=None, host=None):
+ """Agent confirmation hook that a pool has been destroyed.
+
+ This method exists for subclasses to change the deletion
+ behavior.
+ """
+ pass
+
+ def plug_vip_port(self, context, port_id=None, host=None):
+ if not port_id:
+ return
+
+ port = self.plugin._core_plugin.get_port(
+ context,
+ port_id
+ )
+
+ port['admin_state_up'] = True
+ port['device_owner'] = 'quantum:' + constants.LOADBALANCER
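+        # uuid5 is deterministic, so a given host always produces the same
+        # device_id and replugging is idempotent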
+ port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
+
+ self.plugin._core_plugin.update_port(
+ context,
+ port_id,
+ {'port': port}
+ )
+
+ def unplug_vip_port(self, context, port_id=None, host=None):
+ if not port_id:
+ return
+
+ port = self.plugin._core_plugin.get_port(
+ context,
+ port_id
+ )
+
+ port['admin_state_up'] = False
+ port['device_owner'] = ''
+ port['device_id'] = ''
+
+ try:
+ self.plugin._core_plugin.update_port(
+ context,
+ port_id,
+ {'port': port}
+ )
+
+ except q_exc.PortNotFound:
+ msg = _('Unable to find port %s to unplug. This can occur when '
+ 'the Vip has been deleted first.')
+ LOG.debug(msg, port_id)
+
+ def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
+        # TODO(markmcclain): add stats collection
+ pass
+
+
+class LoadBalancerAgentApi(proxy.RpcProxy):
+ """Plugin side of plugin to agent RPC API."""
+
+ API_VERSION = '1.0'
+
+ def __init__(self, topic, host):
+ super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
+ self.host = host
+
+ def reload_pool(self, context, pool_id):
+ return self.cast(
+ context,
+ self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
+ topic=self.topic
+ )
+
+ def destroy_pool(self, context, pool_id):
+ return self.cast(
+ context,
+ self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
+ topic=self.topic
+ )
+
+ def modify_pool(self, context, pool_id):
+ return self.cast(
+ context,
+ self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
+ topic=self.topic
+ )
+
+
+class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
+
+ """
+ Implementation of the Quantum Loadbalancer Service Plugin.
+
+ This class manages the workflow of LBaaS request/response.
+    Most of the DB-related work is implemented in
+    loadbalancer_db.LoadBalancerPluginDb.
+ """
+ supported_extension_aliases = ["lbaas"]
+
+ def __init__(self):
+ """
+        Initialize the loadbalancer service plugin.
+ """
+ qdbapi.register_models()
+
+ self.callbacks = LoadBalancerCallbacks(self)
+
+ self.conn = rpc.create_connection(new=True)
+ self.conn.create_consumer(
+ topics.LOADBALANCER_PLUGIN,
+ self.callbacks.create_rpc_dispatcher(),
+ fanout=False)
+ self.conn.consume_in_thread()
+
+ self.agent_rpc = LoadBalancerAgentApi(
+ topics.LOADBALANCER_AGENT,
+ cfg.CONF.host
+ )
+
+ def get_plugin_type(self):
+ return constants.LOADBALANCER
+
+ def get_plugin_description(self):
+ return "Quantum LoadBalancer Service Plugin"
+
+ def create_vip(self, context, vip):
+ vip['vip']['status'] = constants.PENDING_CREATE
+ v = super(LoadBalancerPlugin, self).create_vip(context, vip)
+ self.agent_rpc.reload_pool(context, v['pool_id'])
+ return v
+
+ def update_vip(self, context, id, vip):
+ if 'status' not in vip['vip']:
+ vip['vip']['status'] = constants.PENDING_UPDATE
+ v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
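+        # a vip that left the active/pending states (e.g. PENDING_DELETE or
+        # ERROR) should have its device torn down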
+ if v['status'] in ACTIVE_PENDING:
+ self.agent_rpc.reload_pool(context, v['pool_id'])
+ else:
+ self.agent_rpc.destroy_pool(context, v['pool_id'])
+ return v
+
+ def delete_vip(self, context, id):
+ vip = self.get_vip(context, id)
+ super(LoadBalancerPlugin, self).delete_vip(context, id)
+ self.agent_rpc.destroy_pool(context, vip['pool_id'])
+
+ def create_pool(self, context, pool):
+ p = super(LoadBalancerPlugin, self).create_pool(context, pool)
+ # don't notify here because a pool needs a vip to be useful
+ return p
+
+ def update_pool(self, context, id, pool):
+ if 'status' not in pool['pool']:
+ pool['pool']['status'] = constants.PENDING_UPDATE
+ p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
+ if p['status'] in ACTIVE_PENDING:
+ self.agent_rpc.reload_pool(context, p['id'])
+ else:
+ self.agent_rpc.destroy_pool(context, p['id'])
+ return p
+
+ def delete_pool(self, context, id):
+ super(LoadBalancerPlugin, self).delete_pool(context, id)
+ self.agent_rpc.destroy_pool(context, id)
+
+ def create_member(self, context, member):
+ m = super(LoadBalancerPlugin, self).create_member(context, member)
+ self.agent_rpc.modify_pool(context, m['pool_id'])
+ return m
+
+ def update_member(self, context, id, member):
+ if 'status' not in member['member']:
+ member['member']['status'] = constants.PENDING_UPDATE
+ m = super(LoadBalancerPlugin, self).update_member(context, id, member)
+ self.agent_rpc.modify_pool(context, m['pool_id'])
+ return m
+
+ def delete_member(self, context, id):
+ m = self.get_member(context, id)
+ super(LoadBalancerPlugin, self).delete_member(context, id)
+ self.agent_rpc.modify_pool(context, m['pool_id'])
+
+ def update_health_monitor(self, context, id, health_monitor):
+ if 'status' not in health_monitor['health_monitor']:
+ health_monitor['health_monitor']['status'] = (
+ constants.PENDING_UPDATE
+ )
+ hm = super(LoadBalancerPlugin, self).update_health_monitor(
+ context,
+ id,
+ health_monitor
+ )
+
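+        # notify every pool associated with this monitor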
+ with context.session.begin(subtransactions=True):
+ qry = context.session.query(
+ loadbalancer_db.PoolMonitorAssociation
+ )
+ qry = qry.filter_by(monitor_id=hm['id'])
+
+ for assoc in qry.all():
+ self.agent_rpc.modify_pool(context, assoc['pool_id'])
+ return hm
+
+ def delete_health_monitor(self, context, id):
+ with context.session.begin(subtransactions=True):
+ qry = context.session.query(
+ loadbalancer_db.PoolMonitorAssociation
+ )
+ qry = qry.filter_by(monitor_id=id)
+
+ pool_ids = [a['pool_id'] for a in qry.all()]
+ super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
+ for pid in pool_ids:
+ self.agent_rpc.modify_pool(context, pid)
+
+ def create_pool_health_monitor(self, context, health_monitor, pool_id):
+ retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
+ context,
+ health_monitor,
+ pool_id
+ )
+ self.agent_rpc.modify_pool(context, pool_id)
+
+ return retval
+++ /dev/null
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2013 OpenStack LLC.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-from quantum.db import api as qdbapi
-from quantum.db import model_base
-from quantum.db.loadbalancer import loadbalancer_db
-from quantum.extensions import loadbalancer
-from quantum.openstack.common import log as logging
-from quantum.plugins.common import constants
-
-LOG = logging.getLogger(__name__)
-
-
-class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
-
- """
- Implementation of the Quantum Loadbalancer Service Plugin.
-
- This class manages the workflow of LBaaS request/response.
- Most DB related works are implemented in class
- loadbalancer_db.LoadBalancerPluginDb.
- """
- supported_extension_aliases = ["lbaas"]
-
- def __init__(self):
- """
- Do the initialization for the loadbalancer service plugin here.
- """
- qdbapi.register_models(base=model_base.BASEV2)
-
- # TODO: we probably need to setup RPC channel (to talk to LbAgent) here
-
- def get_plugin_type(self):
- return constants.LOADBALANCER
-
- def get_plugin_description(self):
- return "Quantum LoadBalancer Service Plugin"
-
- def create_vip(self, context, vip):
- v = super(LoadBalancerPlugin, self).create_vip(context, vip)
- self.update_status(context, loadbalancer_db.Vip, v['id'],
- constants.PENDING_CREATE)
- LOG.debug(_("Create vip: %s"), v['id'])
-
- # If we adopt asynchronous mode, this method should return immediately
- # and let client to query the object status. The plugin will listen on
- # the event from device and update the object status by calling
- # self.update_state(context, Vip, id, ACTIVE/ERROR)
- #
- # In synchronous mode, send the request to device here and wait for
- # response. Eventually update the object status prior to the return.
- v_query = self.get_vip(context, v['id'])
- return v_query
-
- def update_vip(self, context, id, vip):
- v_query = self.get_vip(
- context, id, fields=["status"])
- if v_query['status'] in [
- constants.PENDING_DELETE, constants.ERROR]:
- raise loadbalancer.StateInvalid(id=id,
- state=v_query['status'])
-
- v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
- self.update_status(context, loadbalancer_db.Vip, id,
- constants.PENDING_UPDATE)
- LOG.debug(_("Update vip: %s"), id)
-
- # TODO notify lbagent
- v_rt = self.get_vip(context, id)
- return v_rt
-
- def delete_vip(self, context, id):
- self.update_status(context, loadbalancer_db.Vip, id,
- constants.PENDING_DELETE)
- LOG.debug(_("Delete vip: %s"), id)
-
- # TODO notify lbagent
- super(LoadBalancerPlugin, self).delete_vip(context, id)
-
- def get_vip(self, context, id, fields=None):
- res = super(LoadBalancerPlugin, self).get_vip(context, id, fields)
- LOG.debug(_("Get vip: %s"), id)
- return res
-
- def get_vips(self, context, filters=None, fields=None):
- res = super(LoadBalancerPlugin, self).get_vips(
- context, filters, fields)
- LOG.debug(_("Get vips"))
- return res
-
- def create_pool(self, context, pool):
- p = super(LoadBalancerPlugin, self).create_pool(context, pool)
- self.update_status(context, loadbalancer_db.Pool, p['id'],
- constants.PENDING_CREATE)
- LOG.debug(_("Create pool: %s"), p['id'])
-
- # TODO notify lbagent
- p_rt = self.get_pool(context, p['id'])
- return p_rt
-
- def update_pool(self, context, id, pool):
- p_query = self.get_pool(context, id, fields=["status"])
- if p_query['status'] in [
- constants.PENDING_DELETE, constants.ERROR]:
- raise loadbalancer.StateInvalid(id=id,
- state=p_query['status'])
- p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
- LOG.debug(_("Update pool: %s"), p['id'])
- # TODO notify lbagent
- p_rt = self.get_pool(context, id)
- return p_rt
-
- def delete_pool(self, context, id):
- self.update_status(context, loadbalancer_db.Pool, id,
- constants.PENDING_DELETE)
- # TODO notify lbagent
- super(LoadBalancerPlugin, self).delete_pool(context, id)
- LOG.debug(_("Delete pool: %s"), id)
-
- def get_pool(self, context, id, fields=None):
- res = super(LoadBalancerPlugin, self).get_pool(context, id, fields)
- LOG.debug(_("Get pool: %s"), id)
- return res
-
- def get_pools(self, context, filters=None, fields=None):
- res = super(LoadBalancerPlugin, self).get_pools(
- context, filters, fields)
- LOG.debug(_("Get Pools"))
- return res
-
- def stats(self, context, pool_id):
- res = super(LoadBalancerPlugin, self).get_stats(context, pool_id)
- LOG.debug(_("Get stats of Pool: %s"), pool_id)
- return res
-
- def create_pool_health_monitor(self, context, health_monitor, pool_id):
- m = super(LoadBalancerPlugin, self).create_pool_health_monitor(
- context, health_monitor, pool_id)
- LOG.debug(_("Create health_monitor of pool: %s"), pool_id)
- return m
-
- def get_pool_health_monitor(self, context, id, pool_id, fields=None):
- m = super(LoadBalancerPlugin, self).get_pool_health_monitor(
- context, id, pool_id, fields)
- LOG.debug(_("Get health_monitor of pool: %s"), pool_id)
- return m
-
- def delete_pool_health_monitor(self, context, id, pool_id):
- super(LoadBalancerPlugin, self).delete_pool_health_monitor(
- context, id, pool_id)
- LOG.debug(_("Delete health_monitor %(id)s of pool: %(pool_id)s"),
- {"id": id, "pool_id": pool_id})
-
- def get_member(self, context, id, fields=None):
- res = super(LoadBalancerPlugin, self).get_member(
- context, id, fields)
- LOG.debug(_("Get member: %s"), id)
- return res
-
- def get_members(self, context, filters=None, fields=None):
- res = super(LoadBalancerPlugin, self).get_members(
- context, filters, fields)
- LOG.debug(_("Get members"))
- return res
-
- def create_member(self, context, member):
- m = super(LoadBalancerPlugin, self).create_member(context, member)
- self.update_status(context, loadbalancer_db.Member, m['id'],
- constants.PENDING_CREATE)
- LOG.debug(_("Create member: %s"), m['id'])
- # TODO notify lbagent
- m_rt = self.get_member(context, m['id'])
- return m_rt
-
- def update_member(self, context, id, member):
- m_query = self.get_member(context, id, fields=["status"])
- if m_query['status'] in [
- constants.PENDING_DELETE, constants.ERROR]:
- raise loadbalancer.StateInvalid(id=id,
- state=m_query['status'])
- m = super(LoadBalancerPlugin, self).update_member(context, id, member)
- self.update_status(context, loadbalancer_db.Member, id,
- constants.PENDING_UPDATE)
- LOG.debug(_("Update member: %s"), m['id'])
- # TODO notify lbagent
- m_rt = self.get_member(context, id)
- return m_rt
-
- def delete_member(self, context, id):
- self.update_status(context, loadbalancer_db.Member, id,
- constants.PENDING_DELETE)
- LOG.debug(_("Delete member: %s"), id)
- # TODO notify lbagent
- super(LoadBalancerPlugin, self).delete_member(context, id)
-
- def get_health_monitor(self, context, id, fields=None):
- res = super(LoadBalancerPlugin, self).get_health_monitor(
- context, id, fields)
- LOG.debug(_("Get health_monitor: %s"), id)
- return res
-
- def get_health_monitors(self, context, filters=None, fields=None):
- res = super(LoadBalancerPlugin, self).get_health_monitors(
- context, filters, fields)
- LOG.debug(_("Get health_monitors"))
- return res
-
- def create_health_monitor(self, context, health_monitor):
- h = super(LoadBalancerPlugin, self).create_health_monitor(
- context, health_monitor)
- self.update_status(context, loadbalancer_db.HealthMonitor, h['id'],
- constants.PENDING_CREATE)
- LOG.debug(_("Create health_monitor: %s"), h['id'])
- # TODO notify lbagent
- h_rt = self.get_health_monitor(context, h['id'])
- return h_rt
-
- def update_health_monitor(self, context, id, health_monitor):
- h_query = self.get_health_monitor(context, id, fields=["status"])
- if h_query['status'] in [
- constants.PENDING_DELETE, constants.ERROR]:
- raise loadbalancer.StateInvalid(id=id,
- state=h_query['status'])
- h = super(LoadBalancerPlugin, self).update_health_monitor(
- context, id, health_monitor)
- self.update_status(context, loadbalancer_db.HealthMonitor, id,
- constants.PENDING_UPDATE)
- LOG.debug(_("Update health_monitor: %s"), h['id'])
- # TODO notify lbagent
- h_rt = self.get_health_monitor(context, id)
- return h_rt
-
- def delete_health_monitor(self, context, id):
- self.update_status(context, loadbalancer_db.HealthMonitor, id,
- constants.PENDING_DELETE)
- LOG.debug(_("Delete health_monitor: %s"), id)
- super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
from quantum.extensions import loadbalancer
from quantum.manager import QuantumManager
from quantum.plugins.common import constants
-from quantum.plugins.services.loadbalancer import loadbalancerPlugin
+from quantum.plugins.services.agent_loadbalancer import (
+ plugin as loadbalancer_plugin
+)
from quantum.tests.unit import test_db_plugin
from quantum.tests.unit import test_extensions
from quantum.tests.unit import testlib_api
DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
DB_LB_PLUGIN_KLASS = (
- "quantum.plugins.services.loadbalancer."
- "loadbalancerPlugin.LoadBalancerPlugin"
+ "quantum.plugins.services.agent_loadbalancer.plugin.LoadBalancerPlugin"
)
ROOTDIR = os.path.dirname(__file__) + '../../../..'
ETCDIR = os.path.join(ROOTDIR, 'etc')
self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
- plugin = loadbalancerPlugin.LoadBalancerPlugin()
+ plugin = loadbalancer_plugin.LoadBalancerPlugin()
ext_mgr = PluginAwareExtensionManager(
extensions_path,
{constants.LOADBALANCER: plugin}
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import mock
+import testtools
+
+from quantum.plugins.services.agent_loadbalancer.agent import api
+
+
+class TestApiCache(testtools.TestCase):
+ def setUp(self):
+ super(TestApiCache, self).setUp()
+ self.addCleanup(mock.patch.stopall)
+
+ self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host')
+ self.make_msg = mock.patch.object(self.api, 'make_msg').start()
+ self.mock_call = mock.patch.object(self.api, 'call').start()
+
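+    # These tests assume LbaasAgentApi methods are thin RPC wrappers:
+    # each one should build a message with make_msg() and send it with
+    # call() on the configured topic, returning the call result as-is.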
+ def test_init(self):
+ self.assertEqual(self.api.host, 'host')
+ self.assertEqual(self.api.context, mock.sentinel.context)
+
+ def test_get_ready_devices(self):
+ self.assertEqual(
+ self.api.get_ready_devices(),
+ self.mock_call.return_value
+ )
+
+ self.make_msg.assert_called_once_with('get_ready_devices', host='host')
+ self.mock_call.assert_called_once_with(
+ mock.sentinel.context,
+ self.make_msg.return_value,
+ topic='topic'
+ )
+
+ def test_get_logical_device(self):
+ self.assertEqual(
+ self.api.get_logical_device('pool_id'),
+ self.mock_call.return_value
+ )
+
+ self.make_msg.assert_called_once_with(
+ 'get_logical_device',
+ pool_id='pool_id',
+ host='host')
+
+ self.mock_call.assert_called_once_with(
+ mock.sentinel.context,
+ self.make_msg.return_value,
+ topic='topic'
+ )
+
+ def test_pool_destroyed(self):
+ self.assertEqual(
+ self.api.pool_destroyed('pool_id'),
+ self.mock_call.return_value
+ )
+
+ self.make_msg.assert_called_once_with(
+ 'pool_destroyed',
+ pool_id='pool_id',
+ host='host')
+
+ self.mock_call.assert_called_once_with(
+ mock.sentinel.context,
+ self.make_msg.return_value,
+ topic='topic'
+ )
+
+ def test_plug_vip_port(self):
+ self.assertEqual(
+ self.api.plug_vip_port('port_id'),
+ self.mock_call.return_value
+ )
+
+ self.make_msg.assert_called_once_with(
+ 'plug_vip_port',
+ port_id='port_id',
+ host='host')
+
+ self.mock_call.assert_called_once_with(
+ mock.sentinel.context,
+ self.make_msg.return_value,
+ topic='topic'
+ )
+
+ def test_unplug_vip_port(self):
+ self.assertEqual(
+ self.api.unplug_vip_port('port_id'),
+ self.mock_call.return_value
+ )
+
+ self.make_msg.assert_called_once_with(
+ 'unplug_vip_port',
+ port_id='port_id',
+ host='host')
+
+ self.mock_call.assert_called_once_with(
+ mock.sentinel.context,
+ self.make_msg.return_value,
+ topic='topic'
+ )
+
+ def test_update_pool_stats(self):
+ self.assertEqual(
+ self.api.update_pool_stats('pool_id', {'stat': 'stat'}),
+ self.mock_call.return_value
+ )
+
+ self.make_msg.assert_called_once_with(
+ 'update_pool_stats',
+ pool_id='pool_id',
+ stats={'stat': 'stat'},
+ host='host')
+
+ self.mock_call.assert_called_once_with(
+ mock.sentinel.context,
+ self.make_msg.return_value,
+ topic='topic'
+ )
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import contextlib
+import mock
+from oslo.config import cfg
+import testtools
+
+from quantum.plugins.services.agent_loadbalancer import agent
+
+
+class TestLbaasService(testtools.TestCase):
+ def setUp(self):
+ super(TestLbaasService, self).setUp()
+ self.addCleanup(cfg.CONF.reset)
+
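+        # registering the agent options up front lets each test read
+        # them from the global cfg.CONF and reset them on cleanup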
+ cfg.CONF.register_opts(agent.OPTS)
+
+ def test_start(self):
+ with mock.patch.object(
+ agent.rpc_service.Service, 'start'
+ ) as mock_start:
+
+ mgr = mock.Mock()
+ agent_service = agent.LbaasAgentService('host', 'topic', mgr)
+ agent_service.start()
+
+ self.assertTrue(mock_start.called)
+
+ def test_main(self):
+ with contextlib.nested(
+ mock.patch.object(agent.service, 'launch'),
+ mock.patch.object(agent, 'eventlet'),
+ mock.patch('sys.argv'),
+ mock.patch.object(agent.manager, 'LbaasAgentManager')
+ ) as (mock_launch, mock_eventlet, sys_argv, mgr_cls):
+ agent.main()
+
+ self.assertTrue(mock_eventlet.monkey_patch.called)
+ mock_launch.assert_called_once_with(mock.ANY)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import contextlib
+
+import mock
+import testtools
+
+from quantum.plugins.services.agent_loadbalancer.agent import manager
+
+
+class TestLogicalDeviceCache(testtools.TestCase):
+ def setUp(self):
+ super(TestLogicalDeviceCache, self).setUp()
+ self.cache = manager.LogicalDeviceCache()
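+        # the cache is assumed to track each device three ways: a
+        # device set plus port-id and pool-id lookup tables; the tests
+        # below check that all three stay consistent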
+
+ def test_put(self):
+ fake_device = {
+ 'vip': {'port_id': 'port_id'},
+ 'pool': {'id': 'pool_id'}
+ }
+ self.cache.put(fake_device)
+
+ self.assertEqual(len(self.cache.devices), 1)
+ self.assertEqual(len(self.cache.port_lookup), 1)
+ self.assertEqual(len(self.cache.pool_lookup), 1)
+
+ def test_double_put(self):
+ fake_device = {
+ 'vip': {'port_id': 'port_id'},
+ 'pool': {'id': 'pool_id'}
+ }
+ self.cache.put(fake_device)
+ self.cache.put(fake_device)
+
+ self.assertEqual(len(self.cache.devices), 1)
+ self.assertEqual(len(self.cache.port_lookup), 1)
+ self.assertEqual(len(self.cache.pool_lookup), 1)
+
+ def test_remove_in_cache(self):
+ fake_device = {
+ 'vip': {'port_id': 'port_id'},
+ 'pool': {'id': 'pool_id'}
+ }
+ self.cache.put(fake_device)
+
+ self.assertEqual(len(self.cache.devices), 1)
+
+ self.cache.remove(fake_device)
+
+ self.assertFalse(len(self.cache.devices))
+ self.assertFalse(self.cache.port_lookup)
+ self.assertFalse(self.cache.pool_lookup)
+
+ def test_remove_in_cache_same_object(self):
+ fake_device = {
+ 'vip': {'port_id': 'port_id'},
+ 'pool': {'id': 'pool_id'}
+ }
+ self.cache.put(fake_device)
+
+ self.assertEqual(len(self.cache.devices), 1)
+
+ self.cache.remove(set(self.cache.devices).pop())
+
+ self.assertFalse(len(self.cache.devices))
+ self.assertFalse(self.cache.port_lookup)
+ self.assertFalse(self.cache.pool_lookup)
+
+ def test_remove_by_pool_id(self):
+ fake_device = {
+ 'vip': {'port_id': 'port_id'},
+ 'pool': {'id': 'pool_id'}
+ }
+ self.cache.put(fake_device)
+
+ self.assertEqual(len(self.cache.devices), 1)
+
+ self.cache.remove_by_pool_id('pool_id')
+
+ self.assertFalse(len(self.cache.devices))
+ self.assertFalse(self.cache.port_lookup)
+ self.assertFalse(self.cache.pool_lookup)
+
+ def test_get_by_pool_id(self):
+ fake_device = {
+ 'vip': {'port_id': 'port_id'},
+ 'pool': {'id': 'pool_id'}
+ }
+ self.cache.put(fake_device)
+
+ dev = self.cache.get_by_pool_id('pool_id')
+
+ self.assertEqual(dev.pool_id, 'pool_id')
+ self.assertEqual(dev.port_id, 'port_id')
+
+ def test_get_by_port_id(self):
+ fake_device = {
+ 'vip': {'port_id': 'port_id'},
+ 'pool': {'id': 'pool_id'}
+ }
+ self.cache.put(fake_device)
+
+ dev = self.cache.get_by_port_id('port_id')
+
+ self.assertEqual(dev.pool_id, 'pool_id')
+ self.assertEqual(dev.port_id, 'port_id')
+
+ def test_get_pool_ids(self):
+ fake_device = {
+ 'vip': {'port_id': 'port_id'},
+ 'pool': {'id': 'pool_id'}
+ }
+ self.cache.put(fake_device)
+
+ self.assertEqual(self.cache.get_pool_ids(), ['pool_id'])
+
+
+class TestManager(testtools.TestCase):
+ def setUp(self):
+ super(TestManager, self).setUp()
+ self.addCleanup(mock.patch.stopall)
+
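+        # the manager reads these settings at construction time; the
+        # values are arbitrary because importutils is mocked out below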
+ mock_conf = mock.Mock()
+ mock_conf.interface_driver = 'intdriver'
+ mock_conf.device_driver = 'devdriver'
+ mock_conf.AGENT.root_helper = 'sudo'
+ mock_conf.loadbalancer_state_path = '/the/path'
+
+ self.mock_importer = mock.patch.object(manager, 'importutils').start()
+
+ rpc_mock_cls = mock.patch(
+ 'quantum.plugins.services.agent_loadbalancer.agent.api'
+ '.LbaasAgentApi'
+ ).start()
+
+ self.mgr = manager.LbaasAgentManager(mock_conf)
+ self.rpc_mock = rpc_mock_cls.return_value
+ self.log = mock.patch.object(manager, 'LOG').start()
+ self.mgr.needs_resync = False
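+        # needs_resync is the manager's deferred-resync flag; clear it
+        # so each test asserts only the transitions it triggers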
+
+ def test_initialize_service_hook(self):
+ with mock.patch.object(self.mgr, 'sync_state') as sync:
+ self.mgr.initialize_service_hook(mock.Mock())
+ sync.assert_called_once_with()
+
+ def test_periodic_resync_needs_sync(self):
+ with mock.patch.object(self.mgr, 'sync_state') as sync:
+ self.mgr.needs_resync = True
+ self.mgr.periodic_resync(mock.Mock())
+ sync.assert_called_once_with()
+
+ def test_periodic_resync_no_sync(self):
+ with mock.patch.object(self.mgr, 'sync_state') as sync:
+ self.mgr.needs_resync = False
+ self.mgr.periodic_resync(mock.Mock())
+ self.assertFalse(sync.called)
+
+ def test_collect_stats(self):
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_pool_ids.return_value = ['1', '2']
+ self.mgr.collect_stats(mock.Mock())
+ self.rpc_mock.update_pool_stats.assert_has_calls([
+ mock.call('1', mock.ANY),
+ mock.call('2', mock.ANY)
+ ])
+
+ def test_collect_stats_exception(self):
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_pool_ids.return_value = ['1', '2']
+ with mock.patch.object(self.mgr, 'driver') as driver:
+ driver.get_stats.side_effect = Exception
+
+ self.mgr.collect_stats(mock.Mock())
+
+            self.assertFalse(self.rpc_mock.update_pool_stats.called)
+ self.assertTrue(self.mgr.needs_resync)
+ self.assertTrue(self.log.exception.called)
+
+ def test_vip_plug_callback(self):
+ self.mgr._vip_plug_callback('plug', {'id': 'id'})
+ self.rpc_mock.plug_vip_port.assert_called_once_with('id')
+
+ def test_vip_unplug_callback(self):
+ self.mgr._vip_plug_callback('unplug', {'id': 'id'})
+ self.rpc_mock.unplug_vip_port.assert_called_once_with('id')
+
+ def _sync_state_helper(self, cache, ready, refreshed, destroyed):
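+        # sync_state is expected to refresh every pool the plugin
+        # reports ready and destroy cached pools it no longer reports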
+ with contextlib.nested(
+ mock.patch.object(self.mgr, 'cache'),
+ mock.patch.object(self.mgr, 'refresh_device'),
+ mock.patch.object(self.mgr, 'destroy_device')
+ ) as (mock_cache, refresh, destroy):
+
+ mock_cache.get_pool_ids.return_value = cache
+ self.rpc_mock.get_ready_devices.return_value = ready
+
+ self.mgr.sync_state()
+
+ self.assertEqual(len(refreshed), len(refresh.mock_calls))
+ self.assertEqual(len(destroyed), len(destroy.mock_calls))
+
+ refresh.assert_has_calls([mock.call(i) for i in refreshed])
+ destroy.assert_has_calls([mock.call(i) for i in destroyed])
+ self.assertFalse(self.mgr.needs_resync)
+
+ def test_sync_state_all_known(self):
+ self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], [])
+
+ def test_sync_state_all_unknown(self):
+ self._sync_state_helper([], ['1', '2'], ['1', '2'], [])
+
+ def test_sync_state_destroy_all(self):
+ self._sync_state_helper(['1', '2'], [], [], ['1', '2'])
+
+ def test_sync_state_both(self):
+ self._sync_state_helper(['1'], ['2'], ['2'], ['1'])
+
+ def test_sync_state_exception(self):
+ self.rpc_mock.get_ready_devices.side_effect = Exception
+
+ self.mgr.sync_state()
+
+ self.assertTrue(self.log.exception.called)
+ self.assertTrue(self.mgr.needs_resync)
+
+ def test_refresh_device_exists(self):
+ config = self.rpc_mock.get_logical_device.return_value
+
+ with mock.patch.object(self.mgr, 'driver') as driver:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ driver.exists.return_value = True
+
+ self.mgr.refresh_device(config)
+
+ driver.exists.assert_called_once_with(config)
+ driver.update.assert_called_once_with(config)
+ cache.put.assert_called_once_with(config)
+ self.assertFalse(self.mgr.needs_resync)
+
+ def test_refresh_device_new(self):
+ config = self.rpc_mock.get_logical_device.return_value
+
+ with mock.patch.object(self.mgr, 'driver') as driver:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ driver.exists.return_value = False
+
+ self.mgr.refresh_device(config)
+
+ driver.exists.assert_called_once_with(config)
+ driver.create.assert_called_once_with(config)
+ cache.put.assert_called_once_with(config)
+ self.assertFalse(self.mgr.needs_resync)
+
+ def test_refresh_device_exception(self):
+ config = self.rpc_mock.get_logical_device.return_value
+
+ with mock.patch.object(self.mgr, 'driver') as driver:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ driver.exists.side_effect = Exception
+ self.mgr.refresh_device(config)
+
+ driver.exists.assert_called_once_with(config)
+ self.assertTrue(self.mgr.needs_resync)
+ self.assertTrue(self.log.exception.called)
+ self.assertFalse(cache.put.called)
+
+ def test_destroy_device_known(self):
+ with mock.patch.object(self.mgr, 'driver') as driver:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_by_pool_id.return_value = True
+
+ self.mgr.destroy_device('pool_id')
+ cache.get_by_pool_id.assert_called_once_with('pool_id')
+ driver.destroy.assert_called_once_with('pool_id')
+ self.rpc_mock.pool_destroyed.assert_called_once_with(
+ 'pool_id'
+ )
+ cache.remove.assert_called_once_with(True)
+ self.assertFalse(self.mgr.needs_resync)
+
+ def test_destroy_device_unknown(self):
+ with mock.patch.object(self.mgr, 'driver') as driver:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_by_pool_id.return_value = None
+
+ self.mgr.destroy_device('pool_id')
+ cache.get_by_pool_id.assert_called_once_with('pool_id')
+ self.assertFalse(driver.destroy.called)
+
+ def test_destroy_device_exception(self):
+ with mock.patch.object(self.mgr, 'driver') as driver:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_by_pool_id.return_value = True
+ driver.destroy.side_effect = Exception
+
+ self.mgr.destroy_device('pool_id')
+ cache.get_by_pool_id.assert_called_once_with('pool_id')
+
+ self.assertTrue(self.log.exception.called)
+ self.assertTrue(self.mgr.needs_resync)
+
+ def test_remove_orphans(self):
+ with mock.patch.object(self.mgr, 'driver') as driver:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_pool_ids.return_value = ['1', '2']
+ self.mgr.remove_orphans()
+
+ driver.remove_orphans.assert_called_once_with(['1', '2'])
+
+ def test_reload_pool(self):
+ with mock.patch.object(self.mgr, 'refresh_device') as refresh:
+ self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
+ refresh.assert_called_once_with('pool_id')
+
+ def test_modify_pool_known(self):
+ with mock.patch.object(self.mgr, 'refresh_device') as refresh:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_by_pool_id.return_value = True
+
+                self.mgr.modify_pool(mock.Mock(), pool_id='pool_id')
+
+ refresh.assert_called_once_with('pool_id')
+
+ def test_modify_pool_unknown(self):
+ with mock.patch.object(self.mgr, 'refresh_device') as refresh:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_by_pool_id.return_value = False
+
+ self.mgr.modify_pool(mock.Mock(), pool_id='pool_id')
+
+ self.assertFalse(refresh.called)
+
+ def test_destroy_pool_known(self):
+ with mock.patch.object(self.mgr, 'destroy_device') as destroy:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_by_pool_id.return_value = True
+
+ self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
+
+ destroy.assert_called_once_with('pool_id')
+
+ def test_destroy_pool_unknown(self):
+ with mock.patch.object(self.mgr, 'destroy_device') as destroy:
+ with mock.patch.object(self.mgr, 'cache') as cache:
+ cache.get_by_pool_id.return_value = False
+
+ self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
+
+ self.assertFalse(destroy.called)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import contextlib
+import mock
+import testtools
+
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+ namespace_driver
+)
+
+
+class TestHaproxyNSDriver(testtools.TestCase):
+ def setUp(self):
+ super(TestHaproxyNSDriver, self).setUp()
+
+ self.vif_driver = mock.Mock()
+ self.vip_plug_callback = mock.Mock()
+
+ self.driver = namespace_driver.HaproxyNSDriver(
+ 'sudo',
+ '/the/path',
+ self.vif_driver,
+ self.vip_plug_callback
+ )
+
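+        # minimal logical config: the driver is assumed to derive the
+        # qlbaas-<pool id> namespace name and the VIP port to plug
+        # from this shape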
+ self.fake_config = {
+ 'pool': {'id': 'pool_id'},
+ 'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}}
+ }
+
+ def test_create(self):
+ with mock.patch.object(self.driver, '_plug') as plug:
+ with mock.patch.object(self.driver, '_spawn') as spawn:
+ self.driver.create(self.fake_config)
+
+ plug.assert_called_once_with(
+ 'qlbaas-pool_id', {'id': 'port_id'}
+ )
+ spawn.assert_called_once_with(self.fake_config)
+
+ def test_update(self):
+ with contextlib.nested(
+ mock.patch.object(self.driver, '_get_state_file_path'),
+ mock.patch.object(self.driver, '_spawn'),
+ mock.patch('__builtin__.open')
+ ) as (gsp, spawn, mock_open):
+ mock_open.return_value = ['5']
+
+ self.driver.update(self.fake_config)
+
+ mock_open.assert_called_once_with(gsp.return_value, 'r')
+ spawn.assert_called_once_with(self.fake_config, ['-sf', '5'])
+
+ def test_spawn(self):
+ with contextlib.nested(
+ mock.patch.object(namespace_driver.hacfg, 'save_config'),
+ mock.patch.object(self.driver, '_get_state_file_path'),
+ mock.patch('quantum.agent.linux.ip_lib.IPWrapper')
+ ) as (mock_save, gsp, ip_wrap):
+ gsp.side_effect = lambda x, y: y
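+            # with the state-file path lookup stubbed to return its
+            # suffix, the spawned command line uses the bare names
+            # 'conf' and 'pid'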
+
+ self.driver._spawn(self.fake_config)
+
+ mock_save.assert_called_once_with('conf', self.fake_config, 'sock')
+ cmd = ['haproxy', '-f', 'conf', '-p', 'pid']
+ ip_wrap.assert_has_calls([
+ mock.call('sudo', 'qlbaas-pool_id'),
+ mock.call().netns.execute(cmd)
+ ])
+
+ def test_destroy(self):
+ with contextlib.nested(
+ mock.patch.object(self.driver, '_get_state_file_path'),
+ mock.patch.object(namespace_driver, 'kill_pids_in_file'),
+ mock.patch.object(self.driver, '_unplug'),
+ mock.patch('quantum.agent.linux.ip_lib.IPWrapper'),
+ mock.patch('os.path.isdir'),
+ mock.patch('shutil.rmtree')
+ ) as (gsp, kill, unplug, ip_wrap, isdir, rmtree):
+ gsp.side_effect = lambda x, y: '/pool/' + y
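+            # state files resolve under a fake /pool directory so the
+            # kill and rmtree assertions below have predictable paths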
+
+ self.driver.pool_to_port_id['pool_id'] = 'port_id'
+ isdir.return_value = True
+
+ self.driver.destroy('pool_id')
+
+ kill.assert_called_once_with(ip_wrap(), '/pool/pid')
+ unplug.assert_called_once_with('qlbaas-pool_id', 'port_id')
+            isdir.assert_called_once_with('/pool')
+ rmtree.assert_called_once_with('/pool')
+ ip_wrap.assert_has_calls([
+ mock.call('sudo', 'qlbaas-pool_id'),
+ mock.call().garbage_collect_namespace()
+ ])
+
+ def test_exists(self):
+ with contextlib.nested(
+ mock.patch.object(self.driver, '_get_state_file_path'),
+ mock.patch('quantum.agent.linux.ip_lib.IPWrapper'),
+ mock.patch('socket.socket'),
+ mock.patch('os.path.exists'),
+ ) as (gsp, ip_wrap, socket, path_exists):
+ gsp.side_effect = lambda x, y: '/pool/' + y
+
+ ip_wrap.return_value.netns.exists.return_value = True
+ path_exists.return_value = True
+
+ self.driver.exists('pool_id')
+
+ ip_wrap.assert_has_calls([
+ mock.call('sudo'),
+ mock.call().netns.exists('qlbaas-pool_id')
+ ])
+
+ self.assertTrue(self.driver.exists('pool_id'))
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import mock
+import testtools
+
+from quantum import context
+from quantum import manager
+from quantum.plugins.common import constants
+from quantum.plugins.services.agent_loadbalancer import plugin
+from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer
+
+
+class TestLoadBalancerPluginBase(
+ test_db_loadbalancer.LoadBalancerPluginDbTestCase):
+
+ def setUp(self):
+ super(TestLoadBalancerPluginBase, self).setUp()
+
+ # we need access to loaded plugins to modify models
+ loaded_plugins = manager.QuantumManager().get_service_plugins()
+ self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
+ self.callbacks = self.plugin_instance.callbacks
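+        # the callback tests below run against the real plugin instance
+        # and its DB fixtures rather than against mocks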
+
+
+class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
+ def test_get_ready_devices(self):
+ with self.vip() as vip:
+ ready = self.callbacks.get_ready_devices(
+ context.get_admin_context(),
+ )
+ self.assertEqual(ready, [vip['vip']['pool_id']])
+
+ def test_get_ready_devices_inactive_vip(self):
+ with self.vip() as vip:
+
+            # set the vip inactive; we need to use the plugin directly
+            # since status is not tenant-mutable
+ self.plugin_instance.update_vip(
+ context.get_admin_context(),
+ vip['vip']['id'],
+ {'vip': {'status': constants.INACTIVE}}
+ )
+
+ ready = self.callbacks.get_ready_devices(
+ context.get_admin_context(),
+ )
+ self.assertFalse(ready)
+
+ def test_get_ready_devices_inactive_pool(self):
+ with self.vip() as vip:
+
+            # set the pool inactive; we need to use the plugin directly
+            # since status is not tenant-mutable
+ self.plugin_instance.update_pool(
+ context.get_admin_context(),
+ vip['vip']['pool_id'],
+ {'pool': {'status': constants.INACTIVE}}
+ )
+
+ ready = self.callbacks.get_ready_devices(
+ context.get_admin_context(),
+ )
+ self.assertFalse(ready)
+
+ def test_get_logical_device_inactive(self):
+ with self.pool() as pool:
+ with self.vip(pool=pool) as vip:
+ with self.member(pool_id=vip['vip']['pool_id']) as member:
+ self.assertRaises(
+ Exception,
+ self.callbacks.get_logical_device,
+ context.get_admin_context(),
+ pool['pool']['id'],
+ activate=False
+ )
+
+ def test_get_logical_device_activate(self):
+ with self.pool() as pool:
+ with self.vip(pool=pool) as vip:
+ with self.member(pool_id=vip['vip']['pool_id']) as member:
+ ctx = context.get_admin_context()
+
+                    # build the expected logical device config
+ port = self.plugin_instance._core_plugin.get_port(
+ ctx, vip['vip']['port_id']
+ )
+ subnet = self.plugin_instance._core_plugin.get_subnet(
+ ctx, vip['vip']['subnet_id']
+ )
+ port['fixed_ips'][0]['subnet'] = subnet
+
+ # reload pool to add members and vip
+ pool = self.plugin_instance.get_pool(
+ ctx, pool['pool']['id']
+ )
+
+ pool['status'] = constants.ACTIVE
+ vip['vip']['status'] = constants.ACTIVE
+ vip['vip']['port'] = port
+ member['member']['status'] = constants.ACTIVE
+
+ expected = {
+ 'pool': pool,
+ 'vip': vip['vip'],
+ 'members': [member['member']],
+ 'healthmonitors': []
+ }
+
+ logical_config = self.callbacks.get_logical_device(
+ ctx, pool['id'], activate=True
+ )
+
+ self.assertEqual(logical_config, expected)
+
+ def _update_port_test_helper(self, expected, func, **kwargs):
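+        # run the callback against a live VIP port, then verify the
+        # expected attribute values landed on the port in the core DB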
+ core = self.plugin_instance._core_plugin
+
+ with self.pool() as pool:
+ with self.vip(pool=pool) as vip:
+ with self.member(pool_id=vip['vip']['pool_id']) as member:
+ ctx = context.get_admin_context()
+ func(ctx, port_id=vip['vip']['port_id'], **kwargs)
+
+ db_port = core.get_port(ctx, vip['vip']['port_id'])
+
+ for k, v in expected.iteritems():
+ self.assertEqual(db_port[k], v)
+
+ def test_plug_vip_port(self):
+ exp = {
+ 'device_owner': 'quantum:' + constants.LOADBALANCER,
+ 'device_id': 'c596ce11-db30-5c72-8243-15acaae8690f',
+ 'admin_state_up': True
+ }
+ self._update_port_test_helper(
+ exp,
+ self.callbacks.plug_vip_port,
+ host='host'
+ )
+
+ def test_unplug_vip_port(self):
+ exp = {
+ 'device_owner': '',
+ 'device_id': '',
+ 'admin_state_up': False
+ }
+ self._update_port_test_helper(
+ exp,
+ self.callbacks.unplug_vip_port,
+ host='host'
+ )
+
+
+class TestLoadBalancerAgentApi(testtools.TestCase):
+ def setUp(self):
+ super(TestLoadBalancerAgentApi, self).setUp()
+ self.addCleanup(mock.patch.stopall)
+
+ self.api = plugin.LoadBalancerAgentApi('topic', 'host')
+ self.mock_cast = mock.patch.object(self.api, 'cast').start()
+ self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
+
+ def test_init(self):
+ self.assertEqual(self.api.topic, 'topic')
+ self.assertEqual(self.api.host, 'host')
+
+ def _call_test_helper(self, method_name):
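+        # reload_pool, destroy_pool and modify_pool share the same
+        # cast/make_msg plumbing, so one helper covers all three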
+ rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id')
+ self.assertEqual(rv, self.mock_cast.return_value)
+ self.mock_cast.assert_called_once_with(
+ mock.sentinel.context,
+ self.mock_msg.return_value,
+ topic='topic'
+ )
+
+ self.mock_msg.assert_called_once_with(
+ method_name,
+ pool_id='the_id',
+ host='host'
+ )
+
+ def test_reload_pool(self):
+ self._call_test_helper('reload_pool')
+
+ def test_destroy_pool(self):
+ self._call_test_helper('destroy_pool')
+
+ def test_modify_pool(self):
+ self._call_test_helper('modify_pool')
+
+
+class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
+ def setUp(self):
+        self.log = mock.patch.object(plugin, 'LOG').start()
+ api_cls = mock.patch.object(plugin, 'LoadBalancerAgentApi').start()
+ super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
+ self.mock_api = api_cls.return_value
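+        # the plugin is expected to notify the agent through
+        # LoadBalancerAgentApi; keep the mocked instance to assert on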
+
+ self.addCleanup(mock.patch.stopall)
+
+ def test_create_vip(self):
+ with self.subnet() as subnet:
+ with self.pool(subnet=subnet) as pool:
+ with self.vip(pool=pool, subnet=subnet) as vip:
+ self.mock_api.reload_pool.assert_called_once_with(
+ mock.ANY,
+ vip['vip']['pool_id']
+ )
+
+ def test_update_vip(self):
+ with self.subnet() as subnet:
+ with self.pool(subnet=subnet) as pool:
+ with self.vip(pool=pool, subnet=subnet) as vip:
+ self.mock_api.reset_mock()
+ ctx = context.get_admin_context()
+ vip['vip'].pop('status')
+ new_vip = self.plugin_instance.update_vip(
+ ctx,
+ vip['vip']['id'],
+ vip
+ )
+
+ self.mock_api.reload_pool.assert_called_once_with(
+ mock.ANY,
+ vip['vip']['pool_id']
+ )
+
+ self.assertEqual(
+ new_vip['status'],
+ constants.PENDING_UPDATE
+ )
+
+    def test_delete_vip(self):
+ with self.subnet() as subnet:
+ with self.pool(subnet=subnet) as pool:
+ with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
+ self.mock_api.reset_mock()
+ ctx = context.get_admin_context()
+                    self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
+ self.mock_api.destroy_pool.assert_called_once_with(
+ mock.ANY,
+ vip['vip']['pool_id']
+ )
'\x00' * 232])
actual_val = utils.get_interface_mac('eth0')
self.assertEqual(actual_val, expect_val)
+
+
+class AgentUtilsReplaceFile(testtools.TestCase):
+ def test_replace_file(self):
+        # replace_file should write a temp file, set its mode, then
+        # rename it over the target; all filesystem calls are mocked
+ with mock.patch('tempfile.NamedTemporaryFile') as ntf:
+ ntf.return_value.name = '/baz'
+ with mock.patch('os.chmod') as chmod:
+ with mock.patch('os.rename') as rename:
+ utils.replace_file('/foo', 'bar')
+
+ expected = [mock.call('w+', dir='/', delete=False),
+ mock.call().write('bar'),
+ mock.call().close()]
+
+ ntf.assert_has_calls(expected)
+ chmod.assert_called_once_with('/baz', 0644)
+ rename.assert_called_once_with('/baz', '/foo')
def test_base_abc_error(self):
self.assertRaises(TypeError, dhcp.DhcpBase, None)
- def test_replace_file(self):
- # make file to replace
- with mock.patch('tempfile.NamedTemporaryFile') as ntf:
- ntf.return_value.name = '/baz'
- with mock.patch('os.chmod') as chmod:
- with mock.patch('os.rename') as rename:
- dhcp.replace_file('/foo', 'bar')
-
- expected = [mock.call('w+', dir='/', delete=False),
- mock.call().write('bar'),
- mock.call().close()]
-
- ntf.assert_has_calls(expected)
- chmod.assert_called_once_with('/baz', 0644)
- rename.assert_called_once_with('/baz', '/foo')
-
def test_restart(self):
class SubClass(dhcp.DhcpBase):
def __init__(self):
self.conf.set_override('state_path', '')
self.conf.use_namespaces = True
- self.replace_p = mock.patch('quantum.agent.linux.dhcp.replace_file')
+ self.replace_p = mock.patch('quantum.agent.linux.utils.replace_file')
self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
self.addCleanup(self.execute_p.stop)
self.safe = self.replace_p.start()
self.assertEqual(lp.interface_name, 'tap0')
def test_set_interface_name(self):
- with mock.patch('quantum.agent.linux.dhcp.replace_file') as replace:
+ with mock.patch('quantum.agent.linux.utils.replace_file') as replace:
lp = LocalChild(self.conf, FakeDualNetwork())
with mock.patch.object(lp, 'get_conf_file_name') as conf_file:
conf_file.return_value = '/interface'
'quantum-debug = quantum.debug.shell:main',
'quantum-ovs-cleanup = quantum.agent.ovs_cleanup_util:main',
'quantum-db-manage = quantum.db.migration.cli:main',
+ ('quantum-lbaas-agent = '
+ 'quantum.plugins.services.agent_loadbalancer.agent:main'),
('quantum-check-nvp-config = '
'quantum.plugins.nicira.nicira_nvp_plugin.check_nvp_config:main'),
]