*.pid
*.log
quantum/vcsversion.py
-.ropeproject
\ No newline at end of file
+.ropeproject
from quantum.api import api_common as common
from quantum.api import faults
from quantum.api.views import networks as networks_view
+from quantum.api.views import filters
from quantum.common import exceptions as exception
LOG = logging.getLogger('quantum.api.networks')
# concerning logical ports as well.
network = self._plugin.get_network_details(
tenant_id, network_id)
- port_list = self._plugin.get_all_ports(
- tenant_id, network_id)
+ # Doing this in the API is inefficient
+ # TODO(salvatore-orlando): This should be fixed with Bug #834012
+ # Don't pass filter options
+ port_list = self._plugin.get_all_ports(tenant_id, network_id)
ports_data = [self._plugin.get_port_details(
tenant_id, network_id, port['port-id'])
for port in port_list]
return dict(network=result)
def _items(self, request, tenant_id, net_details=False):
- """ Returns a list of networks. """
- networks = self._plugin.get_all_networks(tenant_id)
+ """ Returns a list of networks.
+ Ideally, the plugin would perform filtering,
+ returning only the items matching filters specified
+ on the request query string.
+ However, plugins are not required to support filtering.
+ In this case, this function will filter the complete list
+ of networks returned by the plugin
+
+ """
+ filter_opts = {}
+ filter_opts.update(request.str_GET)
+ networks = self._plugin.get_all_networks(tenant_id,
+ filter_opts=filter_opts)
+ # Inefficient, API-layer filtering
+ # will be performed only for the filters not implemented by the plugin
+ # NOTE(salvatore-orlando): the plugin is supposed to leave only filters
+ # it does not implement in filter_opts
+ networks = filters.filter_networks(networks,
+ self._plugin,
+ tenant_id,
+ filter_opts)
builder = networks_view.get_view_builder(request, self.version)
result = [builder.build(network, net_details)['network']
for network in networks]
import logging
from quantum.api import api_common as common
+from quantum.api.views import filters
from quantum.api.views import ports as ports_view
from quantum.common import exceptions as exception
def _items(self, request, tenant_id, network_id,
port_details=False):
- """ Returns a list of ports. """
- port_list = self._plugin.get_all_ports(tenant_id, network_id)
+ """ Returns a list of ports.
+ Ideally, the plugin would perform filtering,
+ returning only the items matching filters specified
+ on the request query string.
+ However, plugins are not required to support filtering.
+ In this case, this function will filter the complete list
+ of ports returned by the plugin
+ """
+ filter_opts = {}
+ filter_opts.update(request.str_GET)
+ port_list = self._plugin.get_all_ports(tenant_id,
+ network_id,
+ filter_opts=filter_opts)
+
builder = ports_view.get_view_builder(request, self.version)
# Load extra data for ports if required.
+ # This can be inefficient.
+ # TODO(salvatore-orlando): the fix for bug #834012 should deal with it
if port_details:
port_list_detail = \
[self._plugin.get_port_details(
for port in port_list]
port_list = port_list_detail
+ # Perform manual filtering if not supported by plugin
+ # Inefficient, API-layer filtering
+ # will be performed only if the plugin does
+ # not support filtering
+ # NOTE(salvatore-orlando): the plugin is supposed to leave only filters
+ # it does not implement in filter_opts
+ port_list = filters.filter_ports(port_list, self._plugin,
+ tenant_id, network_id,
+ filter_opts)
+
result = [builder.build(port, port_details)['port']
for port in port_list]
return dict(ports=result)
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Citrix Systems
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+import logging
+
+
+LOG = logging.getLogger('quantum.api.views.filters')
+
+
+def _load_network_ports_details(network, **kwargs):
+ plugin = kwargs.get('plugin', None)
+ tenant_id = kwargs.get('tenant_id', None)
+ #load network details only if required
+ if not 'net-ports' in network:
+ # Don't pass filter options, don't care about unused filters
+ port_list = plugin.get_all_ports(tenant_id, network['net-id'])
+ ports_data = [plugin.get_port_details(
+ tenant_id, network['net-id'],
+ port['port-id'])
+ for port in port_list]
+ network['net-ports'] = ports_data
+
+
+def _filter_network_by_name(network, name, **kwargs):
+ return network.get('net-name', None) == name
+
+
+def _filter_network_with_operational_port(network, port_op_status,
+ **kwargs):
+ _load_network_ports_details(network, **kwargs)
+ return any([port['port-op-status'] == port_op_status
+ for port in network['net-ports']])
+
+
+def _filter_network_with_active_port(network, port_state, **kwargs):
+ _load_network_ports_details(network, **kwargs)
+ return any([port['port-state'] == port_state
+ for port in network['net-ports']])
+
+
+def _filter_network_has_interface(network, has_interface, **kwargs):
+ _load_network_ports_details(network, **kwargs)
+ # convert to bool
+ match_has_interface = has_interface.lower() == 'true'
+ really_has_interface = any([port['attachment'] is not None
+ for port in network['net-ports']])
+ return match_has_interface == really_has_interface
+
+
+def _filter_network_by_port(network, port_id, **kwargs):
+ _load_network_ports_details(network, **kwargs)
+ return any([port['port-id'] == port_id
+ for port in network['net-ports']])
+
+
+def _filter_network_by_interface(network, interface_id, **kwargs):
+ _load_network_ports_details(network, **kwargs)
+ return any([port.get('attachment', None) == interface_id
+ for port in network['net-ports']])
+
+
+def _filter_port_by_state(port, state, **kwargs):
+ return port.get('port-state', None) == state
+
+
+def _filter_network_by_op_status(network, op_status, **kwargs):
+ return network.get('net-op-status', None) == op_status
+
+
+def _filter_port_by_op_status(port, op_status, **kwargs):
+ return port.get('port-op-status', None) == op_status
+
+
+def _filter_port_by_interface(port, interface_id, **kwargs):
+ return port.get('attachment', None) == interface_id
+
+
+def _filter_port_has_interface(port, has_interface, **kwargs):
+ # convert to bool
+ match_has_interface = has_interface.lower() == 'true'
+ really_has_interface = 'attachment' in port and port['attachment'] != None
+ return match_has_interface == really_has_interface
+
+
+def _do_filtering(items, filters, filter_opts, plugin,
+ tenant_id, network_id=None):
+ filtered_items = []
+ for item in items:
+ is_filter_match = False
+ for flt in filters:
+ if flt in filter_opts:
+ is_filter_match = filters[flt](item,
+ filter_opts[flt],
+ plugin=plugin,
+ tenant_id=tenant_id,
+ network_id=network_id)
+ if not is_filter_match:
+ break
+ if is_filter_match:
+ filtered_items.append(item)
+ return filtered_items
+
+
+def filter_networks(networks, plugin, tenant_id, filter_opts):
+ # Do filtering only if the plugin supports it
+ # and if filtering options have been specific
+ if len(filter_opts) == 0:
+ return networks
+
+ # load filter functions
+ filters = {
+ 'name': _filter_network_by_name,
+ 'op-status': _filter_network_by_op_status,
+ 'port-op-status': _filter_network_with_operational_port,
+ 'port-state': _filter_network_with_active_port,
+ 'has-attachment': _filter_network_has_interface,
+ 'attachment': _filter_network_by_interface,
+ 'port': _filter_network_by_port}
+ # filter networks
+ return _do_filtering(networks, filters, filter_opts, plugin, tenant_id)
+
+
+def filter_ports(ports, plugin, tenant_id, network_id, filter_opts):
+ # Do filtering only if the plugin supports it
+ # and if filtering options have been specific
+ if len(filter_opts) == 0:
+ return ports
+
+ # load filter functions
+ filters = {
+ 'state': _filter_port_by_state,
+ 'op-status': _filter_port_by_op_status,
+ 'has-attachment': _filter_port_has_interface,
+ 'attachment': _filter_port_by_interface}
+ # port details are need for filtering
+ ports = [plugin.get_port_details(tenant_id, network_id,
+ port['port-id'])
+ for port in ports]
+ # filter ports
+ return _do_filtering(ports,
+ filters,
+ filter_opts,
+ plugin,
+ tenant_id,
+ network_id)
"""
Core API implementation
"""
- def get_all_networks(self, tenant_id):
+ def get_all_networks(self, tenant_id, **kwargs):
"""
Returns a dictionary containing all
<network_uuid, network_name> for
[])
return net_dict
- def get_all_ports(self, tenant_id, net_id):
+ def get_all_ports(self, tenant_id, net_id, **kwargs):
"""
Retrieves all port identifiers belonging to the
specified Virtual Network.
# % (vlan_id, network_id))
self.vmap.set_vlan(vlan_id, network_id)
- def get_all_networks(self, tenant_id):
+ def get_all_networks(self, tenant_id, **kwargs):
nets = []
for x in db.network_list(tenant_id):
LOG.debug("Adding network: %s" % x.uuid)
'net-id': port.network_id,
'attachment': port.interface_id}
- def get_all_ports(self, tenant_id, net_id):
+ def get_all_ports(self, tenant_id, net_id, **kwargs):
ids = []
ports = db.port_list(net_id)
+ # This plugin does not perform filtering at the moment
return [{'port-id': str(p.uuid)} for p in ports]
def create_port(self, tenant_id, net_id, port_state=None, **kwargs):
att_id=port['interface_id'],
att_port_id=port['uuid'])
- def get_all_networks(self, tenant_id):
+ def get_all_networks(self, tenant_id, **kwargs):
"""
Returns a dictionary containing all
<network_uuid, network_name> for
the specified tenant.
"""
LOG.debug("FakePlugin.get_all_networks() called")
+ filter_opts = kwargs.get('filter_opts', None)
+ if not filter_opts is None and len(filter_opts) > 0:
+ LOG.debug("filtering options were passed to the plugin"
+ "but the Fake plugin does not support them")
nets = []
for net in db.network_list(tenant_id):
net_item = {'net-id': str(net.uuid),
net = db.network_update(net_id, tenant_id, **kwargs)
return net
- def get_all_ports(self, tenant_id, net_id):
+ def get_all_ports(self, tenant_id, net_id, **kwargs):
"""
Retrieves all port identifiers belonging to the
specified Virtual Network.
"""
LOG.debug("FakePlugin.get_all_ports() called")
+ filter_opts = kwargs.get('filter_opts', None)
+ if not filter_opts is None and len(filter_opts) > 0:
+ LOG.debug("filtering options were passed to the plugin"
+ "but the Fake plugin does not support them")
port_ids = []
ports = db.port_list(net_id)
for x in ports:
__metaclass__ = ABCMeta
@abstractmethod
- def get_all_networks(self, tenant_id):
+ def get_all_networks(self, tenant_id, **kwargs):
"""
Returns a dictionary containing all
<network_uuid, network_name> for
the specified tenant.
+ :param tenant_id: unique identifier for the tenant whose networks
+ are being retrieved by this method
+ :param **kwargs: options to be passed to the plugin. The following
+ keywork based-options can be specified:
+ filter_opts - options for filtering network list
:returns: a list of mapping sequences with the following signature:
[ {'net-id': uuid that uniquely identifies
the particular quantum network,
pass
@abstractmethod
- def get_all_ports(self, tenant_id, net_id):
+ def get_all_ports(self, tenant_id, net_id, **kwargs):
"""
Retrieves all port identifiers belonging to the
specified Virtual Network.
-
+ :param tenant_id: unique identifier for the tenant for which this
+ method is going to retrieve ports
+ :param net_id: unique identifiers for the network whose ports are
+ about to be retrieved
+ :param **kwargs: options to be passed to the plugin. The following
+ keywork based-options can be specified:
+ filter_opts - options for filtering network list
:returns: a list of mapping sequences with the following signature:
[ {'port-id': uuid representing a particular port
on the specified quantum network
class AbstractAPITest(unittest.TestCase):
- """Abstract base class for Quantum API unit tests
- Defined according to operations defined for Quantum API v1.0
-
- """
+ """ Base class definiting some methods for API tests """
def _deserialize_net_response(self, content_type, response):
network_data = self._net_deserializers[content_type].\
if expected_res_status in (200, 202):
port_data = self._deserialize_port_response(content_type,
port_res)
- LOG.debug("PORT RESPONSE:%s", port_res.body)
- LOG.debug("PORT DATA:%s", port_data)
return port_data['port']['id']
+ def _set_attachment(self, network_id, port_id, interface_id, fmt,
+ expected_res_status=204):
+ put_attachment_req = testlib.put_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ interface_id,
+ fmt)
+ put_attachment_res = put_attachment_req.get_response(self.api)
+ self.assertEqual(put_attachment_res.status_int, expected_res_status)
+
+ def setUp(self, api_router_klass, xml_metadata_dict):
+ options = {}
+ options['plugin_provider'] = test_config['plugin_name']
+ api_router_cls = utils.import_class(api_router_klass)
+ self.api = api_router_cls(options)
+ self.tenant_id = "test_tenant"
+ self.network_name = "test_network"
+
+ # Prepare XML & JSON deserializers
+ net_xml_deserializer = XMLDeserializer(xml_metadata_dict[NETS])
+ port_xml_deserializer = XMLDeserializer(xml_metadata_dict[PORTS])
+ att_xml_deserializer = XMLDeserializer(xml_metadata_dict[ATTS])
+
+ json_deserializer = JSONDeserializer()
+
+ self._net_deserializers = {
+ 'application/xml': net_xml_deserializer,
+ 'application/json': json_deserializer,
+ }
+ self._port_deserializers = {
+ 'application/xml': port_xml_deserializer,
+ 'application/json': json_deserializer,
+ }
+ self._att_deserializers = {
+ 'application/xml': att_xml_deserializer,
+ 'application/json': json_deserializer,
+ }
+
+ def tearDown(self):
+ """Clear the test environment"""
+ # Remove database contents
+ db.clear_db()
+
+
+class BaseAPIOperationsTest(AbstractAPITest):
+ """Abstract base class for Quantum API unit tests
+ Defined according to operations defined for Quantum API v1.0
+ """
+
def _test_create_network(self, fmt):
LOG.debug("_test_create_network - fmt:%s - START", fmt)
content_type = "application/%s" % fmt
LOG.debug("_test_unparsable_data - " \
"fmt:%s - END", fmt)
- def setUp(self, api_router_klass, xml_metadata_dict):
- options = {}
- options['plugin_provider'] = test_config['plugin_name']
- api_router_cls = utils.import_class(api_router_klass)
- self.api = api_router_cls(options)
- self.tenant_id = "test_tenant"
- self.network_name = "test_network"
-
- # Prepare XML & JSON deserializers
- net_xml_deserializer = XMLDeserializer(xml_metadata_dict[NETS])
- port_xml_deserializer = XMLDeserializer(xml_metadata_dict[PORTS])
- att_xml_deserializer = XMLDeserializer(xml_metadata_dict[ATTS])
-
- json_deserializer = JSONDeserializer()
-
- self._net_deserializers = {
- 'application/xml': net_xml_deserializer,
- 'application/json': json_deserializer,
- }
- self._port_deserializers = {
- 'application/xml': port_xml_deserializer,
- 'application/json': json_deserializer,
- }
- self._att_deserializers = {
- 'application/xml': att_xml_deserializer,
- 'application/json': json_deserializer,
- }
-
- def tearDown(self):
- """Clear the test environment"""
- # Remove database contents
- db.clear_db()
-
def test_list_networks_json(self):
self._test_list_networks('json')
# under the License.
# @author: Salvatore Orlando, Citrix Systems
+
+import logging
from webob import exc
import quantum.api.attachments as atts
import quantum.api.networks as nets
import quantum.api.ports as ports
import quantum.tests.unit._test_api as test_api
+import quantum.tests.unit.testlib_api as testlib
from quantum.common.test_lib import test_config
-class APITestV10(test_api.AbstractAPITest):
+LOG = logging.getLogger('quantum.tests.test_api')
+
+
+class APITestV10(test_api.BaseAPIOperationsTest):
def assert_network(self, **kwargs):
self.assertEqual({'id': kwargs['id'],
self._already_attached_code = 440
-class APITestV11(test_api.AbstractAPITest):
+class APITestV11(test_api.BaseAPIOperationsTest):
def assert_network(self, **kwargs):
self.assertEqual({'id': kwargs['id'],
self._port_state_invalid_code = exc.HTTPBadRequest.code
self._port_in_use_code = exc.HTTPConflict.code
self._already_attached_code = exc.HTTPConflict.code
+
+
+class APIFiltersTest(test_api.AbstractAPITest):
+ """ Test case for API filters.
+ Uses controller for API v1.1
+ """
+
+ def _do_filtered_network_list_request(self, flt):
+ list_network_req = testlib.network_list_request(self.tenant_id,
+ self.fmt,
+ query_string=flt)
+ list_network_res = list_network_req.get_response(self.api)
+ self.assertEqual(list_network_res.status_int, 200)
+ network_data = self._net_deserializers[self.content_type].\
+ deserialize(list_network_res.body)['body']
+ return network_data
+
+ def _do_filtered_port_list_request(self, flt, network_id):
+ list_port_req = testlib.port_list_request(self.tenant_id,
+ network_id,
+ self.fmt,
+ query_string=flt)
+ list_port_res = list_port_req.get_response(self.api)
+ self.assertEqual(list_port_res.status_int, 200)
+ port_data = self._port_deserializers[self.content_type].\
+ deserialize(list_port_res.body)['body']
+ return port_data
+
+ def setUp(self):
+ super(APIFiltersTest, self).setUp('quantum.api.APIRouterV11',
+ {test_api.NETS: nets.ControllerV11._serialization_metadata,
+ test_api.PORTS: ports.ControllerV11._serialization_metadata,
+ test_api.ATTS: atts.ControllerV11._serialization_metadata})
+ self.net_op_status = test_config.get('default_net_op_status',
+ 'UNKNOWN')
+ self.port_op_status = test_config.get('default_port_op_status',
+ 'UNKNOWN')
+ self.fmt = "xml"
+ self.content_type = "application/%s" % self.fmt
+ # create data for validating filters
+ # Create network "test-1"
+ self.net1_id = self._create_network(self.fmt, name="test-1")
+ # Add 2 ports, 1 ACTIVE, 1 DOWN
+ self.port11_id = self._create_port(self.net1_id, "ACTIVE", self.fmt)
+ self.port12_id = self._create_port(self.net1_id, "DOWN", self.fmt)
+ # Put attachment "test-1-att" in active port
+ self._set_attachment(self.net1_id,
+ self.port11_id,
+ "test-1-att",
+ self.fmt)
+ # Create network "test-2"
+ # Add 2 ports, 2 ACTIVE, 0 DOWN
+ self.net2_id = self._create_network(self.fmt, name="test-2")
+ self.port21_id = self._create_port(self.net2_id, "ACTIVE", self.fmt)
+ self.port22_id = self._create_port(self.net2_id, "ACTIVE", self.fmt)
+
+ def test_network_name_filter(self):
+ LOG.debug("test_network_name_filter - START")
+ flt = "name=test-1"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 1
+ self.assertEqual(len(network_data['networks']), 1)
+ self.assertEqual(network_data['networks'][0]['id'], self.net1_id)
+
+ flt = "name=non-existent"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 0
+ self.assertEqual(len(network_data['networks']), 0)
+
+ LOG.debug("test_network_name_filter - END")
+
+ def test_network_op_status_filter(self):
+ LOG.debug("test_network_op_status_filter - START")
+ # First filter for networks in default status
+ flt = "op-status=%s" % self.net_op_status
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 2
+ self.assertEqual(len(network_data['networks']), 2)
+
+ # And then for networks in 'DOWN' status
+ flt = "op-status=DOWN"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 0
+ self.assertEqual(len(network_data['networks']), 0)
+ LOG.debug("test_network_op_status_filter - END")
+
+ def test_network_port_op_status_filter(self):
+ LOG.debug("test_network_port_op_status_filter - START")
+ # First filter for networks with ports in default op status
+ flt = "port-op-status=%s" % self.port_op_status
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 2
+ self.assertEqual(len(network_data['networks']), 2)
+ LOG.debug("test_network_port_op_status_filter - END")
+
+ def test_network_port_state_filter(self):
+ LOG.debug("test_network_port_state_filter - START")
+ # First filter for networks with ports 'ACTIVE'
+ flt = "port-state=ACTIVE"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 2
+ self.assertEqual(len(network_data['networks']), 2)
+
+ # And then for networks with ports in 'DOWN' admin state
+ flt = "port-state=DOWN"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 1
+ self.assertEqual(len(network_data['networks']), 1)
+ LOG.debug("test_network_port_state_filter - END")
+
+ def test_network_has_attachment_filter(self):
+ LOG.debug("test_network_has_attachment_filter - START")
+ # First filter for networks with ports 'ACTIVE'
+ flt = "has-attachment=True"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 1
+ self.assertEqual(len(network_data['networks']), 1)
+
+ # And then for networks with ports in 'DOWN' admin state
+ flt = "has-attachment=False"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 1
+ self.assertEqual(len(network_data['networks']), 1)
+ LOG.debug("test_network_has_attachment_filter - END")
+
+ def test_network_port_filter(self):
+ LOG.debug("test_network_port_filter - START")
+
+ flt = "port=%s" % self.port11_id
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 1
+ self.assertEqual(len(network_data['networks']), 1)
+ self.assertEqual(network_data['networks'][0]['id'], self.net1_id)
+
+ flt = "port=%s" % self.port21_id
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 1
+ self.assertEqual(len(network_data['networks']), 1)
+ self.assertEqual(network_data['networks'][0]['id'], self.net2_id)
+ LOG.debug("test_network_port_filter - END")
+
+ def test_network_attachment_filter(self):
+ LOG.debug("test_network_attachment_filter - START")
+
+ flt = "attachment=test-1-att"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 1
+ self.assertEqual(len(network_data['networks']), 1)
+ self.assertEqual(network_data['networks'][0]['id'], self.net1_id)
+
+ flt = "attachment=non-existent"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 0
+ self.assertEqual(len(network_data['networks']), 0)
+ LOG.debug("test_network_attachment_filter - END")
+
+ def test_network_multiple_filters(self):
+ LOG.debug("test_network_multiple_filters - START")
+ # Add some data for having more fun
+ another_net_id = self._create_network(self.fmt, name="test-1")
+ # Add 1 ACTIVE port
+ self._create_port(another_net_id, "ACTIVE", self.fmt)
+ # Do the filtering
+ flt = "name=test-1&port-state=ACTIVE&attachment=test-1-att"
+ network_data = self._do_filtered_network_list_request(flt)
+ # Check network count: should return 1
+ self.assertEqual(len(network_data['networks']), 1)
+ self.assertEqual(network_data['networks'][0]['id'], self.net1_id)
+ LOG.debug("test_network_multiple_filters - END")
+
+ def test_port_state_filter(self):
+ LOG.debug("test_port_state_filter - START")
+ # First filter for 'ACTIVE' ports in 1st network
+ flt = "state=ACTIVE"
+ port_data = self._do_filtered_port_list_request(flt, self.net1_id)
+ # Check port count: should return 1
+ self.assertEqual(len(port_data['ports']), 1)
+
+ # And then in 2nd network
+ port_data = self._do_filtered_port_list_request(flt, self.net2_id)
+ # Check port count: should return 2
+ self.assertEqual(len(port_data['ports']), 2)
+ LOG.debug("test_port_state_filter - END")
+
+ def test_port_op_status_filter(self):
+ LOG.debug("test_port_op_status_filter - START")
+ # First filter for 'UP' ports in 1st network
+ flt = "op-status=%s" % self.port_op_status
+ port_data = self._do_filtered_port_list_request(flt, self.net1_id)
+ # Check port count: should return 2
+ self.assertEqual(len(port_data['ports']), 2)
+ LOG.debug("test_port_op_status_filter - END")
+
+ def test_port_has_attachment_filter(self):
+ LOG.debug("test_port_has_attachment_filter - START")
+ # First search for ports with attachments in 1st network
+ flt = "has-attachment=True"
+ port_data = self._do_filtered_port_list_request(flt, self.net1_id)
+ # Check port count: should return 1
+ self.assertEqual(len(port_data['ports']), 1)
+ self.assertEqual(port_data['ports'][0]['id'], self.port11_id)
+
+ # And then for ports without attachment in 2nd network
+ flt = "has-attachment=False"
+ port_data = self._do_filtered_port_list_request(flt, self.net2_id)
+ # Check port count: should return 2
+ self.assertEqual(len(port_data['ports']), 2)
+ LOG.debug("test_port_has_attachment_filter - END")
+
+ def test_port_attachment_filter(self):
+ LOG.debug("test_port_attachment_filter - START")
+ # First search for ports with attachments in 1st network
+ flt = "attachment=test-1-att"
+ port_data = self._do_filtered_port_list_request(flt, self.net1_id)
+ # Check port count: should return 1
+ self.assertEqual(len(port_data['ports']), 1)
+ self.assertEqual(port_data['ports'][0]['id'], self.port11_id)
+
+ # And then for a non-existent attachment in 2nd network
+ flt = "attachment=non-existent"
+ port_data = self._do_filtered_port_list_request(flt, self.net2_id)
+ # Check port count: should return 0
+ self.assertEqual(len(port_data['ports']), 0)
+ LOG.debug("test_port_has_attachment_filter - END")
+
+ def test_port_multiple_filters(self):
+ LOG.debug("test_port_multiple_filters - START")
+ flt = "op-status=%s&state=DOWN" % self.port_op_status
+ port_data = self._do_filtered_port_list_request(flt, self.net1_id)
+ # Check port count: should return 1
+ self.assertEqual(len(port_data['ports']), 1)
+ self.assertEqual(port_data['ports'][0]['id'], self.port12_id)
+
+ flt = "state=ACTIVE&attachment=test-1-att"
+ port_data = self._do_filtered_port_list_request(flt, self.net1_id)
+ # Check port count: should return 1
+ self.assertEqual(len(port_data['ports']), 1)
+ self.assertEqual(port_data['ports'][0]['id'], self.port11_id)
+
+ flt = "state=ACTIVE&has-attachment=False"
+ port_data = self._do_filtered_port_list_request(flt, self.net2_id)
+ # Check port count: should return 2
+ self.assertEqual(len(port_data['ports']), 2)
+ LOG.debug("test_port_multiple_filters - END")
from quantum.common.serializer import Serializer
-def create_request(path, body, content_type, method='GET'):
- req = webob.Request.blank(path)
+def create_request(path, body, content_type, method='GET', query_string=None):
+ if query_string:
+ url = "%s?%s" % (path, query_string)
+ else:
+ url = path
+ req = webob.Request.blank(url)
req.method = method
req.headers = {}
req.headers['Accept'] = content_type
return req
-def _network_list_request(tenant_id, format='xml', detail=False):
+def _network_list_request(tenant_id, format='xml', detail=False,
+ query_string=None):
method = 'GET'
detail_str = detail and '/detail' or ''
path = "/tenants/%(tenant_id)s/networks" \
"%(detail_str)s.%(format)s" % locals()
content_type = "application/%s" % format
- return create_request(path, None, content_type, method)
+ return create_request(path, None, content_type, method, query_string)
-def network_list_request(tenant_id, format='xml'):
- return _network_list_request(tenant_id, format)
+def network_list_request(tenant_id, format='xml', query_string=None):
+ return _network_list_request(tenant_id, format, query_string=query_string)
def network_list_detail_request(tenant_id, format='xml'):
return create_request(path, None, content_type, method)
-def _port_list_request(tenant_id, network_id, format='xml', detail=False):
+def _port_list_request(tenant_id, network_id, format='xml',
+ detail=False, query_string=None):
method = 'GET'
detail_str = detail and '/detail' or ''
path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s/ports%(detail_str)s.%(format)s" % locals()
content_type = "application/%s" % format
- return create_request(path, None, content_type, method)
+ return create_request(path, None, content_type, method, query_string)
-def port_list_request(tenant_id, network_id, format='xml'):
- return _port_list_request(tenant_id, network_id, format)
+def port_list_request(tenant_id, network_id, format='xml', query_string=None):
+ return _port_list_request(tenant_id,
+ network_id,
+ format,
+ query_string=query_string)
def port_list_detail_request(tenant_id, network_id, format='xml'):