review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Make the reference LBaaS implementation a pluggable driver
author Eugene Nikanorov <enikanorov@mirantis.com>
Sun, 5 May 2013 02:34:44 +0000 (06:34 +0400)
committer Eugene Nikanorov <enikanorov@mirantis.com>
Thu, 13 Jun 2013 05:00:34 +0000 (09:00 +0400)
implements blueprint multi-vendor-support-for-lbaas-step1

This patch implements the following changes:
 * Merge lbaas_plugin.py and plugin.py into 'plugin.py'.
   As a result, the default 'reference' implementation is available again.
 * Move all code related to the reference implementation from plugin.py to
   drivers/haproxy/plugin_driver.py.
 * Inherit HaproxyOnHostPluginDriver from the abstract driver and implement
   its interface.
 * Modify tests accordingly.

Change-Id: Ib4bfe286826acdedeadbeeff4713448c073378d2
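
This turns the reference haproxy implementation into just one loadable driver
among many: the plugin now imports whatever class the new [LBAAS] driver_fqn
option names. A minimal sketch of that loading path, mirroring the
_load_drivers() method added to plugin.py below (the vendor path in the
comment is purely hypothetical):

    from oslo.config import cfg

    from quantum.openstack.common import importutils

    def load_lbaas_driver(plugin):
        # Instantiate whatever class [LBAAS] driver_fqn names, passing the
        # plugin in so the driver can call back into its _delete_db_* helpers.
        # Deployers override the haproxy default in quantum.conf, e.g.:
        #   [LBAAS]
        #   driver_fqn = vendor.lbaas.driver.VendorPluginDriver  # hypothetical
        return importutils.import_object(cfg.CONF.LBAAS.driver_fqn, plugin)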

15 files changed:
bin/quantum-lbaas-agent
quantum/common/topics.py
quantum/db/loadbalancer/loadbalancer_db.py
quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent.py [moved from quantum/plugins/services/agent_loadbalancer/agent/__init__.py with 91% similarity]
quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent_api.py [moved from quantum/plugins/services/agent_loadbalancer/agent/api.py with 100% similarity]
quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent_manager.py [moved from quantum/plugins/services/agent_loadbalancer/agent/manager.py with 97% similarity]
quantum/plugins/services/agent_loadbalancer/drivers/haproxy/plugin_driver.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/lbaas_plugin.py [deleted file]
quantum/plugins/services/agent_loadbalancer/plugin.py
quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py
quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py [deleted file]
quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent.py [moved from quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py with 96% similarity]
quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent_manager.py [moved from quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py with 98% similarity]
quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_api.py [moved from quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py with 97% similarity]
quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_plugin_driver.py [moved from quantum/tests/unit/services/agent_loadbalancer/test_plugin.py with 77% similarity]

index c1b3be43aa6e70abce87929fd3a7fb50396ad852..7c11322c084471bfc51c7d7bc1f5abeb04097ad9 100755 (executable)
@@ -20,7 +20,7 @@ import os
 import sys
 sys.path.insert(0, os.getcwd())
 
-from quantum.plugins.services.agent_loadbalancer.agent import main
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy.agent import main
 
 
 main()
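
The agent entry point is unchanged in behavior; the script simply imports
main() from the driver's new location. Launching the agent programmatically
is equivalent to what the console script above does (a sketch, assuming a
configured quantum environment):

    # Equivalent to the quantum-lbaas-agent console script above.
    from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
        agent
    )

    agent.main()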
index 06db338019f662524d208b0decdba42623b2cf0c..a766430c58b80add2f0fdba28cb5deac662f3e57 100644 (file)
@@ -25,11 +25,9 @@ UPDATE = 'update'
 AGENT = 'q-agent-notifier'
 PLUGIN = 'q-plugin'
 DHCP = 'q-dhcp-notifer'
-LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin'
 
 L3_AGENT = 'l3_agent'
 DHCP_AGENT = 'dhcp_agent'
-LOADBALANCER_AGENT = 'loadbalancer_agent'
 
 
 def get_topic_name(prefix, table, operation):
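
The two LBaaS topics removed here are not gone; they reappear as
driver-private constants in plugin_driver.py (added later in this patch), so
quantum.common.topics no longer carries service-specific names and each
vendor driver can define its own RPC namespace. A quick sanity check,
assuming a quantum dev environment with this patch applied:

    from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
        plugin_driver
    )

    # Values taken from the new plugin_driver.py below.
    assert plugin_driver.TOPIC_PROCESS_ON_HOST == 'q-lbaas-process-on-host'
    assert plugin_driver.TOPIC_LOADBALANCER_AGENT == 'lbaas_process_on_host_agent'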
index 332aa96dd4cef319c050ea05c34ac323372bdf85..44062479a55a31f76c62a2a51e94cf5ea5cd9ad9 100644 (file)
@@ -426,7 +426,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase,
             context.session.delete(vip)
             if vip.port:  # this is a Quantum port
                 self._core_plugin.delete_port(context, vip.port.id)
-            context.session.flush()
 
     def get_vip(self, context, id, fields=None):
         vip = self._get_resource(context, Vip, id)
similarity index 91%
rename from quantum/plugins/services/agent_loadbalancer/agent/__init__.py
rename to quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent.py
index 3632729c0b938fdec948bccccc32e2e26cf7a26d..3aee11b73c705269c81bf5724e0d013a8f8528ea 100644 (file)
@@ -21,11 +21,12 @@ from oslo.config import cfg
 
 from quantum.agent.common import config
 from quantum.agent.linux import interface
-from quantum.common import topics
 from quantum.openstack.common.rpc import service as rpc_service
 from quantum.openstack.common import service
-from quantum.plugins.services.agent_loadbalancer.agent import manager
-
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+    agent_manager as manager,
+    plugin_driver
+)
 
 OPTS = [
     cfg.IntOpt(
@@ -61,7 +62,7 @@ def main():
     mgr = manager.LbaasAgentManager(cfg.CONF)
     svc = LbaasAgentService(
         host=cfg.CONF.host,
-        topic=topics.LOADBALANCER_AGENT,
+        topic=plugin_driver.TOPIC_LOADBALANCER_AGENT,
         manager=mgr
     )
     service.launch(svc).wait()
similarity index 97%
rename from quantum/plugins/services/agent_loadbalancer/agent/manager.py
rename to quantum/plugins/services/agent_loadbalancer/drivers/haproxy/agent_manager.py
index d84bdfc62ae27d7c89d3b463e356ceafc27f014e..ee35e4e197550000ec811a23cea22fbf1510eb79 100644 (file)
@@ -21,12 +21,14 @@ import weakref
 from oslo.config import cfg
 
 from quantum.agent.common import config
-from quantum.common import topics
 from quantum import context
 from quantum.openstack.common import importutils
 from quantum.openstack.common import log as logging
 from quantum.openstack.common import periodic_task
-from quantum.plugins.services.agent_loadbalancer.agent import api
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+    agent_api,
+    plugin_driver
+)
 
 LOG = logging.getLogger(__name__)
 NS_PREFIX = 'qlbaas-'
@@ -128,8 +130,8 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
             msg = _('Error importing loadbalancer device driver: %s')
             raise SystemExit(msg % conf.device_driver)
         ctx = context.get_admin_context_without_session()
-        self.plugin_rpc = api.LbaasAgentApi(
-            topics.LOADBALANCER_PLUGIN,
+        self.plugin_rpc = agent_api.LbaasAgentApi(
+            plugin_driver.TOPIC_PROCESS_ON_HOST,
             ctx,
             conf.host
         )
diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/plugin_driver.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/plugin_driver.py
new file mode 100644 (file)
index 0000000..9dd0468
--- /dev/null
@@ -0,0 +1,300 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import uuid
+
+from oslo.config import cfg
+
+from quantum.common import exceptions as q_exc
+from quantum.common import rpc as q_rpc
+from quantum.db.loadbalancer import loadbalancer_db
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import proxy
+from quantum.plugins.common import constants
+from quantum.plugins.services.agent_loadbalancer.drivers import (
+    abstract_driver
+)
+
+LOG = logging.getLogger(__name__)
+
+ACTIVE_PENDING = (
+    constants.ACTIVE,
+    constants.PENDING_CREATE,
+    constants.PENDING_UPDATE
+)
+
+# topic name for this particular agent implementation
+TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
+TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
+
+
+class LoadBalancerCallbacks(object):
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, plugin):
+        self.plugin = plugin
+
+    def create_rpc_dispatcher(self):
+        return q_rpc.PluginRpcDispatcher([self])
+
+    def get_ready_devices(self, context, host=None):
+        with context.session.begin(subtransactions=True):
+            qry = (context.session.query(loadbalancer_db.Pool.id).
+                   join(loadbalancer_db.Vip))
+
+            qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
+            qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
+            up = True  # makes pep8 and sqlalchemy happy
+            qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
+            qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
+            return [id for id, in qry]
+
+    def get_logical_device(self, context, pool_id=None, activate=True,
+                           **kwargs):
+        with context.session.begin(subtransactions=True):
+            qry = context.session.query(loadbalancer_db.Pool)
+            qry = qry.filter_by(id=pool_id)
+            pool = qry.one()
+
+            if activate:
+                # set all resources to active
+                if pool.status in ACTIVE_PENDING:
+                    pool.status = constants.ACTIVE
+
+                if pool.vip.status in ACTIVE_PENDING:
+                    pool.vip.status = constants.ACTIVE
+
+                for m in pool.members:
+                    if m.status in ACTIVE_PENDING:
+                        m.status = constants.ACTIVE
+
+                for hm in pool.monitors:
+                    if hm.healthmonitor.status in ACTIVE_PENDING:
+                        hm.healthmonitor.status = constants.ACTIVE
+
+            if (pool.status != constants.ACTIVE
+                or pool.vip.status != constants.ACTIVE):
+                raise q_exc.Invalid(_('Expected active pool and vip'))
+
+            retval = {}
+            retval['pool'] = self.plugin._make_pool_dict(pool)
+            retval['vip'] = self.plugin._make_vip_dict(pool.vip)
+            retval['vip']['port'] = (
+                self.plugin._core_plugin._make_port_dict(pool.vip.port)
+            )
+            for fixed_ip in retval['vip']['port']['fixed_ips']:
+                fixed_ip['subnet'] = (
+                    self.plugin._core_plugin.get_subnet(
+                        context,
+                        fixed_ip['subnet_id']
+                    )
+                )
+            retval['members'] = [
+                self.plugin._make_member_dict(m)
+                for m in pool.members if m.status == constants.ACTIVE
+            ]
+            retval['healthmonitors'] = [
+                self.plugin._make_health_monitor_dict(hm.healthmonitor)
+                for hm in pool.monitors
+                if hm.healthmonitor.status == constants.ACTIVE
+            ]
+
+            return retval
+
+    def pool_destroyed(self, context, pool_id=None, host=None):
+        """Agent confirmation hook that a pool has been destroyed.
+
+        This method exists for subclasses to change the deletion
+        behavior.
+        """
+        pass
+
+    def plug_vip_port(self, context, port_id=None, host=None):
+        if not port_id:
+            return
+
+        try:
+            port = self.plugin._core_plugin.get_port(
+                context,
+                port_id
+            )
+        except q_exc.PortNotFound:
+            msg = _('Unable to find port %s to plug.')
+            LOG.debug(msg, port_id)
+            return
+
+        port['admin_state_up'] = True
+        port['device_owner'] = 'quantum:' + constants.LOADBALANCER
+        port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
+
+        self.plugin._core_plugin.update_port(
+            context,
+            port_id,
+            {'port': port}
+        )
+
+    def unplug_vip_port(self, context, port_id=None, host=None):
+        if not port_id:
+            return
+
+        try:
+            port = self.plugin._core_plugin.get_port(
+                context,
+                port_id
+            )
+        except q_exc.PortNotFound:
+            msg = _('Unable to find port %s to unplug.  This can occur when '
+                    'the Vip has been deleted first.')
+            LOG.debug(msg, port_id)
+            return
+
+        port['admin_state_up'] = False
+        port['device_owner'] = ''
+        port['device_id'] = ''
+
+        try:
+            self.plugin._core_plugin.update_port(
+                context,
+                port_id,
+                {'port': port}
+            )
+
+        except q_exc.PortNotFound:
+            msg = _('Unable to find port %s to unplug.  This can occur when '
+                    'the Vip has been deleted first.')
+            LOG.debug(msg, port_id)
+
+    def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
+        # TODO(markmcclain): add stats collection
+        pass
+
+
+class LoadBalancerAgentApi(proxy.RpcProxy):
+    """Plugin side of plugin to agent RPC API."""
+
+    API_VERSION = '1.0'
+
+    def __init__(self, topic, host):
+        super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
+        self.host = host
+
+    def reload_pool(self, context, pool_id):
+        return self.cast(
+            context,
+            self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
+            topic=self.topic
+        )
+
+    def destroy_pool(self, context, pool_id):
+        return self.cast(
+            context,
+            self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
+            topic=self.topic
+        )
+
+    def modify_pool(self, context, pool_id):
+        return self.cast(
+            context,
+            self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
+            topic=self.topic
+        )
+
+
+class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
+    def __init__(self, plugin):
+        self.agent_rpc = LoadBalancerAgentApi(
+            TOPIC_LOADBALANCER_AGENT,
+            cfg.CONF.host
+        )
+        self.callbacks = LoadBalancerCallbacks(plugin)
+
+        self.conn = rpc.create_connection(new=True)
+        self.conn.create_consumer(
+            TOPIC_PROCESS_ON_HOST,
+            self.callbacks.create_rpc_dispatcher(),
+            fanout=False)
+        self.conn.consume_in_thread()
+        self.plugin = plugin
+
+    def create_vip(self, context, vip):
+        self.agent_rpc.reload_pool(context, vip['pool_id'])
+
+    def update_vip(self, context, old_vip, vip):
+        if vip['status'] in ACTIVE_PENDING:
+            self.agent_rpc.reload_pool(context, vip['pool_id'])
+        else:
+            self.agent_rpc.destroy_pool(context, vip['pool_id'])
+
+    def delete_vip(self, context, vip):
+        self.plugin._delete_db_vip(context, vip['id'])
+        self.agent_rpc.destroy_pool(context, vip['pool_id'])
+
+    def create_pool(self, context, pool):
+        # don't notify here because a pool needs a vip to be useful
+        pass
+
+    def update_pool(self, context, old_pool, pool):
+        if pool['status'] in ACTIVE_PENDING:
+            if pool['vip_id'] is not None:
+                self.agent_rpc.reload_pool(context, pool['id'])
+        else:
+            self.agent_rpc.destroy_pool(context, pool['id'])
+
+    def delete_pool(self, context, pool):
+        self.plugin._delete_db_pool(context, pool['id'])
+        self.agent_rpc.destroy_pool(context, pool['id'])
+
+    def create_member(self, context, member):
+        self.agent_rpc.modify_pool(context, member['pool_id'])
+
+    def update_member(self, context, old_member, member):
+        # member may change pool id
+        if member['pool_id'] != old_member['pool_id']:
+            self.agent_rpc.modify_pool(context, old_member['pool_id'])
+        self.agent_rpc.modify_pool(context, member['pool_id'])
+
+    def delete_member(self, context, member):
+        self.plugin._delete_db_member(context, member['id'])
+        self.agent_rpc.modify_pool(context, member['pool_id'])
+
+    def update_health_monitor(self, context, healthmon, pool_id):
+        # healthmon is unused here because agent will fetch what is necessary
+        self.agent_rpc.modify_pool(context, pool_id)
+
+    def delete_health_monitor(self, context, healthmon_id, pool_id):
+        # healthmon_id is not used in this driver
+        self.agent_rpc.modify_pool(context, pool_id)
+
+    def create_pool_health_monitor(self, context, healthmon, pool_id):
+        # healthmon is not used here
+        self.agent_rpc.modify_pool(context, pool_id)
+
+    def delete_pool_health_monitor(self, context, health_monitor, pool_id):
+        self.plugin._delete_db_pool_health_monitor(
+            context, health_monitor['id'], pool_id
+        )
+
+        # healthmon_id is not used here
+        self.agent_rpc.modify_pool(context, pool_id)
+
+    def create_health_monitor(self, context, health_monitor):
+        pass
+
+    def stats(self, context, pool_id):
+        pass
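
The new driver above is mostly a thin notification layer: each CRUD call
becomes an RPC cast to the agent (reload_pool, destroy_pool, or modify_pool).
A minimal sketch of that mapping, assuming a quantum dev environment, with
config and the RPC machinery mocked out so only the notification side runs:

    import mock

    from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
        plugin_driver
    )

    # Stub config and RPC so the driver can be built in isolation; only the
    # CRUD -> agent-cast mapping is exercised here.
    with mock.patch.object(plugin_driver, 'cfg'), \
            mock.patch.object(plugin_driver, 'rpc'), \
            mock.patch.object(plugin_driver, 'LoadBalancerAgentApi') as api_cls:
        driver = plugin_driver.HaproxyOnHostPluginDriver(plugin=mock.Mock())
        driver.create_vip(mock.sentinel.ctx, {'pool_id': 'pool-1'})
        # create_vip on the plugin side becomes reload_pool on the agent side
        api_cls.return_value.reload_pool.assert_called_once_with(
            mock.sentinel.ctx, 'pool-1')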
diff --git a/quantum/plugins/services/agent_loadbalancer/lbaas_plugin.py b/quantum/plugins/services/agent_loadbalancer/lbaas_plugin.py
deleted file mode 100644 (file)
index 1eb247f..0000000
+++ /dev/null
@@ -1,221 +0,0 @@
-#
-# Copyright 2013 Radware LTD.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-#
-# @author: Avishay Balderman, Radware
-
-from oslo.config import cfg
-
-from quantum.db import api as qdbapi
-from quantum.db.loadbalancer import loadbalancer_db
-from quantum.openstack.common import importutils
-from quantum.openstack.common import log as logging
-from quantum.plugins.common import constants
-
-LOG = logging.getLogger(__name__)
-
-DEFAULT_DRIVER = ("quantum.plugins.services.agent_loadbalancer"
-                  ".drivers.noop"
-                  ".noop_driver.NoopLbaaSDriver")
-
-lbaas_plugin_opts = [
-    cfg.StrOpt('driver_fqn',
-               default=DEFAULT_DRIVER,
-               help=_('LBaaS driver Fully Qualified Name'))
-]
-
-cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
-
-
-class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
-
-    """Implementation of the Quantum Loadbalancer Service Plugin.
-
-    This class manages the workflow of LBaaS request/response.
-    Most DB related works are implemented in class
-    loadbalancer_db.LoadBalancerPluginDb.
-    """
-    supported_extension_aliases = ["lbaas"]
-
-    def __init__(self):
-        """Initialization for the loadbalancer service plugin."""
-
-        qdbapi.register_models()
-        self.driver = importutils.import_object(
-            cfg.CONF.LBAAS.driver_fqn, self)
-
-    def get_plugin_type(self):
-        return constants.LOADBALANCER
-
-    def get_plugin_description(self):
-        return "Quantum LoadBalancer Service Plugin"
-
-    def create_vip(self, context, vip):
-        v = super(LoadBalancerPlugin, self).create_vip(context, vip)
-        self.driver.create_vip(context, v)
-        return v
-
-    def update_vip(self, context, id, vip):
-        if 'status' not in vip['vip']:
-            vip['vip']['status'] = constants.PENDING_UPDATE
-        old_vip = self.get_vip(context, id)
-        v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
-        self.driver.update_vip(context, old_vip, v)
-        return v
-
-    def _delete_db_vip(self, context, id):
-        super(LoadBalancerPlugin, self).delete_vip(context, id)
-
-    def delete_vip(self, context, id):
-        self.update_status(context, loadbalancer_db.Vip,
-                           id, constants.PENDING_DELETE)
-        v = self.get_vip(context, id)
-        self.driver.delete_vip(context, v)
-
-    def create_pool(self, context, pool):
-        p = super(LoadBalancerPlugin, self).create_pool(context, pool)
-        self.driver.create_pool(context, p)
-        return p
-
-    def update_pool(self, context, id, pool):
-        if 'status' not in pool['pool']:
-            pool['pool']['status'] = constants.PENDING_UPDATE
-        old_pool = self.get_pool(context, id)
-        p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
-        self.driver.update_pool(context, old_pool, p)
-        return p
-
-    def _delete_db_pool(self, context, id):
-        super(LoadBalancerPlugin, self).delete_pool(context, id)
-
-    def delete_pool(self, context, id):
-        self.update_status(context, loadbalancer_db.Pool,
-                           id, constants.PENDING_DELETE)
-        p = self.get_pool(context, id)
-        self.driver.delete_pool(context, p)
-
-    def create_member(self, context, member):
-        m = super(LoadBalancerPlugin, self).create_member(context, member)
-        self.driver.create_member(context, m)
-        return m
-
-    def update_member(self, context, id, member):
-        if 'status' not in member['member']:
-            member['member']['status'] = constants.PENDING_UPDATE
-        old_member = self.get_member(context, id)
-        m = super(LoadBalancerPlugin, self).update_member(context, id, member)
-        self.driver.update_member(context, old_member, m)
-        return m
-
-    def _delete_db_member(self, context, id):
-        super(LoadBalancerPlugin, self).delete_member(context, id)
-
-    def delete_member(self, context, id):
-        self.update_status(context, loadbalancer_db.Member,
-                           id, constants.PENDING_DELETE)
-        m = self.get_member(context, id)
-        self.driver.delete_member(context, m)
-
-    def create_health_monitor(self, context, health_monitor):
-        hm = super(LoadBalancerPlugin, self).create_health_monitor(
-            context,
-            health_monitor
-        )
-        self.driver.create_health_monitor(context, hm)
-        return hm
-
-    def update_health_monitor(self, context, id, health_monitor):
-        if 'status' not in health_monitor['health_monitor']:
-            health_monitor['health_monitor']['status'] = (
-                constants.PENDING_UPDATE
-            )
-        old_hm = self.get_health_monitor(context, id)
-        hm = super(LoadBalancerPlugin, self).update_health_monitor(
-            context,
-            id,
-            health_monitor
-        )
-
-        with context.session.begin(subtransactions=True):
-            qry = context.session.query(
-                loadbalancer_db.PoolMonitorAssociation
-            )
-            qry = qry.filter_by(monitor_id=hm['id'])
-            assocs = qry.all()
-        for assoc in assocs:
-            self.driver.update_health_monitor(context, old_hm, hm, assoc)
-        return hm
-
-    def _delete_db_pool_health_monitor(self, context, hm_id, pool_id):
-        super(LoadBalancerPlugin, self).delete_pool_health_monitor(context,
-                                                                   hm_id,
-                                                                   pool_id)
-
-    def delete_health_monitor(self, context, id):
-        with context.session.begin(subtransactions=True):
-            qry = context.session.query(
-                loadbalancer_db.PoolMonitorAssociation
-            )
-            qry = qry.filter_by(monitor_id=id)
-            assocs = qry.all()
-        hm = self.get_health_monitor(context, id)
-        for assoc in assocs:
-            self.driver.delete_pool_health_monitor(context,
-                                                   hm,
-                                                   assoc['pool_id'])
-
-    def create_pool_health_monitor(self, context, health_monitor, pool_id):
-        retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
-            context,
-            health_monitor,
-            pool_id
-        )
-        # open issue: PoolMonitorAssociation has no status field
-        # so we cant set the status to pending and let the driver
-        # set the real status of the association
-        self.driver.create_pool_health_monitor(
-            context, health_monitor, pool_id)
-        return retval
-
-    def delete_pool_health_monitor(self, context, id, pool_id):
-        hm = self.get_health_monitor(context, id)
-        self.driver.delete_pool_health_monitor(
-            context, hm, pool_id)
-
-    def stats(self, context, pool_id):
-        stats_data = self.driver.stats(context, pool_id)
-        # if we get something from the driver -
-        # update the db and return the value from db
-        # else - return what we have in db
-        if stats_data:
-            super(LoadBalancerPlugin, self)._update_pool_stats(
-                context,
-                pool_id,
-                stats_data
-            )
-        return super(LoadBalancerPlugin, self).stats(context,
-                                                     pool_id)
-
-    def populate_vip_graph(self, context, vip):
-        """Populate the vip with: pool, members, healthmonitors."""
-
-        pool = self.get_pool(context, vip['pool_id'])
-        vip['pool'] = pool
-        vip['members'] = [
-            self.get_member(context, member_id)
-            for member_id in pool['members']]
-        vip['health_monitors'] = [
-            self.get_health_monitor(context, hm_id)
-            for hm_id in pool['health_monitors']]
-        return vip
index ba6112178c894728cde3660533966d6d06e93063..edaf0a0dcdfbdcbb3552f16a28701618b7245198 100644 (file)
@@ -1,7 +1,5 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2013 OpenStack Foundation.
-# All Rights Reserved.
+#
+# Copyright 2013 Radware LTD.
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
-
-import uuid
+#
+# @author: Avishay Balderman, Radware
 
 from oslo.config import cfg
 
-from quantum.common import exceptions as q_exc
-from quantum.common import rpc as q_rpc
-from quantum.common import topics
 from quantum.db import api as qdbapi
 from quantum.db.loadbalancer import loadbalancer_db
+from quantum.openstack.common import importutils
 from quantum.openstack.common import log as logging
-from quantum.openstack.common import rpc
-from quantum.openstack.common.rpc import proxy
 from quantum.plugins.common import constants
 
 LOG = logging.getLogger(__name__)
 
-ACTIVE_PENDING = (
-    constants.ACTIVE,
-    constants.PENDING_CREATE,
-    constants.PENDING_UPDATE
-)
-
-
-class LoadBalancerCallbacks(object):
-    RPC_API_VERSION = '1.0'
-
-    def __init__(self, plugin):
-        self.plugin = plugin
-
-    def create_rpc_dispatcher(self):
-        return q_rpc.PluginRpcDispatcher([self])
-
-    def get_ready_devices(self, context, host=None):
-        with context.session.begin(subtransactions=True):
-            qry = (context.session.query(loadbalancer_db.Pool.id).
-                   join(loadbalancer_db.Vip))
-
-            qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
-            qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
-            up = True  # makes pep8 and sqlalchemy happy
-            qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
-            qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
-            return [id for id, in qry]
-
-    def get_logical_device(self, context, pool_id=None, activate=True,
-                           **kwargs):
-        with context.session.begin(subtransactions=True):
-            qry = context.session.query(loadbalancer_db.Pool)
-            qry = qry.filter_by(id=pool_id)
-            pool = qry.one()
-
-            if activate:
-                # set all resources to active
-                if pool.status in ACTIVE_PENDING:
-                    pool.status = constants.ACTIVE
-
-                if pool.vip.status in ACTIVE_PENDING:
-                    pool.vip.status = constants.ACTIVE
-
-                for m in pool.members:
-                    if m.status in ACTIVE_PENDING:
-                        m.status = constants.ACTIVE
-
-                for hm in pool.monitors:
-                    if hm.healthmonitor.status in ACTIVE_PENDING:
-                        hm.healthmonitor.status = constants.ACTIVE
-
-            if (pool.status != constants.ACTIVE
-                or pool.vip.status != constants.ACTIVE):
-                raise q_exc.Invalid(_('Expected active pool and vip'))
-
-            retval = {}
-            retval['pool'] = self.plugin._make_pool_dict(pool)
-            retval['vip'] = self.plugin._make_vip_dict(pool.vip)
-            retval['vip']['port'] = (
-                self.plugin._core_plugin._make_port_dict(pool.vip.port)
-            )
-            for fixed_ip in retval['vip']['port']['fixed_ips']:
-                fixed_ip['subnet'] = (
-                    self.plugin._core_plugin.get_subnet(
-                        context,
-                        fixed_ip['subnet_id']
-                    )
-                )
-            retval['members'] = [
-                self.plugin._make_member_dict(m)
-                for m in pool.members if m.status == constants.ACTIVE
-            ]
-            retval['healthmonitors'] = [
-                self.plugin._make_health_monitor_dict(hm.healthmonitor)
-                for hm in pool.monitors
-                if hm.healthmonitor.status == constants.ACTIVE
-            ]
-
-            return retval
-
-    def pool_destroyed(self, context, pool_id=None, host=None):
-        """Agent confirmation hook that a pool has been destroyed.
-
-        This method exists for subclasses to change the deletion
-        behavior.
-        """
-        pass
-
-    def plug_vip_port(self, context, port_id=None, host=None):
-        if not port_id:
-            return
-
-        try:
-            port = self.plugin._core_plugin.get_port(
-                context,
-                port_id
-            )
-        except q_exc.PortNotFound:
-            msg = _('Unable to find port %s to plug.')
-            LOG.debug(msg, port_id)
-            return
-
-        port['admin_state_up'] = True
-        port['device_owner'] = 'quantum:' + constants.LOADBALANCER
-        port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
-
-        self.plugin._core_plugin.update_port(
-            context,
-            port_id,
-            {'port': port}
-        )
-
-    def unplug_vip_port(self, context, port_id=None, host=None):
-        if not port_id:
-            return
-
-        try:
-            port = self.plugin._core_plugin.get_port(
-                context,
-                port_id
-            )
-        except q_exc.PortNotFound:
-            msg = _('Unable to find port %s to unplug.  This can occur when '
-                    'the Vip has been deleted first.')
-            LOG.debug(msg, port_id)
-            return
-
-        port['admin_state_up'] = False
-        port['device_owner'] = ''
-        port['device_id'] = ''
-
-        try:
-            self.plugin._core_plugin.update_port(
-                context,
-                port_id,
-                {'port': port}
-            )
-
-        except q_exc.PortNotFound:
-            msg = _('Unable to find port %s to unplug.  This can occur when '
-                    'the Vip has been deleted first.')
-            LOG.debug(msg, port_id)
-
-    def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
-        # TODO(markmcclain): add stats collection
-        pass
-
+DEFAULT_DRIVER = ("quantum.plugins.services.agent_loadbalancer"
+                  ".drivers.haproxy"
+                  ".plugin_driver.HaproxyOnHostPluginDriver")
 
-class LoadBalancerAgentApi(proxy.RpcProxy):
-    """Plugin side of plugin to agent RPC API."""
+lbaas_plugin_opts = [
+    cfg.StrOpt('driver_fqn',
+               default=DEFAULT_DRIVER,
+               help=_('LBaaS driver Fully Qualified Name'))
+]
 
-    API_VERSION = '1.0'
-
-    def __init__(self, topic, host):
-        super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
-        self.host = host
-
-    def reload_pool(self, context, pool_id):
-        return self.cast(
-            context,
-            self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
-            topic=self.topic
-        )
-
-    def destroy_pool(self, context, pool_id):
-        return self.cast(
-            context,
-            self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
-            topic=self.topic
-        )
-
-    def modify_pool(self, context, pool_id):
-        return self.cast(
-            context,
-            self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
-            topic=self.topic
-        )
+cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
 
 
 class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
@@ -221,22 +49,22 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
     supported_extension_aliases = ["lbaas"]
 
     def __init__(self):
-        """Do the initialization for the loadbalancer service plugin here."""
-        qdbapi.register_models()
+        """Initialization for the loadbalancer service plugin."""
 
-        self.callbacks = LoadBalancerCallbacks(self)
+        qdbapi.register_models()
+        self._load_drivers()
 
-        self.conn = rpc.create_connection(new=True)
-        self.conn.create_consumer(
-            topics.LOADBALANCER_PLUGIN,
-            self.callbacks.create_rpc_dispatcher(),
-            fanout=False)
-        self.conn.consume_in_thread()
+    def _load_drivers(self):
+        """Loads plugin-driver from configuration.
 
-        self.agent_rpc = LoadBalancerAgentApi(
-            topics.LOADBALANCER_AGENT,
-            cfg.CONF.host
-        )
+           This method will later leverage the service type framework.
+        """
+        try:
+            self.driver = importutils.import_object(
+                cfg.CONF.LBAAS.driver_fqn, self)
+        except ImportError:
+            LOG.exception(_("Error loading LBaaS driver %s"),
+                          cfg.CONF.LBAAS.driver_fqn)
 
     def get_plugin_type(self):
         return constants.LOADBALANCER
@@ -245,68 +73,89 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
         return "Quantum LoadBalancer Service Plugin"
 
     def create_vip(self, context, vip):
-        vip['vip']['status'] = constants.PENDING_CREATE
         v = super(LoadBalancerPlugin, self).create_vip(context, vip)
-        self.agent_rpc.reload_pool(context, v['pool_id'])
+        self.driver.create_vip(context, v)
         return v
 
     def update_vip(self, context, id, vip):
         if 'status' not in vip['vip']:
             vip['vip']['status'] = constants.PENDING_UPDATE
+        old_vip = self.get_vip(context, id)
         v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
-        if v['status'] in ACTIVE_PENDING:
-            self.agent_rpc.reload_pool(context, v['pool_id'])
-        else:
-            self.agent_rpc.destroy_pool(context, v['pool_id'])
+        self.driver.update_vip(context, old_vip, v)
         return v
 
-    def delete_vip(self, context, id):
-        vip = self.get_vip(context, id)
+    def _delete_db_vip(self, context, id):
+        # proxy the call until plugin inherits from DBPlugin
         super(LoadBalancerPlugin, self).delete_vip(context, id)
-        self.agent_rpc.destroy_pool(context, vip['pool_id'])
+
+    def delete_vip(self, context, id):
+        self.update_status(context, loadbalancer_db.Vip,
+                           id, constants.PENDING_DELETE)
+        v = self.get_vip(context, id)
+        self.driver.delete_vip(context, v)
 
     def create_pool(self, context, pool):
         p = super(LoadBalancerPlugin, self).create_pool(context, pool)
-        # don't notify here because a pool needs a vip to be useful
+        self.driver.create_pool(context, p)
         return p
 
     def update_pool(self, context, id, pool):
         if 'status' not in pool['pool']:
             pool['pool']['status'] = constants.PENDING_UPDATE
+        old_pool = self.get_pool(context, id)
         p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
-        if p['status'] in ACTIVE_PENDING:
-            if p['vip_id'] is not None:
-                self.agent_rpc.reload_pool(context, p['id'])
-        else:
-            self.agent_rpc.destroy_pool(context, p['id'])
+        self.driver.update_pool(context, old_pool, p)
         return p
 
-    def delete_pool(self, context, id):
+    def _delete_db_pool(self, context, id):
+        # proxy the call until plugin inherits from DBPlugin
         super(LoadBalancerPlugin, self).delete_pool(context, id)
-        self.agent_rpc.destroy_pool(context, id)
+
+    def delete_pool(self, context, id):
+        self.update_status(context, loadbalancer_db.Pool,
+                           id, constants.PENDING_DELETE)
+        p = self.get_pool(context, id)
+        self.driver.delete_pool(context, p)
 
     def create_member(self, context, member):
         m = super(LoadBalancerPlugin, self).create_member(context, member)
-        self.agent_rpc.modify_pool(context, m['pool_id'])
+        self.driver.create_member(context, m)
         return m
 
     def update_member(self, context, id, member):
         if 'status' not in member['member']:
             member['member']['status'] = constants.PENDING_UPDATE
+        old_member = self.get_member(context, id)
         m = super(LoadBalancerPlugin, self).update_member(context, id, member)
-        self.agent_rpc.modify_pool(context, m['pool_id'])
+        self.driver.update_member(context, old_member, m)
         return m
 
+    def _delete_db_member(self, context, id):
+        # proxy the call until plugin inherits from DBPlugin
+        super(LoadBalancerPlugin, self).delete_member(context, id)
+
     def delete_member(self, context, id):
+        self.update_status(context, loadbalancer_db.Member,
+                           id, constants.PENDING_DELETE)
         m = self.get_member(context, id)
-        super(LoadBalancerPlugin, self).delete_member(context, id)
-        self.agent_rpc.modify_pool(context, m['pool_id'])
+        self.driver.delete_member(context, m)
+
+    def create_health_monitor(self, context, health_monitor):
+        # no PENDING_CREATE status since healthmon is a shared DB object
+        hm = super(LoadBalancerPlugin, self).create_health_monitor(
+            context,
+            health_monitor
+        )
+        self.driver.create_health_monitor(context, hm)
+        return hm
 
     def update_health_monitor(self, context, id, health_monitor):
         if 'status' not in health_monitor['health_monitor']:
             health_monitor['health_monitor']['status'] = (
                 constants.PENDING_UPDATE
             )
+        old_hm = self.get_health_monitor(context, id)
         hm = super(LoadBalancerPlugin, self).update_health_monitor(
             context,
             id,
@@ -316,24 +165,26 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
         with context.session.begin(subtransactions=True):
             qry = context.session.query(
                 loadbalancer_db.PoolMonitorAssociation
-            )
-            qry = qry.filter_by(monitor_id=hm['id'])
-
+            ).filter_by(monitor_id=hm['id'])
             for assoc in qry:
-                self.agent_rpc.modify_pool(context, assoc['pool_id'])
+                self.driver.update_health_monitor(context, old_hm, hm, assoc)
         return hm
 
+    def _delete_db_pool_health_monitor(self, context, hm_id, pool_id):
+        super(LoadBalancerPlugin, self).delete_pool_health_monitor(context,
+                                                                   hm_id,
+                                                                   pool_id)
+
     def delete_health_monitor(self, context, id):
         with context.session.begin(subtransactions=True):
+            hm = self.get_health_monitor(context, id)
             qry = context.session.query(
                 loadbalancer_db.PoolMonitorAssociation
-            )
-            qry = qry.filter_by(monitor_id=id)
-
-            pool_ids = [a['pool_id'] for a in qry]
-            super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
-        for pid in pool_ids:
-            self.agent_rpc.modify_pool(context, pid)
+            ).filter_by(monitor_id=id)
+            for assoc in qry:
+                self.driver.delete_pool_health_monitor(context,
+                                                       hm,
+                                                       assoc['pool_id'])
 
     def create_pool_health_monitor(self, context, health_monitor, pool_id):
         retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
@@ -341,16 +192,41 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
             health_monitor,
             pool_id
         )
-        self.agent_rpc.modify_pool(context, pool_id)
-
+        # open issue: PoolMonitorAssociation has no status field
+        # so we can't set the status to pending and let the driver
+        # set the real status of the association
+        self.driver.create_pool_health_monitor(
+            context, health_monitor, pool_id)
         return retval
 
     def delete_pool_health_monitor(self, context, id, pool_id):
-        retval = super(LoadBalancerPlugin, self).delete_pool_health_monitor(
-            context,
-            id,
-            pool_id
-        )
-        self.agent_rpc.modify_pool(context, pool_id)
-
-        return retval
+        hm = self.get_health_monitor(context, id)
+        self.driver.delete_pool_health_monitor(
+            context, hm, pool_id)
+
+    def stats(self, context, pool_id):
+        stats_data = self.driver.stats(context, pool_id)
+        # if we get something from the driver -
+        # update the db and return the value from db
+        # else - return what we have in db
+        if stats_data:
+            super(LoadBalancerPlugin, self)._update_pool_stats(
+                context,
+                pool_id,
+                stats_data
+            )
+        return super(LoadBalancerPlugin, self).stats(context,
+                                                     pool_id)
+
+    def populate_vip_graph(self, context, vip):
+        """Populate the vip with: pool, members, healthmonitors."""
+
+        pool = self.get_pool(context, vip['pool_id'])
+        vip['pool'] = pool
+        vip['members'] = [
+            self.get_member(context, member_id)
+            for member_id in pool['members']]
+        vip['health_monitors'] = [
+            self.get_health_monitor(context, hm_id)
+            for hm_id in pool['health_monitors']]
+        return vip
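
Note the new _delete_db_* proxies above: deletion becomes two-phase. The
plugin only marks the object PENDING_DELETE and hands it to the driver; the
driver is responsible for calling back to remove the row once the backend is
cleaned up. A hypothetical third-party driver would follow the same contract
(partial sketch; the remaining abstract methods are omitted, so this class is
not instantiable as written):

    from quantum.plugins.services.agent_loadbalancer.drivers import (
        abstract_driver
    )

    class ExampleVendorDriver(abstract_driver.LoadBalancerAbstractDriver):
        def __init__(self, plugin):
            self.plugin = plugin

        def delete_vip(self, context, vip):
            # 1. tear down the vendor backend resource here
            # 2. only then remove the DB row via the plugin's proxy
            self.plugin._delete_db_vip(context, vip['id'])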
index 02a8f3c4916c0da63450d1e0a05fb1b9b5f8bca5..f8b360b3fa6de1b2890126ae8f9421b445a59e05 100644 (file)
@@ -39,7 +39,7 @@ LOG = logging.getLogger(__name__)
 DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
 DB_LB_PLUGIN_KLASS = (
     "quantum.plugins.services.agent_loadbalancer."
-    "lbaas_plugin.LoadBalancerPlugin"
+    "plugin.LoadBalancerPlugin"
 )
 ROOTDIR = os.path.dirname(__file__) + '../../../..'
 ETCDIR = os.path.join(ROOTDIR, 'etc')
diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py
deleted file mode 100644 (file)
index ce18bf6..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2013 New Dream Network, LLC (DreamHost)
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-#
-# @author: Mark McClain, DreamHost
similarity index 96%
rename from quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py
rename to quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent.py
index e09f0278ff8b34b711f6c825b629aa1fef7004e5..ad136d5b6b08b2517d97c1bcc7893cb062f183d1 100644 (file)
@@ -20,7 +20,7 @@ import contextlib
 import mock
 from oslo.config import cfg
 
-from quantum.plugins.services.agent_loadbalancer import agent
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import agent
 from quantum.tests import base
 
 
similarity index 98%
rename from quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py
rename to quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_agent_manager.py
index e284a3126dd6cfbd9362bcdd7d2ef87d62096eaf..ed5d9ecbf18e2ce22eb07791a515279887a5fb57 100644 (file)
@@ -20,7 +20,9 @@ import contextlib
 
 import mock
 
-from quantum.plugins.services.agent_loadbalancer.agent import manager
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+    agent_manager as manager
+)
 from quantum.tests import base
 
 
@@ -145,8 +147,8 @@ class TestManager(base.BaseTestCase):
         self.mock_importer = mock.patch.object(manager, 'importutils').start()
 
         rpc_mock_cls = mock.patch(
-            'quantum.plugins.services.agent_loadbalancer.agent.api'
-            '.LbaasAgentApi'
+            'quantum.plugins.services.agent_loadbalancer.drivers'
+            '.haproxy.agent_api.LbaasAgentApi'
         ).start()
 
         self.mgr = manager.LbaasAgentManager(mock_conf)
similarity index 97%
rename from quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py
rename to quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_api.py
index 9b36cd4f75811c58885a637eef6118d04d66adc1..70521439f145d6b5772fadf697e044e8343882ca 100644 (file)
@@ -18,7 +18,9 @@
 
 import mock
 
-from quantum.plugins.services.agent_loadbalancer.agent import api
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+    agent_api as api
+)
 from quantum.tests import base
 
 
similarity index 77%
rename from quantum/tests/unit/services/agent_loadbalancer/test_plugin.py
rename to quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_plugin_driver.py
index 77b295bf83708d232e1a3bb713c3fdc0a3a365ed..217da560e4d324618ef4824c03ff67464069353e 100644 (file)
@@ -1,7 +1,6 @@
 # vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2013 OpenStack Foundation.
-# All Rights Reserved.
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
 #
 #    Licensed under the Apache License, Version 2.0 (the "License"); you may
 #    not use this file except in compliance with the License. You may obtain
@@ -23,10 +22,11 @@ from quantum.common import exceptions
 from quantum import context
 from quantum.db.loadbalancer import loadbalancer_db as ldb
 from quantum import manager
-from quantum.openstack.common import importutils
 from quantum.openstack.common import uuidutils
 from quantum.plugins.common import constants
-from quantum.plugins.services.agent_loadbalancer import plugin
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+    plugin_driver
+)
 from quantum.tests import base
 from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer
 
@@ -42,25 +42,18 @@ class TestLoadBalancerPluginBase(
 
         # we need access to loaded plugins to modify models
         loaded_plugins = manager.QuantumManager().get_service_plugins()
-        # TODO(avishayb) - below is a little hack that helps the
-        # test to pass :-)
-        # the problem is the code below assumes the existance of 'callbacks'
-        # on the plugin. So the bypass is to load the plugin that has
-        # the callbacks as a member.The hack will be removed once we will
-        # have one lbaas plugin. (we currently have 2 - (Grizzly and Havana))
-        hack = True
-        if hack:
-            HACK_KLASS = (
-                "quantum.plugins.services.agent_loadbalancer."
-                "plugin.LoadBalancerPlugin"
-            )
-            self.plugin_instance = importutils.import_object(HACK_KLASS)
-        else:
-            self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
-        self.callbacks = self.plugin_instance.callbacks
+
+        self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
 
 
 class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
+    def setUp(self):
+        super(TestLoadBalancerCallbacks, self).setUp()
+
+        self.callbacks = plugin_driver.LoadBalancerCallbacks(
+            self.plugin_instance
+        )
+
     def test_get_ready_devices(self):
         with self.vip() as vip:
             ready = self.callbacks.get_ready_devices(
@@ -242,7 +235,7 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
         super(TestLoadBalancerAgentApi, self).setUp()
         self.addCleanup(mock.patch.stopall)
 
-        self.api = plugin.LoadBalancerAgentApi('topic', 'host')
+        self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host')
         self.mock_cast = mock.patch.object(self.api, 'cast').start()
         self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
 
@@ -273,3 +266,58 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
 
     def test_modify_pool(self):
         self._call_test_helper('modify_pool')
+
+
+class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
+    def setUp(self):
+        self.log = mock.patch.object(plugin_driver, 'LOG')
+        api_cls = mock.patch.object(plugin_driver,
+                                    'LoadBalancerAgentApi').start()
+        super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
+        self.mock_api = api_cls.return_value
+
+        self.addCleanup(mock.patch.stopall)
+
+    def test_create_vip(self):
+        with self.subnet() as subnet:
+            with self.pool(subnet=subnet) as pool:
+                with self.vip(pool=pool, subnet=subnet) as vip:
+                    self.mock_api.reload_pool.assert_called_once_with(
+                        mock.ANY,
+                        vip['vip']['pool_id']
+                    )
+
+    def test_update_vip(self):
+        with self.subnet() as subnet:
+            with self.pool(subnet=subnet) as pool:
+                with self.vip(pool=pool, subnet=subnet) as vip:
+                    self.mock_api.reset_mock()
+                    ctx = context.get_admin_context()
+                    vip['vip'].pop('status')
+                    new_vip = self.plugin_instance.update_vip(
+                        ctx,
+                        vip['vip']['id'],
+                        vip
+                    )
+
+                    self.mock_api.reload_pool.assert_called_once_with(
+                        mock.ANY,
+                        vip['vip']['pool_id']
+                    )
+
+                    self.assertEqual(
+                        new_vip['status'],
+                        constants.PENDING_UPDATE
+                    )
+
+    def test_delete_vip(self):
+        with self.subnet() as subnet:
+            with self.pool(subnet=subnet) as pool:
+                with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
+                    self.mock_api.reset_mock()
+                    ctx = context.get_admin_context()
+                    self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
+                    self.mock_api.destroy_pool.assert_called_once_with(
+                        mock.ANY,
+                        vip['vip']['pool_id']
+                    )