From 8499d50259d7f7c01de6dad08b660afa5544af4d Mon Sep 17 00:00:00 2001 From: Tatyana Leontovich Date: Tue, 29 Jan 2013 16:29:13 +0200 Subject: [PATCH] Improve test coverage for quantum wsgi module OpenStack quantum.wsgi package includes a lot of classes not covered with unit tests. This change adds such unit tests. Fixes: bug #1136027 Change-Id: I16dd35389bf82f2818b74d1fb4cf7f7a6bebce28 --- quantum/tests/unit/test_wsgi.py | 683 ++++++++++++++++++++++++++++++-- 1 file changed, 652 insertions(+), 31 deletions(-) diff --git a/quantum/tests/unit/test_wsgi.py b/quantum/tests/unit/test_wsgi.py index c2759a65f..6d0c1b745 100644 --- a/quantum/tests/unit/test_wsgi.py +++ b/quantum/tests/unit/test_wsgi.py @@ -15,9 +15,13 @@ # License for the specific language governing permissions and limitations # under the License. -import mock import socket +import mock +import testtools +import webob +import webob.exc + from quantum.api.v2 import attributes from quantum.common import constants from quantum.common import exceptions as exception @@ -86,7 +90,7 @@ class SerializerTest(base.BaseTestCase): """ Test serialize verifies that exception InvalidContentType is raised """ - input_dict = dict(servers={'test': 'pass'}) + input_dict = {'servers': {'test': 'pass'}} content_type = 'application/unknown' serializer = wsgi.Serializer() @@ -106,8 +110,229 @@ class SerializerTest(base.BaseTestCase): exception.InvalidContentType, serializer.get_deserialize_handler, content_type) + def test_serialize_content_type_json(self): + """ + Test serialize with content type json + """ + input_data = {'servers': ['test=pass']} + content_type = 'application/json' + serializer = wsgi.Serializer(default_xmlns="fake") + result = serializer.serialize(input_data, content_type) + + self.assertEqual('{"servers": ["test=pass"]}', result) + + def test_serialize_content_type_xml(self): + """ + Test serialize with content type xml + """ + input_data = {'servers': ['test=pass']} + content_type = 'application/xml' + serializer = wsgi.Serializer(default_xmlns="fake") + result = serializer.serialize(input_data, content_type) + expected = ( + '\n' + '' + 'test=pass' + ) + + self.assertEqual(expected, result) + + def test_deserialize_raise_bad_request(self): + """ + Test serialize verifies that exception is raises + """ + content_type = 'application/unknown' + data_string = 'test' + serializer = wsgi.Serializer(default_xmlns="fake") + + self.assertRaises( + webob.exc.HTTPBadRequest, + serializer.deserialize, data_string, content_type) + + def test_deserialize_json_content_type(self): + """ + Test Serializer.deserialize with content type json + """ + content_type = 'application/json' + data_string = '{"servers": ["test=pass"]}' + serializer = wsgi.Serializer(default_xmlns="fake") + result = serializer.deserialize(data_string, content_type) + + self.assertEqual({'body': {u'servers': [u'test=pass']}}, result) + + def test_deserialize_xml_content_type(self): + """ + Test deserialize with content type xml + """ + content_type = 'application/xml' + data_string = ( + '' + 'test=pass' + '' + ) + serializer = wsgi.Serializer( + default_xmlns="fake", metadata={'xmlns': 'fake'}) + result = serializer.deserialize(data_string, content_type) + expected = {'body': {'servers': {'server': 'test=pass'}}} + + self.assertEqual(expected, result) + + def test_deserialize_xml_content_type_with_meta(self): + """ + Test deserialize with content type xml with meta + """ + content_type = 'application/xml' + data_string = ( + '' + 'passed' + '' + ) + + metadata = {'plurals': ['servers', 'test'], 'xmlns': 'fake'} + serializer = wsgi.Serializer( + default_xmlns="fake", metadata=metadata) + result = serializer.deserialize(data_string, content_type) + expected = {'body': {'servers': ['passed']}} + + self.assertEqual(expected, result) + + def test_serialize_xml_root_key_is_dict(self): + """ + Test Serializer.serialize with content type xml with meta dict + """ + content_type = 'application/xml' + data = {'servers': {'network': (2, 3)}} + metadata = {'xmlns': 'fake'} + + serializer = wsgi.Serializer(default_xmlns="fake", metadata=metadata) + result = serializer.serialize(data, content_type) + result = result.replace('\n', '') + expected = ( + '' + '' + '(2, 3)' + ) + + self.assertEqual(result, expected) + + def test_serialize_xml_root_key_is_list(self): + """ + Test serialize with content type xml with meta list + """ + input_dict = {'servers': ['test=pass']} + content_type = 'application/xml' + metadata = {'application/xml': { + 'xmlns': 'fake'}} + serializer = wsgi.Serializer(default_xmlns="fake", metadata=metadata) + result = serializer.serialize(input_dict, content_type) + result = result.replace('\n', '').replace(' ', '') + expected = ( + '' + '' + 'test=pass' + ) + + self.assertEqual(result, expected) + + def test_serialize_xml_root_is_None(self): + input_dict = {'test': 'pass'} + content_type = 'application/xml' + serializer = wsgi.Serializer(default_xmlns="fake") + result = serializer.serialize(input_dict, content_type) + result = result.replace('\n', '').replace(' ', '') + expected = ( + '' + '' + 'pass' + ) + + self.assertEqual(result, expected) + + +class RequestDeserializerTest(testtools.TestCase): + def setUp(self): + super(RequestDeserializerTest, self).setUp() + + class JSONDeserializer(object): + def deserialize(self, data, action='default'): + return 'pew_json' + + class XMLDeserializer(object): + def deserialize(self, data, action='default'): + return 'pew_xml' + + self.body_deserializers = { + 'application/json': JSONDeserializer(), + 'application/xml': XMLDeserializer()} + + self.deserializer = wsgi.RequestDeserializer(self.body_deserializers) + + def test_get_deserializer(self): + """ + Test RequestDeserializer.get_body_deserializer + """ + expected_json_serializer = self.deserializer.get_body_deserializer( + 'application/json') + expected_xml_serializer = self.deserializer.get_body_deserializer( + 'application/xml') + + self.assertEqual( + expected_json_serializer, + self.body_deserializers['application/json']) + self.assertEqual( + expected_xml_serializer, + self.body_deserializers['application/xml']) + + def test_get_expected_content_type(self): + """ + Test RequestDeserializer.get_expected_content_type + """ + request = wsgi.Request.blank('/') + request.headers['Accept'] = 'application/json' + + self.assertEqual( + self.deserializer.get_expected_content_type(request), + 'application/json') + + def test_get_action_args(self): + """ + Test RequestDeserializer.get_action_args + """ + env = { + 'wsgiorg.routing_args': [None, { + 'controller': None, + 'format': None, + 'action': 'update', + 'id': 12}]} + expected = {'action': 'update', 'id': 12} + + self.assertEqual( + self.deserializer.get_action_args(env), expected) + + def test_deserialize(self): + """ + Test RequestDeserializer.deserialize + """ + with mock.patch.object( + self.deserializer, 'get_action_args') as mock_method: + mock_method.return_value = {'action': 'create'} + request = wsgi.Request.blank('/') + request.headers['Accept'] = 'application/xml' + deserialized = self.deserializer.deserialize(request) + expected = ('create', {}, 'application/xml') + + self.assertEqual(expected, deserialized) -class RequestDeserializerTest(base.BaseTestCase): def test_get_body_deserializer_unknown_content_type(self): """ Test get body deserializer verifies @@ -120,7 +345,7 @@ class RequestDeserializerTest(base.BaseTestCase): deserializer.get_body_deserializer, content_type) -class ResponseSerializerTest(base.BaseTestCase): +class ResponseSerializerTest(testtools.TestCase): def setUp(self): super(ResponseSerializerTest, self).setUp() @@ -162,42 +387,234 @@ class ResponseSerializerTest(base.BaseTestCase): exception.InvalidContentType, self.serializer.get_body_serializer, 'application/unknown') + def test_get_serializer(self): + """ + Test ResponseSerializer.get_body_serializer + """ + content_type = 'application/json' + self.assertEqual( + self.serializer.get_body_serializer(content_type), + self.body_serializers[content_type]) -class XMLDeserializerTest(base.BaseTestCase): - def test_default_raise_Maiformed_Exception(self): + def test_serialize_json_response(self): + response = self.serializer.serialize({}, 'application/json') + + self.assertEqual(response.headers['Content-Type'], 'application/json') + self.assertEqual(response.body, 'pew_json') + self.assertEqual(response.status_int, 404) + + def test_serialize_xml_response(self): + response = self.serializer.serialize({}, 'application/xml') + + self.assertEqual(response.headers['Content-Type'], 'application/xml') + self.assertEqual(response.body, 'pew_xml') + self.assertEqual(response.status_int, 404) + + def test_serialize_response_None(self): + response = self.serializer.serialize( + None, 'application/json') + + self.assertEqual(response.headers['Content-Type'], 'application/json') + self.assertEqual(response.body, '') + self.assertEqual(response.status_int, 404) + + +class RequestTest(base.BaseTestCase): + + def test_content_type_missing(self): + request = wsgi.Request.blank('/tests/123', method='POST') + request.body = "" + + self.assertIsNone(request.get_content_type()) + + def test_content_type_unsupported(self): + request = wsgi.Request.blank('/tests/123', method='POST') + request.headers["Content-Type"] = "text/html" + request.body = "fake
" + + self.assertIsNone(request.get_content_type()) + + def test_content_type_with_charset(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Content-Type"] = "application/json; charset=UTF-8" + result = request.get_content_type() + + self.assertEqual(result, "application/json") + + def test_content_type_with_given_content_types(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Content-Type"] = "application/new-type;" + + self.assertIsNone(request.get_content_type()) + + def test_content_type_from_accept(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/xml" + result = request.best_match_content_type() + + self.assertEqual(result, "application/xml") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/json" + result = request.best_match_content_type() + + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/xml, application/json" + result = request.best_match_content_type() + + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = ("application/json; q=0.3, " + "application/xml; q=0.9") + result = request.best_match_content_type() + + self.assertEqual(result, "application/xml") + + def test_content_type_from_query_extension(self): + request = wsgi.Request.blank('/tests/123.xml') + result = request.best_match_content_type() + + self.assertEqual(result, "application/xml") + + request = wsgi.Request.blank('/tests/123.json') + result = request.best_match_content_type() + + self.assertEqual(result, "application/json") + + request = wsgi.Request.blank('/tests/123.invalid') + result = request.best_match_content_type() + + self.assertEqual(result, "application/json") + + def test_content_type_accept_and_query_extension(self): + request = wsgi.Request.blank('/tests/123.xml') + request.headers["Accept"] = "application/json" + result = request.best_match_content_type() + + self.assertEqual(result, "application/xml") + + def test_content_type_accept_default(self): + request = wsgi.Request.blank('/tests/123.unsupported') + request.headers["Accept"] = "application/unsupported1" + result = request.best_match_content_type() + + self.assertEqual(result, "application/json") + + def test_content_type_accept_with_given_content_types(self): + request = wsgi.Request.blank('/tests/123') + request.headers["Accept"] = "application/new_type" + result = request.best_match_content_type() + + self.assertEqual(result, 'application/json') + + +class ActionDispatcherTest(base.BaseTestCase): + def test_dispatch(self): """ - Test verifies that exception MalformedRequestBody is raised + Test ActionDispatcher.dispatch """ - data_string = "" - deserializer = wsgi.XMLDeserializer() + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x - self.assertRaises( - exception.MalformedRequestBody, deserializer.default, data_string) + self.assertEqual( + serializer.dispatch('pants', action='create'), + 'pants') - def test_entity_expansion(self): - def killer_body(): - return ((""" - - ]> - - - %(d)s - - """) % { - 'a': 'A' * 10, - 'b': '&a;' * 10, - 'c': '&b;' * 10, - 'd': '&c;' * 9999, - }).strip() + def test_dispatch_action_None(self): + """ + Test ActionDispatcher.dispatch with none action + """ + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x + ' pants' + serializer.default = lambda x: x + ' trousers' - deserializer = wsgi.XMLDeserializer() - self.assertRaises( - ValueError, deserializer.default, killer_body()) + self.assertEqual( + serializer.dispatch('Two', action=None), + 'Two trousers') + + def test_dispatch_default(self): + serializer = wsgi.ActionDispatcher() + serializer.create = lambda x: x + ' pants' + serializer.default = lambda x: x + ' trousers' + + self.assertEqual( + serializer.dispatch('Two', action='update'), + 'Two trousers') + + +class ResponseHeadersSerializerTest(base.BaseTestCase): + def test_default(self): + serializer = wsgi.ResponseHeaderSerializer() + response = webob.Response() + serializer.serialize(response, {'v': '123'}, 'fake') + + self.assertEqual(response.status_int, 200) + + def test_custom(self): + class Serializer(wsgi.ResponseHeaderSerializer): + def update(self, response, data): + response.status_int = 404 + response.headers['X-Custom-Header'] = data['v'] + serializer = Serializer() + response = webob.Response() + serializer.serialize(response, {'v': '123'}, 'update') + + self.assertEqual(response.status_int, 404) + self.assertEqual(response.headers['X-Custom-Header'], '123') + + +class DictSerializerTest(base.BaseTestCase): + + def test_dispatch_default(self): + serializer = wsgi.DictSerializer() + self.assertEqual( + serializer.serialize({}, 'NonExistantAction'), '') + + +class JSONDictSerializerTest(base.BaseTestCase): + + def test_json(self): + input_dict = dict(servers=dict(a=(2, 3))) + expected_json = '{"servers":{"a":[2,3]}}' + serializer = wsgi.JSONDictSerializer() + result = serializer.serialize(input_dict) + result = result.replace('\n', '').replace(' ', '') + + self.assertEqual(result, expected_json) + + +class TextDeserializerTest(base.BaseTestCase): + + def test_dispatch_default(self): + deserializer = wsgi.TextDeserializer() + self.assertEqual( + deserializer.deserialize({}, 'update'), {}) class JSONDeserializerTest(base.BaseTestCase): - def test_default_raise_Maiformed_Exception(self): + def test_json(self): + data = """{"a": { + "a1": "1", + "a2": "2", + "bs": ["1", "2", "3", {"c": {"c1": "1"}}], + "d": {"e": "1"}, + "f": "1"}}""" + as_dict = { + 'body': { + 'a': { + 'a1': '1', + 'a2': '2', + 'bs': ['1', '2', '3', {'c': {'c1': '1'}}], + 'd': {'e': '1'}, + 'f': '1'}}} + deserializer = wsgi.JSONDeserializer() + self.assertEqual( + deserializer.deserialize(data), as_dict) + + def test_default_raise_Malformed_Exception(self): """ Test verifies JsonDeserializer.default raises exception MalformedRequestBody correctly @@ -209,7 +626,69 @@ class JSONDeserializerTest(base.BaseTestCase): exception.MalformedRequestBody, deserializer.default, data_string) +class XMLDeserializerTest(base.BaseTestCase): + def test_xml_empty(self): + xml = '' + as_dict = {'body': {'a': ''}} + deserializer = wsgi.XMLDeserializer() + + self.assertEqual( + deserializer.deserialize(xml), as_dict) + + def test_initialization(self): + xml = 'test' + deserializer = wsgi.XMLDeserializer() + + self.assertEqual( + {'body': {u'a': {u'b': u'test'}}}, deserializer(xml)) + + def test_default_raise_Malformed_Exception(self): + """ + Test verifies that exception MalformedRequestBody is raised + """ + data_string = "" + deserializer = wsgi.XMLDeserializer() + + self.assertRaises( + exception.MalformedRequestBody, deserializer.default, data_string) + + +class RequestHeadersDeserializerTest(base.BaseTestCase): + + def test_default(self): + deserializer = wsgi.RequestHeadersDeserializer() + req = wsgi.Request.blank('/') + + self.assertEqual( + deserializer.deserialize(req, 'nonExistant'), {}) + + def test_custom(self): + class Deserializer(wsgi.RequestHeadersDeserializer): + def update(self, request): + return {'a': request.headers['X-Custom-Header']} + deserializer = Deserializer() + req = wsgi.Request.blank('/') + req.headers['X-Custom-Header'] = 'b' + self.assertEqual( + deserializer.deserialize(req, 'update'), {'a': 'b'}) + + class ResourceTest(base.BaseTestCase): + def test_dispatch(self): + class Controller(object): + def index(self, request, index=None): + return index + + def my_fault_body_function(): + return 'off' + + resource = wsgi.Resource(Controller(), my_fault_body_function) + actual = resource.dispatch( + resource.controller, 'index', action_args={'index': 'off'}) + expected = 'off' + + self.assertEqual(actual, expected) + def test_dispatch_unknown_controller_action(self): class Controller(object): def index(self, request, pants=None): @@ -281,6 +760,83 @@ class ResourceTest(base.BaseTestCase): result = resource(request) self.assertEqual(400, result.status_int) + def test_type_error(self): + class Controller(object): + def index(self, request, index=None): + return index + + def my_fault_body_function(): + return 'off' + resource = wsgi.Resource(Controller(), my_fault_body_function) + request = wsgi.Request.blank( + "/", method='POST', headers={'Content-Type': "xml"}) + + response = resource.dispatch( + request, action='index', action_args='test') + self.assertEqual(400, response.status_int) + + def test_call_resource_class_internal_error(self): + class Controller(object): + def index(self, request, index=None): + return index + + def my_fault_body_function(): + return 'off' + + class FakeRequest(): + def __init__(self): + self.url = 'http://where.no' + self.environ = 'environ' + self.body = '{"Content-Type": "xml"}' + + def method(self): + pass + + def best_match_content_type(self): + return 'application/json' + + resource = wsgi.Resource(Controller(), my_fault_body_function) + request = FakeRequest() + result = resource(request) + self.assertEqual(500, result.status_int) + + +class ServerTest(base.BaseTestCase): + + def test_run_server(self): + with mock.patch('eventlet.listen') as listen: + with mock.patch('eventlet.wsgi.server') as server: + wsgi.run_server(mock.sentinel.application, mock.sentinel.port) + server.assert_called_once_with( + listen.return_value, + mock.sentinel.application + ) + listen.assert_called_once_with(('0.0.0.0', mock.sentinel.port)) + + +class MiddlewareTest(base.BaseTestCase): + def test_process_response(self): + def application(environ, start_response): + response = 'Sucess' + return response + response = application('test', 'fake') + result = wsgi.Middleware(application).process_response(response) + self.assertEqual('Sucess', result) + + +class FaultTest(base.BaseTestCase): + def test_call_fault(self): + class MyException(object): + status_int = 415 + explanation = 'test' + + my_exceptions = MyException() + my_fault = wsgi.Fault(exception=my_exceptions) + request = wsgi.Request.blank( + "/", method='POST', headers={'Content-Type': "unknow"}) + response = my_fault(request) + self.assertEqual(415, response.status_int) + class XMLDictSerializerTest(base.BaseTestCase): def test_xml(self): @@ -393,3 +949,68 @@ class XMLDictSerializerTest(base.BaseTestCase): deserializer = wsgi.XMLDeserializer(attributes.get_attr_metadata()) new_data = deserializer.deserialize(result)['body'] self.assertEqual(data, new_data) + + def test_xml_root_key_is_list(self): + input_dict = {'servers': ['test-pass']} + serializer = wsgi.XMLDictSerializer(xmlns="fake") + result = serializer.default(input_dict) + result = result.replace('\n', '').replace(' ', '') + expected = ( + '' + '' + 'test-pass' + ) + + self.assertEqual(result, expected) + + def test_xml_meta_contains_node_name_list(self): + input_dict = {'servers': ['test-pass']} + servers = {'nodename': 'test', + 'item_name': 'test', + 'item_key': 'test'} + metadata = {'list_collections': {'servers': servers}} + serializer = wsgi.XMLDictSerializer(xmlns="fake", metadata=metadata) + result = serializer.default(input_dict) + result = result.replace('\n', '').replace(' ', '') + expected = ( + '' + '' + 'test-pass' + ) + + self.assertEqual(result, expected) + + def test_xml_meta_contains_node_name_dict(self): + input_dict = {'servers': {'a': {'2': '3'}}} + servers = {'servers': { + 'nodename': 'test', + 'item_name': 'test', + 'item_key': 'test'}} + metadata = {'dict_collections': servers} + serializer = wsgi.XMLDictSerializer(xmlns="fake", metadata=metadata) + result = serializer.default(input_dict) + result = result.replace('\n', '').replace(' ', '') + expected = ( + '' + '' + '<2>3' + ) + + self.assertEqual(result, expected) + + def test_call(self): + data = {'servers': {'a': {'2': '3'}}} + serializer = wsgi.XMLDictSerializer() + expected = ( + '' + '' + '<2>3' + ) + result = serializer(data) + result = result.replace('\n', '').replace(' ', '') + self.assertEqual(expected, result) -- 2.45.2