]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
LBaaS Agent Reference Implementation
authorMark McClain <mark.mcclain@dreamhost.com>
Sun, 24 Feb 2013 12:55:06 +0000 (07:55 -0500)
committerMark McClain <mark.mcclain@dreamhost.com>
Thu, 28 Feb 2013 02:49:57 +0000 (21:49 -0500)
implements blueprint lbaas-namespace-agent

This a reference implemention of the Quantum load balancing service
using HAProxy.  The implemention is designed for vendors, developers,
and deployers to become familiar with the API and service workflow.

This change also adds some constraint checks for data integrity.

Change-Id: I10a67da11840477ccf063b98149f4f77248802a1

34 files changed:
bin/quantum-lbaas-agent [new file with mode: 0755]
etc/lbaas_agent.ini [new file with mode: 0644]
etc/quantum/rootwrap.d/lbaas-haproxy.filters [new file with mode: 0644]
quantum/agent/linux/dhcp.py
quantum/agent/linux/utils.py
quantum/common/topics.py
quantum/db/loadbalancer/loadbalancer_db.py
quantum/db/migration/alembic_migrations/versions/54c2c487e913_lbaas.py
quantum/extensions/loadbalancer.py
quantum/plugins/services/agent_loadbalancer/__init__.py [moved from quantum/plugins/services/loadbalancer/__init__.py with 100% similarity]
quantum/plugins/services/agent_loadbalancer/agent/__init__.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/agent/api.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/agent/manager.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/constants.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/drivers/__init__.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py [new file with mode: 0644]
quantum/plugins/services/agent_loadbalancer/plugin.py [new file with mode: 0644]
quantum/plugins/services/loadbalancer/loadbalancerPlugin.py [deleted file]
quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py
quantum/tests/unit/services/__init__.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/__init__.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py [new file with mode: 0644]
quantum/tests/unit/services/agent_loadbalancer/test_plugin.py [new file with mode: 0644]
quantum/tests/unit/test_agent_linux_utils.py
quantum/tests/unit/test_linux_dhcp.py
setup.py

diff --git a/bin/quantum-lbaas-agent b/bin/quantum-lbaas-agent
new file mode 100755 (executable)
index 0000000..a53e457
--- /dev/null
@@ -0,0 +1,26 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright (c) 2013 Openstack, LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import os
+import sys
+sys.path.insert(0, os.getcwd())
+
+from quantum.plugins.services.agent_loadbalancer.agent import main
+
+
+main()
diff --git a/etc/lbaas_agent.ini b/etc/lbaas_agent.ini
new file mode 100644 (file)
index 0000000..e3ea75c
--- /dev/null
@@ -0,0 +1,24 @@
+[DEFAULT]
+# Show debugging output in log (sets DEBUG log level output)
+# debug = true
+
+# The LBaaS agent will resync its state with Quantum to recover from any
+# transient notification or rpc errors. The interval is number of
+# seconds between attempts.
+# periodic_interval = 10
+
+# OVS based plugins(OVS, Ryu, NEC, NVP, BigSwitch/Floodlight)
+interface_driver = quantum.agent.linux.interface.OVSInterfaceDriver
+# OVS based plugins(Ryu, NEC, NVP, BigSwitch/Floodlight) that use OVS
+# as OpenFlow switch and check port status
+# ovs_use_veth = True
+# LinuxBridge
+# interface_driver = quantum.agent.linux.interface.BridgeInterfaceDriver
+
+# The agent requires a driver to manage the loadbalancer.  HAProxy is the
+# opensource version.
+device_driver = quantum.plugins.services.agent_loadbalancer.drivers.haproxy.namespace_driver.HaproxyNSDriver
+
+# Allow overlapping IP (Must have kernel build with CONFIG_NET_NS=y and
+# iproute2 package that supports namespaces).
+# use_namespaces = True
diff --git a/etc/quantum/rootwrap.d/lbaas-haproxy.filters b/etc/quantum/rootwrap.d/lbaas-haproxy.filters
new file mode 100644 (file)
index 0000000..e00a719
--- /dev/null
@@ -0,0 +1,29 @@
+# quantum-rootwrap command filters for nodes on which quantum is
+# expected to control network
+#
+# This file should be owned by (and only-writeable by) the root user
+
+# format seems to be
+# cmd-name: filter-name, raw-command, user, args
+
+[Filters]
+
+# haproxy
+haproxy: CommandFilter, /usr/sbin/haproxy, root
+
+# lbaas-agent uses kill as well, that's handled by the generic KillFilter
+kill_haproxy_usr: KillFilter, root, /usr/sbin/haproxy, -9, -HUP
+
+# lbaas-agent uses cat
+cat: RegExpFilter, /bin/cat, root, cat, /proc/\d+/cmdline
+
+ovs-vsctl: CommandFilter, /bin/ovs-vsctl, root
+ovs-vsctl_usr: CommandFilter, /usr/bin/ovs-vsctl, root
+ovs-vsctl_sbin: CommandFilter, /sbin/ovs-vsctl, root
+ovs-vsctl_sbin_usr: CommandFilter, /usr/sbin/ovs-vsctl, root
+
+# ip_lib
+ip: IpFilter, /sbin/ip, root
+ip_usr: IpFilter, /usr/sbin/ip, root
+ip_exec: IpNetnsExecFilter, /sbin/ip, root
+ip_exec_usr: IpNetnsExecFilter, /usr/sbin/ip, root
index 0c05fcaabb71e6110704614a69583cd84f75cd53..38e7fd273f89f4cc5372efaa80e428892c6495c0 100644 (file)
@@ -21,7 +21,6 @@ import re
 import socket
 import StringIO
 import sys
-import tempfile
 
 import netaddr
 from oslo.config import cfg
@@ -187,7 +186,7 @@ class DhcpLocalProcess(DhcpBase):
     def interface_name(self, value):
         interface_file_path = self.get_conf_file_name('interface',
                                                       ensure_conf_dir=True)
-        replace_file(interface_file_path, value)
+        utils.replace_file(interface_file_path, value)
 
     @abc.abstractmethod
     def spawn_process(self):
@@ -298,7 +297,7 @@ class Dnsmasq(DhcpLocalProcess):
                           (port.mac_address, name, alloc.ip_address))
 
         name = self.get_conf_file_name('host')
-        replace_file(name, buf.getvalue())
+        utils.replace_file(name, buf.getvalue())
         return name
 
     def _output_opts_file(self):
@@ -344,7 +343,7 @@ class Dnsmasq(DhcpLocalProcess):
                     options.append(self._format_option(i, 'router'))
 
         name = self.get_conf_file_name('opts')
-        replace_file(name, '\n'.join(options))
+        utils.replace_file(name, '\n'.join(options))
         return name
 
     def _make_subnet_interface_ip_map(self):
@@ -402,20 +401,3 @@ class Dnsmasq(DhcpLocalProcess):
             sock.connect(dhcp_relay_socket)
             sock.send(jsonutils.dumps(data))
             sock.close()
-
-
-def replace_file(file_name, data):
-    """Replaces the contents of file_name with data in a safe manner.
-
-    First write to a temp file and then rename. Since POSIX renames are
-    atomic, the file is unlikely to be corrupted by competing writes.
-
-    We create the tempfile on the same device to ensure that it can be renamed.
-    """
-
-    base_dir = os.path.dirname(os.path.abspath(file_name))
-    tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
-    tmp_file.write(data)
-    tmp_file.close()
-    os.chmod(tmp_file.name, 0644)
-    os.rename(tmp_file.name, file_name)
index a16d4815e1f697e519d45388ce273cc6059d5971..cbfefffebeb46f7e7563f763f459d2716f14bb77 100644 (file)
@@ -22,6 +22,7 @@ import os
 import shlex
 import socket
 import struct
+import tempfile
 
 from eventlet.green import subprocess
 
@@ -71,3 +72,20 @@ def get_interface_mac(interface):
                        struct.pack('256s', interface[:DEVICE_NAME_LEN]))
     return ''.join(['%02x:' % ord(char)
                     for char in info[MAC_START:MAC_END]])[:-1]
+
+
+def replace_file(file_name, data):
+    """Replaces the contents of file_name with data in a safe manner.
+
+    First write to a temp file and then rename. Since POSIX renames are
+    atomic, the file is unlikely to be corrupted by competing writes.
+
+    We create the tempfile on the same device to ensure that it can be renamed.
+    """
+
+    base_dir = os.path.dirname(os.path.abspath(file_name))
+    tmp_file = tempfile.NamedTemporaryFile('w+', dir=base_dir, delete=False)
+    tmp_file.write(data)
+    tmp_file.close()
+    os.chmod(tmp_file.name, 0644)
+    os.rename(tmp_file.name, file_name)
index 91970f2f982c1997af380039f2be9a981736b53d..4a25549586edecb650f1b129d71d0c890956a683 100644 (file)
@@ -25,9 +25,11 @@ UPDATE = 'update'
 AGENT = 'q-agent-notifier'
 PLUGIN = 'q-plugin'
 DHCP = 'q-dhcp-notifer'
+LOADBALANCER_PLUGIN = 'q-loadbalancer-plugin'
 
 L3_AGENT = 'l3_agent'
 DHCP_AGENT = 'dhcp_agent'
+LOADBALANCER_AGENT = 'loadbalancer_agent'
 
 
 def get_topic_name(prefix, table, operation):
index 69c9bc6003520360064953c54a1707971974ea82..7158cfe104b1442289b8429a4fd44106c5425309 100644 (file)
@@ -69,7 +69,7 @@ class Vip(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
     protocol_port = sa.Column(sa.Integer, nullable=False)
     protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"),
                          nullable=False)
