# Enable or disable bulk create/update/delete operations
# allow_bulk = True
+# Enable or disable pagination
+# allow_pagination = False
+# Enable or disable sorting
+# allow_sorting = False
# Enable or disable overlapping IPs for subnets
# Attention: the following parameter MUST be set to False if Quantum is
# being used in conjunction with nova security groups and/or metadata service.
# The actual topic names will be %s.%(default_notification_level)s
notification_topics = notifications
+# Default maximum number of items returned in a single response,
+# value == infinite and value < 0 means no max limit, and value must
+# greater than 0. If the number of items requested is greater than
+# pagination_max_limit, server will just return pagination_max_limit
+# of number of items.
+# pagination_max_limit = -1
+
[QUOTAS]
# resource name(s) that are supported in quota features
# quota_items = network,subnet,port
# License for the specific language governing permissions and limitations
# under the License.
+import urllib
+
from webob import exc
+from quantum.common import constants
+from quantum.common import exceptions
+from quantum.openstack.common import cfg
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
+def get_filters(request, attr_info, skips=[]):
+ """
+ Extracts the filters from the request string
+ Returns a dict of lists for the filters:
+ check=a&check=b&name=Bob&
+ becomes:
+ {'check': [u'a', u'b'], 'name': [u'Bob']}
+ """
+ res = {}
+ for key, values in request.GET.dict_of_lists().iteritems():
+ if key in skips:
+ continue
+ values = [v for v in values if v]
+ key_attr_info = attr_info.get(key, {})
+ if 'convert_list_to' in key_attr_info:
+ values = key_attr_info['convert_list_to'](values)
+ elif 'convert_to' in key_attr_info:
+ convert_to = key_attr_info['convert_to']
+ values = [convert_to(v) for v in values]
+ if values:
+ res[key] = values
+ return res
+
+
+def get_previous_link(request, items, id_key):
+ params = request.GET.copy()
+ params.pop('marker', None)
+ if items:
+ marker = items[0][id_key]
+ params['marker'] = marker
+ params['page_reverse'] = True
+ return "%s?%s" % (request.path_url, urllib.urlencode(params))
+
+
+def get_next_link(request, items, id_key):
+ params = request.GET.copy()
+ params.pop('marker', None)
+ if items:
+ marker = items[-1][id_key]
+ params['marker'] = marker
+ params.pop('page_reverse', None)
+ return "%s?%s" % (request.path_url, urllib.urlencode(params))
+
+
+def get_limit_and_marker(request):
+ """Return marker, limit tuple from request.
+
+ :param request: `wsgi.Request` possibly containing 'marker' and 'limit'
+ GET variables. 'marker' is the id of the last element
+ the client has seen, and 'limit' is the maximum number
+ of items to return. If limit == 0, it means we needn't
+ pagination, then return None.
+ """
+ max_limit = _get_pagination_max_limit()
+ limit = _get_limit_param(request, max_limit)
+ if max_limit > 0:
+ limit = min(max_limit, limit) or max_limit
+ if not limit:
+ return None, None
+ marker = request.GET.get('marker', None)
+ return limit, marker
+
+
+def _get_pagination_max_limit():
+ max_limit = -1
+ if (cfg.CONF.pagination_max_limit.lower() !=
+ constants.PAGINATION_INFINITE):
+ try:
+ max_limit = int(cfg.CONF.pagination_max_limit)
+ if max_limit == 0:
+ raise ValueError()
+ except ValueError:
+ LOG.warn(_("Invalid value for pagination_max_limit: %s. It "
+ "should be an integer greater to 0"),
+ cfg.CONF.pagination_max_limit)
+ return max_limit
+
+
+def _get_limit_param(request, max_limit):
+ """Extract integer limit from request or fail."""
+ try:
+ limit = int(request.GET.get('limit', 0))
+ if limit >= 0:
+ return limit
+ except ValueError:
+ pass
+ msg = _("Limit must be an integer 0 or greater and not '%d'")
+ raise exceptions.BadRequest(resource='limit', msg=msg)
+
+
+def list_args(request, arg):
+ """Extracts the list of arg from request"""
+ return [v for v in request.GET.getall(arg) if v]
+
+
+def get_sorts(request, attr_info):
+ """Extract sort_key and sort_dir from request, return as:
+ [(key1, value1), (key2, value2)]
+ """
+ sort_keys = list_args(request, "sort_key")
+ sort_dirs = list_args(request, "sort_dir")
+ if len(sort_keys) != len(sort_dirs):
+ msg = _("The number of sort_keys and sort_dirs must be same")
+ raise exc.HTTPBadRequest(explanation=msg)
+ valid_dirs = [constants.SORT_DIRECTION_ASC, constants.SORT_DIRECTION_DESC]
+ absent_keys = [x for x in sort_keys if x not in attr_info]
+ if absent_keys:
+ msg = _("%s is invalid attribute for sort_keys") % absent_keys
+ raise exc.HTTPBadRequest(explanation=msg)
+ invalid_dirs = [x for x in sort_dirs if x not in valid_dirs]
+ if invalid_dirs:
+ msg = (_("%(invalid_dirs)s is invalid value for sort_dirs, "
+ "valid value is '%(asc)s' and '%(desc)s'") %
+ {'invalid_dirs': invalid_dirs,
+ 'asc': constants.SORT_DIRECTION_ASC,
+ 'desc': constants.SORT_DIRECTION_DESC})
+ raise exc.HTTPBadRequest(explanation=msg)
+ return zip(sort_keys,
+ [x == constants.SORT_DIRECTION_ASC for x in sort_dirs])
+
+
+def get_page_reverse(request):
+ data = request.GET.get('page_reverse', 'False')
+ return data.lower() == "true"
+
+
+def get_pagination_links(request, items, limit,
+ marker, page_reverse, key="id"):
+ key = key if key else 'id'
+ links = []
+ if not limit:
+ return links
+ if not (len(items) < limit and not page_reverse):
+ links.append({"rel": "next",
+ "href": get_next_link(request, items,
+ key)})
+ if not (len(items) < limit and page_reverse):
+ links.append({"rel": "previous",
+ "href": get_previous_link(request, items,
+ key)})
+ return links
+
+
+class PaginationHelper(object):
+
+ def __init__(self, request, primary_key='id'):
+ self.request = request
+ self.primary_key = primary_key
+
+ def update_fields(self, original_fields, fields_to_add):
+ pass
+
+ def update_args(self, args):
+ pass
+
+ def paginate(self, items):
+ return items
+
+ def get_links(self, items):
+ return {}
+
+
+class PaginationEmulatedHelper(PaginationHelper):
+
+ def __init__(self, request, primary_key='id'):
+ super(PaginationEmulatedHelper, self).__init__(request, primary_key)
+ self.limit, self.marker = get_limit_and_marker(request)
+ self.page_reverse = get_page_reverse(request)
+
+ def update_fields(self, original_fields, fields_to_add):
+ if not original_fields:
+ return
+ if self.primary_key not in original_fields:
+ original_fields.append(self.primary_key)
+ fields_to_add.append(self.primary_key)
+
+ def paginate(self, items):
+ if not self.limit:
+ return items
+ i = -1
+ if self.marker:
+ for item in items:
+ i = i + 1
+ if item[self.primary_key] == self.marker:
+ break
+ if self.page_reverse:
+ return items[i - self.limit:i]
+ return items[i + 1:i + self.limit + 1]
+
+ def get_links(self, items):
+ return get_pagination_links(
+ self.request, items, self.limit, self.marker,
+ self.page_reverse, self.primary_key)
+
+
+class PaginationNativeHelper(PaginationEmulatedHelper):
+
+ def update_args(self, args):
+ if self.primary_key not in dict(args.get('sorts', [])).keys():
+ args.setdefault('sorts', []).append((self.primary_key, True))
+ args.update({'limit': self.limit, 'marker': self.marker,
+ 'page_reverse': self.page_reverse})
+
+ def paginate(self, items):
+ return items
+
+
+class NoPaginationHelper(PaginationHelper):
+ pass
+
+
+class SortingHelper(object):
+
+ def __init__(self, request, attr_info):
+ pass
+
+ def update_args(self, args):
+ pass
+
+ def update_fields(self, original_fields, fields_to_add):
+ pass
+
+ def sort(self, items):
+ return items
+
+
+class SortingEmulatedHelper(SortingHelper):
+
+ def __init__(self, request, attr_info):
+ super(SortingEmulatedHelper, self).__init__(request, attr_info)
+ self.sort_dict = get_sorts(request, attr_info)
+
+ def update_fields(self, original_fields, fields_to_add):
+ if not original_fields:
+ return
+ for key in dict(self.sort_dict).keys():
+ if key not in original_fields:
+ original_fields.append(key)
+ fields_to_add.append(key)
+
+ def sort(self, items):
+ def cmp_func(obj1, obj2):
+ for key, direction in self.sort_dict:
+ ret = cmp(obj1[key], obj2[key])
+ if ret:
+ return ret * (1 if direction else -1)
+ return 0
+ return sorted(items, cmp=cmp_func)
+
+
+class SortingNativeHelper(SortingHelper):
+
+ def __init__(self, request, attr_info):
+ self.sort_dict = get_sorts(request, attr_info)
+
+ def update_args(self, args):
+ args['sorts'] = self.sort_dict
+
+
+class NoSortingHelper(SortingHelper):
+ pass
+
+
class QuantumController(object):
""" Base controller class for Quantum API """
# _resource_name will be redefined in sub concrete controller
'networks': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'default': '', 'is_visible': True},
'ports': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'name': {'allow_post': True, 'allow_put': True, 'default': '',
'validate': {'type:string': None},
'is_visible': True},
'subnets': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'name': {'allow_post': True, 'allow_put': True, 'default': '',
'validate': {'type:string': None},
'is_visible': True},
from oslo.config import cfg
+from quantum.api import api_common
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from quantum.api.v2 import attributes
from quantum.api.v2 import resource as wsgi_resource
}
-def _fields(request):
- """
- Extracts the list of fields to return
- """
- return [v for v in request.GET.getall('fields') if v]
-
-
-def _filters(request, attr_info):
- """
- Extracts the filters from the request string
-
- Returns a dict of lists for the filters:
-
- check=a&check=b&name=Bob&
-
- becomes
-
- {'check': [u'a', u'b'], 'name': [u'Bob']}
- """
- res = {}
- for key, values in request.GET.dict_of_lists().iteritems():
- if key == 'fields':
- continue
- values = [v for v in values if v]
- key_attr_info = attr_info.get(key, {})
- if 'convert_list_to' in key_attr_info:
- values = key_attr_info['convert_list_to'](values)
- elif 'convert_to' in key_attr_info:
- convert_to = key_attr_info['convert_to']
- values = [convert_to(v) for v in values]
- if values:
- res[key] = values
- return res
-
-
class Controller(object):
LIST = 'list'
SHOW = 'show'
DELETE = 'delete'
def __init__(self, plugin, collection, resource, attr_info,
- allow_bulk=False, member_actions=None, parent=None):
+ allow_bulk=False, member_actions=None, parent=None,
+ allow_pagination=False, allow_sorting=False):
if member_actions is None:
member_actions = []
self._plugin = plugin
self._resource = resource.replace('-', '_')
self._attr_info = attr_info
self._allow_bulk = allow_bulk
+ self._allow_pagination = allow_pagination
+ self._allow_sorting = allow_sorting
self._native_bulk = self._is_native_bulk_supported()
+ self._native_pagination = self._is_native_pagination_supported()
+ self._native_sorting = self._is_native_sorting_supported()
self._policy_attrs = [name for (name, info) in self._attr_info.items()
if info.get('required_by_policy')]
self._publisher_id = notifier_api.publisher_id('network')
self._dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
self._member_actions = member_actions
+ self._primary_key = self._get_primary_key()
+ if self._allow_pagination and self._native_pagination:
+ # Native pagination need native sorting support
+ if not self._native_sorting:
+ raise Exception(_("Native pagination depend on native "
+ "sorting"))
+ if not self._allow_sorting:
+ LOG.info(_("Allow sorting is enabled because native "
+ "pagination requires native sorting"))
+ self._allow_sorting = True
if parent:
self._parent_id_name = '%s_id' % parent['member_name']
self._plugin_handlers[action] = '%s%s_%s' % (action, parent_part,
self._resource)
+ def _get_primary_key(self, default_primary_key='id'):
+ for key, value in self._attr_info.iteritems():
+ if value.get('primary_key', False):
+ return key
+ return default_primary_key
+
def _is_native_bulk_supported(self):
native_bulk_attr_name = ("_%s__native_bulk_support"
% self._plugin.__class__.__name__)
return getattr(self._plugin, native_bulk_attr_name, False)
+ def _is_native_pagination_supported(self):
+ native_pagination_attr_name = ("_%s__native_pagination_support"
+ % self._plugin.__class__.__name__)
+ return getattr(self._plugin, native_pagination_attr_name, False)
+
+ def _is_native_sorting_supported(self):
+ native_sorting_attr_name = ("_%s__native_sorting_support"
+ % self._plugin.__class__.__name__)
+ return getattr(self._plugin, native_sorting_attr_name, False)
+
def _is_visible(self, attr):
attr_val = self._attr_info.get(attr)
return attr_val and attr_val['is_visible']
else:
raise AttributeError
+ def _get_pagination_helper(self, request):
+ if self._allow_pagination and self._native_pagination:
+ return api_common.PaginationNativeHelper(request,
+ self._primary_key)
+ elif self._allow_pagination:
+ return api_common.PaginationEmulatedHelper(request,
+ self._primary_key)
+ return api_common.NoPaginationHelper(request, self._primary_key)
+
+ def _get_sorting_helper(self, request):
+ if self._allow_sorting and self._native_sorting:
+ return api_common.SortingNativeHelper(request, self._attr_info)
+ elif self._allow_sorting:
+ return api_common.SortingEmulatedHelper(request, self._attr_info)
+ return api_common.NoSortingHelper(request, self._attr_info)
+
def _items(self, request, do_authz=False, parent_id=None):
"""Retrieves and formats a list of elements of the requested entity"""
# NOTE(salvatore-orlando): The following ensures that fields which
# are needed for authZ policy validation are not stripped away by the
# plugin before returning.
- original_fields, fields_to_add = self._do_field_list(_fields(request))
- kwargs = {'filters': _filters(request, self._attr_info),
+ original_fields, fields_to_add = self._do_field_list(
+ api_common.list_args(request, 'fields'))
+ filters = api_common.get_filters(request, self._attr_info,
+ ['fields', 'sort_key', 'sort_dir',
+ 'limit', 'marker', 'page_reverse'])
+ kwargs = {'filters': filters,
'fields': original_fields}
+ sorting_helper = self._get_sorting_helper(request)
+ pagination_helper = self._get_pagination_helper(request)
+ sorting_helper.update_args(kwargs)
+ sorting_helper.update_fields(original_fields, fields_to_add)
+ pagination_helper.update_args(kwargs)
+ pagination_helper.update_fields(original_fields, fields_to_add)
if parent_id:
kwargs[self._parent_id_name] = parent_id
obj_getter = getattr(self._plugin, self._plugin_handlers[self.LIST])
obj_list = obj_getter(request.context, **kwargs)
+ obj_list = sorting_helper.sort(obj_list)
+ obj_list = pagination_helper.paginate(obj_list)
+
# Check authz
if do_authz:
# FIXME(salvatore-orlando): obj_getter might return references to
self._plugin_handlers[self.SHOW],
obj,
plugin=self._plugin)]
- return {self._collection: [self._view(obj,
- fields_to_strip=fields_to_add)
- for obj in obj_list]}
+ collection = {self._collection:
+ [self._view(obj,
+ fields_to_strip=fields_to_add)
+ for obj in obj_list]}
+ pagination_links = pagination_helper.get_links(obj_list)
+ if pagination_links:
+ collection[self._collection + "_links"] = pagination_links
+
+ return collection
def _item(self, request, id, do_authz=False, field_list=None,
parent_id=None):
# NOTE(salvatore-orlando): The following ensures that fields
# which are needed for authZ policy validation are not stripped
# away by the plugin before returning.
- field_list, added_fields = self._do_field_list(_fields(request))
+ field_list, added_fields = self._do_field_list(
+ api_common.list_args(request, "fields"))
parent_id = kwargs.get(self._parent_id_name)
return {self._resource:
self._view(self._item(request,
def create_resource(collection, resource, plugin, params, allow_bulk=False,
- member_actions=None, parent=None):
+ member_actions=None, parent=None, allow_pagination=False,
+ allow_sorting=False):
controller = Controller(plugin, collection, resource, params, allow_bulk,
- member_actions=member_actions, parent=parent)
+ member_actions=member_actions, parent=parent,
+ allow_pagination=allow_pagination,
+ allow_sorting=allow_sorting)
return wsgi_resource.Resource(controller, FAULT_MAP)
def _map_resource(collection, resource, params, parent=None):
allow_bulk = cfg.CONF.allow_bulk
- controller = base.create_resource(collection, resource,
- plugin, params,
- allow_bulk=allow_bulk,
- parent=parent)
+ allow_pagination = cfg.CONF.allow_pagination
+ allow_sorting = cfg.CONF.allow_sorting
+ controller = base.create_resource(
+ collection, resource, plugin, params, allow_bulk=allow_bulk,
+ parent=parent, allow_pagination=allow_pagination,
+ allow_sorting=allow_sorting)
path_prefix = None
if parent:
path_prefix = "/%s/{%s_id}/%s" % (parent['collection_name'],
from paste import deploy
from quantum.api.v2 import attributes
+from quantum.common import constants
from quantum.common import utils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
help=_("How many times Quantum will retry MAC generation")),
cfg.BoolOpt('allow_bulk', default=True,
help=_("Allow the usage of the bulk API")),
+ cfg.BoolOpt('allow_pagination', default=False,
+ help=_("Allow the usage of the pagination")),
+ cfg.BoolOpt('allow_sorting', default=False,
+ help=_("Allow the usage of the sorting")),
+ cfg.StrOpt('pagination_max_limit', default="-1",
+ help=_("The maximum number of items returned in a single "
+ "response, value was 'infinite' or negative integer "
+ "means no limit")),
cfg.IntOpt('max_dns_nameservers', default=5,
help=_("Maximum number of DNS nameservers")),
cfg.IntOpt('max_subnet_host_routes', default=20,
AGENT_TYPE_LINUXBRIDGE = 'Linux bridge agent'
AGENT_TYPE_L3 = 'L3 agent'
L2_AGENT_TOPIC = 'N/A'
+
+PAGINATION_INFINITE = 'infinite'
+
+SORT_DIRECTION_ASC = 'asc'
+SORT_DIRECTION_DESC = 'desc'
from quantum.common import exceptions as q_exc
from quantum.db import api as db
from quantum.db import models_v2
+from quantum.db import sqlalchemyutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import timeutils
from quantum.openstack.common import uuidutils
"""
# This attribute specifies whether the plugin supports or not
- # bulk operations. Name mangling is used in order to ensure it
- # is qualified by class
+ # bulk/pagination/sorting operations. Name mangling is used in
+ # order to ensure it is qualified by class
__native_bulk_support = True
+ __native_pagination_support = True
+ __native_sorting_support = True
# Plugins, mixin classes implementing extension will register
# hooks into the dict below for "augmenting" the "core way" of
# building a query for retrieving objects from a model class.
query = query.filter(column.in_(value))
return query
- def _get_collection_query(self, context, model, filters=None):
+ def _get_collection_query(self, context, model, filters=None,
+ sorts=None, limit=None, marker_obj=None,
+ page_reverse=False):
collection = self._model_query(context, model)
collection = self._apply_filters_to_query(collection, model, filters)
+ if limit and page_reverse and sorts:
+ sorts = [(s[0], not s[1]) for s in sorts]
+ collection = sqlalchemyutils.paginate_query(collection, model, limit,
+ sorts,
+ marker_obj=marker_obj)
return collection
def _get_collection(self, context, model, dict_func, filters=None,
- fields=None):
- query = self._get_collection_query(context, model, filters)
- return [dict_func(c, fields) for c in query.all()]
+ fields=None, sorts=None, limit=None, marker_obj=None,
+ page_reverse=False):
+ query = self._get_collection_query(context, model, filters=filters,
+ sorts=sorts,
+ limit=limit,
+ marker_obj=marker_obj,
+ page_reverse=page_reverse)
+ items = [dict_func(c, fields) for c in query.all()]
+ if limit and page_reverse:
+ items.reverse()
+ return items
def _get_collection_count(self, context, model, filters=None):
return self._get_collection_query(context, model, filters).count()
raise e
return objects
+ def _get_marker_obj(self, context, resource, limit, marker):
+ if limit and marker:
+ return getattr(self, '_get_%s' % resource)(context, marker)
+ return None
+
def create_network_bulk(self, context, networks):
return self._create_bulk('network', context, networks)
network = self._get_network(context, id)
return self._make_network_dict(network, fields)
- def get_networks(self, context, filters=None, fields=None):
+ def get_networks(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ marker_obj = self._get_marker_obj(context, 'network', limit, marker)
return self._get_collection(context, models_v2.Network,
self._make_network_dict,
- filters=filters, fields=fields)
+ filters=filters, fields=fields,
+ sorts=sorts,
+ limit=limit,
+ marker_obj=marker_obj,
+ page_reverse=page_reverse)
def get_networks_count(self, context, filters=None):
return self._get_collection_count(context, models_v2.Network,
subnet = self._get_subnet(context, id)
return self._make_subnet_dict(subnet, fields)
- def get_subnets(self, context, filters=None, fields=None):
+ def get_subnets(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ marker_obj = self._get_marker_obj(context, 'subnet', limit, marker)
return self._get_collection(context, models_v2.Subnet,
self._make_subnet_dict,
- filters=filters, fields=fields)
+ filters=filters, fields=fields,
+ sorts=sorts,
+ limit=limit,
+ marker_obj=marker_obj,
+ page_reverse=page_reverse)
def get_subnets_count(self, context, filters=None):
return self._get_collection_count(context, models_v2.Subnet,
port = self._get_port(context, id)
return self._make_port_dict(port, fields)
- def _get_ports_query(self, context, filters=None):
+ def _get_ports_query(self, context, filters=None, sorts=None, limit=None,
+ marker_obj=None, page_reverse=False):
Port = models_v2.Port
IPAllocation = models_v2.IPAllocation
query = query.filter(IPAllocation.subnet_id.in_(subnet_ids))
query = self._apply_filters_to_query(query, Port, filters)
+ if limit and page_reverse and sorts:
+ sorts = [(s[0], not s[1]) for s in sorts]
+ query = sqlalchemyutils.paginate_query(query, Port, limit,
+ sorts, marker_obj)
return query
- def get_ports(self, context, filters=None, fields=None):
- query = self._get_ports_query(context, filters)
- return [self._make_port_dict(c, fields) for c in query.all()]
+ def get_ports(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ marker_obj = self._get_marker_obj(context, 'port', limit, marker)
+ query = self._get_ports_query(context, filters=filters,
+ sorts=sorts, limit=limit,
+ marker_obj=marker_obj,
+ page_reverse=page_reverse)
+ items = [self._make_port_dict(c, fields) for c in query.all()]
+ if limit and page_reverse:
+ items.reverse()
+ return items
def get_ports_count(self, context, filters=None):
return self._get_ports_query(context, filters).count()
router = self._get_router(context, id)
return self._make_router_dict(router, fields)
- def get_routers(self, context, filters=None, fields=None):
+ def get_routers(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ marker_obj = self._get_marker_obj(context, 'router', limit, marker)
return self._get_collection(context, Router,
self._make_router_dict,
- filters=filters, fields=fields)
+ filters=filters, fields=fields,
+ sorts=sorts,
+ limit=limit,
+ marker_obj=marker_obj,
+ page_reverse=page_reverse)
def get_routers_count(self, context, filters=None):
return self._get_collection_count(context, Router,
floatingip = self._get_floatingip(context, id)
return self._make_floatingip_dict(floatingip, fields)
- def get_floatingips(self, context, filters=None, fields=None):
+ def get_floatingips(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ marker_obj = self._get_marker_obj(context, 'floatingip', limit,
+ marker)
return self._get_collection(context, FloatingIP,
self._make_floatingip_dict,
- filters=filters, fields=fields)
+ filters=filters, fields=fields,
+ sorts=sorts,
+ limit=limit,
+ marker_obj=marker_obj,
+ page_reverse=page_reverse)
def get_floatingips_count(self, context, filters=None):
return self._get_collection_count(context, FloatingIP,
return collection
def _get_collection(self, context, model, dict_func, filters=None,
- fields=None):
+ fields=None, sorts=None, limit=None, marker_obj=None,
+ page_reverse=False):
query = self._get_collection_query(context, model, filters)
return [dict_func(c, fields) for c in query.all()]
return self._make_security_group_dict(security_group_db)
- def get_security_groups(self, context, filters=None, fields=None):
- return self._get_collection(context, SecurityGroup,
+ def get_security_groups(self, context, filters=None, fields=None,
+ sorts=None, limit=None,
+ marker=None, page_reverse=False):
+ marker_obj = self._get_marker_obj(context, 'security_group', limit,
+ marker)
+ return self._get_collection(context,
+ SecurityGroup,
self._make_security_group_dict,
- filters=filters, fields=fields)
+ filters=filters, fields=fields,
+ sorts=sorts,
+ limit=limit, marker_obj=marker_obj,
+ page_reverse=page_reverse)
def get_security_groups_count(self, context, filters=None):
return self._get_collection_count(context, SecurityGroup,
def _get_port_security_group_bindings(self, context,
filters=None, fields=None):
- return self._get_collection(context, SecurityGroupPortBinding,
+ return self._get_collection(context,
+ SecurityGroupPortBinding,
self._make_security_group_binding_dict,
filters=filters, fields=fields)
if rules:
raise ext_sg.SecurityGroupRuleExists(id=str(rules[0]['id']))
- def get_security_group_rules(self, context, filters=None, fields=None):
- return self._get_collection(context, SecurityGroupRule,
+ def get_security_group_rules(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ marker_obj = self._get_marker_obj(context, 'security_group_rule',
+ limit, marker)
+ return self._get_collection(context,
+ SecurityGroupRule,
self._make_security_group_rule_dict,
- filters=filters, fields=fields)
+ filters=filters, fields=fields,
+ sorts=sorts,
+ limit=limit, marker_obj=marker_obj,
+ page_reverse=page_reverse)
def get_security_group_rules_count(self, context, filters=None):
return self._get_collection_count(context, SecurityGroupRule,
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import sqlalchemy
+from sqlalchemy.orm.properties import RelationshipProperty
+
+from quantum.common import exceptions as q_exc
+from quantum.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+def paginate_query(query, model, limit, sorts, marker_obj=None):
+ """Returns a query with sorting / pagination criteria added.
+
+ Pagination works by requiring a unique sort key, specified by sorts.
+ (If sort keys is not unique, then we risk looping through values.)
+ We use the last row in the previous page as the 'marker' for pagination.
+ So we must return values that follow the passed marker in the order.
+ With a single-valued sort key, this would be easy: sort_key > X.
+ With a compound-values sort key, (k1, k2, k3) we must do this to repeat
+ the lexicographical ordering:
+ (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3)
+ The reason of didn't use OFFSET clause was it don't scale, please refer
+ discussion at https://lists.launchpad.net/openstack/msg02547.html
+
+ We also have to cope with different sort directions.
+
+ Typically, the id of the last row is used as the client-facing pagination
+ marker, then the actual marker object must be fetched from the db and
+ passed in to us as marker.
+
+ :param query: the query object to which we should add paging/sorting
+ :param model: the ORM model class
+ :param limit: maximum number of items to return
+ :param sorts: array of attributes and direction by which results should
+ be sorted
+ :param marker: the last item of the previous page; we returns the next
+ results after this value.
+ :rtype: sqlalchemy.orm.query.Query
+ :return: The query with sorting/pagination added.
+ """
+ if not sorts:
+ return query
+
+ # A primary key must be specified in sort keys
+ assert not (limit and
+ len(set(dict(sorts).keys()) &
+ set(model.__table__.primary_key.columns.keys())) == 0)
+
+ # Add sorting
+ for sort_key, sort_direction in sorts:
+ sort_dir_func = sqlalchemy.asc if sort_direction else sqlalchemy.desc
+ try:
+ sort_key_attr = getattr(model, sort_key)
+ except AttributeError:
+ # Extension attribute doesn't support for sorting. Because it
+ # existed in attr_info, it will be catched at here
+ msg = _("%s is invalid attribute for sort_key") % sort_key
+ raise q_exc.BadRequest(resource=model.__tablename__, msg=msg)
+ if isinstance(sort_key_attr.property, RelationshipProperty):
+ msg = _("The attribute '%(attr)s' is reference to other "
+ "resource, can't used by sort "
+ "'%(resource)s'") % {'attr': sort_key,
+ 'resource': model.__tablename__}
+ raise q_exc.BadRequest(resource=model.__tablename__, msg=msg)
+ query = query.order_by(sort_dir_func(sort_key_attr))
+
+ # Add pagination
+ if marker_obj:
+ marker_values = [getattr(marker_obj, sort[0]) for sort in sorts]
+
+ # Build up an array of sort criteria as in the docstring
+ criteria_list = []
+ for i, sort in enumerate(sorts):
+ crit_attrs = [(getattr(model, sorts[j][0]) == marker_values[j])
+ for j in xrange(i)]
+ model_attr = getattr(model, sort[0])
+ if sort[1]:
+ crit_attrs.append((model_attr > marker_values[i]))
+ else:
+ crit_attrs.append((model_attr < marker_values[i]))
+
+ criteria = sqlalchemy.sql.and_(*crit_attrs)
+ criteria_list.append(criteria)
+
+ f = sqlalchemy.sql.or_(*criteria_list)
+ query = query.filter(f)
+
+ if limit:
+ query = query.limit(limit)
+
+ return query
'routers': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'name': {'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
'floatingips': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'floating_ip_address': {'allow_post': False, 'allow_put': False,
'validate': {'type:ip_address_or_none': None},
'is_visible': True},
quota.QUOTAS.register_resource_by_name(resource_name)
- controller = base.create_resource(collection_name,
- resource_name,
- plugin, params,
- member_actions=member_actions)
+ controller = base.create_resource(
+ collection_name, resource_name, plugin, params,
+ member_actions=member_actions,
+ allow_pagination=cfg.CONF.allow_pagination,
+ allow_sorting=cfg.CONF.allow_sorting)
ex = extensions.ResourceExtension(collection_name,
controller,
pass
@abstractmethod
- def get_routers(self, context, filters=None, fields=None):
+ def get_routers(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
pass
@abstractmethod
pass
@abstractmethod
- def get_floatingips(self, context, filters=None, fields=None):
+ def get_floatingips(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
pass
def get_routers_count(self, context, filters=None):
from quantum.api.v2 import base
from quantum.common import exceptions as qexception
from quantum import manager
+from quantum.openstack.common import cfg
from quantum.plugins.common import constants
from quantum.plugins.services.service_base import ServicePluginBase
'vips': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
'pools': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
'members': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
'health_monitors': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'tenant_id': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'required_by_policy': True,
if resource_name == 'pool':
member_actions = {'stats': 'GET'}
- controller = base.create_resource(collection_name,
- resource_name,
- plugin, params,
- member_actions=member_actions)
+ controller = base.create_resource(
+ collection_name, resource_name, plugin, params,
+ member_actions=member_actions,
+ allow_pagination=cfg.CONF.allow_pagination,
+ allow_sorting=cfg.CONF.allow_sorting)
resource = extensions.ResourceExtension(
collection_name,
'security_groups': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
'name': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': '',
'validate': {'type:name_not_default': None}},
'security_group_rules': {
'id': {'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None},
- 'is_visible': True},
+ 'is_visible': True,
+ 'primary_key': True},
# external_id can be used to be backwards compatible with nova
'external_id': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': None,
quota.QUOTAS.register_resource_by_name(resource_name)
controller = base.create_resource(collection_name,
resource_name,
- plugin, params, allow_bulk=True)
+ plugin, params, allow_bulk=True,
+ allow_pagination=True,
+ allow_sorting=True)
ex = extensions.ResourceExtension(collection_name,
controller,
pass
@abstractmethod
- def get_security_groups(self, context, filters=None, fields=None):
+ def get_security_groups(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
pass
@abstractmethod
pass
@abstractmethod
- def get_security_group_rules(self, context, filters=None, fields=None):
+ def get_security_group_rules(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
pass
@abstractmethod
"""
# This attribute specifies whether the plugin supports or not
- # bulk operations. Name mangling is used in order to ensure it
- # is qualified by class
+ # bulk/pagination/sorting operations. Name mangling is used in
+ # order to ensure it is qualified by class
__native_bulk_support = True
+ __native_pagination_support = True
+ __native_sorting_support = True
supported_extension_aliases = ["provider", "router", "binding", "quotas",
"security-group", "agent"]
self._extend_network_dict_l3(context, net)
return self._fields(net, fields)
- def get_networks(self, context, filters=None, fields=None):
+ def get_networks(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
session = context.session
with session.begin(subtransactions=True):
- nets = super(LinuxBridgePluginV2, self).get_networks(context,
- filters,
- None)
+ nets = super(LinuxBridgePluginV2,
+ self).get_networks(context, filters, None, sorts,
+ limit, marker, page_reverse)
for net in nets:
self._extend_network_dict_provider(context, net)
self._extend_network_dict_l3(context, net)
self._extend_port_dict_binding(context, port),
return self._fields(port, fields)
- def get_ports(self, context, filters=None, fields=None):
+ def get_ports(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
res_ports = []
with context.session.begin(subtransactions=True):
- ports = super(LinuxBridgePluginV2, self).get_ports(context,
- filters,
- fields)
+ ports = super(LinuxBridgePluginV2,
+ self).get_ports(context, filters, fields, sorts,
+ limit, marker, page_reverse)
#TODO(nati) filter by security group
for port in ports:
self._extend_port_dict_security_group(context, port)
"""
# This attribute specifies whether the plugin supports or not
- # bulk operations. Name mangling is used in order to ensure it
- # is qualified by class
+ # bulk/pagination/sorting operations. Name mangling is used in
+ # order to ensure it is qualified by class
__native_bulk_support = True
+ __native_pagination_support = True
+ __native_sorting_support = True
+
supported_extension_aliases = ["provider", "router",
"binding", "quotas", "security-group",
"agent"]
self._extend_network_dict_l3(context, net)
return self._fields(net, fields)
- def get_networks(self, context, filters=None, fields=None):
+ def get_networks(self, context, filters=None, fields=None,
+ sorts=None,
+ limit=None, marker=None, page_reverse=False):
session = context.session
with session.begin(subtransactions=True):
- nets = super(OVSQuantumPluginV2, self).get_networks(context,
- filters,
- None)
+ nets = super(OVSQuantumPluginV2,
+ self).get_networks(context, filters, None, sorts,
+ limit, marker, page_reverse)
for net in nets:
self._extend_network_dict_provider(context, net)
self._extend_network_dict_l3(context, net)
self._extend_port_dict_binding(context, port)
return self._fields(port, fields)
- def get_ports(self, context, filters=None, fields=None):
+ def get_ports(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
with context.session.begin(subtransactions=True):
ports = super(OVSQuantumPluginV2, self).get_ports(
- context, filters, fields)
+ context, filters, fields, sorts, limit, marker,
+ page_reverse)
#TODO(nati) filter by security group
for port in ports:
self._extend_port_dict_security_group(context, port)
pass
@abstractmethod
- def get_subnets(self, context, filters=None, fields=None):
+ def get_subnets(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
"""
Retrieve a list of subnets. The contents of the list depends on
the identity of the user making the request (as indicated by the
pass
@abstractmethod
- def get_networks(self, context, filters=None, fields=None):
+ def get_networks(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
"""
Retrieve a list of networks. The contents of the list depends on
the identity of the user making the request (as indicated by the
pass
@abstractmethod
- def get_ports(self, context, filters=None, fields=None):
+ def get_ports(self, context, filters=None, fields=None,
+ sorts=None, limit=None, marker=None, page_reverse=False):
"""
Retrieve a list of ports. The contents of the list depends on
the identity of the user making the request (as indicated by the
cfg.CONF.set_override('core_plugin', core_plugin)
cfg.CONF.set_override('service_plugins', service_plugins)
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
+ cfg.CONF.set_override('allow_pagination', True)
+ cfg.CONF.set_override('allow_sorting', True)
self.api = APIRouter()
plugin = loadbalancerPlugin.LoadBalancerPlugin()
self.assertEqual(res.status_int, webob.exc.HTTPOk.code)
return self.deserialize(res)
+ def _test_list_with_sort(self, collection, items, sorts, query_params=''):
+ query_str = query_params
+ for key, direction in sorts:
+ query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
+ direction)
+ req = self.new_list_request('%ss' % collection,
+ params=query_str)
+ api = self._api_for_resource('%ss' % collection)
+ res = self.deserialize(req.get_response(api))
+ collection = collection.replace('-', '_')
+ expected_res = [item[collection]['id'] for item in items]
+ self.assertListEqual([n['id'] for n in res["%ss" % collection]],
+ expected_res)
+
+ def _test_list_with_pagination(self, collection, items, sort,
+ limit, expected_page_num, query_params=''):
+ if self.fmt == 'xml':
+ self.skipTest("Skip xml test for pagination")
+ query_str = query_params + '&' if query_params else ''
+ query_str = query_str + ("limit=%s&sort_key=%s&"
+ "sort_dir=%s") % (limit, sort[0], sort[1])
+ req = self.new_list_request("%ss" % collection, params=query_str)
+ items_res = []
+ page_num = 0
+ api = self._api_for_resource('%ss' % collection)
+ collection = collection.replace('-', '_')
+ while req:
+ page_num = page_num + 1
+ res = self.deserialize(req.get_response(api))
+ self.assertLessEqual(len(res["%ss" % collection]), limit)
+ items_res = items_res + res["%ss" % collection]
+ req = None
+ if '%ss_links' % collection in res:
+ for link in res['%ss_links' % collection]:
+ if link['rel'] == 'next':
+ req = create_request(link['href'],
+ '', 'application/json')
+ self.assertEqual(len(res["%ss" % collection]),
+ limit)
+ self.assertEqual(page_num, expected_page_num)
+ self.assertListEqual([n['id'] for n in items_res],
+ [item[collection]['id'] for item in items])
+
+ def _test_list_with_pagination_reverse(self, collection, items, sort,
+ limit, expected_page_num,
+ query_params=''):
+ if self.fmt == 'xml':
+ self.skipTest("Skip xml test for pagination")
+ resources = '%ss' % collection
+ collection = collection.replace('-', '_')
+ api = self._api_for_resource(resources)
+ marker = items[-1][collection]['id']
+ query_str = query_params + '&' if query_params else ''
+ query_str = query_str + ("limit=%s&page_reverse=True&"
+ "sort_key=%s&sort_dir=%s&"
+ "marker=%s") % (limit, sort[0], sort[1],
+ marker)
+ req = self.new_list_request(resources, params=query_str)
+ item_res = [items[-1][collection]]
+ page_num = 0
+ while req:
+ page_num = page_num + 1
+ res = self.deserialize(req.get_response(api))
+ self.assertLessEqual(len(res["%ss" % collection]), limit)
+ res["%ss" % collection].reverse()
+ item_res = item_res + res["%ss" % collection]
+ req = None
+ if '%ss_links' % collection in res:
+ for link in res['%ss_links' % collection]:
+ if link['rel'] == 'previous':
+ req = create_request(link['href'],
+ '', 'application/json')
+ self.assertEqual(len(res["%ss" % collection]),
+ limit)
+ self.assertEqual(page_num, expected_page_num)
+ expected_res = [item[collection]['id'] for item in items]
+ expected_res.reverse()
+ self.assertListEqual([n['id'] for n in item_res],
+ expected_res)
+
@contextlib.contextmanager
def vip(self, fmt=None, name='vip1', pool=None,
protocol='HTTP', port=80, admin_state_up=True, no_delete=False,
for k, v in keys:
self.assertEqual(res['vips'][0][k], v)
+ def test_list_vips_with_sort_emulated(self):
+ with contextlib.nested(self.vip(name='vip1', port=81),
+ self.vip(name='vip2', port=82),
+ self.vip(name='vip3', port=82)
+ ) as (vip1, vip2, vip3):
+ self._test_list_with_sort('vip', (vip1, vip3, vip2),
+ [('port', 'asc'), ('name', 'desc')])
+
+ def test_list_vips_with_pagination_emulated(self):
+ with contextlib.nested(self.vip(name='vip1'),
+ self.vip(name='vip2'),
+ self.vip(name='vip3')
+ ) as (vip1, vip2, vip3):
+ self._test_list_with_pagination('vip',
+ (vip1, vip2, vip3),
+ ('name', 'asc'), 2, 2)
+
+ def test_list_vips_with_pagination_reverse_emulated(self):
+ with contextlib.nested(self.vip(name='vip1'),
+ self.vip(name='vip2'),
+ self.vip(name='vip3')
+ ) as (vip1, vip2, vip3):
+ self._test_list_with_pagination_reverse('vip',
+ (vip1, vip2, vip3),
+ ('name', 'asc'), 2, 2)
+
def test_create_pool_with_invalid_values(self):
name = 'pool3'
for k, v in keys:
self.assertEqual(res['pool'][k], v)
+ def test_list_pools_with_sort_emulated(self):
+ with contextlib.nested(self.pool(name='p1'),
+ self.pool(name='p2'),
+ self.pool(name='p3')
+ ) as (p1, p2, p3):
+ self._test_list_with_sort('pool', (p3, p2, p1),
+ [('name', 'desc')])
+
+ def test_list_pools_with_pagination_emulated(self):
+ with contextlib.nested(self.pool(name='p1'),
+ self.pool(name='p2'),
+ self.pool(name='p3')
+ ) as (p1, p2, p3):
+ self._test_list_with_pagination('pool',
+ (p1, p2, p3),
+ ('name', 'asc'), 2, 2)
+
+ def test_list_pools_with_pagination_reverse_emulated(self):
+ with contextlib.nested(self.pool(name='p1'),
+ self.pool(name='p2'),
+ self.pool(name='p3')
+ ) as (p1, p2, p3):
+ self._test_list_with_pagination_reverse('pool',
+ (p1, p2, p3),
+ ('name', 'asc'), 2, 2)
+
def test_create_member(self):
with self.pool() as pool:
pool_id = pool['pool']['id']
for k, v in keys:
self.assertEqual(res['member'][k], v)
+ def test_list_members_with_sort_emulated(self):
+ with self.pool() as pool:
+ with contextlib.nested(self.member(pool_id=pool['pool']['id'],
+ port=81),
+ self.member(pool_id=pool['pool']['id'],
+ port=82),
+ self.member(pool_id=pool['pool']['id'],
+ port=83)
+ ) as (m1, m2, m3):
+ self._test_list_with_sort('member', (m3, m2, m1),
+ [('port', 'desc')])
+
+ def test_list_members_with_pagination_emulated(self):
+ with self.pool() as pool:
+ with contextlib.nested(self.member(pool_id=pool['pool']['id'],
+ port=81),
+ self.member(pool_id=pool['pool']['id'],
+ port=82),
+ self.member(pool_id=pool['pool']['id'],
+ port=83)
+ ) as (m1, m2, m3):
+ self._test_list_with_pagination('member',
+ (m1, m2, m3),
+ ('port', 'asc'), 2, 2)
+
+ def test_list_members_with_pagination_reverse_emulated(self):
+ with self.pool() as pool:
+ with contextlib.nested(self.member(pool_id=pool['pool']['id'],
+ port=81),
+ self.member(pool_id=pool['pool']['id'],
+ port=82),
+ self.member(pool_id=pool['pool']['id'],
+ port=83)
+ ) as (m1, m2, m3):
+ self._test_list_with_pagination_reverse('member',
+ (m1, m2, m3),
+ ('port', 'asc'), 2, 2)
+
def test_create_healthmonitor(self):
keys = [('type', "TCP"),
('tenant_id', self._tenant_id),
for k, v in keys:
self.assertEqual(res['health_monitor'][k], v)
+ def test_list_healthmonitors_with_sort_emulated(self):
+ with contextlib.nested(self.health_monitor(delay=30),
+ self.health_monitor(delay=31),
+ self.health_monitor(delay=32)
+ ) as (m1, m2, m3):
+ self._test_list_with_sort('health_monitor', (m3, m2, m1),
+ [('delay', 'desc')])
+
+ def test_list_healthmonitors_with_pagination_emulated(self):
+ with contextlib.nested(self.health_monitor(delay=30),
+ self.health_monitor(delay=31),
+ self.health_monitor(delay=32)
+ ) as (m1, m2, m3):
+ self._test_list_with_pagination('health_monitor',
+ (m1, m2, m3),
+ ('delay', 'asc'), 2, 2)
+
+ def test_list_healthmonitors_with_pagination_reverse_emulated(self):
+ with contextlib.nested(self.health_monitor(delay=30),
+ self.health_monitor(delay=31),
+ self.health_monitor(delay=32)
+ ) as (m1, m2, m3):
+ self._test_list_with_pagination_reverse('health_monitor',
+ (m1, m2, m3),
+ ('delay', 'asc'), 2, 2)
+
def test_get_pool_stats(self):
keys = [("bytes_in", 0),
("bytes_out", 0),
# under the License.
import os
+import urlparse
import mock
from oslo.config import cfg
import webtest
from quantum.api.extensions import PluginAwareExtensionManager
+from quantum.api import api_common
from quantum.api.v2 import attributes
from quantum.api.v2 import base
from quantum.api.v2 import router
config.parse(args=args)
# Update the plugin
cfg.CONF.set_override('core_plugin', plugin)
-
+ cfg.CONF.set_override('allow_pagination', True)
+ cfg.CONF.set_override('allow_sorting', True)
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
-
+ instance = self.plugin.return_value
+ instance._QuantumPluginBaseV2__native_pagination_support = True
+ instance._QuantumPluginBaseV2__native_sorting_support = True
api = router.APIRouter()
self.api = webtest.TestApp(api)
super(APIv2TestBase, self).setUp()
cfg.CONF.reset()
+class _ArgMatcher(object):
+ """ An adapter to assist mock assertions, used to custom compare """
+
+ def __init__(self, cmp, obj):
+ self.cmp = cmp
+ self.obj = obj
+
+ def __eq__(self, other):
+ return self.cmp(self.obj, other)
+
+
+def _list_cmp(l1, l2):
+ return set(l1) == set(l2)
+
+
class APIv2TestCase(APIv2TestBase):
# NOTE(jkoelker) This potentially leaks the mock object if the setUp
# raises without being caught. Using unittest2
def _do_field_list(self, resource, base_fields):
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[resource]
policy_attrs = [name for (name, info) in attr_info.items()
- if info.get('required_by_policy')]
+ if info.get('required_by_policy') or
+ info.get('primary_key')]
fields = base_fields
fields.extend(policy_attrs)
return fields
+ def _get_collection_kwargs(self, skipargs=[], **kwargs):
+ args_list = ['filters', 'fields', 'sorts', 'limit', 'marker',
+ 'page_reverse']
+ args_dict = dict((arg, mock.ANY)
+ for arg in set(args_list) - set(skipargs))
+ args_dict.update(kwargs)
+ return args_dict
+
def test_fields(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'fields': 'foo'})
fields = self._do_field_list('networks', ['foo'])
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=fields)
+ kwargs = self._get_collection_kwargs(fields=fields)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_fields_multiple(self):
instance = self.plugin.return_value
fields = self._do_field_list('networks', ['foo', 'bar'])
self.api.get(_get_path('networks'), {'fields': ['foo', 'bar']})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=fields)
+ kwargs = self._get_collection_kwargs(fields=fields)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_fields_multiple_with_empty(self):
instance = self.plugin.return_value
fields = self._do_field_list('networks', ['foo'])
self.api.get(_get_path('networks'), {'fields': ['foo', '']})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=fields)
+ kwargs = self._get_collection_kwargs(fields=fields)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_fields_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'fields': ''})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=[])
+ kwargs = self._get_collection_kwargs(fields=[])
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_fields_multiple_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
self.api.get(_get_path('networks'), {'fields': ['', '']})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=[])
+ kwargs = self._get_collection_kwargs(fields=[])
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
- self.api.get(_get_path('networks'), {'foo': 'bar'})
- filters = {'foo': ['bar']}
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY)
+ self.api.get(_get_path('networks'), {'name': 'bar'})
+ filters = {'name': ['bar']}
+ kwargs = self._get_collection_kwargs(filters=filters)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
- self.api.get(_get_path('networks'), {'foo': ''})
+ self.api.get(_get_path('networks'), {'name': ''})
filters = {}
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY)
+ kwargs = self._get_collection_kwargs(filters=filters)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_multiple_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
- self.api.get(_get_path('networks'), {'foo': ['', '']})
+ self.api.get(_get_path('networks'), {'name': ['', '']})
filters = {}
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY)
+ kwargs = self._get_collection_kwargs(filters=filters)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_multiple_with_empty(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
- self.api.get(_get_path('networks'), {'foo': ['bar', '']})
- filters = {'foo': ['bar']}
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY)
+ self.api.get(_get_path('networks'), {'name': ['bar', '']})
+ filters = {'name': ['bar']}
+ kwargs = self._get_collection_kwargs(filters=filters)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_multiple_values(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
- self.api.get(_get_path('networks'), {'foo': ['bar', 'bar2']})
- filters = {'foo': ['bar', 'bar2']}
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY)
+ self.api.get(_get_path('networks'), {'name': ['bar', 'bar2']})
+ filters = {'name': ['bar', 'bar2']}
+ kwargs = self._get_collection_kwargs(filters=filters)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_multiple(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
- self.api.get(_get_path('networks'), {'foo': 'bar',
- 'foo2': 'bar2'})
- filters = {'foo': ['bar'], 'foo2': ['bar2']}
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY)
+ self.api.get(_get_path('networks'), {'name': 'bar',
+ 'tenant_id': 'bar2'})
+ filters = {'name': ['bar'], 'tenant_id': ['bar2']}
+ kwargs = self._get_collection_kwargs(filters=filters)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_with_fields(self):
instance = self.plugin.return_value
instance.get_networks.return_value = []
- self.api.get(_get_path('networks'), {'foo': 'bar', 'fields': 'foo'})
- filters = {'foo': ['bar']}
+ self.api.get(_get_path('networks'), {'name': 'bar', 'fields': 'foo'})
+ filters = {'name': ['bar']}
fields = self._do_field_list('networks', ['foo'])
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=fields)
+ kwargs = self._get_collection_kwargs(filters=filters, fields=fields)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_with_convert_to(self):
instance = self.plugin.return_value
self.api.get(_get_path('ports'), {'admin_state_up': 'true'})
filters = {'admin_state_up': [True]}
- instance.get_ports.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY)
+ kwargs = self._get_collection_kwargs(filters=filters)
+ instance.get_ports.assert_called_once_with(mock.ANY, **kwargs)
def test_filters_with_convert_list_to(self):
instance = self.plugin.return_value
self.api.get(_get_path('ports'),
{'fixed_ips': ['ip_address=foo', 'subnet_id=bar']})
filters = {'fixed_ips': {'ip_address': ['foo'], 'subnet_id': ['bar']}}
- instance.get_ports.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY)
+ kwargs = self._get_collection_kwargs(filters=filters)
+ instance.get_ports.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_limit(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ self.api.get(_get_path('networks'),
+ {'limit': '10'})
+ kwargs = self._get_collection_kwargs(limit=10)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_limit_with_great_than_max_limit(self):
+ cfg.CONF.set_default('pagination_max_limit', '1000')
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ self.api.get(_get_path('networks'),
+ {'limit': '1001'})
+ kwargs = self._get_collection_kwargs(limit=1000)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_limit_with_zero(self):
+ cfg.CONF.set_default('pagination_max_limit', '1000')
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ self.api.get(_get_path('networks'), {'limit': '0'})
+ kwargs = self._get_collection_kwargs(limit=1000)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_limit_with_unspecific(self):
+ cfg.CONF.set_default('pagination_max_limit', '1000')
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ self.api.get(_get_path('networks'))
+ kwargs = self._get_collection_kwargs(limit=1000)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_limit_with_negative_value(self):
+ cfg.CONF.set_default('pagination_max_limit', '1000')
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ res = self.api.get(_get_path('networks'), {'limit': -1},
+ expect_errors=True)
+ self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
+
+ def test_limit_with_non_integer(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ res = self.api.get(_get_path('networks'),
+ {'limit': 'abc'}, expect_errors=True)
+ self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
+
+ def test_limit_with_infinite_pagination_max_limit(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+ cfg.CONF.set_override('pagination_max_limit', 'Infinite')
+ self.api.get(_get_path('networks'))
+ kwargs = self._get_collection_kwargs(limit=None)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_limit_with_negative_pagination_max_limit(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+ cfg.CONF.set_default('pagination_max_limit', '-1')
+ self.api.get(_get_path('networks'))
+ kwargs = self._get_collection_kwargs(limit=None)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_limit_with_non_integer_pagination_max_limit(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+ cfg.CONF.set_default('pagination_max_limit', 'abc')
+ self.api.get(_get_path('networks'))
+ kwargs = self._get_collection_kwargs(limit=None)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_marker(self):
+ cfg.CONF.set_override('pagination_max_limit', '1000')
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+ marker = _uuid()
+ self.api.get(_get_path('networks'),
+ {'marker': marker})
+ kwargs = self._get_collection_kwargs(limit=1000, marker=marker)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_page_reverse(self):
+ calls = []
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+ self.api.get(_get_path('networks'),
+ {'page_reverse': 'True'})
+ kwargs = self._get_collection_kwargs(page_reverse=True)
+ calls.append(mock.call.get_networks(mock.ANY, **kwargs))
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+ self.api.get(_get_path('networks'),
+ {'page_reverse': 'False'})
+ kwargs = self._get_collection_kwargs(page_reverse=False)
+ calls.append(mock.call.get_networks(mock.ANY, **kwargs))
+
+ def test_page_reverse_with_non_bool(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ self.api.get(_get_path('networks'),
+ {'page_reverse': 'abc'})
+ kwargs = self._get_collection_kwargs(page_reverse=False)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_page_reverse_with_unspecific(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ self.api.get(_get_path('networks'))
+ kwargs = self._get_collection_kwargs(page_reverse=False)
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_sort(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ self.api.get(_get_path('networks'),
+ {'sort_key': ['name', 'admin_state_up'],
+ 'sort_dir': ['desc', 'asc']})
+ kwargs = self._get_collection_kwargs(sorts=[('name', False),
+ ('admin_state_up', True),
+ ('id', True)])
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_sort_with_primary_key(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ self.api.get(_get_path('networks'),
+ {'sort_key': ['name', 'admin_state_up', 'id'],
+ 'sort_dir': ['desc', 'asc', 'desc']})
+ kwargs = self._get_collection_kwargs(sorts=[('name', False),
+ ('admin_state_up', True),
+ ('id', False)])
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_sort_without_direction(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ res = self.api.get(_get_path('networks'), {'sort_key': ['name']},
+ expect_errors=True)
+ self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
+
+ def test_sort_with_invalid_attribute(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ res = self.api.get(_get_path('networks'),
+ {'sort_key': 'abc',
+ 'sort_dir': 'asc'},
+ expect_errors=True)
+ self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
+
+ def test_sort_with_invalid_dirs(self):
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+
+ res = self.api.get(_get_path('networks'),
+ {'sort_key': 'name',
+ 'sort_dir': 'abc'},
+ expect_errors=True)
+ self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
+
+ def test_emulated_sort(self):
+ instance = self.plugin.return_value
+ instance._QuantumPluginBaseV2__native_pagination_support = False
+ instance._QuantumPluginBaseV2__native_sorting_support = False
+ instance.get_networks.return_value = []
+ api = webtest.TestApp(router.APIRouter())
+ api.get(_get_path('networks'), {'sort_key': ['name', 'status'],
+ 'sort_dir': ['desc', 'asc']})
+ kwargs = self._get_collection_kwargs(
+ skipargs=['sorts', 'limit', 'marker', 'page_reverse'])
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_emulated_sort_without_sort_field(self):
+ instance = self.plugin.return_value
+ instance._QuantumPluginBaseV2__native_pagination_support = False
+ instance._QuantumPluginBaseV2__native_sorting_support = False
+ instance.get_networks.return_value = []
+ api = webtest.TestApp(router.APIRouter())
+ api.get(_get_path('networks'), {'sort_key': ['name', 'status'],
+ 'sort_dir': ['desc', 'asc'],
+ 'fields': ['subnets']})
+ kwargs = self._get_collection_kwargs(
+ skipargs=['sorts', 'limit', 'marker', 'page_reverse'],
+ fields=_ArgMatcher(_list_cmp, ['name',
+ 'status',
+ 'id',
+ 'subnets',
+ 'shared',
+ 'tenant_id']))
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_emulated_pagination(self):
+ instance = self.plugin.return_value
+ instance._QuantumPluginBaseV2__native_pagination_support = False
+ instance.get_networks.return_value = []
+ api = webtest.TestApp(router.APIRouter())
+ api.get(_get_path('networks'), {'limit': 10,
+ 'marker': 'foo',
+ 'page_reverse': False})
+ kwargs = self._get_collection_kwargs(skipargs=['limit',
+ 'marker',
+ 'page_reverse'])
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
+
+ def test_native_pagination_without_native_sorting(self):
+ instance = self.plugin.return_value
+ instance._QuantumPluginBaseV2__native_sorting_support = False
+ self.assertRaises(Exception, router.APIRouter)
+
+ def test_native_pagination_without_allow_sorting(self):
+ cfg.CONF.set_override('allow_sorting', False)
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = []
+ api = webtest.TestApp(router.APIRouter())
+ api.get(_get_path('networks'),
+ {'sort_key': ['name', 'admin_state_up'],
+ 'sort_dir': ['desc', 'asc']})
+ kwargs = self._get_collection_kwargs(sorts=[('name', False),
+ ('admin_state_up', True),
+ ('id', True)])
+ instance.get_networks.assert_called_once_with(mock.ANY, **kwargs)
# Note: since all resources use the same controller and validation
tenant_id = _uuid()
self._test_list(tenant_id + "bad", tenant_id)
+ def test_list_pagination(self):
+ id1 = str(_uuid())
+ id2 = str(_uuid())
+ input_dict1 = {'id': id1,
+ 'name': 'net1',
+ 'admin_state_up': True,
+ 'status': "ACTIVE",
+ 'tenant_id': '',
+ 'shared': False,
+ 'subnets': []}
+ input_dict2 = {'id': id2,
+ 'name': 'net2',
+ 'admin_state_up': True,
+ 'status': "ACTIVE",
+ 'tenant_id': '',
+ 'shared': False,
+ 'subnets': []}
+ return_value = [input_dict1, input_dict2]
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = return_value
+ params = {'limit': ['2'],
+ 'marker': [str(_uuid())],
+ 'sort_key': ['name'],
+ 'sort_dir': ['asc']}
+ res = self.api.get(_get_path('networks'),
+ params=params).json
+
+ self.assertEqual(len(res['networks']), 2)
+ self.assertItemsEqual([id1, id2],
+ [res['networks'][0]['id'],
+ res['networks'][1]['id']])
+
+ self.assertIn('networks_links', res)
+ next_links = []
+ previous_links = []
+ for r in res['networks_links']:
+ if r['rel'] == 'next':
+ next_links.append(r)
+ if r['rel'] == 'previous':
+ previous_links.append(r)
+ self.assertEqual(len(next_links), 1)
+ self.assertEqual(len(previous_links), 1)
+
+ url = urlparse.urlparse(next_links[0]['href'])
+ self.assertEqual(url.path, _get_path('networks'))
+ params['marker'] = [id2]
+ self.assertEqual(urlparse.parse_qs(url.query), params)
+
+ url = urlparse.urlparse(previous_links[0]['href'])
+ self.assertEqual(url.path, _get_path('networks'))
+ params['marker'] = [id1]
+ params['page_reverse'] = ['True']
+ self.assertEqual(urlparse.parse_qs(url.query), params)
+
+ def test_list_pagination_with_last_page(self):
+ id = str(_uuid())
+ input_dict = {'id': id,
+ 'name': 'net1',
+ 'admin_state_up': True,
+ 'status': "ACTIVE",
+ 'tenant_id': '',
+ 'shared': False,
+ 'subnets': []}
+ return_value = [input_dict]
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = return_value
+ params = {'limit': ['2'],
+ 'marker': str(_uuid())}
+ res = self.api.get(_get_path('networks'),
+ params=params).json
+
+ self.assertEqual(len(res['networks']), 1)
+ self.assertEqual(id, res['networks'][0]['id'])
+
+ self.assertIn('networks_links', res)
+ previous_links = []
+ for r in res['networks_links']:
+ self.assertNotEqual(r['rel'], 'next')
+ if r['rel'] == 'previous':
+ previous_links.append(r)
+ self.assertEqual(len(previous_links), 1)
+
+ url = urlparse.urlparse(previous_links[0]['href'])
+ self.assertEqual(url.path, _get_path('networks'))
+ expect_params = params.copy()
+ expect_params['marker'] = [id]
+ expect_params['page_reverse'] = ['True']
+ self.assertEqual(urlparse.parse_qs(url.query), expect_params)
+
+ def test_list_pagination_with_empty_page(self):
+ return_value = []
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = return_value
+ params = {'limit': ['2'],
+ 'marker': str(_uuid())}
+ res = self.api.get(_get_path('networks'),
+ params=params).json
+
+ self.assertEqual(res['networks'], [])
+
+ previous_links = []
+ if 'networks_links' in res:
+ for r in res['networks_links']:
+ self.assertNotEqual(r['rel'], 'next')
+ if r['rel'] == 'previous':
+ previous_links.append(r)
+ self.assertEqual(len(previous_links), 1)
+
+ url = urlparse.urlparse(previous_links[0]['href'])
+ self.assertEqual(url.path, _get_path('networks'))
+ expect_params = params.copy()
+ del expect_params['marker']
+ expect_params['page_reverse'] = ['True']
+ self.assertEqual(urlparse.parse_qs(url.query), expect_params)
+
+ def test_list_pagination_reverse_with_last_page(self):
+ id = str(_uuid())
+ input_dict = {'id': id,
+ 'name': 'net1',
+ 'admin_state_up': True,
+ 'status': "ACTIVE",
+ 'tenant_id': '',
+ 'shared': False,
+ 'subnets': []}
+ return_value = [input_dict]
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = return_value
+ params = {'limit': ['2'],
+ 'marker': [str(_uuid())],
+ 'page_reverse': ['True']}
+ res = self.api.get(_get_path('networks'),
+ params=params).json
+
+ self.assertEqual(len(res['networks']), 1)
+ self.assertEqual(id, res['networks'][0]['id'])
+
+ self.assertIn('networks_links', res)
+ next_links = []
+ for r in res['networks_links']:
+ self.assertNotEqual(r['rel'], 'previous')
+ if r['rel'] == 'next':
+ next_links.append(r)
+ self.assertEqual(len(next_links), 1)
+
+ url = urlparse.urlparse(next_links[0]['href'])
+ self.assertEqual(url.path, _get_path('networks'))
+ expected_params = params.copy()
+ del expected_params['page_reverse']
+ expected_params['marker'] = [id]
+ self.assertEqual(urlparse.parse_qs(url.query),
+ expected_params)
+
+ def test_list_pagination_reverse_with_empty_page(self):
+ return_value = []
+ instance = self.plugin.return_value
+ instance.get_networks.return_value = return_value
+ params = {'limit': ['2'],
+ 'marker': [str(_uuid())],
+ 'page_reverse': ['True']}
+ res = self.api.get(_get_path('networks'),
+ params=params).json
+ self.assertEqual(res['networks'], [])
+
+ next_links = []
+ if 'networks_links' in res:
+ for r in res['networks_links']:
+ self.assertNotEqual(r['rel'], 'previous')
+ if r['rel'] == 'next':
+ next_links.append(r)
+ self.assertEqual(len(next_links), 1)
+
+ url = urlparse.urlparse(next_links[0]['href'])
+ self.assertEqual(url.path, _get_path('networks'))
+ expect_params = params.copy()
+ del expect_params['marker']
+ del expect_params['page_reverse']
+ self.assertEqual(urlparse.parse_qs(url.query), expect_params)
+
def test_create(self):
net_id = _uuid()
data = {'network': {'name': 'net1', 'admin_state_up': True,
self._plugin_patcher.stop()
self.api = None
self.plugin = None
+ router.SUB_RESOURCES = {}
cfg.CONF.reset()
def test_index_sub_resource(self):
return
-class FieldsTestCase(unittest.TestCase):
- def test_with_fields(self):
+class ListArgsTestCase(unittest.TestCase):
+ def test_list_args(self):
path = '/?fields=4&foo=3&fields=2&bar=1'
request = webob.Request.blank(path)
expect_val = ['2', '4']
- actual_val = base._fields(request)
+ actual_val = api_common.list_args(request, 'fields')
self.assertItemsEqual(actual_val, expect_val)
- def test_without_fields(self):
+ def test_list_args_with_empty(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
request = webob.Request.blank(path)
- self.assertListEqual([], base._fields(request))
+ self.assertEqual([], api_common.list_args(request, 'fields'))
class FiltersTestCase(unittest.TestCase):
- def test_all_fields(self):
+ def test_all_skip_args(self):
path = '/?fields=4&fields=3&fields=2&fields=1'
request = webob.Request.blank(path)
- self.assertDictEqual({}, base._filters(request, None))
+ self.assertEqual({}, api_common.get_filters(request, None,
+ ["fields"]))
def test_blank_values(self):
path = '/?foo=&bar=&baz=&qux='
request = webob.Request.blank(path)
- self.assertDictEqual({}, base._filters(request, {}))
+ self.assertEqual({}, api_common.get_filters(request, {}))
def test_no_attr_info(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
request = webob.Request.blank(path)
expect_val = {'foo': ['4'], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
- actual_val = base._filters(request, {})
- self.assertDictEqual(actual_val, expect_val)
+ actual_val = api_common.get_filters(request, {})
+ self.assertEqual(actual_val, expect_val)
def test_attr_info_without_conversion(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
request = webob.Request.blank(path)
attr_info = {'foo': {'key': 'val'}}
expect_val = {'foo': ['4'], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
- actual_val = base._filters(request, attr_info)
- self.assertDictEqual(actual_val, expect_val)
+ actual_val = api_common.get_filters(request, attr_info)
+ self.assertEqual(actual_val, expect_val)
def test_attr_info_with_convert_list_to(self):
path = '/?foo=key=4&bar=3&foo=key=2&qux=1'
}
}
expect_val = {'foo': {'key': ['2', '4']}, 'bar': ['3'], 'qux': ['1']}
- actual_val = base._filters(request, attr_info)
- self.assertDictEqual(actual_val, expect_val)
+ actual_val = api_common.get_filters(request, attr_info)
+ self.assertEqual(actual_val, expect_val)
def test_attr_info_with_convert_to(self):
path = '/?foo=4&bar=3&baz=2&qux=1'
request = webob.Request.blank(path)
attr_info = {'foo': {'convert_to': attributes.convert_to_int}}
expect_val = {'foo': [4], 'bar': ['3'], 'baz': ['2'], 'qux': ['1']}
- actual_val = base._filters(request, attr_info)
- self.assertDictEqual(actual_val, expect_val)
+ actual_val = api_common.get_filters(request, attr_info)
+ self.assertEqual(actual_val, expect_val)
class CreateResourceTestCase(unittest.TestCase):
import webob.exc
import quantum
+from quantum.api import api_common
from quantum.api.extensions import PluginAwareExtensionManager
from quantum.api.v2 import attributes
from quantum.api.v2.attributes import ATTR_NOT_SPECIFIED
return os.path.join(ETCDIR, *p)
+def _fake_get_pagination_helper(self, request):
+ return api_common.PaginationEmulatedHelper(request, self._primary_key)
+
+
+def _fake_get_sorting_helper(self, request):
+ return api_common.SortingEmulatedHelper(request, self._attr_info)
+
+
class QuantumDbPluginV2TestCase(testlib_api.WebTestCase):
fmt = 'json'
cfg.CONF.set_override('base_mac', "12:34:56:78:90:ab")
cfg.CONF.set_override('max_dns_nameservers', 2)
cfg.CONF.set_override('max_subnet_host_routes', 2)
+ cfg.CONF.set_override('allow_pagination', True)
+ cfg.CONF.set_override('allow_sorting', True)
self.api = APIRouter()
# Set the defualt port status
self.port_create_status = 'ACTIVE'
self._skip_native_bulk = not _is_native_bulk_supported()
+ def _is_native_pagination_support():
+ native_pagination_attr_name = (
+ "_%s__native_pagination_support" %
+ QuantumManager.get_plugin().__class__.__name__)
+ return (cfg.CONF.allow_pagination and
+ getattr(QuantumManager.get_plugin(),
+ native_pagination_attr_name, False))
+
+ self._skip_native_pagination = not _is_native_pagination_support()
+
+ def _is_native_sorting_support():
+ native_sorting_attr_name = (
+ "_%s__native_sorting_support" %
+ QuantumManager.get_plugin().__class__.__name__)
+ return (cfg.CONF.allow_sorting and
+ getattr(QuantumManager.get_plugin(),
+ native_sorting_attr_name, False))
+
+ self._skip_native_sorting = not _is_native_sorting_support()
+
ext_mgr = test_config.get('extension_manager', None)
if ext_mgr:
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
self.api = None
self._deserializers = None
self._skip_native_bulk = None
+ self._skip_native_pagination = None
+ self._skip_native_sortin = None
self.ext_api = None
# NOTE(jkoelker) for a 'pluggable' framework, Quantum sure
# doesn't like when the plugin changes ;)
res = self._list('%ss' % resource,
quantum_context=quantum_context,
query_params=query_params)
+ resource = resource.replace('-', '_')
self.assertItemsEqual([i['id'] for i in res['%ss' % resource]],
[i[resource]['id'] for i in items])
if not no_delete:
self._delete('ports', port['port']['id'])
+ def _test_list_with_sort(self, collection, items, sorts, query_params=''):
+ query_str = query_params
+ for key, direction in sorts:
+ query_str = query_str + "&sort_key=%s&sort_dir=%s" % (key,
+ direction)
+ req = self.new_list_request('%ss' % collection,
+ params=query_str)
+ api = self._api_for_resource('%ss' % collection)
+ res = self.deserialize(self.fmt, req.get_response(api))
+ collection = collection.replace('-', '_')
+ expected_res = [item[collection]['id'] for item in items]
+ self.assertListEqual([n['id'] for n in res["%ss" % collection]],
+ expected_res)
+
+ def _test_list_with_pagination(self, collection, items, sort,
+ limit, expected_page_num, query_params='',
+ verify_key='id'):
+ if self.fmt == 'xml':
+ self.skipTest("Skip xml test for pagination")
+ query_str = query_params + '&' if query_params else ''
+ query_str = query_str + ("limit=%s&sort_key=%s&"
+ "sort_dir=%s") % (limit, sort[0], sort[1])
+ req = self.new_list_request("%ss" % collection, params=query_str)
+ items_res = []
+ page_num = 0
+ api = self._api_for_resource('%ss' % collection)
+ collection = collection.replace('-', '_')
+ while req:
+ page_num = page_num + 1
+ res = self.deserialize(self.fmt, req.get_response(api))
+ self.assertLessEqual(len(res["%ss" % collection]), limit)
+ items_res = items_res + res["%ss" % collection]
+ req = None
+ if '%ss_links' % collection in res:
+ for link in res['%ss_links' % collection]:
+ if link['rel'] == 'next':
+ content_type = 'application/%s' % self.fmt
+ req = testlib_api.create_request(link['href'],
+ '', content_type)
+ self.assertEqual(len(res["%ss" % collection]),
+ limit)
+ self.assertEqual(page_num, expected_page_num)
+ self.assertListEqual([n[verify_key] for n in items_res],
+ [item[collection][verify_key] for item in items])
+
+ def _test_list_with_pagination_reverse(self, collection, items, sort,
+ limit, expected_page_num,
+ query_params=''):
+ if self.fmt == 'xml':
+ self.skipTest("Skip xml test for pagination")
+ resources = '%ss' % collection
+ collection = collection.replace('-', '_')
+ api = self._api_for_resource(resources)
+ marker = items[-1][collection]['id']
+ query_str = query_params + '&' if query_params else ''
+ query_str = query_str + ("limit=%s&page_reverse=True&"
+ "sort_key=%s&sort_dir=%s&"
+ "marker=%s") % (limit, sort[0], sort[1],
+ marker)
+ req = self.new_list_request(resources, params=query_str)
+ item_res = [items[-1][collection]]
+ page_num = 0
+ while req:
+ page_num = page_num + 1
+ res = self.deserialize(self.fmt, req.get_response(api))
+ self.assertLessEqual(len(res["%ss" % collection]), limit)
+ res["%ss" % collection].reverse()
+ item_res = item_res + res["%ss" % collection]
+ req = None
+ if '%ss_links' % collection in res:
+ for link in res['%ss_links' % collection]:
+ if link['rel'] == 'previous':
+ content_type = 'application/%s' % self.fmt
+ req = testlib_api.create_request(link['href'],
+ '', content_type)
+ self.assertEqual(len(res["%ss" % collection]),
+ limit)
+ self.assertEqual(page_num, expected_page_num)
+ expected_res = [item[collection]['id'] for item in items]
+ expected_res.reverse()
+ self.assertListEqual([n['id'] for n in item_res],
+ expected_res)
+
class TestBasicGet(QuantumDbPluginV2TestCase):
self._test_list_resources('port', [port2],
quantum_context=q_context)
+ def test_list_ports_with_sort_native(self):
+ if self._skip_native_sorting:
+ self.skipTest("Skip test for not implemented sorting feature")
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ with contextlib.nested(self.port(admin_state_up='True',
+ mac_address='00:00:00:00:00:01'),
+ self.port(admin_state_up='False',
+ mac_address='00:00:00:00:00:02'),
+ self.port(admin_state_up='False',
+ mac_address='00:00:00:00:00:03')
+ ) as (port1, port2, port3):
+ self._test_list_with_sort('port', (port3, port2, port1),
+ [('admin_state_up', 'asc'),
+ ('mac_address', 'desc')])
+
+ def test_list_ports_with_sort_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_sorting_helper',
+ new=_fake_get_sorting_helper)
+ helper_patcher.start()
+ try:
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ with contextlib.nested(self.port(admin_state_up='True',
+ mac_address='00:00:00:00:00:01'),
+ self.port(admin_state_up='False',
+ mac_address='00:00:00:00:00:02'),
+ self.port(admin_state_up='False',
+ mac_address='00:00:00:00:00:03')
+ ) as (port1, port2, port3):
+ self._test_list_with_sort('port', (port3, port2, port1),
+ [('admin_state_up', 'asc'),
+ ('mac_address', 'desc')])
+ finally:
+ helper_patcher.stop()
+
+ def test_list_ports_with_pagination_native(self):
+ if self._skip_native_pagination:
+ self.skipTest("Skip test for not implemented pagination feature")
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
+ self.port(mac_address='00:00:00:00:00:02'),
+ self.port(mac_address='00:00:00:00:00:03')
+ ) as (port1, port2, port3):
+ self._test_list_with_pagination('port',
+ (port1, port2, port3),
+ ('mac_address', 'asc'), 2, 2)
+
+ def test_list_ports_with_pagination_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_pagination_helper',
+ new=_fake_get_pagination_helper)
+ helper_patcher.start()
+ try:
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
+ self.port(mac_address='00:00:00:00:00:02'),
+ self.port(mac_address='00:00:00:00:00:03')
+ ) as (port1, port2, port3):
+ self._test_list_with_pagination('port',
+ (port1, port2, port3),
+ ('mac_address', 'asc'), 2, 2)
+ finally:
+ helper_patcher.stop()
+
+ def test_list_ports_with_pagination_reverse_native(self):
+ if self._skip_native_pagination:
+ self.skipTest("Skip test for not implemented pagination feature")
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
+ self.port(mac_address='00:00:00:00:00:02'),
+ self.port(mac_address='00:00:00:00:00:03')
+ ) as (port1, port2, port3):
+ self._test_list_with_pagination_reverse('port',
+ (port1, port2, port3),
+ ('mac_address', 'asc'),
+ 2, 2)
+
+ def test_list_ports_with_pagination_reverse_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_pagination_helper',
+ new=_fake_get_pagination_helper)
+ helper_patcher.start()
+ try:
+ cfg.CONF.set_default('allow_overlapping_ips', True)
+ with contextlib.nested(self.port(mac_address='00:00:00:00:00:01'),
+ self.port(mac_address='00:00:00:00:00:02'),
+ self.port(mac_address='00:00:00:00:00:03')
+ ) as (port1, port2, port3):
+ self._test_list_with_pagination_reverse('port',
+ (port1, port2, port3),
+ ('mac_address', 'asc'),
+ 2, 2)
+ finally:
+ helper_patcher.stop()
+
def test_show_port(self):
with self.port() as port:
req = self.new_show_request('ports', port['port']['id'], self.fmt)
self.network()) as networks:
self._test_list_resources('network', networks)
+ def test_list_networks_with_sort_native(self):
+ if self._skip_native_sorting:
+ self.skipTest("Skip test for not implemented sorting feature")
+ with contextlib.nested(self.network(admin_status_up=True,
+ name='net1'),
+ self.network(admin_status_up=False,
+ name='net2'),
+ self.network(admin_status_up=False,
+ name='net3')
+ ) as (net1, net2, net3):
+ self._test_list_with_sort('network', (net3, net2, net1),
+ [('admin_state_up', 'asc'),
+ ('name', 'desc')])
+
+ def test_list_networks_with_sort_extended_attr_native_returns_400(self):
+ if self._skip_native_sorting:
+ self.skipTest("Skip test for not implemented sorting feature")
+ with contextlib.nested(self.network(admin_status_up=True,
+ name='net1'),
+ self.network(admin_status_up=False,
+ name='net2'),
+ self.network(admin_status_up=False,
+ name='net3')
+ ):
+ req = self.new_list_request(
+ 'networks',
+ params='sort_key=provider:segmentation_id&sort_dir=asc')
+ res = req.get_response(self.api)
+ self.assertEqual(400, res.status_int)
+
+ def test_list_networks_with_sort_remote_key_native_returns_400(self):
+ if self._skip_native_sorting:
+ self.skipTest("Skip test for not implemented sorting feature")
+ with contextlib.nested(self.network(admin_status_up=True,
+ name='net1'),
+ self.network(admin_status_up=False,
+ name='net2'),
+ self.network(admin_status_up=False,
+ name='net3')
+ ):
+ req = self.new_list_request(
+ 'networks', params='sort_key=subnets&sort_dir=asc')
+ res = req.get_response(self.api)
+ self.assertEqual(400, res.status_int)
+
+ def test_list_networks_with_sort_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_sorting_helper',
+ new=_fake_get_sorting_helper)
+ helper_patcher.start()
+ try:
+ with contextlib.nested(self.network(admin_status_up=True,
+ name='net1'),
+ self.network(admin_status_up=False,
+ name='net2'),
+ self.network(admin_status_up=False,
+ name='net3')
+ ) as (net1, net2, net3):
+ self._test_list_with_sort('network', (net3, net2, net1),
+ [('admin_state_up', 'asc'),
+ ('name', 'desc')])
+ finally:
+ helper_patcher.stop()
+
+ def test_list_networks_with_pagination_native(self):
+ if self._skip_native_pagination:
+ self.skipTest("Skip test for not implemented pagination feature")
+ with contextlib.nested(self.network(name='net1'),
+ self.network(name='net2'),
+ self.network(name='net3')
+ ) as (net1, net2, net3):
+ self._test_list_with_pagination('network',
+ (net1, net2, net3),
+ ('name', 'asc'), 2, 2)
+
+ def test_list_networks_with_pagination_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_pagination_helper',
+ new=_fake_get_pagination_helper)
+ helper_patcher.start()
+ try:
+ with contextlib.nested(self.network(name='net1'),
+ self.network(name='net2'),
+ self.network(name='net3')
+ ) as (net1, net2, net3):
+ self._test_list_with_pagination('network',
+ (net1, net2, net3),
+ ('name', 'asc'), 2, 2)
+ finally:
+ helper_patcher.stop()
+
+ def test_list_networks_without_pk_in_fields_pagination_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_pagination_helper',
+ new=_fake_get_pagination_helper)
+ helper_patcher.start()
+ try:
+ with contextlib.nested(self.network(name='net1',
+ shared=True),
+ self.network(name='net2',
+ shared=False),
+ self.network(name='net3',
+ shared=True)
+ ) as (net1, net2, net3):
+ self._test_list_with_pagination('network',
+ (net1, net2, net3),
+ ('name', 'asc'), 2, 2,
+ query_params="fields=name",
+ verify_key='name')
+ finally:
+ helper_patcher.stop()
+
+ def test_list_networks_without_pk_in_fields_pagination_native(self):
+ if self._skip_native_pagination:
+ self.skipTest("Skip test for not implemented pagination feature")
+ with contextlib.nested(self.network(name='net1'),
+ self.network(name='net2'),
+ self.network(name='net3')
+ ) as (net1, net2, net3):
+ self._test_list_with_pagination('network',
+ (net1, net2, net3),
+ ('name', 'asc'), 2, 2,
+ query_params="fields=shared",
+ verify_key='shared')
+
+ def test_list_networks_with_pagination_reverse_native(self):
+ if self._skip_native_pagination:
+ self.skipTest("Skip test for not implemented pagination feature")
+ with contextlib.nested(self.network(name='net1'),
+ self.network(name='net2'),
+ self.network(name='net3')
+ ) as (net1, net2, net3):
+ self._test_list_with_pagination_reverse('network',
+ (net1, net2, net3),
+ ('name', 'asc'), 2, 2)
+
+ def test_list_networks_with_pagination_reverse_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_pagination_helper',
+ new=_fake_get_pagination_helper)
+ helper_patcher.start()
+ try:
+ with contextlib.nested(self.network(name='net1'),
+ self.network(name='net2'),
+ self.network(name='net3')
+ ) as (net1, net2, net3):
+ self._test_list_with_pagination_reverse('network',
+ (net1, net2, net3),
+ ('name', 'asc'), 2, 2)
+ finally:
+ helper_patcher.stop()
+
def test_list_networks_with_parameters(self):
with contextlib.nested(self.network(name='net1',
admin_state_up=False),
self._test_list_resources('subnet', [],
query_params=query_params)
+ def test_list_subnets_with_sort_native(self):
+ if self._skip_native_sorting:
+ self.skipTest("Skip test for not implemented sorting feature")
+ with contextlib.nested(self.subnet(enable_dhcp=True,
+ cidr='10.0.0.0/24'),
+ self.subnet(enable_dhcp=False,
+ cidr='11.0.0.0/24'),
+ self.subnet(enable_dhcp=False,
+ cidr='12.0.0.0/24')
+ ) as (subnet1, subnet2, subnet3):
+ self._test_list_with_sort('subnet', (subnet3, subnet2, subnet1),
+ [('enable_dhcp', 'asc'),
+ ('cidr', 'desc')])
+
+ def test_list_subnets_with_sort_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_sorting_helper',
+ new=_fake_get_sorting_helper)
+ helper_patcher.start()
+ try:
+ with contextlib.nested(self.subnet(enable_dhcp=True,
+ cidr='10.0.0.0/24'),
+ self.subnet(enable_dhcp=False,
+ cidr='11.0.0.0/24'),
+ self.subnet(enable_dhcp=False,
+ cidr='12.0.0.0/24')
+ ) as (subnet1, subnet2, subnet3):
+ self._test_list_with_sort('subnet', (subnet3,
+ subnet2,
+ subnet1),
+ [('enable_dhcp', 'asc'),
+ ('cidr', 'desc')])
+ finally:
+ helper_patcher.stop()
+
+ def test_list_subnets_with_pagination_native(self):
+ if self._skip_native_pagination:
+ self.skipTest("Skip test for not implemented sorting feature")
+ with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
+ self.subnet(cidr='11.0.0.0/24'),
+ self.subnet(cidr='12.0.0.0/24')
+ ) as (subnet1, subnet2, subnet3):
+ self._test_list_with_pagination('subnet',
+ (subnet1, subnet2, subnet3),
+ ('cidr', 'asc'), 2, 2)
+
+ def test_list_subnets_with_pagination_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_pagination_helper',
+ new=_fake_get_pagination_helper)
+ helper_patcher.start()
+ try:
+ with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
+ self.subnet(cidr='11.0.0.0/24'),
+ self.subnet(cidr='12.0.0.0/24')
+ ) as (subnet1, subnet2, subnet3):
+ self._test_list_with_pagination('subnet',
+ (subnet1, subnet2, subnet3),
+ ('cidr', 'asc'), 2, 2)
+ finally:
+ helper_patcher.stop()
+
+ def test_list_subnets_with_pagination_reverse_native(self):
+ if self._skip_native_sorting:
+ self.skipTest("Skip test for not implemented sorting feature")
+ with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
+ self.subnet(cidr='11.0.0.0/24'),
+ self.subnet(cidr='12.0.0.0/24')
+ ) as (subnet1, subnet2, subnet3):
+ self._test_list_with_pagination_reverse('subnet',
+ (subnet1, subnet2,
+ subnet3),
+ ('cidr', 'asc'), 2, 2)
+
+ def test_list_subnets_with_pagination_reverse_emulated(self):
+ helper_patcher = mock.patch(
+ 'quantum.api.v2.base.Controller._get_pagination_helper',
+ new=_fake_get_pagination_helper)
+ helper_patcher.start()
+ try:
+ with contextlib.nested(self.subnet(cidr='10.0.0.0/24'),
+ self.subnet(cidr='11.0.0.0/24'),
+ self.subnet(cidr='12.0.0.0/24')
+ ) as (subnet1, subnet2, subnet3):
+ self._test_list_with_pagination_reverse('subnet',
+ (subnet1, subnet2,
+ subnet3),
+ ('cidr', 'asc'), 2, 2)
+ finally:
+ helper_patcher.stop()
+
def test_invalid_ip_version(self):
with self.network() as network:
data = {'subnet': {'network_id': network['network']['id'],
associating ports with security groups.
"""
+ __native_pagination_support = True
+ __native_sorting_support = True
+
supported_extension_aliases = ["security-group"]
def create_port(self, context, port):
return super(SecurityGroupTestPlugin, self).create_network(context,
network)
- def get_ports(self, context, filters=None, fields=None):
+ def get_ports(self, context, filters=None, fields=None,
+ sorts=[], limit=None, marker=None,
+ page_reverse=False):
quantum_lports = super(SecurityGroupTestPlugin, self).get_ports(
- context, filters)
+ context, filters, sorts=sorts, limit=limit, marker=marker,
+ page_reverse=page_reverse)
for quantum_lport in quantum_lports:
self._extend_port_dict_security_group(context, quantum_lport)
return quantum_lports
self.assertEqual(res.status_int, 409)
def test_list_security_groups(self):
- name = 'webservers'
- description = 'my webservers'
- with self.security_group(name, description):
- res = self.new_list_request('security-groups')
- groups = self.deserialize(self.fmt, res.get_response(self.ext_api))
- self.assertEqual(len(groups['security_groups']), 2)
- for group in groups['security_groups']:
- if group['name'] == 'default':
- self.assertEquals(len(group['security_group_rules']), 2)
- else:
- self.assertEquals(len(group['security_group_rules']), 0)
+ with contextlib.nested(self.security_group(name='sg1',
+ description='sg'),
+ self.security_group(name='sg2',
+ description='sg'),
+ self.security_group(name='sg3',
+ description='sg')
+ ) as security_groups:
+ self._test_list_resources('security-group',
+ security_groups,
+ query_params='description=sg')
+
+ def test_list_security_groups_with_sort(self):
+ with contextlib.nested(self.security_group(name='sg1',
+ description='sg'),
+ self.security_group(name='sg2',
+ description='sg'),
+ self.security_group(name='sg3',
+ description='sg')
+ ) as (sg1, sg2, sg3):
+ self._test_list_with_sort('security-group',
+ (sg3, sg2, sg1),
+ [('name', 'desc')],
+ query_params='description=sg')
+
+ def test_list_security_groups_with_pagination(self):
+ with contextlib.nested(self.security_group(name='sg1',
+ description='sg'),
+ self.security_group(name='sg2',
+ description='sg'),
+ self.security_group(name='sg3',
+ description='sg')
+ ) as (sg1, sg2, sg3):
+ self._test_list_with_pagination('security-group',
+ (sg1, sg2, sg3),
+ ('name', 'asc'), 2, 2,
+ query_params='description=sg')
+
+ def test_list_security_groups_with_pagination_reverse(self):
+ with contextlib.nested(self.security_group(name='sg1',
+ description='sg'),
+ self.security_group(name='sg2',
+ description='sg'),
+ self.security_group(name='sg3',
+ description='sg')
+ ) as (sg1, sg2, sg3):
+ self._test_list_with_pagination_reverse(
+ 'security-group', (sg1, sg2, sg3), ('name', 'asc'), 2, 2,
+ query_params='description=sg')
def test_create_security_group_rule_ethertype_invalid_as_number(self):
name = 'webservers'
self.assertEquals(len(port[ext_sg.SECURITYGROUPS]), 1)
self._delete('ports', port['id'])
+ def test_list_security_group_rules(self):
+ with self.security_group(name='sg') as sg:
+ security_group_id = sg['security_group']['id']
+ with contextlib.nested(self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=22,
+ port_range_max=22),
+ self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=23,
+ port_range_max=23),
+ self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=24,
+ port_range_max=24)
+ ) as (sgr1, sgr2, sgr3):
+ self._test_list_resources('security-group-rule',
+ [sgr1, sgr2, sgr3],
+ query_params="direction=egress")
+
+ def test_list_security_group_rules_with_sort(self):
+ with self.security_group(name='sg') as sg:
+ security_group_id = sg['security_group']['id']
+ with contextlib.nested(self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=22,
+ port_range_max=22),
+ self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=23,
+ port_range_max=23),
+ self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=24,
+ port_range_max=24)
+ ) as (sgr1, sgr2, sgr3):
+ self._test_list_with_sort('security-group-rule',
+ (sgr3, sgr2, sgr1),
+ [('port_range_max', 'desc')],
+ query_params='direction=egress')
+
+ def test_list_security_group_rules_with_pagination(self):
+ with self.security_group(name='sg') as sg:
+ security_group_id = sg['security_group']['id']
+ with contextlib.nested(self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=22,
+ port_range_max=22),
+ self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=23,
+ port_range_max=23),
+ self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=24,
+ port_range_max=24)
+ ) as (sgr1, sgr2, sgr3):
+ self._test_list_with_pagination(
+ 'security-group-rule', (sgr3, sgr2, sgr1),
+ ('port_range_max', 'desc'), 2, 2,
+ query_params='direction=egress')
+
+ def test_list_security_group_rules_with_pagination_reverse(self):
+ with self.security_group(name='sg') as sg:
+ security_group_id = sg['security_group']['id']
+ with contextlib.nested(self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=22,
+ port_range_max=22),
+ self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=23,
+ port_range_max=23),
+ self.security_group_rule(security_group_id,
+ direction='egress',
+ port_range_min=24,
+ port_range_max=24)
+ ) as (sgr1, sgr2, sgr3):
+ self._test_list_with_pagination_reverse(
+ 'security-group-rule', (sgr3, sgr2, sgr1),
+ ('port_range_max', 'desc'), 2, 2,
+ query_params='direction=egress')
+
def test_update_port_with_security_group(self):
with self.network() as n:
with self.subnet(n):
from quantum.db import l3_rpc_agent_api
from quantum.db import models_v2
from quantum.extensions import l3
-from quantum import manager
+from quantum.manager import QuantumManager
from quantum.openstack.common import log as logging
from quantum.openstack.common.notifier import api as notifier_api
from quantum.openstack.common.notifier import test_notifier
plugin = 'quantum.extensions.l3.RouterPluginBase'
# Ensure 'stale' patched copies of the plugin are never returned
- manager.QuantumManager._instance = None
+ QuantumManager._instance = None
# Ensure existing ExtensionManager is not used
extensions.PluginAwareExtensionManager._instance = None
# Update the plugin and extensions path
cfg.CONF.set_override('core_plugin', plugin)
+ cfg.CONF.set_override('allow_pagination', True)
+ cfg.CONF.set_override('allow_sorting', True)
self._plugin_patcher = mock.patch(plugin, autospec=True)
self.plugin = self._plugin_patcher.start()
+ instances = self.plugin.return_value
+ instances._RouterPluginBase__native_pagination_support = True
+ instances._RouterPluginBase__native_sorting_support = True
# Instantiate mock plugin and enable the 'router' extension
- manager.QuantumManager.get_plugin().supported_extension_aliases = (
+ QuantumManager.get_plugin().supported_extension_aliases = (
["router"])
-
ext_mgr = L3TestExtensionManager()
self.ext_mdw = test_extensions.setup_extensions_middleware(ext_mgr)
self.api = webtest.TestApp(self.ext_mdw)
res = self.api.get(_get_path('routers', fmt=self.fmt))
instance.get_routers.assert_called_with(mock.ANY, fields=mock.ANY,
- filters=mock.ANY)
+ filters=mock.ANY,
+ sorts=mock.ANY,
+ limit=mock.ANY,
+ marker=mock.ANY,
+ page_reverse=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.deserialize(res)
self.assertTrue('routers' in res)
# This plugin class is just for testing
class TestL3NatPlugin(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin):
+
+ __native_pagination_support = True
+ __native_sorting_support = True
+
supported_extension_aliases = ["router"]
def create_network(self, context, network):
self._extend_network_dict_l3(context, net)
return self._fields(net, fields)
- def get_networks(self, context, filters=None, fields=None):
- nets = super(TestL3NatPlugin, self).get_networks(context, filters,
- None)
+ def get_networks(self, context, filters=None, fields=None,
+ sorts=[], limit=None, marker=None,
+ page_reverse=False):
+ nets = super(TestL3NatPlugin, self).get_networks(
+ context, filters=filters, fields=fields, sorts=sorts, limit=limit,
+ marker=marker, page_reverse=page_reverse)
for net in nets:
self._extend_network_dict_l3(context, net)
nets = self._filter_nets_l3(context, nets, filters)
self._test_list_resources('router', [],
query_params=query_params)
+ def test_router_list_with_sort(self):
+ with contextlib.nested(self.router(name='router1'),
+ self.router(name='router2'),
+ self.router(name='router3')
+ ) as (router1, router2, router3):
+ self._test_list_with_sort('router', (router3, router2, router1),
+ [('name', 'desc')])
+
+ def test_router_list_with_pagination(self):
+ with contextlib.nested(self.router(name='router1'),
+ self.router(name='router2'),
+ self.router(name='router3')
+ ) as (router1, router2, router3):
+ self._test_list_with_pagination('router',
+ (router1, router2, router3),
+ ('name', 'asc'), 2, 2)
+
+ def test_router_list_with_pagination_reverse(self):
+ with contextlib.nested(self.router(name='router1'),
+ self.router(name='router2'),
+ self.router(name='router3')
+ ) as (router1, router2, router3):
+ self._test_list_with_pagination_reverse('router',
+ (router1, router2,
+ router3),
+ ('name', 'asc'), 2, 2)
+
def test_router_update(self):
rname1 = "yourrouter"
rname2 = "nachorouter"
uuidutils.generate_uuid(), 'iamnotnanip')
self.assertEqual(res.status_int, 400)
+ def test_floatingip_list_with_sort(self):
+ with contextlib.nested(self.subnet(cidr="10.0.0.0/24"),
+ self.subnet(cidr="11.0.0.0/24"),
+ self.subnet(cidr="12.0.0.0/24")
+ ) as (s1, s2, s3):
+ network_id1 = s1['subnet']['network_id']
+ network_id2 = s2['subnet']['network_id']
+ network_id3 = s3['subnet']['network_id']
+ self._set_net_external(network_id1)
+ self._set_net_external(network_id2)
+ self._set_net_external(network_id3)
+ fp1 = self._make_floatingip(self.fmt, network_id1)
+ fp2 = self._make_floatingip(self.fmt, network_id2)
+ fp3 = self._make_floatingip(self.fmt, network_id3)
+ try:
+ self._test_list_with_sort('floatingip', (fp3, fp2, fp1),
+ [('floating_ip_address', 'desc')])
+ finally:
+ self._delete('floatingips', fp1['floatingip']['id'])
+ self._delete('floatingips', fp2['floatingip']['id'])
+ self._delete('floatingips', fp3['floatingip']['id'])
+
+ def test_floatingip_list_with_pagination(self):
+ with contextlib.nested(self.subnet(cidr="10.0.0.0/24"),
+ self.subnet(cidr="11.0.0.0/24"),
+ self.subnet(cidr="12.0.0.0/24")
+ ) as (s1, s2, s3):
+ network_id1 = s1['subnet']['network_id']
+ network_id2 = s2['subnet']['network_id']
+ network_id3 = s3['subnet']['network_id']
+ self._set_net_external(network_id1)
+ self._set_net_external(network_id2)
+ self._set_net_external(network_id3)
+ fp1 = self._make_floatingip(self.fmt, network_id1)
+ fp2 = self._make_floatingip(self.fmt, network_id2)
+ fp3 = self._make_floatingip(self.fmt, network_id3)
+ try:
+ self._test_list_with_pagination(
+ 'floatingip', (fp1, fp2, fp3),
+ ('floating_ip_address', 'asc'), 2, 2)
+ finally:
+ self._delete('floatingips', fp1['floatingip']['id'])
+ self._delete('floatingips', fp2['floatingip']['id'])
+ self._delete('floatingips', fp3['floatingip']['id'])
+
+ def test_floatingip_list_with_pagination_reverse(self):
+ with contextlib.nested(self.subnet(cidr="10.0.0.0/24"),
+ self.subnet(cidr="11.0.0.0/24"),
+ self.subnet(cidr="12.0.0.0/24")
+ ) as (s1, s2, s3):
+ network_id1 = s1['subnet']['network_id']
+ network_id2 = s2['subnet']['network_id']
+ network_id3 = s3['subnet']['network_id']
+ self._set_net_external(network_id1)
+ self._set_net_external(network_id2)
+ self._set_net_external(network_id3)
+ fp1 = self._make_floatingip(self.fmt, network_id1)
+ fp2 = self._make_floatingip(self.fmt, network_id2)
+ fp3 = self._make_floatingip(self.fmt, network_id3)
+ try:
+ self._test_list_with_pagination_reverse(
+ 'floatingip', (fp1, fp2, fp3),
+ ('floating_ip_address', 'asc'), 2, 2)
+ finally:
+ self._delete('floatingips', fp1['floatingip']['id'])
+ self._delete('floatingips', fp2['floatingip']['id'])
+ self._delete('floatingips', fp3['floatingip']['id'])
+
def test_floatingip_delete_router_intf_with_subnet_id_returns_409(self):
found = False
with self.floatingip_with_assoc():
self.assertEqual(len(body['networks']), 1)
def test_get_network_succeeds_without_filter(self):
- plugin = manager.QuantumManager.get_plugin()
+ plugin = QuantumManager.get_plugin()
ctx = context.Context(None, None, is_admin=True)
result = plugin.get_networks(ctx, filters=None)
self.assertEqual(result, [])
def test_network_filter_hook_admin_context(self):
- plugin = manager.QuantumManager.get_plugin()
+ plugin = QuantumManager.get_plugin()
ctx = context.Context(None, None, is_admin=True)
model = models_v2.Network
conditions = plugin._network_filter_hook(ctx, model, [])
self.assertEqual(conditions, [])
def test_network_filter_hook_nonadmin_context(self):
- plugin = manager.QuantumManager.get_plugin()
+ plugin = QuantumManager.get_plugin()
ctx = context.Context('edinson', 'cavani')
model = models_v2.Network
txt = "externalnetworks.network_id IS NOT NULL"