Addressing Dan's comments.
This changeset provides:
- improved framework for API versioning, with separated controllers for each API version
- Taken from nova:'Resource' class in WSGI framework. This class is a container for serializers, deserializers and controller
- Better deserialization thanks to resource class. _parse_request_params has been removed
- Improved management of HTTP client errors
NOTE: this changeset does not update the framework used by API extensions.
Change-Id: I88a669ce418225c415e0da22e951762d0708e0a5
[composite:quantum]
use = egg:Paste#urlmap
/: quantumversions
-/v1.0: quantumapi
+/v1.0: quantumapi_v1_0
+/v1.1: quantumapi_v1_1
-[pipeline:quantumapi]
+[pipeline:quantumapi_v1_0]
# By default, authentication is disabled.
# To enable Keystone integration uncomment the
# following line and comment the next one
-pipeline = extensions quantumapiapp
-#pipeline = authN extensions quantumapiapp
+pipeline = extensions quantumapiapp_v1_0
+#pipeline = authN extensions quantumapiapp_v1_0
+
+[pipeline:quantumapi_v1_1]
+# By default, authentication is disabled.
+# To enable Keystone integration uncomment the
+# following line and comment the next one
+pipeline = extensions quantumapiapp_v1_1
+#pipeline = authN extensions quantumapiapp_v1_1
[filter:authN]
paste.filter_factory = keystone.middleware.quantum_auth_token:filter_factory
[app:quantumversions]
paste.app_factory = quantum.api.versions:Versions.factory
-[app:quantumapiapp]
-paste.app_factory = quantum.api:APIRouterV1.factory
+[app:quantumapiapp_v1_0]
+paste.app_factory = quantum.api:APIRouterV10.factory
+
+[app:quantumapiapp_v1_1]
+paste.app_factory = quantum.api:APIRouterV11.factory
import webob.exc
from quantum import manager
-from quantum.api import faults
from quantum.api import attachments
from quantum.api import networks
from quantum.api import ports
FLAGS = flags.FLAGS
-class APIRouterV1(wsgi.Router):
+class APIRouter(wsgi.Router):
"""
- Routes requests on the Quantum API to the appropriate controller
+ Base class for Quantum API routes.
"""
def __init__(self, options=None):
- mapper = routes.Mapper()
+ mapper = self._mapper()
self._setup_routes(mapper, options)
- super(APIRouterV1, self).__init__(mapper)
+ super(APIRouter, self).__init__(mapper)
+
+ def _mapper(self):
+ return routes.Mapper()
def _setup_routes(self, mapper, options):
+ self._setup_base_routes(mapper, options, self._version)
+
+ def _setup_base_routes(self, mapper, options, version):
+ """Routes common to all versions."""
# Loads the quantum plugin
+ # Note(salvatore-orlando): Should the plugin be versioned
+ # I don't think so
plugin = manager.QuantumManager.get_plugin(options)
uri_prefix = '/tenants/{tenant_id}/'
mapper.resource('network', 'networks',
- controller=networks.Controller(plugin),
+ controller=networks.create_resource(plugin, version),
collection={'detail': 'GET'},
member={'detail': 'GET'},
path_prefix=uri_prefix)
mapper.resource('port', 'ports',
- controller=ports.Controller(plugin),
+ controller=ports.create_resource(plugin, version),
collection={'detail': 'GET'},
member={'detail': 'GET'},
parent_resource=dict(member_name='network',
collection_name=uri_prefix +\
'networks'))
- attachments_ctrl = attachments.Controller(plugin)
+ attachments_ctrl = attachments.create_resource(plugin, version)
mapper.connect("get_resource",
uri_prefix + 'networks/{network_id}/' \
'ports/{id}/attachment{.format}',
controller=attachments_ctrl,
action="detach_resource",
conditions=dict(method=['DELETE']))
+
+
+class APIRouterV10(APIRouter):
+ """
+ API routes mappings for Quantum API v1.0
+ """
+ _version = '1.0'
+
+
+class APIRouterV11(APIRouter):
+ """
+ API routes mappings for Quantum API v1.1
+ """
+ _version = '1.1'
# under the License.
import logging
-import webob
from webob import exc
from quantum import wsgi
+from quantum.api import faults
-XML_NS_V01 = 'http://netstack.org/quantum/api/v0.1'
-XML_NS_V10 = 'http://netstack.org/quantum/api/v1.0'
+XML_NS_V10 = 'http://openstack.org/quantum/api/v1.0'
+XML_NS_V11 = 'http://openstack.org/quantum/api/v1.1'
LOG = logging.getLogger('quantum.api.api_common')
-class QuantumController(wsgi.Controller):
+def create_resource(version, controller_dict):
+ """
+ Generic function for creating a wsgi resource
+ The function takes as input:
+ - desired version
+ - controller and metadata dictionary
+ e.g.: {'1.0': [ctrl_v10, meta_v10, xml_ns],
+ '1.1': [ctrl_v11, meta_v11, xml_ns]}
+
+ """
+ # the first element of the iterable is expected to be the controller
+ controller = controller_dict[version][0]
+ # the second element should be the metadata
+ metadata = controller_dict[version][1]
+ # and the third element the xml namespace
+ xmlns = controller_dict[version][2]
+
+ headers_serializer = HeaderSerializer()
+ xml_serializer = wsgi.XMLDictSerializer(metadata, xmlns)
+ json_serializer = wsgi.JSONDictSerializer()
+ xml_deserializer = wsgi.XMLDeserializer(metadata)
+ json_deserializer = wsgi.JSONDeserializer()
+
+ body_serializers = {
+ 'application/xml': xml_serializer,
+ 'application/json': json_serializer,
+ }
+
+ body_deserializers = {
+ 'application/xml': xml_deserializer,
+ 'application/json': json_deserializer,
+ }
+
+ serializer = wsgi.ResponseSerializer(body_serializers, headers_serializer)
+ deserializer = wsgi.RequestDeserializer(body_deserializers)
+
+ return wsgi.Resource(controller, deserializer, serializer)
+
+
+def APIFaultWrapper(errors=None):
+
+ def wrapper(func, **kwargs):
+
+ def the_func(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception as e:
+ if errors != None and type(e) in errors:
+ raise faults.QuantumHTTPError(e)
+ # otherwise just re-raise
+ raise
+ the_func.__name__ = func.__name__
+ return the_func
+
+ return wrapper
+
+
+class HeaderSerializer(wsgi.ResponseHeaderSerializer):
+ """
+ Defines default respone status codes for Quantum API operations
+ create - 202 ACCEPTED
+ update - 204 NOCONTENT
+ delete - 204 NOCONTENT
+ others - 200 OK (defined in base class)
+
+ """
+
+ def create(self, response, data):
+ response.status_int = 202
+
+ def delete(self, response, data):
+ response.status_int = 204
+
+ def update(self, response, data):
+ response.status_int = 204
+
+ def attach_resource(self, response, data):
+ response.status_int = 204
+
+ def detach_resource(self, response, data):
+ response.status_int = 204
+
+
+class QuantumController(object):
""" Base controller class for Quantum API """
def __init__(self, plugin):
self._plugin = plugin
super(QuantumController, self).__init__()
- def _parse_request_params(self, req, params):
- results = {}
- data = {}
- # Parameters are expected to be in request body only
- if req.body:
- des_body = self._deserialize(req.body,
- req.best_match_content_type())
- data = des_body and des_body.get(self._resource_name, None)
- if not data:
- msg = ("Failed to parse request. Resource: " +
- self._resource_name + " not found in request body")
- for line in msg.split('\n'):
- LOG.error(line)
- raise exc.HTTPBadRequest(msg)
+ def _prepare_request_body(self, body, params):
+ """ verifies required parameters are in request body.
+ sets default value for missing optional parameters.
+ body argument must be the deserialized body
+ """
+ try:
+ if body is None:
+ # Initialize empty resource for setting default value
+ body = {self._resource_name: {}}
+ data = body[self._resource_name]
+ except KeyError:
+ # raise if _resource_name is not in req body.
+ raise exc.HTTPBadRequest("Unable to find '%s' in request body"\
+ % self._resource_name)
for param in params:
param_name = param['param-name']
param_value = data.get(param_name, None)
for line in msg.split('\n'):
LOG.error(line)
raise exc.HTTPBadRequest(msg)
- results[param_name] = param_value or param.get('default-value')
-
- # There may be other parameters (data extensions), so we
- # should include those in the results dict as well.
- for key in data.keys():
- if key not in params:
- results[key] = data[key]
-
- return results
-
- def _build_response(self, req, res_data, status_code=200):
- """ A function which builds an HTTP response
- given a status code and a dictionary containing
- the response body to be serialized
-
- """
- content_type = req.best_match_content_type()
- default_xmlns = self.get_default_xmlns(req)
- body = self._serialize(res_data, content_type, default_xmlns)
-
- response = webob.Response()
- response.status = status_code
- response.headers['Content-Type'] = content_type
- response.body = body
- msg_dict = dict(url=req.url, status=response.status_int)
- msg = _("%(url)s returned with HTTP %(status)d") % msg_dict
- LOG.debug(msg)
- return response
+ data[param_name] = param_value or param.get('default-value')
+ return body
import logging
-from webob import exc
-
from quantum.api import api_common as common
-from quantum.api import faults
from quantum.api.views import attachments as attachments_view
from quantum.common import exceptions as exception
+
LOG = logging.getLogger('quantum.api.ports')
+def create_resource(plugin, version):
+ controller_dict = {
+ '1.0': [ControllerV10(plugin),
+ ControllerV10._serialization_metadata,
+ common.XML_NS_V10],
+ '1.1': [ControllerV11(plugin),
+ ControllerV11._serialization_metadata,
+ common.XML_NS_V11]}
+ return common.create_resource(version, controller_dict)
+
+
class Controller(common.QuantumController):
""" Port API controller for Quantum API """
self._resource_name = 'attachment'
super(Controller, self).__init__(plugin)
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.PortNotFound])
def get_resource(self, request, tenant_id, network_id, id):
- try:
- att_data = self._plugin.get_port_details(
- tenant_id, network_id, id)
- builder = attachments_view.get_view_builder(request)
- result = builder.build(att_data)['attachment']
- return dict(attachment=result)
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
- except exception.PortNotFound as e:
- return faults.Fault(faults.PortNotFound(e))
-
- def attach_resource(self, request, tenant_id, network_id, id):
- try:
- request_params = \
- self._parse_request_params(request,
- self._attachment_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
- try:
- LOG.debug("PLUGGING INTERFACE:%s", request_params['id'])
- self._plugin.plug_interface(tenant_id, network_id, id,
- request_params['id'])
- return exc.HTTPNoContent()
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
- except exception.PortNotFound as e:
- return faults.Fault(faults.PortNotFound(e))
- except exception.PortInUse as e:
- return faults.Fault(faults.PortInUse(e))
- except exception.AlreadyAttached as e:
- return faults.Fault(faults.AlreadyAttached(e))
+ att_data = self._plugin.get_port_details(
+ tenant_id, network_id, id)
+ builder = attachments_view.get_view_builder(request)
+ result = builder.build(att_data)['attachment']
+ return dict(attachment=result)
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.PortNotFound,
+ exception.PortInUse,
+ exception.AlreadyAttached])
+ def attach_resource(self, request, tenant_id, network_id, id, body):
+ body = self._prepare_request_body(body,
+ self._attachment_ops_param_list)
+ self._plugin.plug_interface(tenant_id, network_id, id,
+ body['attachment']['id'])
+
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.PortNotFound])
def detach_resource(self, request, tenant_id, network_id, id):
- try:
- self._plugin.unplug_interface(tenant_id,
- network_id, id)
- return exc.HTTPNoContent()
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
- except exception.PortNotFound as e:
- return faults.Fault(faults.PortNotFound(e))
+ self._plugin.unplug_interface(tenant_id,
+ network_id, id)
+
+
+class ControllerV10(Controller):
+ """Attachment resources controller for Quantum v1.0 API"""
+
+ def __init__(self, plugin):
+ self.version = "1.0"
+ super(ControllerV10, self).__init__(plugin)
+
+
+class ControllerV11(Controller):
+ """Attachment resources controller for Quantum v1.1 API"""
+
+ def __init__(self, plugin):
+ self.version = "1.1"
+ super(ControllerV11, self).__init__(plugin)
# under the License.
-import webob.dec
import webob.exc
-from quantum.api import api_common as common
-from quantum import wsgi
-
-
-class Fault(webob.exc.HTTPException):
- """Error codes for API faults"""
-
- _fault_names = {
- 400: "malformedRequest",
- 401: "unauthorized",
- 420: "networkNotFound",
- 421: "networkInUse",
- 430: "portNotFound",
- 431: "requestedStateInvalid",
- 432: "portInUse",
- 440: "alreadyAttached",
- 470: "serviceUnavailable",
- 471: "pluginFault"}
-
- def __init__(self, exception):
- """Create a Fault for the given webob.exc.exception."""
- self.wrapped_exc = exception
-
- @webob.dec.wsgify(RequestClass=wsgi.Request)
- def __call__(self, req):
- """Generate a WSGI response based on the exception passed to ctor."""
- # Replace the body with fault details.
- code = self.wrapped_exc.status_int
- fault_name = self._fault_names.get(code, "quantumServiceFault")
- fault_data = {
- fault_name: {
- 'code': code,
- 'message': self.wrapped_exc.explanation,
- 'detail': str(self.wrapped_exc.detail)}}
- # 'code' is an attribute on the fault tag itself
- metadata = {'application/xml': {'attributes': {fault_name: 'code'}}}
- default_xmlns = common.XML_NS_V10
- serializer = wsgi.Serializer(metadata, default_xmlns)
- content_type = req.best_match_content_type()
- self.wrapped_exc.body = serializer.serialize(fault_data, content_type)
- self.wrapped_exc.content_type = content_type
- return self.wrapped_exc
-
-
-class NetworkNotFound(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server did not find the network specified
- in the HTTP request
-
- code: 420, title: Network not Found
- """
- code = 420
- title = 'Network not Found'
- explanation = ('Unable to find a network with the specified identifier.')
-
-
-class NetworkInUse(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server could not delete the network as there is
- at least an attachment plugged into its ports
-
- code: 421, title: Network In Use
- """
- code = 421
- title = 'Network in Use'
- explanation = ('Unable to remove the network: attachments still plugged.')
-
-
-class PortNotFound(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server did not find the port specified
- in the HTTP request for a given network
-
- code: 430, title: Port not Found
- """
- code = 430
- title = 'Port not Found'
- explanation = ('Unable to find a port with the specified identifier.')
-
-
-class RequestedStateInvalid(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server could not update the port state to
- to the request value
-
- code: 431, title: Requested State Invalid
- """
- code = 431
- title = 'Requested State Invalid'
- explanation = ('Unable to update port state with specified value.')
-
-
-class PortInUse(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server could not remove o port or attach
- a resource to it because there is an attachment plugged into the port
-
- code: 432, title: PortInUse
- """
- code = 432
- title = 'Port in Use'
- explanation = ('A resource is currently attached to the logical port')
-
-
-class AlreadyAttached(webob.exc.HTTPClientError):
- """
- subclass of :class:`~HTTPClientError`
-
- This indicates that the server refused an attempt to re-attach a resource
- already attached to the network
-
- code: 440, title: AlreadyAttached
- """
- code = 440
- title = 'Already Attached'
- explanation = ('The resource is already attached to another port')
+from quantum.common import exceptions
+
+_NETNOTFOUND_EXPL = 'Unable to find a network with the specified identifier.'
+_NETINUSE_EXPL = 'Unable to remove the network: attachments still plugged.'
+_PORTNOTFOUND_EXPL = 'Unable to find a port with the specified identifier.'
+_STATEINVALID_EXPL = 'Unable to update port state with specified value.'
+_PORTINUSE_EXPL = 'A resource is currently attached to the logical port'
+_ALREADYATTACHED_EXPL = 'The resource is already attached to another port'
+
+
+class QuantumHTTPError(webob.exc.HTTPClientError):
+
+ _fault_dict = {
+ exceptions.NetworkNotFound: {
+ 'code': 420,
+ 'title': 'networkNotFound',
+ 'explanation': _NETNOTFOUND_EXPL
+ },
+ exceptions.NetworkInUse: {
+ 'code': 421,
+ 'title': 'networkInUse',
+ 'explanation': _NETINUSE_EXPL
+ },
+ exceptions.PortNotFound: {
+ 'code': 430,
+ 'title': 'portNotFound',
+ 'explanation': _PORTNOTFOUND_EXPL
+ },
+ exceptions.StateInvalid: {
+ 'code': 431,
+ 'title': 'requestedStateInvalid',
+ 'explanation': _STATEINVALID_EXPL
+ },
+ exceptions.PortInUse: {
+ 'code': 432,
+ 'title': 'portInUse',
+ 'explanation': _PORTINUSE_EXPL
+ },
+ exceptions.AlreadyAttached: {
+ 'code': 440,
+ 'title': 'alreadyAttached',
+ 'explanation': _ALREADYATTACHED_EXPL
+ }
+ }
+
+ def __init__(self, inner_exc):
+ _fault_data = self._fault_dict.get(type(inner_exc), None)
+ if _fault_data:
+ self.code = _fault_data['code']
+ self.title = _fault_data['title']
+ self.explanation = _fault_data['explanation']
+ super(webob.exc.HTTPClientError, self).__init__(inner_exc)
LOG = logging.getLogger('quantum.api.networks')
+def create_resource(plugin, version):
+ controller_dict = {
+ '1.0': [ControllerV10(plugin),
+ ControllerV10._serialization_metadata,
+ common.XML_NS_V10],
+ '1.1': [ControllerV11(plugin),
+ ControllerV11._serialization_metadata,
+ common.XML_NS_V11]}
+ return common.create_resource(version, controller_dict)
+
+
class Controller(common.QuantumController):
""" Network API controller for Quantum API """
'param-name': 'name',
'required': True}, ]
- _serialization_metadata = {
- "application/xml": {
- "attributes": {
- "network": ["id", "name"],
- "port": ["id", "state"],
- "attachment": ["id"]},
- "plurals": {"networks": "network",
- "ports": "port"}},
- }
-
def __init__(self, plugin):
self._resource_name = 'network'
super(Controller, self).__init__(plugin)
- def _item(self, req, tenant_id, network_id,
+ def _item(self, request, tenant_id, network_id,
net_details=True, port_details=False):
# We expect get_network_details to return information
# concerning logical ports as well.
ports_data = [self._plugin.get_port_details(
tenant_id, network_id, port['port-id'])
for port in port_list]
- builder = networks_view.get_view_builder(req)
+ builder = networks_view.get_view_builder(request, self.version)
result = builder.build(network, net_details,
ports_data, port_details)['network']
return dict(network=result)
- def _items(self, req, tenant_id, net_details=False):
+ def _items(self, request, tenant_id, net_details=False):
""" Returns a list of networks. """
networks = self._plugin.get_all_networks(tenant_id)
- builder = networks_view.get_view_builder(req)
+ builder = networks_view.get_view_builder(request, self.version)
result = [builder.build(network, net_details)['network']
for network in networks]
return dict(networks=result)
+ @common.APIFaultWrapper()
def index(self, request, tenant_id):
""" Returns a list of network ids """
return self._items(request, tenant_id)
+ @common.APIFaultWrapper()
def show(self, request, tenant_id, id):
""" Returns network details for the given network id """
try:
return self._item(request, tenant_id, id,
net_details=True, port_details=False)
except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
+ raise faults.QuantumHTTPError(e)
+ @common.APIFaultWrapper()
def detail(self, request, **kwargs):
tenant_id = kwargs.get('tenant_id')
network_id = kwargs.get('id')
# show details for all networks
return self._items(request, tenant_id, net_details=True)
- def create(self, request, tenant_id):
+ @common.APIFaultWrapper()
+ def create(self, request, tenant_id, body):
""" Creates a new network for a given tenant """
- try:
- request_params = \
- self._parse_request_params(request,
- self._network_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
# NOTE(bgh): We're currently passing both request_params['name'] and
# the entire request_params dict because their may be pieces of
# information (data extensions) inside the request params that the
# actual plugin will want to parse. We could just pass only
# request_params but that would mean all the plugins would need to
# change.
+ body = self._prepare_request_body(body, self._network_ops_param_list)
network = self._plugin.\
create_network(tenant_id,
- request_params['name'],
- **request_params)
- builder = networks_view.get_view_builder(request)
+ body['network']['name'],
+ **body)
+ builder = networks_view.get_view_builder(request, self.version)
result = builder.build(network)['network']
- # Wsgi middleware allows us to build the response
- # before returning the call.
- # This will allow us to return a 200 status code. NOTE: in v1.1 we
- # will be returning a 202 status code.
- return self._build_response(request, dict(network=result), 200)
+ return dict(network=result)
- def update(self, request, tenant_id, id):
+ @common.APIFaultWrapper([exception.NetworkNotFound])
+ def update(self, request, tenant_id, id, body):
""" Updates the name for the network with the given id """
- try:
- request_params = \
- self._parse_request_params(request,
- self._network_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
- try:
- self._plugin.update_network(tenant_id, id,
- **request_params)
- return exc.HTTPNoContent()
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
+ body = self._prepare_request_body(body, self._network_ops_param_list)
+ self._plugin.update_network(tenant_id, id, **body['network'])
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.NetworkInUse])
def delete(self, request, tenant_id, id):
""" Destroys the network with the given id """
- try:
- self._plugin.delete_network(tenant_id, id)
- return exc.HTTPNoContent()
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
- except exception.NetworkInUse as e:
- return faults.Fault(faults.NetworkInUse(e))
+ self._plugin.delete_network(tenant_id, id)
+
+
+class ControllerV10(Controller):
+ """Network resources controller for Quantum v1.0 API"""
+
+ _serialization_metadata = {
+ "attributes": {
+ "network": ["id", "name"],
+ "port": ["id", "state"],
+ "attachment": ["id"]},
+ "plurals": {"networks": "network",
+ "ports": "port"}
+ }
+
+ def __init__(self, plugin):
+ self.version = "1.0"
+ super(ControllerV10, self).__init__(plugin)
+
+
+class ControllerV11(Controller):
+ """Network resources controller for Quantum v1.1 API"""
+
+ _serialization_metadata = {
+ "attributes": {
+ "network": ["id", "name"],
+ "port": ["id", "state"],
+ "attachment": ["id"]},
+ "plurals": {"networks": "network",
+ "ports": "port"}
+ }
+
+ def __init__(self, plugin):
+ self.version = "1.1"
+ super(ControllerV11, self).__init__(plugin)
import logging
-from webob import exc
-
from quantum.api import api_common as common
-from quantum.api import faults
from quantum.api.views import ports as ports_view
from quantum.common import exceptions as exception
+
LOG = logging.getLogger('quantum.api.ports')
+def create_resource(plugin, version):
+ controller_dict = {
+ '1.0': [ControllerV10(plugin),
+ ControllerV10._serialization_metadata,
+ common.XML_NS_V10],
+ '1.1': [ControllerV11(plugin),
+ ControllerV11._serialization_metadata,
+ common.XML_NS_V11]}
+ return common.create_resource(version, controller_dict)
+
+
class Controller(common.QuantumController):
""" Port API controller for Quantum API """
'default-value': 'DOWN',
'required': False}, ]
- _serialization_metadata = {
- "application/xml": {
- "attributes": {
- "port": ["id", "state"],
- "attachment": ["id"]},
- "plurals": {"ports": "port"}},
- }
-
def __init__(self, plugin):
self._resource_name = 'port'
super(Controller, self).__init__(plugin)
def _items(self, request, tenant_id, network_id,
port_details=False):
""" Returns a list of ports. """
- try:
- port_list = self._plugin.get_all_ports(tenant_id, network_id)
- builder = ports_view.get_view_builder(request)
-
- # Load extra data for ports if required.
- if port_details:
- port_list_detail = \
- [self._plugin.get_port_details(
- tenant_id, network_id, port['port-id'])
- for port in port_list]
- port_list = port_list_detail
-
- result = [builder.build(port, port_details)['port']
- for port in port_list]
- return dict(ports=result)
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
+ port_list = self._plugin.get_all_ports(tenant_id, network_id)
+ builder = ports_view.get_view_builder(request)
+
+ # Load extra data for ports if required.
+ if port_details:
+ port_list_detail = \
+ [self._plugin.get_port_details(
+ tenant_id, network_id, port['port-id'])
+ for port in port_list]
+ port_list = port_list_detail
+
+ result = [builder.build(port, port_details)['port']
+ for port in port_list]
+ return dict(ports=result)
def _item(self, request, tenant_id, network_id, port_id,
att_details=False):
att_details=att_details)['port']
return dict(port=result)
+ @common.APIFaultWrapper([exception.NetworkNotFound])
def index(self, request, tenant_id, network_id):
""" Returns a list of port ids for a given network """
return self._items(request, tenant_id, network_id, port_details=False)
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.PortNotFound])
def show(self, request, tenant_id, network_id, id):
""" Returns port details for given port and network """
- try:
- return self._item(request, tenant_id, network_id, id)
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
- except exception.PortNotFound as e:
- return faults.Fault(faults.PortNotFound(e))
+ return self._item(request, tenant_id, network_id, id)
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.PortNotFound])
def detail(self, request, **kwargs):
tenant_id = kwargs.get('tenant_id')
network_id = kwargs.get('network_id')
return self._items(request, tenant_id,
network_id, port_details=True)
- def create(self, request, tenant_id, network_id):
- """ Creates a new port for a given network """
- try:
- request_params = \
- self._parse_request_params(request, self._port_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
- try:
- port = self._plugin.create_port(tenant_id,
- network_id,
- request_params['state'],
- **request_params)
- builder = ports_view.get_view_builder(request)
- result = builder.build(port)['port']
- # Wsgi middleware allows us to build the response
- # before returning the call.
- # This will allow us to return a 200 status code. NOTE: in v1.1
- # we will be returning a 202 status code.
- return self._build_response(request, dict(port=result), 200)
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
- except exception.StateInvalid as e:
- return faults.Fault(faults.RequestedStateInvalid(e))
-
- def update(self, request, tenant_id, network_id, id):
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.StateInvalid])
+ def create(self, request, tenant_id, network_id, body=None):
+ """ Creates a new port for a given network
+ The request body is optional for a port object.
+
+ """
+ body = self._prepare_request_body(body, self._port_ops_param_list)
+ port = self._plugin.create_port(tenant_id,
+ network_id, body['port']['state'],
+ **body)
+ builder = ports_view.get_view_builder(request)
+ result = builder.build(port)['port']
+ return dict(port=result)
+
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.PortNotFound,
+ exception.StateInvalid])
+ def update(self, request, tenant_id, network_id, id, body):
""" Updates the state of a port for a given network """
- try:
- request_params = \
- self._parse_request_params(request, self._port_ops_param_list)
- except exc.HTTPError as e:
- return faults.Fault(e)
- try:
- self._plugin.update_port(tenant_id, network_id, id,
- **request_params)
- return exc.HTTPNoContent()
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
- except exception.PortNotFound as e:
- return faults.Fault(faults.PortNotFound(e))
- except exception.StateInvalid as e:
- return faults.Fault(faults.RequestedStateInvalid(e))
+ body = self._prepare_request_body(body, self._port_ops_param_list)
+ self._plugin.update_port(tenant_id, network_id, id, **body['port'])
+ @common.APIFaultWrapper([exception.NetworkNotFound,
+ exception.PortNotFound,
+ exception.PortInUse])
def delete(self, request, tenant_id, network_id, id):
""" Destroys the port with the given id """
- #look for port state in request
- try:
- self._plugin.delete_port(tenant_id, network_id, id)
- return exc.HTTPNoContent()
- except exception.NetworkNotFound as e:
- return faults.Fault(faults.NetworkNotFound(e))
- except exception.PortNotFound as e:
- return faults.Fault(faults.PortNotFound(e))
- except exception.PortInUse as e:
- return faults.Fault(faults.PortInUse(e))
+ self._plugin.delete_port(tenant_id, network_id, id)
+
+
+class ControllerV10(Controller):
+ """Port resources controller for Quantum v1.0 API"""
+
+ _serialization_metadata = {
+ "attributes": {
+ "port": ["id", "state"],
+ "attachment": ["id"]},
+ "plurals": {"ports": "port"}
+ }
+
+ def __init__(self, plugin):
+ self.version = "1.0"
+ super(ControllerV10, self).__init__(plugin)
+
+
+class ControllerV11(Controller):
+ """Port resources controller for Quantum v1.1 API"""
+
+ _serialization_metadata = {
+ "attributes": {
+ "port": ["id", "state"],
+ "attachment": ["id"]},
+ "plurals": {"ports": "port"}
+ }
+
+ def __init__(self, plugin):
+ self.version = "1.1"
+ super(ControllerV11, self).__init__(plugin)
},
{
"id": "v1.1",
- "status": "FUTURE",
+ "status": "PROPOSED",
},
]
# under the License.
-def get_view_builder(req):
+def get_view_builder(req, version):
base_url = req.application_url
- return ViewBuilder(base_url)
+ view_builder = {
+ '1.0': ViewBuilder10,
+ '1.1': ViewBuilder11,
+ }[version](base_url)
+ return view_builder
-class ViewBuilder(object):
+class ViewBuilder10(object):
def __init__(self, base_url=None):
"""
if port_data['attachment']:
port_dict['attachment'] = dict(id=port_data['attachment'])
return port_dict
+
+
+class ViewBuilder11(ViewBuilder10):
+ #TODO(salvatore-orlando): will extend for Operational status
+ # in appropriate branch
+ pass
"already plugged into another port.")
+class MalformedRequestBody(QuantumException):
+ message = _("Malformed request body: %(reason)s")
+
+
class Duplicate(Error):
pass
if current_case != self._last_case:
self.stream.writeln(current_case)
self._last_case = current_case
-
+ #NOTE(salvatore-orlando):
+ #slightly changed in order to print test case class
+ #together with unit test name
self.stream.write(
- ' %s' % str(test.test._testMethodName).ljust(60))
+ ' %s' % str(test.test).ljust(60))
self.stream.flush()
LOG.error(_('No known API applications configured in %s.'),
paste_config_file)
return
- server = wsgi.Server()
+ server = wsgi.Server("Quantum")
server.start(app,
int(paste_conf['bind_port']), paste_conf['bind_host'])
return server
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010-2011 ????
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+# @author: Brad Hall, Nicira Networks
+# @author: Salvatore Orlando, Citrix Systems
+
+import logging
+import unittest
+
+import quantum.tests.unit.testlib_api as testlib
+
+from quantum.db import api as db
+from quantum.common import utils
+from quantum.common.test_lib import test_config
+from quantum.wsgi import XMLDeserializer, JSONDeserializer
+
+LOG = logging.getLogger('quantum.tests.test_api')
+NETS = "networks"
+PORTS = "ports"
+ATTS = "attachments"
+
+
+class AbstractAPITest(unittest.TestCase):
+ """Abstract base class for Quantum API unit tests
+ Defined according to operations defined for Quantum API v1.0
+
+ """
+
+ def _deserialize_net_response(self, content_type, response):
+ network_data = self._net_deserializers[content_type].\
+ deserialize(response.body)['body']
+ # do not taint assertions with xml namespace
+ if 'xmlns' in network_data['network']:
+ del network_data['network']['xmlns']
+ return network_data
+
+ def _deserialize_port_response(self, content_type, response):
+ port_data = self._port_deserializers[content_type].\
+ deserialize(response.body)['body']
+ # do not taint assertions with xml namespace
+ if 'xmlns' in port_data['port']:
+ del port_data['port']['xmlns']
+ return port_data
+
+ def _create_network(self, fmt, name=None, custom_req_body=None,
+ expected_res_status=202):
+ LOG.debug("Creating network")
+ content_type = "application/" + fmt
+ if name:
+ net_name = name
+ else:
+ net_name = self.network_name
+ network_req = testlib.new_network_request(self.tenant_id,
+ net_name, fmt,
+ custom_req_body)
+ network_res = network_req.get_response(self.api)
+ self.assertEqual(network_res.status_int, expected_res_status)
+ if expected_res_status in (200, 202):
+ network_data = self._deserialize_net_response(content_type,
+ network_res)
+ return network_data['network']['id']
+
+ def _create_port(self, network_id, port_state, fmt,
+ custom_req_body=None, expected_res_status=202):
+ LOG.debug("Creating port for network %s", network_id)
+ content_type = "application/%s" % fmt
+ port_req = testlib.new_port_request(self.tenant_id, network_id,
+ port_state, fmt,
+ custom_req_body)
+ port_res = port_req.get_response(self.api)
+ self.assertEqual(port_res.status_int, expected_res_status)
+ if expected_res_status in (200, 202):
+ port_data = self._deserialize_port_response(content_type,
+ port_res)
+ LOG.debug("PORT RESPONSE:%s", port_res.body)
+ LOG.debug("PORT DATA:%s", port_data)
+ return port_data['port']['id']
+
+ def _test_create_network(self, fmt):
+ LOG.debug("_test_create_network - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ network_id = self._create_network(fmt)
+ show_network_req = testlib.show_network_request(self.tenant_id,
+ network_id,
+ fmt)
+ show_network_res = show_network_req.get_response(self.api)
+ self.assertEqual(show_network_res.status_int, 200)
+ network_data = self._net_deserializers[content_type].\
+ deserialize(show_network_res.body)['body']
+ self.assertEqual(network_id, network_data['network']['id'])
+ LOG.debug("_test_create_network - fmt:%s - END", fmt)
+
+ def _test_create_network_badrequest(self, fmt):
+ LOG.debug("_test_create_network_badrequest - fmt:%s - START",
+ fmt)
+ bad_body = {'network': {'bad-attribute': 'very-bad'}}
+ self._create_network(fmt, custom_req_body=bad_body,
+ expected_res_status=400)
+ LOG.debug("_test_create_network_badrequest - fmt:%s - END",
+ fmt)
+
+ def _test_list_networks(self, fmt):
+ LOG.debug("_test_list_networks - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ self._create_network(fmt, "net_1")
+ self._create_network(fmt, "net_2")
+ list_network_req = testlib.network_list_request(self.tenant_id,
+ fmt)
+ list_network_res = list_network_req.get_response(self.api)
+ self.assertEqual(list_network_res.status_int, 200)
+ network_data = self._net_deserializers[content_type].\
+ deserialize(list_network_res.body)['body']
+ # Check network count: should return 2
+ self.assertEqual(len(network_data['networks']), 2)
+ LOG.debug("_test_list_networks - fmt:%s - END", fmt)
+
+ def _test_list_networks_detail(self, fmt):
+ LOG.debug("_test_list_networks_detail - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ self._create_network(fmt, "net_1")
+ self._create_network(fmt, "net_2")
+ list_network_req = testlib.network_list_detail_request(self.tenant_id,
+ fmt)
+ list_network_res = list_network_req.get_response(self.api)
+ self.assertEqual(list_network_res.status_int, 200)
+ network_data = self._net_deserializers[content_type].\
+ deserialize(list_network_res.body)['body']
+ # Check network count: should return 2
+ self.assertEqual(len(network_data['networks']), 2)
+ # Check contents - id & name for each network
+ for network in network_data['networks']:
+ self.assertTrue('id' in network and 'name' in network)
+ self.assertTrue(network['id'] and network['name'])
+ LOG.debug("_test_list_networks_detail - fmt:%s - END", fmt)
+
+ def _test_show_network(self, fmt):
+ LOG.debug("_test_show_network - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ network_id = self._create_network(fmt)
+ show_network_req = testlib.show_network_request(self.tenant_id,
+ network_id,
+ fmt)
+ show_network_res = show_network_req.get_response(self.api)
+ self.assertEqual(show_network_res.status_int, 200)
+ network_data = self._deserialize_net_response(content_type,
+ show_network_res)
+ self.assertEqual({'id': network_id,
+ 'name': self.network_name},
+ network_data['network'])
+ LOG.debug("_test_show_network - fmt:%s - END", fmt)
+
+ def _test_show_network_detail(self, fmt):
+ LOG.debug("_test_show_network_detail - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ # Create a network and a port
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, "ACTIVE", fmt)
+ show_network_req = testlib.show_network_detail_request(
+ self.tenant_id, network_id, fmt)
+ show_network_res = show_network_req.get_response(self.api)
+ self.assertEqual(show_network_res.status_int, 200)
+ network_data = self._deserialize_net_response(content_type,
+ show_network_res)
+ self.assertEqual({'id': network_id,
+ 'name': self.network_name,
+ 'ports': [{'id': port_id,
+ 'state': 'ACTIVE'}]},
+ network_data['network'])
+ LOG.debug("_test_show_network_detail - fmt:%s - END", fmt)
+
+ def _test_show_network_not_found(self, fmt):
+ LOG.debug("_test_show_network_not_found - fmt:%s - START", fmt)
+ show_network_req = testlib.show_network_request(self.tenant_id,
+ "A_BAD_ID",
+ fmt)
+ show_network_res = show_network_req.get_response(self.api)
+ self.assertEqual(show_network_res.status_int, 420)
+ LOG.debug("_test_show_network_not_found - fmt:%s - END", fmt)
+
+ def _test_rename_network(self, fmt):
+ LOG.debug("_test_rename_network - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ new_name = 'new_network_name'
+ network_id = self._create_network(fmt)
+ update_network_req = testlib.update_network_request(self.tenant_id,
+ network_id,
+ new_name,
+ fmt)
+ update_network_res = update_network_req.get_response(self.api)
+ self.assertEqual(update_network_res.status_int, 204)
+ show_network_req = testlib.show_network_request(self.tenant_id,
+ network_id,
+ fmt)
+ show_network_res = show_network_req.get_response(self.api)
+ self.assertEqual(show_network_res.status_int, 200)
+ network_data = self._deserialize_net_response(content_type,
+ show_network_res)
+ self.assertEqual({'id': network_id,
+ 'name': new_name},
+ network_data['network'])
+ LOG.debug("_test_rename_network - fmt:%s - END", fmt)
+
+ def _test_rename_network_badrequest(self, fmt):
+ LOG.debug("_test_rename_network_badrequest - fmt:%s - START",
+ fmt)
+ network_id = self._create_network(fmt)
+ bad_body = {'network': {'bad-attribute': 'very-bad'}}
+ update_network_req = testlib.\
+ update_network_request(self.tenant_id,
+ network_id, fmt,
+ custom_req_body=bad_body)
+ update_network_res = update_network_req.get_response(self.api)
+ self.assertEqual(update_network_res.status_int, 400)
+ LOG.debug("_test_rename_network_badrequest - fmt:%s - END",
+ fmt)
+
+ def _test_rename_network_not_found(self, fmt):
+ LOG.debug("_test_rename_network_not_found - fmt:%s - START",
+ fmt)
+ new_name = 'new_network_name'
+ update_network_req = testlib.update_network_request(self.tenant_id,
+ "A BAD ID",
+ new_name,
+ fmt)
+ update_network_res = update_network_req.get_response(self.api)
+ self.assertEqual(update_network_res.status_int, 420)
+ LOG.debug("_test_rename_network_not_found - fmt:%s - END",
+ fmt)
+
+ def _test_delete_network(self, fmt):
+ LOG.debug("_test_delete_network - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ network_id = self._create_network(fmt)
+ LOG.debug("Deleting network %s"\
+ " of tenant %s" % (network_id, self.tenant_id))
+ delete_network_req = testlib.network_delete_request(self.tenant_id,
+ network_id,
+ fmt)
+ delete_network_res = delete_network_req.get_response(self.api)
+ self.assertEqual(delete_network_res.status_int, 204)
+ list_network_req = testlib.network_list_request(self.tenant_id,
+ fmt)
+ list_network_res = list_network_req.get_response(self.api)
+ network_list_data = self._net_deserializers[content_type].\
+ deserialize(list_network_res.body)['body']
+ network_count = len(network_list_data['networks'])
+ self.assertEqual(network_count, 0)
+ LOG.debug("_test_delete_network - fmt:%s - END", fmt)
+
+ def _test_delete_network_in_use(self, fmt):
+ LOG.debug("_test_delete_network_in_use - fmt:%s - START", fmt)
+ port_state = "ACTIVE"
+ attachment_id = "test_attachment"
+ network_id = self._create_network(fmt)
+ LOG.debug("Deleting network %s"\
+ " of tenant %s" % (network_id, self.tenant_id))
+ port_id = self._create_port(network_id, port_state, fmt)
+ #plug an attachment into the port
+ LOG.debug("Putting attachment into port %s", port_id)
+ attachment_req = testlib.put_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ attachment_id)
+ attachment_res = attachment_req.get_response(self.api)
+ self.assertEquals(attachment_res.status_int, 204)
+
+ LOG.debug("Deleting network %s"\
+ " of tenant %s" % (network_id, self.tenant_id))
+ delete_network_req = testlib.network_delete_request(self.tenant_id,
+ network_id,
+ fmt)
+ delete_network_res = delete_network_req.get_response(self.api)
+ self.assertEqual(delete_network_res.status_int, 421)
+ LOG.debug("_test_delete_network_in_use - fmt:%s - END", fmt)
+
+ def _test_delete_network_with_unattached_port(self, fmt):
+ LOG.debug("_test_delete_network_with_unattached_port "\
+ "- fmt:%s - START", fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ LOG.debug("Deleting network %s"\
+ " of tenant %s" % (network_id, self.tenant_id))
+ self._create_port(network_id, port_state, fmt)
+
+ LOG.debug("Deleting network %s"\
+ " of tenant %s" % (network_id, self.tenant_id))
+ delete_network_req = testlib.network_delete_request(self.tenant_id,
+ network_id,
+ fmt)
+ delete_network_res = delete_network_req.get_response(self.api)
+ self.assertEqual(delete_network_res.status_int, 204)
+ LOG.debug("_test_delete_network_with_unattached_port "\
+ "- fmt:%s - END", fmt)
+
+ def _test_list_ports(self, fmt):
+ LOG.debug("_test_list_ports - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ self._create_port(network_id, port_state, fmt)
+ self._create_port(network_id, port_state, fmt)
+ list_port_req = testlib.port_list_request(self.tenant_id,
+ network_id, fmt)
+ list_port_res = list_port_req.get_response(self.api)
+ self.assertEqual(list_port_res.status_int, 200)
+ port_data = self._port_deserializers[content_type].\
+ deserialize(list_port_res.body)['body']
+ # Check port count: should return 2
+ self.assertEqual(len(port_data['ports']), 2)
+ LOG.debug("_test_list_ports - fmt:%s - END", fmt)
+
+ def _test_list_ports_networknotfound(self, fmt):
+ LOG.debug("_test_list_ports_networknotfound"
+ " - fmt:%s - START", fmt)
+ list_port_req = testlib.port_list_request(self.tenant_id,
+ "A_BAD_ID", fmt)
+ list_port_res = list_port_req.get_response(self.api)
+ self.assertEqual(list_port_res.status_int, 420)
+ LOG.debug("_test_list_ports_networknotfound - fmt:%s - END", fmt)
+
+ def _test_list_ports_detail(self, fmt):
+ LOG.debug("_test_list_ports_detail - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ self._create_port(network_id, port_state, fmt)
+ self._create_port(network_id, port_state, fmt)
+ list_port_req = testlib.port_list_detail_request(self.tenant_id,
+ network_id, fmt)
+ list_port_res = list_port_req.get_response(self.api)
+ self.assertEqual(list_port_res.status_int, 200)
+ port_data = self._port_deserializers[content_type].\
+ deserialize(list_port_res.body)['body']
+ # Check port count: should return 2
+ self.assertEqual(len(port_data['ports']), 2)
+ # Check contents - id & name for each network
+ for port in port_data['ports']:
+ self.assertTrue('id' in port and 'state' in port)
+ self.assertTrue(port['id'] and port['state'])
+ LOG.debug("_test_list_ports_detail - fmt:%s - END", fmt)
+
+ def _test_show_port(self, fmt):
+ LOG.debug("_test_show_port - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ show_port_req = testlib.show_port_request(self.tenant_id,
+ network_id, port_id,
+ fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 200)
+ port_data = self._deserialize_port_response(content_type,
+ show_port_res)
+ self.assertEqual({'id': port_id, 'state': port_state},
+ port_data['port'])
+ LOG.debug("_test_show_port - fmt:%s - END", fmt)
+
+ def _test_show_port_detail(self, fmt):
+ LOG.debug("_test_show_port - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+
+ # Part 1 - no attachment
+ show_port_req = testlib.show_port_detail_request(self.tenant_id,
+ network_id, port_id, fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 200)
+ port_data = self._deserialize_port_response(content_type,
+ show_port_res)
+ self.assertEqual({'id': port_id, 'state': port_state},
+ port_data['port'])
+
+ # Part 2 - plug attachment into port
+ interface_id = "test_interface"
+ put_attachment_req = testlib.put_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ interface_id,
+ fmt)
+ put_attachment_res = put_attachment_req.get_response(self.api)
+ self.assertEqual(put_attachment_res.status_int, 204)
+ show_port_req = testlib.show_port_detail_request(self.tenant_id,
+ network_id, port_id, fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 200)
+ port_data = self._deserialize_port_response(content_type,
+ show_port_res)
+ self.assertEqual({'id': port_id, 'state': port_state,
+ 'attachment': {'id': interface_id}},
+ port_data['port'])
+
+ LOG.debug("_test_show_port_detail - fmt:%s - END", fmt)
+
+ def _test_show_port_networknotfound(self, fmt):
+ LOG.debug("_test_show_port_networknotfound - fmt:%s - START",
+ fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ show_port_req = testlib.show_port_request(self.tenant_id,
+ "A_BAD_ID", port_id,
+ fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 420)
+ LOG.debug("_test_show_port_networknotfound - fmt:%s - END",
+ fmt)
+
+ def _test_show_port_portnotfound(self, fmt):
+ LOG.debug("_test_show_port_portnotfound - fmt:%s - START", fmt)
+ network_id = self._create_network(fmt)
+ show_port_req = testlib.show_port_request(self.tenant_id,
+ network_id,
+ "A_BAD_ID",
+ fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 430)
+ LOG.debug("_test_show_port_portnotfound - fmt:%s - END", fmt)
+
+ def _test_create_port_noreqbody(self, fmt):
+ LOG.debug("_test_create_port_noreqbody - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, None, fmt,
+ custom_req_body='')
+ show_port_req = testlib.show_port_request(self.tenant_id,
+ network_id, port_id, fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 200)
+ port_data = self._port_deserializers[content_type].\
+ deserialize(show_port_res.body)['body']
+ self.assertEqual(port_id, port_data['port']['id'])
+ LOG.debug("_test_create_port_noreqbody - fmt:%s - END", fmt)
+
+ def _test_create_port(self, fmt):
+ LOG.debug("_test_create_port - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ show_port_req = testlib.show_port_request(self.tenant_id,
+ network_id, port_id, fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 200)
+ port_data = self._port_deserializers[content_type].\
+ deserialize(show_port_res.body)['body']
+ self.assertEqual(port_id, port_data['port']['id'])
+ LOG.debug("_test_create_port - fmt:%s - END", fmt)
+
+ def _test_create_port_networknotfound(self, fmt):
+ LOG.debug("_test_create_port_networknotfound - fmt:%s - START",
+ fmt)
+ port_state = "ACTIVE"
+ self._create_port("A_BAD_ID", port_state, fmt,
+ expected_res_status=420)
+ LOG.debug("_test_create_port_networknotfound - fmt:%s - END",
+ fmt)
+
+ def _test_create_port_badrequest(self, fmt):
+ LOG.debug("_test_create_port_badrequest - fmt:%s - START", fmt)
+ bad_body = {'bad-resource': {'bad-attribute': 'bad-value'}}
+ network_id = self._create_network(fmt)
+ port_state = "ACTIVE"
+ self._create_port(network_id, port_state, fmt,
+ custom_req_body=bad_body, expected_res_status=400)
+ LOG.debug("_test_create_port_badrequest - fmt:%s - END", fmt)
+
+ def _test_delete_port(self, fmt):
+ LOG.debug("_test_delete_port - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ LOG.debug("Deleting port %s for network %s"\
+ " of tenant %s" % (port_id, network_id,
+ self.tenant_id))
+ delete_port_req = testlib.port_delete_request(self.tenant_id,
+ network_id, port_id,
+ fmt)
+ delete_port_res = delete_port_req.get_response(self.api)
+ self.assertEqual(delete_port_res.status_int, 204)
+ list_port_req = testlib.port_list_request(self.tenant_id, network_id,
+ fmt)
+ list_port_res = list_port_req.get_response(self.api)
+ port_list_data = self._port_deserializers[content_type].\
+ deserialize(list_port_res.body)['body']
+ port_count = len(port_list_data['ports'])
+ self.assertEqual(port_count, 0)
+ LOG.debug("_test_delete_port - fmt:%s - END", fmt)
+
+ def _test_delete_port_in_use(self, fmt):
+ LOG.debug("_test_delete_port_in_use - fmt:%s - START", fmt)
+ port_state = "ACTIVE"
+ attachment_id = "test_attachment"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ #plug an attachment into the port
+ LOG.debug("Putting attachment into port %s", port_id)
+ attachment_req = testlib.put_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ attachment_id)
+ attachment_res = attachment_req.get_response(self.api)
+ self.assertEquals(attachment_res.status_int, 204)
+ LOG.debug("Deleting port %s for network %s"\
+ " of tenant %s" % (port_id, network_id,
+ self.tenant_id))
+ delete_port_req = testlib.port_delete_request(self.tenant_id,
+ network_id, port_id,
+ fmt)
+ delete_port_res = delete_port_req.get_response(self.api)
+ self.assertEqual(delete_port_res.status_int, 432)
+ LOG.debug("_test_delete_port_in_use - fmt:%s - END", fmt)
+
+ def _test_delete_port_with_bad_id(self, fmt):
+ LOG.debug("_test_delete_port_with_bad_id - fmt:%s - START",
+ fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ self._create_port(network_id, port_state, fmt)
+ # Test for portnotfound
+ delete_port_req = testlib.port_delete_request(self.tenant_id,
+ network_id, "A_BAD_ID",
+ fmt)
+ delete_port_res = delete_port_req.get_response(self.api)
+ self.assertEqual(delete_port_res.status_int, 430)
+ LOG.debug("_test_delete_port_with_bad_id - fmt:%s - END", fmt)
+
+ def _test_delete_port_networknotfound(self, fmt):
+ LOG.debug("_test_delete_port_networknotfound - fmt:%s - START",
+ fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ delete_port_req = testlib.port_delete_request(self.tenant_id,
+ "A_BAD_ID", port_id,
+ fmt)
+ delete_port_res = delete_port_req.get_response(self.api)
+ self.assertEqual(delete_port_res.status_int, 420)
+ LOG.debug("_test_delete_port_networknotfound - fmt:%s - END",
+ fmt)
+
+ def _test_set_port_state(self, fmt):
+ LOG.debug("_test_set_port_state - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = 'DOWN'
+ new_port_state = 'ACTIVE'
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ update_port_req = testlib.update_port_request(self.tenant_id,
+ network_id, port_id,
+ new_port_state,
+ fmt)
+ update_port_res = update_port_req.get_response(self.api)
+ self.assertEqual(update_port_res.status_int, 204)
+ show_port_req = testlib.show_port_request(self.tenant_id,
+ network_id, port_id,
+ fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 200)
+ port_data = self._deserialize_port_response(content_type,
+ show_port_res)
+ self.assertEqual({'id': port_id, 'state': new_port_state},
+ port_data['port'])
+ # now set it back to the original value
+ update_port_req = testlib.update_port_request(self.tenant_id,
+ network_id, port_id,
+ port_state,
+ fmt)
+ update_port_res = update_port_req.get_response(self.api)
+ self.assertEqual(update_port_res.status_int, 204)
+ show_port_req = testlib.show_port_request(self.tenant_id,
+ network_id, port_id,
+ fmt)
+ show_port_res = show_port_req.get_response(self.api)
+ self.assertEqual(show_port_res.status_int, 200)
+ port_data = self._deserialize_port_response(content_type,
+ show_port_res)
+ self.assertEqual({'id': port_id, 'state': port_state},
+ port_data['port'])
+ LOG.debug("_test_set_port_state - fmt:%s - END", fmt)
+
+ def _test_set_port_state_networknotfound(self, fmt):
+ LOG.debug("_test_set_port_state_networknotfound - fmt:%s - START",
+ fmt)
+ port_state = 'DOWN'
+ new_port_state = 'ACTIVE'
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ update_port_req = testlib.update_port_request(self.tenant_id,
+ "A_BAD_ID", port_id,
+ new_port_state,
+ fmt)
+ update_port_res = update_port_req.get_response(self.api)
+ self.assertEqual(update_port_res.status_int, 420)
+ LOG.debug("_test_set_port_state_networknotfound - fmt:%s - END",
+ fmt)
+
+ def _test_set_port_state_portnotfound(self, fmt):
+ LOG.debug("_test_set_port_state_portnotfound - fmt:%s - START",
+ fmt)
+ port_state = 'DOWN'
+ new_port_state = 'ACTIVE'
+ network_id = self._create_network(fmt)
+ self._create_port(network_id, port_state, fmt)
+ update_port_req = testlib.update_port_request(self.tenant_id,
+ network_id,
+ "A_BAD_ID",
+ new_port_state,
+ fmt)
+ update_port_res = update_port_req.get_response(self.api)
+ self.assertEqual(update_port_res.status_int, 430)
+ LOG.debug("_test_set_port_state_portnotfound - fmt:%s - END",
+ fmt)
+
+ def _test_set_port_state_stateinvalid(self, fmt):
+ LOG.debug("_test_set_port_state_stateinvalid - fmt:%s - START",
+ fmt)
+ port_state = 'DOWN'
+ new_port_state = 'A_BAD_STATE'
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ update_port_req = testlib.update_port_request(self.tenant_id,
+ network_id, port_id,
+ new_port_state,
+ fmt)
+ update_port_res = update_port_req.get_response(self.api)
+ self.assertEqual(update_port_res.status_int, 431)
+ LOG.debug("_test_set_port_state_stateinvalid - fmt:%s - END",
+ fmt)
+
+ def _test_show_attachment(self, fmt):
+ LOG.debug("_test_show_attachment - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ interface_id = "test_interface"
+ port_id = self._create_port(network_id, port_state, fmt)
+ put_attachment_req = testlib.put_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ interface_id,
+ fmt)
+ put_attachment_res = put_attachment_req.get_response(self.api)
+ self.assertEqual(put_attachment_res.status_int, 204)
+ get_attachment_req = testlib.get_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ fmt)
+ get_attachment_res = get_attachment_req.get_response(self.api)
+ attachment_data = self._att_deserializers[content_type].\
+ deserialize(get_attachment_res.body)['body']
+ self.assertEqual(attachment_data['attachment']['id'], interface_id)
+ LOG.debug("_test_show_attachment - fmt:%s - END", fmt)
+
+ def _test_show_attachment_none_set(self, fmt):
+ LOG.debug("_test_show_attachment_none_set - fmt:%s - START", fmt)
+ content_type = "application/%s" % fmt
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ get_attachment_req = testlib.get_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ fmt)
+ get_attachment_res = get_attachment_req.get_response(self.api)
+ attachment_data = self._att_deserializers[content_type].\
+ deserialize(get_attachment_res.body)['body']
+ self.assertTrue('id' not in attachment_data['attachment'])
+ LOG.debug("_test_show_attachment_none_set - fmt:%s - END", fmt)
+
+ def _test_show_attachment_networknotfound(self, fmt):
+ LOG.debug("_test_show_attachment_networknotfound - fmt:%s - START",
+ fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ get_attachment_req = testlib.get_attachment_request(self.tenant_id,
+ "A_BAD_ID",
+ port_id,
+ fmt)
+ get_attachment_res = get_attachment_req.get_response(self.api)
+ self.assertEqual(get_attachment_res.status_int, 420)
+ LOG.debug("_test_show_attachment_networknotfound - fmt:%s - END",
+ fmt)
+
+ def _test_show_attachment_portnotfound(self, fmt):
+ LOG.debug("_test_show_attachment_portnotfound - fmt:%s - START",
+ fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ self._create_port(network_id, port_state, fmt)
+ get_attachment_req = testlib.get_attachment_request(self.tenant_id,
+ network_id,
+ "A_BAD_ID",
+ fmt)
+ get_attachment_res = get_attachment_req.get_response(self.api)
+ self.assertEqual(get_attachment_res.status_int, 430)
+ LOG.debug("_test_show_attachment_portnotfound - fmt:%s - END",
+ fmt)
+
+ def _test_put_attachment(self, fmt):
+ LOG.debug("_test_put_attachment - fmt:%s - START", fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ interface_id = "test_interface"
+ port_id = self._create_port(network_id, port_state, fmt)
+ put_attachment_req = testlib.put_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ interface_id,
+ fmt)
+ put_attachment_res = put_attachment_req.get_response(self.api)
+ self.assertEqual(put_attachment_res.status_int, 204)
+ LOG.debug("_test_put_attachment - fmt:%s - END", fmt)
+
+ def _test_put_attachment_networknotfound(self, fmt):
+ LOG.debug("_test_put_attachment_networknotfound - fmt:%s - START",
+ fmt)
+ port_state = 'DOWN'
+ interface_id = "test_interface"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ put_attachment_req = testlib.put_attachment_request(self.tenant_id,
+ "A_BAD_ID",
+ port_id,
+ interface_id,
+ fmt)
+ put_attachment_res = put_attachment_req.get_response(self.api)
+ self.assertEqual(put_attachment_res.status_int, 420)
+ LOG.debug("_test_put_attachment_networknotfound - fmt:%s - END",
+ fmt)
+
+ def _test_put_attachment_portnotfound(self, fmt):
+ LOG.debug("_test_put_attachment_portnotfound - fmt:%s - START",
+ fmt)
+ port_state = 'DOWN'
+ interface_id = "test_interface"
+ network_id = self._create_network(fmt)
+ self._create_port(network_id, port_state, fmt)
+ put_attachment_req = testlib.put_attachment_request(self.tenant_id,
+ network_id,
+ "A_BAD_ID",
+ interface_id,
+ fmt)
+ put_attachment_res = put_attachment_req.get_response(self.api)
+ self.assertEqual(put_attachment_res.status_int, 430)
+ LOG.debug("_test_put_attachment_portnotfound - fmt:%s - END",
+ fmt)
+
+ def _test_delete_attachment(self, fmt):
+ LOG.debug("_test_delete_attachment - fmt:%s - START", fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ interface_id = "test_interface"
+ port_id = self._create_port(network_id, port_state, fmt)
+ put_attachment_req = testlib.put_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ interface_id,
+ fmt)
+ put_attachment_res = put_attachment_req.get_response(self.api)
+ self.assertEqual(put_attachment_res.status_int, 204)
+ del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
+ network_id,
+ port_id,
+ fmt)
+ del_attachment_res = del_attachment_req.get_response(self.api)
+ self.assertEqual(del_attachment_res.status_int, 204)
+ LOG.debug("_test_delete_attachment - fmt:%s - END", fmt)
+
+ def _test_delete_attachment_networknotfound(self, fmt):
+ LOG.debug("_test_delete_attachment_networknotfound -" \
+ " fmt:%s - START", fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ port_id = self._create_port(network_id, port_state, fmt)
+ del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
+ "A_BAD_ID",
+ port_id,
+ fmt)
+ del_attachment_res = del_attachment_req.get_response(self.api)
+ self.assertEqual(del_attachment_res.status_int, 420)
+ LOG.debug("_test_delete_attachment_networknotfound -" \
+ " fmt:%s - END", fmt)
+
+ def _test_delete_attachment_portnotfound(self, fmt):
+ LOG.debug("_test_delete_attachment_portnotfound - " \
+ " fmt:%s - START", fmt)
+ port_state = "ACTIVE"
+ network_id = self._create_network(fmt)
+ self._create_port(network_id, port_state, fmt)
+ del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
+ network_id,
+ "A_BAD_ID",
+ fmt)
+ del_attachment_res = del_attachment_req.get_response(self.api)
+ self.assertEqual(del_attachment_res.status_int, 430)
+ LOG.debug("_test_delete_attachment_portnotfound - " \
+ "fmt:%s - END", fmt)
+
+ def _test_unparsable_data(self, fmt):
+ LOG.debug("_test_unparsable_data - " \
+ " fmt:%s - START", fmt)
+
+ data = "this is not json or xml"
+ method = 'POST'
+ content_type = "application/%s" % fmt
+ tenant_id = self.tenant_id
+ path = "/tenants/%(tenant_id)s/networks.%(fmt)s" % locals()
+ network_req = testlib.create_request(path, data, content_type, method)
+ network_res = network_req.get_response(self.api)
+ self.assertEqual(network_res.status_int, 400)
+
+ LOG.debug("_test_unparsable_data - " \
+ "fmt:%s - END", fmt)
+
+ def setUp(self, api_router_klass, xml_metadata_dict):
+ options = {}
+ options['plugin_provider'] = test_config['plugin_name']
+ api_router_cls = utils.import_class(api_router_klass)
+ self.api = api_router_cls(options)
+ self.tenant_id = "test_tenant"
+ self.network_name = "test_network"
+
+ # Prepare XML & JSON deserializers
+ net_xml_deserializer = XMLDeserializer(xml_metadata_dict[NETS])
+ port_xml_deserializer = XMLDeserializer(xml_metadata_dict[PORTS])
+ att_xml_deserializer = XMLDeserializer(xml_metadata_dict[ATTS])
+
+ json_deserializer = JSONDeserializer()
+
+ self._net_deserializers = {
+ 'application/xml': net_xml_deserializer,
+ 'application/json': json_deserializer,
+ }
+ self._port_deserializers = {
+ 'application/xml': port_xml_deserializer,
+ 'application/json': json_deserializer,
+ }
+ self._att_deserializers = {
+ 'application/xml': att_xml_deserializer,
+ 'application/json': json_deserializer,
+ }
+
+ def tearDown(self):
+ """Clear the test environment"""
+ # Remove database contents
+ db.clear_db()
+
+ def test_list_networks_json(self):
+ self._test_list_networks('json')
+
+ def test_list_networks_xml(self):
+ self._test_list_networks('xml')
+
+ def test_list_networks_detail_json(self):
+ self._test_list_networks_detail('json')
+
+ def test_list_networks_detail_xml(self):
+ self._test_list_networks_detail('xml')
+
+ def test_create_network_json(self):
+ self._test_create_network('json')
+
+ def test_create_network_xml(self):
+ self._test_create_network('xml')
+
+ def test_create_network_badrequest_json(self):
+ self._test_create_network_badrequest('json')
+
+ def test_create_network_badrequest_xml(self):
+ self._test_create_network_badrequest('xml')
+
+ def test_show_network_not_found_json(self):
+ self._test_show_network_not_found('json')
+
+ def test_show_network_not_found_xml(self):
+ self._test_show_network_not_found('xml')
+
+ def test_show_network_json(self):
+ self._test_show_network('json')
+
+ def test_show_network_xml(self):
+ self._test_show_network('xml')
+
+ def test_show_network_detail_json(self):
+ self._test_show_network_detail('json')
+
+ def test_show_network_detail_xml(self):
+ self._test_show_network_detail('xml')
+
+ def test_delete_network_json(self):
+ self._test_delete_network('json')
+
+ def test_delete_network_xml(self):
+ self._test_delete_network('xml')
+
+ def test_rename_network_json(self):
+ self._test_rename_network('json')
+
+ def test_rename_network_xml(self):
+ self._test_rename_network('xml')
+
+ def test_rename_network_badrequest_json(self):
+ self._test_rename_network_badrequest('json')
+
+ def test_rename_network_badrequest_xml(self):
+ self._test_rename_network_badrequest('xml')
+
+ def test_rename_network_not_found_json(self):
+ self._test_rename_network_not_found('json')
+
+ def test_rename_network_not_found_xml(self):
+ self._test_rename_network_not_found('xml')
+
+ def test_delete_network_in_use_json(self):
+ self._test_delete_network_in_use('json')
+
+ def test_delete_network_in_use_xml(self):
+ self._test_delete_network_in_use('xml')
+
+ def test_delete_network_with_unattached_port_xml(self):
+ self._test_delete_network_with_unattached_port('xml')
+
+ def test_delete_network_with_unattached_port_json(self):
+ self._test_delete_network_with_unattached_port('json')
+
+ def test_list_ports_json(self):
+ self._test_list_ports('json')
+
+ def test_list_ports_xml(self):
+ self._test_list_ports('xml')
+
+ def test_list_ports_networknotfound_json(self):
+ self._test_list_ports_networknotfound('json')
+
+ def test_list_ports_networknotfound_xml(self):
+ self._test_list_ports_networknotfound('xml')
+
+ def test_list_ports_detail_json(self):
+ self._test_list_ports_detail('json')
+
+ def test_list_ports_detail_xml(self):
+ self._test_list_ports_detail('xml')
+
+ def test_show_port_json(self):
+ self._test_show_port('json')
+
+ def test_show_port_xml(self):
+ self._test_show_port('xml')
+
+ def test_show_port_detail_json(self):
+ self._test_show_port_detail('json')
+
+ def test_show_port_detail_xml(self):
+ self._test_show_port_detail('xml')
+
+ def test_show_port_networknotfound_json(self):
+ self._test_show_port_networknotfound('json')
+
+ def test_show_port_networknotfound_xml(self):
+ self._test_show_port_networknotfound('xml')
+
+ def test_show_port_portnotfound_json(self):
+ self._test_show_port_portnotfound('json')
+
+ def test_show_port_portnotfound_xml(self):
+ self._test_show_port_portnotfound('xml')
+
+ def test_create_port_json(self):
+ self._test_create_port('json')
+
+ def test_create_port_xml(self):
+ self._test_create_port('xml')
+
+ def test_create_port_noreqbody_json(self):
+ self._test_create_port_noreqbody('json')
+
+ def test_create_port_noreqbody_xml(self):
+ self._test_create_port_noreqbody('xml')
+
+ def test_create_port_networknotfound_json(self):
+ self._test_create_port_networknotfound('json')
+
+ def test_create_port_networknotfound_xml(self):
+ self._test_create_port_networknotfound('xml')
+
+ def test_create_port_badrequest_json(self):
+ self._test_create_port_badrequest('json')
+
+ def test_create_port_badrequest_xml(self):
+ self._test_create_port_badrequest('xml')
+
+ def test_delete_port_xml(self):
+ self._test_delete_port('xml')
+
+ def test_delete_port_json(self):
+ self._test_delete_port('json')
+
+ def test_delete_port_in_use_xml(self):
+ self._test_delete_port_in_use('xml')
+
+ def test_delete_port_in_use_json(self):
+ self._test_delete_port_in_use('json')
+
+ def test_delete_port_networknotfound_xml(self):
+ self._test_delete_port_networknotfound('xml')
+
+ def test_delete_port_networknotfound_json(self):
+ self._test_delete_port_networknotfound('json')
+
+ def test_delete_port_with_bad_id_xml(self):
+ self._test_delete_port_with_bad_id('xml')
+
+ def test_delete_port_with_bad_id_json(self):
+ self._test_delete_port_with_bad_id('json')
+
+ def test_set_port_state_xml(self):
+ self._test_set_port_state('xml')
+
+ def test_set_port_state_json(self):
+ self._test_set_port_state('json')
+
+ def test_set_port_state_networknotfound_xml(self):
+ self._test_set_port_state_networknotfound('xml')
+
+ def test_set_port_state_networknotfound_json(self):
+ self._test_set_port_state_networknotfound('json')
+
+ def test_set_port_state_portnotfound_xml(self):
+ self._test_set_port_state_portnotfound('xml')
+
+ def test_set_port_state_portnotfound_json(self):
+ self._test_set_port_state_portnotfound('json')
+
+ def test_set_port_state_stateinvalid_xml(self):
+ self._test_set_port_state_stateinvalid('xml')
+
+ def test_set_port_state_stateinvalid_json(self):
+ self._test_set_port_state_stateinvalid('json')
+
+ def test_show_attachment_xml(self):
+ self._test_show_attachment('xml')
+
+ def test_show_attachment_json(self):
+ self._test_show_attachment('json')
+
+ def test_show_attachment_none_set_xml(self):
+ self._test_show_attachment_none_set('xml')
+
+ def test_show_attachment_none_set_json(self):
+ self._test_show_attachment_none_set('json')
+
+ def test_show_attachment_networknotfound_xml(self):
+ self._test_show_attachment_networknotfound('xml')
+
+ def test_show_attachment_networknotfound_json(self):
+ self._test_show_attachment_networknotfound('json')
+
+ def test_show_attachment_portnotfound_xml(self):
+ self._test_show_attachment_portnotfound('xml')
+
+ def test_show_attachment_portnotfound_json(self):
+ self._test_show_attachment_portnotfound('json')
+
+ def test_put_attachment_xml(self):
+ self._test_put_attachment('xml')
+
+ def test_put_attachment_json(self):
+ self._test_put_attachment('json')
+
+ def test_put_attachment_networknotfound_xml(self):
+ self._test_put_attachment_networknotfound('xml')
+
+ def test_put_attachment_networknotfound_json(self):
+ self._test_put_attachment_networknotfound('json')
+
+ def test_put_attachment_portnotfound_xml(self):
+ self._test_put_attachment_portnotfound('xml')
+
+ def test_put_attachment_portnotfound_json(self):
+ self._test_put_attachment_portnotfound('json')
+
+ def test_delete_attachment_xml(self):
+ self._test_delete_attachment('xml')
+
+ def test_delete_attachment_json(self):
+ self._test_delete_attachment('json')
+
+ def test_delete_attachment_networknotfound_xml(self):
+ self._test_delete_attachment_networknotfound('xml')
+
+ def test_delete_attachment_networknotfound_json(self):
+ self._test_delete_attachment_networknotfound('json')
+
+ def test_delete_attachment_portnotfound_xml(self):
+ self._test_delete_attachment_portnotfound('xml')
+
+ def test_delete_attachment_portnotfound_json(self):
+ self._test_delete_attachment_portnotfound('json')
+
+ def test_unparsable_data_xml(self):
+ self._test_unparsable_data('xml')
+
+ def test_unparsable_data_json(self):
+ self._test_unparsable_data('json')
self._req = None
plugin = 'quantum.plugins.sample.SamplePlugin.FakePlugin'
options = dict(plugin_provider=plugin)
- self._api = server.APIRouterV1(options)
+ self._api = server.APIRouterV11(options)
def request(self, method, action, body, headers):
# TODO: remove version prefix from action!
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from quantum.common import flags
+
+FLAGS = flags.FLAGS
+
+flags.DEFINE_integer('answer', 42, 'test flag')
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from quantum.common import flags
+
+FLAGS = flags.FLAGS
+
+flags.DEFINE_integer('runtime_answer', 54, 'test flag')
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-# @author: Brad Hall, Nicira Networks
# @author: Salvatore Orlando, Citrix Systems
-import logging
-import unittest
+import quantum.api.attachments as atts
+import quantum.api.networks as nets
+import quantum.api.ports as ports
+import quantum.tests.unit._test_api as test_api
-import quantum.tests.unit.testlib_api as testlib
-
-from quantum import api as server
-from quantum.common.serializer import Serializer
-from quantum.common.test_lib import test_config
-from quantum.db import api as db
-
-LOG = logging.getLogger('quantum.tests.test_api')
-
-
-class APITest(unittest.TestCase):
-
- def _create_network(self, format, name=None, custom_req_body=None,
- expected_res_status=200):
- LOG.debug("Creating network")
- content_type = "application/" + format
- if name:
- net_name = name
- else:
- net_name = self.network_name
- network_req = testlib.new_network_request(self.tenant_id,
- net_name, format,
- custom_req_body)
- network_res = network_req.get_response(self.api)
- self.assertEqual(network_res.status_int, expected_res_status)
- if expected_res_status in (200, 202):
- network_data = Serializer().deserialize(network_res.body,
- content_type)
- return network_data['network']['id']
-
- def _create_port(self, network_id, port_state, format,
- custom_req_body=None, expected_res_status=200):
- LOG.debug("Creating port for network %s", network_id)
- content_type = "application/%s" % format
- port_req = testlib.new_port_request(self.tenant_id, network_id,
- port_state, format,
- custom_req_body)
- port_res = port_req.get_response(self.api)
- self.assertEqual(port_res.status_int, expected_res_status)
- if expected_res_status in (200, 202):
- port_data = Serializer().deserialize(port_res.body, content_type)
- return port_data['port']['id']
-
- def _test_create_network(self, format):
- LOG.debug("_test_create_network - format:%s - START", format)
- content_type = "application/%s" % format
- network_id = self._create_network(format)
- show_network_req = testlib.show_network_request(self.tenant_id,
- network_id,
- format)
- show_network_res = show_network_req.get_response(self.api)
- self.assertEqual(show_network_res.status_int, 200)
- network_data = Serializer().deserialize(show_network_res.body,
- content_type)
- self.assertEqual(network_id, network_data['network']['id'])
- LOG.debug("_test_create_network - format:%s - END", format)
-
- def _test_create_network_badrequest(self, format):
- LOG.debug("_test_create_network_badrequest - format:%s - START",
- format)
- bad_body = {'network': {'bad-attribute': 'very-bad'}}
- self._create_network(format, custom_req_body=bad_body,
- expected_res_status=400)
- LOG.debug("_test_create_network_badrequest - format:%s - END",
- format)
-
- def _test_list_networks(self, format):
- LOG.debug("_test_list_networks - format:%s - START", format)
- content_type = "application/%s" % format
- self._create_network(format, "net_1")
- self._create_network(format, "net_2")
- list_network_req = testlib.network_list_request(self.tenant_id,
- format)
- list_network_res = list_network_req.get_response(self.api)
- self.assertEqual(list_network_res.status_int, 200)
- network_data = self._net_serializer.deserialize(
- list_network_res.body, content_type)
- # Check network count: should return 2
- self.assertEqual(len(network_data['networks']), 2)
- LOG.debug("_test_list_networks - format:%s - END", format)
-
- def _test_list_networks_detail(self, format):
- LOG.debug("_test_list_networks_detail - format:%s - START", format)
- content_type = "application/%s" % format
- self._create_network(format, "net_1")
- self._create_network(format, "net_2")
- list_network_req = testlib.network_list_detail_request(self.tenant_id,
- format)
- list_network_res = list_network_req.get_response(self.api)
- self.assertEqual(list_network_res.status_int, 200)
- network_data = self._net_serializer.deserialize(
- list_network_res.body, content_type)
- # Check network count: should return 2
- self.assertEqual(len(network_data['networks']), 2)
- # Check contents - id & name for each network
- for network in network_data['networks']:
- self.assertTrue('id' in network and 'name' in network)
- self.assertTrue(network['id'] and network['name'])
- LOG.debug("_test_list_networks_detail - format:%s - END", format)
-
- def _test_show_network(self, format):
- LOG.debug("_test_show_network - format:%s - START", format)
- content_type = "application/%s" % format
- network_id = self._create_network(format)
- show_network_req = testlib.show_network_request(self.tenant_id,
- network_id,
- format)
- show_network_res = show_network_req.get_response(self.api)
- self.assertEqual(show_network_res.status_int, 200)
- network_data = self._net_serializer.deserialize(
- show_network_res.body, content_type)
- self.assertEqual({'id': network_id,
- 'name': self.network_name},
- network_data['network'])
- LOG.debug("_test_show_network - format:%s - END", format)
-
- def _test_show_network_detail(self, format):
- LOG.debug("_test_show_network_detail - format:%s - START", format)
- content_type = "application/%s" % format
- # Create a network and a port
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, "ACTIVE", format)
- show_network_req = testlib.show_network_detail_request(
- self.tenant_id, network_id, format)
- show_network_res = show_network_req.get_response(self.api)
- self.assertEqual(show_network_res.status_int, 200)
- network_data = self._net_serializer.deserialize(
- show_network_res.body, content_type)
- self.assertEqual({'id': network_id,
- 'name': self.network_name,
- 'ports': [{'id': port_id,
- 'state': 'ACTIVE'}]},
- network_data['network'])
- LOG.debug("_test_show_network_detail - format:%s - END", format)
-
- def _test_show_network_not_found(self, format):
- LOG.debug("_test_show_network_not_found - format:%s - START", format)
- show_network_req = testlib.show_network_request(self.tenant_id,
- "A_BAD_ID",
- format)
- show_network_res = show_network_req.get_response(self.api)
- self.assertEqual(show_network_res.status_int, 420)
- LOG.debug("_test_show_network_not_found - format:%s - END", format)
-
- def _test_update_network(self, format):
- LOG.debug("_test_update_network - format:%s - START", format)
- content_type = "application/%s" % format
- new_name = 'new_network_name'
- network_id = self._create_network(format)
- update_network_req = testlib.update_network_request(self.tenant_id,
- network_id,
- new_name,
- format)
- update_network_res = update_network_req.get_response(self.api)
- self.assertEqual(update_network_res.status_int, 204)
- show_network_req = testlib.show_network_request(self.tenant_id,
- network_id,
- format)
- show_network_res = show_network_req.get_response(self.api)
- self.assertEqual(show_network_res.status_int, 200)
- network_data = self._net_serializer.deserialize(
- show_network_res.body, content_type)
- self.assertEqual({'id': network_id,
- 'name': new_name},
- network_data['network'])
- LOG.debug("_test_update_network - format:%s - END", format)
-
- def _test_update_network_badrequest(self, format):
- LOG.debug("_test_update_network_badrequest - format:%s - START",
- format)
- network_id = self._create_network(format)
- bad_body = {'network': {'bad-attribute': 'very-bad'}}
- update_network_req = testlib.\
- update_network_request(self.tenant_id,
- network_id, format,
- custom_req_body=bad_body)
- update_network_res = update_network_req.get_response(self.api)
- self.assertEqual(update_network_res.status_int, 400)
- LOG.debug("_test_update_network_badrequest - format:%s - END",
- format)
-
- def _test_update_network_not_found(self, format):
- LOG.debug("_test_update_network_not_found - format:%s - START",
- format)
- new_name = 'new_network_name'
- update_network_req = testlib.update_network_request(self.tenant_id,
- "A BAD ID",
- new_name,
- format)
- update_network_res = update_network_req.get_response(self.api)
- self.assertEqual(update_network_res.status_int, 420)
- LOG.debug("_test_update_network_not_found - format:%s - END",
- format)
-
- def _test_delete_network(self, format):
- LOG.debug("_test_delete_network - format:%s - START", format)
- content_type = "application/%s" % format
- network_id = self._create_network(format)
- LOG.debug("Deleting network %s"\
- " of tenant %s" % (network_id, self.tenant_id))
- delete_network_req = testlib.network_delete_request(self.tenant_id,
- network_id,
- format)
- delete_network_res = delete_network_req.get_response(self.api)
- self.assertEqual(delete_network_res.status_int, 204)
- list_network_req = testlib.network_list_request(self.tenant_id,
- format)
- list_network_res = list_network_req.get_response(self.api)
- network_list_data = self._net_serializer.deserialize(
- list_network_res.body, content_type)
- network_count = len(network_list_data['networks'])
- self.assertEqual(network_count, 0)
- LOG.debug("_test_delete_network - format:%s - END", format)
-
- def _test_delete_network_in_use(self, format):
- LOG.debug("_test_delete_network_in_use - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- attachment_id = "test_attachment"
- network_id = self._create_network(format)
- LOG.debug("Deleting network %s"\
- " of tenant %s" % (network_id, self.tenant_id))
- port_id = self._create_port(network_id, port_state, format)
- #plug an attachment into the port
- LOG.debug("Putting attachment into port %s", port_id)
- attachment_req = testlib.put_attachment_request(self.tenant_id,
- network_id,
- port_id,
- attachment_id)
- attachment_res = attachment_req.get_response(self.api)
- self.assertEquals(attachment_res.status_int, 204)
-
- LOG.debug("Deleting network %s"\
- " of tenant %s" % (network_id, self.tenant_id))
- delete_network_req = testlib.network_delete_request(self.tenant_id,
- network_id,
- format)
- delete_network_res = delete_network_req.get_response(self.api)
- self.assertEqual(delete_network_res.status_int, 421)
- LOG.debug("_test_delete_network_in_use - format:%s - END", format)
-
- def _test_delete_network_with_unattached_port(self, format):
- LOG.debug("_test_delete_network_with_unattached_port "\
- "- format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- LOG.debug("Deleting network %s"\
- " of tenant %s" % (network_id, self.tenant_id))
- port_id = self._create_port(network_id, port_state, format)
-
- LOG.debug("Deleting network %s"\
- " of tenant %s" % (network_id, self.tenant_id))
- delete_network_req = testlib.network_delete_request(self.tenant_id,
- network_id,
- format)
- delete_network_res = delete_network_req.get_response(self.api)
- self.assertEqual(delete_network_res.status_int, 204)
- LOG.debug("_test_delete_network_with_unattached_port "\
- "- format:%s - END", format)
-
- def _test_list_ports(self, format):
- LOG.debug("_test_list_ports - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- self._create_port(network_id, port_state, format)
- self._create_port(network_id, port_state, format)
- list_port_req = testlib.port_list_request(self.tenant_id,
- network_id, format)
- list_port_res = list_port_req.get_response(self.api)
- self.assertEqual(list_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- list_port_res.body, content_type)
- # Check port count: should return 2
- self.assertEqual(len(port_data['ports']), 2)
- LOG.debug("_test_list_ports - format:%s - END", format)
-
- def _test_list_ports_networknotfound(self, format):
- LOG.debug("_test_list_ports_networknotfound"
- " - format:%s - START", format)
- list_port_req = testlib.port_list_request(self.tenant_id,
- "A_BAD_ID", format)
- list_port_res = list_port_req.get_response(self.api)
- self.assertEqual(list_port_res.status_int, 420)
- LOG.debug("_test_list_ports_networknotfound - format:%s - END", format)
-
- def _test_list_ports_detail(self, format):
- LOG.debug("_test_list_ports_detail - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- self._create_port(network_id, port_state, format)
- self._create_port(network_id, port_state, format)
- list_port_req = testlib.port_list_detail_request(self.tenant_id,
- network_id, format)
- list_port_res = list_port_req.get_response(self.api)
- self.assertEqual(list_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- list_port_res.body, content_type)
- # Check port count: should return 2
- self.assertEqual(len(port_data['ports']), 2)
- # Check contents - id & name for each network
- for port in port_data['ports']:
- self.assertTrue('id' in port and 'state' in port)
- self.assertTrue(port['id'] and port['state'])
- LOG.debug("_test_list_ports_detail - format:%s - END", format)
-
- def _test_show_port(self, format):
- LOG.debug("_test_show_port - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- show_port_req = testlib.show_port_request(self.tenant_id,
- network_id, port_id,
- format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- show_port_res.body, content_type)
- self.assertEqual({'id': port_id, 'state': port_state},
- port_data['port'])
- LOG.debug("_test_show_port - format:%s - END", format)
-
- def _test_show_port_detail(self, format):
- LOG.debug("_test_show_port - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
-
- # Part 1 - no attachment
- show_port_req = testlib.show_port_detail_request(self.tenant_id,
- network_id, port_id, format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- show_port_res.body, content_type)
- self.assertEqual({'id': port_id, 'state': port_state},
- port_data['port'])
-
- # Part 2 - plug attachment into port
- interface_id = "test_interface"
- put_attachment_req = testlib.put_attachment_request(self.tenant_id,
- network_id,
- port_id,
- interface_id,
- format)
- put_attachment_res = put_attachment_req.get_response(self.api)
- self.assertEqual(put_attachment_res.status_int, 204)
- show_port_req = testlib.show_port_detail_request(self.tenant_id,
- network_id, port_id, format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- show_port_res.body, content_type)
- self.assertEqual({'id': port_id, 'state': port_state,
- 'attachment': {'id': interface_id}},
- port_data['port'])
-
- LOG.debug("_test_show_port_detail - format:%s - END", format)
-
- def _test_show_port_networknotfound(self, format):
- LOG.debug("_test_show_port_networknotfound - format:%s - START",
- format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- show_port_req = testlib.show_port_request(self.tenant_id,
- "A_BAD_ID", port_id,
- format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 420)
- LOG.debug("_test_show_port_networknotfound - format:%s - END",
- format)
-
- def _test_show_port_portnotfound(self, format):
- LOG.debug("_test_show_port_portnotfound - format:%s - START", format)
- network_id = self._create_network(format)
- show_port_req = testlib.show_port_request(self.tenant_id,
- network_id,
- "A_BAD_ID",
- format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 430)
- LOG.debug("_test_show_port_portnotfound - format:%s - END", format)
-
- def _test_create_port_noreqbody(self, format):
- LOG.debug("_test_create_port_noreqbody - format:%s - START", format)
- content_type = "application/%s" % format
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, None, format,
- custom_req_body='')
- show_port_req = testlib.show_port_request(self.tenant_id,
- network_id, port_id, format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- show_port_res.body, content_type)
- self.assertEqual(port_id, port_data['port']['id'])
- LOG.debug("_test_create_port_noreqbody - format:%s - END", format)
-
- def _test_create_port(self, format):
- LOG.debug("_test_create_port - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- show_port_req = testlib.show_port_request(self.tenant_id,
- network_id, port_id, format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- show_port_res.body, content_type)
- self.assertEqual(port_id, port_data['port']['id'])
- LOG.debug("_test_create_port - format:%s - END", format)
-
- def _test_create_port_networknotfound(self, format):
- LOG.debug("_test_create_port_networknotfound - format:%s - START",
- format)
- port_state = "ACTIVE"
- self._create_port("A_BAD_ID", port_state, format,
- expected_res_status=420)
- LOG.debug("_test_create_port_networknotfound - format:%s - END",
- format)
-
- def _test_create_port_badrequest(self, format):
- LOG.debug("_test_create_port_badrequest - format:%s - START", format)
- bad_body = {'bad-resource': {'bad-attribute': 'bad-value'}}
- network_id = self._create_network(format)
- port_state = "ACTIVE"
- self._create_port(network_id, port_state, format,
- custom_req_body=bad_body, expected_res_status=400)
- LOG.debug("_test_create_port_badrequest - format:%s - END", format)
-
- def _test_delete_port(self, format):
- LOG.debug("_test_delete_port - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- LOG.debug("Deleting port %s for network %s"\
- " of tenant %s" % (port_id, network_id,
- self.tenant_id))
- delete_port_req = testlib.port_delete_request(self.tenant_id,
- network_id, port_id,
- format)
- delete_port_res = delete_port_req.get_response(self.api)
- self.assertEqual(delete_port_res.status_int, 204)
- list_port_req = testlib.port_list_request(self.tenant_id, network_id,
- format)
- list_port_res = list_port_req.get_response(self.api)
- port_list_data = self._port_serializer.deserialize(
- list_port_res.body, content_type)
- port_count = len(port_list_data['ports'])
- self.assertEqual(port_count, 0)
- LOG.debug("_test_delete_port - format:%s - END", format)
-
- def _test_delete_port_in_use(self, format):
- LOG.debug("_test_delete_port_in_use - format:%s - START", format)
- content_type = "application/" + format
- port_state = "ACTIVE"
- attachment_id = "test_attachment"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- #plug an attachment into the port
- LOG.debug("Putting attachment into port %s", port_id)
- attachment_req = testlib.put_attachment_request(self.tenant_id,
- network_id,
- port_id,
- attachment_id)
- attachment_res = attachment_req.get_response(self.api)
- self.assertEquals(attachment_res.status_int, 204)
- LOG.debug("Deleting port %s for network %s"\
- " of tenant %s" % (port_id, network_id,
- self.tenant_id))
- delete_port_req = testlib.port_delete_request(self.tenant_id,
- network_id, port_id,
- format)
- delete_port_res = delete_port_req.get_response(self.api)
- self.assertEqual(delete_port_res.status_int, 432)
- LOG.debug("_test_delete_port_in_use - format:%s - END", format)
-
- def _test_delete_port_with_bad_id(self, format):
- LOG.debug("_test_delete_port_with_bad_id - format:%s - START",
- format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- self._create_port(network_id, port_state, format)
- # Test for portnotfound
- delete_port_req = testlib.port_delete_request(self.tenant_id,
- network_id, "A_BAD_ID",
- format)
- delete_port_res = delete_port_req.get_response(self.api)
- self.assertEqual(delete_port_res.status_int, 430)
- LOG.debug("_test_delete_port_with_bad_id - format:%s - END", format)
-
- def _test_delete_port_networknotfound(self, format):
- LOG.debug("_test_delete_port_networknotfound - format:%s - START",
- format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- delete_port_req = testlib.port_delete_request(self.tenant_id,
- "A_BAD_ID", port_id,
- format)
- delete_port_res = delete_port_req.get_response(self.api)
- self.assertEqual(delete_port_res.status_int, 420)
- LOG.debug("_test_delete_port_networknotfound - format:%s - END",
- format)
-
- def _test_set_port_state(self, format):
- LOG.debug("_test_set_port_state - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = 'DOWN'
- new_port_state = 'ACTIVE'
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- update_port_req = testlib.update_port_request(self.tenant_id,
- network_id, port_id,
- new_port_state,
- format)
- update_port_res = update_port_req.get_response(self.api)
- self.assertEqual(update_port_res.status_int, 204)
- show_port_req = testlib.show_port_request(self.tenant_id,
- network_id, port_id,
- format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- show_port_res.body, content_type)
- self.assertEqual({'id': port_id, 'state': new_port_state},
- port_data['port'])
- # now set it back to the original value
- update_port_req = testlib.update_port_request(self.tenant_id,
- network_id, port_id,
- port_state,
- format)
- update_port_res = update_port_req.get_response(self.api)
- self.assertEqual(update_port_res.status_int, 204)
- show_port_req = testlib.show_port_request(self.tenant_id,
- network_id, port_id,
- format)
- show_port_res = show_port_req.get_response(self.api)
- self.assertEqual(show_port_res.status_int, 200)
- port_data = self._port_serializer.deserialize(
- show_port_res.body, content_type)
- self.assertEqual({'id': port_id, 'state': port_state},
- port_data['port'])
- LOG.debug("_test_set_port_state - format:%s - END", format)
-
- def _test_set_port_state_networknotfound(self, format):
- LOG.debug("_test_set_port_state_networknotfound - format:%s - START",
- format)
- port_state = 'DOWN'
- new_port_state = 'ACTIVE'
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- update_port_req = testlib.update_port_request(self.tenant_id,
- "A_BAD_ID", port_id,
- new_port_state,
- format)
- update_port_res = update_port_req.get_response(self.api)
- self.assertEqual(update_port_res.status_int, 420)
- LOG.debug("_test_set_port_state_networknotfound - format:%s - END",
- format)
-
- def _test_set_port_state_portnotfound(self, format):
- LOG.debug("_test_set_port_state_portnotfound - format:%s - START",
- format)
- port_state = 'DOWN'
- new_port_state = 'ACTIVE'
- network_id = self._create_network(format)
- self._create_port(network_id, port_state, format)
- update_port_req = testlib.update_port_request(self.tenant_id,
- network_id,
- "A_BAD_ID",
- new_port_state,
- format)
- update_port_res = update_port_req.get_response(self.api)
- self.assertEqual(update_port_res.status_int, 430)
- LOG.debug("_test_set_port_state_portnotfound - format:%s - END",
- format)
-
- def _test_set_port_state_stateinvalid(self, format):
- LOG.debug("_test_set_port_state_stateinvalid - format:%s - START",
- format)
- port_state = 'DOWN'
- new_port_state = 'A_BAD_STATE'
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- update_port_req = testlib.update_port_request(self.tenant_id,
- network_id, port_id,
- new_port_state,
- format)
- update_port_res = update_port_req.get_response(self.api)
- self.assertEqual(update_port_res.status_int, 431)
- LOG.debug("_test_set_port_state_stateinvalid - format:%s - END",
- format)
-
- def _test_show_attachment(self, format):
- LOG.debug("_test_show_attachment - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- interface_id = "test_interface"
- port_id = self._create_port(network_id, port_state, format)
- put_attachment_req = testlib.put_attachment_request(self.tenant_id,
- network_id,
- port_id,
- interface_id,
- format)
- put_attachment_res = put_attachment_req.get_response(self.api)
- self.assertEqual(put_attachment_res.status_int, 204)
- get_attachment_req = testlib.get_attachment_request(self.tenant_id,
- network_id,
- port_id,
- format)
- get_attachment_res = get_attachment_req.get_response(self.api)
- attachment_data = Serializer().deserialize(get_attachment_res.body,
- content_type)
- self.assertEqual(attachment_data['attachment']['id'], interface_id)
- LOG.debug("_test_show_attachment - format:%s - END", format)
-
- def _test_show_attachment_none_set(self, format):
- LOG.debug("_test_show_attachment_none_set - format:%s - START", format)
- content_type = "application/%s" % format
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- get_attachment_req = testlib.get_attachment_request(self.tenant_id,
- network_id,
- port_id,
- format)
- get_attachment_res = get_attachment_req.get_response(self.api)
- attachment_data = Serializer().deserialize(get_attachment_res.body,
- content_type)
- self.assertTrue('id' not in attachment_data['attachment'])
- LOG.debug("_test_show_attachment_none_set - format:%s - END", format)
-
- def _test_show_attachment_networknotfound(self, format):
- LOG.debug("_test_show_attachment_networknotfound - format:%s - START",
- format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- get_attachment_req = testlib.get_attachment_request(self.tenant_id,
- "A_BAD_ID",
- port_id,
- format)
- get_attachment_res = get_attachment_req.get_response(self.api)
- self.assertEqual(get_attachment_res.status_int, 420)
- LOG.debug("_test_show_attachment_networknotfound - format:%s - END",
- format)
-
- def _test_show_attachment_portnotfound(self, format):
- LOG.debug("_test_show_attachment_portnotfound - format:%s - START",
- format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- self._create_port(network_id, port_state, format)
- get_attachment_req = testlib.get_attachment_request(self.tenant_id,
- network_id,
- "A_BAD_ID",
- format)
- get_attachment_res = get_attachment_req.get_response(self.api)
- self.assertEqual(get_attachment_res.status_int, 430)
- LOG.debug("_test_show_attachment_portnotfound - format:%s - END",
- format)
-
- def _test_put_attachment(self, format):
- LOG.debug("_test_put_attachment - format:%s - START", format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- interface_id = "test_interface"
- port_id = self._create_port(network_id, port_state, format)
- put_attachment_req = testlib.put_attachment_request(self.tenant_id,
- network_id,
- port_id,
- interface_id,
- format)
- put_attachment_res = put_attachment_req.get_response(self.api)
- self.assertEqual(put_attachment_res.status_int, 204)
- LOG.debug("_test_put_attachment - format:%s - END", format)
-
- def _test_put_attachment_networknotfound(self, format):
- LOG.debug("_test_put_attachment_networknotfound - format:%s - START",
- format)
- port_state = 'DOWN'
- interface_id = "test_interface"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- put_attachment_req = testlib.put_attachment_request(self.tenant_id,
- "A_BAD_ID",
- port_id,
- interface_id,
- format)
- put_attachment_res = put_attachment_req.get_response(self.api)
- self.assertEqual(put_attachment_res.status_int, 420)
- LOG.debug("_test_put_attachment_networknotfound - format:%s - END",
- format)
-
- def _test_put_attachment_portnotfound(self, format):
- LOG.debug("_test_put_attachment_portnotfound - format:%s - START",
- format)
- port_state = 'DOWN'
- interface_id = "test_interface"
- network_id = self._create_network(format)
- self._create_port(network_id, port_state, format)
- put_attachment_req = testlib.put_attachment_request(self.tenant_id,
- network_id,
- "A_BAD_ID",
- interface_id,
- format)
- put_attachment_res = put_attachment_req.get_response(self.api)
- self.assertEqual(put_attachment_res.status_int, 430)
- LOG.debug("_test_put_attachment_portnotfound - format:%s - END",
- format)
-
- def _test_delete_attachment(self, format):
- LOG.debug("_test_delete_attachment - format:%s - START", format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- interface_id = "test_interface"
- port_id = self._create_port(network_id, port_state, format)
- put_attachment_req = testlib.put_attachment_request(self.tenant_id,
- network_id,
- port_id,
- interface_id,
- format)
- put_attachment_res = put_attachment_req.get_response(self.api)
- self.assertEqual(put_attachment_res.status_int, 204)
- del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
- network_id,
- port_id,
- format)
- del_attachment_res = del_attachment_req.get_response(self.api)
- self.assertEqual(del_attachment_res.status_int, 204)
- LOG.debug("_test_delete_attachment - format:%s - END", format)
-
- def _test_delete_attachment_networknotfound(self, format):
- LOG.debug("_test_delete_attachment_networknotfound -" \
- " format:%s - START", format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- port_id = self._create_port(network_id, port_state, format)
- del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
- "A_BAD_ID",
- port_id,
- format)
- del_attachment_res = del_attachment_req.get_response(self.api)
- self.assertEqual(del_attachment_res.status_int, 420)
- LOG.debug("_test_delete_attachment_networknotfound -" \
- " format:%s - END", format)
-
- def _test_delete_attachment_portnotfound(self, format):
- LOG.debug("_test_delete_attachment_portnotfound - " \
- " format:%s - START", format)
- port_state = "ACTIVE"
- network_id = self._create_network(format)
- self._create_port(network_id, port_state, format)
- del_attachment_req = testlib.delete_attachment_request(self.tenant_id,
- network_id,
- "A_BAD_ID",
- format)
- del_attachment_res = del_attachment_req.get_response(self.api)
- self.assertEqual(del_attachment_res.status_int, 430)
- LOG.debug("_test_delete_attachment_portnotfound - " \
- "format:%s - END", format)
-
- def _test_unparsable_data(self, format):
- LOG.debug("_test_unparsable_data - " \
- " format:%s - START", format)
-
- data = "this is not json or xml"
- method = 'POST'
- content_type = "application/%s" % format
- tenant_id = self.tenant_id
- path = "/tenants/%(tenant_id)s/networks.%(format)s" % locals()
- network_req = testlib.create_request(path, data, content_type, method)
- network_res = network_req.get_response(self.api)
- self.assertEqual(network_res.status_int, 400)
-
- LOG.debug("_test_unparsable_data - " \
- "format:%s - END", format)
+class APITestV10(test_api.AbstractAPITest):
def setUp(self):
- options = {}
- options['plugin_provider'] = test_config['plugin_name']
- self.api = server.APIRouterV1(options)
- self.tenant_id = "test_tenant"
- self.network_name = "test_network"
- self._net_serializer = \
- Serializer(server.networks.Controller._serialization_metadata)
- self._port_serializer = \
- Serializer(server.ports.Controller._serialization_metadata)
-
- def tearDown(self):
- """Clear the test environment"""
- # Remove database contents
- db.clear_db()
-
- def test_list_networks_json(self):
- self._test_list_networks('json')
-
- def test_list_networks_xml(self):
- self._test_list_networks('xml')
-
- def test_list_networks_detail_json(self):
- self._test_list_networks_detail('json')
-
- def test_list_networks_detail_xml(self):
- self._test_list_networks_detail('xml')
-
- def test_create_network_json(self):
- self._test_create_network('json')
-
- def test_create_network_xml(self):
- self._test_create_network('xml')
-
- def test_create_network_badrequest_json(self):
- self._test_create_network_badrequest('json')
-
- def test_create_network_badreqyest_xml(self):
- self._test_create_network_badrequest('xml')
-
- def test_show_network_not_found_json(self):
- self._test_show_network_not_found('json')
-
- def test_show_network_not_found_xml(self):
- self._test_show_network_not_found('xml')
-
- def test_show_network_json(self):
- self._test_show_network('json')
-
- def test_show_network_xml(self):
- self._test_show_network('xml')
-
- def test_show_network_detail_json(self):
- self._test_show_network_detail('json')
-
- def test_show_network_detail_xml(self):
- self._test_show_network_detail('xml')
+ super(APITestV10, self).setUp('quantum.api.APIRouterV10',
+ {test_api.NETS: nets.ControllerV10._serialization_metadata,
+ test_api.PORTS: ports.ControllerV10._serialization_metadata,
+ test_api.ATTS: atts.ControllerV10._serialization_metadata})
- def test_delete_network_json(self):
- self._test_delete_network('json')
- def test_delete_network_xml(self):
- self._test_delete_network('xml')
+class APITestV11(test_api.AbstractAPITest):
- def test_update_network_json(self):
- self._test_update_network('json')
-
- def test_update_network_xml(self):
- self._test_update_network('xml')
-
- def test_update_network_badrequest_json(self):
- self._test_update_network_badrequest('json')
-
- def test_update_network_badrequest_xml(self):
- self._test_update_network_badrequest('xml')
-
- def test_update_network_not_found_json(self):
- self._test_update_network_not_found('json')
-
- def test_update_network_not_found_xml(self):
- self._test_update_network_not_found('xml')
-
- def test_delete_network_in_use_json(self):
- self._test_delete_network_in_use('json')
-
- def test_delete_network_in_use_xml(self):
- self._test_delete_network_in_use('xml')
-
- def test_delete_network_with_unattached_port_xml(self):
- self._test_delete_network_with_unattached_port('xml')
-
- def test_delete_network_with_unattached_port_json(self):
- self._test_delete_network_with_unattached_port('json')
-
- def test_list_ports_json(self):
- self._test_list_ports('json')
-
- def test_list_ports_xml(self):
- self._test_list_ports('xml')
-
- def test_list_ports_networknotfound_json(self):
- self._test_list_ports_networknotfound('json')
-
- def test_list_ports_networknotfound_xml(self):
- self._test_list_ports_networknotfound('xml')
-
- def test_list_ports_detail_json(self):
- self._test_list_ports_detail('json')
-
- def test_list_ports_detail_xml(self):
- self._test_list_ports_detail('xml')
-
- def test_show_port_json(self):
- self._test_show_port('json')
-
- def test_show_port_xml(self):
- self._test_show_port('xml')
-
- def test_show_port_detail_json(self):
- self._test_show_port_detail('json')
-
- def test_show_port_detail_xml(self):
- self._test_show_port_detail('xml')
-
- def test_show_port_networknotfound_json(self):
- self._test_show_port_networknotfound('json')
-
- def test_show_port_networknotfound_xml(self):
- self._test_show_port_networknotfound('xml')
-
- def test_show_port_portnotfound_json(self):
- self._test_show_port_portnotfound('json')
-
- def test_show_port_portnotfound_xml(self):
- self._test_show_port_portnotfound('xml')
-
- def test_create_port_json(self):
- self._test_create_port('json')
-
- def test_create_port_xml(self):
- self._test_create_port('xml')
-
- def test_create_port_noreqbody_json(self):
- self._test_create_port_noreqbody('json')
-
- def test_create_port_noreqbody_xml(self):
- self._test_create_port_noreqbody('xml')
-
- def test_create_port_networknotfound_json(self):
- self._test_create_port_networknotfound('json')
-
- def test_create_port_networknotfound_xml(self):
- self._test_create_port_networknotfound('xml')
-
- def test_create_port_badrequest_json(self):
- self._test_create_port_badrequest('json')
-
- def test_create_port_badrequest_xml(self):
- self._test_create_port_badrequest('xml')
-
- def test_delete_port_xml(self):
- self._test_delete_port('xml')
-
- def test_delete_port_json(self):
- self._test_delete_port('json')
-
- def test_delete_port_in_use_xml(self):
- self._test_delete_port_in_use('xml')
-
- def test_delete_port_in_use_json(self):
- self._test_delete_port_in_use('json')
-
- def test_delete_port_networknotfound_xml(self):
- self._test_delete_port_networknotfound('xml')
-
- def test_delete_port_networknotfound_json(self):
- self._test_delete_port_networknotfound('json')
-
- def test_delete_port_with_bad_id_xml(self):
- self._test_delete_port_with_bad_id('xml')
-
- def test_delete_port_with_bad_id_json(self):
- self._test_delete_port_with_bad_id('json')
-
- def test_set_port_state_xml(self):
- self._test_set_port_state('xml')
-
- def test_set_port_state_json(self):
- self._test_set_port_state('json')
-
- def test_set_port_state_networknotfound_xml(self):
- self._test_set_port_state_networknotfound('xml')
-
- def test_set_port_state_networknotfound_json(self):
- self._test_set_port_state_networknotfound('json')
-
- def test_set_port_state_portnotfound_xml(self):
- self._test_set_port_state_portnotfound('xml')
-
- def test_set_port_state_portnotfound_json(self):
- self._test_set_port_state_portnotfound('json')
-
- def test_set_port_state_stateinvalid_xml(self):
- self._test_set_port_state_stateinvalid('xml')
-
- def test_set_port_state_stateinvalid_json(self):
- self._test_set_port_state_stateinvalid('json')
-
- def test_show_attachment_xml(self):
- self._test_show_attachment('xml')
-
- def test_show_attachment_json(self):
- self._test_show_attachment('json')
-
- def test_show_attachment_none_set_xml(self):
- self._test_show_attachment_none_set('xml')
-
- def test_show_attachment_none_set_json(self):
- self._test_show_attachment_none_set('json')
-
- def test_show_attachment_networknotfound_xml(self):
- self._test_show_attachment_networknotfound('xml')
-
- def test_show_attachment_networknotfound_json(self):
- self._test_show_attachment_networknotfound('json')
-
- def test_show_attachment_portnotfound_xml(self):
- self._test_show_attachment_portnotfound('xml')
-
- def test_show_attachment_portnotfound_json(self):
- self._test_show_attachment_portnotfound('json')
-
- def test_put_attachment_xml(self):
- self._test_put_attachment('xml')
-
- def test_put_attachment_json(self):
- self._test_put_attachment('json')
-
- def test_put_attachment_networknotfound_xml(self):
- self._test_put_attachment_networknotfound('xml')
-
- def test_put_attachment_networknotfound_json(self):
- self._test_put_attachment_networknotfound('json')
-
- def test_put_attachment_portnotfound_xml(self):
- self._test_put_attachment_portnotfound('xml')
-
- def test_put_attachment_portnotfound_json(self):
- self._test_put_attachment_portnotfound('json')
-
- def test_delete_attachment_xml(self):
- self._test_delete_attachment('xml')
-
- def test_delete_attachment_json(self):
- self._test_delete_attachment('json')
-
- def test_delete_attachment_networknotfound_xml(self):
- self._test_delete_attachment_networknotfound('xml')
-
- def test_delete_attachment_networknotfound_json(self):
- self._test_delete_attachment_networknotfound('json')
-
- def test_delete_attachment_portnotfound_xml(self):
- self._test_delete_attachment_portnotfound('xml')
-
- def test_delete_attachment_portnotfound_json(self):
- self._test_delete_attachment_portnotfound('json')
-
- def test_unparsable_data_xml(self):
- self._test_unparsable_data('xml')
-
- def test_unparsable_data_json(self):
- self._test_unparsable_data('json')
+ def setUp(self):
+ super(APITestV11, self).setUp('quantum.api.APIRouterV11',
+ {test_api.NETS: nets.ControllerV11._serialization_metadata,
+ test_api.PORTS: ports.ControllerV11._serialization_metadata,
+ test_api.ATTS: atts.ControllerV11._serialization_metadata})
options = {}
options['plugin_provider'] = \
'quantum.plugins.sample.SamplePlugin.FakePlugin'
- self.api = server.APIRouterV1(options)
+ #TODO: make the version of the API router configurable
+ self.api = server.APIRouterV11(options)
self.tenant_id = "test_tenant"
self.network_name_1 = "test_network_1"
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import gflags
+import os
+import tempfile
+import unittest
+
+from quantum.common import flags
+
+FLAGS = flags.FLAGS
+flags.DEFINE_string('flags_unittest', 'foo', 'for testing purposes only')
+
+
+class FlagsTestCase(unittest.TestCase):
+
+ def flags(self, **kw):
+ """Override flag variables for a test."""
+ for k, v in kw.iteritems():
+ setattr(FLAGS, k, v)
+
+ def reset_flags(self):
+ """Resets all flag variables for the test.
+
+ Runs after each test.
+
+ """
+ FLAGS.Reset()
+ for k, v in self._original_flags.iteritems():
+ setattr(FLAGS, k, v)
+
+ def setUp(self):
+ super(FlagsTestCase, self).setUp()
+ self.FLAGS = flags.FlagValues()
+ self.global_FLAGS = flags.FLAGS
+ self._original_flags = FLAGS.FlagValuesDict()
+
+ def test_define(self):
+ self.assert_('string' not in self.FLAGS)
+ self.assert_('int' not in self.FLAGS)
+ self.assert_('false' not in self.FLAGS)
+ self.assert_('true' not in self.FLAGS)
+
+ flags.DEFINE_string('string', 'default', 'desc',
+ flag_values=self.FLAGS)
+ flags.DEFINE_integer('int', 1, 'desc', flag_values=self.FLAGS)
+ flags.DEFINE_bool('false', False, 'desc', flag_values=self.FLAGS)
+ flags.DEFINE_bool('true', True, 'desc', flag_values=self.FLAGS)
+
+ self.assert_(self.FLAGS['string'])
+ self.assert_(self.FLAGS['int'])
+ self.assert_(self.FLAGS['false'])
+ self.assert_(self.FLAGS['true'])
+ self.assertEqual(self.FLAGS.string, 'default')
+ self.assertEqual(self.FLAGS.int, 1)
+ self.assertEqual(self.FLAGS.false, False)
+ self.assertEqual(self.FLAGS.true, True)
+
+ argv = ['flags_test',
+ '--string', 'foo',
+ '--int', '2',
+ '--false',
+ '--notrue']
+
+ self.FLAGS(argv)
+ self.assertEqual(self.FLAGS.string, 'foo')
+ self.assertEqual(self.FLAGS.int, 2)
+ self.assertEqual(self.FLAGS.false, True)
+ self.assertEqual(self.FLAGS.true, False)
+
+ def test_define_float(self):
+ flags.DEFINE_float('float', 6.66, 'desc', flag_values=self.FLAGS)
+ self.assertEqual(self.FLAGS.float, 6.66)
+
+ def test_define_multistring(self):
+ flags.DEFINE_multistring('multi', [], 'desc', flag_values=self.FLAGS)
+
+ argv = ['flags_test', '--multi', 'foo', '--multi', 'bar']
+ self.FLAGS(argv)
+
+ self.assertEqual(self.FLAGS.multi, ['foo', 'bar'])
+
+ def test_define_list(self):
+ flags.DEFINE_list('list', ['foo'], 'desc', flag_values=self.FLAGS)
+
+ self.assert_(self.FLAGS['list'])
+ self.assertEqual(self.FLAGS.list, ['foo'])
+
+ argv = ['flags_test', '--list=a,b,c,d']
+ self.FLAGS(argv)
+
+ self.assertEqual(self.FLAGS.list, ['a', 'b', 'c', 'd'])
+
+ def test_error(self):
+ flags.DEFINE_integer('error', 1, 'desc', flag_values=self.FLAGS)
+
+ self.assertEqual(self.FLAGS.error, 1)
+
+ argv = ['flags_test', '--error=foo']
+ self.assertRaises(gflags.IllegalFlagValue, self.FLAGS, argv)
+
+ def test_declare(self):
+ self.assert_('answer' not in self.global_FLAGS)
+ flags.DECLARE('answer', 'quantum.tests.unit.declare_flags')
+ self.assert_('answer' in self.global_FLAGS)
+ self.assertEqual(self.global_FLAGS.answer, 42)
+
+ # Make sure we don't overwrite anything
+ self.global_FLAGS.answer = 256
+ self.assertEqual(self.global_FLAGS.answer, 256)
+ flags.DECLARE('answer', 'quantum.tests.unit.declare_flags')
+ self.assertEqual(self.global_FLAGS.answer, 256)
+
+ def test_getopt_non_interspersed_args(self):
+ self.assert_('runtime_answer' not in self.global_FLAGS)
+
+ argv = ['flags_test', 'extra_arg', '--runtime_answer=60']
+ args = self.global_FLAGS(argv)
+ self.assertEqual(len(args), 3)
+ self.assertEqual(argv, args)
+
+ def test_runtime_and_unknown_flags(self):
+ self.assert_('runtime_answer' not in self.global_FLAGS)
+
+ argv = ['flags_test', '--runtime_answer=60', 'extra_arg']
+ args = self.global_FLAGS(argv)
+ self.assertEqual(len(args), 2)
+ self.assertEqual(args[1], 'extra_arg')
+
+ self.assert_('runtime_answer' not in self.global_FLAGS)
+
+ import quantum.tests.unit.runtime_flags
+
+ self.assert_('runtime_answer' in self.global_FLAGS)
+ self.assertEqual(self.global_FLAGS.runtime_answer, 60)
+
+ def test_flag_overrides(self):
+ self.assertEqual(FLAGS.flags_unittest, 'foo')
+ self.flags(flags_unittest='bar')
+ self.assertEqual(FLAGS.flags_unittest, 'bar')
+ self.assertEqual(FLAGS['flags_unittest'].value, 'bar')
+ self.assertEqual(FLAGS.FlagValuesDict()['flags_unittest'], 'bar')
+ self.reset_flags()
+ self.assertEqual(FLAGS.flags_unittest, 'foo')
+ self.assertEqual(FLAGS['flags_unittest'].value, 'foo')
+ self.assertEqual(FLAGS.FlagValuesDict()['flags_unittest'], 'foo')
+
+ def test_flagfile(self):
+ flags.DEFINE_string('string', 'default', 'desc',
+ flag_values=self.FLAGS)
+ flags.DEFINE_integer('int', 1, 'desc', flag_values=self.FLAGS)
+ flags.DEFINE_bool('false', False, 'desc', flag_values=self.FLAGS)
+ flags.DEFINE_bool('true', True, 'desc', flag_values=self.FLAGS)
+
+ (fd, path) = tempfile.mkstemp(prefix='nova', suffix='.flags')
+
+ try:
+ os.write(fd, '--string=foo\n--int=2\n--false\n--notrue\n')
+ os.close(fd)
+
+ self.FLAGS(['flags_test', '--flagfile=' + path])
+
+ self.assertEqual(self.FLAGS.string, 'foo')
+ self.assertEqual(self.FLAGS.int, 2)
+ self.assertEqual(self.FLAGS.false, True)
+ self.assertEqual(self.FLAGS.true, False)
+ finally:
+ os.remove(path)
+
+ def test_defaults(self):
+ flags.DEFINE_string('foo', 'bar', 'help', flag_values=self.FLAGS)
+ self.assertEqual(self.FLAGS.foo, 'bar')
+
+ self.FLAGS['foo'].SetDefault('blaa')
+ self.assertEqual(self.FLAGS.foo, 'blaa')
+
+ def test_templated_values(self):
+ flags.DEFINE_string('foo', 'foo', 'help', flag_values=self.FLAGS)
+ flags.DEFINE_string('bar', 'bar', 'help', flag_values=self.FLAGS)
+ flags.DEFINE_string('blaa', '$foo$bar', 'help', flag_values=self.FLAGS)
+ self.assertEqual(self.FLAGS.blaa, 'foobar')
-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
-#
-# Copyright 2011, Nicira Networks, Inc.
+
+# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
import routes.middleware
import webob.dec
import webob.exc
-from xml.dom import minidom
+from lxml import etree
+from xml.dom import minidom
+from xml.parsers import expat
from quantum.common import exceptions as exception
from quantum.common import utils
-
LOG = logging.getLogger('quantum.common.wsgi')
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
- def __init__(self, threads=1000):
+ def __init__(self, name, threads=1000):
self.pool = eventlet.GreenPool(threads)
+ self.name = name
def start(self, application, port, host='0.0.0.0', backlog=128):
"""Run a WSGI server with the given application."""
def best_match_content_type(self):
"""Determine the most acceptable content-type.
- Based on the query extension then the Accept header.
-
+ Based on:
+ 1) URI extension (.json/.xml)
+ 2) Content-type header
+ 3) Accept* headers
"""
# First lookup http request
parts = self.path.rsplit('.', 1)
- LOG.debug("Request parts:%s", parts)
if len(parts) > 1:
format = parts[1]
if format in ['json', 'xml']:
type = self.content_type
if type in allowed_types:
return type
- LOG.debug(_("Wrong Content-Type: %s") % type)
return None
+class ActionDispatcher(object):
+ """Maps method name to local methods through action name."""
+
+ def dispatch(self, *args, **kwargs):
+ """Find and call local method."""
+ action = kwargs.pop('action', 'default')
+ action_method = getattr(self, str(action), self.default)
+ return action_method(*args, **kwargs)
+
+ def default(self, data):
+ raise NotImplementedError()
+
+
+class DictSerializer(ActionDispatcher):
+ """Default request body serialization"""
+
+ def serialize(self, data, action='default'):
+ return self.dispatch(data, action=action)
+
+ def default(self, data):
+ return ""
+
+
+class JSONDictSerializer(DictSerializer):
+ """Default JSON request body serialization"""
+
+ def default(self, data):
+ return utils.dumps(data)
+
+
+class XMLDictSerializer(DictSerializer):
+
+ def __init__(self, metadata=None, xmlns=None):
+ """
+ :param metadata: information needed to deserialize xml into
+ a dictionary.
+ :param xmlns: XML namespace to include with serialized xml
+ """
+ super(XMLDictSerializer, self).__init__()
+ self.metadata = metadata or {}
+ self.xmlns = xmlns
+
+ def default(self, data):
+ # We expect data to contain a single key which is the XML root.
+ root_key = data.keys()[0]
+ doc = minidom.Document()
+ node = self._to_xml_node(doc, self.metadata, root_key, data[root_key])
+
+ return self.to_xml_string(node)
+
+ def to_xml_string(self, node, has_atom=False):
+ self._add_xmlns(node, has_atom)
+ return node.toxml('UTF-8')
+
+ #NOTE (ameade): the has_atom should be removed after all of the
+ # xml serializers and view builders have been updated to the current
+ # spec that required all responses include the xmlns:atom, the has_atom
+ # flag is to prevent current tests from breaking
+ def _add_xmlns(self, node, has_atom=False):
+ if self.xmlns is not None:
+ node.setAttribute('xmlns', self.xmlns)
+ if has_atom:
+ node.setAttribute('xmlns:atom', "http://www.w3.org/2005/Atom")
+
+ def _to_xml_node(self, doc, metadata, nodename, data):
+ """Recursive method to convert data members to XML nodes."""
+ result = doc.createElement(nodename)
+
+ # Set the xml namespace if one is specified
+ # TODO(justinsb): We could also use prefixes on the keys
+ xmlns = metadata.get('xmlns', None)
+ if xmlns:
+ result.setAttribute('xmlns', xmlns)
+
+ #TODO(bcwaldon): accomplish this without a type-check
+ if type(data) is list:
+ collections = metadata.get('list_collections', {})
+ if nodename in collections:
+ metadata = collections[nodename]
+ for item in data:
+ node = doc.createElement(metadata['item_name'])
+ node.setAttribute(metadata['item_key'], str(item))
+ result.appendChild(node)
+ return result
+ singular = metadata.get('plurals', {}).get(nodename, None)
+ if singular is None:
+ if nodename.endswith('s'):
+ singular = nodename[:-1]
+ else:
+ singular = 'item'
+ for item in data:
+ node = self._to_xml_node(doc, metadata, singular, item)
+ result.appendChild(node)
+ #TODO(bcwaldon): accomplish this without a type-check
+ elif type(data) is dict:
+ collections = metadata.get('dict_collections', {})
+ if nodename in collections:
+ metadata = collections[nodename]
+ for k, v in data.items():
+ node = doc.createElement(metadata['item_name'])
+ node.setAttribute(metadata['item_key'], str(k))
+ text = doc.createTextNode(str(v))
+ node.appendChild(text)
+ result.appendChild(node)
+ return result
+ attrs = metadata.get('attributes', {}).get(nodename, {})
+ for k, v in data.items():
+ if k in attrs:
+ result.setAttribute(k, str(v))
+ else:
+ node = self._to_xml_node(doc, metadata, k, v)
+ result.appendChild(node)
+ else:
+ # Type is atom
+ node = doc.createTextNode(str(data))
+ result.appendChild(node)
+ return result
+
+ def _create_link_nodes(self, xml_doc, links):
+ link_nodes = []
+ for link in links:
+ link_node = xml_doc.createElement('atom:link')
+ link_node.setAttribute('rel', link['rel'])
+ link_node.setAttribute('href', link['href'])
+ if 'type' in link:
+ link_node.setAttribute('type', link['type'])
+ link_nodes.append(link_node)
+ return link_nodes
+
+ def _to_xml(self, root):
+ """Convert the xml object to an xml string."""
+ return etree.tostring(root, encoding='UTF-8', xml_declaration=True)
+
+
+class ResponseHeaderSerializer(ActionDispatcher):
+ """Default response headers serialization"""
+
+ def serialize(self, response, data, action):
+ self.dispatch(response, data, action=action)
+
+ def default(self, response, data):
+ response.status_int = 200
+
+
+class ResponseSerializer(object):
+ """Encode the necessary pieces into a response object"""
+
+ def __init__(self, body_serializers=None, headers_serializer=None):
+ self.body_serializers = {
+ 'application/xml': XMLDictSerializer(),
+ 'application/json': JSONDictSerializer(),
+ }
+ self.body_serializers.update(body_serializers or {})
+
+ self.headers_serializer = headers_serializer or \
+ ResponseHeadersSerializer()
+
+ def serialize(self, response_data, content_type, action='default'):
+ """Serialize a dict into a string and wrap in a wsgi.Request object.
+
+ :param response_data: dict produced by the Controller
+ :param content_type: expected mimetype of serialized response body
+
+ """
+ response = webob.Response()
+ self.serialize_headers(response, response_data, action)
+ self.serialize_body(response, response_data, content_type, action)
+ return response
+
+ def serialize_headers(self, response, data, action):
+ self.headers_serializer.serialize(response, data, action)
+
+ def serialize_body(self, response, data, content_type, action):
+ response.headers['Content-Type'] = content_type
+ if data is not None:
+ serializer = self.get_body_serializer(content_type)
+ response.body = serializer.serialize(data, action)
+
+ def get_body_serializer(self, content_type):
+ try:
+ return self.body_serializers[content_type]
+ except (KeyError, TypeError):
+ raise exception.InvalidContentType(content_type=content_type)
+
+
+class TextDeserializer(ActionDispatcher):
+ """Default request body deserialization"""
+
+ def deserialize(self, datastring, action='default'):
+ return self.dispatch(datastring, action=action)
+
+ def default(self, datastring):
+ return {}
+
+
+class JSONDeserializer(TextDeserializer):
+
+ def _from_json(self, datastring):
+ try:
+ return utils.loads(datastring)
+ except ValueError:
+ msg = _("cannot understand JSON")
+ raise exception.MalformedRequestBody(reason=msg)
+
+ def default(self, datastring):
+ return {'body': self._from_json(datastring)}
+
+
+class XMLDeserializer(TextDeserializer):
+
+ def __init__(self, metadata=None):
+ """
+ :param metadata: information needed to deserialize xml into
+ a dictionary.
+ """
+ super(XMLDeserializer, self).__init__()
+ self.metadata = metadata or {}
+
+ def _from_xml(self, datastring):
+ plurals = set(self.metadata.get('plurals', {}))
+ try:
+ node = minidom.parseString(datastring).childNodes[0]
+ return {node.nodeName: self._from_xml_node(node, plurals)}
+ except expat.ExpatError:
+ msg = _("cannot understand XML")
+ raise exception.MalformedRequestBody(reason=msg)
+
+ def _from_xml_node(self, node, listnames):
+ """Convert a minidom node to a simple Python type.
+
+ :param listnames: list of XML node names whose subnodes should
+ be considered list items.
+
+ """
+ if len(node.childNodes) == 1 and node.childNodes[0].nodeType == 3:
+ return node.childNodes[0].nodeValue
+ elif node.nodeName in listnames:
+ return [self._from_xml_node(n, listnames) for n in node.childNodes]
+ else:
+ result = dict()
+ for attr in node.attributes.keys():
+ result[attr] = node.attributes[attr].nodeValue
+ for child in node.childNodes:
+ if child.nodeType != node.TEXT_NODE:
+ result[child.nodeName] = self._from_xml_node(child,
+ listnames)
+ return result
+
+ def find_first_child_named(self, parent, name):
+ """Search a nodes children for the first child with a given name"""
+ for node in parent.childNodes:
+ if node.nodeName == name:
+ return node
+ return None
+
+ def find_children_named(self, parent, name):
+ """Return all of a nodes children who have the given name"""
+ for node in parent.childNodes:
+ if node.nodeName == name:
+ yield node
+
+ def extract_text(self, node):
+ """Get the text field contained by the given node"""
+ if len(node.childNodes) == 1:
+ child = node.childNodes[0]
+ if child.nodeType == child.TEXT_NODE:
+ return child.nodeValue
+ return ""
+
+ def default(self, datastring):
+ return {'body': self._from_xml(datastring)}
+
+
+class RequestHeadersDeserializer(ActionDispatcher):
+ """Default request headers deserializer"""
+
+ def deserialize(self, request, action):
+ return self.dispatch(request, action=action)
+
+ def default(self, request):
+ return {}
+
+
+class RequestDeserializer(object):
+ """Break up a Request object into more useful pieces."""
+
+ def __init__(self, body_deserializers=None, headers_deserializer=None):
+ self.body_deserializers = {
+ 'application/xml': XMLDeserializer(),
+ 'application/json': JSONDeserializer(),
+ }
+ self.body_deserializers.update(body_deserializers or {})
+
+ self.headers_deserializer = headers_deserializer or \
+ RequestHeadersDeserializer()
+
+ def deserialize(self, request):
+ """Extract necessary pieces of the request.
+
+ :param request: Request object
+ :returns tuple of expected controller action name, dictionary of
+ keyword arguments to pass to the controller, the expected
+ content type of the response
+
+ """
+ action_args = self.get_action_args(request.environ)
+ action = action_args.pop('action', None)
+
+ action_args.update(self.deserialize_headers(request, action))
+ action_args.update(self.deserialize_body(request, action))
+
+ accept = self.get_expected_content_type(request)
+
+ return (action, action_args, accept)
+
+ def deserialize_headers(self, request, action):
+ return self.headers_deserializer.deserialize(request, action)
+
+ def deserialize_body(self, request, action):
+ try:
+ content_type = request.best_match_content_type()
+ except exception.InvalidContentType:
+ LOG.debug(_("Unrecognized Content-Type provided in request"))
+ return {}
+
+ if content_type is None:
+ LOG.debug(_("No Content-Type provided in request"))
+ return {}
+
+ if not len(request.body) > 0:
+ LOG.debug(_("Empty body provided in request"))
+ return {}
+
+ try:
+ deserializer = self.get_body_deserializer(content_type)
+ except exception.InvalidContentType:
+ LOG.debug(_("Unable to deserialize body as provided Content-Type"))
+ raise
+
+ return deserializer.deserialize(request.body, action)
+
+ def get_body_deserializer(self, content_type):
+ try:
+ return self.body_deserializers[content_type]
+ except (KeyError, TypeError):
+ raise exception.InvalidContentType(content_type=content_type)
+
+ def get_expected_content_type(self, request):
+ return request.best_match_content_type()
+
+ def get_action_args(self, request_environment):
+ """Parse dictionary created by routes library."""
+ try:
+ args = request_environment['wsgiorg.routing_args'][1].copy()
+ except Exception:
+ return {}
+
+ try:
+ del args['controller']
+ except KeyError:
+ pass
+
+ try:
+ del args['format']
+ except KeyError:
+ pass
+
+ return args
+
+
class Application(object):
"""Base WSGI application wrapper. Subclasses need to implement __call__."""
return app
+class Resource(Application):
+ """WSGI app that handles (de)serialization and controller dispatch.
+
+ WSGI app that reads routing information supplied by RoutesMiddleware
+ and calls the requested action method upon its controller. All
+ controller action methods must accept a 'req' argument, which is the
+ incoming wsgi.Request. If the operation is a PUT or POST, the controller
+ method must also accept a 'body' argument (the deserialized request body).
+ They may raise a webob.exc exception or return a dict, which will be
+ serialized by requested content type.
+
+ """
+
+ def __init__(self, controller, deserializer=None, serializer=None):
+ """
+ :param controller: object that implement methods created by routes lib
+ :param deserializer: object that can serialize the output of a
+ controller into a webob response
+ :param serializer: object that can deserialize a webob request
+ into necessary pieces
+
+ """
+ self.controller = controller
+ self.deserializer = deserializer or RequestDeserializer()
+ self.serializer = serializer or ResponseSerializer()
+ # use serializer's xmlns for populating Fault generator xmlns
+ xml_serializer = self.serializer.body_serializers['application/xml']
+ if hasattr(xml_serializer, 'xmlns'):
+ self._xmlns = xml_serializer.xmlns
+
+ @webob.dec.wsgify(RequestClass=Request)
+ def __call__(self, request):
+ """WSGI method that controls (de)serialization and method dispatch."""
+
+ LOG.info("%(method)s %(url)s" % {"method": request.method,
+ "url": request.url})
+
+ try:
+ action, args, accept = self.deserializer.deserialize(request)
+ except exception.InvalidContentType:
+ msg = _("Unsupported Content-Type")
+ LOG.exception("InvalidContentType:%s", msg)
+ return Fault(webob.exc.HTTPBadRequest(explanation=msg),
+ self._xmlns)
+ except exception.MalformedRequestBody:
+ msg = _("Malformed request body")
+ LOG.exception("MalformedRequestBody:%s", msg)
+ return Fault(webob.exc.HTTPBadRequest(explanation=msg),
+ self._xmlns)
+
+ try:
+ action_result = self.dispatch(request, action, args)
+ except webob.exc.HTTPException as ex:
+ LOG.info(_("HTTP exception thrown: %s"), unicode(ex))
+ action_result = Fault(ex, self._xmlns)
+
+ if type(action_result) is dict or action_result is None:
+ response = self.serializer.serialize(action_result,
+ accept,
+ action=action)
+ else:
+ response = action_result
+
+ try:
+ msg_dict = dict(url=request.url, status=response.status_int)
+ msg = _("%(url)s returned with HTTP %(status)d") % msg_dict
+ except AttributeError, e:
+ msg_dict = dict(url=request.url, e=e)
+ msg = _("%(url)s returned a fault: %(e)s" % msg_dict)
+
+ LOG.info(msg)
+
+ return response
+
+ def dispatch(self, request, action, action_args):
+ """Find action-spefic method on controller and call it."""
+
+ controller_method = getattr(self.controller, action)
+ try:
+ #NOTE(salvatore-orlando): the controller method must have
+ # an argument whose name is 'request'
+ return controller_method(request=request, **action_args)
+ except TypeError as exc:
+ LOG.exception(exc)
+ return Fault(webob.exc.HTTPBadRequest(),
+ self._xmlns)
+
+
+class Fault(webob.exc.HTTPException):
+ """ Generates an HTTP response from a webob HTTP exception"""
+
+ def __init__(self, exception, xmlns=None):
+ """Creates a Fault for the given webob.exc.exception."""
+ self.wrapped_exc = exception
+ self.status_int = self.wrapped_exc.status_int
+ self._xmlns = xmlns
+
+ @webob.dec.wsgify(RequestClass=Request)
+ def __call__(self, req):
+ """Generate a WSGI response based on the exception passed to ctor."""
+ # Replace the body with fault details.
+ code = self.wrapped_exc.status_int
+ fault_name = hasattr(self.wrapped_exc, 'title') and \
+ self.wrapped_exc.title or "quantumServiceFault"
+ fault_data = {
+ fault_name: {
+ 'code': code,
+ 'message': self.wrapped_exc.explanation,
+ 'detail': str(self.wrapped_exc.detail)}}
+ # 'code' is an attribute on the fault tag itself
+ metadata = {'application/xml': {'attributes': {fault_name: 'code'}}}
+ xml_serializer = XMLDictSerializer(metadata, self._xmlns)
+ content_type = req.best_match_content_type()
+ serializer = {
+ 'application/xml': xml_serializer,
+ 'application/json': JSONDictSerializer(),
+ }[content_type]
+
+ self.wrapped_exc.body = serializer.serialize(fault_data)
+ self.wrapped_exc.content_type = content_type
+ return self.wrapped_exc
+
+
+# NOTE(salvatore-orlando): this class will go once the
+# extension API framework is updated
class Controller(object):
"""WSGI app that dispatched to methods.
return None
+# NOTE(salvatore-orlando): this class will go once the
+# extension API framework is updated
class Serializer(object):
"""Serializes and deserializes dictionaries to certain MIME types."""