from quantum.manager import QuantumManager
from quantum.plugins.common import constants
from quantum.plugins.services.loadbalancer import loadbalancerPlugin
+from quantum.tests.unit import test_db_plugin
+from quantum.tests.unit import test_extensions
from quantum.tests.unit import testlib_api
from quantum.tests.unit.testlib_api import create_request
from quantum import wsgi
return os.path.join(ETCDIR, *p)
-class LoadBalancerPluginDbTestCase(testlib_api.WebTestCase):
+class LoadBalancerPluginDbTestCase(test_db_plugin.QuantumDbPluginV2TestCase):
+ resource_prefix_map = dict(
+ (k, constants.COMMON_PREFIXES[constants.LOADBALANCER])
+ for k in loadbalancer.RESOURCE_ATTRIBUTE_MAP.keys()
+ )
def setUp(self, core_plugin=None, lb_plugin=None):
- super(LoadBalancerPluginDbTestCase, self).setUp()
+ service_plugins = {'lb_plugin_name': DB_LB_PLUGIN_KLASS}
- db._ENGINE = None
- db._MAKER = None
+ super(LoadBalancerPluginDbTestCase, self).setUp(
+ service_plugins=service_plugins
+ )
- QuantumManager._instance = None
- PluginAwareExtensionManager._instance = None
- self._attribute_map_bk = {}
- self._attribute_map_bk = loadbalancer.RESOURCE_ATTRIBUTE_MAP.copy()
- self._tenant_id = "test-tenant"
self._subnet_id = "0c798ed8-33ba-11e2-8b28-000c291c4d14"
- if not core_plugin:
- core_plugin = test_config.get('plugin_name_v2',
- DB_CORE_PLUGIN_KLASS)
- if not lb_plugin:
- lb_plugin = test_config.get('lb_plugin_name', DB_LB_PLUGIN_KLASS)
-
- # point config file to: quantum/tests/etc/quantum.conf.test
- args = ['--config-file', etcdir('quantum.conf.test')]
- config.parse(args=args)
- # Update the plugin
- service_plugins = [lb_plugin]
- cfg.CONF.set_override('core_plugin', core_plugin)
- cfg.CONF.set_override('service_plugins', service_plugins)
- cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
- cfg.CONF.set_override('allow_pagination', True)
- cfg.CONF.set_override('allow_sorting', True)
- self.api = APIRouter()
-
plugin = loadbalancerPlugin.LoadBalancerPlugin()
ext_mgr = PluginAwareExtensionManager(
extensions_path,
)
app = config.load_paste_app('extensions_test_app')
self.ext_api = ExtensionMiddleware(app, ext_mgr=ext_mgr)
- super(LoadBalancerPluginDbTestCase, self).setUp()
-
- def tearDown(self):
- super(LoadBalancerPluginDbTestCase, self).tearDown()
- self.api = None
- self._skip_native_bulk = None
- self.ext_api = None
-
- db.clear_db()
- db._ENGINE = None
- db._MAKER = None
- cfg.CONF.reset()
- # Restore the original attribute map
- loadbalancer.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
-
- def _req(self, method, resource, data=None, fmt=None,
- id=None, subresource=None, sub_id=None, params=None, action=None):
- if not fmt:
- fmt = self.fmt
- if id and action:
- path = '/lb/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals()
- elif id and subresource and sub_id:
- path = (
- '/lb/%(resource)s/%(id)s/%(subresource)s/'
- '%(sub_id)s.%(fmt)s') % locals()
- elif id and subresource:
- path = (
- '/lb/%(resource)s/%(id)s/'
- '%(subresource)s.%(fmt)s') % locals()
- elif id:
- path = '/lb/%(resource)s/%(id)s.%(fmt)s' % locals()
- else:
- path = '/lb/%(resource)s.%(fmt)s' % locals()
-
- content_type = 'application/%s' % fmt
- body = None
- if data is not None: # empty dict is valid
- body = wsgi.Serializer(
- attributes.get_attr_metadata()).serialize(data, content_type)
-
- req = create_request(path,
- body,
- content_type,
- method,
- query_string=params)
- return req
-
- def new_create_request(self, resource, data, fmt=None, id=None,
- subresource=None):
- return self._req('POST', resource, data, fmt, id=id,
- subresource=subresource)
-
- def new_list_request(self, resource, fmt=None, params=None):
- return self._req('GET', resource, None, fmt, params=params)
-
- def new_show_request(self, resource, id, fmt=None, action=None,
- subresource=None, sub_id=None):
- return self._req('GET', resource, None, fmt, id=id, action=action,
- subresource=subresource, sub_id=sub_id)
-
- def new_delete_request(self, resource, id, fmt=None,
- subresource=None, sub_id=None):
- return self._req('DELETE', resource, None, fmt, id=id,
- subresource=subresource, sub_id=sub_id)
-
- def new_update_request(self, resource, data, id, fmt=None):
- return self._req('PUT', resource, data, fmt, id=id)
def _create_vip(self, fmt, name, pool_id, protocol, port, admin_state_up,
expected_res_status=None, **kwargs):
else:
return self.ext_api
- def _delete(self, collection, id,
- expected_code=webob.exc.HTTPNoContent.code):
- req = self.new_delete_request(collection, id)
- res = req.get_response(self._api_for_resource(collection))
- self.assertEqual(res.status_int, expected_code)
-
- def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code):
- req = self.new_show_request(resource, id)
- res = req.get_response(self._api_for_resource(resource))
- self.assertEqual(res.status_int, expected_code)
- return self.deserialize(res)
-
- def _update(self, resource, id, new_data,
- expected_code=webob.exc.HTTPOk.code):
- req = self.new_update_request(resource, new_data, id)
- res = req.get_response(self._api_for_resource(resource))
- self.assertEqual(res.status_int, expected_code)
- return self.deserialize(res)
-
- def _list(self, resource, fmt=None, query_params=None):
- req = self.new_list_request(resource, fmt, query_params)
- res = req.get_response(self._api_for_resource(resource))
- self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
- return self.deserialize(res)
-
- def _test_list_with_sort(self, collection, items, sorts, query_params=''):
- query_str = query_params
- for key, direction in sorts:
- query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
- direction)
- req = self.new_list_request('%ss' % collection,
- params=query_str)
- api = self._api_for_resource('%ss' % collection)
- res = self.deserialize(req.get_response(api))
- collection = collection.replace('-', '_')
- expected_res = [item[collection]['id'] for item in items]
- self.assertListEqual([n['id'] for n in res["%ss" % collection]],
- expected_res)
-
- def _test_list_with_pagination(self, collection, items, sort,
- limit, expected_page_num, query_params=''):
- query_str = query_params + '&' if query_params else ''
- query_str = query_str + ("limit=%s&sort_key=%s&"
- "sort_dir=%s") % (limit, sort[0], sort[1])
- req = self.new_list_request("%ss" % collection, params=query_str)
- items_res = []
- page_num = 0
- api = self._api_for_resource('%ss' % collection)
- collection = collection.replace('-', '_')
- while req:
- page_num = page_num + 1
- res = self.deserialize(req.get_response(api))
- self.assertLessEqual(len(res["%ss" % collection]), limit)
- items_res = items_res + res["%ss" % collection]
- req = None
- if '%ss_links' % collection in res:
- for link in res['%ss_links' % collection]:
- if link['rel'] == 'next':
- req = create_request(link['href'],
- '', 'application/json')
- self.assertEqual(len(res["%ss" % collection]),
- limit)
- self.assertEqual(page_num, expected_page_num)
- self.assertListEqual([n['id'] for n in items_res],
- [item[collection]['id'] for item in items])
-
- def _test_list_with_pagination_reverse(self, collection, items, sort,
- limit, expected_page_num,
- query_params=''):
- resources = '%ss' % collection
- collection = collection.replace('-', '_')
- api = self._api_for_resource(resources)
- marker = items[-1][collection]['id']
- query_str = query_params + '&' if query_params else ''
- query_str = query_str + ("limit=%s&page_reverse=True&"
- "sort_key=%s&sort_dir=%s&"
- "marker=%s") % (limit, sort[0], sort[1],
- marker)
- req = self.new_list_request(resources, params=query_str)
- item_res = [items[-1][collection]]
- page_num = 0
- while req:
- page_num = page_num + 1
- res = self.deserialize(req.get_response(api))
- self.assertLessEqual(len(res["%ss" % collection]), limit)
- res["%ss" % collection].reverse()
- item_res = item_res + res["%ss" % collection]
- req = None
- if '%ss_links' % collection in res:
- for link in res['%ss_links' % collection]:
- if link['rel'] == 'previous':
- req = create_request(link['href'],
- '', 'application/json')
- self.assertEqual(len(res["%ss" % collection]),
- limit)
- self.assertEqual(page_num, expected_page_num)
- expected_res = [item[collection]['id'] for item in items]
- expected_res.reverse()
- self.assertListEqual([n['id'] for n in item_res],
- expected_res)
-
@contextlib.contextmanager
def vip(self, fmt=None, name='vip1', pool=None,
protocol='HTTP', port=80, admin_state_up=True, no_delete=False,
admin_state_up,
address=address,
**kwargs)
- vip = self.deserialize(res)
+ vip = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield vip
admin_state_up,
address=address,
**kwargs)
- vip = self.deserialize(res)
+ vip = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield vip
protocol,
admin_state_up,
**kwargs)
- pool = self.deserialize(res)
+ pool = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield pool
port,
admin_state_up,
**kwargs)
- member = self.deserialize(res)
+ member = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
yield member
max_retries,
admin_state_up,
**kwargs)
- health_monitor = self.deserialize(res)
+ health_monitor = self.deserialize(fmt or self.fmt, res)
the_health_monitor = health_monitor['health_monitor']
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
# Try resetting session_persistence
req = self.new_update_request('vips', update_info, v['vip']['id'])
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(self.fmt, req.get_response(self.ext_api))
# If session persistence has been removed, it won't be present in
# the response.
'cookie_name': "jesssionId"},
'admin_state_up': False}}
req = self.new_update_request('vips', data, vip['vip']['id'])
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['vip'][k], v)
with self.vip(name=name) as vip:
req = self.new_show_request('vips',
vip['vip']['id'])
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['vip'][k], v)
('status', 'PENDING_CREATE')]
with self.vip(name=name):
req = self.new_list_request('vips')
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['vips'][0][k], v)
req = self.new_show_request('pools',
pool_id,
fmt=self.fmt)
- pool_updated = self.deserialize(req.get_response(self.ext_api))
+ pool_updated = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
- member1 = self.deserialize(res1)
+ member1 = self.deserialize(self.fmt, res1)
self.assertEqual(member1['member']['id'],
pool_updated['pool']['members'][0])
self.assertEqual(len(pool_updated['pool']['members']), 1)
req = self.new_show_request('pools',
pool['pool']['id'],
fmt=self.fmt)
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['pool'][k], v)
pool_id,
fmt=self.fmt)
pool_update = self.deserialize(
- req.get_response(self.ext_api))
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
self.assertIn(member1['member']['id'],
pool_update['pool']['members'])
self.assertIn(member2['member']['id'],
pool1['pool']['id'],
fmt=self.fmt)
pool1_update = self.deserialize(
- req.get_response(self.ext_api))
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
self.assertEqual(len(pool1_update['pool']['members']), 1)
req = self.new_show_request('pools',
pool2['pool']['id'],
fmt=self.fmt)
pool2_update = self.deserialize(
- req.get_response(self.ext_api))
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
self.assertEqual(len(pool1_update['pool']['members']), 1)
self.assertEqual(len(pool2_update['pool']['members']), 0)
req = self.new_update_request('members',
data,
member['member']['id'])
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
for k, v in keys:
self.assertEqual(res['member'][k], v)
pool1['pool']['id'],
fmt=self.fmt)
pool1_update = self.deserialize(
- req.get_response(self.ext_api))
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
req = self.new_show_request('pools',
pool2['pool']['id'],
fmt=self.fmt)
pool2_update = self.deserialize(
- req.get_response(self.ext_api))
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
self.assertEqual(len(pool2_update['pool']['members']), 1)
self.assertEqual(len(pool1_update['pool']['members']), 0)
pool_id,
fmt=self.fmt)
pool_update = self.deserialize(
- req.get_response(self.ext_api))
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
self.assertEqual(len(pool_update['pool']['members']), 0)
def test_show_member(self):
req = self.new_show_request('members',
member['member']['id'],
fmt=self.fmt)
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
for k, v in keys:
self.assertEqual(res['member'][k], v)
req = self.new_update_request("health_monitors",
data,
monitor['health_monitor']['id'])
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['health_monitor'][k], v)
req = self.new_show_request('health_monitors',
monitor['health_monitor']['id'],
fmt=self.fmt)
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['health_monitor'][k], v)
pool['pool']['id'],
subresource="stats",
fmt=self.fmt)
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys:
self.assertEqual(res['stats'][k], v)
'pools',
pool['pool']['id'],
fmt=self.fmt)
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
self.assertIn(monitor1['health_monitor']['id'],
res['pool']['health_monitors'])
self.assertIn(monitor2['health_monitor']['id'],
'pools',
pool['pool']['id'],
fmt=self.fmt)
- res = self.deserialize(req.get_response(self.ext_api))
+ res = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
self.assertNotIn(monitor1['health_monitor']['id'],
res['pool']['health_monitors'])
self.assertIn(monitor2['health_monitor']['id'],
'10',
'3',
True)
- health_monitor = self.deserialize(req)
+ health_monitor = self.deserialize(self.fmt, req)
self.assertEqual(req.status_int, 201)
# Associate the health_monitor to the pool
req = self.new_show_request('pools',
pool_id,
fmt=self.fmt)
- pool_updated = self.deserialize(req.get_response(self.ext_api))
- member1 = self.deserialize(res1)
- member2 = self.deserialize(res2)
+ pool_updated = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
+ member1 = self.deserialize(self.fmt, res1)
+ member2 = self.deserialize(self.fmt, res2)
self.assertIn(member1['member']['id'],
pool_updated['pool']['members'])
self.assertIn(member2['member']['id'],
req = self.new_show_request('vips',
vip_id,
fmt=self.fmt)
- vip_updated = self.deserialize(req.get_response(self.ext_api))
+ vip_updated = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
self.assertEqual(vip_updated['vip']['pool_id'],
pool_updated['pool']['id'])
ETCDIR = os.path.join(ROOTDIR, 'etc')
-@contextlib.contextmanager
-def dummy_context_func():
- yield None
+def optional_ctx(obj, fallback):
+ if not obj:
+ return fallback()
+
+ @contextlib.contextmanager
+ def context_wrapper():
+ yield obj
+ return context_wrapper()
def etcdir(*p):
class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
+ resource_prefix_map = {}
- def setUp(self, plugin=None):
+ def setUp(self, plugin=None, service_plugins=None):
super(QuantumDbPluginV2TestCase, self).setUp()
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
# doesn't like when the plugin changes ;)
if not plugin:
plugin = test_config.get('plugin_name_v2', DB_PLUGIN_KLASS)
+
# Create the default configurations
args = ['--config-file', etcdir('quantum.conf.test')]
# If test_config specifies some config-file, use it, as well
config.parse(args=args)
# Update the plugin
cfg.CONF.set_override('core_plugin', plugin)
+ cfg.CONF.set_override(
+ 'service_plugins',
+ [test_config.get(key, default)
+ for key, default in (service_plugins or {}).iteritems()]
+ )
+
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.set_override('max_dns_nameservers', 2)
cfg.CONF.set_override('max_subnet_host_routes', 2)
self.api = APIRouter()
# Set the defualt port status
self.port_create_status = 'ACTIVE'
- super(QuantumDbPluginV2TestCase, self).setUp()
def _is_native_bulk_supported():
plugin_obj = QuantumManager.get_plugin()
# Restore the original attribute map
attributes.RESOURCE_ATTRIBUTE_MAP = self._attribute_map_bk
- def _req(self, method, resource, data=None, fmt=None,
- id=None, params=None, action=None):
+ def _req(self, method, resource, data=None, fmt=None, id=None, params=None,
+ action=None, subresource=None, sub_id=None):
fmt = fmt or self.fmt
- if id and action:
- path = '/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals()
- elif id:
- path = '/%(resource)s/%(id)s.%(fmt)s' % locals()
- else:
- path = '/%(resource)s.%(fmt)s' % locals()
+
+ path = '/%s.%s' % (
+ '/'.join(p for p in
+ (resource, id, subresource, sub_id, action) if p),
+ fmt
+ )
+
+ prefix = self.resource_prefix_map.get(resource)
+ if prefix:
+ path = prefix + path
content_type = 'application/%s' % fmt
body = None
return testlib_api.create_request(path, body, content_type, method,
query_string=params)
- def new_create_request(self, resource, data, fmt=None):
- return self._req('POST', resource, data, fmt)
-
- def new_list_request(self, resource, fmt=None, params=None):
- return self._req('GET', resource, None, fmt, params=params)
-
- def new_show_request(self, resource, id, fmt=None):
- return self._req('GET', resource, None, fmt, id=id)
-
- def new_delete_request(self, resource, id, fmt=None):
- return self._req('DELETE', resource, None, fmt, id=id)
-
- def new_update_request(self, resource, data, id, fmt=None):
- return self._req('PUT', resource, data, fmt, id=id)
-
- def new_action_request(self, resource, data, id, action, fmt=None):
- return self._req('PUT', resource, data, fmt, id=id, action=action)
+ def new_create_request(self, resource, data, fmt=None, id=None,
+ subresource=None):
+ return self._req('POST', resource, data, fmt, id=id,
+ subresource=subresource)
+
+ def new_list_request(self, resource, fmt=None, params=None,
+ subresource=None):
+ return self._req(
+ 'GET', resource, None, fmt, params=params, subresource=subresource
+ )
+
+ def new_show_request(self, resource, id, fmt=None, subresource=None):
+ return self._req(
+ 'GET', resource, None, fmt, id=id, subresource=subresource
+ )
+
+ def new_delete_request(self, resource, id, fmt=None, subresource=None,
+ sub_id=None):
+ return self._req(
+ 'DELETE',
+ resource,
+ None,
+ fmt,
+ id=id,
+ subresource=subresource,
+ sub_id=sub_id
+ )
+
+ def new_update_request(self, resource, data, id, fmt=None,
+ subresource=None):
+ return self._req(
+ 'PUT', resource, data, fmt, id=id, subresource=subresource
+ )
+
+ def new_action_request(self, resource, data, id, action, fmt=None,
+ subresource=None):
+ return self._req(
+ 'PUT',
+ resource,
+ data,
+ fmt,
+ id=id,
+ action=action,
+ subresource=subresource
+ )
def deserialize(self, content_type, response):
ctype = 'application/%s' % content_type
host_routes=None,
shared=None,
do_delete=True):
- with (self.network() if not network
- else dummy_context_func()) as network_to_use:
- if network:
- network_to_use = network
+ with optional_ctx(network, self.network) as network_to_use:
subnet = self._make_subnet(fmt or self.fmt,
network_to_use,
gateway_ip,
@contextlib.contextmanager
def port(self, subnet=None, fmt=None, no_delete=False,
**kwargs):
- with (self.subnet() if not subnet
- else dummy_context_func()) as subnet_to_use:
- if subnet:
- subnet_to_use = subnet
+ with optional_ctx(subnet, self.subnet) as subnet_to_use:
net_id = subnet_to_use['subnet']['network_id']
port = self._make_port(fmt or self.fmt, net_id, **kwargs)
try: