]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add agent scheduling for LBaaS namespace agent
authorOleg Bondarev <obondarev@mirantis.com>
Wed, 29 May 2013 07:58:17 +0000 (11:58 +0400)
committerOleg Bondarev <obondarev@mirantis.com>
Fri, 19 Jul 2013 09:25:04 +0000 (13:25 +0400)
- adds simple chance scheduling on create pool operation
- adds PoolsLoadbalancerAgentBinding db table
- adds lbaas_agentscheduler extension to list pools hosted by a particular agent
  and to get an agent hosting a particular pool
- adds agent notifiers mapping to AgentSchedulerDbMixin to make it easier
  for services to add their agent notifiers to the core plugin

Implements blueprint lbaas-agent-scheduler
Change-Id: Id98649fd5c7873dcd5be1a2b117b8bed25f06cc2

27 files changed:
etc/neutron.conf
etc/policy.json
neutron/common/constants.py
neutron/db/agents_db.py
neutron/db/agentschedulers_db.py
neutron/db/migration/alembic_migrations/versions/52c5e4a18807_lbaas_pool_scheduler.py [new file with mode: 0644]
neutron/extensions/lbaas_agentscheduler.py [new file with mode: 0644]
neutron/manager.py
neutron/plugins/brocade/NeutronPlugin.py
neutron/plugins/linuxbridge/lb_neutron_plugin.py
neutron/plugins/ml2/plugin.py
neutron/plugins/nec/nec_plugin.py
neutron/plugins/nicira/NeutronPlugin.py
neutron/plugins/openvswitch/ovs_neutron_plugin.py
neutron/services/loadbalancer/agent_scheduler.py [new file with mode: 0644]
neutron/services/loadbalancer/drivers/haproxy/agent.py
neutron/services/loadbalancer/drivers/haproxy/agent_manager.py
neutron/services/loadbalancer/drivers/haproxy/plugin_driver.py
neutron/services/loadbalancer/drivers/noop/noop_driver.py
neutron/services/loadbalancer/plugin.py
neutron/tests/unit/db/loadbalancer/test_db_loadbalancer.py
neutron/tests/unit/dummy_plugin.py
neutron/tests/unit/openvswitch/test_agent_scheduler.py
neutron/tests/unit/services/loadbalancer/drivers/haproxy/test_plugin_driver.py
neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py [new file with mode: 0644]
neutron/tests/unit/test_agent_ext_plugin.py
neutron/tests/unit/test_neutron_manager.py

index 8968f52e8d599c862413e8e407486ca646d5b3e3..a5d286fc5eff34ea2094cc113ddb736dcd105133 100644 (file)
@@ -220,6 +220,8 @@ notification_topics = notifications
 # network_scheduler_driver = neutron.scheduler.dhcp_agent_scheduler.ChanceScheduler
 # Driver to use for scheduling router to a default L3 agent
 # router_scheduler_driver = neutron.scheduler.l3_agent_scheduler.ChanceScheduler
+# Driver to use for scheduling a loadbalancer pool to an lbaas agent
+# loadbalancer_pool_scheduler_driver = neutron.services.loadbalancer.agent_scheduler.ChanceScheduler
 
 # Allow auto scheduling networks to DHCP agent. It will schedule non-hosted
 # networks to first DHCP agent which sends get_active_networks message to
index b85384e993d3826425341cc6ddfada6473a37a5a..2bf5aa6b96eb1452e029c35df67ee672d217e975 100644 (file)
@@ -79,6 +79,8 @@
     "get_l3-routers": "rule:admin_only",
     "get_dhcp-agents": "rule:admin_only",
     "get_l3-agents": "rule:admin_only",
+    "get_loadbalancer-agent": "rule:admin_only",
+    "get_loadbalancer-pools": "rule:admin_only",
 
     "create_router": "rule:regular_user",
     "get_router": "rule:admin_or_owner",
index a644022d4d357debd08900223635e4a1213e336c..3909044aa3cf8a4ae3251a18b035cada5b3d0e05 100644 (file)
@@ -67,6 +67,7 @@ AGENT_TYPE_OVS = 'Open vSwitch agent'
 AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
 AGENT_TYPE_NEC = 'NEC plugin agent'
 AGENT_TYPE_L3 = 'L3 agent'
+AGENT_TYPE_LOADBALANCER = 'Loadbalancer agent'
 L2_AGENT_TOPIC = 'N/A'
 
 PAGINATION_INFINITE = 'infinite'
@@ -76,3 +77,4 @@ SORT_DIRECTION_DESC = 'desc'
 
 L3_AGENT_SCHEDULER_EXT_ALIAS = 'l3_agent_scheduler'
 DHCP_AGENT_SCHEDULER_EXT_ALIAS = 'dhcp_agent_scheduler'
+LBAAS_AGENT_SCHEDULER_EXT_ALIAS = 'lbaas_agent_scheduler'
index b4a8b21657378b306367fe8721b7059868eb55d7..3e4ce5ea2007d63875a4153d47f207c1c02d1f4a 100644 (file)
@@ -162,6 +162,9 @@ class AgentExtRpcCallback(object):
     RPC_API_VERSION = '1.0'
     START_TIME = timeutils.utcnow()
 
+    def __init__(self, plugin=None):
+        self.plugin = plugin
+
     def report_state(self, context, **kwargs):
         """Report state from agent to server."""
         time = kwargs['time']
@@ -170,5 +173,6 @@ class AgentExtRpcCallback(object):
             LOG.debug(_("Message with invalid timestamp received"))
             return
         agent_state = kwargs['agent_state']['agent_state']
-        plugin = manager.NeutronManager.get_plugin()
-        plugin.create_or_update_agent(context, agent_state)
+        if not self.plugin:
+            self.plugin = manager.NeutronManager.get_plugin()
+        self.plugin.create_or_update_agent(context, agent_state)
index 49d6f383214a381871e0082a955bc4482b6cd1e3..d44b4b509f20e377d81e781a7538520b2858308a 100644 (file)
@@ -79,8 +79,13 @@ class RouterL3AgentBinding(model_base.BASEV2, models_v2.HasId):
 class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
     """Common class for agent scheduler mixins."""
 
-    dhcp_agent_notifier = None
-    l3_agent_notifier = None
+    # agent notifiers to handle agent update operations;
+    # should be updated by plugins;
+    agent_notifiers = {
+        constants.AGENT_TYPE_DHCP: None,
+        constants.AGENT_TYPE_L3: None,
+        constants.AGENT_TYPE_LOADBALANCER: None,
+    }
 
     @staticmethod
     def is_eligible_agent(active, agent):
@@ -100,18 +105,13 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
         result = super(AgentSchedulerDbMixin, self).update_agent(
             context, id, agent)
         agent_data = agent['agent']
