from quantum.api.v2 import attributes
from quantum.api.v2 import resource as wsgi_resource
from quantum.common import exceptions
-from quantum.common import utils
from quantum.openstack.common import cfg
from quantum.openstack.common.notifier import api as notifier_api
from quantum import policy
Returns a dict of lists for the filters:
- check=a&check=b&name=Bob&verbose=True&verbose=other
+ check=a&check=b&name=Bob&
becomes
"""
res = {}
for key in set(request.GET):
- if key in ('verbose', 'fields'):
+ if key == 'fields':
continue
values = [v for v in request.GET.getall(key) if v]
if not attr_info.get(key) and values:
return res
-def _verbose(request):
- """
- Determines the verbose fields for a request
-
- Returns a list of items that are requested to be verbose:
-
- check=a&check=b&name=Bob&verbose=True&verbose=other
-
- returns
-
- [True]
-
- and
-
- check=a&check=b&name=Bob&verbose=other
-
- returns
-
- ['other']
-
- """
- verbose = [utils.boolize(v) for v in request.GET.getall('verbose') if v]
-
- # NOTE(jkoelker) verbose=<bool> trumps all other verbose settings
- if True in verbose:
- return True
- elif False in verbose:
- return False
-
- return verbose
-
-
class Controller(object):
def __init__(self, plugin, collection, resource, attr_info,
# plugin before returning.
original_fields, fields_to_add = self._do_field_list(_fields(request))
kwargs = {'filters': _filters(request, self._attr_info),
- 'verbose': _verbose(request),
'fields': original_fields}
obj_getter = getattr(self._plugin, "get_%s" % self._collection)
obj_list = obj_getter(request.context, **kwargs)
"get_%s" % self._resource,
obj,
plugin=self._plugin)]
-
return {self._collection: [self._view(obj,
fields_to_strip=fields_to_add)
for obj in obj_list]}
def _item(self, request, id, do_authz=False, field_list=None):
"""Retrieves and formats a single element of the requested entity"""
- kwargs = {'verbose': _verbose(request),
- 'fields': field_list}
+ kwargs = {'fields': field_list}
action = "get_%s" % self._resource
obj_getter = getattr(self._plugin, action)
obj = obj_getter(request.context, id, **kwargs)
query = query.filter(model.tenant_id == context.tenant_id)
return query
- def _get_by_id(self, context, model, id, joins=(), verbose=None):
+ def _get_by_id(self, context, model, id):
query = self._model_query(context, model)
- if verbose:
- if verbose and isinstance(verbose, list):
- options = [orm.joinedload(join) for join in joins
- if join in verbose]
- else:
- options = [orm.joinedload(join) for join in joins]
- query = query.options(*options)
return query.filter_by(id=id).one()
- def _get_network(self, context, id, verbose=None):
+ def _get_network(self, context, id):
try:
- network = self._get_by_id(context, models_v2.Network, id,
- joins=('subnets',), verbose=verbose)
+ network = self._get_by_id(context, models_v2.Network, id)
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=id)
except exc.MultipleResultsFound:
raise q_exc.NetworkNotFound(net_id=id)
return network
- def _get_subnet(self, context, id, verbose=None):
+ def _get_subnet(self, context, id):
try:
- subnet = self._get_by_id(context, models_v2.Subnet, id,
- verbose=verbose)
+ subnet = self._get_by_id(context, models_v2.Subnet, id)
except exc.NoResultFound:
raise q_exc.SubnetNotFound(subnet_id=id)
except exc.MultipleResultsFound:
raise q_exc.SubnetNotFound(subnet_id=id)
return subnet
- def _get_port(self, context, id, verbose=None):
+ def _get_port(self, context, id):
try:
- port = self._get_by_id(context, models_v2.Port, id,
- verbose=verbose)
+ port = self._get_by_id(context, models_v2.Port, id)
except exc.NoResultFound:
# NOTE(jkoelker) The PortNotFound exceptions requires net_id
# kwarg in order to set the message correctly
return resource
def _get_collection(self, context, model, dict_func, filters=None,
- fields=None, verbose=None):
+ fields=None):
collection = self._model_query(context, model)
if filters:
for key, value in filters.iteritems():
subnets_qry.filter_by(network_id=id).delete()
context.session.delete(network)
- def get_network(self, context, id, fields=None, verbose=None):
- network = self._get_network(context, id, verbose=verbose)
+ def get_network(self, context, id, fields=None):
+ network = self._get_network(context, id)
return self._make_network_dict(network, fields)
- def get_networks(self, context, filters=None, fields=None, verbose=None):
+ def get_networks(self, context, filters=None, fields=None):
return self._get_collection(context, models_v2.Network,
self._make_network_dict,
- filters=filters, fields=fields,
- verbose=verbose)
+ filters=filters, fields=fields)
def create_subnet_bulk(self, context, subnets):
return self._create_bulk('subnet', context, subnets)
context.session.delete(subnet)
- def get_subnet(self, context, id, fields=None, verbose=None):
- subnet = self._get_subnet(context, id, verbose=verbose)
+ def get_subnet(self, context, id, fields=None):
+ subnet = self._get_subnet(context, id)
return self._make_subnet_dict(subnet, fields)
- def get_subnets(self, context, filters=None, fields=None, verbose=None):
+ def get_subnets(self, context, filters=None, fields=None):
return self._get_collection(context, models_v2.Subnet,
self._make_subnet_dict,
- filters=filters, fields=fields,
- verbose=verbose)
+ filters=filters, fields=fields)
def create_port_bulk(self, context, ports):
return self._create_bulk('port', context, ports)
port_id=id)
context.session.delete(port)
- def get_port(self, context, id, fields=None, verbose=None):
- port = self._get_port(context, id, verbose=verbose)
+ def get_port(self, context, id, fields=None):
+ port = self._get_port(context, id)
return self._make_port_dict(port, fields)
- def get_ports(self, context, filters=None, fields=None, verbose=None):
+ def get_ports(self, context, filters=None, fields=None):
fixed_ips = filters.pop('fixed_ips', []) if filters else []
ports = self._get_collection(context, models_v2.Port,
self._make_port_dict,
- filters=filters, fields=fields,
- verbose=verbose)
+ filters=filters, fields=fields)
if ports and fixed_ips:
filtered_ports = []
class L3_NAT_db_mixin(l3.RouterPluginBase):
"""Mixin class to add L3/NAT router methods to db_plugin_base_v2"""
- def _get_router(self, context, id, verbose=None):
+ def _get_router(self, context, id):
try:
- router = self._get_by_id(context, Router, id, verbose=verbose)
+ router = self._get_by_id(context, Router, id)
except exc.NoResultFound:
raise l3.RouterNotFound(router_id=id)
except exc.MultipleResultsFound:
# thanks to cascading on the ORM relationship
context.session.delete(router)
- def get_router(self, context, id, fields=None, verbose=None):
- router = self._get_router(context, id, verbose=verbose)
+ def get_router(self, context, id, fields=None):
+ router = self._get_router(context, id)
return self._make_router_dict(router, fields)
- def get_routers(self, context, filters=None, fields=None, verbose=None):
+ def get_routers(self, context, filters=None, fields=None):
return self._get_collection(context, Router,
self._make_router_dict,
- filters=filters, fields=fields,
- verbose=verbose)
+ filters=filters, fields=fields)
def _check_for_dup_router_subnet(self, context, router_id,
network_id, subnet_id):
"interface on subnet %(subnet_id)s"
% locals())
- def _get_floatingip(self, context, id, verbose=None):
+ def _get_floatingip(self, context, id):
try:
- floatingip = self._get_by_id(context, FloatingIP, id,
- verbose=verbose)
+ floatingip = self._get_by_id(context, FloatingIP, id)
except exc.NoResultFound:
raise l3.FloatingIPNotFound(floatingip_id=id)
except exc.MultipleResultsFound:
context.session.delete(floatingip)
self.delete_port(context, floatingip['floating_port_id'])
- def get_floatingip(self, context, id, fields=None, verbose=None):
- floatingip = self._get_floatingip(context, id, verbose=verbose)
+ def get_floatingip(self, context, id, fields=None):
+ floatingip = self._get_floatingip(context, id)
return self._make_floatingip_dict(floatingip, fields)
- def get_floatingips(self, context, filters=None, fields=None,
- verbose=None):
+ def get_floatingips(self, context, filters=None, fields=None):
return self._get_collection(context, FloatingIP,
self._make_floatingip_dict,
- filters=filters, fields=fields,
- verbose=verbose)
+ filters=filters, fields=fields)
def disassociate_floatingips(self, context, port_id):
with context.session.begin(subtransactions=True):
pass
@abstractmethod
- def get_router(self, context, id, fields=None, verbose=None):
+ def get_router(self, context, id, fields=None):
pass
@abstractmethod
pass
@abstractmethod
- def get_routers(self, context, filters=None, fields=None, verbose=None):
+ def get_routers(self, context, filters=None, fields=None):
pass
@abstractmethod
pass
@abstractmethod
- def get_floatingip(self, context, id, fields=None, verbose=None):
+ def get_floatingip(self, context, id, fields=None):
pass
@abstractmethod
pass
@abstractmethod
- def get_floatingips(self, context, filters=None, fields=None,
- verbose=None):
+ def get_floatingips(self, context, filters=None, fields=None):
pass
# TODO (Sumit): Check if we need to perform any rollback here
raise
- def get_network(self, context, id, fields=None, verbose=None):
+ def get_network(self, context, id, fields=None):
"""Currently there is no processing required for the device plugins"""
pass
- def get_networks(self, context, filters=None, fields=None, verbose=None):
+ def get_networks(self, context, filters=None, fields=None):
"""Currently there is no processing required for the device plugins"""
pass
# TODO (Sumit): Check if we need to perform any rollback here
raise
- def get_port(self, context, id, fields=None, verbose=None):
+ def get_port(self, context, id, fields=None):
"""Currently there is no processing required for the device plugins"""
pass
- def get_ports(self, context, filters=None, fields=None, verbose=None):
+ def get_ports(self, context, filters=None, fields=None):
"""Currently there is no processing required for the device plugins"""
pass
"""Currently there is no processing required for the device plugins"""
pass
- def get_subnet(self, context, id, fields=None, verbose=None):
+ def get_subnet(self, context, id, fields=None):
"""Currently there is no processing required for the device plugins"""
pass
"""Currently there is no processing required for the device plugins"""
pass
- def get_subnets(self, context, filters=None, fields=None, verbose=None):
+ def get_subnets(self, context, filters=None, fields=None):
"""Currently there is no processing required for the device plugins"""
pass
except:
raise
- def get_network(self, context, id, fields=None, verbose=None):
+ def get_network(self, context, id, fields=None):
"""For this model this method will be delegated to vswitch plugin"""
pass
- def get_networks(self, context, filters=None, fields=None, verbose=None):
+ def get_networks(self, context, filters=None, fields=None):
"""For this model this method will be delegated to vswitch plugin"""
pass
"""For this model this method will be delegated to vswitch plugin"""
pass
- def get_port(self, context, id, fields=None, verbose=None):
+ def get_port(self, context, id, fields=None):
"""For this model this method will be delegated to vswitch plugin"""
pass
- def get_ports(self, context, filters=None, fields=None, verbose=None):
+ def get_ports(self, context, filters=None, fields=None):
"""For this model this method will be delegated to vswitch plugin"""
pass
"""For this model this method will be delegated to vswitch plugin"""
pass
- def get_subnet(self, context, id, fields=None, verbose=None):
+ def get_subnet(self, context, id, fields=None):
"""For this model this method will be delegated to vswitch plugin"""
pass
"""For this model this method will be delegated to vswitch plugin"""
pass
- def get_subnets(self, context, filters=None, fields=None, verbose=None):
+ def get_subnets(self, context, filters=None, fields=None):
"""For this model this method will be delegated to vswitch plugin"""
pass
except:
raise
- def get_network(self, context, id, fields=None, verbose=None):
+ def get_network(self, context, id, fields=None):
"""
Gets a particular network
"""
LOG.debug("get_network() called\n")
- return super(PluginV2, self).get_network(context, id,
- fields, verbose)
+ return super(PluginV2, self).get_network(context, id, fields)
- def get_networks(self, context, filters=None, fields=None, verbose=None):
+ def get_networks(self, context, filters=None, fields=None):
"""
Gets all networks
"""
LOG.debug("get_networks() called\n")
- return super(PluginV2, self).get_networks(context, filters,
- fields, verbose)
+ return super(PluginV2, self).get_networks(context, filters, fields)
def create_port(self, context, port):
"""
if self.agent_rpc:
self.notifier.network_delete(self.rpc_context, id)
- def get_network(self, context, id, fields=None, verbose=None):
- net = super(LinuxBridgePluginV2, self).get_network(context, id,
- None, verbose)
+ def get_network(self, context, id, fields=None):
+ net = super(LinuxBridgePluginV2, self).get_network(context, id, None)
self._extend_network_dict(context, net)
return self._fields(net, fields)
- def get_networks(self, context, filters=None, fields=None, verbose=None):
+ def get_networks(self, context, filters=None, fields=None):
nets = super(LinuxBridgePluginV2, self).get_networks(context, filters,
- None, verbose)
+ None)
for net in nets:
self._extend_network_dict(context, net)
# TODO(rkukura): Filter on extended attributes.
plugin = self._get_plugin(flavor)
return plugin.delete_network(context, id)
- def get_network(self, context, id, fields=None, verbose=None):
+ def get_network(self, context, id, fields=None):
flavor = meta_db_v2.get_flavor_by_network(id)
plugin = self._get_plugin(flavor)
- net = plugin.get_network(context, id, fields, verbose)
+ net = plugin.get_network(context, id, fields)
if not fields or 'flavor:id' in fields:
self._extend_network_dict(context, net)
return net
def get_networks_with_flavor(self, context, filters=None,
- fields=None, verbose=None):
+ fields=None):
collection = self._model_query(context, models_v2.Network)
collection = collection.join(Flavor,
models_v2.Network.id == Flavor.network_id)
collection = collection.filter(column.in_(value))
return [self._make_network_dict(c, fields) for c in collection.all()]
- def get_networks(self, context, filters=None, fields=None, verbose=None):
- nets = self.get_networks_with_flavor(context, filters,
- None, verbose)
- return [self.get_network(context, net['id'],
- fields, verbose)
+ def get_networks(self, context, filters=None, fields=None):
+ nets = self.get_networks_with_flavor(context, filters, None)
+ return [self.get_network(context, net['id'], fields)
for net in nets]
def _get_flavor_by_network_id(self, network_id):
'status': packet_filter['status']}
return self._fields(res, fields)
- def _get_packet_filter(self, context, id, verbose=None):
+ def _get_packet_filter(self, context, id):
try:
- packet_filter = self._get_by_id(context, nmodels.PacketFilter, id,
- verbose=verbose)
+ packet_filter = self._get_by_id(context, nmodels.PacketFilter, id)
except exc.NoResultFound:
raise q_exc.PacketFilterNotFound(id=id)
except exc.MultipleResultsFound:
raise q_exc.PacketFilterNotFound(id=id)
return packet_filter
- def get_packet_filter(self, context, id, fields=None, verbose=None):
- packet_filter = self._get_packet_filter(context, id, verbose=verbose)
+ def get_packet_filter(self, context, id, fields=None):
+ packet_filter = self._get_packet_filter(context, id)
return self._make_packet_filter_dict(packet_filter, fields)
- def get_packet_filters(self, context, filters=None, fields=None,
- verbose=None):
+ def get_packet_filters(self, context, filters=None, fields=None):
return self._get_collection(context,
nmodels.PacketFilter,
self._make_packet_filter_dict,
filters=filters,
- fields=fields,
- verbose=verbose)
+ fields=fields)
def create_packet_filter(self, context, packet_filter):
pf = packet_filter['packet_filter']
LOG.debug("Returning pairs for network: %s" % (pairs))
return pairs
- def get_network(self, context, id, fields=None, verbose=None):
+ def get_network(self, context, id, fields=None):
"""
Retrieves all attributes of the network, NOT including
the ports of that network.
# the context, as with shared networks context.tenant_id and
# network['tenant_id'] might differ on GETs
# goto to the plugin DB and fecth the network
- network = self._get_network(context, id, verbose)
+ network = self._get_network(context, id)
# TODO(salvatore-orlando): verify whether the query on os_tid is
# redundant or not.
if context.is_admin is False:
context.tenant_id, d))
return d
- def get_networks(self, context, filters=None, fields=None, verbose=None):
+ def get_networks(self, context, filters=None, fields=None):
"""
Retrieves all attributes of the network, NOT including
the ports of that network.
context.tenant_id)
return super(NvpPluginV2, self).update_network(context, id, network)
- def get_ports(self, context, filters=None, fields=None, verbose=None):
+ def get_ports(self, context, filters=None, fields=None):
"""
Returns all ports from given tenant
LOG.debug("delete_port() completed for tenant: %s" % context.tenant_id)
return super(NvpPluginV2, self).delete_port(context, id)
- def get_port(self, context, id, fields=None, verbose=None):
+ def get_port(self, context, id, fields=None):
"""
This method allows the user to retrieve a remote interface
that is attached to this particular port.
:raises: exception.NetworkNotFound
"""
- quantum_db = super(NvpPluginV2, self).get_port(context, id, fields,
- verbose)
+ quantum_db = super(NvpPluginV2, self).get_port(context, id, fields)
port, cluster = (
nvplib.get_port_by_quantum_tag(self.clusters,
self.notifier.network_delete(self.context, id)
return result
- def get_network(self, context, id, fields=None, verbose=None):
- net = super(OVSQuantumPluginV2, self).get_network(context, id,
- None, verbose)
+ def get_network(self, context, id, fields=None):
+ net = super(OVSQuantumPluginV2, self).get_network(context, id, None)
self._extend_network_dict(context, net)
return self._fields(net, fields)
- def get_networks(self, context, filters=None, fields=None, verbose=None):
+ def get_networks(self, context, filters=None, fields=None):
nets = super(OVSQuantumPluginV2, self).get_networks(context, filters,
- None, verbose)
+ None)
for net in nets:
self._extend_network_dict(context, net)
# TODO(rkukura): Filter on extended attributes.
pass
@abstractmethod
- def get_subnet(self, context, id, fields=None, verbose=None):
+ def get_subnet(self, context, id, fields=None):
pass
@abstractmethod
pass
@abstractmethod
- def get_subnets(self, context, filters=None, fields=None, verbose=None):
+ def get_subnets(self, context, filters=None, fields=None):
pass
@abstractmethod
pass
@abstractmethod
- def get_network(self, context, id, fields=None, verbose=None):
+ def get_network(self, context, id, fields=None):
pass
@abstractmethod
- def get_networks(self, context, filters=None, fields=None, verbose=None):
+ def get_networks(self, context, filters=None, fields=None):
pass
@abstractmethod
pass
@abstractmethod
- def get_port(self, context, id, fields=None, verbose=None):
+ def get_port(self, context, id, fields=None):
pass
@abstractmethod
- def get_ports(self, context, filters=None, fields=None, verbose=None):
+ def get_ports(self, context, filters=None, fields=None):
pass
# raises without being caught. Using unittest2
# or dropping 2.6 support so we can use addCleanup
# will get around this.
- def test_verbose_attr(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'verbose': 'foo'})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=mock.ANY,
- verbose=['foo'])
-
- def test_multiple_verbose_attr(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'verbose': ['foo', 'bar']})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=mock.ANY,
- verbose=['foo',
- 'bar'])
-
- def test_verbose_false_str(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'verbose': 'false'})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=mock.ANY,
- verbose=False)
-
- def test_verbose_true_str(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'verbose': 'true'})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=mock.ANY,
- verbose=True)
-
- def test_verbose_true_trump_attr(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'verbose': ['true', 'foo']})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=mock.ANY,
- verbose=True)
-
- def test_verbose_false_trump_attr(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'verbose': ['false', 'foo']})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=mock.ANY,
- verbose=False)
-
- def test_verbose_true_trump_false(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'verbose': ['true', 'false']})
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=mock.ANY,
- fields=mock.ANY,
- verbose=True)
-
def _do_field_list(self, resource, base_fields):
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[resource]
policy_attrs = [name for (name, info) in attr_info.items()
fields = self._do_field_list('networks', ['foo'])
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
- fields=fields,
- verbose=mock.ANY)
+ fields=fields)
def test_fields_multiple(self):
instance = self.plugin.return_value
self.api.get(_get_path('networks'), {'fields': ['foo', 'bar']})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
- fields=fields,
- verbose=mock.ANY)
+ fields=fields)
def test_fields_multiple_with_empty(self):
instance = self.plugin.return_value
self.api.get(_get_path('networks'), {'fields': ['foo', '']})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
- fields=fields,
- verbose=mock.ANY)
+ fields=fields)
def test_fields_empty(self):
instance = self.plugin.return_value
self.api.get(_get_path('networks'), {'fields': ''})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
- fields=[],
- verbose=mock.ANY)
+ fields=[])
def test_fields_multiple_empty(self):
instance = self.plugin.return_value
self.api.get(_get_path('networks'), {'fields': ['', '']})
instance.get_networks.assert_called_once_with(mock.ANY,
filters=mock.ANY,
- fields=[],
- verbose=mock.ANY)
+ fields=[])
def test_filters(self):
instance = self.plugin.return_value
filters = {'foo': ['bar']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=mock.ANY,
- verbose=mock.ANY)
+ fields=mock.ANY)
def test_filters_empty(self):
instance = self.plugin.return_value
filters = {}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=mock.ANY,
- verbose=mock.ANY)
+ fields=mock.ANY)
def test_filters_multiple_empty(self):
instance = self.plugin.return_value
filters = {}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=mock.ANY,
- verbose=mock.ANY)
+ fields=mock.ANY)
def test_filters_multiple_with_empty(self):
instance = self.plugin.return_value
filters = {'foo': ['bar']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=mock.ANY,
- verbose=mock.ANY)
+ fields=mock.ANY)
def test_filters_multiple_values(self):
instance = self.plugin.return_value
filters = {'foo': ['bar', 'bar2']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=mock.ANY,
- verbose=mock.ANY)
+ fields=mock.ANY)
def test_filters_multiple(self):
instance = self.plugin.return_value
filters = {'foo': ['bar'], 'foo2': ['bar2']}
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=mock.ANY,
- verbose=mock.ANY)
+ fields=mock.ANY)
def test_filters_with_fields(self):
instance = self.plugin.return_value
fields = self._do_field_list('networks', ['foo'])
instance.get_networks.assert_called_once_with(mock.ANY,
filters=filters,
- fields=fields,
- verbose=mock.ANY)
-
- def test_filters_with_verbose(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'foo': 'bar',
- 'verbose': 'true'})
- filters = {'foo': ['bar']}
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=mock.ANY,
- verbose=True)
-
- def test_filters_with_fields_and_verbose(self):
- instance = self.plugin.return_value
- instance.get_networks.return_value = []
-
- self.api.get(_get_path('networks'), {'foo': 'bar',
- 'fields': 'foo',
- 'verbose': 'true'})
- filters = {'foo': ['bar']}
- fields = self._do_field_list('networks', ['foo'])
- instance.get_networks.assert_called_once_with(mock.ANY,
- filters=filters,
- fields=fields,
- verbose=True)
+ fields=fields)
# Note: since all resources use the same controller and validation
res = self.api.get(_get_path('routers'))
instance.get_routers.assert_called_with(mock.ANY, fields=mock.ANY,
- verbose=mock.ANY,
filters=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
res = self.api.get(_get_path('routers', id=router_id))
instance.get_router.assert_called_with(mock.ANY, router_id,
- fields=mock.ANY,
- verbose=mock.ANY)
+ fields=mock.ANY)
self.assertEqual(res.status_int, exc.HTTPOk.code)
self.assertTrue('router' in res.json)
router = res.json['router']
def tearDown(self):
self.patcher.stop()
- quantum.policy.reset()
+ policy.reset()
def test_nonadmin_write_on_private_returns_403(self):
action = "update_network"