From: Eugene Nikanorov Date: Sun, 5 May 2013 02:34:44 +0000 (+0400) Subject: Make reference lbaas implementation as a pluggable driver X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=7176de17bd52d21016a7ffb2488a1bef644c84db;p=openstack-build%2Fneutron-build.git Make reference lbaas implementation as a pluggable driver implements blueprint multi-vendor-support-for-lbaas-step1 This patch implements the following changes: * merge lbaas_plugin.py and plugin.py into 'plugin.py' After that the default 'reference' implementation is available again. * move all code related to reference implementation from plugin.py to drivers/haproxy/plugin_driver.py * Inherit HaproxyOnHostPluginDriver from abstract driver and implement its interface. * modify tests accordingly Change-Id: Ib4bfe286826acdedeadbeeff4713448c073378d2 --- diff --git a/bin/quantum-lbaas-agent b/bin/quantum-lbaas-agent index c1b3be43a..7c11322c0 100755 --- a/bin/quantum-lbaas-agent +++ b/bin/quantum-lbaas-agent @@ -20,7 +20,7 @@ import os import sys sys.path.insert(0, os.getcwd()) -from quantum.plugins.services.agent_loadbalancer.agent import main +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy.agent import main main() diff --git a/quantum/common/topics.py b/quantum/common/topics.py index 06db33801..a766430c5 100644 --- a/quantum/common/topics.py +++ b/quantum/common/topics.py @@ -25,11 +25,9 @@ UPDATE = 'update' AGENT = 'q-agent-notifier' PLUGIN = 'q-plugin' DHCP = 'q-dhcp-notifer' -LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin' L3_AGENT = 'l3_agent' DHCP_AGENT = 'dhcp_agent' -LOADBALANCER_AGENT = 'loadbalancer_agent' def get_topic_name(prefix, table, operation): diff --git a/quantum/db/loadbalancer/loadbalancer_db.py b/quantum/db/loadbalancer/loadbalancer_db.py index 332aa96dd..44062479a 100644 --- a/quantum/db/loadbalancer/loadbalancer_db.py +++ b/quantum/db/loadbalancer/loadbalancer_db.py @@ -426,7 +426,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase, context.session.delete(vip) if vip.port: # this is a Quantum port self._core_plugin.delete_port(context, vip.port.id) - context.session.flush() def get_vip(self, context, id, fields=None): vip = self._get_resource(context, Vip, id) diff --git a/quantum/plugins/services/agent_loadbalancer/agent/__init__.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent.py similarity index 91% rename from quantum/plugins/services/agent_loadbalancer/agent/__init__.py rename to quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent.py index 3632729c0..3aee11b73 100644 --- a/quantum/plugins/services/agent_loadbalancer/agent/__init__.py +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent.py @@ -21,11 +21,12 @@ from oslo.config import cfg from quantum.agent.common import config from quantum.agent.linux import interface -from quantum.common import topics from quantum.openstack.common.rpc import service as rpc_service from quantum.openstack.common import service -from quantum.plugins.services.agent_loadbalancer.agent import manager - +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + agent_manager as manager, + plugin_driver +) OPTS = [ cfg.IntOpt( @@ -61,7 +62,7 @@ def main(): mgr = manager.LbaasAgentManager(cfg.CONF) svc = LbaasAgentService( host=cfg.CONF.host, - topic=topics.LOADBALANCER_AGENT, + topic=plugin_driver.TOPIC_LOADBALANCER_AGENT, manager=mgr ) service.launch(svc).wait() diff --git a/quantum/plugins/services/agent_loadbalancer/agent/api.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent_api.py similarity index 100% rename from quantum/plugins/services/agent_loadbalancer/agent/api.py rename to quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent_api.py diff --git a/quantum/plugins/services/agent_loadbalancer/agent/manager.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent_manager.py similarity index 97% rename from quantum/plugins/services/agent_loadbalancer/agent/manager.py rename to quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent_manager.py index d84bdfc62..ee35e4e19 100644 --- a/quantum/plugins/services/agent_loadbalancer/agent/manager.py +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent_manager.py @@ -21,12 +21,14 @@ import weakref from oslo.config import cfg from quantum.agent.common import config -from quantum.common import topics from quantum import context from quantum.openstack.common import importutils from quantum.openstack.common import log as logging from quantum.openstack.common import periodic_task -from quantum.plugins.services.agent_loadbalancer.agent import api +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + agent_api, + plugin_driver +) LOG = logging.getLogger(__name__) NS_PREFIX = 'qlbaas-' @@ -128,8 +130,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks): msg = _('Error importing loadbalancer device driver: %s') raise SystemExit(msg % conf.device_driver) ctx = context.get_admin_context_without_session() - self.plugin_rpc = api.LbaasAgentApi( - topics.LOADBALANCER_PLUGIN, + self.plugin_rpc = agent_api.LbaasAgentApi( + plugin_driver.TOPIC_PROCESS_ON_HOST, ctx, conf.host ) diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/plugin_driver.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/plugin_driver.py new file mode 100644 index 000000000..9dd04686c --- /dev/null +++ b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/plugin_driver.py @@ -0,0 +1,300 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 New Dream Network, LLC (DreamHost) +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Mark McClain, DreamHost + +import uuid + +from oslo.config import cfg + +from quantum.common import exceptions as q_exc +from quantum.common import rpc as q_rpc +from quantum.db.loadbalancer import loadbalancer_db +from quantum.openstack.common import log as logging +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import proxy +from quantum.plugins.common import constants +from quantum.plugins.services.agent_loadbalancer.drivers import ( + abstract_driver +) + +LOG = logging.getLogger(__name__) + +ACTIVE_PENDING = ( + constants.ACTIVE, + constants.PENDING_CREATE, + constants.PENDING_UPDATE +) + +# topic name for this particular agent implementation +TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host' +TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent' + + +class LoadBalancerCallbacks(object): + RPC_API_VERSION = '1.0' + + def __init__(self, plugin): + self.plugin = plugin + + def create_rpc_dispatcher(self): + return q_rpc.PluginRpcDispatcher([self]) + + def get_ready_devices(self, context, host=None): + with context.session.begin(subtransactions=True): + qry = (context.session.query(loadbalancer_db.Pool.id). + join(loadbalancer_db.Vip)) + + qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING)) + qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING)) + up = True # makes pep8 and sqlalchemy happy + qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up) + qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) + return [id for id, in qry] + + def get_logical_device(self, context, pool_id=None, activate=True, + **kwargs): + with context.session.begin(subtransactions=True): + qry = context.session.query(loadbalancer_db.Pool) + qry = qry.filter_by(id=pool_id) + pool = qry.one() + + if activate: + # set all resources to active + if pool.status in ACTIVE_PENDING: + pool.status = constants.ACTIVE + + if pool.vip.status in ACTIVE_PENDING: + pool.vip.status = constants.ACTIVE + + for m in pool.members: + if m.status in ACTIVE_PENDING: + m.status = constants.ACTIVE + + for hm in pool.monitors: + if hm.healthmonitor.status in ACTIVE_PENDING: + hm.healthmonitor.status = constants.ACTIVE + + if (pool.status != constants.ACTIVE + or pool.vip.status != constants.ACTIVE): + raise q_exc.Invalid(_('Expected active pool and vip')) + + retval = {} + retval['pool'] = self.plugin._make_pool_dict(pool) + retval['vip'] = self.plugin._make_vip_dict(pool.vip) + retval['vip']['port'] = ( + self.plugin._core_plugin._make_port_dict(pool.vip.port) + ) + for fixed_ip in retval['vip']['port']['fixed_ips']: + fixed_ip['subnet'] = ( + self.plugin._core_plugin.get_subnet( + context, + fixed_ip['subnet_id'] + ) + ) + retval['members'] = [ + self.plugin._make_member_dict(m) + for m in pool.members if m.status == constants.ACTIVE + ] + retval['healthmonitors'] = [ + self.plugin._make_health_monitor_dict(hm.healthmonitor) + for hm in pool.monitors + if hm.healthmonitor.status == constants.ACTIVE + ] + + return retval + + def pool_destroyed(self, context, pool_id=None, host=None): + """Agent confirmation hook that a pool has been destroyed. + + This method exists for subclasses to change the deletion + behavior. + """ + pass + + def plug_vip_port(self, context, port_id=None, host=None): + if not port_id: + return + + try: + port = self.plugin._core_plugin.get_port( + context, + port_id + ) + except q_exc.PortNotFound: + msg = _('Unable to find port %s to plug.') + LOG.debug(msg, port_id) + return + + port['admin_state_up'] = True + port['device_owner'] = 'quantum:' + constants.LOADBALANCER + port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host))) + + self.plugin._core_plugin.update_port( + context, + port_id, + {'port': port} + ) + + def unplug_vip_port(self, context, port_id=None, host=None): + if not port_id: + return + + try: + port = self.plugin._core_plugin.get_port( + context, + port_id + ) + except q_exc.PortNotFound: + msg = _('Unable to find port %s to unplug. This can occur when ' + 'the Vip has been deleted first.') + LOG.debug(msg, port_id) + return + + port['admin_state_up'] = False + port['device_owner'] = '' + port['device_id'] = '' + + try: + self.plugin._core_plugin.update_port( + context, + port_id, + {'port': port} + ) + + except q_exc.PortNotFound: + msg = _('Unable to find port %s to unplug. This can occur when ' + 'the Vip has been deleted first.') + LOG.debug(msg, port_id) + + def update_pool_stats(self, context, pool_id=None, stats=None, host=None): + # TODO(markmcclain): add stats collection + pass + + +class LoadBalancerAgentApi(proxy.RpcProxy): + """Plugin side of plugin to agent RPC API.""" + + API_VERSION = '1.0' + + def __init__(self, topic, host): + super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION) + self.host = host + + def reload_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('reload_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + def destroy_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('destroy_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + def modify_pool(self, context, pool_id): + return self.cast( + context, + self.make_msg('modify_pool', pool_id=pool_id, host=self.host), + topic=self.topic + ) + + +class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver): + def __init__(self, plugin): + self.agent_rpc = LoadBalancerAgentApi( + TOPIC_LOADBALANCER_AGENT, + cfg.CONF.host + ) + self.callbacks = LoadBalancerCallbacks(plugin) + + self.conn = rpc.create_connection(new=True) + self.conn.create_consumer( + TOPIC_PROCESS_ON_HOST, + self.callbacks.create_rpc_dispatcher(), + fanout=False) + self.conn.consume_in_thread() + self.plugin = plugin + + def create_vip(self, context, vip): + self.agent_rpc.reload_pool(context, vip['pool_id']) + + def update_vip(self, context, old_vip, vip): + if vip['status'] in ACTIVE_PENDING: + self.agent_rpc.reload_pool(context, vip['pool_id']) + else: + self.agent_rpc.destroy_pool(context, vip['pool_id']) + + def delete_vip(self, context, vip): + self.plugin._delete_db_vip(context, vip['id']) + self.agent_rpc.destroy_pool(context, vip['pool_id']) + + def create_pool(self, context, pool): + # don't notify here because a pool needs a vip to be useful + pass + + def update_pool(self, context, old_pool, pool): + if pool['status'] in ACTIVE_PENDING: + if pool['vip_id'] is not None: + self.agent_rpc.reload_pool(context, pool['id']) + else: + self.agent_rpc.destroy_pool(context, pool['id']) + + def delete_pool(self, context, pool): + self.plugin._delete_db_pool(context, pool['id']) + self.agent_rpc.destroy_pool(context, pool['id']) + + def create_member(self, context, member): + self.agent_rpc.modify_pool(context, member['pool_id']) + + def update_member(self, context, old_member, member): + # member may change pool id + if member['pool_id'] != old_member['pool_id']: + self.agent_rpc.modify_pool(context, old_member['pool_id']) + self.agent_rpc.modify_pool(context, member['pool_id']) + + def delete_member(self, context, member): + self.plugin._delete_db_member(context, member['id']) + self.agent_rpc.modify_pool(context, member['pool_id']) + + def update_health_monitor(self, context, healthmon, pool_id): + # healthmon is unused here because agent will fetch what is necessary + self.agent_rpc.modify_pool(context, pool_id) + + def delete_health_monitor(self, context, healthmon_id, pool_id): + # healthmon_id is not used in this driver + self.agent_rpc.modify_pool(context, pool_id) + + def create_pool_health_monitor(self, context, healthmon, pool_id): + # healthmon is not used here + self.agent_rpc.modify_pool(context, pool_id) + + def delete_pool_health_monitor(self, context, health_monitor, pool_id): + self.plugin._delete_db_pool_health_monitor( + context, health_monitor['id'], pool_id + ) + + # healthmon_id is not used here + self.agent_rpc.modify_pool(context, pool_id) + + def create_health_monitor(self, context, health_monitor): + pass + + def stats(self, context, pool_id): + pass diff --git a/quantum/plugins/services/agent_loadbalancer/lbaas_plugin.py b/quantum/plugins/services/agent_loadbalancer/lbaas_plugin.py deleted file mode 100644 index 1eb247fa3..000000000 --- a/quantum/plugins/services/agent_loadbalancer/lbaas_plugin.py +++ /dev/null @@ -1,221 +0,0 @@ -# -# Copyright 2013 Radware LTD. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Avishay Balderman, Radware - -from oslo.config import cfg - -from quantum.db import api as qdbapi -from quantum.db.loadbalancer import loadbalancer_db -from quantum.openstack.common import importutils -from quantum.openstack.common import log as logging -from quantum.plugins.common import constants - -LOG = logging.getLogger(__name__) - -DEFAULT_DRIVER = ("quantum.plugins.services.agent_loadbalancer" - ".drivers.noop" - ".noop_driver.NoopLbaaSDriver") - -lbaas_plugin_opts = [ - cfg.StrOpt('driver_fqn', - default=DEFAULT_DRIVER, - help=_('LBaaS driver Fully Qualified Name')) -] - -cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS") - - -class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): - - """Implementation of the Quantum Loadbalancer Service Plugin. - - This class manages the workflow of LBaaS request/response. - Most DB related works are implemented in class - loadbalancer_db.LoadBalancerPluginDb. - """ - supported_extension_aliases = ["lbaas"] - - def __init__(self): - """Initialization for the loadbalancer service plugin.""" - - qdbapi.register_models() - self.driver = importutils.import_object( - cfg.CONF.LBAAS.driver_fqn, self) - - def get_plugin_type(self): - return constants.LOADBALANCER - - def get_plugin_description(self): - return "Quantum LoadBalancer Service Plugin" - - def create_vip(self, context, vip): - v = super(LoadBalancerPlugin, self).create_vip(context, vip) - self.driver.create_vip(context, v) - return v - - def update_vip(self, context, id, vip): - if 'status' not in vip['vip']: - vip['vip']['status'] = constants.PENDING_UPDATE - old_vip = self.get_vip(context, id) - v = super(LoadBalancerPlugin, self).update_vip(context, id, vip) - self.driver.update_vip(context, old_vip, v) - return v - - def _delete_db_vip(self, context, id): - super(LoadBalancerPlugin, self).delete_vip(context, id) - - def delete_vip(self, context, id): - self.update_status(context, loadbalancer_db.Vip, - id, constants.PENDING_DELETE) - v = self.get_vip(context, id) - self.driver.delete_vip(context, v) - - def create_pool(self, context, pool): - p = super(LoadBalancerPlugin, self).create_pool(context, pool) - self.driver.create_pool(context, p) - return p - - def update_pool(self, context, id, pool): - if 'status' not in pool['pool']: - pool['pool']['status'] = constants.PENDING_UPDATE - old_pool = self.get_pool(context, id) - p = super(LoadBalancerPlugin, self).update_pool(context, id, pool) - self.driver.update_pool(context, old_pool, p) - return p - - def _delete_db_pool(self, context, id): - super(LoadBalancerPlugin, self).delete_pool(context, id) - - def delete_pool(self, context, id): - self.update_status(context, loadbalancer_db.Pool, - id, constants.PENDING_DELETE) - p = self.get_pool(context, id) - self.driver.delete_pool(context, p) - - def create_member(self, context, member): - m = super(LoadBalancerPlugin, self).create_member(context, member) - self.driver.create_member(context, m) - return m - - def update_member(self, context, id, member): - if 'status' not in member['member']: - member['member']['status'] = constants.PENDING_UPDATE - old_member = self.get_member(context, id) - m = super(LoadBalancerPlugin, self).update_member(context, id, member) - self.driver.update_member(context, old_member, m) - return m - - def _delete_db_member(self, context, id): - super(LoadBalancerPlugin, self).delete_member(context, id) - - def delete_member(self, context, id): - self.update_status(context, loadbalancer_db.Member, - id, constants.PENDING_DELETE) - m = self.get_member(context, id) - self.driver.delete_member(context, m) - - def create_health_monitor(self, context, health_monitor): - hm = super(LoadBalancerPlugin, self).create_health_monitor( - context, - health_monitor - ) - self.driver.create_health_monitor(context, hm) - return hm - - def update_health_monitor(self, context, id, health_monitor): - if 'status' not in health_monitor['health_monitor']: - health_monitor['health_monitor']['status'] = ( - constants.PENDING_UPDATE - ) - old_hm = self.get_health_monitor(context, id) - hm = super(LoadBalancerPlugin, self).update_health_monitor( - context, - id, - health_monitor - ) - - with context.session.begin(subtransactions=True): - qry = context.session.query( - loadbalancer_db.PoolMonitorAssociation - ) - qry = qry.filter_by(monitor_id=hm['id']) - assocs = qry.all() - for assoc in assocs: - self.driver.update_health_monitor(context, old_hm, hm, assoc) - return hm - - def _delete_db_pool_health_monitor(self, context, hm_id, pool_id): - super(LoadBalancerPlugin, self).delete_pool_health_monitor(context, - hm_id, - pool_id) - - def delete_health_monitor(self, context, id): - with context.session.begin(subtransactions=True): - qry = context.session.query( - loadbalancer_db.PoolMonitorAssociation - ) - qry = qry.filter_by(monitor_id=id) - assocs = qry.all() - hm = self.get_health_monitor(context, id) - for assoc in assocs: - self.driver.delete_pool_health_monitor(context, - hm, - assoc['pool_id']) - - def create_pool_health_monitor(self, context, health_monitor, pool_id): - retval = super(LoadBalancerPlugin, self).create_pool_health_monitor( - context, - health_monitor, - pool_id - ) - # open issue: PoolMonitorAssociation has no status field - # so we cant set the status to pending and let the driver - # set the real status of the association - self.driver.create_pool_health_monitor( - context, health_monitor, pool_id) - return retval - - def delete_pool_health_monitor(self, context, id, pool_id): - hm = self.get_health_monitor(context, id) - self.driver.delete_pool_health_monitor( - context, hm, pool_id) - - def stats(self, context, pool_id): - stats_data = self.driver.stats(context, pool_id) - # if we get something from the driver - - # update the db and return the value from db - # else - return what we have in db - if stats_data: - super(LoadBalancerPlugin, self)._update_pool_stats( - context, - pool_id, - stats_data - ) - return super(LoadBalancerPlugin, self).stats(context, - pool_id) - - def populate_vip_graph(self, context, vip): - """Populate the vip with: pool, members, healthmonitors.""" - - pool = self.get_pool(context, vip['pool_id']) - vip['pool'] = pool - vip['members'] = [ - self.get_member(context, member_id) - for member_id in pool['members']] - vip['health_monitors'] = [ - self.get_health_monitor(context, hm_id) - for hm_id in pool['health_monitors']] - return vip diff --git a/quantum/plugins/services/agent_loadbalancer/plugin.py b/quantum/plugins/services/agent_loadbalancer/plugin.py index ba6112178..edaf0a0dc 100644 --- a/quantum/plugins/services/agent_loadbalancer/plugin.py +++ b/quantum/plugins/services/agent_loadbalancer/plugin.py @@ -1,7 +1,5 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 OpenStack Foundation. -# All Rights Reserved. +# +# Copyright 2013 Radware LTD. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -14,200 +12,30 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -import uuid +# +# @author: Avishay Balderman, Radware from oslo.config import cfg -from quantum.common import exceptions as q_exc -from quantum.common import rpc as q_rpc -from quantum.common import topics from quantum.db import api as qdbapi from quantum.db.loadbalancer import loadbalancer_db +from quantum.openstack.common import importutils from quantum.openstack.common import log as logging -from quantum.openstack.common import rpc -from quantum.openstack.common.rpc import proxy from quantum.plugins.common import constants LOG = logging.getLogger(__name__) -ACTIVE_PENDING = ( - constants.ACTIVE, - constants.PENDING_CREATE, - constants.PENDING_UPDATE -) - - -class LoadBalancerCallbacks(object): - RPC_API_VERSION = '1.0' - - def __init__(self, plugin): - self.plugin = plugin - - def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) - - def get_ready_devices(self, context, host=None): - with context.session.begin(subtransactions=True): - qry = (context.session.query(loadbalancer_db.Pool.id). - join(loadbalancer_db.Vip)) - - qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING)) - qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING)) - up = True # makes pep8 and sqlalchemy happy - qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up) - qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up) - return [id for id, in qry] - - def get_logical_device(self, context, pool_id=None, activate=True, - **kwargs): - with context.session.begin(subtransactions=True): - qry = context.session.query(loadbalancer_db.Pool) - qry = qry.filter_by(id=pool_id) - pool = qry.one() - - if activate: - # set all resources to active - if pool.status in ACTIVE_PENDING: - pool.status = constants.ACTIVE - - if pool.vip.status in ACTIVE_PENDING: - pool.vip.status = constants.ACTIVE - - for m in pool.members: - if m.status in ACTIVE_PENDING: - m.status = constants.ACTIVE - - for hm in pool.monitors: - if hm.healthmonitor.status in ACTIVE_PENDING: - hm.healthmonitor.status = constants.ACTIVE - - if (pool.status != constants.ACTIVE - or pool.vip.status != constants.ACTIVE): - raise q_exc.Invalid(_('Expected active pool and vip')) - - retval = {} - retval['pool'] = self.plugin._make_pool_dict(pool) - retval['vip'] = self.plugin._make_vip_dict(pool.vip) - retval['vip']['port'] = ( - self.plugin._core_plugin._make_port_dict(pool.vip.port) - ) - for fixed_ip in retval['vip']['port']['fixed_ips']: - fixed_ip['subnet'] = ( - self.plugin._core_plugin.get_subnet( - context, - fixed_ip['subnet_id'] - ) - ) - retval['members'] = [ - self.plugin._make_member_dict(m) - for m in pool.members if m.status == constants.ACTIVE - ] - retval['healthmonitors'] = [ - self.plugin._make_health_monitor_dict(hm.healthmonitor) - for hm in pool.monitors - if hm.healthmonitor.status == constants.ACTIVE - ] - - return retval - - def pool_destroyed(self, context, pool_id=None, host=None): - """Agent confirmation hook that a pool has been destroyed. - - This method exists for subclasses to change the deletion - behavior. - """ - pass - - def plug_vip_port(self, context, port_id=None, host=None): - if not port_id: - return - - try: - port = self.plugin._core_plugin.get_port( - context, - port_id - ) - except q_exc.PortNotFound: - msg = _('Unable to find port %s to plug.') - LOG.debug(msg, port_id) - return - - port['admin_state_up'] = True - port['device_owner'] = 'quantum:' + constants.LOADBALANCER - port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host))) - - self.plugin._core_plugin.update_port( - context, - port_id, - {'port': port} - ) - - def unplug_vip_port(self, context, port_id=None, host=None): - if not port_id: - return - - try: - port = self.plugin._core_plugin.get_port( - context, - port_id - ) - except q_exc.PortNotFound: - msg = _('Unable to find port %s to unplug. This can occur when ' - 'the Vip has been deleted first.') - LOG.debug(msg, port_id) - return - - port['admin_state_up'] = False - port['device_owner'] = '' - port['device_id'] = '' - - try: - self.plugin._core_plugin.update_port( - context, - port_id, - {'port': port} - ) - - except q_exc.PortNotFound: - msg = _('Unable to find port %s to unplug. This can occur when ' - 'the Vip has been deleted first.') - LOG.debug(msg, port_id) - - def update_pool_stats(self, context, pool_id=None, stats=None, host=None): - # TODO(markmcclain): add stats collection - pass - +DEFAULT_DRIVER = ("quantum.plugins.services.agent_loadbalancer" + ".drivers.haproxy" + ".plugin_driver.HaproxyOnHostPluginDriver") -class LoadBalancerAgentApi(proxy.RpcProxy): - """Plugin side of plugin to agent RPC API.""" +lbaas_plugin_opts = [ + cfg.StrOpt('driver_fqn', + default=DEFAULT_DRIVER, + help=_('LBaaS driver Fully Qualified Name')) +] - API_VERSION = '1.0' - - def __init__(self, topic, host): - super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION) - self.host = host - - def reload_pool(self, context, pool_id): - return self.cast( - context, - self.make_msg('reload_pool', pool_id=pool_id, host=self.host), - topic=self.topic - ) - - def destroy_pool(self, context, pool_id): - return self.cast( - context, - self.make_msg('destroy_pool', pool_id=pool_id, host=self.host), - topic=self.topic - ) - - def modify_pool(self, context, pool_id): - return self.cast( - context, - self.make_msg('modify_pool', pool_id=pool_id, host=self.host), - topic=self.topic - ) +cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS") class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): @@ -221,22 +49,22 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): supported_extension_aliases = ["lbaas"] def __init__(self): - """Do the initialization for the loadbalancer service plugin here.""" - qdbapi.register_models() + """Initialization for the loadbalancer service plugin.""" - self.callbacks = LoadBalancerCallbacks(self) + qdbapi.register_models() + self._load_drivers() - self.conn = rpc.create_connection(new=True) - self.conn.create_consumer( - topics.LOADBALANCER_PLUGIN, - self.callbacks.create_rpc_dispatcher(), - fanout=False) - self.conn.consume_in_thread() + def _load_drivers(self): + """Loads plugin-driver from configuration. - self.agent_rpc = LoadBalancerAgentApi( - topics.LOADBALANCER_AGENT, - cfg.CONF.host - ) + That method will later leverage service type framework + """ + try: + self.driver = importutils.import_object( + cfg.CONF.LBAAS.driver_fqn, self) + except ImportError: + LOG.exception(_("Error loading LBaaS driver %s"), + cfg.CONF.LBAAS.driver_fqn) def get_plugin_type(self): return constants.LOADBALANCER @@ -245,68 +73,89 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): return "Quantum LoadBalancer Service Plugin" def create_vip(self, context, vip): - vip['vip']['status'] = constants.PENDING_CREATE v = super(LoadBalancerPlugin, self).create_vip(context, vip) - self.agent_rpc.reload_pool(context, v['pool_id']) + self.driver.create_vip(context, v) return v def update_vip(self, context, id, vip): if 'status' not in vip['vip']: vip['vip']['status'] = constants.PENDING_UPDATE + old_vip = self.get_vip(context, id) v = super(LoadBalancerPlugin, self).update_vip(context, id, vip) - if v['status'] in ACTIVE_PENDING: - self.agent_rpc.reload_pool(context, v['pool_id']) - else: - self.agent_rpc.destroy_pool(context, v['pool_id']) + self.driver.update_vip(context, old_vip, v) return v - def delete_vip(self, context, id): - vip = self.get_vip(context, id) + def _delete_db_vip(self, context, id): + # proxy the call until plugin inherits from DBPlugin super(LoadBalancerPlugin, self).delete_vip(context, id) - self.agent_rpc.destroy_pool(context, vip['pool_id']) + + def delete_vip(self, context, id): + self.update_status(context, loadbalancer_db.Vip, + id, constants.PENDING_DELETE) + v = self.get_vip(context, id) + self.driver.delete_vip(context, v) def create_pool(self, context, pool): p = super(LoadBalancerPlugin, self).create_pool(context, pool) - # don't notify here because a pool needs a vip to be useful + self.driver.create_pool(context, p) return p def update_pool(self, context, id, pool): if 'status' not in pool['pool']: pool['pool']['status'] = constants.PENDING_UPDATE + old_pool = self.get_pool(context, id) p = super(LoadBalancerPlugin, self).update_pool(context, id, pool) - if p['status'] in ACTIVE_PENDING: - if p['vip_id'] is not None: - self.agent_rpc.reload_pool(context, p['id']) - else: - self.agent_rpc.destroy_pool(context, p['id']) + self.driver.update_pool(context, old_pool, p) return p - def delete_pool(self, context, id): + def _delete_db_pool(self, context, id): + # proxy the call until plugin inherits from DBPlugin super(LoadBalancerPlugin, self).delete_pool(context, id) - self.agent_rpc.destroy_pool(context, id) + + def delete_pool(self, context, id): + self.update_status(context, loadbalancer_db.Pool, + id, constants.PENDING_DELETE) + p = self.get_pool(context, id) + self.driver.delete_pool(context, p) def create_member(self, context, member): m = super(LoadBalancerPlugin, self).create_member(context, member) - self.agent_rpc.modify_pool(context, m['pool_id']) + self.driver.create_member(context, m) return m def update_member(self, context, id, member): if 'status' not in member['member']: member['member']['status'] = constants.PENDING_UPDATE + old_member = self.get_member(context, id) m = super(LoadBalancerPlugin, self).update_member(context, id, member) - self.agent_rpc.modify_pool(context, m['pool_id']) + self.driver.update_member(context, old_member, m) return m + def _delete_db_member(self, context, id): + # proxy the call until plugin inherits from DBPlugin + super(LoadBalancerPlugin, self).delete_member(context, id) + def delete_member(self, context, id): + self.update_status(context, loadbalancer_db.Member, + id, constants.PENDING_DELETE) m = self.get_member(context, id) - super(LoadBalancerPlugin, self).delete_member(context, id) - self.agent_rpc.modify_pool(context, m['pool_id']) + self.driver.delete_member(context, m) + + def create_health_monitor(self, context, health_monitor): + # no PENDING_CREATE status sinse healthmon is shared DB object + hm = super(LoadBalancerPlugin, self).create_health_monitor( + context, + health_monitor + ) + self.driver.create_health_monitor(context, hm) + return hm def update_health_monitor(self, context, id, health_monitor): if 'status' not in health_monitor['health_monitor']: health_monitor['health_monitor']['status'] = ( constants.PENDING_UPDATE ) + old_hm = self.get_health_monitor(context, id) hm = super(LoadBalancerPlugin, self).update_health_monitor( context, id, @@ -316,24 +165,26 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): with context.session.begin(subtransactions=True): qry = context.session.query( loadbalancer_db.PoolMonitorAssociation - ) - qry = qry.filter_by(monitor_id=hm['id']) - + ).filter_by(monitor_id=hm['id']) for assoc in qry: - self.agent_rpc.modify_pool(context, assoc['pool_id']) + self.driver.update_health_monitor(context, old_hm, hm, assoc) return hm + def _delete_db_pool_health_monitor(self, context, hm_id, pool_id): + super(LoadBalancerPlugin, self).delete_pool_health_monitor(context, + hm_id, + pool_id) + def delete_health_monitor(self, context, id): with context.session.begin(subtransactions=True): + hm = self.get_health_monitor(context, id) qry = context.session.query( loadbalancer_db.PoolMonitorAssociation - ) - qry = qry.filter_by(monitor_id=id) - - pool_ids = [a['pool_id'] for a in qry] - super(LoadBalancerPlugin, self).delete_health_monitor(context, id) - for pid in pool_ids: - self.agent_rpc.modify_pool(context, pid) + ).filter_by(monitor_id=id) + for assoc in qry: + self.driver.delete_pool_health_monitor(context, + hm, + assoc['pool_id']) def create_pool_health_monitor(self, context, health_monitor, pool_id): retval = super(LoadBalancerPlugin, self).create_pool_health_monitor( @@ -341,16 +192,41 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): health_monitor, pool_id ) - self.agent_rpc.modify_pool(context, pool_id) - + # open issue: PoolMonitorAssociation has no status field + # so we cant set the status to pending and let the driver + # set the real status of the association + self.driver.create_pool_health_monitor( + context, health_monitor, pool_id) return retval def delete_pool_health_monitor(self, context, id, pool_id): - retval = super(LoadBalancerPlugin, self).delete_pool_health_monitor( - context, - id, - pool_id - ) - self.agent_rpc.modify_pool(context, pool_id) - - return retval + hm = self.get_health_monitor(context, id) + self.driver.delete_pool_health_monitor( + context, hm, pool_id) + + def stats(self, context, pool_id): + stats_data = self.driver.stats(context, pool_id) + # if we get something from the driver - + # update the db and return the value from db + # else - return what we have in db + if stats_data: + super(LoadBalancerPlugin, self)._update_pool_stats( + context, + pool_id, + stats_data + ) + return super(LoadBalancerPlugin, self).stats(context, + pool_id) + + def populate_vip_graph(self, context, vip): + """Populate the vip with: pool, members, healthmonitors.""" + + pool = self.get_pool(context, vip['pool_id']) + vip['pool'] = pool + vip['members'] = [ + self.get_member(context, member_id) + for member_id in pool['members']] + vip['health_monitors'] = [ + self.get_health_monitor(context, hm_id) + for hm_id in pool['health_monitors']] + return vip diff --git a/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py b/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py index 02a8f3c49..f8b360b3f 100644 --- a/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py +++ b/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py @@ -39,7 +39,7 @@ LOG = logging.getLogger(__name__) DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2' DB_LB_PLUGIN_KLASS = ( "quantum.plugins.services.agent_loadbalancer." - "lbaas_plugin.LoadBalancerPlugin" + "plugin.LoadBalancerPlugin" ) ROOTDIR = os.path.dirname(__file__) + '../../../..' ETCDIR = os.path.join(ROOTDIR, 'etc') diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py deleted file mode 100644 index ce18bf6d6..000000000 --- a/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2013 New Dream Network, LLC (DreamHost) -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# @author: Mark McClain, DreamHost diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent.py similarity index 96% rename from quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py rename to quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent.py index e09f0278f..ad136d5b6 100644 --- a/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent.py @@ -20,7 +20,7 @@ import contextlib import mock from oslo.config import cfg -from quantum.plugins.services.agent_loadbalancer import agent +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import agent from quantum.tests import base diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent_manager.py similarity index 98% rename from quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py rename to quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent_manager.py index e284a3126..ed5d9ecbf 100644 --- a/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent_manager.py @@ -20,7 +20,9 @@ import contextlib import mock -from quantum.plugins.services.agent_loadbalancer.agent import manager +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + agent_manager as manager +) from quantum.tests import base @@ -145,8 +147,8 @@ class TestManager(base.BaseTestCase): self.mock_importer = mock.patch.object(manager, 'importutils').start() rpc_mock_cls = mock.patch( - 'quantum.plugins.services.agent_loadbalancer.agent.api' - '.LbaasAgentApi' + 'quantum.plugins.services.agent_loadbalancer.drivers' + '.haproxy.agent_api.LbaasAgentApi' ).start() self.mgr = manager.LbaasAgentManager(mock_conf) diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_api.py similarity index 97% rename from quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py rename to quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_api.py index 9b36cd4f7..70521439f 100644 --- a/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_api.py @@ -18,7 +18,9 @@ import mock -from quantum.plugins.services.agent_loadbalancer.agent import api +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + agent_api as api +) from quantum.tests import base diff --git a/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_plugin_driver.py similarity index 77% rename from quantum/tests/unit/services/agent_loadbalancer/test_plugin.py rename to quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_plugin_driver.py index 77b295bf8..217da560e 100644 --- a/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py +++ b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_plugin_driver.py @@ -1,7 +1,6 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 OpenStack Foundation. -# All Rights Reserved. +# +# Copyright 2013 New Dream Network, LLC (DreamHost) # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -23,10 +22,11 @@ from quantum.common import exceptions from quantum import context from quantum.db.loadbalancer import loadbalancer_db as ldb from quantum import manager -from quantum.openstack.common import importutils from quantum.openstack.common import uuidutils from quantum.plugins.common import constants -from quantum.plugins.services.agent_loadbalancer import plugin +from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import ( + plugin_driver +) from quantum.tests import base from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer @@ -42,25 +42,18 @@ class TestLoadBalancerPluginBase( # we need access to loaded plugins to modify models loaded_plugins = manager.QuantumManager().get_service_plugins() - # TODO(avishayb) - below is a little hack that helps the - # test to pass :-) - # the problem is the code below assumes the existance of 'callbacks' - # on the plugin. So the bypass is to load the plugin that has - # the callbacks as a member.The hack will be removed once we will - # have one lbaas plugin. (we currently have 2 - (Grizzly and Havana)) - hack = True - if hack: - HACK_KLASS = ( - "quantum.plugins.services.agent_loadbalancer." - "plugin.LoadBalancerPlugin" - ) - self.plugin_instance = importutils.import_object(HACK_KLASS) - else: - self.plugin_instance = loaded_plugins[constants.LOADBALANCER] - self.callbacks = self.plugin_instance.callbacks + + self.plugin_instance = loaded_plugins[constants.LOADBALANCER] class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase): + def setUp(self): + super(TestLoadBalancerCallbacks, self).setUp() + + self.callbacks = plugin_driver.LoadBalancerCallbacks( + self.plugin_instance + ) + def test_get_ready_devices(self): with self.vip() as vip: ready = self.callbacks.get_ready_devices( @@ -242,7 +235,7 @@ class TestLoadBalancerAgentApi(base.BaseTestCase): super(TestLoadBalancerAgentApi, self).setUp() self.addCleanup(mock.patch.stopall) - self.api = plugin.LoadBalancerAgentApi('topic', 'host') + self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host') self.mock_cast = mock.patch.object(self.api, 'cast').start() self.mock_msg = mock.patch.object(self.api, 'make_msg').start() @@ -273,3 +266,58 @@ class TestLoadBalancerAgentApi(base.BaseTestCase): def test_modify_pool(self): self._call_test_helper('modify_pool') + + +class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase): + def setUp(self): + self.log = mock.patch.object(plugin_driver, 'LOG') + api_cls = mock.patch.object(plugin_driver, + 'LoadBalancerAgentApi').start() + super(TestLoadBalancerPluginNotificationWrapper, self).setUp() + self.mock_api = api_cls.return_value + + self.addCleanup(mock.patch.stopall) + + def test_create_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet) as vip: + self.mock_api.reload_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + ) + + def test_update_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet) as vip: + self.mock_api.reset_mock() + ctx = context.get_admin_context() + vip['vip'].pop('status') + new_vip = self.plugin_instance.update_vip( + ctx, + vip['vip']['id'], + vip + ) + + self.mock_api.reload_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + ) + + self.assertEqual( + new_vip['status'], + constants.PENDING_UPDATE + ) + + def test_delete_vip(self): + with self.subnet() as subnet: + with self.pool(subnet=subnet) as pool: + with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip: + self.mock_api.reset_mock() + ctx = context.get_admin_context() + self.plugin_instance.delete_vip(ctx, vip['vip']['id']) + self.mock_api.destroy_pool.assert_called_once_with( + mock.ANY, + vip['vip']['pool_id'] + )