-        if ('admin_state_up' in agent_data and
+        agent_notifier = self.agent_notifiers.get(original_agent['agent_type'])
+        if (agent_notifier and
+            'admin_state_up' in agent_data and
             original_agent['admin_state_up'] != agent_data['admin_state_up']):
-            if (original_agent['agent_type'] == constants.AGENT_TYPE_DHCP and
-                self.dhcp_agent_notifier):
-                self.dhcp_agent_notifier.agent_updated(
-                    context, agent_data['admin_state_up'],
-                    original_agent['host'])
-            elif (original_agent['agent_type'] == constants.AGENT_TYPE_L3 and
-                  self.l3_agent_notifier):
-                self.l3_agent_notifier.agent_updated(
-                    context, agent_data['admin_state_up'],
-                    original_agent['host'])
+            agent_notifier.agent_updated(context,
+                                         agent_data['admin_state_up'],
+                                         original_agent['host'])
         return result
 
 
@@ -148,8 +148,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
                 raise l3agentscheduler.RouterSchedulingFailed(
                     router_id=router_id, agent_id=id)
 
-        if self.l3_agent_notifier:
-            self.l3_agent_notifier.router_added_to_agent(
+        l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
+        if l3_notifier:
+            l3_notifier.router_added_to_agent(
                 context, [router_id], agent_db.host)
 
     def remove_router_from_l3_agent(self, context, id, router_id):
@@ -170,8 +171,9 @@ class L3AgentSchedulerDbMixin(l3agentscheduler.L3AgentSchedulerPluginBase,
                 raise l3agentscheduler.RouterNotHostedByL3Agent(
                     router_id=router_id, agent_id=id)
             context.session.delete(binding)
-        if self.l3_agent_notifier:
-            self.l3_agent_notifier.router_removed_from_agent(
+        l3_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_L3)
+        if l3_notifier:
+            l3_notifier.router_removed_from_agent(
                 context, router_id, agent.host)
 
     def list_routers_on_l3_agent(self, context, id):
@@ -356,8 +358,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
             binding.dhcp_agent_id = id
             binding.network_id = network_id
             context.session.add(binding)
-        if self.dhcp_agent_notifier:
-            self.dhcp_agent_notifier.network_added_to_agent(
+        dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
+        if dhcp_notifier:
+            dhcp_notifier.network_added_to_agent(
                 context, network_id, agent_db.host)
 
     def remove_network_from_dhcp_agent(self, context, id, network_id):
@@ -372,8 +375,9 @@ class DhcpAgentSchedulerDbMixin(dhcpagentscheduler
                 raise dhcpagentscheduler.NetworkNotHostedByDhcpAgent(
                     network_id=network_id, agent_id=id)
             context.session.delete(binding)
-        if self.dhcp_agent_notifier:
-            self.dhcp_agent_notifier.network_removed_from_agent(
+        dhcp_notifier = self.agent_notifiers.get(constants.AGENT_TYPE_DHCP)
+        if dhcp_notifier:
+            dhcp_notifier.network_removed_from_agent(
                 context, network_id, agent.host)
 
     def list_networks_on_dhcp_agent(self, context, id):
diff --git a/neutron/db/migration/alembic_migrations/versions/52c5e4a18807_lbaas_pool_scheduler.py b/neutron/db/migration/alembic_migrations/versions/52c5e4a18807_lbaas_pool_scheduler.py
new file mode 100644 (file)
index 0000000..b0f12e9
--- /dev/null
@@ -0,0 +1,53 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+
+"""LBaaS Pool scheduler
+
+Revision ID: 52c5e4a18807
+Revises: 2032abe8edac
+Create Date: 2013-06-14 03:23:47.815865
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '52c5e4a18807'
+down_revision = '2032abe8edac'
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade(active_plugin=None, options=None):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        'poolloadbalanceragentbindings',
+        sa.Column('pool_id', sa.String(length=36), nullable=False),
+        sa.Column('loadbalancer_agent_id', sa.String(length=36),
+                  nullable=False),
+        sa.ForeignKeyConstraint(['loadbalancer_agent_id'], ['agents.id'],
+                                ondelete='CASCADE'),
+        sa.ForeignKeyConstraint(['pool_id'], ['pools.id'],
+                                ondelete='CASCADE'),
+        sa.PrimaryKeyConstraint('pool_id')
+    )
+    ### end Alembic commands ###
+
+
+def downgrade(active_plugin=None, options=None):
+    ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table('poolloadbalanceragentbindings')
+    ### end Alembic commands ###
diff --git a/neutron/extensions/lbaas_agentscheduler.py b/neutron/extensions/lbaas_agentscheduler.py
new file mode 100644 (file)
index 0000000..5d2e64c
--- /dev/null
@@ -0,0 +1,138 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from abc import abstractmethod
+
+from neutron.api import extensions
+from neutron.api.v2 import base
+from neutron.api.v2 import resource
+from neutron.common import constants
+from neutron.extensions import agent
+from neutron import manager
+from neutron.plugins.common import constants as plugin_const
+from neutron import policy
+from neutron import wsgi
+
+LOADBALANCER_POOL = 'loadbalancer-pool'
+LOADBALANCER_POOLS = LOADBALANCER_POOL + 's'
+LOADBALANCER_AGENT = 'loadbalancer-agent'
+
+
+class PoolSchedulerController(wsgi.Controller):
+    def index(self, request, **kwargs):
+        lbaas_plugin = manager.NeutronManager.get_service_plugins().get(
+            plugin_const.LOADBALANCER)
+        if not lbaas_plugin:
+            return {'pools': []}
+
+        policy.enforce(request.context,
+                       "get_%s" % LOADBALANCER_POOLS,
+                       {},
+                       plugin=lbaas_plugin)
+        return lbaas_plugin.list_pools_on_lbaas_agent(
+            request.context, kwargs['agent_id'])
+
+
+class LbaasAgentHostingPoolController(wsgi.Controller):
+    def index(self, request, **kwargs):
+        lbaas_plugin = manager.NeutronManager.get_service_plugins().get(
+            plugin_const.LOADBALANCER)
+        if not lbaas_plugin:
+            return
+
+        policy.enforce(request.context,
+                       "get_%s" % LOADBALANCER_AGENT,
+                       {},
+                       plugin=lbaas_plugin)
+        return lbaas_plugin.get_lbaas_agent_hosting_pool(
+            request.context, kwargs['pool_id'])
+
+
+class Lbaas_agentscheduler(extensions.ExtensionDescriptor):
+    """Extension class supporting l3 agent scheduler.
+    """
+
+    @classmethod
+    def get_name(cls):
+        return "Loadbalancer Agent Scheduler"
+
+    @classmethod
+    def get_alias(cls):
+        return constants.LBAAS_AGENT_SCHEDULER_EXT_ALIAS
+
+    @classmethod
+    def get_description(cls):
+        return "Schedule pools among lbaas agents"
+
+    @classmethod
+    def get_namespace(cls):
+        return "http://docs.openstack.org/ext/lbaas_agent_scheduler/api/v1.0"
+
+    @classmethod
+    def get_updated(cls):
+        return "2013-02-07T10:00:00-00:00"
+
+    @classmethod
+    def get_resources(cls):
+        """Returns Ext Resources."""
+        exts = []
+        parent = dict(member_name="agent",
+                      collection_name="agents")
+
+        controller = resource.Resource(PoolSchedulerController(),
+                                       base.FAULT_MAP)
+        exts.append(extensions.ResourceExtension(
+            LOADBALANCER_POOLS, controller, parent))
+
+        parent = dict(member_name="pool",
+                      collection_name="pools")
+
+        controller = resource.Resource(LbaasAgentHostingPoolController(),
+                                       base.FAULT_MAP)
+        exts.append(extensions.ResourceExtension(
+            LOADBALANCER_AGENT, controller, parent,
+            path_prefix=plugin_const.
+            COMMON_PREFIXES[plugin_const.LOADBALANCER]))
+        return exts
+
+    def get_extended_resources(self, version):
+        return {}
+
+
+class NoEligibleLbaasAgent(agent.AgentNotFound):
+    message = _("No eligible loadbalancer agent found "
+                "for pool %(pool_id)s.")
+
+
+class NoActiveLbaasAgent(agent.AgentNotFound):
+    message = _("No active loadbalancer agent found "
+                "for pool %(pool_id)s.")
+
+
+class LbaasAgentSchedulerPluginBase(object):
+    """REST API to operate the lbaas agent scheduler.
+
+    All of method must be in an admin context.
+    """
+
+    @abstractmethod
+    def list_pools_on_lbaas_agent(self, context, id):
+        pass
+
+    @abstractmethod
+    def get_lbaas_agent_hosting_pool(self, context, pool_id):
+        pass
index bf54716335ad839b79ea7e787a6a5b652c1e2446..bc71464191248c67171339af64c3532705fd978d 100644 (file)
@@ -177,6 +177,12 @@ class NeutronManager(object):
 
             self.service_plugins[plugin_inst.get_plugin_type()] = plugin_inst
 
+            # search for possible agent notifiers declared in service plugin
+            # (needed by agent management extension)
+            if (hasattr(self.plugin, 'agent_notifiers') and
+                    hasattr(plugin_inst, 'agent_notifiers')):
+                self.plugin.agent_notifiers.update(plugin_inst.agent_notifiers)
+
             LOG.debug(_("Successfully loaded %(type)s plugin. "
                         "Description: %(desc)s"),
                       {"type": plugin_inst.get_plugin_type(),
index f809a5132cdc99dab22f811cfa5abecda3683647..2a676e918d6394074abf6a4d74250a7054a614da 100644 (file)
@@ -30,6 +30,7 @@ from oslo.config import cfg
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
+from neutron.common import constants as q_const
 from neutron.common import rpc as q_rpc
 from neutron.common import topics
 from neutron.common import utils
@@ -254,8 +255,12 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
         self.notifier = AgentNotifierApi(topics.AGENT)
-        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
-        self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+        self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+        )
+        self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+            l3_rpc_agent_api.L3AgentNotify
+        )
 
     def create_network(self, context, network):
         """Create network.
index 1f6383fc33edf807d685d959b0b6ac7f1fc5108f..367d5d45d23772181a410858cf6440027be78f26 100644 (file)
@@ -268,8 +268,12 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
         self.notifier = AgentNotifierApi(topics.AGENT)
-        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
-        self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+        self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+        )
+        self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+            l3_rpc_agent_api.L3AgentNotify
+        )
 
     def _parse_network_vlan_ranges(self):
         try:
index 45236b80ff2f3c5a2be4f6596021753c3fe73ecb..891386481ecac0b5e3bc672a8872be8d0b81e3c5 100644 (file)
@@ -107,8 +107,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
 
     def _setup_rpc(self):
         self.notifier = rpc.AgentNotifierApi(topics.AGENT)
-        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
-        self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
+            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+        )
+        self.agent_notifiers[const.AGENT_TYPE_L3] = (
+            l3_rpc_agent_api.L3AgentNotify
+        )
         self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager)
         self.topic = topics.PLUGIN
         self.conn = c_rpc.create_connection(new=True)
index 08b74403e6d93f2d2666b7cb60b8c75c39fecfa3..17c0e62a872b1333c9fa4d65f1e39cd478ef0164 100644 (file)
@@ -19,6 +19,7 @@
 from neutron.agent import securitygroups_rpc as sg_rpc
 from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
 from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
+from neutron.common import constants as q_const
 from neutron.common import exceptions as q_exc
 from neutron.common import rpc as q_rpc
 from neutron.common import topics
@@ -119,8 +120,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.topic = topics.PLUGIN
         self.conn = rpc.create_connection(new=True)
         self.notifier = NECPluginV2AgentNotifierApi(topics.AGENT)
-        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
-        self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+        self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+        )
+        self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+            l3_rpc_agent_api.L3AgentNotify
+        )
 
         # NOTE: callback_sg is referred to from the sg unit test.
         self.callback_sg = SecurityGroupServerRpcCallback()
