--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+#
+# Copyright 2013 OpenStack LLC. All rights reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import sqlalchemy as sa
+from sqlalchemy import orm
+from sqlalchemy.orm import exc
+from sqlalchemy.sql import expression as expr
+import webob.exc as w_exc
+
+from quantum import policy
+from quantum.api.v2 import attributes
+from quantum.common import exceptions as q_exc
+from quantum.db import db_base_plugin_v2
+from quantum.db import model_base
+from quantum.db import models_v2
+from quantum.extensions import loadbalancer
+from quantum.extensions.loadbalancer import LoadBalancerPluginBase
+from quantum.openstack.common import cfg
+from quantum.openstack.common import log as logging
+from quantum.openstack.common import uuidutils
+from quantum.plugins.common import constants
+
+
+LOG = logging.getLogger(__name__)
+
+
+class SessionPersistence(model_base.BASEV2):
+ vip_id = sa.Column(sa.String(36),
+ sa.ForeignKey("vips.id"),
+ primary_key=True)
+ type = sa.Column(sa.Enum("SOURCE_IP",
+ "HTTP_COOKIE",
+ "APP_COOKIE",
+ name="type"),
+ nullable=False)
+ cookie_name = sa.Column(sa.String(1024))
+
+
+class PoolStatistics(model_base.BASEV2):
+ """Represents pool statistics """
+ pool_id = sa.Column(sa.String(36), sa.ForeignKey("pools.id"),
+ primary_key=True)
+ bytes_in = sa.Column(sa.Integer, nullable=False)
+ bytes_out = sa.Column(sa.Integer, nullable=False)
+ active_connections = sa.Column(sa.Integer, nullable=False)
+ total_connections = sa.Column(sa.Integer, nullable=False)
+
+
+class Vip(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
+ """Represents a v2 quantum loadbalancer vip."""
+ name = sa.Column(sa.String(255))
+ description = sa.Column(sa.String(255))
+ subnet_id = sa.Column(sa.String(36), nullable=False)
+ address = sa.Column(sa.String(64))
+ port = sa.Column(sa.Integer, nullable=False)
+ protocol = sa.Column(sa.Enum("HTTP", "HTTPS", name="protocol"),
+ nullable=False)
+ pool_id = sa.Column(sa.String(36), nullable=False)
+ session_persistence = orm.relationship(SessionPersistence,
+ uselist=False,
+ backref="vips",
+ cascade="all, delete-orphan")
+ status = sa.Column(sa.String(16), nullable=False)
+ admin_state_up = sa.Column(sa.Boolean(), nullable=False)
+ connection_limit = sa.Column(sa.Integer)
+
+
+class Member(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
+ """Represents a v2 quantum loadbalancer member."""
+ pool_id = sa.Column(sa.String(36), sa.ForeignKey("pools.id"),
+ nullable=False)
+ address = sa.Column(sa.String(64), nullable=False)
+ port = sa.Column(sa.Integer, nullable=False)
+ weight = sa.Column(sa.Integer, nullable=False)
+ status = sa.Column(sa.String(16), nullable=False)
+ admin_state_up = sa.Column(sa.Boolean(), nullable=False)
+
+
+class Pool(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
+ """Represents a v2 quantum loadbalancer pool."""
+ vip_id = sa.Column(sa.String(36), sa.ForeignKey("vips.id"))
+ name = sa.Column(sa.String(255))
+ description = sa.Column(sa.String(255))
+ subnet_id = sa.Column(sa.String(36), nullable=False)
+ protocol = sa.Column(sa.String(64), nullable=False)
+ lb_method = sa.Column(sa.Enum("ROUND_ROBIN",
+ "LEAST_CONNECTIONS",
+ "SOURCE_IP"),
+ nullable=False)
+ status = sa.Column(sa.String(16), nullable=False)
+ admin_state_up = sa.Column(sa.Boolean(), nullable=False)
+ stats = orm.relationship(PoolStatistics,
+ uselist=False,
+ backref="pools",
+ cascade="all, delete-orphan")
+ members = orm.relationship(Member, backref="pools",
+ cascade="all, delete-orphan")
+ monitors = orm.relationship("PoolMonitorAssociation", backref="pools",
+ cascade="all, delete-orphan")
+
+
+class HealthMonitor(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
+ """Represents a v2 quantum loadbalancer healthmonitor."""
+ type = sa.Column(sa.Enum("PING", "TCP", "HTTP", "HTTPS", name="type"),
+ nullable=False)
+ delay = sa.Column(sa.Integer, nullable=False)
+ timeout = sa.Column(sa.Integer, nullable=False)
+ max_retries = sa.Column(sa.Integer, nullable=False)
+ http_method = sa.Column(sa.String(16))
+ url_path = sa.Column(sa.String(255))
+ expected_codes = sa.Column(sa.String(64))
+ status = sa.Column(sa.String(16), nullable=False)
+ admin_state_up = sa.Column(sa.Boolean(), nullable=False)
+
+
+class PoolMonitorAssociation(model_base.BASEV2):
+ """
+ Represents the many-to-many association between pool and
+ healthMonitor classes
+ """
+ pool_id = sa.Column(sa.String(36),
+ sa.ForeignKey("pools.id"),
+ primary_key=True)
+ monitor_id = sa.Column(sa.String(36),
+ sa.ForeignKey("healthmonitors.id"),
+ primary_key=True)
+ monitor = orm.relationship("HealthMonitor",
+ backref="pools_poolmonitorassociations")
+
+
+class LoadBalancerPluginDb(LoadBalancerPluginBase):
+ """
+ A class that wraps the implementation of the Quantum
+ loadbalancer plugin database access interface using SQLAlchemy models.
+ """
+
+ # TODO(lcui):
+ # A set of internal facility methods are borrowed from QuantumDbPluginV2
+ # class and hence this is duplicate. We need to pull out those methods
+ # into a seperate class which can be used by both QuantumDbPluginV2 and
+ # this class (and others).
+ def _get_tenant_id_for_create(self, context, resource):
+ if context.is_admin and 'tenant_id' in resource:
+ tenant_id = resource['tenant_id']
+ elif ('tenant_id' in resource and
+ resource['tenant_id'] != context.tenant_id):
+ reason = _('Cannot create resource for another tenant')
+ raise q_exc.AdminRequired(reason=reason)
+ else:
+ tenant_id = context.tenant_id
+ return tenant_id
+
+ def _fields(self, resource, fields):
+ if fields:
+ return dict((key, item) for key, item in resource.iteritems()
+ if key in fields)
+ return resource
+
+ def _apply_filters_to_query(self, query, model, filters):
+ if filters:
+ for key, value in filters.iteritems():
+ column = getattr(model, key, None)
+ if column:
+ query = query.filter(column.in_(value))
+ return query
+
+ def _get_collection_query(self, context, model, filters=None):
+ collection = self._model_query(context, model)
+ collection = self._apply_filters_to_query(collection, model, filters)
+ return collection
+
+ def _get_collection(self, context, model, dict_func, filters=None,
+ fields=None):
+ query = self._get_collection_query(context, model, filters)
+ return [dict_func(c, fields) for c in query.all()]
+
+ def _get_collection_count(self, context, model, filters=None):
+ return self._get_collection_query(context, model, filters).count()
+
+ def _model_query(self, context, model):
+ query = context.session.query(model)
+ query_filter = None
+ if not context.is_admin and hasattr(model, 'tenant_id'):
+ if hasattr(model, 'shared'):
+ query_filter = ((model.tenant_id == context.tenant_id) |
+ (model.shared))
+ else:
+ query_filter = (model.tenant_id == context.tenant_id)
+
+ if query_filter is not None:
+ query = query.filter(query_filter)
+ return query
+
+ def _get_by_id(self, context, model, id):
+ query = self._model_query(context, model)
+ return query.filter(model.id == id).one()
+
+ def update_status(self, context, model, id, status):
+ with context.session.begin(subtransactions=True):
+ v_db = self._get_resource(context, model, id)
+ v_db.update({'status': status})
+
+ def _get_resource(self, context, model, id):
+ try:
+ r = self._get_by_id(context, model, id)
+ except exc.NoResultFound:
+ if issubclass(model, Vip):
+ raise loadbalancer.VipNotFound(vip_id=id)
+ elif issubclass(model, Pool):
+ raise loadbalancer.PoolNotFound(pool_id=id)
+ elif issubclass(model, Member):
+ raise loadbalancer.MemberNotFound(member_id=id)
+ elif issubclass(model, HealthMonitor):
+ raise loadbalancer.HealthMonitorNotFound(monitor_id=id)
+ else:
+ raise
+ return r
+
+ ########################################################
+ # VIP DB access
+ def _make_vip_dict(self, vip, fields=None):
+ res = {'id': vip['id'],
+ 'tenant_id': vip['tenant_id'],
+ 'name': vip['name'],
+ 'description': vip['description'],
+ 'subnet_id': vip['subnet_id'],
+ 'address': vip['address'],
+ 'port': vip['port'],
+ 'protocol': vip['protocol'],
+ 'pool_id': vip['pool_id'],
+ 'connection_limit': vip['connection_limit'],
+ 'admin_state_up': vip['admin_state_up'],
+ 'status': vip['status']}
+ if vip['session_persistence']:
+ res['session_persistence'] = {
+ 'type': vip['session_persistence']['type'],
+ 'cookie_name': vip['session_persistence']['cookie_name']
+ }
+ return self._fields(res, fields)
+
+ def _update_pool_vip_info(self, context, pool_id, vip_id):
+ pool_db = self._get_resource(context, Pool, pool_id)
+ with context.session.begin(subtransactions=True):
+ pool_db.update({'vip_id': vip_id})
+
+ def _update_vip_session_persistence_info(self, context, vip_id, info):
+ vip = self._get_resource(context, Vip, vip_id)
+
+ with context.session.begin(subtransactions=True):
+ # Update sessionPersistence table
+ sess_qry = context.session.query(SessionPersistence)
+ sesspersist_db = sess_qry.filter_by(vip_id=vip_id).first()
+ if sesspersist_db:
+ sesspersist_db.update(info)
+ else:
+ sesspersist_db = SessionPersistence(
+ type=info['type'],
+ cookie_name=info['cookie_name'],
+ vip_id=vip_id)
+ context.session.add(sesspersist_db)
+ # Update vip table
+ vip.session_persistence = sesspersist_db
+ context.session.add(vip)
+
+ def _delete_sessionpersistence(self, context, id):
+ with context.session.begin(subtransactions=True):
+ sess_qry = context.session.query(SessionPersistence)
+ sess_qry.filter_by(vip_id=id).delete()
+
+ def create_vip(self, context, vip):
+ v = vip['vip']
+ tenant_id = self._get_tenant_id_for_create(context, v)
+
+ with context.session.begin(subtransactions=True):
+ if v['address'] == attributes.ATTR_NOT_SPECIFIED:
+ address = None
+ else:
+ address = v['address']
+ vip_db = Vip(id=uuidutils.generate_uuid(),
+ tenant_id=tenant_id,
+ name=v['name'],
+ description=v['description'],
+ subnet_id=v['subnet_id'],
+ address=address,
+ port=v['port'],
+ protocol=v['protocol'],
+ pool_id=v['pool_id'],
+ connection_limit=v['connection_limit'],
+ admin_state_up=v['admin_state_up'],
+ status=constants.PENDING_CREATE)
+ vip_id = vip_db['id']
+
+ sessionInfo = v['session_persistence']
+ if sessionInfo:
+ has_session_persistence = True
+ sesspersist_db = SessionPersistence(
+ type=sessionInfo['type'],
+ cookie_name=sessionInfo['cookie_name'],
+ vip_id=vip_id)
+ vip_db.session_persistence = sesspersist_db
+
+ context.session.add(vip_db)
+ self._update_pool_vip_info(context, v['pool_id'], vip_id)
+
+ vip_db = self._get_resource(context, Vip, vip_id)
+ return self._make_vip_dict(vip_db)
+
+ def update_vip(self, context, id, vip):
+ v = vip['vip']
+
+ sesspersist_info = v.pop('session_persistence', None)
+ with context.session.begin(subtransactions=True):
+ if sesspersist_info:
+ self._update_vip_session_persistence_info(context,
+ id,
+ sesspersist_info)
+
+ vip_db = self._get_resource(context, Vip, id)
+ old_pool_id = vip_db['pool_id']
+ if v:
+ vip_db.update(v)
+ # If the pool_id is changed, we need to update
+ # the associated pools
+ if 'pool_id' in v:
+ self._update_pool_vip_info(context, old_pool_id, None)
+ self._update_pool_vip_info(context, v['pool_id'], id)
+
+ return self._make_vip_dict(vip_db)
+
+ def delete_vip(self, context, id):
+ with context.session.begin(subtransactions=True):
+ vip = self._get_resource(context, Vip, id)
+ qry = context.session.query(Pool)
+ for pool in qry.filter_by(vip_id=id).all():
+ pool.update({"vip_id": None})
+ context.session.delete(vip)
+
+ def get_vip(self, context, id, fields=None):
+ vip = self._get_resource(context, Vip, id)
+ return self._make_vip_dict(vip, fields)
+
+ def get_vips(self, context, filters=None, fields=None):
+ return self._get_collection(context, Vip,
+ self._make_vip_dict,
+ filters=filters, fields=fields)
+
+ ########################################################
+ # Pool DB access
+ def _make_pool_dict(self, context, pool, fields=None):
+ res = {'id': pool['id'],
+ 'tenant_id': pool['tenant_id'],
+ 'name': pool['name'],
+ 'description': pool['description'],
+ 'subnet_id': pool['subnet_id'],
+ 'protocol': pool['protocol'],
+ 'vip_id': pool['vip_id'],
+ 'lb_method': pool['lb_method'],
+ 'admin_state_up': pool['admin_state_up'],
+ 'status': pool['status']}
+
+ # Get the associated members
+ res['members'] = [member['id'] for member in pool['members']]
+
+ # Get the associated health_monitors
+ res['health_monitors'] = [
+ monitor['monitor_id'] for monitor in pool['monitors']]
+
+ return self._fields(res, fields)
+
+ def _update_pool_member_info(self, context, pool_id, membersInfo):
+ with context.session.begin(subtransactions=True):
+ member_qry = context.session.query(Member)
+ for member_id in membersInfo:
+ try:
+ member = member_qry.filter_by(id=member_id).one()
+ member.update({'pool_id': pool_id})
+ except exc.NoResultFound:
+ raise loadbalancer.MemberNotFound(member_id=member_id)
+
+ def _create_pool_stats(self, context, pool_id):
+ # This is internal method to add pool statistics. It won't
+ # be exposed to API
+ stats_db = PoolStatistics(
+ pool_id=pool_id,
+ bytes_in=0,
+ bytes_out=0,
+ active_connections=0,
+ total_connections=0
+ )
+ return stats_db
+
+ def _delete_pool_stats(self, context, pool_id):
+ # This is internal method to delete pool statistics. It won't
+ # be exposed to API
+ with context.session.begin(subtransactions=True):
+ stats_qry = context.session.query(PoolStatistics)
+ try:
+ stats = stats_qry.filter_by(pool_id=pool_id).one()
+ except exc.NoResultFound:
+ raise loadbalancer.PoolStatsNotFound(pool_id=pool_id)
+ context.session.delete(stats)
+
+ def create_pool(self, context, pool):
+ v = pool['pool']
+
+ tenant_id = self._get_tenant_id_for_create(context, v)
+ with context.session.begin(subtransactions=True):
+ pool_db = Pool(id=uuidutils.generate_uuid(),
+ tenant_id=tenant_id,
+ name=v['name'],
+ description=v['description'],
+ subnet_id=v['subnet_id'],
+ protocol=v['protocol'],
+ lb_method=v['lb_method'],
+ admin_state_up=v['admin_state_up'],
+ status=constants.PENDING_CREATE)
+ pool_db.stats = self._create_pool_stats(context, pool_db['id'])
+ context.session.add(pool_db)
+
+ pool_db = self._get_resource(context, Pool, pool_db['id'])
+ return self._make_pool_dict(context, pool_db)
+
+ def update_pool(self, context, id, pool):
+ v = pool['pool']
+
+ with context.session.begin(subtransactions=True):
+ pool_db = self._get_resource(context, Pool, id)
+ if v:
+ pool_db.update(v)
+
+ return self._make_pool_dict(context, pool_db)
+
+ def delete_pool(self, context, id):
+ # Check if the pool is in use
+ vip = context.session.query(Vip).filter_by(pool_id=id).first()
+ if vip:
+ raise loadbalancer.PoolInUse(pool_id=id)
+
+ with context.session.begin(subtransactions=True):
+ self._delete_pool_stats(context, id)
+ pool_db = self._get_resource(context, Pool, id)
+ context.session.delete(pool_db)
+
+ def get_pool(self, context, id, fields=None):
+ pool = self._get_resource(context, Pool, id)
+ return self._make_pool_dict(context, pool, fields)
+
+ def get_pools(self, context, filters=None, fields=None):
+ collection = self._model_query(context, Pool)
+ collection = self._apply_filters_to_query(collection, Pool, filters)
+ return [self._make_pool_dict(context, c, fields)
+ for c in collection.all()]
+
+ def get_stats(self, context, pool_id):
+ with context.session.begin(subtransactions=True):
+ pool_qry = context.session.query(Pool)
+ try:
+ pool = pool_qry.filter_by(id=pool_id).one()
+ stats = pool['stats']
+ except exc.NoResultFound:
+ raise loadbalancer.PoolStatsNotFound(pool_id=pool_id)
+
+ res = {'bytes_in': stats['bytes_in'],
+ 'bytes_out': stats['bytes_out'],
+ 'active_connections': stats['active_connections'],
+ 'total_connections': stats['total_connections']}
+ return {'stats': res}
+
+ def create_pool_health_monitor(self, context, health_monitor, pool_id):
+ monitor_id = health_monitor['health_monitor']['id']
+ with context.session.begin(subtransactions=True):
+ monitor_qry = context.session.query(HealthMonitor)
+ try:
+ monitor = monitor_qry.filter_by(id=monitor_id).one()
+ monitor.update({'pool_id': pool_id})
+ except exc.NoResultFound:
+ raise loadbalancer.HealthMonitorNotFound(monitor_id=monitor_id)
+ try:
+ qry = context.session.query(Pool)
+ pool = qry.filter_by(id=pool_id).one()
+ except exc.NoResultFound:
+ raise loadbalancer.PoolNotFound(pool_id=pool_id)
+
+ assoc = PoolMonitorAssociation(pool_id=pool_id,
+ monitor_id=monitor_id)
+ assoc.monitor = monitor
+ pool.monitors.append(assoc)
+
+ monitors = []
+ try:
+ qry = context.session.query(Pool)
+ pool = qry.filter_by(id=pool_id).one()
+ for monitor in pool['monitors']:
+ monitors.append(monitor['monitor_id'])
+ except exc.NoResultFound:
+ pass
+
+ res = {"health_monitor": monitors}
+ return res
+
+ def delete_pool_health_monitor(self, context, id, pool_id):
+ with context.session.begin(subtransactions=True):
+ try:
+ pool_qry = context.session.query(Pool)
+ pool = pool_qry.filter_by(id=pool_id).one()
+ except exc.NoResultFound:
+ raise loadbalancer.PoolNotFound(pool_id=pool_id)
+ try:
+ monitor_qry = context.session.query(PoolMonitorAssociation)
+ monitor = monitor_qry.filter_by(monitor_id=id,
+ pool_id=pool_id).one()
+ pool.monitors.remove(monitor)
+ except exc.NoResultFound:
+ raise loadbalancer.HealthMonitorNotFound(monitor_id=id)
+
+ def get_pool_health_monitor(self, context, id, pool_id, fields=None):
+ healthmonitor = self._get_resource(context, HealthMonitor, id)
+ return self._make_health_monitor_dict(healthmonitor, fields)
+
+ ########################################################
+ # Member DB access
+ def _make_member_dict(self, member, fields=None):
+ res = {'id': member['id'],
+ 'tenant_id': member['tenant_id'],
+ 'pool_id': member['pool_id'],
+ 'address': member['address'],
+ 'port': member['port'],
+ 'weight': member['weight'],
+ 'admin_state_up': member['admin_state_up'],
+ 'status': member['status']}
+ return self._fields(res, fields)
+
+ def create_member(self, context, member):
+ v = member['member']
+ tenant_id = self._get_tenant_id_for_create(context, v)
+
+ with context.session.begin(subtransactions=True):
+ pool = None
+ try:
+ qry = context.session.query(Pool)
+ pool = qry.filter_by(id=v['pool_id']).one()
+ except exc.NoResultFound:
+ raise loadbalancer.PoolNotFound(pool_id=v['pool_id'])
+
+ member_db = Member(id=uuidutils.generate_uuid(),
+ tenant_id=tenant_id,
+ pool_id=v['pool_id'],
+ address=v['address'],
+ port=v['port'],
+ weight=v['weight'],
+ admin_state_up=v['admin_state_up'],
+ status=constants.PENDING_CREATE)
+ context.session.add(member_db)
+
+ return self._make_member_dict(member_db)
+
+ def update_member(self, context, id, member):
+ v = member['member']
+ with context.session.begin(subtransactions=True):
+ member_db = self._get_resource(context, Member, id)
+ old_pool_id = member_db['pool_id']
+ if v:
+ member_db.update(v)
+
+ return self._make_member_dict(member_db)
+
+ def delete_member(self, context, id):
+ with context.session.begin(subtransactions=True):
+ member_db = self._get_resource(context, Member, id)
+ context.session.delete(member_db)
+
+ def get_member(self, context, id, fields=None):
+ member = self._get_resource(context, Member, id)
+ return self._make_member_dict(member, fields)
+
+ def get_members(self, context, filters=None, fields=None):
+ return self._get_collection(context, Member,
+ self._make_member_dict,
+ filters=filters, fields=fields)
+
+ ########################################################
+ # HealthMonitor DB access
+ def _make_health_monitor_dict(self, health_monitor, fields=None):
+ res = {'id': health_monitor['id'],
+ 'tenant_id': health_monitor['tenant_id'],
+ 'type': health_monitor['type'],
+ 'delay': health_monitor['delay'],
+ 'timeout': health_monitor['timeout'],
+ 'max_retries': health_monitor['max_retries'],
+ 'http_method': health_monitor['http_method'],
+ 'url_path': health_monitor['url_path'],
+ 'expected_codes': health_monitor['expected_codes'],
+ 'admin_state_up': health_monitor['admin_state_up'],
+ 'status': health_monitor['status']}
+ return self._fields(res, fields)
+
+ def create_health_monitor(self, context, health_monitor):
+ v = health_monitor['health_monitor']
+ tenant_id = self._get_tenant_id_for_create(context, v)
+ with context.session.begin(subtransactions=True):
+ monitor_db = HealthMonitor(id=uuidutils.generate_uuid(),
+ tenant_id=tenant_id,
+ type=v['type'],
+ delay=v['delay'],
+ timeout=v['timeout'],
+ max_retries=v['max_retries'],
+ http_method=v['http_method'],
+ url_path=v['url_path'],
+ expected_codes=v['expected_codes'],
+ admin_state_up=v['admin_state_up'],
+ status=constants.PENDING_CREATE)
+ context.session.add(monitor_db)
+ return self._make_health_monitor_dict(monitor_db)
+
+ def update_health_monitor(self, context, id, health_monitor):
+ v = health_monitor['health_monitor']
+ with context.session.begin(subtransactions=True):
+ monitor_db = self._get_resource(context, HealthMonitor, id)
+ if v:
+ monitor_db.update(v)
+ return self._make_health_monitor_dict(monitor_db)
+
+ def delete_health_monitor(self, context, id):
+ with context.session.begin(subtransactions=True):
+ assoc_qry = context.session.query(PoolMonitorAssociation)
+ pool_qry = context.session.query(Pool)
+ for assoc in assoc_qry.filter_by(monitor_id=id).all():
+ try:
+ pool = pool_qry.filter_by(id=assoc['pool_id']).one()
+ except exc.NoResultFound:
+ raise loadbalancer.PoolNotFound(pool_id=pool['id'])
+ pool.monitors.remove(assoc)
+ monitor_db = self._get_resource(context, HealthMonitor, id)
+ context.session.delete(monitor_db)
+
+ def get_health_monitor(self, context, id, fields=None):
+ healthmonitor = self._get_resource(context, HealthMonitor, id)
+ return self._make_health_monitor_dict(healthmonitor, fields)
+
+ def get_health_monitors(self, context, filters=None, fields=None):
+ return self._get_collection(context, HealthMonitor,
+ self._make_health_monitor_dict,
+ filters=filters, fields=fields)
from quantum.api import extensions
from quantum.api.v2 import attributes as attr
from quantum.api.v2 import base
+from quantum.common import exceptions as qexception
from quantum import manager
from quantum.plugins.common import constants
from quantum.plugins.services.service_base import ServicePluginBase
+# Loadbalancer Exceptions
+class VipNotFound(qexception.NotFound):
+ message = _("Vip %(vip_id)s could not be found")
+
+
+class PoolNotFound(qexception.NotFound):
+ message = _("Pool %(pool_id)s could not be found")
+
+
+class MemberNotFound(qexception.NotFound):
+ message = _("Member %(member_id)s could not be found")
+
+
+class HealthMonitorNotFound(qexception.NotFound):
+ message = _("Health_monitor %(monitor_id)s could not be found")
+
+
+class StateInvalid(qexception.QuantumException):
+ message = _("Invalid state %(state)s of Loadbalancer resource %(id)s")
+
+
+class PoolInUse(qexception.InUse):
+ message = _("Pool %(pool_id)s is still in use")
+
+
+class PoolStatsNotFound(qexception.NotFound):
+ message = _("Statistics of Pool %(pool_id)s could not be found")
+
+
RESOURCE_ATTRIBUTE_MAP = {
'vips': {
'id': {'allow_post': False, 'allow_put': False,
class LoadBalancerPluginBase(ServicePluginBase):
__metaclass__ = abc.ABCMeta
+ def get_plugin_name(self):
+ return constants.LOADBALANCER
+
def get_plugin_type(self):
return constants.LOADBALANCER
DUMMY: "/dummy_svc",
LOADBALANCER: "/lb",
}
+
+# Service operation status constants
+ACTIVE = "ACTIVE"
+PENDING_CREATE = "PENDING_CREATE"
+PENDING_UPDATE = "PENDING_UPDATE"
+PENDING_DELETE = "PENDING_DELETE"
+INACTIVE = "INACTIVE"
+ERROR = "ERROR"
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from quantum.db import api as qdbapi
+from quantum.db import model_base
+from quantum.db.loadbalancer import loadbalancer_db
+from quantum.extensions import loadbalancer
+from quantum.openstack.common import log as logging
+from quantum.plugins.common import constants
+
+LOG = logging.getLogger(__name__)
+
+
+class LoadBalancerPlugin(loadbalancer_db.LoadBalancerPluginDb):
+
+ """
+ Implementation of the Quantum Loadbalancer Service Plugin.
+
+ This class manages the workflow of LBaaS request/response.
+ Most DB related works are implemented in class
+ loadbalancer_db.LoadBalancerPluginDb.
+ """
+ supported_extension_aliases = ["lbaas"]
+
+ def __init__(self):
+ """
+ Do the initialization for the loadbalancer service plugin here.
+ """
+ qdbapi.register_models(base=model_base.BASEV2)
+
+ # TODO: we probably need to setup RPC channel (to talk to LbAgent) here
+
+ def get_plugin_type(self):
+ return constants.LOADBALANCER
+
+ def get_plugin_description(self):
+ return "Quantum LoadBalancer Service Plugin"
+
+ def create_vip(self, context, vip):
+ v = super(LoadBalancerPlugin, self).create_vip(context, vip)
+ self.update_status(context, loadbalancer_db.Vip, v['id'],
+ constants.PENDING_CREATE)
+ LOG.debug(_("Create vip: %s") % v['id'])
+
+ # If we adopt asynchronous mode, this method should return immediately
+ # and let client to query the object status. The plugin will listen on
+ # the event from device and update the object status by calling
+ # self.update_state(context, Vip, id, ACTIVE/ERROR)
+ #
+ # In synchronous mode, send the request to device here and wait for
+ # response. Eventually update the object status prior to the return.
+ v_query = self.get_vip(context, v['id'])
+ return v_query
+
+ def update_vip(self, context, id, vip):
+ v_query = self.get_vip(
+ context, id, fields=["status"])
+ if v_query['status'] in [
+ constants.PENDING_DELETE, constants.ERROR]:
+ raise loadbalancer.StateInvalid(id=id,
+ state=v_query['status'])
+
+ v = super(LoadBalancerPlugin, self).update_vip(context, id, vip)
+ self.update_status(context, loadbalancer_db.Vip, id,
+ constants.PENDING_UPDATE)
+ LOG.debug(_("Update vip: %s"), id)
+
+ # TODO notify lbagent
+ v_rt = self.get_vip(context, id)
+ return v_rt
+
+ def delete_vip(self, context, id):
+ self.update_status(context, loadbalancer_db.Vip, id,
+ constants.PENDING_DELETE)
+ LOG.debug(_("Delete vip: %s"), id)
+
+ # TODO notify lbagent
+ super(LoadBalancerPlugin, self).delete_vip(context, id)
+
+ def get_vip(self, context, id, fields=None):
+ res = super(LoadBalancerPlugin, self).get_vip(context, id, fields)
+ LOG.debug(_("Get vip: %s"), id)
+ return res
+
+ def get_vips(self, context, filters=None, fields=None):
+ res = super(LoadBalancerPlugin, self).get_vips_db(
+ context, filters, fields)
+ LOG.debug(_("Get vips"))
+ return res
+
+ def create_pool(self, context, pool):
+ p = super(LoadBalancerPlugin, self).create_pool(context, pool)
+ self.update_status(context, loadbalancer_db.Pool, p['id'],
+ constants.PENDING_CREATE)
+ LOG.debug(_("Create pool: %s"), p['id'])
+
+ # TODO notify lbagent
+ p_rt = self.get_pool(context, p['id'])
+ return p_rt
+
+ def update_pool(self, context, id, pool):
+ p_query = self.get_pool(context, id, fields=["status"])
+ if p_query['status'] in [
+ constants.PENDING_DELETE, constants.ERROR]:
+ raise loadbalancer.StateInvalid(id=id,
+ state=p_query['status'])
+ p = super(LoadBalancerPlugin, self).update_pool(context, id, pool)
+ LOG.debug(_("Update pool: %s"), p['id'])
+ # TODO notify lbagent
+ p_rt = self.get_pool(context, id)
+ return p_rt
+
+ def delete_pool(self, context, id):
+ self.update_status(context, loadbalancer_db.Pool, id,
+ constants.PENDING_DELETE)
+ # TODO notify lbagent
+ super(LoadBalancerPlugin, self).delete_pool(context, id)
+ LOG.debug(_("Delete pool: %s"), id)
+
+ def get_pool(self, context, id, fields=None):
+ res = super(LoadBalancerPlugin, self).get_pool(context, id, fields)
+ LOG.debug(_("Get pool: %s"), id)
+ return res
+
+ def get_pools(self, context, filters=None, fields=None):
+ res = super(LoadBalancerPlugin, self).get_pools(
+ context, filters, fields)
+ LOG.debug(_("Get Pools"))
+ return res
+
+ def stats(self, context, pool_id):
+ res = super(LoadBalancerPlugin, self).get_stats(context, pool_id)
+ LOG.debug(_("Get stats of Pool: %s"), pool_id)
+ return res
+
+ def create_pool_health_monitor(self, context, health_monitor, pool_id):
+ m = super(LoadBalancerPlugin, self).create_pool_health_monitor(
+ context, health_monitor, pool_id)
+ LOG.debug(_("Create health_monitor of pool: %s"), pool_id)
+ return m
+
+ def get_pool_health_monitor(self, context, id, pool_id, fields=None):
+ m = super(LoadBalancerPlugin, self).get_pool_health_monitor(
+ context, id, pool_id, fields)
+ LOG.debug(_("Get health_monitor of pool: %s"), pool_id)
+ return m
+
+ def delete_pool_health_monitor(self, context, id, pool_id):
+ super(LoadBalancerPlugin, self).delete_pool_health_monitor(
+ context, id, pool_id)
+ LOG.debug(_("Delete health_monitor %(id)s of pool: %(pool_id)s"),
+ {"id": id, "pool_id": pool_id})
+
+ def get_member(self, context, id, fields=None):
+ res = super(LoadBalancerPlugin, self).get_member(
+ context, id, fields)
+ LOG.debug(_("Get member: %s"), id)
+ return res
+
+ def get_members(self, context, filters=None, fields=None):
+ res = super(LoadBalancerPlugin, self).get_members(
+ context, filters, fields)
+ LOG.debug(_("Get members"))
+ return res
+
+ def create_member(self, context, member):
+ m = super(LoadBalancerPlugin, self).create_member(context, member)
+ self.update_status(context, loadbalancer_db.Member, m['id'],
+ constants.PENDING_CREATE)
+ LOG.debug(_("Create member: %s"), m['id'])
+ # TODO notify lbagent
+ m_rt = self.get_member(context, m['id'])
+ return m_rt
+
+ def update_member(self, context, id, member):
+ m_query = self.get_member(context, id, fields=["status"])
+ if m_query['status'] in [
+ constants.PENDING_DELETE, constants.ERROR]:
+ raise loadbalancer.StateInvalid(id=id,
+ state=m_query['status'])
+ m = super(LoadBalancerPlugin, self).update_member(context, id, member)
+ self.update_status(context, loadbalancer_db.Member, id,
+ constants.PENDING_UPDATE)
+ LOG.debug(_("Update member: %s"), m['id'])
+ # TODO notify lbagent
+ m_rt = self.get_member(context, id)
+ return m_rt
+
+ def delete_member(self, context, id):
+ self.update_status(context, loadbalancer_db.Member, id,
+ constants.PENDING_DELETE)
+ LOG.debug(_("Delete member: %s"), id)
+ # TODO notify lbagent
+ super(LoadBalancerPlugin, self).delete_member(context, id)
+
+ def get_health_monitor(self, context, id, fields=None):
+ res = super(LoadBalancerPlugin, self).get_health_monitor(
+ context, id, fields)
+ LOG.debug(_("Get health_monitor: %s"), id)
+ return res
+
+ def get_health_monitors(self, context, filters=None, fields=None):
+ res = super(LoadBalancerPlugin, self).get_health_monitors(
+ context, filters, fields)
+ LOG.debug(_("Get health_monitors"))
+ return res
+
+ def create_health_monitor(self, context, health_monitor):
+ h = super(LoadBalancerPlugin, self).create_health_monitor(
+ context, health_monitor)
+ self.update_status(context, loadbalancer_db.HealthMonitor, h['id'],
+ constants.PENDING_CREATE)
+ LOG.debug(_("Create health_monitor: %s"), h['id'])
+ # TODO notify lbagent
+ h_rt = self.get_health_monitor(context, h['id'])
+ return h_rt
+
+ def update_health_monitor(self, context, id, health_monitor):
+ h_query = self.get_health_monitor(context, id, fields=["status"])
+ if h_query['status'] in [
+ constants.PENDING_DELETE, constants.ERROR]:
+ raise loadbalancer.StateInvalid(id=id,
+ state=h_query['status'])
+ h = super(LoadBalancerPlugin, self).update_health_monitor(
+ context, id, health_monitor)
+ self.update_status(context, loadbalancer_db.HealthMonitor, id,
+ constants.PENDING_UPDATE)
+ LOG.debug(_("Update health_monitor: %s"), h['id'])
+ # TODO notify lbagent
+ h_rt = self.get_health_monitor(context, id)
+ return h_rt
+
+ def delete_health_monitor(self, context, id):
+ self.update_status(context, loadbalancer_db.HealthMonitor, id,
+ constants.PENDING_DELETE)
+ LOG.debug(_("Delete health_monitor: %s"), id)
+ super(LoadBalancerPlugin, self).delete_health_monitor(context, id)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
--- /dev/null
+# Copyright (c) 2012 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import logging
+import os
+import unittest2
+import webob.exc
+
+import quantum
+from quantum import context
+from quantum.api.extensions import PluginAwareExtensionManager
+from quantum.api.extensions import ExtensionMiddleware
+from quantum.api.v2 import attributes
+from quantum.api.v2.attributes import ATTR_NOT_SPECIFIED
+from quantum.api.v2.router import APIRouter
+from quantum.common import config
+from quantum.common import exceptions as q_exc
+from quantum.common.test_lib import test_config
+from quantum.db import api as db
+from quantum.db import db_base_plugin_v2
+from quantum.db import models_v2
+from quantum.extensions import loadbalancer
+from quantum.manager import QuantumManager
+from quantum.openstack.common import cfg
+from quantum.openstack.common import timeutils
+from quantum.plugins.common import constants
+from quantum.plugins.services.loadbalancer import loadbalancerPlugin
+from quantum.tests.unit import test_extensions
+from quantum.tests.unit.testlib_api import create_request
+from quantum.wsgi import Serializer, JSONDeserializer
+
+
+LOG = logging.getLogger(__name__)
+
+DB_CORE_PLUGIN_KLASS = 'quantum.db.db_base_plugin_v2.QuantumDbPluginV2'
+DB_LB_PLUGIN_KLASS = (
+ "quantum.plugins.services.loadbalancer."
+ "loadbalancerPlugin.LoadBalancerPlugin"
+)
+ROOTDIR = os.path.dirname(__file__) + '../../../..'
+ETCDIR = os.path.join(ROOTDIR, 'etc')
+
+extensions_path = ':'.join(quantum.extensions.__path__)
+
+
+def etcdir(*p):
+ return os.path.join(ETCDIR, *p)
+
+
+class LoadBalancerPluginDbTestCase(unittest2.TestCase):
+
+ def setUp(self, core_plugin=None, lb_plugin=None):
+ super(LoadBalancerPluginDbTestCase, self).setUp()
+
+ db._ENGINE = None
+ db._MAKER = None
+
+ QuantumManager._instance = None
+ PluginAwareExtensionManager._instance = None
+ self._attribute_map_bk = {}
+ self._attribute_map_bk = loadbalancer.RESOURCE_ATTRIBUTE_MAP.copy()
+ self._tenant_id = "test-tenant"
+ self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
+
+ json_deserializer = JSONDeserializer()
+ self._deserializers = {
+ 'application/json': json_deserializer,
+ }
+
+ if not core_plugin:
+ core_plugin = test_config.get('plugin_name_v2',
+ DB_CORE_PLUGIN_KLASS)
+ if not lb_plugin:
+ lb_plugin = test_config.get('lb_plugin_name', DB_LB_PLUGIN_KLASS)
+
+ # point config file to: quantum/tests/etc/quantum.conf.test
+ args = ['--config-file', etcdir('quantum.conf.test')]
+ config.parse(args=args)
+ # Update the plugin
+ service_plugins = [lb_plugin]
+ cfg.CONF.set_override('core_plugin', core_plugin)
+ cfg.CONF.set_override('service_plugins', service_plugins)
+ cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
+ self.api = APIRouter()
+
+ plugin = loadbalancerPlugin.LoadBalancerPlugin()
+ ext_mgr = PluginAwareExtensionManager(
+ extensions_path,
+ {constants.LOADBALANCER: plugin}
+ )
+ app = config.load_paste_app('extensions_test_app')
+ self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
+
+ def tearDown(self):
+ super(LoadBalancerPluginDbTestCase, self).tearDown()
+ self.api = None
+ self._deserializers = None
+ self._skip_native_bulk = None
+ self.ext_api = None
+
+ db.clear_db()
+ db._ENGINE = None
+ db._MAKER = None
+ cfg.CONF.reset()
+ # Restore the original attribute map
+ loadbalancer.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
+
+ def _req(self, method, resource, data=None, fmt='json',
+ id=None, subresource=None, sub_id=None, params=None, action=None):
+ if id and action:
+ path = '/lb/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals()
+ elif id and subresource and sub_id:
+ path = (
+ '/lb/%(resource)s/%(id)s/%(subresource)s/'
+ '%(sub_id)s.%(fmt)s') % locals()
+ elif id and subresource:
+ path = (
+ '/lb/%(resource)s/%(id)s/'
+ '%(subresource)s.%(fmt)s') % locals()
+ elif id:
+ path = '/lb/%(resource)s/%(id)s.%(fmt)s' % locals()
+ else:
+ path = '/lb/%(resource)s.%(fmt)s' % locals()
+
+ content_type = 'application/%s' % fmt
+ body = None
+ if data is not None: # empty dict is valid
+ body = Serializer().serialize(data, content_type)
+
+ req = create_request(path,
+ body,
+ content_type,
+ method,
+ query_string=params)
+ return req
+
+ def new_create_request(self, resource, data, fmt='json', id=None,
+ subresource=None):
+ return self._req('POST', resource, data, fmt, id=id,
+ subresource=subresource)
+
+ def new_list_request(self, resource, fmt='json', params=None):
+ return self._req('GET', resource, None, fmt, params=params)
+
+ def new_show_request(self, resource, id, fmt='json', action=None,
+ subresource=None, sub_id=None):
+ return self._req('GET', resource, None, fmt, id=id, action=action,
+ subresource=subresource, sub_id=sub_id)
+
+ def new_delete_request(self, resource, id, fmt='json',
+ subresource=None, sub_id=None):
+ return self._req('DELETE', resource, None, fmt, id=id,
+ subresource=subresource, sub_id=sub_id)
+
+ def new_update_request(self, resource, data, id, fmt='json'):
+ return self._req('PUT', resource, data, fmt, id=id)
+
+ def deserialize(self, content_type, response):
+ ctype = 'application/%s' % content_type
+ data = self._deserializers[ctype].deserialize(response.body)['body']
+ return data
+
+ def _create_vip(self, fmt, name, pool_id, protocol, port, admin_status_up,
+ expected_res_status=None, **kwargs):
+ data = {'vip': {'name': name,
+ 'subnet_id': self._subnet_id,
+ 'pool_id': pool_id,
+ 'protocol': protocol,
+ 'port': port,
+ 'admin_state_up': admin_status_up,
+ 'tenant_id': self._tenant_id}}
+ for arg in ('description', 'address',
+ 'session_persistence', 'connection_limit'):
+ if arg in kwargs and kwargs[arg] is not None:
+ data['vip'][arg] = kwargs[arg]
+
+ vip_req = self.new_create_request('vips', data, fmt)
+ vip_res = vip_req.get_response(self.ext_api)
+ if expected_res_status:
+ self.assertEqual(vip_res.status_int, expected_res_status)
+
+ return vip_res
+
+ def _create_pool(self, fmt, name, lb_method, protocol, admin_status_up,
+ expected_res_status=None, **kwargs):
+ data = {'pool': {'name': name,
+ 'subnet_id': self._subnet_id,
+ 'lb_method': lb_method,
+ 'protocol': protocol,
+ 'admin_state_up': admin_status_up,
+ 'tenant_id': self._tenant_id}}
+ for arg in ('description'):
+ if arg in kwargs and kwargs[arg] is not None:
+ data['pool'][arg] = kwargs[arg]
+
+ pool_req = self.new_create_request('pools', data, fmt)
+ pool_res = pool_req.get_response(self.ext_api)
+ if expected_res_status:
+ self.assertEqual(pool_res.status_int, expected_res_status)
+
+ return pool_res
+
+ def _create_member(self, fmt, address, port, admin_status_up,
+ expected_res_status=None, **kwargs):
+ data = {'member': {'address': address,
+ 'port': port,
+ 'admin_state_up': admin_status_up,
+ 'tenant_id': self._tenant_id}}
+ for arg in ('weight', 'pool_id'):
+ if arg in kwargs and kwargs[arg] is not None:
+ data['member'][arg] = kwargs[arg]
+
+ member_req = self.new_create_request('members', data, fmt)
+ member_res = member_req.get_response(self.ext_api)
+ if expected_res_status:
+ self.assertEqual(member_res.status_int, expected_res_status)
+
+ return member_res
+
+ def _create_health_monitor(self, fmt, type, delay, timeout, max_retries,
+ admin_status_up, expected_res_status=None,
+ **kwargs):
+ data = {'health_monitor': {'type': type,
+ 'delay': delay,
+ 'timeout': timeout,
+ 'max_retries': max_retries,
+ 'admin_status_up': admin_status_up,
+ 'tenant_id': self._tenant_id}}
+ for arg in ('http_method', 'path', 'expected_code'):
+ if arg in kwargs and kwargs[arg] is not None:
+ data['health_monitor'][arg] = kwargs[arg]
+
+ req = self.new_create_request('health_monitors', data, fmt)
+
+ res = req.get_response(self.ext_api)
+ if expected_res_status:
+ self.assertEqual(res.status_int, expected_res_status)
+
+ return res
+
+ def _api_for_resource(self, resource):
+ if resource in ['networks', 'subnets', 'ports']:
+ return self.api
+ else:
+ return self.ext_api
+
+ def _delete(self, collection, id,
+ expected_code=webob.exc.HTTPNoContent.code):
+ req = self.new_delete_request(collection, id)
+ res = req.get_response(self._api_for_resource(collection))
+ self.assertEqual(res.status_int, expected_code)
+
+ def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code):
+ req = self.new_show_request(resource, id)
+ res = req.get_response(self._api_for_resource(resource))
+ self.assertEqual(res.status_int, expected_code)
+ return self.deserialize('json', res)
+
+ def _update(self, resource, id, new_data,
+ expected_code=webob.exc.HTTPOk.code):
+ req = self.new_update_request(resource, new_data, id)
+ res = req.get_response(self._api_for_resource(resource))
+ self.assertEqual(res.status_int, expected_code)
+ return self.deserialize('json', res)
+
+ def _list(self, resource, fmt='json', query_params=None):
+ req = self.new_list_request(resource, fmt, query_params)
+ res = req.get_response(self._api_for_resource(resource))
+ self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
+ return self.deserialize('json', res)
+
+ @contextlib.contextmanager
+ def vip(self, fmt='json', name='vip1', pool=None,
+ protocol='HTTP', port=80, admin_status_up=True, no_delete=False,
+ **kwargs):
+ if not pool:
+ with self.pool() as pool:
+ pool_id = pool['pool']['id']
+ res = self._create_vip(fmt,
+ name,
+ pool_id,
+ protocol,
+ port,
+ admin_status_up,
+ address="172.16.1.123",
+ **kwargs)
+ vip = self.deserialize(fmt, res)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ yield vip
+ if not no_delete:
+ self._delete('vips', vip['vip']['id'])
+ else:
+ pool_id = pool['pool']['id']
+ res = self._create_vip(fmt,
+ name,
+ pool_id,
+ protocol,
+ port,
+ admin_status_up,
+ address="172.16.1.123",
+ **kwargs)
+ vip = self.deserialize(fmt, res)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ yield vip
+ if not no_delete:
+ self._delete('vips', vip['vip']['id'])
+
+ @contextlib.contextmanager
+ def pool(self, fmt='json', name='pool1', lb_method='ROUND_ROBIN',
+ protocol='HTTP', admin_status_up=True, no_delete=False,
+ **kwargs):
+ res = self._create_pool(fmt,
+ name,
+ lb_method,
+ protocol,
+ admin_status_up,
+ **kwargs)
+ pool = self.deserialize(fmt, res)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ yield pool
+ if not no_delete:
+ self._delete('pools', pool['pool']['id'])
+
+ @contextlib.contextmanager
+ def member(self, fmt='json', address='192.168.1.100',
+ port=80, admin_status_up=True, no_delete=False,
+ **kwargs):
+ res = self._create_member(fmt,
+ address,
+ port,
+ admin_status_up,
+ **kwargs)
+ member = self.deserialize(fmt, res)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ yield member
+ if not no_delete:
+ self._delete('members', member['member']['id'])
+
+ @contextlib.contextmanager
+ def health_monitor(self, fmt='json', type='TCP',
+ delay=30, timeout=10, max_retries=3,
+ admin_status_up=True,
+ no_delete=False, **kwargs):
+ res = self._create_health_monitor(fmt,
+ type,
+ delay,
+ timeout,
+ max_retries,
+ admin_status_up,
+ **kwargs)
+ health_monitor = self.deserialize(fmt, res)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ yield health_monitor
+ if not no_delete:
+ self._delete('health_monitors',
+ health_monitor['health_monitor']['id'])
+
+
+class TestLoadBalancer(LoadBalancerPluginDbTestCase):
+ def test_create_vip(self):
+ name = 'vip1'
+ keys = [('name', name),
+ ('subnet_id', self._subnet_id),
+ ('address', "172.16.1.123"),
+ ('port', 80),
+ ('protocol', 'HTTP'),
+ ('connection_limit', -1),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+
+ with self.vip(name=name) as vip:
+ for k, v in keys:
+ self.assertEqual(vip['vip'][k], v)
+
+ def test_create_vip_with_session_persistence(self):
+ name = 'vip2'
+ keys = [('name', name),
+ ('subnet_id', self._subnet_id),
+ ('address', "172.16.1.123"),
+ ('port', 80),
+ ('protocol', 'HTTP'),
+ ('session_persistence', {'type': "HTTP_COOKIE",
+ 'cookie_name': "jessionId"}),
+ ('connection_limit', -1),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+
+ with self.vip(name=name,
+ session_persistence={'type': "HTTP_COOKIE",
+ 'cookie_name': "jessionId"}) as vip:
+ for k, v in keys:
+ self.assertEqual(vip['vip'][k], v)
+
+ def test_update_vip(self):
+ name = 'new_vip'
+ keys = [('name', name),
+ ('subnet_id', self._subnet_id),
+ ('address', "172.16.1.123"),
+ ('port', 80),
+ ('connection_limit', 100),
+ ('admin_state_up', False),
+ ('status', 'PENDING_UPDATE')]
+
+ with self.vip(name=name) as vip:
+ data = {'vip': {'name': name,
+ 'connection_limit': 100,
+ 'session_persistence':
+ {'type': "HTTP_COOKIE",
+ 'cookie_name': "jesssionId"},
+ 'admin_state_up': False}}
+ req = self.new_update_request('vips', data, vip['vip']['id'])
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ for k, v in keys:
+ self.assertEqual(res['vip'][k], v)
+
+ def test_delete_vip(self):
+ with self.pool() as pool:
+ with self.vip(no_delete=True) as vip:
+ req = self.new_delete_request('vips',
+ vip['vip']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+
+ def test_show_vip(self):
+ name = "vip_show"
+ keys = [('name', name),
+ ('subnet_id', self._subnet_id),
+ ('address', "172.16.1.123"),
+ ('port', 80),
+ ('protocol', 'HTTP'),
+ ('connection_limit', -1),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+ with self.vip(name=name) as vip:
+ req = self.new_show_request('vips',
+ vip['vip']['id'])
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ for k, v in keys:
+ self.assertEqual(res['vip'][k], v)
+
+ def test_create_pool(self):
+ name = "pool1"
+ keys = [('name', name),
+ ('subnet_id', self._subnet_id),
+ ('tenant_id', self._tenant_id),
+ ('protocol', 'HTTP'),
+ ('lb_method', 'ROUND_ROBIN'),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+ with self.pool(name=name) as pool:
+ for k, v in keys:
+ self.assertEqual(pool['pool'][k], v)
+
+ def test_create_pool_with_members(self):
+ name = "pool2"
+ with self.pool(name=name) as pool:
+ pool_id = pool['pool']['id']
+ res1 = self._create_member('json',
+ '192.168.1.100',
+ '80',
+ True,
+ pool_id=pool_id,
+ weight=1)
+ req = self.new_show_request('pools',
+ pool_id,
+ fmt='json')
+ pool_updated = self.deserialize('json',
+ req.get_response(self.ext_api))
+
+ member1 = self.deserialize('json', res1)
+ self.assertEqual(member1['member']['id'],
+ pool_updated['pool']['members'][0])
+ self.assertEqual(len(pool_updated['pool']['members']), 1)
+
+ keys = [('address', '192.168.1.100'),
+ ('port', 80),
+ ('weight', 1),
+ ('pool_id', pool_id),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+ for k, v in keys:
+ self.assertEqual(member1['member'][k], v)
+ self._delete('members', member1['member']['id'])
+
+ def test_delete_pool(self):
+ with self.pool(no_delete=True) as pool:
+ with self.member(no_delete=True,
+ pool_id=pool['pool']['id']) as member:
+ req = self.new_delete_request('pools',
+ pool['pool']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+
+ def test_show_pool(self):
+ name = "pool1"
+ keys = [('name', name),
+ ('subnet_id', self._subnet_id),
+ ('tenant_id', self._tenant_id),
+ ('protocol', 'HTTP'),
+ ('lb_method', 'ROUND_ROBIN'),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+ with self.pool(name=name) as pool:
+ req = self.new_show_request('pools',
+ pool['pool']['id'],
+ fmt='json')
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ for k, v in keys:
+ self.assertEqual(res['pool'][k], v)
+
+ def test_create_member(self):
+ with self.pool() as pool:
+ pool_id = pool['pool']['id']
+ with self.member(address='192.168.1.100',
+ port=80,
+ pool_id=pool_id) as member1:
+ with self.member(address='192.168.1.101',
+ port=80,
+ pool_id=pool_id) as member2:
+ req = self.new_show_request('pools',
+ pool_id,
+ fmt='json')
+ pool_update = self.deserialize(
+ 'json',
+ req.get_response(self.ext_api))
+ self.assertIn(member1['member']['id'],
+ pool_update['pool']['members'])
+ self.assertIn(member2['member']['id'],
+ pool_update['pool']['members'])
+
+ def test_update_member(self):
+ with self.pool(name="pool1") as pool1:
+ with self.pool(name="pool2") as pool2:
+ keys = [('address', "192.168.1.100"),
+ ('tenant_id', self._tenant_id),
+ ('port', 80),
+ ('weight', 10),
+ ('pool_id', pool2['pool']['id']),
+ ('admin_state_up', False),
+ ('status', 'PENDING_UPDATE')]
+ with self.member(pool_id=pool1['pool']['id']) as member:
+ req = self.new_show_request('pools',
+ pool1['pool']['id'],
+ fmt='json')
+ pool1_update = self.deserialize(
+ 'json',
+ req.get_response(self.ext_api))
+ self.assertEqual(len(pool1_update['pool']['members']), 1)
+
+ req = self.new_show_request('pools',
+ pool2['pool']['id'],
+ fmt='json')
+ pool2_update = self.deserialize(
+ 'json',
+ req.get_response(self.ext_api))
+ self.assertEqual(len(pool1_update['pool']['members']), 1)
+ self.assertEqual(len(pool2_update['pool']['members']), 0)
+
+ data = {'member': {'pool_id': pool2['pool']['id'],
+ 'weight': 10,
+ 'admin_state_up': False}}
+ req = self.new_update_request('members',
+ data,
+ member['member']['id'])
+ res = self.deserialize('json',
+ req.get_response(self.ext_api))
+ for k, v in keys:
+ self.assertEqual(res['member'][k], v)
+
+ req = self.new_show_request('pools',
+ pool1['pool']['id'],
+ fmt='json')
+ pool1_update = self.deserialize(
+ 'json',
+ req.get_response(self.ext_api))
+
+ req = self.new_show_request('pools',
+ pool2['pool']['id'],
+ fmt='json')
+ pool2_update = self.deserialize(
+ 'json',
+ req.get_response(self.ext_api))
+
+ self.assertEqual(len(pool2_update['pool']['members']), 1)
+ self.assertEqual(len(pool1_update['pool']['members']), 0)
+
+ def test_delete_member(self):
+ with self.pool() as pool:
+ pool_id = pool['pool']['id']
+ with self.member(pool_id=pool_id,
+ no_delete=True) as member:
+ req = self.new_delete_request('members',
+ member['member']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+
+ req = self.new_show_request('pools',
+ pool_id,
+ fmt='json')
+ pool_update = self.deserialize(
+ 'json',
+ req.get_response(self.ext_api))
+ self.assertEqual(len(pool_update['pool']['members']), 0)
+
+ def test_show_member(self):
+ with self.pool() as pool:
+ keys = [('address', "192.168.1.100"),
+ ('tenant_id', self._tenant_id),
+ ('port', 80),
+ ('weight', 1),
+ ('pool_id', pool['pool']['id']),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+ with self.member(pool_id=pool['pool']['id']) as member:
+ req = self.new_show_request('members',
+ member['member']['id'],
+ fmt='json')
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ for k, v in keys:
+ self.assertEqual(res['member'][k], v)
+
+ def test_create_healthmonitor(self):
+ keys = [('type', "TCP"),
+ ('tenant_id', self._tenant_id),
+ ('delay', 30),
+ ('timeout', 10),
+ ('max_retries', 3),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+ with self.health_monitor() as monitor:
+ for k, v in keys:
+ self.assertEqual(monitor['health_monitor'][k], v)
+
+ def test_update_healthmonitor(self):
+ keys = [('type', "TCP"),
+ ('tenant_id', self._tenant_id),
+ ('delay', 20),
+ ('timeout', 20),
+ ('max_retries', 2),
+ ('admin_state_up', False),
+ ('status', 'PENDING_UPDATE')]
+ with self.health_monitor() as monitor:
+ data = {'health_monitor': {'delay': 20,
+ 'timeout': 20,
+ 'max_retries': 2,
+ 'admin_state_up': False}}
+ req = self.new_update_request("health_monitors",
+ data,
+ monitor['health_monitor']['id'])
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ for k, v in keys:
+ self.assertEqual(res['health_monitor'][k], v)
+
+ def test_delete_healthmonitor(self):
+ with self.health_monitor(no_delete=True) as monitor:
+ req = self.new_delete_request('health_monitors',
+ monitor['health_monitor']['id'])
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+
+ def test_show_healthmonitor(self):
+ with self.health_monitor() as monitor:
+ keys = [('type', "TCP"),
+ ('tenant_id', self._tenant_id),
+ ('delay', 30),
+ ('timeout', 10),
+ ('max_retries', 3),
+ ('admin_state_up', True),
+ ('status', 'PENDING_CREATE')]
+ req = self.new_show_request('health_monitors',
+ monitor['health_monitor']['id'],
+ fmt='json')
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ for k, v in keys:
+ self.assertEqual(res['health_monitor'][k], v)
+
+ def test_get_pool_stats(self):
+ keys = [("bytes_in", 0),
+ ("bytes_out", 0),
+ ("active_connections", 0),
+ ("total_connections", 0)]
+ with self.pool() as pool:
+ req = self.new_show_request("pools",
+ pool['pool']['id'],
+ subresource="stats",
+ fmt='json')
+ res = self.deserialize('json', req.get_response(self.ext_api))
+ for k, v in keys:
+ self.assertEqual(res['stats'][k], v)
+
+ def test_create_healthmonitor_of_pool(self):
+ with self.health_monitor(type="TCP") as monitor1:
+ with self.health_monitor(type="HTTP") as monitor2:
+ with self.pool() as pool:
+ data = {"health_monitor": {
+ "id": monitor1['health_monitor']['id'],
+ 'tenant_id': self._tenant_id}}
+ req = self.new_create_request(
+ "pools",
+ data,
+ fmt='json',
+ id=pool['pool']['id'],
+ subresource="health_monitors")
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+
+ data = {"health_monitor": {
+ "id": monitor2['health_monitor']['id'],
+ 'tenant_id': self._tenant_id}}
+ req = self.new_create_request(
+ "pools",
+ data,
+ fmt='json',
+ id=pool['pool']['id'],
+ subresource="health_monitors")
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+
+ req = self.new_show_request(
+ 'pools',
+ pool['pool']['id'],
+ fmt='json')
+ res = self.deserialize('json',
+ req.get_response(self.ext_api))
+ self.assertIn(monitor1['health_monitor']['id'],
+ res['pool']['health_monitors'])
+ self.assertIn(monitor2['health_monitor']['id'],
+ res['pool']['health_monitors'])
+
+ def test_delete_healthmonitor_of_pool(self):
+ with self.health_monitor(type="TCP") as monitor1:
+ with self.health_monitor(type="HTTP") as monitor2:
+ with self.pool() as pool:
+ # add the monitors to the pool
+ data = {"health_monitor": {
+ "id": monitor1['health_monitor']['id'],
+ 'tenant_id': self._tenant_id}}
+ req = self.new_create_request(
+ "pools",
+ data,
+ fmt='json',
+ id=pool['pool']['id'],
+ subresource="health_monitors")
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+
+ data = {"health_monitor": {
+ "id": monitor2['health_monitor']['id'],
+ 'tenant_id': self._tenant_id}}
+ req = self.new_create_request(
+ "pools",
+ data,
+ fmt='json',
+ id=pool['pool']['id'],
+ subresource="health_monitors")
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+
+ # remove one of healthmonitor from the pool
+ req = self.new_delete_request(
+ "pools",
+ fmt='json',
+ id=pool['pool']['id'],
+ sub_id=monitor1['health_monitor']['id'],
+ subresource="health_monitors")
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+
+ req = self.new_show_request(
+ 'pools',
+ pool['pool']['id'],
+ fmt='json')
+ res = self.deserialize('json',
+ req.get_response(self.ext_api))
+ self.assertNotIn(monitor1['health_monitor']['id'],
+ res['pool']['health_monitors'])
+ self.assertIn(monitor2['health_monitor']['id'],
+ res['pool']['health_monitors'])
+
+ def test_create_loadbalancer(self):
+ vip_name = "vip3"
+ pool_name = "pool3"
+
+ with self.pool(name=pool_name) as pool:
+ with self.vip(name=vip_name, pool=pool) as vip:
+ pool_id = pool['pool']['id']
+ vip_id = vip['vip']['id']
+ # Add two members
+ res1 = self._create_member('json',
+ '192.168.1.100',
+ '80',
+ True,
+ pool_id=pool_id,
+ weight=1)
+ res2 = self._create_member('json',
+ '192.168.1.101',
+ '80',
+ True,
+ pool_id=pool_id,
+ weight=2)
+ # Add a health_monitor
+ req = self._create_health_monitor('json',
+ 'HTTP',
+ '10',
+ '10',
+ '3',
+ True)
+ health_monitor = self.deserialize('json', req)
+ self.assertEqual(req.status_int, 201)
+
+ # Associate the health_monitor to the pool
+ data = {"health_monitor": {
+ "id": health_monitor['health_monitor']['id'],
+ 'tenant_id': self._tenant_id}}
+ req = self.new_create_request("pools",
+ data,
+ fmt='json',
+ id=pool['pool']['id'],
+ subresource="health_monitors")
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 201)
+
+ # Get pool and vip
+ req = self.new_show_request('pools',
+ pool_id,
+ fmt='json')
+ pool_updated = self.deserialize('json',
+ req.get_response(self.ext_api))
+ member1 = self.deserialize('json', res1)
+ member2 = self.deserialize('json', res2)
+ self.assertIn(member1['member']['id'],
+ pool_updated['pool']['members'])
+ self.assertIn(member2['member']['id'],
+ pool_updated['pool']['members'])
+ self.assertIn(health_monitor['health_monitor']['id'],
+ pool_updated['pool']['health_monitors'])
+
+ req = self.new_show_request('vips',
+ vip_id,
+ fmt='json')
+ vip_updated = self.deserialize('json',
+ req.get_response(self.ext_api))
+ self.assertEqual(vip_updated['vip']['pool_id'],
+ pool_updated['pool']['id'])
+
+ # clean up
+ self._delete('health_monitors',
+ health_monitor['health_monitor']['id'])
+ self._delete('members', member1['member']['id'])
+ self._delete('members', member2['member']['id'])