-    pool_id = sa.Column(sa.String(36), nullable=False)
+    pool_id = sa.Column(sa.String(36), nullable=False, unique=True)
     session_persistence = orm.relationship(SessionPersistence,
                                            uselist=False,
                                            backref="vips",
@@ -114,6 +114,7 @@ class Pool(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
                                cascade="all, delete-orphan")
     monitors = orm.relationship("PoolMonitorAssociation", backref="pools",
                                 cascade="all, delete-orphan")
+    vip = orm.relationship(Vip, backref='pool')
 
 
 class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
@@ -239,6 +240,12 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
                 raise
         return r
 
+    def assert_modification_allowed(self, obj):
+        status = getattr(obj, 'status', None)
+
+        if status == constants.PENDING_DELETE:
+            raise loadbalancer.StateInvalid(id=id, state=status)
+
     ########################################################
     # VIP DB access
     def _make_vip_dict(self, vip, fields=None):
@@ -270,11 +277,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
 
         return self._fields(res, fields)
 
-    def _update_pool_vip_info(self, context, pool_id, vip_id):
-        pool_db = self._get_resource(context, Pool, pool_id)
-        with context.session.begin(subtransactions=True):
-            pool_db.update({'vip_id': vip_id})
-
     def _check_session_persistence_info(self, info):
         """ Performs sanity check on session persistence info.
         :param info: Session persistence info
@@ -355,6 +357,14 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
         tenant_id = self._get_tenant_id_for_create(context, v)
 
         with context.session.begin(subtransactions=True):
+            # validate that the pool has same tenant
+            if v['pool_id']:
+                pool = self._get_resource(context, Pool, v['pool_id'])
+                if pool['tenant_id'] != tenant_id:
+                    raise q_exc.NotAuthorized()
+            else:
+                pool = None
+
             vip_db = Vip(id=uuidutils.generate_uuid(),
                          tenant_id=tenant_id,
                          name=v['name'],
@@ -367,16 +377,18 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
                          admin_state_up=v['admin_state_up'],
                          status=constants.PENDING_CREATE)
 
-            vip_id = vip_db['id']
             session_info = v['session_persistence']
 
             if session_info:
-                s_p = self._create_session_persistence_db(session_info, vip_id)
+                s_p = self._create_session_persistence_db(
+                    session_info,
+                    vip_db['id'])
                 vip_db.session_persistence = s_p
 
             context.session.add(vip_db)
             context.session.flush()
 
+            # create a port to reserve address for IPAM
             self._create_port_for_vip(
                 context,
                 vip_db,
@@ -384,7 +396,9 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
                 v.get('address')
             )
 
-            self._update_pool_vip_info(context, v['pool_id'], vip_id)
+            if pool:
+                pool['vip_id'] = vip_db['id']
+
         return self._make_vip_dict(vip_db)
 
     def update_vip(self, context, id, vip):
@@ -392,20 +406,36 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
 
         sess_persist = v.pop('session_persistence', None)
         with context.session.begin(subtransactions=True):
+            vip_db = self._get_resource(context, Vip, id)
+
+            self.assert_modification_allowed(vip_db)
+
             if sess_persist:
                 self._update_vip_session_persistence(context, id, sess_persist)
             else:
                 self._delete_session_persistence(context, id)
 
-            vip_db = self._get_resource(context, Vip, id)
-            old_pool_id = vip_db['pool_id']
             if v:
                 vip_db.update(v)
                 # If the pool_id is changed, we need to update
                 # the associated pools
                 if 'pool_id' in v:
-                    self._update_pool_vip_info(context, old_pool_id, None)
-                    self._update_pool_vip_info(context, v['pool_id'], id)
+                    new_pool = self._get_resource(context, Pool, v['pool_id'])
+                    self.assert_modification_allowed(new_pool)
+
+                    # check that the pool matches the tenant_id
+                    if new_pool['tenant_id'] != vip_db['tenant_id']:
+                        raise q_exc.NotAuthorized()
+
+                    if vip_db['pool_id']:
+                        old_pool = self._get_resource(
+                            context,
+                            Pool,
+                            vip_db['pool_id']
+                        )
+                        old_pool['vip_id'] = None
+
+                    new_pool['vip_id'] = vip_db['id']
 
         return self._make_vip_dict(vip_db)
 
@@ -432,7 +462,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
 
     ########################################################
     # Pool DB access
-    def _make_pool_dict(self, context, pool, fields=None):
+    def _make_pool_dict(self, pool, fields=None):
         res = {'id': pool['id'],
                'tenant_id': pool['tenant_id'],
                'name': pool['name'],
@@ -453,16 +483,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
 
         return self._fields(res, fields)
 
-    def _update_pool_member_info(self, context, pool_id, membersInfo):
-        with context.session.begin(subtransactions=True):
-            member_qry = context.session.query(Member)
-            for member_id in membersInfo:
-                try:
-                    member = member_qry.filter_by(id=member_id).one()
-                    member.update({'pool_id': pool_id})
-                except exc.NoResultFound:
-                    raise loadbalancer.MemberNotFound(member_id=member_id)
-
     def _create_pool_stats(self, context, pool_id):
         # This is internal method to add pool statistics. It won't
         # be exposed to API
@@ -504,17 +524,17 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
             context.session.add(pool_db)
 
         pool_db = self._get_resource(context, Pool, pool_db['id'])
-        return self._make_pool_dict(context, pool_db)
+        return self._make_pool_dict(pool_db)
 
     def update_pool(self, context, id, pool):
-        v = pool['pool']
+        p = pool['pool']
 
         with context.session.begin(subtransactions=True):
             pool_db = self._get_resource(context, Pool, id)
-            if v:
-                pool_db.update(v)
+            if p:
+                pool_db.update(p)
 
-        return self._make_pool_dict(context, pool_db)
+        return self._make_pool_dict(pool_db)
 
     def delete_pool(self, context, id):
         # Check if the pool is in use
@@ -529,15 +549,15 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
 
     def get_pool(self, context, id, fields=None):
         pool = self._get_resource(context, Pool, id)
-        return self._make_pool_dict(context, pool, fields)
+        return self._make_pool_dict(pool, fields)
 
     def get_pools(self, context, filters=None, fields=None):
         collection = self._model_query(context, Pool)
         collection = self._apply_filters_to_query(collection, Pool, filters)
-        return [self._make_pool_dict(context, c, fields)
+        return [self._make_pool_dict(c, fields)
                 for c in collection.all()]
 
-    def get_stats(self, context, pool_id):
+    def stats(self, context, pool_id):
         with context.session.begin(subtransactions=True):
             pool_qry = context.session.query(Pool)
             try:
@@ -600,6 +620,7 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
                 raise loadbalancer.HealthMonitorNotFound(monitor_id=id)
 
     def get_pool_health_monitor(self, context, id, pool_id, fields=None):
+        # TODO(markmcclain) look into why pool_id is ignored
         healthmonitor = self._get_resource(context, HealthMonitor, id)
         return self._make_health_monitor_dict(healthmonitor, fields)
 
@@ -644,7 +665,6 @@ class LoadBalancerPluginDb(LoadBalancerPluginBase):
         v = member['member']
         with context.session.begin(subtransactions=True):
             member_db = self._get_resource(context, Member, id)
-            old_pool_id = member_db['pool_id']
             if v:
                 member_db.update(v)
 
index 799300313af623e5568f1d67e2bbe134544b8403..09ed5208dccf39c66d9a8484f09eb9bcf95df859 100644 (file)
@@ -58,6 +58,7 @@ def upgrade(active_plugin=None, options=None):
         sa.Column(u'admin_state_up', sa.Boolean(), nullable=False),
         sa.Column(u'connection_limit', sa.Integer(), nullable=True),
         sa.ForeignKeyConstraint(['port_id'], ['ports.id'], ),
+        sa.UniqueConstraint('pool_id'),
         sa.PrimaryKeyConstraint(u'id')
     )
     op.create_table(
index e1e6944d6bfecef9bdd36fbd78e15204d0daa44e..75e68a6a6b5772d7bf316f42628cb26914979c2c 100644 (file)
@@ -68,6 +68,7 @@ RESOURCE_ATTRIBUTE_MAP = {
                       'is_visible': True},
         'name': {'allow_post': True, 'allow_put': True,
                  'validate': {'type:string': None},
+                 'default': '',
                  'is_visible': True},
         'description': {'allow_post': True, 'allow_put': True,
                         'validate': {'type:string': None},
@@ -128,6 +129,7 @@ RESOURCE_ATTRIBUTE_MAP = {
                    'is_visible': True},
         'name': {'allow_post': True, 'allow_put': True,
                  'validate': {'type:string': None},
+                 'default': '',
                  'is_visible': True},
         'description': {'allow_post': True, 'allow_put': True,
                         'validate': {'type:string': None},
diff --git a/quantum/plugins/services/agent_loadbalancer/agent/__init__.py b/quantum/plugins/services/agent_loadbalancer/agent/__init__.py
new file mode 100644 (file)
index 0000000..3632729
--- /dev/null
@@ -0,0 +1,67 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import eventlet
+from oslo.config import cfg
+
+from quantum.agent.common import config
+from quantum.agent.linux import interface
+from quantum.common import topics
+from quantum.openstack.common.rpc import service as rpc_service
+from quantum.openstack.common import service
+from quantum.plugins.services.agent_loadbalancer.agent import manager
+
+
+OPTS = [
+    cfg.IntOpt(
+        'periodic_interval',
+        default=10,
+        help=_('Seconds between periodic task runs')
+    )
+]
+
+
+class LbaasAgentService(rpc_service.Service):
+    def start(self):
+        super(LbaasAgentService, self).start()
+        self.tg.add_timer(
+            cfg.CONF.periodic_interval,
+            self.manager.run_periodic_tasks,
+            None,
+            None
+        )
+
+
+def main():
+    eventlet.monkey_patch()
+    cfg.CONF.register_opts(OPTS)
+    cfg.CONF.register_opts(manager.OPTS)
+    # import interface options just in case the driver uses namespaces
+    cfg.CONF.register_opts(interface.OPTS)
+    config.register_root_helper(cfg.CONF)
+
+    cfg.CONF(project='quantum')
+    config.setup_logging(cfg.CONF)
+
+    mgr = manager.LbaasAgentManager(cfg.CONF)
+    svc = LbaasAgentService(
+        host=cfg.CONF.host,
+        topic=topics.LOADBALANCER_AGENT,
+        manager=mgr
+    )
+    service.launch(svc).wait()
diff --git a/quantum/plugins/services/agent_loadbalancer/agent/api.py b/quantum/plugins/services/agent_loadbalancer/agent/api.py
new file mode 100644 (file)
index 0000000..cd4314d
--- /dev/null
@@ -0,0 +1,81 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+from quantum.openstack.common.rpc import proxy
+
+
+class LbaasAgentApi(proxy.RpcProxy):
+    """Agent side of the Agent to Plugin RPC API."""
+
+    API_VERSION = '1.0'
+
+    def __init__(self, topic, context, host):
+        super(LbaasAgentApi, self).__init__(topic, self.API_VERSION)
+        self.context = context
+        self.host = host
+
+    def get_ready_devices(self):
+        return self.call(
+            self.context,
+            self.make_msg('get_ready_devices', host=self.host),
+            topic=self.topic
+        )
+
+    def get_logical_device(self, pool_id):
+        return self.call(
+            self.context,
+            self.make_msg(
+                'get_logical_device',
+                pool_id=pool_id,
+                host=self.host
+            ),
+            topic=self.topic
+        )
+
+    def pool_destroyed(self, pool_id):
+        return self.call(
+            self.context,
+            self.make_msg('pool_destroyed', pool_id=pool_id, host=self.host),
+            topic=self.topic
+        )
+
+    def plug_vip_port(self, port_id):
+        return self.call(
+            self.context,
+            self.make_msg('plug_vip_port', port_id=port_id, host=self.host),
+            topic=self.topic
+        )
+
+    def unplug_vip_port(self, port_id):
+        return self.call(
+            self.context,
+            self.make_msg('unplug_vip_port', port_id=port_id, host=self.host),
+            topic=self.topic
+        )
+
+    def update_pool_stats(self, pool_id, stats):
+        return self.call(
+            self.context,
+            self.make_msg(
+                'update_pool_stats',
+                pool_id=pool_id,
+                stats=stats,
+                host=self.host
+            ),
+            topic=self.topic
+        )
diff --git a/quantum/plugins/services/agent_loadbalancer/agent/manager.py b/quantum/plugins/services/agent_loadbalancer/agent/manager.py
new file mode 100644 (file)
index 0000000..9dd2a70
--- /dev/null
@@ -0,0 +1,221 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import weakref
+
+from oslo.config import cfg
+
+from quantum.agent.common import config
+from quantum.common import topics
+from quantum import context
+from quantum.openstack.common import importutils
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import periodic_task
+from quantum.plugins.services.agent_loadbalancer.agent import api
+
+LOG = logging.getLogger(__name__)
+NS_PREFIX = 'qlbaas-'
+
+OPTS = [
+    cfg.StrOpt(
+        'device_driver',
+        help=_('The driver used to manage the loadbalancing device'),
+    ),
+    cfg.StrOpt(
+        'loadbalancer_state_path',
+        default='$state_path/lbaas',
+        help=_('Location to store config and state files'),
+    ),
+    cfg.StrOpt(
+        'interface_driver',
+        help=_('The driver used to manage the virtual interface')
+    )
+]
+
+
+class LogicalDeviceCache(object):
+    """Manage a cache of known devices."""
+
+    class Device(object):
+        """Inner classes used to hold values for weakref lookups"""
+        def __init__(self, port_id, pool_id):
+            self.port_id = port_id
+            self.pool_id = pool_id
+
+        def __eq__(self, other):
+            return self.__dict__ == other.__dict__
+
+        def __hash__(self):
+            return hash((self.port_id, self.pool_id))
+
+    def __init__(self):
+        self.devices = set()
+        self.port_lookup = weakref.WeakValueDictionary()
+        self.pool_lookup = weakref.WeakValueDictionary()
+
+    def put(self, device):
+        port_id = device['vip']['port_id']
+        pool_id = device['pool']['id']
+        d = self.Device(device['vip']['port_id'], device['pool']['id'])
+        if d not in self.devices:
+            self.devices.add(d)
+            self.port_lookup[port_id] = d
+            self.pool_lookup[pool_id] = d
+
+    def remove(self, device):
+        if not isinstance(device, self.Device):
+            device = self.Device(
+                device['vip']['port_id'], device['pool']['id']
+            )
+        if device in self.devices:
+            self.devices.remove(device)
+
+    def remove_by_pool_id(self, pool_id):
+        d = self.pool_lookup.get(pool_id)
+        if d:
+            self.devices.remove(d)
+
+    def get_by_pool_id(self, pool_id):
+        return self.pool_lookup.get(pool_id)
+
+    def get_by_port_id(self, port_id):
+        return self.port_lookup.get(port_id)
+
+    def get_pool_ids(self):
+        return self.pool_lookup.keys()
+
+
+class LbaasAgentManager(periodic_task.PeriodicTasks):
+    def __init__(self, conf):
+        self.conf = conf
+        try:
+            vif_driver = importutils.import_object(conf.interface_driver, conf)
+        except ImportError:
+            # the driver is optional
+            msg = _('Error importing interface driver: %s')
+            raise SystemExit(msg % conf.interface_driver)
+            vif_driver = None
+
+        try:
+            self.driver = importutils.import_object(
+                conf.device_driver,
+                config.get_root_helper(self.conf),
+                conf.loadbalancer_state_path,
+                vif_driver,
+                self._vip_plug_callback
+            )
+        except ImportError:
+            msg = _('Error importing loadbalancer device driver: %s')
+            raise SystemExit(msg % conf.device_driver)
+        ctx = context.get_admin_context_without_session()
+        self.plugin_rpc = api.LbaasAgentApi(
+            topics.LOADBALANCER_PLUGIN,
+            ctx,
+            conf.host
+        )
+        self.needs_resync = False
+        self.cache = LogicalDeviceCache()
+
+    def initialize_service_hook(self, started_by):
+        self.sync_state()
+
+    @periodic_task.periodic_task
+    def periodic_resync(self, context):
+        if self.needs_resync:
+            self.needs_resync = False
+            self.sync_state()
+
+    @periodic_task.periodic_task(ticks_between_runs=6)
+    def collect_stats(self, context):
+        for pool_id in self.cache.get_pool_ids():
+            try:
+                stats = self.driver.get_stats(pool_id)
+                if stats:
+                    self.plugin_rpc.update_pool_stats(pool_id, stats)
+            except Exception:
+                LOG.exception(_('Error upating stats'))
+                self.needs_resync = True
+
+    def _vip_plug_callback(self, action, port):
+        if action == 'plug':
+            self.plugin_rpc.plug_vip_port(port['id'])
+        elif action == 'unplug':
+            self.plugin_rpc.unplug_vip_port(port['id'])
+
+    def sync_state(self):
+        known_devices = set(self.cache.get_pool_ids())
+        try:
+            ready_logical_devices = set(self.plugin_rpc.get_ready_devices())
+
+            for deleted_id in known_devices - ready_logical_devices:
+                self.destroy_device(deleted_id)
+
+            for pool_id in ready_logical_devices:
+                self.refresh_device(pool_id)
+
+        except Exception:
+            LOG.exception(_('Unable to retrieve ready devices'))
+            self.needs_resync = True
+
+        self.remove_orphans()
+
+    def refresh_device(self, pool_id):
+        try:
+            logical_config = self.plugin_rpc.get_logical_device(pool_id)
+
+            if self.driver.exists(pool_id):
+                self.driver.update(logical_config)
+            else:
+                self.driver.create(logical_config)
+            self.cache.put(logical_config)
+        except Exception:
+            LOG.exception(_('Unable to refresh device for pool: %s'), pool_id)
+            self.needs_resync = True
+
+    def destroy_device(self, pool_id):
+        device = self.cache.get_by_pool_id(pool_id)
+        if not device:
+            return
+        try:
+            self.driver.destroy(pool_id)
+            self.plugin_rpc.pool_destroyed(pool_id)
+        except Exception:
+            LOG.exception(_('Unable to destroy device for pool: %s'), pool_id)
+            self.needs_resync = True
+        self.cache.remove(device)
+
+    def remove_orphans(self):
+        try:
+            self.driver.remove_orphans(self.cache.get_pool_ids())
+        except NotImplementedError:
+            pass  # Not all drivers will support this
+
+    def reload_pool(self, context, pool_id=None, host=None):
+        """Handle RPC cast from plugin to reload a pool."""
+        if pool_id:
+            self.refresh_device(pool_id)
+
+    def modify_pool(self, context, pool_id=None, host=None):
+        """Handle RPC cast from plugin to modify a pool if known to agent."""
+        if self.cache.get_by_pool_id(pool_id):
+            self.refresh_device(pool_id)
+
+    def destroy_pool(self, context, pool_id=None, host=None):
+        """Handle RPC cast from plugin to destroy a pool if known to agent."""
+        if self.cache.get_by_pool_id(pool_id):
+            self.destroy_device(pool_id)
diff --git a/quantum/plugins/services/agent_loadbalancer/constants.py b/quantum/plugins/services/agent_loadbalancer/constants.py
new file mode 100644 (file)
index 0000000..82b049f
--- /dev/null
@@ -0,0 +1,33 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Mirantis, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+LB_METHOD_ROUND_ROBIN = 'ROUND_ROBIN'
+LB_METHOD_LEAST_CONNECTIONS = 'LEAST_CONNECTIONS'
+LB_METHOD_SOURCE_IP = 'SOURCE_IP'
+
+PROTOCOL_TCP = 'TCP'
+PROTOCOL_HTTP = 'HTTP'
+PROTOCOL_HTTPS = 'HTTPS'
+
+HEALTH_MONITOR_PING = 'PING'
+HEALTH_MONITOR_TCP = 'TCP'
+HEALTH_MONITOR_HTTP = 'HTTP'
+HEALTH_MONITOR_HTTPS = 'HTTPS'
+
+SESSION_PERSISTENCE_SOURCE_IP = 'SOURCE_IP'
+SESSION_PERSISTENCE_HTTP_COOKIE = 'HTTP_COOKIE'
+SESSION_PERSISTENCE_APP_COOKIE = 'APP_COOKIE'
diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/__init__.py b/quantum/plugins/services/agent_loadbalancer/drivers/__init__.py
new file mode 100644 (file)
index 0000000..ce18bf6
--- /dev/null
@@ -0,0 +1,17 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/__init__.py
new file mode 100644 (file)
index 0000000..ce18bf6
--- /dev/null
@@ -0,0 +1,17 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/cfg.py
new file mode 100644 (file)
index 0000000..936bcff
--- /dev/null
@@ -0,0 +1,184 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import itertools
+
+from quantum.agent.linux import utils
+from quantum.plugins.common import constants as qconstants
+from quantum.plugins.services.agent_loadbalancer import constants
+
+
+PROTOCOL_MAP = {
+    constants.PROTOCOL_TCP: 'tcp',
+    constants.PROTOCOL_HTTP: 'http',
+    constants.PROTOCOL_HTTPS: 'tcp',
+}
+
+BALANCE_MAP = {
+    constants.LB_METHOD_ROUND_ROBIN: 'roundrobin',
+    constants.LB_METHOD_LEAST_CONNECTIONS: 'leastconn',
+    constants.LB_METHOD_SOURCE_IP: 'source'
+}
+
+ACTIVE = qconstants.ACTIVE
+
+
+def save_config(conf_path, logical_config, socket_path=None):
+    """Convert a logical configuration to the HAProxy version"""
+    data = []
+    data.extend(_build_global(logical_config, socket_path=socket_path))
+    data.extend(_build_defaults(logical_config))
+    data.extend(_build_frontend(logical_config))
+    data.extend(_build_backend(logical_config))
+    utils.replace_file(conf_path, '\n'.join(data))
+
+
+def _build_global(config, socket_path=None):
+    opts = [
+        'daemon',
+        'user nobody',
+        'group nogroup',
+        'log /dev/log local0',
+        'log /dev/log local1 notice'
+    ]
+
+    if socket_path:
+        opts.append('stats socket %s mode 0666 level user' % socket_path)
+
+    return itertools.chain(['global'], ('\t' + o for o in opts))
+
+
+def _build_defaults(config):
+    opts = [
+        'log global',
+        'retries 3',
+        'option redispatch',
+        'timeout connect 5000',
+        'timeout client 50000',
+        'timeout server 50000',
+    ]
+
+    return itertools.chain(['defaults'], ('\t' + o for o in opts))
+
+
+def _build_frontend(config):
+    protocol = config['vip']['protocol']
+
+    opts = [
+        'option tcplog',
+        'bind %s:%d' % (
+            _get_first_ip_from_port(config['vip']['port']),
+            config['vip']['protocol_port']
+        ),
+        'mode %s' % PROTOCOL_MAP[protocol],
+        'default_backend %s' % config['pool']['id'],
+    ]
+
+    if config['vip']['connection_limit'] >= 0:
+        opts.append('maxconn %s' % config['vip']['connection_limit'])
+
+    if protocol == constants.PROTOCOL_HTTP:
+        opts.append('option forwardfor')
+
+    return itertools.chain(
+        ['frontend %s' % config['vip']['id']],
+        ('\t' + o for o in opts)
+    )
+
+
+def _build_backend(config):
+    protocol = config['pool']['protocol']
+    lb_method = config['pool']['lb_method']
+
+    opts = [
+        'mode %s' % PROTOCOL_MAP[protocol],
+        'balance %s' % BALANCE_MAP.get(lb_method, 'roundrobin')
+    ]
+
+    if protocol == constants.PROTOCOL_HTTP:
+        opts.append('option forwardfor')
+
+    # add the first health_monitor (if available)
+    server_addon, health_opts = _get_server_health_option(config)
+    opts.extend(health_opts)
+
+    # add the members
+    opts.extend(
+        (('server %(id)s %(address)s:%(protocol_port)s '
+         'weight %(weight)s') % member) + server_addon
+        for member in config['members']
+        if (member['status'] == ACTIVE and member['admin_state_up'])
+    )
+
+    return itertools.chain(
+        ['backend %s' % config['pool']['id']],
+        ('\t' + o for o in opts)
+    )
+
+
+def _get_first_ip_from_port(port):
+    for fixed_ip in port['fixed_ips']:
+        return fixed_ip['ip_address']
+
+
+def _get_server_health_option(config):
+    """return the first active health option"""
+    for monitor in config['healthmonitors']:
+        if monitor['status'] == ACTIVE and monitor['admin_state_up']:
+            break
+    else:
+        return '', []
+
+    server_addon = ' check inter %(delay)ds fall %(max_retries)d' % monitor
+    opts = [
+        'timeout check %ds' % monitor['timeout']
+    ]
+
+    if monitor['type'] in (constants.HEALTH_MONITOR_HTTP,
+                           constants.HEALTH_MONITOR_HTTPS):
+        opts.append('option httpchk %(http_method)s %(url_path)s' % monitor)
+        opts.append(
+            'http-check expect rstatus %s' %
+            '|'.join(_expand_expected_codes(monitor['expected_codes']))
+        )
+
+    if monitor['type'] == constants.HEALTH_MONITOR_HTTPS:
+        opts.append('option ssl-hello-chk')
+
+    return server_addon, opts
+
+
+def _expand_expected_codes(codes):
+    """Expand the expected code string in set of codes.
+
+    200-204 -> 200, 201, 202, 204
+    200, 203 -> 200, 203
+    """
+
+    retval = set()
+    for code in codes.replace(',', ' ').split(' '):
+        code = code.strip()
+
+        if not code:
+            continue
+        elif '-' in code:
+            low, hi = code.split('-')[:2]
+            retval.update(str(i) for i in xrange(int(low), int(hi)))
+        else:
+            retval.add(code)
+    return retval
diff --git a/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py b/quantum/plugins/services/agent_loadbalancer/drivers/haproxy/namespace_driver.py
new file mode 100644 (file)
index 0000000..f4df283
--- /dev/null
@@ -0,0 +1,182 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+import os
+import shutil
+import socket
+
+import netaddr
+
+from quantum.agent.linux import ip_lib
+from quantum.common import exceptions
+from quantum.openstack.common import log as logging
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+    cfg as hacfg
+)
+
+LOG = logging.getLogger(__name__)
+NS_PREFIX = 'qlbaas-'
+
+
+class HaproxyNSDriver(object):
+    def __init__(self, root_helper, state_path, vif_driver, vip_plug_callback):
+        self.root_helper = root_helper
+        self.state_path = state_path
+        self.vif_driver = vif_driver
+        self.vip_plug_callback = vip_plug_callback
+        self.pool_to_port_id = {}
+
+    def create(self, logical_config):
+        pool_id = logical_config['pool']['id']
+        namespace = get_ns_name(pool_id)
+
+        self._plug(namespace, logical_config['vip']['port'])
+        self._spawn(logical_config)
+
+    def update(self, logical_config):
+        pool_id = logical_config['pool']['id']
+        pid_path = self._get_state_file_path(pool_id, 'pid')
+
+        extra_args = ['-sf']
+        extra_args.extend(p.strip() for p in open(pid_path, 'r'))
+        self._spawn(logical_config, extra_args)
+
+    def _spawn(self, logical_config, extra_cmd_args=()):
+        pool_id = logical_config['pool']['id']
+        namespace = get_ns_name(pool_id)
+        conf_path = self._get_state_file_path(pool_id, 'conf')
+        pid_path = self._get_state_file_path(pool_id, 'pid')
+        sock_path = self._get_state_file_path(pool_id, 'sock')
+
+        hacfg.save_config(conf_path, logical_config, sock_path)
+        cmd = ['haproxy', '-f', conf_path, '-p', pid_path]
+        cmd.extend(extra_cmd_args)
+
+        ns = ip_lib.IPWrapper(self.root_helper, namespace)
+        ns.netns.execute(cmd)
+
+        # remember the pool<>port mapping
+        self.pool_to_port_id[pool_id] = logical_config['vip']['port']['id']
+
+    def destroy(self, pool_id):
+        namespace = get_ns_name(pool_id)
+        ns = ip_lib.IPWrapper(self.root_helper, namespace)
+        pid_path = self._get_state_file_path(pool_id, 'pid')
+        sock_path = self._get_state_file_path(pool_id, 'sock')
+
+        # kill the process
+        kill_pids_in_file(ns, pid_path)
+
+        # unplug the ports
+        if pool_id in self.pool_to_port_id:
+            self._unplug(namespace, self.pool_to_port_id[pool_id])
+
+        # remove the configuration directory
+        conf_dir = os.path.dirname(self._get_state_file_path(pool_id, ''))
+        if os.path.isdir(conf_dir):
+            shutil.rmtree(conf_dir)
+        ns.garbage_collect_namespace()
+
+    def exists(self, pool_id):
+        namespace = get_ns_name(pool_id)
+        root_ns = ip_lib.IPWrapper(self.root_helper)
+
+        socket_path = self._get_state_file_path(pool_id, 'sock')
+        if root_ns.netns.exists(namespace) and os.path.exists(socket_path):
+            try:
+                s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+                s.connect(socket_path)
+                return True
+            except socket.error:
+                pass
+        return False
+
+    def get_stats(self, pool_id):
+        pass
+
+    def remove_orphans(self, known_pool_ids):
+        raise NotImplementedError()
+
+    def _get_state_file_path(self, pool_id, kind, ensure_state_dir=True):
+        """Returns the file name for a given kind of config file."""
+        confs_dir = os.path.abspath(os.path.normpath(self.state_path))
+        conf_dir = os.path.join(confs_dir, pool_id)
+        if ensure_state_dir:
+            if not os.path.isdir(conf_dir):
+                os.makedirs(conf_dir, 0755)
+        return os.path.join(conf_dir, kind)
+
+    def _plug(self, namespace, port, reuse_existing=True):
+        self.vip_plug_callback('plug', port)
+        interface_name = self.vif_driver.get_device_name(Wrap(port))
+
+        if ip_lib.device_exists(interface_name, self.root_helper, namespace):
+            if not reuse_existing:
+                raise exceptions.PreexistingDeviceFailure(
+                    dev_name=interface_name
+                )
+        else:
+            self.vif_driver.plug(
+                port['network_id'],
+                port['id'],
+                interface_name,
+                port['mac_address'],
+                namespace=namespace
+            )
+
+        cidrs = [
+            '%s/%s' % (ip['ip_address'],
+                       netaddr.IPNetwork(ip['subnet']['cidr']).prefixlen)
+            for ip in port['fixed_ips']
+        ]
+        self.vif_driver.init_l3(interface_name, cidrs, namespace=namespace)
+
+    def _unplug(self, namespace, port_id):
+        port_stub = {'id': port_id}
+        self.vip_plug_callback('unplug', port_stub)
+        interface_name = self.vif_driver.get_device_name(Wrap(port_stub))
+        self.vif_driver.unplug(interface_name, namespace=namespace)
+
+
+# NOTE (markmcclain) For compliance with interface.py which expects objects
+class Wrap(object):
+    """A light attribute wrapper for compatibility with the interface lib."""
+    def __init__(self, d):
+        self.__dict__.update(d)
+
+    def __getitem__(self, key):
+        return self.__dict__[key]
+
+
+def get_ns_name(namespace_id):
+    return NS_PREFIX + namespace_id
+
+
+def kill_pids_in_file(namespace_wrapper, pid_path):
+    if os.path.exists(pid_path):
+        with open(pid_path, 'r') as pids:
+            for pid in pids:
+                pid = pid.strip()
+                try:
+                    namespace_wrapper.netns.execute(
+                        ['kill', '-9', pid.strip()]
+                    )
+                except RuntimeError:
+                    LOG.exception(
+                        _('Unable to kill haproxy process: %s'),
+                        pid
+                    )
diff --git a/quantum/plugins/services/agent_loadbalancer/plugin.py b/quantum/plugins/services/agent_loadbalancer/plugin.py
new file mode 100644 (file)
index 0000000..3b7b48e
--- /dev/null
@@ -0,0 +1,338 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import uuid
+
+from oslo.config import cfg
+
+from quantum.common import exceptions as q_exc
+from quantum.common import rpc as q_rpc
+from quantum.common import topics
+from quantum.db import api as qdbapi
+from quantum.db.loadbalancer import loadbalancer_db
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import rpc
+from quantum.openstack.common.rpc import proxy
+from quantum.plugins.common import constants
+
+LOG = logging.getLogger(__name__)
+
+ACTIVE_PENDING = (
+    constants.ACTIVE,
+    constants.PENDING_CREATE,
+    constants.PENDING_UPDATE
+)
+
+
+class LoadBalancerCallbacks(object):
+    RPC_API_VERSION = '1.0'
+
+    def __init__(self, plugin):
+        self.plugin = plugin
+
+    def create_rpc_dispatcher(self):
+        return q_rpc.PluginRpcDispatcher([self])
+
+    def get_ready_devices(self, context, host=None):
+        with context.session.begin(subtransactions=True):
+            qry = context.session.query(
+                loadbalancer_db.Vip, loadbalancer_db.Pool
+            )
+            qry = qry.filter(loadbalancer_db.Vip.status.in_(ACTIVE_PENDING))
+            qry = qry.filter(loadbalancer_db.Pool.status.in_(ACTIVE_PENDING))
+            up = True  # makes pep8 and sqlalchemy happy
+            qry = qry.filter(loadbalancer_db.Vip.admin_state_up == up)
+            qry = qry.filter(loadbalancer_db.Pool.admin_state_up == up)
+            return [p.id for v, p in qry.all()]
+
+    def get_logical_device(self, context, pool_id=None, activate=True,
+                           **kwargs):
+        with context.session.begin(subtransactions=True):
+            qry = context.session.query(loadbalancer_db.Pool)
+            qry = qry.filter_by(id=pool_id)
+            pool = qry.one()
+
+            if activate:
+                # set all resources to active
+                if pool.status in ACTIVE_PENDING:
+                    pool.status = constants.ACTIVE
+
+                if pool.vip.status in ACTIVE_PENDING:
+                    pool.vip.status = constants.ACTIVE
+
+                for m in pool.members:
+                    if m.status in ACTIVE_PENDING:
+                        m.status = constants.ACTIVE
+
+                for hm in pool.monitors:
+                    if hm.monitor.status in ACTIVE_PENDING:
+                        hm.monitor.status = constants.ACTIVE
+
+            if (pool.status != constants.ACTIVE
+                or pool.vip.status != constants.ACTIVE):
+                raise Exception(_('Expected active pool and vip'))
+
+            retval = {}
+            retval['pool'] = self.plugin._make_pool_dict(pool)
+            retval['vip'] = self.plugin._make_vip_dict(pool.vip)
+            retval['vip']['port'] = (
+                self.plugin._core_plugin._make_port_dict(pool.vip.port)
+            )
+            for fixed_ip in retval['vip']['port']['fixed_ips']:
+                fixed_ip['subnet'] = (
+                    self.plugin._core_plugin.get_subnet(
+                        context,
+                        fixed_ip['subnet_id']
+                    )
+                )
+            retval['members'] = [
+                self.plugin._make_member_dict(m)
+                for m in pool.members if m.status == constants.ACTIVE
+            ]
+            retval['healthmonitors'] = [
+                self.plugin._make_health_monitor_dict(hm.monitor)
+                for hm in pool.monitors
+                if hm.monitor.status == constants.ACTIVE
+            ]
+
+            return retval
+
+    def pool_destroyed(self, context, pool_id=None, host=None):
+        """Agent confirmation hook that a pool has been destroyed.
+
+           This method exists for subclasses to change the deletion
+           behavior.
+        """
+        pass
+
+    def plug_vip_port(self, context, port_id=None, host=None):
+        if not port_id:
+            return
+
+        port = self.plugin._core_plugin.get_port(
+            context,
+            port_id
+        )
+
+        port['admin_state_up'] = True
+        port['device_owner'] = 'quantum:' + constants.LOADBALANCER
+        port['device_id'] = str(uuid.uuid5(uuid.NAMESPACE_DNS, str(host)))
+
+        self.plugin._core_plugin.update_port(
+            context,
+            port_id,
+            {'port': port}
+        )
+
+    def unplug_vip_port(self, context, port_id=None, host=None):
+        if not port_id:
+            return
+
+        port = self.plugin._core_plugin.get_port(
+            context,
+            port_id
+        )
+
+        port['admin_state_up'] = False
+        port['device_owner'] = ''
+        port['device_id'] = ''
+
+        try:
+            self.plugin._core_plugin.update_port(
+                context,
+                port_id,
+                {'port': port}
+            )
+
+        except q_exc.PortNotFound:
+            msg = _('Unable to find port %s to unplug.  This can occur when '
+                    'the Vip has been deleted first.')
+            LOG.debug(msg, port_id)
+
+    def update_pool_stats(self, context, pool_id=None, stats=None, host=None):
+        # TODO (markmcclain): add stats collection
+        pass
+
+
+class LoadBalancerAgentApi(proxy.RpcProxy):
+    """Plugin side of plugin to agent RPC API."""
+
+    API_VERSION = '1.0'
+
+    def __init__(self, topic, host):
+        super(LoadBalancerAgentApi, self).__init__(topic, self.API_VERSION)
+        self.host = host
+
+    def reload_pool(self, context, pool_id):
+        return self.cast(
+            context,
+            self.make_msg('reload_pool', pool_id=pool_id, host=self.host),
+            topic=self.topic
+        )
+
+    def destroy_pool(self, context, pool_id):
+        return self.cast(
+            context,
+            self.make_msg('destroy_pool', pool_id=pool_id, host=self.host),
+            topic=self.topic
+        )
+
+    def modify_pool(self, context, pool_id):
+        return self.cast(
+            context,
+            self.make_msg('modify_pool', pool_id=pool_id, host=self.host),
+            topic=self.topic
+        )
+
+
+class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
+
+    """
+    Implementation of the Quantum Loadbalancer Service Plugin.
+
+    This class manages the workflow of LBaaS request/response.
+    Most DB related works are implemented in class
+    loadbalancer_db.LoadBalancerPluginDb.
+    """
+    supported_extension_aliases = ["lbaas"]
+
+    def __init__(self):
+        """
+        Do the initialization for the loadbalancer service plugin here.
+        """
+        qdbapi.register_models()
+
+        self.callbacks = LoadBalancerCallbacks(self)
+
+        self.conn = rpc.create_connection(new=True)
+        self.conn.create_consumer(
+            topics.LOADBALANCER_PLUGIN,
+            self.callbacks.create_rpc_dispatcher(),
+            fanout=False)
+        self.conn.consume_in_thread()
+
+        self.agent_rpc = LoadBalancerAgentApi(
+            topics.LOADBALANCER_AGENT,
+            cfg.CONF.host
+        )
+
+    def get_plugin_type(self):
+        return constants.LOADBALANCER
+
+    def get_plugin_description(self):
+        return "Quantum LoadBalancer Service Plugin"
+
+    def create_vip(self, context, vip):
+        vip['vip']['status'] = constants.PENDING_CREATE
+        v = super(LoadBalancerPlugin, self).create_vip(context, vip)
+        self.agent_rpc.reload_pool(context, v['pool_id'])
+        return v
+
+    def update_vip(self, context, id, vip):
+        if 'status' not in vip['vip']:
+            vip['vip']['status'] = constants.PENDING_UPDATE
+        v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
+        if v['status'] in ACTIVE_PENDING:
+            self.agent_rpc.reload_pool(context, v['pool_id'])
+        else:
+            self.agent_rpc.destroy_pool(context, v['pool_id'])
+        return v
+
+    def delete_vip(self, context, id):
+        vip = self.get_vip(context, id)
+        super(LoadBalancerPlugin, self).delete_vip(context, id)
+        self.agent_rpc.destroy_pool(context, vip['pool_id'])
+        pass
+
+    def create_pool(self, context, pool):
+        p = super(LoadBalancerPlugin, self).create_pool(context, pool)
+        # don't notify here because a pool needs a vip to be useful
+        return p
+
+    def update_pool(self, context, id, pool):
+        if 'status' not in pool['pool']:
+            pool['pool']['status'] = constants.PENDING_UPDATE
+        p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
+        if p['status'] in ACTIVE_PENDING:
+            self.agent_rpc.reload_pool(context, p['id'])
+        else:
+            self.agent_rpc.destroy_pool(context, p['id'])
+        return p
+
+    def delete_pool(self, context, id):
+        super(LoadBalancerPlugin, self).delete_pool(context, id)
+        self.agent_rpc.destroy_pool(context, id)
+
+    def create_member(self, context, member):
+        m = super(LoadBalancerPlugin, self).create_member(context, member)
+        self.agent_rpc.modify_pool(context, m['pool_id'])
+        return m
+
+    def update_member(self, context, id, member):
+        if 'status' not in member['member']:
+            member['member']['status'] = constants.PENDING_UPDATE
+        m = super(LoadBalancerPlugin, self).update_member(context, id, member)
+        self.agent_rpc.modify_pool(context, m['pool_id'])
+        return m
+
+    def delete_member(self, context, id):
+        m = self.get_member(context, id)
+        super(LoadBalancerPlugin, self).delete_member(context, id)
+        self.agent_rpc.modify_pool(context, m['pool_id'])
+
+    def update_health_monitor(self, context, id, health_monitor):
+        if 'status' not in health_monitor['health_monitor']:
+            health_monitor['health_monitor']['status'] = (
+                constants.PENDING_UPDATE
+            )
+        hm = super(LoadBalancerPlugin, self).update_health_monitor(
+            context,
+            id,
+            health_monitor
+        )
+
+        with context.session.begin(subtransactions=True):
+            qry = context.session.query(
+                loadbalancer_db.PoolMonitorAssociation
+            )
+            qry = qry.filter_by(monitor_id=hm['id'])
+
+            for assoc in qry.all():
+                self.agent_rpc.modify_pool(context, assoc['pool_id'])
+        return hm
+
+    def delete_health_monitor(self, context, id):
+        with context.session.begin(subtransactions=True):
+            qry = context.session.query(
+                loadbalancer_db.PoolMonitorAssociation
+            )
+            qry = qry.filter_by(monitor_id=id)
+
+            pool_ids = [a['pool_id'] for a in qry.all()]
+            super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
+        for pid in pool_ids:
+            self.agent_rpc.modify_pool(context, pid)
+
+    def create_pool_health_monitor(self, context, health_monitor, pool_id):
+        retval = super(LoadBalancerPlugin, self).create_pool_health_monitor(
+            context,
+            health_monitor,
+            pool_id
+        )
+        self.agent_rpc.modify_pool(context, pool_id)
+
+        return retval
diff --git a/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py b/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py
deleted file mode 100644 (file)
index 27a4377..0000000
+++ /dev/null
@@ -1,252 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2013 OpenStack LLC.
-# All Rights Reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-
-from quantum.db import api as qdbapi
-from quantum.db import model_base
-from quantum.db.loadbalancer import loadbalancer_db
-from quantum.extensions import loadbalancer
-from quantum.openstack.common import log as logging
-from quantum.plugins.common import constants
-
-LOG = logging.getLogger(__name__)
-
-
-class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
-
-    """
-    Implementation of the Quantum Loadbalancer Service Plugin.
-
-    This class manages the workflow of LBaaS request/response.
-    Most DB related works are implemented in class
-    loadbalancer_db.LoadBalancerPluginDb.
-    """
-    supported_extension_aliases = ["lbaas"]
-
-    def __init__(self):
-        """
-        Do the initialization for the loadbalancer service plugin here.
-        """
-        qdbapi.register_models(base=model_base.BASEV2)
-
-        # TODO: we probably need to setup RPC channel (to talk to LbAgent) here
-
-    def get_plugin_type(self):
-        return constants.LOADBALANCER
-
-    def get_plugin_description(self):
-        return "Quantum LoadBalancer Service Plugin"
-
-    def create_vip(self, context, vip):
-        v = super(LoadBalancerPlugin, self).create_vip(context, vip)
-        self.update_status(context, loadbalancer_db.Vip, v['id'],
-                           constants.PENDING_CREATE)
-        LOG.debug(_("Create vip: %s"), v['id'])
-
-        # If we adopt asynchronous mode, this method should return immediately
-        # and let client to query the object status. The plugin will listen on
-        # the event from device and update the object status by calling
-        # self.update_state(context, Vip, id, ACTIVE/ERROR)
-        #
-        # In synchronous mode, send the request to device here and wait for
-        # response. Eventually update the object status prior to the return.
-        v_query = self.get_vip(context, v['id'])
-        return v_query
-
-    def update_vip(self, context, id, vip):
-        v_query = self.get_vip(
-            context, id, fields=["status"])
-        if v_query['status'] in [
-            constants.PENDING_DELETE, constants.ERROR]:
-            raise loadbalancer.StateInvalid(id=id,
-                                            state=v_query['status'])
-
-        v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
-        self.update_status(context, loadbalancer_db.Vip, id,
-                           constants.PENDING_UPDATE)
-        LOG.debug(_("Update vip: %s"), id)
-
-        # TODO notify lbagent
-        v_rt = self.get_vip(context, id)
-        return v_rt
-
-    def delete_vip(self, context, id):
-        self.update_status(context, loadbalancer_db.Vip, id,
-                           constants.PENDING_DELETE)
-        LOG.debug(_("Delete vip: %s"), id)
-
-        # TODO notify lbagent
-        super(LoadBalancerPlugin, self).delete_vip(context, id)
-
-    def get_vip(self, context, id, fields=None):
-        res = super(LoadBalancerPlugin, self).get_vip(context, id, fields)
-        LOG.debug(_("Get vip: %s"), id)
-        return res
-
-    def get_vips(self, context, filters=None, fields=None):
-        res = super(LoadBalancerPlugin, self).get_vips(
-            context, filters, fields)
-        LOG.debug(_("Get vips"))
-        return res
-
-    def create_pool(self, context, pool):
-        p = super(LoadBalancerPlugin, self).create_pool(context, pool)
-        self.update_status(context, loadbalancer_db.Pool, p['id'],
-                           constants.PENDING_CREATE)
-        LOG.debug(_("Create pool: %s"), p['id'])
-
-        # TODO notify lbagent
-        p_rt = self.get_pool(context, p['id'])
-        return p_rt
-
-    def update_pool(self, context, id, pool):
-        p_query = self.get_pool(context, id, fields=["status"])
-        if p_query['status'] in [
-            constants.PENDING_DELETE, constants.ERROR]:
-            raise loadbalancer.StateInvalid(id=id,
-                                            state=p_query['status'])
-        p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
-        LOG.debug(_("Update pool: %s"), p['id'])
-        # TODO notify lbagent
-        p_rt = self.get_pool(context, id)
-        return p_rt
-
-    def delete_pool(self, context, id):
-        self.update_status(context, loadbalancer_db.Pool, id,
-                           constants.PENDING_DELETE)
-        # TODO notify lbagent
-        super(LoadBalancerPlugin, self).delete_pool(context, id)
-        LOG.debug(_("Delete pool: %s"), id)
-
-    def get_pool(self, context, id, fields=None):
-        res = super(LoadBalancerPlugin, self).get_pool(context, id, fields)
-        LOG.debug(_("Get pool: %s"), id)
-        return res
-
-    def get_pools(self, context, filters=None, fields=None):
-        res = super(LoadBalancerPlugin, self).get_pools(
-            context, filters, fields)
-        LOG.debug(_("Get Pools"))
-        return res
-
-    def stats(self, context, pool_id):
-        res = super(LoadBalancerPlugin, self).get_stats(context, pool_id)
-        LOG.debug(_("Get stats of Pool: %s"), pool_id)
-        return res
-
-    def create_pool_health_monitor(self, context, health_monitor, pool_id):
-        m = super(LoadBalancerPlugin, self).create_pool_health_monitor(
-            context, health_monitor, pool_id)
-        LOG.debug(_("Create health_monitor of pool: %s"), pool_id)
-        return m
-
-    def get_pool_health_monitor(self, context, id, pool_id, fields=None):
-        m = super(LoadBalancerPlugin, self).get_pool_health_monitor(
-            context, id, pool_id, fields)
-        LOG.debug(_("Get health_monitor of pool: %s"), pool_id)
-        return m
-
-    def delete_pool_health_monitor(self, context, id, pool_id):
-        super(LoadBalancerPlugin, self).delete_pool_health_monitor(
-            context, id, pool_id)
-        LOG.debug(_("Delete health_monitor %(id)s of pool: %(pool_id)s"),
-                  {"id": id, "pool_id": pool_id})
-
-    def get_member(self, context, id, fields=None):
-        res = super(LoadBalancerPlugin, self).get_member(
-            context, id, fields)
-        LOG.debug(_("Get member: %s"), id)
-        return res
-
-    def get_members(self, context, filters=None, fields=None):
-        res = super(LoadBalancerPlugin, self).get_members(
-            context, filters, fields)
-        LOG.debug(_("Get members"))
-        return res
-
-    def create_member(self, context, member):
-        m = super(LoadBalancerPlugin, self).create_member(context, member)
-        self.update_status(context, loadbalancer_db.Member, m['id'],
-                           constants.PENDING_CREATE)
-        LOG.debug(_("Create member: %s"), m['id'])
-        # TODO notify lbagent
-        m_rt = self.get_member(context, m['id'])
-        return m_rt
-
-    def update_member(self, context, id, member):
-        m_query = self.get_member(context, id, fields=["status"])
-        if m_query['status'] in [
-            constants.PENDING_DELETE, constants.ERROR]:
-            raise loadbalancer.StateInvalid(id=id,
-                                            state=m_query['status'])
-        m = super(LoadBalancerPlugin, self).update_member(context, id, member)
-        self.update_status(context, loadbalancer_db.Member, id,
-                           constants.PENDING_UPDATE)
-        LOG.debug(_("Update member: %s"), m['id'])
-        # TODO notify lbagent
-        m_rt = self.get_member(context, id)
-        return m_rt
-
-    def delete_member(self, context, id):
-        self.update_status(context, loadbalancer_db.Member, id,
-                           constants.PENDING_DELETE)
-        LOG.debug(_("Delete member: %s"), id)
-        # TODO notify lbagent
-        super(LoadBalancerPlugin, self).delete_member(context, id)
-
-    def get_health_monitor(self, context, id, fields=None):
-        res = super(LoadBalancerPlugin, self).get_health_monitor(
-            context, id, fields)
-        LOG.debug(_("Get health_monitor: %s"), id)
-        return res
-
-    def get_health_monitors(self, context, filters=None, fields=None):
-        res = super(LoadBalancerPlugin, self).get_health_monitors(
-            context, filters, fields)
-        LOG.debug(_("Get health_monitors"))
-        return res
-
-    def create_health_monitor(self, context, health_monitor):
-        h = super(LoadBalancerPlugin, self).create_health_monitor(
-            context, health_monitor)
-        self.update_status(context, loadbalancer_db.HealthMonitor, h['id'],
-                           constants.PENDING_CREATE)
-        LOG.debug(_("Create health_monitor: %s"), h['id'])
-        # TODO notify lbagent
-        h_rt = self.get_health_monitor(context, h['id'])
-        return h_rt
-
-    def update_health_monitor(self, context, id, health_monitor):
-        h_query = self.get_health_monitor(context, id, fields=["status"])
-        if h_query['status'] in [
-            constants.PENDING_DELETE, constants.ERROR]:
-            raise loadbalancer.StateInvalid(id=id,
-                                            state=h_query['status'])
-        h = super(LoadBalancerPlugin, self).update_health_monitor(
-            context, id, health_monitor)
-        self.update_status(context, loadbalancer_db.HealthMonitor, id,
-                           constants.PENDING_UPDATE)
-        LOG.debug(_("Update health_monitor: %s"), h['id'])
-        # TODO notify lbagent
-        h_rt = self.get_health_monitor(context, id)
-        return h_rt
-
-    def delete_health_monitor(self, context, id):
-        self.update_status(context, loadbalancer_db.HealthMonitor, id,
-                           constants.PENDING_DELETE)
-        LOG.debug(_("Delete health_monitor: %s"), id)
-        super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
index f866377f505a1f19b0e5c90dd42ad462b01b2b1c..a59992841a41a1718ca4157e4034f5e017dfd91f 100644 (file)
@@ -34,7 +34,9 @@ import quantum.extensions
 from quantum.extensions import loadbalancer
 from quantum.manager import QuantumManager
 from quantum.plugins.common import constants
-from quantum.plugins.services.loadbalancer import loadbalancerPlugin
+from quantum.plugins.services.agent_loadbalancer import (
+    plugin as loadbalancer_plugin
+)
 from quantum.tests.unit import test_db_plugin
 from quantum.tests.unit import test_extensions
 from quantum.tests.unit import testlib_api
@@ -46,8 +48,7 @@ LOG = logging.getLogger(__name__)
 
 DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
 DB_LB_PLUGIN_KLASS = (
-    "quantum.plugins.services.loadbalancer."
-    "loadbalancerPlugin.LoadBalancerPlugin"
+    "quantum.plugins.services.agent_loadbalancer.plugin.LoadBalancerPlugin"
 )
 ROOTDIR = os.path.dirname(__file__) + '../../../..'
 ETCDIR = os.path.join(ROOTDIR, 'etc')
@@ -74,7 +75,7 @@ class LoadBalancerPluginDbTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
 
         self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
 
-        plugin = loadbalancerPlugin.LoadBalancerPlugin()
+        plugin = loadbalancer_plugin.LoadBalancerPlugin()
         ext_mgr = PluginAwareExtensionManager(
             extensions_path,
             {constants.LOADBALANCER: plugin}
diff --git a/quantum/tests/unit/services/__init__.py b/quantum/tests/unit/services/__init__.py
new file mode 100644 (file)
index 0000000..ce18bf6
--- /dev/null
@@ -0,0 +1,17 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
diff --git a/quantum/tests/unit/services/agent_loadbalancer/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/__init__.py
new file mode 100644 (file)
index 0000000..ce18bf6
--- /dev/null
@@ -0,0 +1,17 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/agent/__init__.py
new file mode 100644 (file)
index 0000000..ce18bf6
--- /dev/null
@@ -0,0 +1,17 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_api.py
new file mode 100644 (file)
index 0000000..2d5a2ee
--- /dev/null
@@ -0,0 +1,135 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import mock
+import testtools
+
+from quantum.plugins.services.agent_loadbalancer.agent import api
+
+
+class TestApiCache(testtools.TestCase):
+    def setUp(self):
+        super(TestApiCache, self).setUp()
+        self.addCleanup(mock.patch.stopall)
+
+        self.api = api.LbaasAgentApi('topic', mock.sentinel.context, 'host')
+        self.make_msg = mock.patch.object(self.api, 'make_msg').start()
+        self.mock_call = mock.patch.object(self.api, 'call').start()
+
+    def test_init(self):
+        self.assertEqual(self.api.host, 'host')
+        self.assertEqual(self.api.context, mock.sentinel.context)
+
+    def test_get_ready_devices(self):
+        self.assertEqual(
+            self.api.get_ready_devices(),
+            self.mock_call.return_value
+        )
+
+        self.make_msg.assert_called_once_with('get_ready_devices', host='host')
+        self.mock_call.assert_called_once_with(
+            mock.sentinel.context,
+            self.make_msg.return_value,
+            topic='topic'
+        )
+
+    def test_get_logical_device(self):
+        self.assertEqual(
+            self.api.get_logical_device('pool_id'),
+            self.mock_call.return_value
+        )
+
+        self.make_msg.assert_called_once_with(
+            'get_logical_device',
+            pool_id='pool_id',
+            host='host')
+
+        self.mock_call.assert_called_once_with(
+            mock.sentinel.context,
+            self.make_msg.return_value,
+            topic='topic'
+        )
+
+    def test_pool_destroyed(self):
+        self.assertEqual(
+            self.api.pool_destroyed('pool_id'),
+            self.mock_call.return_value
+        )
+
+        self.make_msg.assert_called_once_with(
+            'pool_destroyed',
+            pool_id='pool_id',
+            host='host')
+
+        self.mock_call.assert_called_once_with(
+            mock.sentinel.context,
+            self.make_msg.return_value,
+            topic='topic'
+        )
+
+    def test_plug_vip_port(self):
+        self.assertEqual(
+            self.api.plug_vip_port('port_id'),
+            self.mock_call.return_value
+        )
+
+        self.make_msg.assert_called_once_with(
+            'plug_vip_port',
+            port_id='port_id',
+            host='host')
+
+        self.mock_call.assert_called_once_with(
+            mock.sentinel.context,
+            self.make_msg.return_value,
+            topic='topic'
+        )
+
+    def test_unplug_vip_port(self):
+        self.assertEqual(
+            self.api.unplug_vip_port('port_id'),
+            self.mock_call.return_value
+        )
+
+        self.make_msg.assert_called_once_with(
+            'unplug_vip_port',
+            port_id='port_id',
+            host='host')
+
+        self.mock_call.assert_called_once_with(
+            mock.sentinel.context,
+            self.make_msg.return_value,
+            topic='topic'
+        )
+
+    def test_update_pool_stats(self):
+        self.assertEqual(
+            self.api.update_pool_stats('pool_id', {'stat': 'stat'}),
+            self.mock_call.return_value
+        )
+
+        self.make_msg.assert_called_once_with(
+            'update_pool_stats',
+            pool_id='pool_id',
+            stats={'stat': 'stat'},
+            host='host')
+
+        self.mock_call.assert_called_once_with(
+            mock.sentinel.context,
+            self.make_msg.return_value,
+            topic='topic'
+        )
diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_init.py
new file mode 100644 (file)
index 0000000..f2e9f22
--- /dev/null
@@ -0,0 +1,55 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import contextlib
+import mock
+from oslo.config import cfg
+import testtools
+
+from quantum.plugins.services.agent_loadbalancer import agent
+
+
+class TestLbaasService(testtools.TestCase):
+    def setUp(self):
+        super(TestLbaasService, self).setUp()
+        self.addCleanup(cfg.CONF.reset)
+
+        cfg.CONF.register_opts(agent.OPTS)
+
+    def test_start(self):
+        with mock.patch.object(
+            agent.rpc_service.Service, 'start'
+        ) as mock_start:
+
+            mgr = mock.Mock()
+            agent_service = agent.LbaasAgentService('host', 'topic', mgr)
+            agent_service.start()
+
+            self.assertTrue(mock_start.called)
+
+    def test_main(self):
+        with contextlib.nested(
+            mock.patch.object(agent.service, 'launch'),
+            mock.patch.object(agent, 'eventlet'),
+            mock.patch('sys.argv'),
+            mock.patch.object(agent.manager, 'LbaasAgentManager')
+        ) as (mock_launch, mock_eventlet, sys_argv, mgr_cls):
+            agent.main()
+
+            self.assertTrue(mock_eventlet.monkey_patch.called)
+            mock_launch.assert_called_once_with(mock.ANY)
diff --git a/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py b/quantum/tests/unit/services/agent_loadbalancer/agent/test_manager.py
new file mode 100644 (file)
index 0000000..b025809
--- /dev/null
@@ -0,0 +1,365 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import contextlib
+
+import mock
+import testtools
+
+from quantum.plugins.services.agent_loadbalancer.agent import manager
+
+
+class TestLogicalDeviceCache(testtools.TestCase):
+    def setUp(self):
+        super(TestLogicalDeviceCache, self).setUp()
+        self.cache = manager.LogicalDeviceCache()
+
+    def test_put(self):
+        fake_device = {
+            'vip': {'port_id': 'port_id'},
+            'pool': {'id': 'pool_id'}
+        }
+        self.cache.put(fake_device)
+
+        self.assertEqual(len(self.cache.devices), 1)
+        self.assertEqual(len(self.cache.port_lookup), 1)
+        self.assertEqual(len(self.cache.pool_lookup), 1)
+
+    def test_double_put(self):
+        fake_device = {
+            'vip': {'port_id': 'port_id'},
+            'pool': {'id': 'pool_id'}
+        }
+        self.cache.put(fake_device)
+        self.cache.put(fake_device)
+
+        self.assertEqual(len(self.cache.devices), 1)
+        self.assertEqual(len(self.cache.port_lookup), 1)
+        self.assertEqual(len(self.cache.pool_lookup), 1)
+
+    def test_remove_in_cache(self):
+        fake_device = {
+            'vip': {'port_id': 'port_id'},
+            'pool': {'id': 'pool_id'}
+        }
+        self.cache.put(fake_device)
+
+        self.assertEqual(len(self.cache.devices), 1)
+
+        self.cache.remove(fake_device)
+
+        self.assertFalse(len(self.cache.devices))
+        self.assertFalse(self.cache.port_lookup)
+        self.assertFalse(self.cache.pool_lookup)
+
+    def test_remove_in_cache_same_object(self):
+        fake_device = {
+            'vip': {'port_id': 'port_id'},
+            'pool': {'id': 'pool_id'}
+        }
+        self.cache.put(fake_device)
+
+        self.assertEqual(len(self.cache.devices), 1)
+
+        self.cache.remove(set(self.cache.devices).pop())
+
+        self.assertFalse(len(self.cache.devices))
+        self.assertFalse(self.cache.port_lookup)
+        self.assertFalse(self.cache.pool_lookup)
+
+    def test_remove_by_pool_id(self):
+        fake_device = {
+            'vip': {'port_id': 'port_id'},
+            'pool': {'id': 'pool_id'}
+        }
+        self.cache.put(fake_device)
+
+        self.assertEqual(len(self.cache.devices), 1)
+
+        self.cache.remove_by_pool_id('pool_id')
+
+        self.assertFalse(len(self.cache.devices))
+        self.assertFalse(self.cache.port_lookup)
+        self.assertFalse(self.cache.pool_lookup)
+
+    def test_get_by_pool_id(self):
+        fake_device = {
+            'vip': {'port_id': 'port_id'},
+            'pool': {'id': 'pool_id'}
+        }
+        self.cache.put(fake_device)
+
+        dev = self.cache.get_by_pool_id('pool_id')
+
+        self.assertEqual(dev.pool_id, 'pool_id')
+        self.assertEqual(dev.port_id, 'port_id')
+
+    def test_get_by_port_id(self):
+        fake_device = {
+            'vip': {'port_id': 'port_id'},
+            'pool': {'id': 'pool_id'}
+        }
+        self.cache.put(fake_device)
+
+        dev = self.cache.get_by_port_id('port_id')
+
+        self.assertEqual(dev.pool_id, 'pool_id')
+        self.assertEqual(dev.port_id, 'port_id')
+
+    def test_get_pool_ids(self):
+        fake_device = {
+            'vip': {'port_id': 'port_id'},
+            'pool': {'id': 'pool_id'}
+        }
+        self.cache.put(fake_device)
+
+        self.assertEqual(self.cache.get_pool_ids(), ['pool_id'])
+
+
+class TestManager(testtools.TestCase):
+    def setUp(self):
+        super(TestManager, self).setUp()
+        self.addCleanup(mock.patch.stopall)
+
+        mock_conf = mock.Mock()
+        mock_conf.interface_driver = 'intdriver'
+        mock_conf.device_driver = 'devdriver'
+        mock_conf.AGENT.root_helper = 'sudo'
+        mock_conf.loadbalancer_state_path = '/the/path'
+
+        self.mock_importer = mock.patch.object(manager, 'importutils').start()
+
+        rpc_mock_cls = mock.patch(
+            'quantum.plugins.services.agent_loadbalancer.agent.api'
+            '.LbaasAgentApi'
+        ).start()
+
+        self.mgr = manager.LbaasAgentManager(mock_conf)
+        self.rpc_mock = rpc_mock_cls.return_value
+        self.log = mock.patch.object(manager, 'LOG').start()
+        self.mgr.needs_resync = False
+
+    def test_initialize_service_hook(self):
+        with mock.patch.object(self.mgr, 'sync_state') as sync:
+            self.mgr.initialize_service_hook(mock.Mock())
+            sync.assert_called_once_with()
+
+    def test_periodic_resync_needs_sync(self):
+        with mock.patch.object(self.mgr, 'sync_state') as sync:
+            self.mgr.needs_resync = True
+            self.mgr.periodic_resync(mock.Mock())
+            sync.assert_called_once_with()
+
+    def test_periodic_resync_no_sync(self):
+        with mock.patch.object(self.mgr, 'sync_state') as sync:
+            self.mgr.needs_resync = False
+            self.mgr.periodic_resync(mock.Mock())
+            self.assertFalse(sync.called)
+
+    def test_collect_stats(self):
+        with mock.patch.object(self.mgr, 'cache') as cache:
+            cache.get_pool_ids.return_value = ['1', '2']
+            self.mgr.collect_stats(mock.Mock())
+            self.rpc_mock.update_pool_stats.assert_has_calls([
+                mock.call('1', mock.ANY),
+                mock.call('2', mock.ANY)
+            ])
+
+    def test_collect_stats_exception(self):
+        with mock.patch.object(self.mgr, 'cache') as cache:
+            cache.get_pool_ids.return_value = ['1', '2']
+            with mock.patch.object(self.mgr, 'driver') as driver:
+                driver.get_stats.side_effect = Exception
+
+                self.mgr.collect_stats(mock.Mock())
+
+                self.assertFalse(self.rpc_mock.called)
+                self.assertTrue(self.mgr.needs_resync)
+                self.assertTrue(self.log.exception.called)
+
+    def test_vip_plug_callback(self):
+        self.mgr._vip_plug_callback('plug', {'id': 'id'})
+        self.rpc_mock.plug_vip_port.assert_called_once_with('id')
+
+    def test_vip_unplug_callback(self):
+        self.mgr._vip_plug_callback('unplug', {'id': 'id'})
+        self.rpc_mock.unplug_vip_port.assert_called_once_with('id')
+
+    def _sync_state_helper(self, cache, ready, refreshed, destroyed):
+        with contextlib.nested(
+            mock.patch.object(self.mgr, 'cache'),
+            mock.patch.object(self.mgr, 'refresh_device'),
+            mock.patch.object(self.mgr, 'destroy_device')
+        ) as (mock_cache, refresh, destroy):
+
+            mock_cache.get_pool_ids.return_value = cache
+            self.rpc_mock.get_ready_devices.return_value = ready
+
+            self.mgr.sync_state()
+
+            self.assertEqual(len(refreshed), len(refresh.mock_calls))
+            self.assertEqual(len(destroyed), len(destroy.mock_calls))
+
+            refresh.assert_has_calls([mock.call(i) for i in refreshed])
+            destroy.assert_has_calls([mock.call(i) for i in destroyed])
+            self.assertFalse(self.mgr.needs_resync)
+
+    def test_sync_state_all_known(self):
+        self._sync_state_helper(['1', '2'], ['1', '2'], ['1', '2'], [])
+
+    def test_sync_state_all_unknown(self):
+        self._sync_state_helper([], ['1', '2'], ['1', '2'], [])
+
+    def test_sync_state_destroy_all(self):
+        self._sync_state_helper(['1', '2'], [], [], ['1', '2'])
+
+    def test_sync_state_both(self):
+        self._sync_state_helper(['1'], ['2'], ['2'], ['1'])
+
+    def test_sync_state_exception(self):
+        self.rpc_mock.get_ready_devices.side_effect = Exception
+
+        self.mgr.sync_state()
+
+        self.assertTrue(self.log.exception.called)
+        self.assertTrue(self.mgr.needs_resync)
+
+    def test_refresh_device_exists(self):
+        config = self.rpc_mock.get_logical_device.return_value
+
+        with mock.patch.object(self.mgr, 'driver') as driver:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                driver.exists.return_value = True
+
+                self.mgr.refresh_device(config)
+
+                driver.exists.assert_called_once_with(config)
+                driver.update.assert_called_once_with(config)
+                cache.put.assert_called_once_with(config)
+                self.assertFalse(self.mgr.needs_resync)
+
+    def test_refresh_device_new(self):
+        config = self.rpc_mock.get_logical_device.return_value
+
+        with mock.patch.object(self.mgr, 'driver') as driver:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                driver.exists.return_value = False
+
+                self.mgr.refresh_device(config)
+
+                driver.exists.assert_called_once_with(config)
+                driver.create.assert_called_once_with(config)
+                cache.put.assert_called_once_with(config)
+                self.assertFalse(self.mgr.needs_resync)
+
+    def test_refresh_device_exception(self):
+        config = self.rpc_mock.get_logical_device.return_value
+
+        with mock.patch.object(self.mgr, 'driver') as driver:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                driver.exists.side_effect = Exception
+                self.mgr.refresh_device(config)
+
+                driver.exists.assert_called_once_with(config)
+                self.assertTrue(self.mgr.needs_resync)
+                self.assertTrue(self.log.exception.called)
+                self.assertFalse(cache.put.called)
+
+    def test_destroy_device_known(self):
+        with mock.patch.object(self.mgr, 'driver') as driver:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                cache.get_by_pool_id.return_value = True
+
+                self.mgr.destroy_device('pool_id')
+                cache.get_by_pool_id.assert_called_once_with('pool_id')
+                driver.destroy.assert_called_once_with('pool_id')
+                self.rpc_mock.pool_destroyed.assert_called_once_with(
+                    'pool_id'
+                )
+                cache.remove.assert_called_once_with(True)
+                self.assertFalse(self.mgr.needs_resync)
+
+    def test_destroy_device_unknown(self):
+        with mock.patch.object(self.mgr, 'driver') as driver:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                cache.get_by_pool_id.return_value = None
+
+                self.mgr.destroy_device('pool_id')
+                cache.get_by_pool_id.assert_called_once_with('pool_id')
+                self.assertFalse(driver.destroy.called)
+
+    def test_destroy_device_exception(self):
+        with mock.patch.object(self.mgr, 'driver') as driver:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                cache.get_by_pool_id.return_value = True
+                driver.destroy.side_effect = Exception
+
+                self.mgr.destroy_device('pool_id')
+                cache.get_by_pool_id.assert_called_once_with('pool_id')
+
+                self.assertTrue(self.log.exception.called)
+                self.assertTrue(self.mgr.needs_resync)
+
+    def test_remove_orphans(self):
+        with mock.patch.object(self.mgr, 'driver') as driver:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                cache.get_pool_ids.return_value = ['1', '2']
+                self.mgr.remove_orphans()
+
+                driver.remove_orphans.assert_called_once_with(['1', '2'])
+
+    def test_reload_pool(self):
+        with mock.patch.object(self.mgr, 'refresh_device') as refresh:
+            self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
+            refresh.assert_called_once_with('pool_id')
+
+    def test_modify_pool_known(self):
+        with mock.patch.object(self.mgr, 'refresh_device') as refresh:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                cache.get_by_pool_id.return_value = True
+
+                self.mgr.reload_pool(mock.Mock(), pool_id='pool_id')
+
+                refresh.assert_called_once_with('pool_id')
+
+    def test_modify_pool_unknown(self):
+        with mock.patch.object(self.mgr, 'refresh_device') as refresh:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                cache.get_by_pool_id.return_value = False
+
+                self.mgr.modify_pool(mock.Mock(), pool_id='pool_id')
+
+                self.assertFalse(refresh.called)
+
+    def test_destroy_pool_known(self):
+        with mock.patch.object(self.mgr, 'destroy_device') as destroy:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                cache.get_by_pool_id.return_value = True
+
+                self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
+
+                destroy.assert_called_once_with('pool_id')
+
+    def test_destroy_pool_unknown(self):
+        with mock.patch.object(self.mgr, 'destroy_device') as destroy:
+            with mock.patch.object(self.mgr, 'cache') as cache:
+                cache.get_by_pool_id.return_value = False
+
+                self.mgr.destroy_pool(mock.Mock(), pool_id='pool_id')
+
+                self.assertFalse(destroy.called)
diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/driver/__init__.py
new file mode 100644 (file)
index 0000000..ce18bf6
--- /dev/null
@@ -0,0 +1,17 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/__init__.py
new file mode 100644 (file)
index 0000000..ce18bf6
--- /dev/null
@@ -0,0 +1,17 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
diff --git a/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py b/quantum/tests/unit/services/agent_loadbalancer/driver/haproxy/test_namespace_driver.py
new file mode 100644 (file)
index 0000000..16c7e35
--- /dev/null
@@ -0,0 +1,131 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 New Dream Network, LLC (DreamHost)
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import contextlib
+import mock
+import testtools
+
+from quantum.plugins.services.agent_loadbalancer.drivers.haproxy import (
+    namespace_driver
+)
+
+
+class TestHaproxyNSDriver(testtools.TestCase):
+    def setUp(self):
+        super(TestHaproxyNSDriver, self).setUp()
+
+        self.vif_driver = mock.Mock()
+        self.vip_plug_callback = mock.Mock()
+
+        self.driver = namespace_driver.HaproxyNSDriver(
+            'sudo',
+            '/the/path',
+            self.vif_driver,
+            self.vip_plug_callback
+        )
+
+        self.fake_config = {
+            'pool': {'id': 'pool_id'},
+            'vip': {'id': 'vip_id', 'port': {'id': 'port_id'}}
+        }
+
+    def test_create(self):
+        with mock.patch.object(self.driver, '_plug') as plug:
+            with mock.patch.object(self.driver, '_spawn') as spawn:
+                self.driver.create(self.fake_config)
+
+                plug.assert_called_once_with(
+                    'qlbaas-pool_id', {'id': 'port_id'}
+                )
+                spawn.assert_called_once_with(self.fake_config)
+
+    def test_update(self):
+        with contextlib.nested(
+            mock.patch.object(self.driver, '_get_state_file_path'),
+            mock.patch.object(self.driver, '_spawn'),
+            mock.patch('__builtin__.open')
+        ) as (gsp, spawn, mock_open):
+            mock_open.return_value = ['5']
+
+            self.driver.update(self.fake_config)
+
+            mock_open.assert_called_once_with(gsp.return_value, 'r')
+            spawn.assert_called_once_with(self.fake_config, ['-sf', '5'])
+
+    def test_spawn(self):
+        with contextlib.nested(
+            mock.patch.object(namespace_driver.hacfg, 'save_config'),
+            mock.patch.object(self.driver, '_get_state_file_path'),
+            mock.patch('quantum.agent.linux.ip_lib.IPWrapper')
+        ) as (mock_save, gsp, ip_wrap):
+            gsp.side_effect = lambda x, y: y
+
+            self.driver._spawn(self.fake_config)
+
+            mock_save.assert_called_once_with('conf', self.fake_config, 'sock')
+            cmd = ['haproxy', '-f', 'conf', '-p', 'pid']
+            ip_wrap.assert_has_calls([
+                mock.call('sudo', 'qlbaas-pool_id'),
+                mock.call().netns.execute(cmd)
+            ])
+
+    def test_destroy(self):
+        with contextlib.nested(
+            mock.patch.object(self.driver, '_get_state_file_path'),
+            mock.patch.object(namespace_driver, 'kill_pids_in_file'),
+            mock.patch.object(self.driver, '_unplug'),
+            mock.patch('quantum.agent.linux.ip_lib.IPWrapper'),
+            mock.patch('os.path.isdir'),
+            mock.patch('shutil.rmtree')
+        ) as (gsp, kill, unplug, ip_wrap, isdir, rmtree):
+            gsp.side_effect = lambda x, y: '/pool/' + y
+
+            self.driver.pool_to_port_id['pool_id'] = 'port_id'
+            isdir.return_value = True
+
+            self.driver.destroy('pool_id')
+
+            kill.assert_called_once_with(ip_wrap(), '/pool/pid')
+            unplug.assert_called_once_with('qlbaas-pool_id', 'port_id')
+            isdir.called_once_with('/pool')
+            rmtree.assert_called_once_with('/pool')
+            ip_wrap.assert_has_calls([
+                mock.call('sudo', 'qlbaas-pool_id'),
+                mock.call().garbage_collect_namespace()
+            ])
+
+    def test_exists(self):
+        with contextlib.nested(
+            mock.patch.object(self.driver, '_get_state_file_path'),
+            mock.patch('quantum.agent.linux.ip_lib.IPWrapper'),
+            mock.patch('socket.socket'),
+            mock.patch('os.path.exists'),
+        ) as (gsp, ip_wrap, socket, path_exists):
+            gsp.side_effect = lambda x, y: '/pool/' + y
+
+            ip_wrap.return_value.netns.exists.return_value = True
+            path_exists.return_value = True
+
+            self.driver.exists('pool_id')
+
+            ip_wrap.assert_has_calls([
+                mock.call('sudo'),
+                mock.call().netns.exists('qlbaas-pool_id')
+            ])
+
+            self.assertTrue(self.driver.exists('pool_id'))
diff --git a/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py b/quantum/tests/unit/services/agent_loadbalancer/test_plugin.py
new file mode 100644 (file)
index 0000000..a4289ef
--- /dev/null
@@ -0,0 +1,263 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+#
+# @author: Mark McClain, DreamHost
+
+import mock
+import testtools
+
+from quantum import context
+from quantum import manager
+from quantum.plugins.common import constants
+from quantum.plugins.services.agent_loadbalancer import plugin
+from quantum.tests.unit.db.loadbalancer import test_db_loadbalancer
+
+
+class TestLoadBalancerPluginBase(
+    test_db_loadbalancer.LoadBalancerPluginDbTestCase):
+
+    def setUp(self):
+        super(TestLoadBalancerPluginBase, self).setUp()
+
+        # create another API instance to make testing easier
+        # pass a mock to our API instance
+
+        # we need access to loaded plugins to modify models
+        loaded_plugins = manager.QuantumManager().get_service_plugins()
+        self.plugin_instance = loaded_plugins[constants.LOADBALANCER]
+        self.callbacks = self.plugin_instance.callbacks
+
+
+class TestLoadBalancerCallbacks(TestLoadBalancerPluginBase):
+    def test_get_ready_devices(self):
+        with self.vip() as vip:
+            ready = self.callbacks.get_ready_devices(
+                context.get_admin_context(),
+            )
+            self.assertEqual(ready, [vip['vip']['pool_id']])
+
+    def test_get_ready_devices_inactive_vip(self):
+        with self.vip() as vip:
+
+            # set the vip inactive need to use plugin directly since
+            # status is not tenant mutable
+            self.plugin_instance.update_vip(
+                context.get_admin_context(),
+                vip['vip']['id'],
+                {'vip': {'status': constants.INACTIVE}}
+            )
+
+            ready = self.callbacks.get_ready_devices(
+                context.get_admin_context(),
+            )
+            self.assertFalse(ready)
+
+    def test_get_ready_devices_inactive_pool(self):
+        with self.vip() as vip:
+
+            # set the pool inactive need to use plugin directly since
+            # status is not tenant mutable
+            self.plugin_instance.update_pool(
+                context.get_admin_context(),
+                vip['vip']['pool_id'],
+                {'pool': {'status': constants.INACTIVE}}
+            )
+
+            ready = self.callbacks.get_ready_devices(
+                context.get_admin_context(),
+            )
+            self.assertFalse(ready)
+
+    def test_get_logical_device_inactive(self):
+        with self.pool() as pool:
+            with self.vip(pool=pool) as vip:
+                with self.member(pool_id=vip['vip']['pool_id']) as member:
+                    self.assertRaises(
+                        Exception,
+                        self.callbacks.get_logical_device,
+                        context.get_admin_context(),
+                        pool['pool']['id'],
+                        activate=False
+                    )
+
+    def test_get_logical_device_activate(self):
+        with self.pool() as pool:
+            with self.vip(pool=pool) as vip:
+                with self.member(pool_id=vip['vip']['pool_id']) as member:
+                    ctx = context.get_admin_context()
+
+                    # build the expected
+                    port = self.plugin_instance._core_plugin.get_port(
+                        ctx, vip['vip']['port_id']
+                    )
+                    subnet = self.plugin_instance._core_plugin.get_subnet(
+                        ctx, vip['vip']['subnet_id']
+                    )
+                    port['fixed_ips'][0]['subnet'] = subnet
+
+                    # reload pool to add members and vip
+                    pool = self.plugin_instance.get_pool(
+                        ctx, pool['pool']['id']
+                    )
+
+                    pool['status'] = constants.ACTIVE
+                    vip['vip']['status'] = constants.ACTIVE
+                    vip['vip']['port'] = port
+                    member['member']['status'] = constants.ACTIVE
+
+                    expected = {
+                        'pool': pool,
+                        'vip': vip['vip'],
+                        'members': [member['member']],
+                        'healthmonitors': []
+                    }
+
+                    logical_config = self.callbacks.get_logical_device(
+                        ctx, pool['id'], activate=True
+                    )
+
+                    self.assertEqual(logical_config, expected)
+
+    def _update_port_test_helper(self, expected, func, **kwargs):
+        core = self.plugin_instance._core_plugin
+
+        with self.pool() as pool:
+            with self.vip(pool=pool) as vip:
+                with self.member(pool_id=vip['vip']['pool_id']) as member:
+                    ctx = context.get_admin_context()
+                    func(ctx, port_id=vip['vip']['port_id'], **kwargs)
+
+                    db_port = core.get_port(ctx, vip['vip']['port_id'])
+
+                    for k, v in expected.iteritems():
+                        self.assertEqual(db_port[k], v)
+
+    def test_plug_vip_port(self):
+        exp = {
+            'device_owner': 'quantum:' + constants.LOADBALANCER,
+            'device_id': 'c596ce11-db30-5c72-8243-15acaae8690f',
+            'admin_state_up': True
+        }
+        self._update_port_test_helper(
+            exp,
+            self.callbacks.plug_vip_port,
+            host='host'
+        )
+
+    def test_unplug_vip_port(self):
+        exp = {
+            'device_owner': '',
+            'device_id': '',
+            'admin_state_up': False
+        }
+        self._update_port_test_helper(
+            exp,
+            self.callbacks.unplug_vip_port,
+            host='host'
+        )
+
+
+class TestLoadBalancerAgentApi(testtools.TestCase):
+    def setUp(self):
+        super(TestLoadBalancerAgentApi, self).setUp()
+        self.addCleanup(mock.patch.stopall)
+
+        self.api = plugin.LoadBalancerAgentApi('topic', 'host')
+        self.mock_cast = mock.patch.object(self.api, 'cast').start()
+        self.mock_msg = mock.patch.object(self.api, 'make_msg').start()
+
+    def test_init(self):
+        self.assertEqual(self.api.topic, 'topic')
+        self.assertEqual(self.api.host, 'host')
+
+    def _call_test_helper(self, method_name):
+        rv = getattr(self.api, method_name)(mock.sentinel.context, 'the_id')
+        self.assertEqual(rv, self.mock_cast.return_value)
+        self.mock_cast.assert_called_once_with(
+            mock.sentinel.context,
+            self.mock_msg.return_value,
+            topic='topic'
+        )
+
+        self.mock_msg.assert_called_once_with(
+            method_name,
+            pool_id='the_id',
+            host='host'
+        )
+
+    def test_reload_pool(self):
+        self._call_test_helper('reload_pool')
+
+    def test_destroy_pool(self):
+        self._call_test_helper('destroy_pool')
+
+    def test_modify_pool(self):
+        self._call_test_helper('modify_pool')
+
+
+class TestLoadBalancerPluginNotificationWrapper(TestLoadBalancerPluginBase):
+    def setUp(self):
+        self.log = mock.patch.object(plugin, 'LOG')
+        api_cls = mock.patch.object(plugin, 'LoadBalancerAgentApi').start()
+        super(TestLoadBalancerPluginNotificationWrapper, self).setUp()
+        self.mock_api = api_cls.return_value
+
+        self.addCleanup(mock.patch.stopall)
+
+    def test_create_vip(self):
+        with self.subnet() as subnet:
+            with self.pool(subnet=subnet) as pool:
+                with self.vip(pool=pool, subnet=subnet) as vip:
+                    self.mock_api.reload_pool.assert_called_once_with(
+                        mock.ANY,
+                        vip['vip']['pool_id']
+                    )
+
+    def test_update_vip(self):
+        with self.subnet() as subnet:
+            with self.pool(subnet=subnet) as pool:
+                with self.vip(pool=pool, subnet=subnet) as vip:
+                    self.mock_api.reset_mock()
+                    ctx = context.get_admin_context()
+                    vip['vip'].pop('status')
+                    new_vip = self.plugin_instance.update_vip(
+                        ctx,
+                        vip['vip']['id'],
+                        vip
+                    )
+
+                    self.mock_api.reload_pool.assert_called_once_with(
+                        mock.ANY,
+                        vip['vip']['pool_id']
+                    )
+
+                    self.assertEqual(
+                        new_vip['status'],
+                        constants.PENDING_UPDATE
+                    )
+
+    def t2est_delete_vip(self):
+        with self.subnet() as subnet:
+            with self.pool(subnet=subnet) as pool:
+                with self.vip(pool=pool, subnet=subnet, no_delete=True) as vip:
+                    self.mock_api.reset_mock()
+                    ctx = context.get_admin_context()
+                    self.plugin_instance.delete_vip(context, vip['vip']['id'])
+                    self.mock_api.destroy_pool.assert_called_once_with(
+                        mock.ANY,
+                        vip['vip']['pool_id']
+                    )
index 840d9dcb7c38c93a61c26b43f148234fc8940251..b1f5c0e7fa14919b598e779b3e5a019a61e59627 100644 (file)
@@ -70,3 +70,21 @@ class AgentUtilsGetInterfaceMAC(testtools.TestCase):
                                           '\x00' * 232])
             actual_val = utils.get_interface_mac('eth0')
         self.assertEqual(actual_val, expect_val)
+
+
+class AgentUtilsReplaceFile(testtools.TestCase):
+    def test_replace_file(self):
+        # make file to replace
+        with mock.patch('tempfile.NamedTemporaryFile') as ntf:
+            ntf.return_value.name = '/baz'
+            with mock.patch('os.chmod') as chmod:
+                with mock.patch('os.rename') as rename:
+                    utils.replace_file('/foo', 'bar')
+
+                    expected = [mock.call('w+', dir='/', delete=False),
+                                mock.call().write('bar'),
+                                mock.call().close()]
+
+                    ntf.assert_has_calls(expected)
+                    chmod.assert_called_once_with('/baz', 0644)
+                    rename.assert_called_once_with('/baz', '/foo')
index c37139a93a437c1bb2ac8f9951e54e0b8fc0aad7..ee9557cff0680a2fecaaa907b478ed4e809913a0 100644 (file)
@@ -138,22 +138,6 @@ class TestDhcpBase(testtools.TestCase):
     def test_base_abc_error(self):
         self.assertRaises(TypeError, dhcp.DhcpBase, None)
 
-    def test_replace_file(self):
-        # make file to replace
-        with mock.patch('tempfile.NamedTemporaryFile') as ntf:
-            ntf.return_value.name = '/baz'
-            with mock.patch('os.chmod') as chmod:
-                with mock.patch('os.rename') as rename:
-                    dhcp.replace_file('/foo', 'bar')
-
-                    expected = [mock.call('w+', dir='/', delete=False),
-                                mock.call().write('bar'),
-                                mock.call().close()]
-
-                    ntf.assert_has_calls(expected)
-                    chmod.assert_called_once_with('/baz', 0644)
-                    rename.assert_called_once_with('/baz', '/foo')
-
     def test_restart(self):
         class SubClass(dhcp.DhcpBase):
             def __init__(self):
@@ -212,7 +196,7 @@ class TestBase(testtools.TestCase):
         self.conf.set_override('state_path', '')
         self.conf.use_namespaces = True
 
-        self.replace_p = mock.patch('quantum.agent.linux.dhcp.replace_file')
+        self.replace_p = mock.patch('quantum.agent.linux.utils.replace_file')
         self.execute_p = mock.patch('quantum.agent.linux.utils.execute')
         self.addCleanup(self.execute_p.stop)
         self.safe = self.replace_p.start()
@@ -392,7 +376,7 @@ class TestDhcpLocalProcess(TestBase):
             self.assertEqual(lp.interface_name, 'tap0')
 
     def test_set_interface_name(self):
-        with mock.patch('quantum.agent.linux.dhcp.replace_file') as replace:
+        with mock.patch('quantum.agent.linux.utils.replace_file') as replace:
             lp = LocalChild(self.conf, FakeDualNetwork())
             with mock.patch.object(lp, 'get_conf_file_name') as conf_file:
                 conf_file.return_value = '/interface'
index e265ab2eca7181a1a96ec309acad11f9a7f8785b..ac701900a71cf002a84a2f92f248f0d546aa4acd 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -137,6 +137,8 @@ else:
         'quantum-debug = quantum.debug.shell:main',
         'quantum-ovs-cleanup = quantum.agent.ovs_cleanup_util:main',
         'quantum-db-manage = quantum.db.migration.cli:main',
+        ('quantum-lbaas-agent = '
+         'quantum.plugins.services.agent_loadbalancer.agent:main'),
         ('quantum-check-nvp-config = '
          'quantum.plugins.nicira.nicira_nvp_plugin.check_nvp_config:main'),
     ]