index 0647a16fa395c407f84dc774ca09801fa595cca6..b5f50c3d26fe6c08aa79596c218829550f77028c 100644 (file)
@@ -819,7 +819,8 @@ class NvpPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.dispatcher = NVPRpcCallbacks().create_rpc_dispatcher()
         self.conn.create_consumer(self.topic, self.dispatcher,
                                   fanout=False)
-        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+        self.agent_notifiers[constants.AGENT_TYPE_DHCP] = (
+            dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
 
index 8a50b2ff9ab12ba73c58502c0a0522ecdc3b6796..bb29c91149c036608ceee4e0d794416086ff0f8e 100644 (file)
@@ -309,8 +309,12 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
         self.topic = topics.PLUGIN
         self.conn = rpc.create_connection(new=True)
         self.notifier = AgentNotifierApi(topics.AGENT)
-        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
-        self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
+        self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
+            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
+        )
+        self.agent_notifiers[q_const.AGENT_TYPE_L3] = (
+            l3_rpc_agent_api.L3AgentNotify
+        )
         self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type)
         self.dispatcher = self.callbacks.create_rpc_dispatcher()
         self.conn.create_consumer(self.topic, self.dispatcher,
diff --git a/neutron/services/loadbalancer/agent_scheduler.py b/neutron/services/loadbalancer/agent_scheduler.py
new file mode 100644 (file)
index 0000000..084496e
--- /dev/null
@@ -0,0 +1,114 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 OpenStack Foundation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import random
+
+import sqlalchemy as sa
+from sqlalchemy import orm
+from sqlalchemy.orm import joinedload
+
+from neutron.common import constants
+from neutron.db import agents_db
+from neutron.db import agentschedulers_db
+from neutron.db import model_base
+from neutron.extensions import lbaas_agentscheduler
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+class PoolLoadbalancerAgentBinding(model_base.BASEV2):
+    """Represents binding between neutron loadbalancer pools and agents."""
+
+    pool_id = sa.Column(sa.String(36),
+                        sa.ForeignKey("pools.id", ondelete='CASCADE'),
+                        primary_key=True)
+    agent = orm.relation(agents_db.Agent)
+    agent_id = sa.Column(sa.String(36), sa.ForeignKey("agents.id",
+                                                      ondelete='CASCADE'))
+
+
+class LbaasAgentSchedulerDbMixin(agentschedulers_db.AgentSchedulerDbMixin,
+                                 lbaas_agentscheduler
+                                 .LbaasAgentSchedulerPluginBase):
+
+    def get_lbaas_agent_hosting_pool(self, context, pool_id, active=None):
+        query = context.session.query(PoolLoadbalancerAgentBinding)
+        query = query.options(joinedload('agent'))
+        binding = query.get(pool_id)
+
+        if (binding and self.is_eligible_agent(
+                active, binding.agent)):
+            return {'agent': self._make_agent_dict(binding.agent)}
+
+    def get_lbaas_agents(self, context, active=None, filters=None):
+        query = context.session.query(agents_db.Agent)
+        query = query.filter_by(agent_type=constants.AGENT_TYPE_LOADBALANCER)
+        if active is not None:
+            query = query.filter_by(admin_state_up=active)
+        if filters:
+            for key, value in filters.iteritems():
+                column = getattr(agents_db.Agent, key, None)
+                if column:
+                    query = query.filter(column.in_(value))
+
+        return [agent
+                for agent in query
+                if self.is_eligible_agent(active, agent)]
+
+    def list_pools_on_lbaas_agent(self, context, id):
+        query = context.session.query(PoolLoadbalancerAgentBinding.pool_id)
+        query = query.filter_by(agent_id=id)
+        pool_ids = [item[0] for item in query]
+        if pool_ids:
+            return {'pools': self.get_pools(context, filters={'id': pool_ids})}
+        else:
+            return {'pools': []}
+
+
+class ChanceScheduler(object):
+    """Allocate a loadbalancer agent for a vip in a random way."""
+
+    def schedule(self, plugin, context, pool):
+        """Schedule the pool to an active loadbalancer agent if there
+        is no enabled agent hosting it.
+        """
+        with context.session.begin(subtransactions=True):
+            lbaas_agent = plugin.get_lbaas_agent_hosting_pool(
+                context, pool['id'])
+            if lbaas_agent:
+                LOG.debug(_('Pool %(pool_id)s has already been hosted'
+                            ' by lbaas agent %(agent_id)s'),
+                          {'pool_id': pool['id'],
+                           'agent_id': lbaas_agent['id']})
+                return
+
+            candidates = plugin.get_lbaas_agents(context, active=True)
+            if not candidates:
+                LOG.warn(_('No active lbaas agents for pool %s') % pool['id'])
+                return
+
+            chosen_agent = random.choice(candidates)
+            binding = PoolLoadbalancerAgentBinding()
+            binding.agent = chosen_agent
+            binding.pool_id = pool['id']
+            context.session.add(binding)
+            LOG.debug(_('Pool %(pool_id)s is scheduled to '
+                        'lbaas agent %(agent_id)s'),
+                      {'pool_id': pool['id'],
+                       'agent_id': chosen_agent['id']})
+            return chosen_agent
index 0aa183a999b21e549e6aae28411aa045531cef01..71e123ff2270ad1f49a4df6f448e1fc567c19d8b 100644 (file)
@@ -55,6 +55,7 @@ def main():
     cfg.CONF.register_opts(manager.OPTS)
     # import interface options just in case the driver uses namespaces
     cfg.CONF.register_opts(interface.OPTS)
+    config.register_agent_state_opts_helper(cfg.CONF)
     config.register_root_helper(cfg.CONF)
 
     cfg.CONF(project='neutron')
index bea874dabb95483fe55766a30b521f640ca4d629..41da44ae41d8bb20207195c41b1b23b02bd5e073 100644 (file)
@@ -21,9 +21,12 @@ import weakref
 from oslo.config import cfg
 
 from neutron.agent.common import config
+from neutron.agent import rpc as agent_rpc
+from neutron.common import constants
 from neutron import context
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
+from neutron.openstack.common import loopingcall
 from neutron.openstack.common import periodic_task
 from neutron.services.loadbalancer.drivers.haproxy import (
     agent_api,
@@ -110,6 +113,12 @@ class LogicalDeviceCache(object):
 
 
 class LbaasAgentManager(periodic_task.PeriodicTasks):
+
+    # history
+    #   1.0 Initial version
+    #   1.1 Support agent_updated call
+    RPC_API_VERSION = '1.1'
+
     def __init__(self, conf):
         self.conf = conf
         try:
@@ -131,14 +140,45 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
         except ImportError:
             msg = _('Error importing loadbalancer device driver: %s')
             raise SystemExit(msg % conf.device_driver)
-        ctx = context.get_admin_context_without_session()
+
+        self.agent_state = {
+            'binary': 'neutron-loadbalancer-agent',
+            'host': conf.host,
+            'topic': plugin_driver.TOPIC_LOADBALANCER_AGENT,
+            'configurations': {'device_driver': conf.device_driver,
+                               'interface_driver': conf.interface_driver},
+            'agent_type': constants.AGENT_TYPE_LOADBALANCER,
+            'start_flag': True}
+        self.admin_state_up = True
+
+        self.context = context.get_admin_context_without_session()
+        self._setup_rpc()
+        self.needs_resync = False
+        self.cache = LogicalDeviceCache()
+
+    def _setup_rpc(self):
         self.plugin_rpc = agent_api.LbaasAgentApi(
             plugin_driver.TOPIC_PROCESS_ON_HOST,
-            ctx,
-            conf.host
+            self.context,
+            self.conf.host
         )
-        self.needs_resync = False
-        self.cache = LogicalDeviceCache()
+        self.state_rpc = agent_rpc.PluginReportStateAPI(
+            plugin_driver.TOPIC_PROCESS_ON_HOST)
+        report_interval = self.conf.AGENT.report_interval
+        if report_interval:
+            heartbeat = loopingcall.FixedIntervalLoopingCall(
+                self._report_state)
+            heartbeat.start(interval=report_interval)
+
+    def _report_state(self):
+        try:
+            device_count = len(self.cache.devices)
+            self.agent_state['configurations']['devices'] = device_count
+            self.state_rpc.report_state(self.context,
+                                        self.agent_state)
+            self.agent_state.pop('start_flag', None)
+        except Exception:
+            LOG.exception("Failed reporting state!")
 
     def initialize_service_hook(self, started_by):
         self.sync_state()
@@ -228,3 +268,14 @@ class LbaasAgentManager(periodic_task.PeriodicTasks):
         """Handle RPC cast from plugin to destroy a pool if known to agent."""
         if self.cache.get_by_pool_id(pool_id):
             self.destroy_device(pool_id)
+
+    def agent_updated(self, context, payload):
+        """Handle the agent_updated notification event."""
+        if payload['admin_state_up'] != self.admin_state_up:
+            self.admin_state_up = payload['admin_state_up']
+            if self.admin_state_up:
+                self.needs_resync = True
+            else:
+                for pool_id in self.cache.get_pool_ids():
+                    self.destroy_device(pool_id)
+            LOG.info(_("agent_updated by server side %s!"), payload)
index 09e8f0da0fa57f55f4c5e55cb21b5875381d1670..85fb9de05e7532c6a42bcdffd81106ac9b998527 100644 (file)
@@ -20,9 +20,13 @@ import uuid
 
 from oslo.config import cfg
 
+from neutron.common import constants as q_const
 from neutron.common import exceptions as q_exc
 from neutron.common import rpc as q_rpc
+from neutron.db import agents_db
 from neutron.db.loadbalancer import loadbalancer_db
+from neutron.extensions import lbaas_agentscheduler
+from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import rpc
 from neutron.openstack.common.rpc import proxy
@@ -37,19 +41,31 @@ ACTIVE_PENDING = (
     constants.PENDING_UPDATE
 )
 
+AGENT_SCHEDULER_OPTS = [
+    cfg.StrOpt('loadbalancer_pool_scheduler_driver',
+               default='neutron.services.loadbalancer.agent_scheduler'
+                       '.ChanceScheduler',
+               help=_('Driver to use for scheduling '
+                      'pool to a default loadbalancer agent')),
+]
+
+cfg.CONF.register_opts(AGENT_SCHEDULER_OPTS)
+
 # topic name for this particular agent implementation
 TOPIC_PROCESS_ON_HOST = 'q-lbaas-process-on-host'
 TOPIC_LOADBALANCER_AGENT = 'lbaas_process_on_host_agent'
 
 
 class LoadBalancerCallbacks(object):
+
     RPC_API_VERSION = '1.0'
 
     def __init__(self, plugin):
         self.plugin = plugin
 
     def create_rpc_dispatcher(self):
-        return q_rpc.PluginRpcDispatcher([self])
+        return q_rpc.PluginRpcDispatcher(
+            [self, agents_db.AgentExtRpcCallback(self.plugin)])
 
     def get_ready_devices(self, context, host=None):
         with context.session.begin(subtransactions=True):
@@ -61,6 +77,17 @@ class LoadBalancerCallbacks(object):
             up = True  # makes pep8 and sqlalchemy happy
             qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
             qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
+            agents = self.plugin.get_lbaas_agents(context,
+                                                  filters={'host': [host]})
+            if not agents:
+                return []
+            elif len(agents) > 1:
+                LOG.warning(_('Multiple lbaas agents found on host %s') % host)
+
+            pools = self.plugin.list_pools_on_lbaas_agent(context,
+                                                          agents[0].id)
+            pool_ids = [pool['id'] for pool in pools['pools']]
+            qry = qry.filter(loadbalancer_db.Pool.id.in_(pool_ids))
             return [id for id, in qry]
 
     def get_logical_device(self, context, pool_id=None, activate=True,
@@ -185,40 +212,50 @@ class LoadBalancerCallbacks(object):
 class LoadBalancerAgentApi(proxy.RpcProxy):
     """Plugin side of plugin to agent RPC API."""
 
-    API_VERSION = '1.0'
+    BASE_RPC_API_VERSION = '1.0'
+    # history
+    #   1.0 Initial version
+    #   1.1 Support agent_updated call
 
-    def __init__(self, topic, host):
-        super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
-        self.host = host
+    def __init__(self, topic):
+        super(LoadBalancerAgentApi, self).__init__(
+            topic, default_version=self.BASE_RPC_API_VERSION)
+
+    def reload_pool(self, context, pool_id, host):
+        return self.cast(
+            context,
+            self.make_msg('reload_pool', pool_id=pool_id, host=host),
+            topic='%s.%s' % (self.topic, host)
+        )
 
-    def reload_pool(self, context, pool_id):
+    def destroy_pool(self, context, pool_id, host):
         return self.cast(
             context,
-            self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
-            topic=self.topic
+            self.make_msg('destroy_pool', pool_id=pool_id, host=host),
+            topic='%s.%s' % (self.topic, host)
         )
 
-    def destroy_pool(self, context, pool_id):
+    def modify_pool(self, context, pool_id, host):
         return self.cast(
             context,
-            self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
-            topic=self.topic
+            self.make_msg('modify_pool', pool_id=pool_id, host=host),
+            topic='%s.%s' % (self.topic, host)
         )
 
-    def modify_pool(self, context, pool_id):
+    def agent_updated(self, context, admin_state_up, host):
         return self.cast(
             context,
-            self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
-            topic=self.topic
+            self.make_msg('agent_updated',
+                          payload={'admin_state_up': admin_state_up}),
+            topic='%s.%s' % (self.topic, host),
+            version='1.1'
         )
 
 
 class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
+
     def __init__(self, plugin):
-        self.agent_rpc = LoadBalancerAgentApi(
-            TOPIC_LOADBALANCER_AGENT,
-            cfg.CONF.host
-        )
+        self.agent_rpc = LoadBalancerAgentApi(TOPIC_LOADBALANCER_AGENT)
         self.callbacks = LoadBalancerCallbacks(plugin)
 
         self.conn = rpc.create_connection(new=True)
@@ -228,56 +265,85 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
             fanout=False)
         self.conn.consume_in_thread()
         self.plugin = plugin
+        self.plugin.agent_notifiers.update(
+            {q_const.AGENT_TYPE_LOADBALANCER: self.agent_rpc})
+
+        self.pool_scheduler = importutils.import_object(
+            cfg.CONF.loadbalancer_pool_scheduler_driver)
+
+    def get_pool_agent(self, context, pool_id):
+        agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
+        if not agent:
+            raise lbaas_agentscheduler.NoActiveLbaasAgent(pool_id=pool_id)
+        return agent['agent']
 
     def create_vip(self, context, vip):
-        self.agent_rpc.reload_pool(context, vip['pool_id'])
+        agent = self.get_pool_agent(context, vip['pool_id'])
+        self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
 
     def update_vip(self, context, old_vip, vip):
+        agent = self.get_pool_agent(context, vip['pool_id'])
         if vip['status'] in ACTIVE_PENDING:
-            self.agent_rpc.reload_pool(context, vip['pool_id'])
+            self.agent_rpc.reload_pool(context, vip['pool_id'], agent['host'])
         else:
-            self.agent_rpc.destroy_pool(context, vip['pool_id'])
+            self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
 
     def delete_vip(self, context, vip):
         self.plugin._delete_db_vip(context, vip['id'])
-        self.agent_rpc.destroy_pool(context, vip['pool_id'])
+        agent = self.get_pool_agent(context, vip['pool_id'])
+        self.agent_rpc.destroy_pool(context, vip['pool_id'], agent['host'])
 
     def create_pool(self, context, pool):
+        if not self.pool_scheduler.schedule(self.plugin, context, pool):
+            raise lbaas_agentscheduler.NoEligibleLbaasAgent(pool_id=pool['id'])
         # don't notify here because a pool needs a vip to be useful
-        pass
 
     def update_pool(self, context, old_pool, pool):
+        agent = self.get_pool_agent(context, pool['id'])
         if pool['status'] in ACTIVE_PENDING:
             if pool['vip_id'] is not None:
-                self.agent_rpc.reload_pool(context, pool['id'])
+                self.agent_rpc.reload_pool(context, pool['id'], agent['host'])
         else:
-            self.agent_rpc.destroy_pool(context, pool['id'])
+            self.agent_rpc.destroy_pool(context, pool['id'], agent['host'])
 
     def delete_pool(self, context, pool):
+        agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool['id'])
+        if agent:
+            self.agent_rpc.destroy_pool(context, pool['id'],
+                                        agent['agent']['host'])
         self.plugin._delete_db_pool(context, pool['id'])
-        self.agent_rpc.destroy_pool(context, pool['id'])
 
     def create_member(self, context, member):
-        self.agent_rpc.modify_pool(context, member['pool_id'])
+        agent = self.get_pool_agent(context, member['pool_id'])
+        self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
 
     def update_member(self, context, old_member, member):
         # member may change pool id
         if member['pool_id'] != old_member['pool_id']:
-            self.agent_rpc.modify_pool(context, old_member['pool_id'])
-        self.agent_rpc.modify_pool(context, member['pool_id'])
+            agent = self.plugin.get_lbaas_agent_hosting_pool(
+                context, old_member['pool_id'])
+            if agent:
+                self.agent_rpc.modify_pool(context,
+                                           old_member['pool_id'],
+                                           agent['agent']['host'])
+        agent = self.get_pool_agent(context, member['pool_id'])
+        self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
 
     def delete_member(self, context, member):
         self.plugin._delete_db_member(context, member['id'])
-        self.agent_rpc.modify_pool(context, member['pool_id'])
+        agent = self.get_pool_agent(context, member['pool_id'])
+        self.agent_rpc.modify_pool(context, member['pool_id'], agent['host'])
 
     def update_health_monitor(self, context, old_health_monitor,
                               health_monitor, pool_id):
         # monitors are unused here because agent will fetch what is necessary
-        self.agent_rpc.modify_pool(context, pool_id)
+        agent = self.get_pool_agent(context, pool_id)
+        self.agent_rpc.modify_pool(context, pool_id, agent['host'])
 
     def create_pool_health_monitor(self, context, healthmon, pool_id):
         # healthmon is not used here
-        self.agent_rpc.modify_pool(context, pool_id)
+        agent = self.get_pool_agent(context, pool_id)
+        self.agent_rpc.modify_pool(context, pool_id, agent['host'])
 
     def delete_pool_health_monitor(self, context, health_monitor, pool_id):
         self.plugin._delete_db_pool_health_monitor(
@@ -285,7 +351,8 @@ class HaproxyOnHostPluginDriver(abstract_driver.LoadBalancerAbstractDriver):
         )
 
         # healthmon_id is not used here
-        self.agent_rpc.modify_pool(context, pool_id)
+        agent = self.get_pool_agent(context, pool_id)
+        self.agent_rpc.modify_pool(context, pool_id, agent['host'])
 
     def create_health_monitor(self, context, health_monitor):
         pass
index 73612c0751662a1df97ea4db9b999b2adc905a89..3183c448a950f0c15fbd1e24537789c36ca8007e 100644 (file)
@@ -58,7 +58,7 @@ class NoopLbaaSDriver(abstract_driver.LoadBalancerAbstractDriver):
 
     @log.log
     def delete_pool(self, context, pool):
-        pass
+        self.plugin._delete_db_pool(context, pool["id"])
 
     @log.log
     def stats(self, context, pool_id):
index 642b14dfa5d7469faa80998b9fc6b9bdfa32973e..9457ac3c1d2fa61ab297d7695cd73fa0aaa997fe 100644 (file)
@@ -23,6 +23,7 @@ from neutron.db.loadbalancer import loadbalancer_db
 from neutron.openstack.common import importutils
 from neutron.openstack.common import log as logging
 from neutron.plugins.common import constants
+from neutron.services.loadbalancer import agent_scheduler
 
 LOG = logging.getLogger(__name__)
 
@@ -39,7 +40,8 @@ cfg.CONF.register_opts(lbaas_plugin_opts, "LBAAS")
 legacy.override_config(cfg.CONF, [('LBAAS', 'driver_fqn')])
 
 
-class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
+class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb,
+                         agent_scheduler.LbaasAgentSchedulerDbMixin):
 
     """Implementation of the Neutron Loadbalancer Service Plugin.
 
@@ -47,7 +49,12 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
     Most DB related works are implemented in class
     loadbalancer_db.LoadBalancerPluginDb.
     """
