from quantum.db import models_v2
from quantum.extensions import loadbalancer
from quantum.extensions.loadbalancer import LoadBalancerPluginBase
+from quantum import manager
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
from quantum.plugins.common import constants
"""Represents a v2 quantum loadbalancer vip."""
name = sa.Column(sa.String(255))
description = sa.Column(sa.String(255))
- subnet_id = sa.Column(sa.String(36), nullable=False)
- address = sa.Column(sa.String(64))
- port = sa.Column(sa.Integer, nullable=False)
+ port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id'))
+ protocol_port = sa.Column(sa.Integer, nullable=False)
protocol = sa.Column(sa.Enum("HTTP", "HTTPS", "TCP", name="lb_protocols"),
nullable=False)
pool_id = sa.Column(sa.String(36), nullable=False)
status = sa.Column(sa.String(16), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
connection_limit = sa.Column(sa.Integer)
+ port = orm.relationship(models_v2.Port)
class Member(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
pool_id = sa.Column(sa.String(36), sa.ForeignKey("pools.id"),
nullable=False)
address = sa.Column(sa.String(64), nullable=False)
- port = sa.Column(sa.Integer, nullable=False)
+ protocol_port = sa.Column(sa.Integer, nullable=False)
weight = sa.Column(sa.Integer, nullable=False)
status = sa.Column(sa.String(16), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
loadbalancer plugin database access interface using SQLAlchemy models.
"""
+ @property
+ def _core_plugin(self):
+ return manager.QuantumManager.get_plugin()
+
# TODO(lcui):
# A set of internal facility methods are borrowed from QuantumDbPluginV2
# class and hence this is duplicate. We need to pull out those methods
########################################################
# VIP DB access
def _make_vip_dict(self, vip, fields=None):
+ fixed_ip = (vip.port.fixed_ips or [{}])[0]
+
res = {'id': vip['id'],
'tenant_id': vip['tenant_id'],
'name': vip['name'],
'description': vip['description'],
- 'subnet_id': vip['subnet_id'],
- 'address': vip['address'],
- 'port': vip['port'],
+ 'subnet_id': fixed_ip.get('subnet_id'),
+ 'address': fixed_ip.get('ip_address'),
+ 'port_id': vip['port_id'],
+ 'protocol_port': vip['protocol_port'],
'protocol': vip['protocol'],
'pool_id': vip['pool_id'],
'connection_limit': vip['connection_limit'],
'admin_state_up': vip['admin_state_up'],
'status': vip['status']}
+
if vip['session_persistence']:
s_p = {
'type': vip['session_persistence']['type']
sess_qry = context.session.query(SessionPersistence)
sess_qry.filter_by(vip_id=vip_id).delete()
+ def _create_port_for_vip(self, context, vip_db, subnet_id, ip_address):
+ # resolve subnet and create port
+ subnet = self._core_plugin.get_subnet(context, subnet_id)
+ fixed_ip = {'subnet_id': subnet['id']}
+ if ip_address and ip_address != attributes.ATTR_NOT_SPECIFIED:
+ fixed_ip['ip_address'] = ip_address
+
+ port_data = {
+ 'tenant_id': vip_db.tenant_id,
+ 'name': 'vip-' + vip_db.id,
+ 'network_id': subnet['network_id'],
+ 'mac_address': attributes.ATTR_NOT_SPECIFIED,
+ 'admin_state_up': False,
+ 'device_id': '',
+ 'device_owner': '',
+ 'fixed_ips': [fixed_ip]
+ }
+
+ port = self._core_plugin.create_port(context, {'port': port_data})
+ vip_db.port_id = port['id']
+
def create_vip(self, context, vip):
v = vip['vip']
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
- if v['address'] is attributes.ATTR_NOT_SPECIFIED:
- address = None
- else:
- address = v['address']
vip_db = Vip(id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
name=v['name'],
description=v['description'],
- subnet_id=v['subnet_id'],
- address=address,
- port=v['port'],
+ port_id=None,
+ protocol_port=v['protocol_port'],
protocol=v['protocol'],
pool_id=v['pool_id'],
connection_limit=v['connection_limit'],
vip_db.session_persistence = s_p
context.session.add(vip_db)
- self._update_pool_vip_info(context, v['pool_id'], vip_id)
+ context.session.flush()
- vip_db = self._get_resource(context, Vip, vip_id)
+ self._create_port_for_vip(
+ context,
+ vip_db,
+ v['subnet_id'],
+ v.get('address')
+ )
+
+ self._update_pool_vip_info(context, v['pool_id'], vip_id)
return self._make_vip_dict(vip_db)
def update_vip(self, context, id, vip):
qry = context.session.query(Pool)
for pool in qry.filter_by(vip_id=id).all():
pool.update({"vip_id": None})
+
context.session.delete(vip)
+ if vip.port: # this is a Quantum port
+ self._core_plugin.delete_port(context, vip.port.id)
+ context.session.flush()
def get_vip(self, context, id, fields=None):
vip = self._get_resource(context, Vip, id)
'tenant_id': member['tenant_id'],
'pool_id': member['pool_id'],
'address': member['address'],
- 'port': member['port'],
+ 'protocol_port': member['protocol_port'],
'weight': member['weight'],
'admin_state_up': member['admin_state_up'],
'status': member['status']}
tenant_id=tenant_id,
pool_id=v['pool_id'],
address=v['address'],
- port=v['port'],
+ protocol_port=v['protocol_port'],
weight=v['weight'],
admin_state_up=v['admin_state_up'],
status=constants.PENDING_CREATE)
import contextlib
import logging
+import mock
import os
+import testtools
from oslo.config import cfg
import webob.exc
from quantum.api.v2 import attributes
from quantum.api.v2.router import APIRouter
from quantum.common import config
+from quantum.common import exceptions as q_exc
from quantum.common.test_lib import test_config
from quantum.db import api as db
import quantum.extensions
app = config.load_paste_app('extensions_test_app')
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
- def _create_vip(self, fmt, name, pool_id, protocol, port, admin_state_up,
- expected_res_status=None, **kwargs):
+ def _create_vip(self, fmt, name, pool_id, protocol, protocol_port,
+ admin_state_up, expected_res_status=None, **kwargs):
data = {'vip': {'name': name,
- 'subnet_id': self._subnet_id,
'pool_id': pool_id,
'protocol': protocol,
- 'port': port,
+ 'protocol_port': protocol_port,
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
- for arg in ('description', 'address',
+ for arg in ('description', 'subnet_id', 'address',
'session_persistence', 'connection_limit'):
if arg in kwargs and kwargs[arg] is not None:
data['vip'][arg] = kwargs[arg]
return pool_res
- def _create_member(self, fmt, address, port, admin_state_up,
+ def _create_member(self, fmt, address, protocol_port, admin_state_up,
expected_res_status=None, **kwargs):
data = {'member': {'address': address,
- 'port': port,
+ 'protocol_port': protocol_port,
'admin_state_up': admin_state_up,
'tenant_id': self._tenant_id}}
for arg in ('weight', 'pool_id'):
return self.ext_api
@contextlib.contextmanager
- def vip(self, fmt=None, name='vip1', pool=None,
- protocol='HTTP', port=80, admin_state_up=True, no_delete=False,
- address="172.16.1.123", **kwargs):
+ def vip(self, fmt=None, name='vip1', pool=None, subnet=None,
+ protocol='HTTP', protocol_port=80, admin_state_up=True,
+ no_delete=False, **kwargs):
if not fmt:
fmt = self.fmt
- if not pool:
- with self.pool() as pool:
- pool_id = pool['pool']['id']
+
+ with test_db_plugin.optional_ctx(subnet, self.subnet) as tmp_subnet:
+ with test_db_plugin.optional_ctx(pool, self.pool) as tmp_pool:
+ pool_id = tmp_pool['pool']['id']
res = self._create_vip(fmt,
name,
pool_id,
protocol,
- port,
+ protocol_port,
admin_state_up,
- address=address,
+ subnet_id=tmp_subnet['subnet']['id'],
**kwargs)
vip = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
- yield vip
- if not no_delete:
- self._delete('vips', vip['vip']['id'])
- else:
- pool_id = pool['pool']['id']
- res = self._create_vip(fmt,
- name,
- pool_id,
- protocol,
- port,
- admin_state_up,
- address=address,
- **kwargs)
- vip = self.deserialize(fmt or self.fmt, res)
- if res.status_int >= 400:
- raise webob.exc.HTTPClientError(code=res.status_int)
- yield vip
- if not no_delete:
- self._delete('vips', vip['vip']['id'])
+ try:
+ yield vip
+ finally:
+ if not no_delete:
+ self._delete('vips', vip['vip']['id'])
@contextlib.contextmanager
def pool(self, fmt=None, name='pool1', lb_method='ROUND_ROBIN',
pool = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
- yield pool
- if not no_delete:
- self._delete('pools', pool['pool']['id'])
+ try:
+ yield pool
+ finally:
+ if not no_delete:
+ self._delete('pools', pool['pool']['id'])
@contextlib.contextmanager
- def member(self, fmt=None, address='192.168.1.100',
- port=80, admin_state_up=True, no_delete=False,
- **kwargs):
+ def member(self, fmt=None, address='192.168.1.100', protocol_port=80,
+ admin_state_up=True, no_delete=False, **kwargs):
if not fmt:
fmt = self.fmt
res = self._create_member(fmt,
address,
- port,
+ protocol_port,
admin_state_up,
**kwargs)
member = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
- yield member
- if not no_delete:
- self._delete('members', member['member']['id'])
+ try:
+ yield member
+ finally:
+ if not no_delete:
+ self._delete('members', member['member']['id'])
@contextlib.contextmanager
def health_monitor(self, fmt=None, type='TCP',
else:
for arg in http_related_attributes:
self.assertIsNone(the_health_monitor.get(arg))
- yield health_monitor
- if not no_delete:
- self._delete('health_monitors', the_health_monitor['id'])
+ try:
+ yield health_monitor
+ finally:
+ if not no_delete:
+ self._delete('health_monitors', the_health_monitor['id'])
class TestLoadBalancer(LoadBalancerPluginDbTestCase):
- def test_create_vip(self):
- name = 'vip1'
- keys = [('name', name),
- ('subnet_id', self._subnet_id),
- ('address', "172.16.1.123"),
- ('port', 80),
- ('protocol', 'HTTP'),
- ('connection_limit', -1),
- ('admin_state_up', True),
- ('status', 'PENDING_CREATE')]
-
- with self.vip(name=name) as vip:
- for k, v in keys:
- self.assertEqual(vip['vip'][k], v)
+ def test_create_vip(self, **extras):
+ expected = {
+ 'name': 'vip1',
+ 'description': '',
+ 'protocol_port': 80,
+ 'protocol': 'HTTP',
+ 'connection_limit': -1,
+ 'admin_state_up': True,
+ 'status': 'PENDING_CREATE',
+ 'tenant_id': self._tenant_id,
+ }
+
+ expected.update(extras)
+
+ with self.subnet() as subnet:
+ expected['subnet_id'] = subnet['subnet']['id']
+ name = expected['name']
+
+ with self.vip(name=name, subnet=subnet, **extras) as vip:
+ for k in ('id', 'address', 'port_id', 'pool_id'):
+ self.assertTrue(vip['vip'].get(k, None))
+
+ self.assertEqual(
+ dict((k, v)
+ for k, v in vip['vip'].items() if k in expected),
+ expected
+ )
+ return vip
def test_create_vip_with_invalid_values(self):
- name = 'vip3'
-
- vip = self.vip(name=name, protocol='UNSUPPORTED')
- self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
-
- vip = self.vip(name=name, port='NOT_AN_INT')
- self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
-
- # 100500 is not a valid port number
- vip = self.vip(name=name, port='100500')
- self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
-
- # 192.168.130.130.130 is not a valid IP address
- vip = self.vip(name=name, address='192.168.130.130.130')
- self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
+ invalid = {
+ 'protocol': 'UNSUPPORTED',
+ 'protocol_port': 'NOT_AN_INT',
+ 'protocol_port': 1000500,
+ 'subnet': {'subnet': {'id': 'invalid-subnet'}}
+ }
+
+ for param, value in invalid.items():
+ kwargs = {'name': 'the-vip', param: value}
+ with testtools.ExpectedException(webob.exc.HTTPClientError):
+ with self.vip(**kwargs):
+ pass
+
+ def test_create_vip_with_address(self):
+ self.test_create_vip(address='10.0.0.7')
+
+ def test_create_vip_with_address_outside_subnet(self):
+ with testtools.ExpectedException(webob.exc.HTTPClientError):
+ self.test_create_vip(address='9.9.9.9')
def test_create_vip_with_session_persistence(self):
- name = 'vip2'
- keys = [('name', name),
- ('subnet_id', self._subnet_id),
- ('address', "172.16.1.123"),
- ('port', 80),
- ('protocol', 'HTTP'),
- ('session_persistence', {'type': "HTTP_COOKIE"}),
- ('connection_limit', -1),
- ('admin_state_up', True),
- ('status', 'PENDING_CREATE')]
-
- with self.vip(name=name,
- session_persistence={'type': "HTTP_COOKIE"}) as vip:
- for k, v in keys:
- self.assertEqual(vip['vip'][k], v)
+ self.test_create_vip(session_persistence={'type': 'HTTP_COOKIE'})
def test_create_vip_with_session_persistence_with_app_cookie(self):
- name = 'vip7'
- keys = [('name', name),
- ('subnet_id', self._subnet_id),
- ('address', "172.16.1.123"),
- ('port', 80),
- ('protocol', 'HTTP'),
- ('session_persistence', {'type': "APP_COOKIE",
- 'cookie_name': 'sessionId'}),
- ('connection_limit', -1),
- ('admin_state_up', True),
- ('status', 'PENDING_CREATE')]
-
- with self.vip(name=name,
- session_persistence={'type': "APP_COOKIE",
- 'cookie_name': 'sessionId'}) as vip:
- for k, v in keys:
- self.assertEqual(vip['vip'][k], v)
+ sp = {'type': 'APP_COOKIE', 'cookie_name': 'sessionId'}
+ self.test_create_vip(session_persistence=sp)
def test_create_vip_with_session_persistence_unsupported_type(self):
- name = 'vip5'
-
- vip = self.vip(name=name, session_persistence={'type': "UNSUPPORTED"})
- self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
+ with testtools.ExpectedException(webob.exc.HTTPClientError):
+ self.test_create_vip(session_persistence={'type': 'UNSUPPORTED'})
def test_create_vip_with_unnecessary_cookie_name(self):
- name = 'vip8'
-
- s_p = {'type': "SOURCE_IP", 'cookie_name': 'sessionId'}
- vip = self.vip(name=name, session_persistence=s_p)
-
- self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
+ sp = {'type': "SOURCE_IP", 'cookie_name': 'sessionId'}
+ with testtools.ExpectedException(webob.exc.HTTPClientError):
+ self.test_create_vip(session_persistence=sp)
def test_create_vip_with_session_persistence_without_cookie_name(self):
- name = 'vip6'
-
- vip = self.vip(name=name, session_persistence={'type': "APP_COOKIE"})
- self.assertRaises(webob.exc.HTTPClientError, vip.__enter__)
+ sp = {'type': "APP_COOKIE"}
+ with testtools.ExpectedException(webob.exc.HTTPClientError):
+ self.test_create_vip(session_persistence=sp)
def test_reset_session_persistence(self):
name = 'vip4'
def test_update_vip(self):
name = 'new_vip'
keys = [('name', name),
- ('subnet_id', self._subnet_id),
- ('address', "172.16.1.123"),
- ('port', 80),
+ ('address', "10.0.0.2"),
+ ('protocol_port', 80),
('connection_limit', 100),
('admin_state_up', False),
('status', 'PENDING_UPDATE')]
with self.vip(name=name) as vip:
+ keys.append(('subnet_id', vip['vip']['subnet_id']))
data = {'vip': {'name': name,
'connection_limit': 100,
'session_persistence':
def test_show_vip(self):
name = "vip_show"
keys = [('name', name),
- ('subnet_id', self._subnet_id),
- ('address', "172.16.1.123"),
- ('port', 80),
+ ('address', "10.0.0.10"),
+ ('protocol_port', 80),
('protocol', 'HTTP'),
('connection_limit', -1),
('admin_state_up', True),
('status', 'PENDING_CREATE')]
- with self.vip(name=name) as vip:
+ with self.vip(name=name, address='10.0.0.10') as vip:
req = self.new_show_request('vips',
vip['vip']['id'])
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
def test_list_vips(self):
name = "vips_list"
keys = [('name', name),
- ('subnet_id', self._subnet_id),
- ('address', "172.16.1.123"),
- ('port', 80),
+ ('address', "10.0.0.2"),
+ ('protocol_port', 80),
('protocol', 'HTTP'),
('connection_limit', -1),
('admin_state_up', True),
('status', 'PENDING_CREATE')]
- with self.vip(name=name):
+ with self.vip(name=name) as vip:
+ keys.append(('subnet_id', vip['vip']['subnet_id']))
req = self.new_list_request('vips')
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
+ self.assertEqual(len(res), 1)
for k, v in keys:
self.assertEqual(res['vips'][0][k], v)
def test_list_vips_with_sort_emulated(self):
- with contextlib.nested(self.vip(name='vip1', port=81),
- self.vip(name='vip2', port=82),
- self.vip(name='vip3', port=82)
- ) as (vip1, vip2, vip3):
- self._test_list_with_sort('vip', (vip1, vip3, vip2),
- [('port', 'asc'), ('name', 'desc')])
+ with self.subnet() as subnet:
+ with contextlib.nested(
+ self.vip(name='vip1', subnet=subnet, protocol_port=81),
+ self.vip(name='vip2', subnet=subnet, protocol_port=82),
+ self.vip(name='vip3', subnet=subnet, protocol_port=82)
+ ) as (vip1, vip2, vip3):
+ self._test_list_with_sort(
+ 'vip',
+ (vip1, vip3, vip2),
+ [('protocol_port', 'asc'), ('name', 'desc')]
+ )
def test_list_vips_with_pagination_emulated(self):
- with contextlib.nested(self.vip(name='vip1'),
- self.vip(name='vip2'),
- self.vip(name='vip3')
- ) as (vip1, vip2, vip3):
- self._test_list_with_pagination('vip',
- (vip1, vip2, vip3),
- ('name', 'asc'), 2, 2)
+ with self.subnet() as subnet:
+ with contextlib.nested(self.vip(name='vip1', subnet=subnet),
+ self.vip(name='vip2', subnet=subnet),
+ self.vip(name='vip3', subnet=subnet)
+ ) as (vip1, vip2, vip3):
+ self._test_list_with_pagination('vip',
+ (vip1, vip2, vip3),
+ ('name', 'asc'), 2, 2)
def test_list_vips_with_pagination_reverse_emulated(self):
- with contextlib.nested(self.vip(name='vip1'),
- self.vip(name='vip2'),
- self.vip(name='vip3')
- ) as (vip1, vip2, vip3):
- self._test_list_with_pagination_reverse('vip',
- (vip1, vip2, vip3),
- ('name', 'asc'), 2, 2)
+ with self.subnet() as subnet:
+ with contextlib.nested(self.vip(name='vip1', subnet=subnet),
+ self.vip(name='vip2', subnet=subnet),
+ self.vip(name='vip3', subnet=subnet)
+ ) as (vip1, vip2, vip3):
+ self._test_list_with_pagination_reverse('vip',
+ (vip1, vip2, vip3),
+ ('name', 'asc'), 2, 2)
def test_create_pool_with_invalid_values(self):
name = 'pool3'
self.assertEqual(len(pool_updated['pool']['members']), 1)
keys = [('address', '192.168.1.100'),
- ('port', 80),
+ ('protocol_port', 80),
('weight', 1),
('pool_id', pool_id),
('admin_state_up', True),
with self.pool() as pool:
pool_id = pool['pool']['id']
with self.member(address='192.168.1.100',
- port=80,
+ protocol_port=80,
pool_id=pool_id) as member1:
with self.member(address='192.168.1.101',
- port=80,
+ protocol_port=80,
pool_id=pool_id) as member2:
req = self.new_show_request('pools',
pool_id,
with self.pool(name="pool2") as pool2:
keys = [('address', "192.168.1.100"),
('tenant_id', self._tenant_id),
- ('port', 80),
+ ('protocol_port', 80),
('weight', 10),
('pool_id', pool2['pool']['id']),
('admin_state_up', False),
with self.pool() as pool:
keys = [('address', "192.168.1.100"),
('tenant_id', self._tenant_id),
- ('port', 80),
+ ('protocol_port', 80),
('weight', 1),
('pool_id', pool['pool']['id']),
('admin_state_up', True),
def test_list_members_with_sort_emulated(self):
with self.pool() as pool:
with contextlib.nested(self.member(pool_id=pool['pool']['id'],
- port=81),
+ protocol_port=81),
self.member(pool_id=pool['pool']['id'],
- port=82),
+ protocol_port=82),
self.member(pool_id=pool['pool']['id'],
- port=83)
+ protocol_port=83)
) as (m1, m2, m3):
self._test_list_with_sort('member', (m3, m2, m1),
- [('port', 'desc')])
+ [('protocol_port', 'desc')])
def test_list_members_with_pagination_emulated(self):
with self.pool() as pool:
with contextlib.nested(self.member(pool_id=pool['pool']['id'],
- port=81),
+ protocol_port=81),
self.member(pool_id=pool['pool']['id'],
- port=82),
+ protocol_port=82),
self.member(pool_id=pool['pool']['id'],
- port=83)
+ protocol_port=83)
) as (m1, m2, m3):
- self._test_list_with_pagination('member',
- (m1, m2, m3),
- ('port', 'asc'), 2, 2)
+ self._test_list_with_pagination(
+ 'member', (m1, m2, m3), ('protocol_port', 'asc'), 2, 2
+ )
def test_list_members_with_pagination_reverse_emulated(self):
with self.pool() as pool:
with contextlib.nested(self.member(pool_id=pool['pool']['id'],
- port=81),
+ protocol_port=81),
self.member(pool_id=pool['pool']['id'],
- port=82),
+ protocol_port=82),
self.member(pool_id=pool['pool']['id'],
- port=83)
+ protocol_port=83)
) as (m1, m2, m3):
- self._test_list_with_pagination_reverse('member',
- (m1, m2, m3),
- ('port', 'asc'), 2, 2)
+ self._test_list_with_pagination_reverse(
+ 'member', (m1, m2, m3), ('protocol_port', 'asc'), 2, 2
+ )
def test_create_healthmonitor(self):
keys = [('type', "TCP"),
import quantum
from quantum.api import extensions
+from quantum.api.v2 import attributes
from quantum.api.v2 import router
from quantum.common import config
+from quantum import context as q_context
+from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
from quantum.db.loadbalancer import loadbalancer_db as lb_db
# Ensure 'stale' patched copies of the plugin are never returned
quantum.manager.QuantumManager._instance = None
+
+ # Ensure the database is reset between tests
+ db._ENGINE = None
+ db._MAKER = None
# Ensure existing ExtensionManager is not used
ext_mgr = extensions.PluginAwareExtensionManager(
res = self._do_request('GET', _get_path('service-types'))
self._service_type_id = res['service_types'][0]['id']
+ self._setup_core_resources()
+
+ # FIXME (markmcclain): The test setup makes it difficult to add core
+ # via the api. In the interim we'll create directly using the plugin with
+ # the side effect of polluting the fixture database until tearDown.
+
+ def _setup_core_resources(self):
+ core_plugin = quantum.manager.QuantumManager.get_plugin()
+
+ self._network = core_plugin.create_network(
+ q_context.get_admin_context(),
+ {
+ 'network':
+ {
+ 'tenant_id': self._tenant_id,
+ 'name': 'test net',
+ 'admin_state_up': True,
+ 'shared': False,
+ }
+ }
+ )
+
+ self._subnet = core_plugin.create_subnet(
+ q_context.get_admin_context(),
+ {
+ 'subnet':
+ {
+ 'network_id': self._network['id'],
+ 'name': 'test subnet',
+ 'cidr': '192.168.1.0/24',
+ 'ip_version': 4,
+ 'gateway_ip': '192.168.1.1',
+ 'allocation_pools': attributes.ATTR_NOT_SPECIFIED,
+ 'dns_nameservers': attributes.ATTR_NOT_SPECIFIED,
+ 'host_routes': attributes.ATTR_NOT_SPECIFIED,
+ 'enable_dhcp': True,
+ }
+ }
+ )
+
+ self._subnet_id = self._subnet['id']
+
def _do_request(self, method, path, data=None, params=None, action=None):
content_type = 'application/json'
body = None
'DELETE', _get_path('routers/{0}'.format(router['id'])))
def _test_lb_setup(self):
- self._subnet_id = _uuid()
router = self._router_create(self._service_type_id)
self._router_id = router['id']
"tenant_id": self._tenant_id,
"name": "test",
"protocol": "HTTP",
- "port": 80,
+ "protocol_port": 80,
"subnet_id": self._subnet_id,
"pool_id": self._pool_id,
- "address": "192.168.1.101",
+ "address": "192.168.1.102",
"connection_limit": 100,
"admin_state_up": True,
"router_id": router_id
def _test_resource_create(self, res):
getattr(self, "_test_{0}_setup".format(res))()
- obj = getattr(self, "_{0}_create".format(res))()
obj = getattr(self, "_{0}_create".format(res))(self._router_id)
self.assertEqual(obj['router_id'], self._router_id)
_get_path('lb/{0}s/{1}'.format(res, obj['id'])))
self.assertEqual(updated[res][update_attr], update_value)
- def _test_resource_delete(self, res):
+ def _test_resource_delete(self, res, with_router_id):
getattr(self, "_test_{0}_setup".format(res))()
- obj = getattr(self, "_{0}_create".format(res))()
- self._do_request(
- 'DELETE', _get_path('lb/{0}s/{1}'.format(res, obj['id'])))
- obj = getattr(self, "_{0}_create".format(res))(self._router_id)
+
+ func = getattr(self, "_{0}_create".format(res))
+
+ if with_router_id:
+ obj = func(self._router_id)
+ else:
+ obj = func()
self._do_request(
'DELETE', _get_path('lb/{0}s/{1}'.format(res, obj['id'])))
def test_pool_update_without_router_id(self):
self._test_resource_update('pool', False, 'name', _uuid())
- def test_pool_delete(self):
- self._test_resource_delete('pool')
+ def test_pool_delete_with_router_id(self):
+ self._test_resource_delete('pool', True)
+
+ def test_pool_delete_without_router_id(self):
+ self._test_resource_delete('pool', False)
def test_health_monitor_create(self):
self._test_resource_create('health_monitor')
def test_health_monitor_update_without_router_id(self):
self._test_resource_update('health_monitor', False, 'timeout', 2)
- def test_health_monitor_delete(self):
- self._test_resource_delete('health_monitor')
+ def test_health_monitor_delete_with_router_id(self):
+ self._test_resource_delete('health_monitor', True)
+
+ def test_health_monitor_delete_without_router_id(self):
+ self._test_resource_delete('health_monitor', False)
def test_vip_create(self):
self._test_resource_create('vip')
def test_vip_update_without_router_id(self):
self._test_resource_update('vip', False, 'name', _uuid())
- def test_vip_delete(self):
- self._test_resource_delete('vip')
+ def test_vip_delete_with_router_id(self):
+ self._test_resource_delete('vip', True)
+
+ def test_vip_delete_without_router_id(self):
+ self._test_resource_delete('vip', False)