From 220b3f265681891c2dfd7cc1d2548457a5a44ad2 Mon Sep 17 00:00:00 2001 From: Leon Cui Date: Wed, 12 Dec 2012 16:04:24 -0800 Subject: [PATCH] The change implemented Lbaas CRUD Sqlalchemy operations. - This change only contains the database access implementation of LBaaS API. There is no real configuration down to device side. - This change implements a loadbalancer plugin which doesn't do much thing but only db access. The object status needs to be updated when integrate with the lbaas agent. - This change follows the new LBaaS API 1.0 model. Please see the spec on wiki. Implements: blueprint lbaas-plugin-api-crud Change-Id: I1bccec8c29e3e9486506a5b52ce69af0480b2300 --- quantum/db/loadbalancer/__init__.py | 15 + quantum/db/loadbalancer/loadbalancer_db.py | 657 +++++++++++++ quantum/extensions/loadbalancer.py | 33 + quantum/plugins/common/constants.py | 8 + .../plugins/services/loadbalancer/__init__.py | 16 + .../loadbalancer/loadbalancerPlugin.py | 252 +++++ quantum/tests/unit/db/__init__.py | 15 + .../tests/unit/db/loadbalancer/__init__.py | 15 + .../db/loadbalancer/test_db_loadbalancer.py | 868 ++++++++++++++++++ 9 files changed, 1879 insertions(+) create mode 100644 quantum/db/loadbalancer/__init__.py create mode 100644 quantum/db/loadbalancer/loadbalancer_db.py create mode 100644 quantum/plugins/services/loadbalancer/__init__.py create mode 100644 quantum/plugins/services/loadbalancer/loadbalancerPlugin.py create mode 100644 quantum/tests/unit/db/__init__.py create mode 100644 quantum/tests/unit/db/loadbalancer/__init__.py create mode 100644 quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py diff --git a/quantum/db/loadbalancer/__init__.py b/quantum/db/loadbalancer/__init__.py new file mode 100644 index 000000000..009a3d7b7 --- /dev/null +++ b/quantum/db/loadbalancer/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/quantum/db/loadbalancer/loadbalancer_db.py b/quantum/db/loadbalancer/loadbalancer_db.py new file mode 100644 index 000000000..41a7be830 --- /dev/null +++ b/quantum/db/loadbalancer/loadbalancer_db.py @@ -0,0 +1,657 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# +# Copyright 2013 OpenStack LLC. All rights reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import sqlalchemy as sa +from sqlalchemy import orm +from sqlalchemy.orm import exc +from sqlalchemy.sql import expression as expr +import webob.exc as w_exc + +from quantum import policy +from quantum.api.v2 import attributes +from quantum.common import exceptions as q_exc +from quantum.db import db_base_plugin_v2 +from quantum.db import model_base +from quantum.db import models_v2 +from quantum.extensions import loadbalancer +from quantum.extensions.loadbalancer import LoadBalancerPluginBase +from quantum.openstack.common import cfg +from quantum.openstack.common import log as logging +from quantum.openstack.common import uuidutils +from quantum.plugins.common import constants + + +LOG = logging.getLogger(__name__) + + +class SessionPersistence(model_base.BASEV2): + vip_id = sa.Column(sa.String(36), + sa.ForeignKey("vips.id"), + primary_key=True) + type = sa.Column(sa.Enum("SOURCE_IP", + "HTTP_COOKIE", + "APP_COOKIE", + name="type"), + nullable=False) + cookie_name = sa.Column(sa.String(1024)) + + +class PoolStatistics(model_base.BASEV2): + """Represents pool statistics """ + pool_id = sa.Column(sa.String(36), sa.ForeignKey("pools.id"), + primary_key=True) + bytes_in = sa.Column(sa.Integer, nullable=False) + bytes_out = sa.Column(sa.Integer, nullable=False) + active_connections = sa.Column(sa.Integer, nullable=False) + total_connections = sa.Column(sa.Integer, nullable=False) + + +class Vip(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): + """Represents a v2 quantum loadbalancer vip.""" + name = sa.Column(sa.String(255)) + description = sa.Column(sa.String(255)) + subnet_id = sa.Column(sa.String(36), nullable=False) + address = sa.Column(sa.String(64)) + port = sa.Column(sa.Integer, nullable=False) + protocol = sa.Column(sa.Enum("HTTP", "HTTPS", name="protocol"), + nullable=False) + pool_id = sa.Column(sa.String(36), nullable=False) + session_persistence = orm.relationship(SessionPersistence, + uselist=False, + backref="vips", + cascade="all, delete-orphan") + status = sa.Column(sa.String(16), nullable=False) + admin_state_up = sa.Column(sa.Boolean(), nullable=False) + connection_limit = sa.Column(sa.Integer) + + +class Member(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): + """Represents a v2 quantum loadbalancer member.""" + pool_id = sa.Column(sa.String(36), sa.ForeignKey("pools.id"), + nullable=False) + address = sa.Column(sa.String(64), nullable=False) + port = sa.Column(sa.Integer, nullable=False) + weight = sa.Column(sa.Integer, nullable=False) + status = sa.Column(sa.String(16), nullable=False) + admin_state_up = sa.Column(sa.Boolean(), nullable=False) + + +class Pool(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): + """Represents a v2 quantum loadbalancer pool.""" + vip_id = sa.Column(sa.String(36), sa.ForeignKey("vips.id")) + name = sa.Column(sa.String(255)) + description = sa.Column(sa.String(255)) + subnet_id = sa.Column(sa.String(36), nullable=False) + protocol = sa.Column(sa.String(64), nullable=False) + lb_method = sa.Column(sa.Enum("ROUND_ROBIN", + "LEAST_CONNECTIONS", + "SOURCE_IP"), + nullable=False) + status = sa.Column(sa.String(16), nullable=False) + admin_state_up = sa.Column(sa.Boolean(), nullable=False) + stats = orm.relationship(PoolStatistics, + uselist=False, + backref="pools", + cascade="all, delete-orphan") + members = orm.relationship(Member, backref="pools", + cascade="all, delete-orphan") + monitors = orm.relationship("PoolMonitorAssociation", backref="pools", + cascade="all, delete-orphan") + + +class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): + """Represents a v2 quantum loadbalancer healthmonitor.""" + type = sa.Column(sa.Enum("PING", "TCP", "HTTP", "HTTPS", name="type"), + nullable=False) + delay = sa.Column(sa.Integer, nullable=False) + timeout = sa.Column(sa.Integer, nullable=False) + max_retries = sa.Column(sa.Integer, nullable=False) + http_method = sa.Column(sa.String(16)) + url_path = sa.Column(sa.String(255)) + expected_codes = sa.Column(sa.String(64)) + status = sa.Column(sa.String(16), nullable=False) + admin_state_up = sa.Column(sa.Boolean(), nullable=False) + + +class PoolMonitorAssociation(model_base.BASEV2): + """ + Represents the many-to-many association between pool and + healthMonitor classes + """ + pool_id = sa.Column(sa.String(36), + sa.ForeignKey("pools.id"), + primary_key=True) + monitor_id = sa.Column(sa.String(36), + sa.ForeignKey("healthmonitors.id"), + primary_key=True) + monitor = orm.relationship("HealthMonitor", + backref="pools_poolmonitorassociations") + + +class LoadBalancerPluginDb(LoadBalancerPluginBase): + """ + A class that wraps the implementation of the Quantum + loadbalancer plugin database access interface using SQLAlchemy models. + """ + + # TODO(lcui): + # A set of internal facility methods are borrowed from QuantumDbPluginV2 + # class and hence this is duplicate. We need to pull out those methods + # into a seperate class which can be used by both QuantumDbPluginV2 and + # this class (and others). + def _get_tenant_id_for_create(self, context, resource): + if context.is_admin and 'tenant_id' in resource: + tenant_id = resource['tenant_id'] + elif ('tenant_id' in resource and + resource['tenant_id'] != context.tenant_id): + reason = _('Cannot create resource for another tenant') + raise q_exc.AdminRequired(reason=reason) + else: + tenant_id = context.tenant_id + return tenant_id + + def _fields(self, resource, fields): + if fields: + return dict((key, item) for key, item in resource.iteritems() + if key in fields) + return resource + + def _apply_filters_to_query(self, query, model, filters): + if filters: + for key, value in filters.iteritems(): + column = getattr(model, key, None) + if column: + query = query.filter(column.in_(value)) + return query + + def _get_collection_query(self, context, model, filters=None): + collection = self._model_query(context, model) + collection = self._apply_filters_to_query(collection, model, filters) + return collection + + def _get_collection(self, context, model, dict_func, filters=None, + fields=None): + query = self._get_collection_query(context, model, filters) + return [dict_func(c, fields) for c in query.all()] + + def _get_collection_count(self, context, model, filters=None): + return self._get_collection_query(context, model, filters).count() + + def _model_query(self, context, model): + query = context.session.query(model) + query_filter = None + if not context.is_admin and hasattr(model, 'tenant_id'): + if hasattr(model, 'shared'): + query_filter = ((model.tenant_id == context.tenant_id) | + (model.shared)) + else: + query_filter = (model.tenant_id == context.tenant_id) + + if query_filter is not None: + query = query.filter(query_filter) + return query + + def _get_by_id(self, context, model, id): + query = self._model_query(context, model) + return query.filter(model.id == id).one() + + def update_status(self, context, model, id, status): + with context.session.begin(subtransactions=True): + v_db = self._get_resource(context, model, id) + v_db.update({'status': status}) + + def _get_resource(self, context, model, id): + try: + r = self._get_by_id(context, model, id) + except exc.NoResultFound: + if issubclass(model, Vip): + raise loadbalancer.VipNotFound(vip_id=id) + elif issubclass(model, Pool): + raise loadbalancer.PoolNotFound(pool_id=id) + elif issubclass(model, Member): + raise loadbalancer.MemberNotFound(member_id=id) + elif issubclass(model, HealthMonitor): + raise loadbalancer.HealthMonitorNotFound(monitor_id=id) + else: + raise + return r + + ######################################################## + # VIP DB access + def _make_vip_dict(self, vip, fields=None): + res = {'id': vip['id'], + 'tenant_id': vip['tenant_id'], + 'name': vip['name'], + 'description': vip['description'], + 'subnet_id': vip['subnet_id'], + 'address': vip['address'], + 'port': vip['port'], + 'protocol': vip['protocol'], + 'pool_id': vip['pool_id'], + 'connection_limit': vip['connection_limit'], + 'admin_state_up': vip['admin_state_up'], + 'status': vip['status']} + if vip['session_persistence']: + res['session_persistence'] = { + 'type': vip['session_persistence']['type'], + 'cookie_name': vip['session_persistence']['cookie_name'] + } + return self._fields(res, fields) + + def _update_pool_vip_info(self, context, pool_id, vip_id): + pool_db = self._get_resource(context, Pool, pool_id) + with context.session.begin(subtransactions=True): + pool_db.update({'vip_id': vip_id}) + + def _update_vip_session_persistence_info(self, context, vip_id, info): + vip = self._get_resource(context, Vip, vip_id) + + with context.session.begin(subtransactions=True): + # Update sessionPersistence table + sess_qry = context.session.query(SessionPersistence) + sesspersist_db = sess_qry.filter_by(vip_id=vip_id).first() + if sesspersist_db: + sesspersist_db.update(info) + else: + sesspersist_db = SessionPersistence( + type=info['type'], + cookie_name=info['cookie_name'], + vip_id=vip_id) + context.session.add(sesspersist_db) + # Update vip table + vip.session_persistence = sesspersist_db + context.session.add(vip) + + def _delete_sessionpersistence(self, context, id): + with context.session.begin(subtransactions=True): + sess_qry = context.session.query(SessionPersistence) + sess_qry.filter_by(vip_id=id).delete() + + def create_vip(self, context, vip): + v = vip['vip'] + tenant_id = self._get_tenant_id_for_create(context, v) + + with context.session.begin(subtransactions=True): + if v['address'] == attributes.ATTR_NOT_SPECIFIED: + address = None + else: + address = v['address'] + vip_db = Vip(id=uuidutils.generate_uuid(), + tenant_id=tenant_id, + name=v['name'], + description=v['description'], + subnet_id=v['subnet_id'], + address=address, + port=v['port'], + protocol=v['protocol'], + pool_id=v['pool_id'], + connection_limit=v['connection_limit'], + admin_state_up=v['admin_state_up'], + status=constants.PENDING_CREATE) + vip_id = vip_db['id'] + + sessionInfo = v['session_persistence'] + if sessionInfo: + has_session_persistence = True + sesspersist_db = SessionPersistence( + type=sessionInfo['type'], + cookie_name=sessionInfo['cookie_name'], + vip_id=vip_id) + vip_db.session_persistence = sesspersist_db + + context.session.add(vip_db) + self._update_pool_vip_info(context, v['pool_id'], vip_id) + + vip_db = self._get_resource(context, Vip, vip_id) + return self._make_vip_dict(vip_db) + + def update_vip(self, context, id, vip): + v = vip['vip'] + + sesspersist_info = v.pop('session_persistence', None) + with context.session.begin(subtransactions=True): + if sesspersist_info: + self._update_vip_session_persistence_info(context, + id, + sesspersist_info) + + vip_db = self._get_resource(context, Vip, id) + old_pool_id = vip_db['pool_id'] + if v: + vip_db.update(v) + # If the pool_id is changed, we need to update + # the associated pools + if 'pool_id' in v: + self._update_pool_vip_info(context, old_pool_id, None) + self._update_pool_vip_info(context, v['pool_id'], id) + + return self._make_vip_dict(vip_db) + + def delete_vip(self, context, id): + with context.session.begin(subtransactions=True): + vip = self._get_resource(context, Vip, id) + qry = context.session.query(Pool) + for pool in qry.filter_by(vip_id=id).all(): + pool.update({"vip_id": None}) + context.session.delete(vip) + + def get_vip(self, context, id, fields=None): + vip = self._get_resource(context, Vip, id) + return self._make_vip_dict(vip, fields) + + def get_vips(self, context, filters=None, fields=None): + return self._get_collection(context, Vip, + self._make_vip_dict, + filters=filters, fields=fields) + + ######################################################## + # Pool DB access + def _make_pool_dict(self, context, pool, fields=None): + res = {'id': pool['id'], + 'tenant_id': pool['tenant_id'], + 'name': pool['name'], + 'description': pool['description'], + 'subnet_id': pool['subnet_id'], + 'protocol': pool['protocol'], + 'vip_id': pool['vip_id'], + 'lb_method': pool['lb_method'], + 'admin_state_up': pool['admin_state_up'], + 'status': pool['status']} + + # Get the associated members + res['members'] = [member['id'] for member in pool['members']] + + # Get the associated health_monitors + res['health_monitors'] = [ + monitor['monitor_id'] for monitor in pool['monitors']] + + return self._fields(res, fields) + + def _update_pool_member_info(self, context, pool_id, membersInfo): + with context.session.begin(subtransactions=True): + member_qry = context.session.query(Member) + for member_id in membersInfo: + try: + member = member_qry.filter_by(id=member_id).one() + member.update({'pool_id': pool_id}) + except exc.NoResultFound: + raise loadbalancer.MemberNotFound(member_id=member_id) + + def _create_pool_stats(self, context, pool_id): + # This is internal method to add pool statistics. It won't + # be exposed to API + stats_db = PoolStatistics( + pool_id=pool_id, + bytes_in=0, + bytes_out=0, + active_connections=0, + total_connections=0 + ) + return stats_db + + def _delete_pool_stats(self, context, pool_id): + # This is internal method to delete pool statistics. It won't + # be exposed to API + with context.session.begin(subtransactions=True): + stats_qry = context.session.query(PoolStatistics) + try: + stats = stats_qry.filter_by(pool_id=pool_id).one() + except exc.NoResultFound: + raise loadbalancer.PoolStatsNotFound(pool_id=pool_id) + context.session.delete(stats) + + def create_pool(self, context, pool): + v = pool['pool'] + + tenant_id = self._get_tenant_id_for_create(context, v) + with context.session.begin(subtransactions=True): + pool_db = Pool(id=uuidutils.generate_uuid(), + tenant_id=tenant_id, + name=v['name'], + description=v['description'], + subnet_id=v['subnet_id'], + protocol=v['protocol'], + lb_method=v['lb_method'], + admin_state_up=v['admin_state_up'], + status=constants.PENDING_CREATE) + pool_db.stats = self._create_pool_stats(context, pool_db['id']) + context.session.add(pool_db) + + pool_db = self._get_resource(context, Pool, pool_db['id']) + return self._make_pool_dict(context, pool_db) + + def update_pool(self, context, id, pool): + v = pool['pool'] + + with context.session.begin(subtransactions=True): + pool_db = self._get_resource(context, Pool, id) + if v: + pool_db.update(v) + + return self._make_pool_dict(context, pool_db) + + def delete_pool(self, context, id): + # Check if the pool is in use + vip = context.session.query(Vip).filter_by(pool_id=id).first() + if vip: + raise loadbalancer.PoolInUse(pool_id=id) + + with context.session.begin(subtransactions=True): + self._delete_pool_stats(context, id) + pool_db = self._get_resource(context, Pool, id) + context.session.delete(pool_db) + + def get_pool(self, context, id, fields=None): + pool = self._get_resource(context, Pool, id) + return self._make_pool_dict(context, pool, fields) + + def get_pools(self, context, filters=None, fields=None): + collection = self._model_query(context, Pool) + collection = self._apply_filters_to_query(collection, Pool, filters) + return [self._make_pool_dict(context, c, fields) + for c in collection.all()] + + def get_stats(self, context, pool_id): + with context.session.begin(subtransactions=True): + pool_qry = context.session.query(Pool) + try: + pool = pool_qry.filter_by(id=pool_id).one() + stats = pool['stats'] + except exc.NoResultFound: + raise loadbalancer.PoolStatsNotFound(pool_id=pool_id) + + res = {'bytes_in': stats['bytes_in'], + 'bytes_out': stats['bytes_out'], + 'active_connections': stats['active_connections'], + 'total_connections': stats['total_connections']} + return {'stats': res} + + def create_pool_health_monitor(self, context, health_monitor, pool_id): + monitor_id = health_monitor['health_monitor']['id'] + with context.session.begin(subtransactions=True): + monitor_qry = context.session.query(HealthMonitor) + try: + monitor = monitor_qry.filter_by(id=monitor_id).one() + monitor.update({'pool_id': pool_id}) + except exc.NoResultFound: + raise loadbalancer.HealthMonitorNotFound(monitor_id=monitor_id) + try: + qry = context.session.query(Pool) + pool = qry.filter_by(id=pool_id).one() + except exc.NoResultFound: + raise loadbalancer.PoolNotFound(pool_id=pool_id) + + assoc = PoolMonitorAssociation(pool_id=pool_id, + monitor_id=monitor_id) + assoc.monitor = monitor + pool.monitors.append(assoc) + + monitors = [] + try: + qry = context.session.query(Pool) + pool = qry.filter_by(id=pool_id).one() + for monitor in pool['monitors']: + monitors.append(monitor['monitor_id']) + except exc.NoResultFound: + pass + + res = {"health_monitor": monitors} + return res + + def delete_pool_health_monitor(self, context, id, pool_id): + with context.session.begin(subtransactions=True): + try: + pool_qry = context.session.query(Pool) + pool = pool_qry.filter_by(id=pool_id).one() + except exc.NoResultFound: + raise loadbalancer.PoolNotFound(pool_id=pool_id) + try: + monitor_qry = context.session.query(PoolMonitorAssociation) + monitor = monitor_qry.filter_by(monitor_id=id, + pool_id=pool_id).one() + pool.monitors.remove(monitor) + except exc.NoResultFound: + raise loadbalancer.HealthMonitorNotFound(monitor_id=id) + + def get_pool_health_monitor(self, context, id, pool_id, fields=None): + healthmonitor = self._get_resource(context, HealthMonitor, id) + return self._make_health_monitor_dict(healthmonitor, fields) + + ######################################################## + # Member DB access + def _make_member_dict(self, member, fields=None): + res = {'id': member['id'], + 'tenant_id': member['tenant_id'], + 'pool_id': member['pool_id'], + 'address': member['address'], + 'port': member['port'], + 'weight': member['weight'], + 'admin_state_up': member['admin_state_up'], + 'status': member['status']} + return self._fields(res, fields) + + def create_member(self, context, member): + v = member['member'] + tenant_id = self._get_tenant_id_for_create(context, v) + + with context.session.begin(subtransactions=True): + pool = None + try: + qry = context.session.query(Pool) + pool = qry.filter_by(id=v['pool_id']).one() + except exc.NoResultFound: + raise loadbalancer.PoolNotFound(pool_id=v['pool_id']) + + member_db = Member(id=uuidutils.generate_uuid(), + tenant_id=tenant_id, + pool_id=v['pool_id'], + address=v['address'], + port=v['port'], + weight=v['weight'], + admin_state_up=v['admin_state_up'], + status=constants.PENDING_CREATE) + context.session.add(member_db) + + return self._make_member_dict(member_db) + + def update_member(self, context, id, member): + v = member['member'] + with context.session.begin(subtransactions=True): + member_db = self._get_resource(context, Member, id) + old_pool_id = member_db['pool_id'] + if v: + member_db.update(v) + + return self._make_member_dict(member_db) + + def delete_member(self, context, id): + with context.session.begin(subtransactions=True): + member_db = self._get_resource(context, Member, id) + context.session.delete(member_db) + + def get_member(self, context, id, fields=None): + member = self._get_resource(context, Member, id) + return self._make_member_dict(member, fields) + + def get_members(self, context, filters=None, fields=None): + return self._get_collection(context, Member, + self._make_member_dict, + filters=filters, fields=fields) + + ######################################################## + # HealthMonitor DB access + def _make_health_monitor_dict(self, health_monitor, fields=None): + res = {'id': health_monitor['id'], + 'tenant_id': health_monitor['tenant_id'], + 'type': health_monitor['type'], + 'delay': health_monitor['delay'], + 'timeout': health_monitor['timeout'], + 'max_retries': health_monitor['max_retries'], + 'http_method': health_monitor['http_method'], + 'url_path': health_monitor['url_path'], + 'expected_codes': health_monitor['expected_codes'], + 'admin_state_up': health_monitor['admin_state_up'], + 'status': health_monitor['status']} + return self._fields(res, fields) + + def create_health_monitor(self, context, health_monitor): + v = health_monitor['health_monitor'] + tenant_id = self._get_tenant_id_for_create(context, v) + with context.session.begin(subtransactions=True): + monitor_db = HealthMonitor(id=uuidutils.generate_uuid(), + tenant_id=tenant_id, + type=v['type'], + delay=v['delay'], + timeout=v['timeout'], + max_retries=v['max_retries'], + http_method=v['http_method'], + url_path=v['url_path'], + expected_codes=v['expected_codes'], + admin_state_up=v['admin_state_up'], + status=constants.PENDING_CREATE) + context.session.add(monitor_db) + return self._make_health_monitor_dict(monitor_db) + + def update_health_monitor(self, context, id, health_monitor): + v = health_monitor['health_monitor'] + with context.session.begin(subtransactions=True): + monitor_db = self._get_resource(context, HealthMonitor, id) + if v: + monitor_db.update(v) + return self._make_health_monitor_dict(monitor_db) + + def delete_health_monitor(self, context, id): + with context.session.begin(subtransactions=True): + assoc_qry = context.session.query(PoolMonitorAssociation) + pool_qry = context.session.query(Pool) + for assoc in assoc_qry.filter_by(monitor_id=id).all(): + try: + pool = pool_qry.filter_by(id=assoc['pool_id']).one() + except exc.NoResultFound: + raise loadbalancer.PoolNotFound(pool_id=pool['id']) + pool.monitors.remove(assoc) + monitor_db = self._get_resource(context, HealthMonitor, id) + context.session.delete(monitor_db) + + def get_health_monitor(self, context, id, fields=None): + healthmonitor = self._get_resource(context, HealthMonitor, id) + return self._make_health_monitor_dict(healthmonitor, fields) + + def get_health_monitors(self, context, filters=None, fields=None): + return self._get_collection(context, HealthMonitor, + self._make_health_monitor_dict, + filters=filters, fields=fields) diff --git a/quantum/extensions/loadbalancer.py b/quantum/extensions/loadbalancer.py index 58822da41..de66a48bc 100644 --- a/quantum/extensions/loadbalancer.py +++ b/quantum/extensions/loadbalancer.py @@ -20,11 +20,41 @@ import abc from quantum.api import extensions from quantum.api.v2 import attributes as attr from quantum.api.v2 import base +from quantum.common import exceptions as qexception from quantum import manager from quantum.plugins.common import constants from quantum.plugins.services.service_base import ServicePluginBase +# Loadbalancer Exceptions +class VipNotFound(qexception.NotFound): + message = _("Vip %(vip_id)s could not be found") + + +class PoolNotFound(qexception.NotFound): + message = _("Pool %(pool_id)s could not be found") + + +class MemberNotFound(qexception.NotFound): + message = _("Member %(member_id)s could not be found") + + +class HealthMonitorNotFound(qexception.NotFound): + message = _("Health_monitor %(monitor_id)s could not be found") + + +class StateInvalid(qexception.QuantumException): + message = _("Invalid state %(state)s of Loadbalancer resource %(id)s") + + +class PoolInUse(qexception.InUse): + message = _("Pool %(pool_id)s is still in use") + + +class PoolStatsNotFound(qexception.NotFound): + message = _("Statistics of Pool %(pool_id)s could not be found") + + RESOURCE_ATTRIBUTE_MAP = { 'vips': { 'id': {'allow_post': False, 'allow_put': False, @@ -280,6 +310,9 @@ class Loadbalancer(extensions.ExtensionDescriptor): class LoadBalancerPluginBase(ServicePluginBase): __metaclass__ = abc.ABCMeta + def get_plugin_name(self): + return constants.LOADBALANCER + def get_plugin_type(self): return constants.LOADBALANCER diff --git a/quantum/plugins/common/constants.py b/quantum/plugins/common/constants.py index 2a5e27d46..2a560d6bd 100644 --- a/quantum/plugins/common/constants.py +++ b/quantum/plugins/common/constants.py @@ -28,3 +28,11 @@ COMMON_PREFIXES = { DUMMY: "/dummy_svc", LOADBALANCER: "/lb", } + +# Service operation status constants +ACTIVE = "ACTIVE" +PENDING_CREATE = "PENDING_CREATE" +PENDING_UPDATE = "PENDING_UPDATE" +PENDING_DELETE = "PENDING_DELETE" +INACTIVE = "INACTIVE" +ERROR = "ERROR" diff --git a/quantum/plugins/services/loadbalancer/__init__.py b/quantum/plugins/services/loadbalancer/__init__.py new file mode 100644 index 000000000..c65c52d23 --- /dev/null +++ b/quantum/plugins/services/loadbalancer/__init__.py @@ -0,0 +1,16 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py b/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py new file mode 100644 index 000000000..0edb6e52c --- /dev/null +++ b/quantum/plugins/services/loadbalancer/loadbalancerPlugin.py @@ -0,0 +1,252 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from quantum.db import api as qdbapi +from quantum.db import model_base +from quantum.db.loadbalancer import loadbalancer_db +from quantum.extensions import loadbalancer +from quantum.openstack.common import log as logging +from quantum.plugins.common import constants + +LOG = logging.getLogger(__name__) + + +class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb): + + """ + Implementation of the Quantum Loadbalancer Service Plugin. + + This class manages the workflow of LBaaS request/response. + Most DB related works are implemented in class + loadbalancer_db.LoadBalancerPluginDb. + """ + supported_extension_aliases = ["lbaas"] + + def __init__(self): + """ + Do the initialization for the loadbalancer service plugin here. + """ + qdbapi.register_models(base=model_base.BASEV2) + + # TODO: we probably need to setup RPC channel (to talk to LbAgent) here + + def get_plugin_type(self): + return constants.LOADBALANCER + + def get_plugin_description(self): + return "Quantum LoadBalancer Service Plugin" + + def create_vip(self, context, vip): + v = super(LoadBalancerPlugin, self).create_vip(context, vip) + self.update_status(context, loadbalancer_db.Vip, v['id'], + constants.PENDING_CREATE) + LOG.debug(_("Create vip: %s") % v['id']) + + # If we adopt asynchronous mode, this method should return immediately + # and let client to query the object status. The plugin will listen on + # the event from device and update the object status by calling + # self.update_state(context, Vip, id, ACTIVE/ERROR) + # + # In synchronous mode, send the request to device here and wait for + # response. Eventually update the object status prior to the return. + v_query = self.get_vip(context, v['id']) + return v_query + + def update_vip(self, context, id, vip): + v_query = self.get_vip( + context, id, fields=["status"]) + if v_query['status'] in [ + constants.PENDING_DELETE, constants.ERROR]: + raise loadbalancer.StateInvalid(id=id, + state=v_query['status']) + + v = super(LoadBalancerPlugin, self).update_vip(context, id, vip) + self.update_status(context, loadbalancer_db.Vip, id, + constants.PENDING_UPDATE) + LOG.debug(_("Update vip: %s"), id) + + # TODO notify lbagent + v_rt = self.get_vip(context, id) + return v_rt + + def delete_vip(self, context, id): + self.update_status(context, loadbalancer_db.Vip, id, + constants.PENDING_DELETE) + LOG.debug(_("Delete vip: %s"), id) + + # TODO notify lbagent + super(LoadBalancerPlugin, self).delete_vip(context, id) + + def get_vip(self, context, id, fields=None): + res = super(LoadBalancerPlugin, self).get_vip(context, id, fields) + LOG.debug(_("Get vip: %s"), id) + return res + + def get_vips(self, context, filters=None, fields=None): + res = super(LoadBalancerPlugin, self).get_vips_db( + context, filters, fields) + LOG.debug(_("Get vips")) + return res + + def create_pool(self, context, pool): + p = super(LoadBalancerPlugin, self).create_pool(context, pool) + self.update_status(context, loadbalancer_db.Pool, p['id'], + constants.PENDING_CREATE) + LOG.debug(_("Create pool: %s"), p['id']) + + # TODO notify lbagent + p_rt = self.get_pool(context, p['id']) + return p_rt + + def update_pool(self, context, id, pool): + p_query = self.get_pool(context, id, fields=["status"]) + if p_query['status'] in [ + constants.PENDING_DELETE, constants.ERROR]: + raise loadbalancer.StateInvalid(id=id, + state=p_query['status']) + p = super(LoadBalancerPlugin, self).update_pool(context, id, pool) + LOG.debug(_("Update pool: %s"), p['id']) + # TODO notify lbagent + p_rt = self.get_pool(context, id) + return p_rt + + def delete_pool(self, context, id): + self.update_status(context, loadbalancer_db.Pool, id, + constants.PENDING_DELETE) + # TODO notify lbagent + super(LoadBalancerPlugin, self).delete_pool(context, id) + LOG.debug(_("Delete pool: %s"), id) + + def get_pool(self, context, id, fields=None): + res = super(LoadBalancerPlugin, self).get_pool(context, id, fields) + LOG.debug(_("Get pool: %s"), id) + return res + + def get_pools(self, context, filters=None, fields=None): + res = super(LoadBalancerPlugin, self).get_pools( + context, filters, fields) + LOG.debug(_("Get Pools")) + return res + + def stats(self, context, pool_id): + res = super(LoadBalancerPlugin, self).get_stats(context, pool_id) + LOG.debug(_("Get stats of Pool: %s"), pool_id) + return res + + def create_pool_health_monitor(self, context, health_monitor, pool_id): + m = super(LoadBalancerPlugin, self).create_pool_health_monitor( + context, health_monitor, pool_id) + LOG.debug(_("Create health_monitor of pool: %s"), pool_id) + return m + + def get_pool_health_monitor(self, context, id, pool_id, fields=None): + m = super(LoadBalancerPlugin, self).get_pool_health_monitor( + context, id, pool_id, fields) + LOG.debug(_("Get health_monitor of pool: %s"), pool_id) + return m + + def delete_pool_health_monitor(self, context, id, pool_id): + super(LoadBalancerPlugin, self).delete_pool_health_monitor( + context, id, pool_id) + LOG.debug(_("Delete health_monitor %(id)s of pool: %(pool_id)s"), + {"id": id, "pool_id": pool_id}) + + def get_member(self, context, id, fields=None): + res = super(LoadBalancerPlugin, self).get_member( + context, id, fields) + LOG.debug(_("Get member: %s"), id) + return res + + def get_members(self, context, filters=None, fields=None): + res = super(LoadBalancerPlugin, self).get_members( + context, filters, fields) + LOG.debug(_("Get members")) + return res + + def create_member(self, context, member): + m = super(LoadBalancerPlugin, self).create_member(context, member) + self.update_status(context, loadbalancer_db.Member, m['id'], + constants.PENDING_CREATE) + LOG.debug(_("Create member: %s"), m['id']) + # TODO notify lbagent + m_rt = self.get_member(context, m['id']) + return m_rt + + def update_member(self, context, id, member): + m_query = self.get_member(context, id, fields=["status"]) + if m_query['status'] in [ + constants.PENDING_DELETE, constants.ERROR]: + raise loadbalancer.StateInvalid(id=id, + state=m_query['status']) + m = super(LoadBalancerPlugin, self).update_member(context, id, member) + self.update_status(context, loadbalancer_db.Member, id, + constants.PENDING_UPDATE) + LOG.debug(_("Update member: %s"), m['id']) + # TODO notify lbagent + m_rt = self.get_member(context, id) + return m_rt + + def delete_member(self, context, id): + self.update_status(context, loadbalancer_db.Member, id, + constants.PENDING_DELETE) + LOG.debug(_("Delete member: %s"), id) + # TODO notify lbagent + super(LoadBalancerPlugin, self).delete_member(context, id) + + def get_health_monitor(self, context, id, fields=None): + res = super(LoadBalancerPlugin, self).get_health_monitor( + context, id, fields) + LOG.debug(_("Get health_monitor: %s"), id) + return res + + def get_health_monitors(self, context, filters=None, fields=None): + res = super(LoadBalancerPlugin, self).get_health_monitors( + context, filters, fields) + LOG.debug(_("Get health_monitors")) + return res + + def create_health_monitor(self, context, health_monitor): + h = super(LoadBalancerPlugin, self).create_health_monitor( + context, health_monitor) + self.update_status(context, loadbalancer_db.HealthMonitor, h['id'], + constants.PENDING_CREATE) + LOG.debug(_("Create health_monitor: %s"), h['id']) + # TODO notify lbagent + h_rt = self.get_health_monitor(context, h['id']) + return h_rt + + def update_health_monitor(self, context, id, health_monitor): + h_query = self.get_health_monitor(context, id, fields=["status"]) + if h_query['status'] in [ + constants.PENDING_DELETE, constants.ERROR]: + raise loadbalancer.StateInvalid(id=id, + state=h_query['status']) + h = super(LoadBalancerPlugin, self).update_health_monitor( + context, id, health_monitor) + self.update_status(context, loadbalancer_db.HealthMonitor, id, + constants.PENDING_UPDATE) + LOG.debug(_("Update health_monitor: %s"), h['id']) + # TODO notify lbagent + h_rt = self.get_health_monitor(context, id) + return h_rt + + def delete_health_monitor(self, context, id): + self.update_status(context, loadbalancer_db.HealthMonitor, id, + constants.PENDING_DELETE) + LOG.debug(_("Delete health_monitor: %s"), id) + super(LoadBalancerPlugin, self).delete_health_monitor(context, id) diff --git a/quantum/tests/unit/db/__init__.py b/quantum/tests/unit/db/__init__.py new file mode 100644 index 000000000..009a3d7b7 --- /dev/null +++ b/quantum/tests/unit/db/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/quantum/tests/unit/db/loadbalancer/__init__.py b/quantum/tests/unit/db/loadbalancer/__init__.py new file mode 100644 index 000000000..009a3d7b7 --- /dev/null +++ b/quantum/tests/unit/db/loadbalancer/__init__.py @@ -0,0 +1,15 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py b/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py new file mode 100644 index 000000000..6938725aa --- /dev/null +++ b/quantum/tests/unit/db/loadbalancer/test_db_loadbalancer.py @@ -0,0 +1,868 @@ +# Copyright (c) 2012 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import logging +import os +import unittest2 +import webob.exc + +import quantum +from quantum import context +from quantum.api.extensions import PluginAwareExtensionManager +from quantum.api.extensions import ExtensionMiddleware +from quantum.api.v2 import attributes +from quantum.api.v2.attributes import ATTR_NOT_SPECIFIED +from quantum.api.v2.router import APIRouter +from quantum.common import config +from quantum.common import exceptions as q_exc +from quantum.common.test_lib import test_config +from quantum.db import api as db +from quantum.db import db_base_plugin_v2 +from quantum.db import models_v2 +from quantum.extensions import loadbalancer +from quantum.manager import QuantumManager +from quantum.openstack.common import cfg +from quantum.openstack.common import timeutils +from quantum.plugins.common import constants +from quantum.plugins.services.loadbalancer import loadbalancerPlugin +from quantum.tests.unit import test_extensions +from quantum.tests.unit.testlib_api import create_request +from quantum.wsgi import Serializer, JSONDeserializer + + +LOG = logging.getLogger(__name__) + +DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2' +DB_LB_PLUGIN_KLASS = ( + "quantum.plugins.services.loadbalancer." + "loadbalancerPlugin.LoadBalancerPlugin" +) +ROOTDIR = os.path.dirname(__file__) + '../../../..' +ETCDIR = os.path.join(ROOTDIR, 'etc') + +extensions_path = ':'.join(quantum.extensions.__path__) + + +def etcdir(*p): + return os.path.join(ETCDIR, *p) + + +class LoadBalancerPluginDbTestCase(unittest2.TestCase): + + def setUp(self, core_plugin=None, lb_plugin=None): + super(LoadBalancerPluginDbTestCase, self).setUp() + + db._ENGINE = None + db._MAKER = None + + QuantumManager._instance = None + PluginAwareExtensionManager._instance = None + self._attribute_map_bk = {} + self._attribute_map_bk = loadbalancer.RESOURCE_ATTRIBUTE_MAP.copy() + self._tenant_id = "test-tenant" + self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14" + + json_deserializer = JSONDeserializer() + self._deserializers = { + 'application/json': json_deserializer, + } + + if not core_plugin: + core_plugin = test_config.get('plugin_name_v2', + DB_CORE_PLUGIN_KLASS) + if not lb_plugin: + lb_plugin = test_config.get('lb_plugin_name', DB_LB_PLUGIN_KLASS) + + # point config file to: quantum/tests/etc/quantum.conf.test + args = ['--config-file', etcdir('quantum.conf.test')] + config.parse(args=args) + # Update the plugin + service_plugins = [lb_plugin] + cfg.CONF.set_override('core_plugin', core_plugin) + cfg.CONF.set_override('service_plugins', service_plugins) + cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab") + self.api = APIRouter() + + plugin = loadbalancerPlugin.LoadBalancerPlugin() + ext_mgr = PluginAwareExtensionManager( + extensions_path, + {constants.LOADBALANCER: plugin} + ) + app = config.load_paste_app('extensions_test_app') + self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr) + + def tearDown(self): + super(LoadBalancerPluginDbTestCase, self).tearDown() + self.api = None + self._deserializers = None + self._skip_native_bulk = None + self.ext_api = None + + db.clear_db() + db._ENGINE = None + db._MAKER = None + cfg.CONF.reset() + # Restore the original attribute map + loadbalancer.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk + + def _req(self, method, resource, data=None, fmt='json', + id=None, subresource=None, sub_id=None, params=None, action=None): + if id and action: + path = '/lb/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals() + elif id and subresource and sub_id: + path = ( + '/lb/%(resource)s/%(id)s/%(subresource)s/' + '%(sub_id)s.%(fmt)s') % locals() + elif id and subresource: + path = ( + '/lb/%(resource)s/%(id)s/' + '%(subresource)s.%(fmt)s') % locals() + elif id: + path = '/lb/%(resource)s/%(id)s.%(fmt)s' % locals() + else: + path = '/lb/%(resource)s.%(fmt)s' % locals() + + content_type = 'application/%s' % fmt + body = None + if data is not None: # empty dict is valid + body = Serializer().serialize(data, content_type) + + req = create_request(path, + body, + content_type, + method, + query_string=params) + return req + + def new_create_request(self, resource, data, fmt='json', id=None, + subresource=None): + return self._req('POST', resource, data, fmt, id=id, + subresource=subresource) + + def new_list_request(self, resource, fmt='json', params=None): + return self._req('GET', resource, None, fmt, params=params) + + def new_show_request(self, resource, id, fmt='json', action=None, + subresource=None, sub_id=None): + return self._req('GET', resource, None, fmt, id=id, action=action, + subresource=subresource, sub_id=sub_id) + + def new_delete_request(self, resource, id, fmt='json', + subresource=None, sub_id=None): + return self._req('DELETE', resource, None, fmt, id=id, + subresource=subresource, sub_id=sub_id) + + def new_update_request(self, resource, data, id, fmt='json'): + return self._req('PUT', resource, data, fmt, id=id) + + def deserialize(self, content_type, response): + ctype = 'application/%s' % content_type + data = self._deserializers[ctype].deserialize(response.body)['body'] + return data + + def _create_vip(self, fmt, name, pool_id, protocol, port, admin_status_up, + expected_res_status=None, **kwargs): + data = {'vip': {'name': name, + 'subnet_id': self._subnet_id, + 'pool_id': pool_id, + 'protocol': protocol, + 'port': port, + 'admin_state_up': admin_status_up, + 'tenant_id': self._tenant_id}} + for arg in ('description', 'address', + 'session_persistence', 'connection_limit'): + if arg in kwargs and kwargs[arg] is not None: + data['vip'][arg] = kwargs[arg] + + vip_req = self.new_create_request('vips', data, fmt) + vip_res = vip_req.get_response(self.ext_api) + if expected_res_status: + self.assertEqual(vip_res.status_int, expected_res_status) + + return vip_res + + def _create_pool(self, fmt, name, lb_method, protocol, admin_status_up, + expected_res_status=None, **kwargs): + data = {'pool': {'name': name, + 'subnet_id': self._subnet_id, + 'lb_method': lb_method, + 'protocol': protocol, + 'admin_state_up': admin_status_up, + 'tenant_id': self._tenant_id}} + for arg in ('description'): + if arg in kwargs and kwargs[arg] is not None: + data['pool'][arg] = kwargs[arg] + + pool_req = self.new_create_request('pools', data, fmt) + pool_res = pool_req.get_response(self.ext_api) + if expected_res_status: + self.assertEqual(pool_res.status_int, expected_res_status) + + return pool_res + + def _create_member(self, fmt, address, port, admin_status_up, + expected_res_status=None, **kwargs): + data = {'member': {'address': address, + 'port': port, + 'admin_state_up': admin_status_up, + 'tenant_id': self._tenant_id}} + for arg in ('weight', 'pool_id'): + if arg in kwargs and kwargs[arg] is not None: + data['member'][arg] = kwargs[arg] + + member_req = self.new_create_request('members', data, fmt) + member_res = member_req.get_response(self.ext_api) + if expected_res_status: + self.assertEqual(member_res.status_int, expected_res_status) + + return member_res + + def _create_health_monitor(self, fmt, type, delay, timeout, max_retries, + admin_status_up, expected_res_status=None, + **kwargs): + data = {'health_monitor': {'type': type, + 'delay': delay, + 'timeout': timeout, + 'max_retries': max_retries, + 'admin_status_up': admin_status_up, + 'tenant_id': self._tenant_id}} + for arg in ('http_method', 'path', 'expected_code'): + if arg in kwargs and kwargs[arg] is not None: + data['health_monitor'][arg] = kwargs[arg] + + req = self.new_create_request('health_monitors', data, fmt) + + res = req.get_response(self.ext_api) + if expected_res_status: + self.assertEqual(res.status_int, expected_res_status) + + return res + + def _api_for_resource(self, resource): + if resource in ['networks', 'subnets', 'ports']: + return self.api + else: + return self.ext_api + + def _delete(self, collection, id, + expected_code=webob.exc.HTTPNoContent.code): + req = self.new_delete_request(collection, id) + res = req.get_response(self._api_for_resource(collection)) + self.assertEqual(res.status_int, expected_code) + + def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code): + req = self.new_show_request(resource, id) + res = req.get_response(self._api_for_resource(resource)) + self.assertEqual(res.status_int, expected_code) + return self.deserialize('json', res) + + def _update(self, resource, id, new_data, + expected_code=webob.exc.HTTPOk.code): + req = self.new_update_request(resource, new_data, id) + res = req.get_response(self._api_for_resource(resource)) + self.assertEqual(res.status_int, expected_code) + return self.deserialize('json', res) + + def _list(self, resource, fmt='json', query_params=None): + req = self.new_list_request(resource, fmt, query_params) + res = req.get_response(self._api_for_resource(resource)) + self.assertEqual(res.status_int, webob.exc.HTTPOk.code) + return self.deserialize('json', res) + + @contextlib.contextmanager + def vip(self, fmt='json', name='vip1', pool=None, + protocol='HTTP', port=80, admin_status_up=True, no_delete=False, + **kwargs): + if not pool: + with self.pool() as pool: + pool_id = pool['pool']['id'] + res = self._create_vip(fmt, + name, + pool_id, + protocol, + port, + admin_status_up, + address="172.16.1.123", + **kwargs) + vip = self.deserialize(fmt, res) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + yield vip + if not no_delete: + self._delete('vips', vip['vip']['id']) + else: + pool_id = pool['pool']['id'] + res = self._create_vip(fmt, + name, + pool_id, + protocol, + port, + admin_status_up, + address="172.16.1.123", + **kwargs) + vip = self.deserialize(fmt, res) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + yield vip + if not no_delete: + self._delete('vips', vip['vip']['id']) + + @contextlib.contextmanager + def pool(self, fmt='json', name='pool1', lb_method='ROUND_ROBIN', + protocol='HTTP', admin_status_up=True, no_delete=False, + **kwargs): + res = self._create_pool(fmt, + name, + lb_method, + protocol, + admin_status_up, + **kwargs) + pool = self.deserialize(fmt, res) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + yield pool + if not no_delete: + self._delete('pools', pool['pool']['id']) + + @contextlib.contextmanager + def member(self, fmt='json', address='192.168.1.100', + port=80, admin_status_up=True, no_delete=False, + **kwargs): + res = self._create_member(fmt, + address, + port, + admin_status_up, + **kwargs) + member = self.deserialize(fmt, res) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + yield member + if not no_delete: + self._delete('members', member['member']['id']) + + @contextlib.contextmanager + def health_monitor(self, fmt='json', type='TCP', + delay=30, timeout=10, max_retries=3, + admin_status_up=True, + no_delete=False, **kwargs): + res = self._create_health_monitor(fmt, + type, + delay, + timeout, + max_retries, + admin_status_up, + **kwargs) + health_monitor = self.deserialize(fmt, res) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + yield health_monitor + if not no_delete: + self._delete('health_monitors', + health_monitor['health_monitor']['id']) + + +class TestLoadBalancer(LoadBalancerPluginDbTestCase): + def test_create_vip(self): + name = 'vip1' + keys = [('name', name), + ('subnet_id', self._subnet_id), + ('address', "172.16.1.123"), + ('port', 80), + ('protocol', 'HTTP'), + ('connection_limit', -1), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + + with self.vip(name=name) as vip: + for k, v in keys: + self.assertEqual(vip['vip'][k], v) + + def test_create_vip_with_session_persistence(self): + name = 'vip2' + keys = [('name', name), + ('subnet_id', self._subnet_id), + ('address', "172.16.1.123"), + ('port', 80), + ('protocol', 'HTTP'), + ('session_persistence', {'type': "HTTP_COOKIE", + 'cookie_name': "jessionId"}), + ('connection_limit', -1), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + + with self.vip(name=name, + session_persistence={'type': "HTTP_COOKIE", + 'cookie_name': "jessionId"}) as vip: + for k, v in keys: + self.assertEqual(vip['vip'][k], v) + + def test_update_vip(self): + name = 'new_vip' + keys = [('name', name), + ('subnet_id', self._subnet_id), + ('address', "172.16.1.123"), + ('port', 80), + ('connection_limit', 100), + ('admin_state_up', False), + ('status', 'PENDING_UPDATE')] + + with self.vip(name=name) as vip: + data = {'vip': {'name': name, + 'connection_limit': 100, + 'session_persistence': + {'type': "HTTP_COOKIE", + 'cookie_name': "jesssionId"}, + 'admin_state_up': False}} + req = self.new_update_request('vips', data, vip['vip']['id']) + res = self.deserialize('json', req.get_response(self.ext_api)) + for k, v in keys: + self.assertEqual(res['vip'][k], v) + + def test_delete_vip(self): + with self.pool() as pool: + with self.vip(no_delete=True) as vip: + req = self.new_delete_request('vips', + vip['vip']['id']) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + + def test_show_vip(self): + name = "vip_show" + keys = [('name', name), + ('subnet_id', self._subnet_id), + ('address', "172.16.1.123"), + ('port', 80), + ('protocol', 'HTTP'), + ('connection_limit', -1), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + with self.vip(name=name) as vip: + req = self.new_show_request('vips', + vip['vip']['id']) + res = self.deserialize('json', req.get_response(self.ext_api)) + for k, v in keys: + self.assertEqual(res['vip'][k], v) + + def test_create_pool(self): + name = "pool1" + keys = [('name', name), + ('subnet_id', self._subnet_id), + ('tenant_id', self._tenant_id), + ('protocol', 'HTTP'), + ('lb_method', 'ROUND_ROBIN'), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + with self.pool(name=name) as pool: + for k, v in keys: + self.assertEqual(pool['pool'][k], v) + + def test_create_pool_with_members(self): + name = "pool2" + with self.pool(name=name) as pool: + pool_id = pool['pool']['id'] + res1 = self._create_member('json', + '192.168.1.100', + '80', + True, + pool_id=pool_id, + weight=1) + req = self.new_show_request('pools', + pool_id, + fmt='json') + pool_updated = self.deserialize('json', + req.get_response(self.ext_api)) + + member1 = self.deserialize('json', res1) + self.assertEqual(member1['member']['id'], + pool_updated['pool']['members'][0]) + self.assertEqual(len(pool_updated['pool']['members']), 1) + + keys = [('address', '192.168.1.100'), + ('port', 80), + ('weight', 1), + ('pool_id', pool_id), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + for k, v in keys: + self.assertEqual(member1['member'][k], v) + self._delete('members', member1['member']['id']) + + def test_delete_pool(self): + with self.pool(no_delete=True) as pool: + with self.member(no_delete=True, + pool_id=pool['pool']['id']) as member: + req = self.new_delete_request('pools', + pool['pool']['id']) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + + def test_show_pool(self): + name = "pool1" + keys = [('name', name), + ('subnet_id', self._subnet_id), + ('tenant_id', self._tenant_id), + ('protocol', 'HTTP'), + ('lb_method', 'ROUND_ROBIN'), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + with self.pool(name=name) as pool: + req = self.new_show_request('pools', + pool['pool']['id'], + fmt='json') + res = self.deserialize('json', req.get_response(self.ext_api)) + for k, v in keys: + self.assertEqual(res['pool'][k], v) + + def test_create_member(self): + with self.pool() as pool: + pool_id = pool['pool']['id'] + with self.member(address='192.168.1.100', + port=80, + pool_id=pool_id) as member1: + with self.member(address='192.168.1.101', + port=80, + pool_id=pool_id) as member2: + req = self.new_show_request('pools', + pool_id, + fmt='json') + pool_update = self.deserialize( + 'json', + req.get_response(self.ext_api)) + self.assertIn(member1['member']['id'], + pool_update['pool']['members']) + self.assertIn(member2['member']['id'], + pool_update['pool']['members']) + + def test_update_member(self): + with self.pool(name="pool1") as pool1: + with self.pool(name="pool2") as pool2: + keys = [('address', "192.168.1.100"), + ('tenant_id', self._tenant_id), + ('port', 80), + ('weight', 10), + ('pool_id', pool2['pool']['id']), + ('admin_state_up', False), + ('status', 'PENDING_UPDATE')] + with self.member(pool_id=pool1['pool']['id']) as member: + req = self.new_show_request('pools', + pool1['pool']['id'], + fmt='json') + pool1_update = self.deserialize( + 'json', + req.get_response(self.ext_api)) + self.assertEqual(len(pool1_update['pool']['members']), 1) + + req = self.new_show_request('pools', + pool2['pool']['id'], + fmt='json') + pool2_update = self.deserialize( + 'json', + req.get_response(self.ext_api)) + self.assertEqual(len(pool1_update['pool']['members']), 1) + self.assertEqual(len(pool2_update['pool']['members']), 0) + + data = {'member': {'pool_id': pool2['pool']['id'], + 'weight': 10, + 'admin_state_up': False}} + req = self.new_update_request('members', + data, + member['member']['id']) + res = self.deserialize('json', + req.get_response(self.ext_api)) + for k, v in keys: + self.assertEqual(res['member'][k], v) + + req = self.new_show_request('pools', + pool1['pool']['id'], + fmt='json') + pool1_update = self.deserialize( + 'json', + req.get_response(self.ext_api)) + + req = self.new_show_request('pools', + pool2['pool']['id'], + fmt='json') + pool2_update = self.deserialize( + 'json', + req.get_response(self.ext_api)) + + self.assertEqual(len(pool2_update['pool']['members']), 1) + self.assertEqual(len(pool1_update['pool']['members']), 0) + + def test_delete_member(self): + with self.pool() as pool: + pool_id = pool['pool']['id'] + with self.member(pool_id=pool_id, + no_delete=True) as member: + req = self.new_delete_request('members', + member['member']['id']) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + + req = self.new_show_request('pools', + pool_id, + fmt='json') + pool_update = self.deserialize( + 'json', + req.get_response(self.ext_api)) + self.assertEqual(len(pool_update['pool']['members']), 0) + + def test_show_member(self): + with self.pool() as pool: + keys = [('address', "192.168.1.100"), + ('tenant_id', self._tenant_id), + ('port', 80), + ('weight', 1), + ('pool_id', pool['pool']['id']), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + with self.member(pool_id=pool['pool']['id']) as member: + req = self.new_show_request('members', + member['member']['id'], + fmt='json') + res = self.deserialize('json', req.get_response(self.ext_api)) + for k, v in keys: + self.assertEqual(res['member'][k], v) + + def test_create_healthmonitor(self): + keys = [('type', "TCP"), + ('tenant_id', self._tenant_id), + ('delay', 30), + ('timeout', 10), + ('max_retries', 3), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + with self.health_monitor() as monitor: + for k, v in keys: + self.assertEqual(monitor['health_monitor'][k], v) + + def test_update_healthmonitor(self): + keys = [('type', "TCP"), + ('tenant_id', self._tenant_id), + ('delay', 20), + ('timeout', 20), + ('max_retries', 2), + ('admin_state_up', False), + ('status', 'PENDING_UPDATE')] + with self.health_monitor() as monitor: + data = {'health_monitor': {'delay': 20, + 'timeout': 20, + 'max_retries': 2, + 'admin_state_up': False}} + req = self.new_update_request("health_monitors", + data, + monitor['health_monitor']['id']) + res = self.deserialize('json', req.get_response(self.ext_api)) + for k, v in keys: + self.assertEqual(res['health_monitor'][k], v) + + def test_delete_healthmonitor(self): + with self.health_monitor(no_delete=True) as monitor: + req = self.new_delete_request('health_monitors', + monitor['health_monitor']['id']) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + + def test_show_healthmonitor(self): + with self.health_monitor() as monitor: + keys = [('type', "TCP"), + ('tenant_id', self._tenant_id), + ('delay', 30), + ('timeout', 10), + ('max_retries', 3), + ('admin_state_up', True), + ('status', 'PENDING_CREATE')] + req = self.new_show_request('health_monitors', + monitor['health_monitor']['id'], + fmt='json') + res = self.deserialize('json', req.get_response(self.ext_api)) + for k, v in keys: + self.assertEqual(res['health_monitor'][k], v) + + def test_get_pool_stats(self): + keys = [("bytes_in", 0), + ("bytes_out", 0), + ("active_connections", 0), + ("total_connections", 0)] + with self.pool() as pool: + req = self.new_show_request("pools", + pool['pool']['id'], + subresource="stats", + fmt='json') + res = self.deserialize('json', req.get_response(self.ext_api)) + for k, v in keys: + self.assertEqual(res['stats'][k], v) + + def test_create_healthmonitor_of_pool(self): + with self.health_monitor(type="TCP") as monitor1: + with self.health_monitor(type="HTTP") as monitor2: + with self.pool() as pool: + data = {"health_monitor": { + "id": monitor1['health_monitor']['id'], + 'tenant_id': self._tenant_id}} + req = self.new_create_request( + "pools", + data, + fmt='json', + id=pool['pool']['id'], + subresource="health_monitors") + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 201) + + data = {"health_monitor": { + "id": monitor2['health_monitor']['id'], + 'tenant_id': self._tenant_id}} + req = self.new_create_request( + "pools", + data, + fmt='json', + id=pool['pool']['id'], + subresource="health_monitors") + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 201) + + req = self.new_show_request( + 'pools', + pool['pool']['id'], + fmt='json') + res = self.deserialize('json', + req.get_response(self.ext_api)) + self.assertIn(monitor1['health_monitor']['id'], + res['pool']['health_monitors']) + self.assertIn(monitor2['health_monitor']['id'], + res['pool']['health_monitors']) + + def test_delete_healthmonitor_of_pool(self): + with self.health_monitor(type="TCP") as monitor1: + with self.health_monitor(type="HTTP") as monitor2: + with self.pool() as pool: + # add the monitors to the pool + data = {"health_monitor": { + "id": monitor1['health_monitor']['id'], + 'tenant_id': self._tenant_id}} + req = self.new_create_request( + "pools", + data, + fmt='json', + id=pool['pool']['id'], + subresource="health_monitors") + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 201) + + data = {"health_monitor": { + "id": monitor2['health_monitor']['id'], + 'tenant_id': self._tenant_id}} + req = self.new_create_request( + "pools", + data, + fmt='json', + id=pool['pool']['id'], + subresource="health_monitors") + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 201) + + # remove one of healthmonitor from the pool + req = self.new_delete_request( + "pools", + fmt='json', + id=pool['pool']['id'], + sub_id=monitor1['health_monitor']['id'], + subresource="health_monitors") + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + + req = self.new_show_request( + 'pools', + pool['pool']['id'], + fmt='json') + res = self.deserialize('json', + req.get_response(self.ext_api)) + self.assertNotIn(monitor1['health_monitor']['id'], + res['pool']['health_monitors']) + self.assertIn(monitor2['health_monitor']['id'], + res['pool']['health_monitors']) + + def test_create_loadbalancer(self): + vip_name = "vip3" + pool_name = "pool3" + + with self.pool(name=pool_name) as pool: + with self.vip(name=vip_name, pool=pool) as vip: + pool_id = pool['pool']['id'] + vip_id = vip['vip']['id'] + # Add two members + res1 = self._create_member('json', + '192.168.1.100', + '80', + True, + pool_id=pool_id, + weight=1) + res2 = self._create_member('json', + '192.168.1.101', + '80', + True, + pool_id=pool_id, + weight=2) + # Add a health_monitor + req = self._create_health_monitor('json', + 'HTTP', + '10', + '10', + '3', + True) + health_monitor = self.deserialize('json', req) + self.assertEqual(req.status_int, 201) + + # Associate the health_monitor to the pool + data = {"health_monitor": { + "id": health_monitor['health_monitor']['id'], + 'tenant_id': self._tenant_id}} + req = self.new_create_request("pools", + data, + fmt='json', + id=pool['pool']['id'], + subresource="health_monitors") + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 201) + + # Get pool and vip + req = self.new_show_request('pools', + pool_id, + fmt='json') + pool_updated = self.deserialize('json', + req.get_response(self.ext_api)) + member1 = self.deserialize('json', res1) + member2 = self.deserialize('json', res2) + self.assertIn(member1['member']['id'], + pool_updated['pool']['members']) + self.assertIn(member2['member']['id'], + pool_updated['pool']['members']) + self.assertIn(health_monitor['health_monitor']['id'], + pool_updated['pool']['health_monitors']) + + req = self.new_show_request('vips', + vip_id, + fmt='json') + vip_updated = self.deserialize('json', + req.get_response(self.ext_api)) + self.assertEqual(vip_updated['vip']['pool_id'], + pool_updated['pool']['id']) + + # clean up + self._delete('health_monitors', + health_monitor['health_monitor']['id']) + self._delete('members', member1['member']['id']) + self._delete('members', member2['member']['id']) -- 2.45.2