-    supported_extension_aliases = ["lbaas"]
+    supported_extension_aliases = ["lbaas", "lbaas_agent_scheduler"]
+
+    # lbaas agent notifiers to handle agent update operations;
+    # can be updated by plugin drivers while loading;
+    # will be extracted by neutron manager when loading service plugins;
+    agent_notifiers = {}
 
     def __init__(self):
         """Initialization for the loadbalancer service plugin."""
@@ -213,7 +220,7 @@ class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
         # update the db and return the value from db
         # else - return what we have in db
         if stats_data:
-            super(LoadBalancerPlugin, self)._update_pool_stats(
+            super(LoadBalancerPlugin, self).update_pool_stats(
                 context,
                 pool_id,
                 stats_data
index 470d7e4e5886b7fc69adfd9b73b0d36185f19872..584569741546c05e4fb2959818d096c490aa03b7 100644 (file)
 import contextlib
 import logging
 import os
-import testtools
 
+import mock
+from oslo.config import cfg
+import testtools
 import webob.exc
 
 from neutron.api.extensions import ExtensionMiddleware
 from neutron.api.extensions import PluginAwareExtensionManager
 from neutron.common import config
 from neutron import context
+import neutron.db.l3_db  # noqa
 from neutron.db.loadbalancer import loadbalancer_db as ldb
 import neutron.extensions
 from neutron.extensions import loadbalancer
@@ -46,34 +49,19 @@ ETCDIR = os.path.join(ROOTDIR, 'etc')
 
 extensions_path = ':'.join(neutron.extensions.__path__)
 
+_subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
+
 
 def etcdir(*p):
     return os.path.join(ETCDIR, *p)
 
 
-class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
+class LoadBalancerTestMixin(object):
     resource_prefix_map = dict(
         (k, constants.COMMON_PREFIXES[constants.LOADBALANCER])
         for k in loadbalancer.RESOURCE_ATTRIBUTE_MAP.keys()
     )
 
-    def setUp(self, core_plugin=None, lb_plugin=None):
-        service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
-
-        super(LoadBalancerPluginDbTestCase, self).setUp(
-            service_plugins=service_plugins
-        )
-
-        self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
-
-        self.plugin = loadbalancer_plugin.LoadBalancerPlugin()
-        ext_mgr = PluginAwareExtensionManager(
-            extensions_path,
-            {constants.LOADBALANCER: self.plugin}
-        )
-        app = config.load_paste_app('extensions_test_app')
-        self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
-
     def _create_vip(self, fmt, name, pool_id, protocol, protocol_port,
                     admin_state_up, expected_res_status=None, **kwargs):
         data = {'vip': {'name': name,
@@ -97,7 +85,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
     def _create_pool(self, fmt, name, lb_method, protocol, admin_state_up,
                      expected_res_status=None, **kwargs):
         data = {'pool': {'name': name,
-                         'subnet_id': self._subnet_id,
+                         'subnet_id': _subnet_id,
                          'lb_method': lb_method,
                          'protocol': protocol,
                          'admin_state_up': admin_state_up,
@@ -151,12 +139,6 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
 
         return res
 
-    def _api_for_resource(self, resource):
-        if resource in ['networks', 'subnets', 'ports']:
-            return self.api
-        else:
-            return self.ext_api
-
     @contextlib.contextmanager
     def vip(self, fmt=None, name='vip1', pool=None, subnet=None,
             protocol='HTTP', protocol_port=80, admin_state_up=True,
@@ -270,7 +252,43 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
                 self._delete('health_monitors', the_health_monitor['id'])
 
 
+class LoadBalancerPluginDbTestCase(LoadBalancerTestMixin,
+                                   test_db_plugin.NeutronDbPluginV2TestCase):
+    def setUp(self, core_plugin=None, lb_plugin=None):
+        service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
+        super(LoadBalancerPluginDbTestCase, self).setUp(
+            service_plugins=service_plugins
+        )
+
+        self._subnet_id = _subnet_id
+
+        self.plugin = loadbalancer_plugin.LoadBalancerPlugin()
+
+        get_lbaas_agent_patcher = mock.patch(
+            'neutron.services.loadbalancer.agent_scheduler'
+            '.LbaasAgentSchedulerDbMixin.get_lbaas_agent_hosting_pool')
+        mock_lbaas_agent = mock.MagicMock()
+        get_lbaas_agent_patcher.start().return_value = mock_lbaas_agent
+        mock_lbaas_agent.__getitem__.return_value = {'host': 'host'}
+        self.addCleanup(mock.patch.stopall)
+
+        ext_mgr = PluginAwareExtensionManager(
+            extensions_path,
+            {constants.LOADBALANCER: self.plugin}
+        )
+        app = config.load_paste_app('extensions_test_app')
+        self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
+
+
 class TestLoadBalancer(LoadBalancerPluginDbTestCase):
+    def setUp(self):
+        cfg.CONF.set_override('driver_fqn',
+                              'neutron.services.loadbalancer.drivers.noop'
+                              '.noop_driver.NoopLbaaSDriver',
+                              group='LBAAS')
+        self.addCleanup(cfg.CONF.reset)
+        super(TestLoadBalancer, self).setUp()
+
     def test_create_vip(self, **extras):
         expected = {
             'name': 'vip1',
index 8a1f4a757a32aaeae4670e15de8bc83b3d29745f..435064291edb320f317fc15ee941158cc1c5644e 100644 (file)
@@ -95,6 +95,7 @@ class DummyServicePlugin(ServicePluginBase):
     """
 
     supported_extension_aliases = ['dummy', servicetype.EXT_ALIAS]
+    agent_notifiers = {'dummy': 'dummy_agent_notifier'}
 
     def __init__(self):
         self.svctype_mgr = servicetype_db.ServiceTypeManager.get_instance()
index c4ab9124be1893c3c3c415fff17b7326b55b4f2d..bc206662f11e180405a98f64bbdc79516f8a6d6a 100644 (file)
@@ -1106,7 +1106,8 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
 
     def test_router_add_to_l3_agent_notification(self):
         plugin = manager.NeutronManager.get_plugin()
-        with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+        l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+        with mock.patch.object(l3_notifier, 'cast') as mock_l3:
             with self.router() as router1:
                 self._register_agent_states()
                 hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
@@ -1116,14 +1117,15 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
                 routers = [router1['router']['id']]
             mock_l3.assert_called_with(
                 mock.ANY,
-                plugin.l3_agent_notifier.make_msg(
+                l3_notifier.make_msg(
                     'router_added_to_agent',
                     payload=routers),
                 topic='l3_agent.hosta')
 
     def test_router_remove_from_l3_agent_notification(self):
         plugin = manager.NeutronManager.get_plugin()
-        with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+        l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+        with mock.patch.object(l3_notifier, 'cast') as mock_l3:
             with self.router() as router1:
                 self._register_agent_states()
                 hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
@@ -1133,22 +1135,22 @@ class OvsL3AgentNotifierTestCase(test_l3_plugin.L3NatTestCaseMixin,
                 self._remove_router_from_l3_agent(hosta_id,
                                                   router1['router']['id'])
             mock_l3.assert_called_with(
-                mock.ANY, plugin.l3_agent_notifier.make_msg(
+                mock.ANY, l3_notifier.make_msg(
                     'router_removed_from_agent',
                     payload={'router_id': router1['router']['id']}),
                 topic='l3_agent.hosta')
 
     def test_agent_updated_l3_agent_notification(self):
         plugin = manager.NeutronManager.get_plugin()
-        with mock.patch.object(plugin.l3_agent_notifier, 'cast') as mock_l3:
+        l3_notifier = plugin.agent_notifiers[constants.AGENT_TYPE_L3]
+        with mock.patch.object(l3_notifier, 'cast') as mock_l3:
             self._register_agent_states()
             hosta_id = self._get_agent_id(constants.AGENT_TYPE_L3,
                                           L3_HOSTA)
             self._disable_agent(hosta_id, admin_state_up=False)
             mock_l3.assert_called_with(
-                mock.ANY, plugin.l3_agent_notifier.make_msg(
-                    'agent_updated',
-                    payload={'admin_state_up': False}),
+                mock.ANY, l3_notifier.make_msg(
+                    'agent_updated', payload={'admin_state_up': False}),
                 topic='l3_agent.hosta')
 
 
index ff6d0341b13ff3cbc43e2fb011386a692541b861..6cc0cc6c7beff8ab609ecfacd0ffed45dcd82530 100644 (file)
@@ -53,13 +53,30 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
         self.callbacks = plugin_driver.LoadBalancerCallbacks(
             self.plugin_instance
         )
+        get_lbaas_agents_patcher = mock.patch(
+            'neutron.services.loadbalancer.agent_scheduler'
+            '.LbaasAgentSchedulerDbMixin.get_lbaas_agents')
+        get_lbaas_agents_patcher.start()
+
+        # mocking plugin_driver create_pool() as it does nothing more than
+        # pool scheduling which is beyond the scope of this test case
+        mock.patch('neutron.services.loadbalancer.drivers.haproxy'
+                   '.plugin_driver.HaproxyOnHostPluginDriver'
+                   '.create_pool').start()
+
+        self.addCleanup(mock.patch.stopall)
 
     def test_get_ready_devices(self):
         with self.vip() as vip:
-            ready = self.callbacks.get_ready_devices(
-                context.get_admin_context(),
-            )
-            self.assertEqual(ready, [vip['vip']['pool_id']])
+            with mock.patch('neutron.services.loadbalancer.agent_scheduler'
+                            '.LbaasAgentSchedulerDbMixin.'
+                            'list_pools_on_lbaas_agent') as mock_agent_pools:
+                mock_agent_pools.return_value = {
+                    'pools': [{'id': vip['vip']['pool_id']}]}
+                ready = self.callbacks.get_ready_devices(
+                    context.get_admin_context(),
+                )
+                self.assertEqual(ready, [vip['vip']['pool_id']])
 
     def test_get_ready_devices_multiple_vips_and_pools(self):
         ctx = context.get_admin_context()
@@ -100,11 +117,17 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
 
         self.assertEqual(ctx.session.query(ldb.Pool).count(), 3)
         self.assertEqual(ctx.session.query(ldb.Vip).count(), 2)
-        ready = self.callbacks.get_ready_devices(ctx)
-        self.assertEqual(len(ready), 2)
-        self.assertIn(pools[0].id, ready)
-        self.assertIn(pools[1].id, ready)
-        self.assertNotIn(pools[2].id, ready)
+        with mock.patch('neutron.services.loadbalancer.agent_scheduler'
+                        '.LbaasAgentSchedulerDbMixin'
+                        '.list_pools_on_lbaas_agent') as mock_agent_pools:
+            mock_agent_pools.return_value = {'pools': [{'id': pools[0].id},
+                                                       {'id': pools[1].id},
+                                                       {'id': pools[2].id}]}
+            ready = self.callbacks.get_ready_devices(ctx)
+            self.assertEqual(len(ready), 2)
+            self.assertIn(pools[0].id, ready)
+            self.assertIn(pools[1].id, ready)
+            self.assertNotIn(pools[2].id, ready)
         # cleanup
         ctx.session.query(ldb.Pool).delete()
         ctx.session.query(ldb.Vip).delete()
@@ -119,11 +142,15 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
                 vip['vip']['id'],
                 {'vip': {'status': constants.INACTIVE}}
             )
-
-            ready = self.callbacks.get_ready_devices(
-                context.get_admin_context(),
-            )
-            self.assertFalse(ready)
+            with mock.patch('neutron.services.loadbalancer.agent_scheduler'
+                            '.LbaasAgentSchedulerDbMixin.'
+                            'list_pools_on_lbaas_agent') as mock_agent_pools:
+                mock_agent_pools.return_value = {
+                    'pools': [{'id': vip['vip']['pool_id']}]}
+                ready = self.callbacks.get_ready_devices(
+                    context.get_admin_context(),
+                )
+                self.assertFalse(ready)
 
     def test_get_ready_devices_inactive_pool(self):
         with self.vip() as vip:
@@ -135,11 +162,15 @@ class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
                 vip['vip']['pool_id'],
                 {'pool': {'status': constants.INACTIVE}}
             )
-
-            ready = self.callbacks.get_ready_devices(
-                context.get_admin_context(),
-            )
-            self.assertFalse(ready)
+            with mock.patch('neutron.services.loadbalancer.agent_scheduler'
+                            '.LbaasAgentSchedulerDbMixin.'
+                            'list_pools_on_lbaas_agent') as mock_agent_pools:
+                mock_agent_pools.return_value = {
+                    'pools': [{'id': vip['vip']['pool_id']}]}
+                ready = self.callbacks.get_ready_devices(
+                    context.get_admin_context(),
+                )
+                self.assertFalse(ready)
 
     def test_get_logical_device_inactive(self):
         with self.pool() as pool:
@@ -235,26 +266,26 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
         super(TestLoadBalancerAgentApi, self).setUp()
         self.addCleanup(mock.patch.stopall)
 
-        self.api = plugin_driver.LoadBalancerAgentApi('topic', 'host')
+        self.api = plugin_driver.LoadBalancerAgentApi('topic')
         self.mock_cast = mock.patch.object(self.api, 'cast').start()
         self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
 
     def test_init(self):
         self.assertEqual(self.api.topic, 'topic')
-        self.assertEqual(self.api.host, 'host')
 
     def _call_test_helper(self, method_name):
-        rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id')
+        rv = getattr(self.api, method_name)(mock.sentinel.context, 'test',
+                                            'host')
         self.assertEqual(rv, self.mock_cast.return_value)
         self.mock_cast.assert_called_once_with(
             mock.sentinel.context,
             self.mock_msg.return_value,
-            topic='topic'
+            topic='topic.host'
         )
 
         self.mock_msg.assert_called_once_with(
             method_name,
-            pool_id='the_id',
+            pool_id='test',
             host='host'
         )
 
@@ -267,6 +298,21 @@ class TestLoadBalancerAgentApi(base.BaseTestCase):
     def test_modify_pool(self):
         self._call_test_helper('modify_pool')
 
+    def test_agent_updated(self):
+        rv = self.api.agent_updated(mock.sentinel.context, True, 'host')
+        self.assertEqual(rv, self.mock_cast.return_value)
+        self.mock_cast.assert_called_once_with(
+            mock.sentinel.context,
+            self.mock_msg.return_value,
+            topic='topic.host',
+            version='1.1'
+        )
+
+        self.mock_msg.assert_called_once_with(
+            'agent_updated',
+            payload={'admin_state_up': True}
+        )
+
 
 class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
     def setUp(self):
@@ -276,6 +322,12 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
         super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
         self.mock_api = api_cls.return_value
 
+        # mocking plugin_driver create_pool() as it does nothing more than
+        # pool scheduling which is beyond the scope of this test case
+        mock.patch('neutron.services.loadbalancer.drivers.haproxy'
+                   '.plugin_driver.HaproxyOnHostPluginDriver'
+                   '.create_pool').start()
+
         self.addCleanup(mock.patch.stopall)
 
     def test_create_vip(self):
@@ -284,7 +336,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                 with self.vip(pool=pool, subnet=subnet) as vip:
                     self.mock_api.reload_pool.assert_called_once_with(
                         mock.ANY,
-                        vip['vip']['pool_id']
+                        vip['vip']['pool_id'],
+                        'host'
                     )
 
     def test_update_vip(self):
@@ -302,7 +355,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
 
                     self.mock_api.reload_pool.assert_called_once_with(
                         mock.ANY,
-                        vip['vip']['pool_id']
+                        vip['vip']['pool_id'],
+                        'host'
                     )
 
                     self.assertEqual(
@@ -319,7 +373,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                     self.plugin_instance.delete_vip(ctx, vip['vip']['id'])
                     self.mock_api.destroy_pool.assert_called_once_with(
                         mock.ANY,
-                        vip['vip']['pool_id']
+                        vip['vip']['pool_id'],
+                        'host'
                     )
 
     def test_create_pool(self):
@@ -334,7 +389,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
             ctx = context.get_admin_context()
             self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
             self.mock_api.destroy_pool.assert_called_once_with(
-                mock.ANY, pool['pool']['id'])
+                mock.ANY, pool['pool']['id'], 'host')
             self.assertFalse(self.mock_api.reload_pool.called)
             self.assertFalse(self.mock_api.modify_pool.called)
 
@@ -352,7 +407,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                 ctx = context.get_admin_context()
                 self.plugin_instance.update_pool(ctx, pool['pool']['id'], pool)
                 self.mock_api.reload_pool.assert_called_once_with(
-                    mock.ANY, pool['pool']['id'])
+                    mock.ANY, pool['pool']['id'], 'host')
                 self.assertFalse(self.mock_api.destroy_pool.called)
                 self.assertFalse(self.mock_api.modify_pool.called)
 
@@ -363,14 +418,14 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
             res = req.get_response(self.ext_api)
             self.assertEqual(res.status_int, 204)
             self.mock_api.destroy_pool.assert_called_once_with(
-                mock.ANY, pool['pool']['id'])
+                mock.ANY, pool['pool']['id'], 'host')
 
     def test_create_member(self):
         with self.pool() as pool:
             pool_id = pool['pool']['id']
             with self.member(pool_id=pool_id):
                 self.mock_api.modify_pool.assert_called_once_with(
-                    mock.ANY, pool_id)
+                    mock.ANY, pool_id, 'host')
 
     def test_update_member(self):
         with self.pool() as pool:
@@ -381,7 +436,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                 self.plugin_instance.update_member(
                     ctx, member['member']['id'], member)
                 self.mock_api.modify_pool.assert_called_once_with(
-                    mock.ANY, pool_id)
+                    mock.ANY, pool_id, 'host')
 
     def test_update_member_new_pool(self):
         with self.pool() as pool1:
@@ -397,8 +452,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                                                        member)
                     self.assertEqual(2, self.mock_api.modify_pool.call_count)
                     self.mock_api.modify_pool.assert_has_calls(
-                        [mock.call(mock.ANY, pool1_id),
-                         mock.call(mock.ANY, pool2_id)])
+                        [mock.call(mock.ANY, pool1_id, 'host'),
+                         mock.call(mock.ANY, pool2_id, 'host')])
 
     def test_delete_member(self):
         with self.pool() as pool:
@@ -411,7 +466,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                 res = req.get_response(self.ext_api)
                 self.assertEqual(res.status_int, 204)
                 self.mock_api.modify_pool.assert_called_once_with(
-                    mock.ANY, pool_id)
+                    mock.ANY, pool_id, 'host')
 
     def test_create_pool_health_monitor(self):
         with self.pool() as pool:
@@ -422,7 +477,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                                                                 hm,
                                                                 pool_id)
                 self.mock_api.modify_pool.assert_called_once_with(
-                    mock.ANY, pool_id)
+                    mock.ANY, pool_id, 'host')
 
     def test_delete_pool_health_monitor(self):
         with self.pool() as pool:
@@ -436,7 +491,7 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                 self.plugin_instance.delete_pool_health_monitor(
                     ctx, hm['health_monitor']['id'], pool_id)
                 self.mock_api.modify_pool.assert_called_once_with(
-                    mock.ANY, pool_id)
+                    mock.ANY, pool_id, 'host')
 
     def test_update_health_monitor_associated_with_pool(self):
         with self.health_monitor(type='HTTP') as monitor:
@@ -457,7 +512,8 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                 self.assertEqual(res.status_int, 201)
                 self.mock_api.modify_pool.assert_called_once_with(
                     mock.ANY,
-                    pool['pool']['id']
+                    pool['pool']['id'],
+                    'host'
                 )
 
                 self.mock_api.reset_mock()
@@ -471,5 +527,6 @@ class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
                 req.get_response(self.ext_api)
                 self.mock_api.modify_pool.assert_called_once_with(
                     mock.ANY,
-                    pool['pool']['id']
+                    pool['pool']['id'],
+                    'host'
                 )
diff --git a/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py b/neutron/tests/unit/services/loadbalancer/test_agent_scheduler.py
new file mode 100644 (file)
index 0000000..e3d3df0
--- /dev/null
@@ -0,0 +1,200 @@
+# Copyright (c) 2013 OpenStack Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from webob import exc
+
+from neutron.api import extensions
+from neutron.api.v2 import attributes
+from neutron.common import constants
+from neutron import context
+from neutron.extensions import agent
+from neutron.extensions import lbaas_agentscheduler
+from neutron import manager
+from neutron.plugins.common import constants as plugin_const
+from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
+from neutron.tests.unit.openvswitch import test_agent_scheduler
+from neutron.tests.unit import test_agent_ext_plugin
+from neutron.tests.unit import test_db_plugin as test_plugin
+from neutron.tests.unit import test_extensions
+
+LBAAS_HOSTA = 'hosta'
+
+
+class AgentSchedulerTestMixIn(test_agent_scheduler.AgentSchedulerTestMixIn):
+    def _list_pools_hosted_by_lbaas_agent(self, agent_id,
+                                          expected_code=exc.HTTPOk.code,
+                                          admin_context=True):
+        path = "/agents/%s/%s.%s" % (agent_id,
+                                     lbaas_agentscheduler.LOADBALANCER_POOLS,
+                                     self.fmt)
+        return self._request_list(path, expected_code=expected_code,
+                                  admin_context=admin_context)
+
+    def _get_lbaas_agent_hosting_pool(self, pool_id,
+                                      expected_code=exc.HTTPOk.code,
+                                      admin_context=True):
+        path = "/lb/pools/%s/%s.%s" % (pool_id,
+                                       lbaas_agentscheduler.LOADBALANCER_AGENT,
+                                       self.fmt)
+        return self._request_list(path, expected_code=expected_code,
+                                  admin_context=admin_context)
+
+
+class LBaaSAgentSchedulerTestCase(test_agent_ext_plugin.AgentDBTestMixIn,
+                                  AgentSchedulerTestMixIn,
+                                  test_db_loadbalancer.LoadBalancerTestMixin,
+                                  test_plugin.NeutronDbPluginV2TestCase):
+    fmt = 'json'
+    plugin_str = ('neutron.plugins.openvswitch.'
+                  'ovs_neutron_plugin.OVSNeutronPluginV2')
+
+    def setUp(self):
+        # Save the global RESOURCE_ATTRIBUTE_MAP
+        self.saved_attr_map = {}
+        for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems():
+            self.saved_attr_map[resource] = attrs.copy()
+        service_plugins = {
+            'lb_plugin_name': test_db_loadbalancer.DB_LB_PLUGIN_KLASS}
+        super(LBaaSAgentSchedulerTestCase, self).setUp(
+            self.plugin_str, service_plugins=service_plugins)
+        ext_mgr = extensions.PluginAwareExtensionManager.get_instance()
+        self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
+        self.adminContext = context.get_admin_context()
+        # Add the resources to the global attribute map
+        # This is done here as the setup process won't
+        # initialize the main API router which extends
+        # the global attribute map
+        attributes.RESOURCE_ATTRIBUTE_MAP.update(
+            agent.RESOURCE_ATTRIBUTE_MAP)
+        self.addCleanup(self.restore_attribute_map)
+
+    def restore_attribute_map(self):
+        # Restore the original RESOURCE_ATTRIBUTE_MAP
+        attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map
+
+    def test_report_states(self):
+        self._register_agent_states(lbaas_agents=True)
+        agents = self._list_agents()
+        self.assertEqual(6, len(agents['agents']))
+
+    def test_pool_scheduling_on_pool_creation(self):
+        self._register_agent_states(lbaas_agents=True)
+        with self.pool() as pool:
+            lbaas_agent = self._get_lbaas_agent_hosting_pool(
+                pool['pool']['id'])
+            self.assertIsNotNone(lbaas_agent)
+            self.assertEqual(lbaas_agent['agent']['agent_type'],
+                             constants.AGENT_TYPE_LOADBALANCER)
+            pools = self._list_pools_hosted_by_lbaas_agent(
+                lbaas_agent['agent']['id'])
+            self.assertEqual(1, len(pools['pools']))
+            self.assertEqual(pool['pool'], pools['pools'][0])
+
+    def test_schedule_poll_with_disabled_agent(self):
+        lbaas_hosta = {
+            'binary': 'neutron-loadbalancer-agent',
+            'host': LBAAS_HOSTA,
+            'topic': 'LOADBALANCER_AGENT',
+            'configurations': {'device_driver': 'device_driver',
+                               'interface_driver': 'interface_driver'},
+            'agent_type': constants.AGENT_TYPE_LOADBALANCER}
+        self._register_one_agent_state(lbaas_hosta)
+        with self.pool() as pool:
+            lbaas_agent = self._get_lbaas_agent_hosting_pool(
+                pool['pool']['id'])
+            self.assertIsNotNone(lbaas_agent)
+
+        agents = self._list_agents()
+        self._disable_agent(agents['agents'][0]['id'])
+        pool = {'pool': {'name': 'test',
+                         'subnet_id': 'test',
+                         'lb_method': 'ROUND_ROBIN',
+                         'protocol': 'HTTP',
+                         'admin_state_up': True,
+                         'tenant_id': 'test',
+                         'description': 'test'}}
+        lbaas_plugin = manager.NeutronManager.get_service_plugins()[
+            plugin_const.LOADBALANCER]
+        self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent,
+                          lbaas_plugin.create_pool, self.adminContext, pool)
+
+    def test_schedule_poll_with_down_agent(self):
+        lbaas_hosta = {
+            'binary': 'neutron-loadbalancer-agent',
+            'host': LBAAS_HOSTA,
+            'topic': 'LOADBALANCER_AGENT',
+            'configurations': {'device_driver': 'device_driver',
+                               'interface_driver': 'interface_driver'},
+            'agent_type': constants.AGENT_TYPE_LOADBALANCER}
+        self._register_one_agent_state(lbaas_hosta)
+        is_agent_down_str = 'neutron.db.agents_db.AgentDbMixin.is_agent_down'
+        with mock.patch(is_agent_down_str) as mock_is_agent_down:
+            mock_is_agent_down.return_value = False
+            with self.pool() as pool:
+                lbaas_agent = self._get_lbaas_agent_hosting_pool(
+                    pool['pool']['id'])
+            self.assertIsNotNone(lbaas_agent)
+        with mock.patch(is_agent_down_str) as mock_is_agent_down:
+            mock_is_agent_down.return_value = True
+            pool = {'pool': {'name': 'test',
+                             'subnet_id': 'test',
+                             'lb_method': 'ROUND_ROBIN',
+                             'protocol': 'HTTP',
+                             'admin_state_up': True,
+                             'tenant_id': 'test',
+                             'description': 'test'}}
+            lbaas_plugin = manager.NeutronManager.get_service_plugins()[
+                plugin_const.LOADBALANCER]
+            self.assertRaises(lbaas_agentscheduler.NoEligibleLbaasAgent,
+                              lbaas_plugin.create_pool,
+                              self.adminContext, pool)
+
+    def test_pool_unscheduling_on_pool_deletion(self):
+        self._register_agent_states(lbaas_agents=True)
+        with self.pool(no_delete=True) as pool:
+            lbaas_agent = self._get_lbaas_agent_hosting_pool(
+                pool['pool']['id'])
+            self.assertIsNotNone(lbaas_agent)
+            self.assertEqual(lbaas_agent['agent']['agent_type'],
+                             constants.AGENT_TYPE_LOADBALANCER)
+            pools = self._list_pools_hosted_by_lbaas_agent(
+                lbaas_agent['agent']['id'])
+            self.assertEqual(1, len(pools['pools']))
+            self.assertEqual(pool['pool'], pools['pools'][0])
+
+            req = self.new_delete_request('pools',
+                                          pool['pool']['id'])
+            res = req.get_response(self.ext_api)
+            self.assertEqual(res.status_int, 204)
+            pools = self._list_pools_hosted_by_lbaas_agent(
+                lbaas_agent['agent']['id'])
+            self.assertEqual(0, len(pools['pools']))
+
+    def test_pool_scheduling_non_admin_access(self):
+        self._register_agent_states(lbaas_agents=True)
+        with self.pool() as pool:
+            self._get_lbaas_agent_hosting_pool(
+                pool['pool']['id'],
+                expected_code=exc.HTTPForbidden.code,
+                admin_context=False)
+            self._list_pools_hosted_by_lbaas_agent(
+                'fake_id',
+                expected_code=exc.HTTPForbidden.code,
+                admin_context=False)
+
+
+class LBaaSAgentSchedulerTestCaseXML(LBaaSAgentSchedulerTestCase):
+    fmt = 'xml'
index 10a1ecf0dc44cfd18b135e69aff574aeca562290..eb65d79bbc0e483f9f95ffb1ee95abd9375fa59e 100644 (file)
@@ -45,6 +45,8 @@ DHCP_HOSTA = 'hosta'
 L3_HOSTB = 'hostb'
 DHCP_HOSTC = 'hostc'
 DHCP_HOST1 = 'host1'
+LBAAS_HOSTA = 'hosta'
+LBAAS_HOSTB = 'hostb'
 
 
 class AgentTestExtensionManager(object):
@@ -83,7 +85,7 @@ class AgentDBTestMixIn(object):
             self.assertEqual(agent_res.status_int, expected_res_status)
         return agent_res
 
-    def _register_agent_states(self):
+    def _register_agent_states(self, lbaas_agents=False):
         """Register two L3 agents and two DHCP agents."""
         l3_hosta = {
             'binary': 'neutron-l3-agent',
@@ -110,6 +112,16 @@ class AgentDBTestMixIn(object):
             'agent_type': constants.AGENT_TYPE_DHCP}
         dhcp_hostc = copy.deepcopy(dhcp_hosta)
         dhcp_hostc['host'] = DHCP_HOSTC
+        lbaas_hosta = {
+            'binary': 'neutron-loadbalancer-agent',
+            'host': LBAAS_HOSTA,
+            'topic': 'LOADBALANCER_AGENT',
+            'configurations': {'device_driver': 'device_driver',
+                               'interface_driver': 'interface_driver',
+                               },
+            'agent_type': constants.AGENT_TYPE_LOADBALANCER}
+        lbaas_hostb = copy.deepcopy(lbaas_hosta)
+        lbaas_hostb['host'] = LBAAS_HOSTB
         callback = agents_db.AgentExtRpcCallback()
         callback.report_state(self.adminContext,
                               agent_state={'agent_state': l3_hosta},
@@ -123,7 +135,18 @@ class AgentDBTestMixIn(object):
         callback.report_state(self.adminContext,
                               agent_state={'agent_state': dhcp_hostc},
                               time=timeutils.strtime())
-        return [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
+
+        res = [l3_hosta, l3_hostb, dhcp_hosta, dhcp_hostc]
+        if lbaas_agents:
+            callback.report_state(self.adminContext,
+                                  agent_state={'agent_state': lbaas_hosta},
+                                  time=timeutils.strtime())
+            callback.report_state(self.adminContext,
+                                  agent_state={'agent_state': lbaas_hostb},
+                                  time=timeutils.strtime())
+            res += [lbaas_hosta, lbaas_hostb]
+
+        return res
 
     def _register_one_dhcp_agent(self):
         """Register one DHCP agent."""
index 16033e3ca62fb0a0a8b22bbf85649ec0c607dd48..6c6a3eec72fd2495db74d27c78945d4ea1d96665 100644 (file)
@@ -47,6 +47,11 @@ class MultiServiceCorePlugin(object):
     supported_extension_aliases = ['lbaas', 'dummy']
 
 
+class CorePluginWithAgentNotifiers(object):
+    agent_notifiers = {'l3': 'l3_agent_notifier',
+                       'dhcp': 'dhcp_agent_notifier'}
+
+
 class NeutronManagerTestCase(base.BaseTestCase):
 
     def setUp(self):
@@ -121,3 +126,16 @@ class NeutronManagerTestCase(base.BaseTestCase):
         self.assertIsNotNone(validate_pre_plugin_load())
         cfg.CONF.set_override('core_plugin', 'dummy.plugin')
         self.assertIsNone(validate_pre_plugin_load())
+
+    def test_manager_gathers_agent_notifiers_from_service_plugins(self):
+        cfg.CONF.set_override("service_plugins",
+                              ["neutron.tests.unit.dummy_plugin."
+                               "DummyServicePlugin"])
+        cfg.CONF.set_override("core_plugin",
+                              "neutron.tests.unit.test_neutron_manager."
+                              "CorePluginWithAgentNotifiers")
+        expected = {'l3': 'l3_agent_notifier',
+                    'dhcp': 'dhcp_agent_notifier',
+                    'dummy': 'dummy_agent_notifier'}
+        core_plugin = NeutronManager.get_plugin()
+        self.assertEqual(expected, core_plugin.agent_notifiers)