[DEFAULT]
# The list of modules to copy from openstack-common
-modules=cfg,importutils,iniparser,setup
+modules=cfg,importutils,iniparser,jsonutils,setup
# The base module to hold the copy of openstack.common
base=quantum
import ConfigParser
import datetime
import inspect
-import json
import logging
import os
import random
from quantum.common import flags
-def dumps(value):
- return json.dumps(value)
-
-
-def loads(value):
- return json.loads(value)
-
-
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
FLAGS = flags.FLAGS
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# Copyright 2011 Justin Santa Barbara
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''
+JSON related utilities.
+
+This module provides a few things:
+
+ 1) A handy function for getting an object down to something that can be
+ JSON serialized. See to_primitive().
+
+ 2) Wrappers around loads() and dumps(). The dumps() wrapper will
+ automatically use to_primitive() for you if needed.
+
+ 3) This sets up anyjson to use the loads() and dumps() wrappers if anyjson
+ is available.
+'''
+
+
+import datetime
+import inspect
+import itertools
+import json
+
+
+def to_primitive(value, convert_instances=False, level=0):
+ """Convert a complex object into primitives.
+
+ Handy for JSON serialization. We can optionally handle instances,
+ but since this is a recursive function, we could have cyclical
+ data structures.
+
+ To handle cyclical data structures we could track the actual objects
+ visited in a set, but not all objects are hashable. Instead we just
+ track the depth of the object inspections and don't go too deep.
+
+ Therefore, convert_instances=True is lossy ... be aware.
+
+ """
+ nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod,
+ inspect.isfunction, inspect.isgeneratorfunction,
+ inspect.isgenerator, inspect.istraceback, inspect.isframe,
+ inspect.iscode, inspect.isbuiltin, inspect.isroutine,
+ inspect.isabstract]
+ for test in nasty:
+ if test(value):
+ return unicode(value)
+
+ # value of itertools.count doesn't get caught by inspects
+ # above and results in infinite loop when list(value) is called.
+ if type(value) == itertools.count:
+ return unicode(value)
+
+ # FIXME(vish): Workaround for LP bug 852095. Without this workaround,
+ # tests that raise an exception in a mocked method that
+ # has a @wrap_exception with a notifier will fail. If
+ # we up the dependency to 0.5.4 (when it is released) we
+ # can remove this workaround.
+ if getattr(value, '__module__', None) == 'mox':
+ return 'mock'
+
+ if level > 3:
+ return '?'
+
+ # The try block may not be necessary after the class check above,
+ # but just in case ...
+ try:
+ if isinstance(value, (list, tuple)):
+ o = []
+ for v in value:
+ o.append(to_primitive(v, convert_instances=convert_instances,
+ level=level))
+ return o
+ elif isinstance(value, dict):
+ o = {}
+ for k, v in value.iteritems():
+ o[k] = to_primitive(v, convert_instances=convert_instances,
+ level=level)
+ return o
+ elif isinstance(value, datetime.datetime):
+ return str(value)
+ elif hasattr(value, 'iteritems'):
+ return to_primitive(dict(value.iteritems()),
+ convert_instances=convert_instances,
+ level=level)
+ elif hasattr(value, '__iter__'):
+ return to_primitive(list(value), level)
+ elif convert_instances and hasattr(value, '__dict__'):
+ # Likely an instance of something. Watch for cycles.
+ # Ignore class member vars.
+ return to_primitive(value.__dict__,
+ convert_instances=convert_instances,
+ level=level + 1)
+ else:
+ return value
+ except TypeError, e:
+ # Class objects are tricky since they may define something like
+ # __iter__ defined but it isn't callable as list().
+ return unicode(value)
+
+
+def dumps(value):
+ return json.dumps(value, default=to_primitive)
+
+
+def loads(s):
+ return json.loads(s)
+
+
+try:
+ import anyjson
+except ImportError:
+ pass
+else:
+ anyjson._modules.append((__name__, 'dumps', TypeError,
+ 'loads', ValueError))
+ anyjson.force_implementation(__name__)
# Peter Strunk , Cisco Systems, Inc.
# Shubhangi Satras , Cisco Systems, Inc.
-import json
import logging
import os.path
import unittest
PluginAwareExtensionManager,
)
from quantum.manager import QuantumManager
+from quantum.openstack.common import jsonutils
from quantum.plugins.cisco.db import api as db
from quantum.plugins.cisco import l2network_plugin
from quantum.plugins.cisco.l2network_plugin import L2Network
""" Test List Portprofile"""
LOG.debug("test_list_portprofile - START")
- req_body1 = json.dumps(self.test_port_profile)
+ req_body1 = jsonutils.dumps(self.test_port_profile)
create_response1 = self.test_app.post(
self.profile_path, req_body1,
content_type=self.contenttype)
- req_body2 = json.dumps({
+ req_body2 = jsonutils.dumps({
'portprofile': {
'portprofile_name': 'cisco_test_portprofile2',
'qos_name': 'test-qos2',
""" Test create Portprofile"""
LOG.debug("test_create_portprofile - START")
- req_body = json.dumps(self.test_port_profile)
+ req_body = jsonutils.dumps(self.test_port_profile)
index_response = self.test_app.post(self.profile_path, req_body,
content_type=self.contenttype)
self.assertEqual(200, index_response.status_int)
""" Test show Portprofile """
LOG.debug("test_show_portprofile - START")
- req_body = json.dumps(self.test_port_profile)
+ req_body = jsonutils.dumps(self.test_port_profile)
index_response = self.test_app.post(self.profile_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
""" Test update Portprofile"""
LOG.debug("test_update_portprofile - START")
- req_body = json.dumps(self.test_port_profile)
+ req_body = jsonutils.dumps(self.test_port_profile)
index_response = self.test_app.post(
self.profile_path, req_body,
content_type=self.contenttype)
'qos_name': 'test-qos1',
},
}
- rename_req_body = json.dumps(rename_port_profile)
+ rename_req_body = jsonutils.dumps(rename_port_profile)
rename_path_temp = (self.portprofile_path +
resp_body['portprofiles']['portprofile']['id'])
rename_path = str(rename_path_temp)
""" Test update Portprofile Bad Request"""
LOG.debug("test_update_portprofileBADRequest - START")
- req_body = json.dumps(self.test_port_profile)
+ req_body = jsonutils.dumps(self.test_port_profile)
index_response = self.test_app.post(
self.profile_path, req_body,
content_type=self.contenttype)
'qos_name': 'test-qos1',
},
}
- rename_req_body = json.dumps(rename_port_profile)
+ rename_req_body = jsonutils.dumps(rename_port_profile)
update_path_temp = self.portprofile_path + portprofile_id
update_path = str(update_path_temp)
update_response = self.test_app.put(update_path, rename_req_body,
""" Test delete Portprofile"""
LOG.debug("test_delete_portprofile - START")
- req_body = json.dumps(self.test_port_profile)
+ req_body = jsonutils.dumps(self.test_port_profile)
index_response = self.test_app.post(
self.profile_path, req_body,
content_type=self.contenttype)
LOG.debug("test_associate_portprofile - START")
net_id = self._create_network()
port_id = self._create_port(net_id, "ACTIVE")
- req_body = json.dumps(self.test_port_profile)
+ req_body = jsonutils.dumps(self.test_port_profile)
index_response = self.test_app.post(
self.profile_path, req_body,
content_type=self.contenttype)
'port-id': port_id,
},
}
- req_assign_body = json.dumps(test_port_assign_data)
+ req_assign_body = jsonutils.dumps(test_port_assign_data)
associate_path_temp = (
self.portprofile_path +
resp_body['portprofiles']['portprofile']['id'] +
'port-id': '1',
},
}
- req_assign_body = json.dumps(test_port_assign_data)
+ req_assign_body = jsonutils.dumps(test_port_assign_data)
associate_path = (self.portprofile_path +
portprofile_id +
"/associate_portprofile")
net_id = self._create_network()
port_id = self._create_port(net_id, "ACTIVE")
- req_body = json.dumps(self.test_port_profile)
+ req_body = jsonutils.dumps(self.test_port_profile)
index_response = self.test_app.post(
self.profile_path, req_body,
content_type=self.contenttype)
'port-id': port_id,
},
}
- req_assign_body = json.dumps(test_port_assign_data)
+ req_assign_body = jsonutils.dumps(test_port_assign_data)
associate_path_temp = (self.portprofile_path +
resp_body['portprofiles']['portprofile']['id'] +
"/associate_portprofile")
def test_schedule_host(self):
""" Test get host"""
LOG.debug("test_schedule_host - START")
- req_body = json.dumps(self.test_associate_data)
+ req_body = jsonutils.dumps(self.test_associate_data)
host_path = self.novatenants_path + "001/schedule_host"
host_response = self.test_app.put(
host_path, req_body,
def test_associate_port(self):
""" Test get associate port """
LOG.debug("test_associate_port - START")
- req_body = json.dumps(self.test_associate_port_data)
+ req_body = jsonutils.dumps(self.test_associate_port_data)
associate_port_path = self.novatenants_path + "001/associate_port"
associate_port_response = self.test_app.put(
associate_port_path, req_body,
""" Test create qos """
LOG.debug("test_create_qos - START")
- req_body = json.dumps(self.test_qos_data)
+ req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path,
req_body,
content_type=self.contenttype)
""" Test list qoss """
LOG.debug("test_list_qoss - START")
- req_body1 = json.dumps(self.test_qos_data)
+ req_body1 = jsonutils.dumps(self.test_qos_data)
create_resp1 = self.test_app.post(self.qos_path, req_body1,
content_type=self.contenttype)
- req_body2 = json.dumps({
+ req_body2 = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_test_qos2',
'qos_desc': {
""" Test show qos """
LOG.debug("test_show_qos - START")
- req_body = json.dumps(self.test_qos_data)
+ req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
""" Test update qos """
LOG.debug("test_update_qos - START")
- req_body = json.dumps(self.test_qos_data)
+ req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
self.contenttype)
- rename_req_body = json.dumps({
+ rename_req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_rename_qos',
'qos_desc': {
""" Test update qos does not exist """
LOG.debug("test_update_qosDNE - START")
- rename_req_body = json.dumps({
+ rename_req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_rename_qos',
'qos_desc': {
""" Test update qos bad request """
LOG.debug("test_update_qosBADRequest - START")
- req_body = json.dumps(self.test_qos_data)
+ req_body = jsonutils.dumps(self.test_qos_data)
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
""" Test delte qos """
LOG.debug("test_delete_qos - START")
- req_body = json.dumps({
+ req_body = jsonutils.dumps({
'qos': {
'qos_name': 'cisco_test_qos',
'qos_desc': {
#Create Credential before listing
LOG.debug("test_list_credentials - START")
- req_body1 = json.dumps(self.test_credential_data)
+ req_body1 = jsonutils.dumps(self.test_credential_data)
create_response1 = self.test_app.post(
self.credential_path, req_body1,
content_type=self.contenttype)
- req_body2 = json.dumps({
+ req_body2 = jsonutils.dumps({
'credential': {
'credential_name': 'cred9',
'user_name': 'newUser2',
""" Test create credential """
LOG.debug("test_create_credential - START")
- req_body = json.dumps(self.test_credential_data)
+ req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
""" Test show credential """
LOG.debug("test_show_credential - START")
- req_body = json.dumps(self.test_credential_data)
+ req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
""" Test update credential """
LOG.debug("test_update_credential - START")
- req_body = json.dumps(self.test_credential_data)
+ req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(
index_response.body, self.contenttype)
- rename_req_body = json.dumps({
+ rename_req_body = jsonutils.dumps({
'credential': {
'credential_name': 'cred3',
'user_name': 'RenamedUser',
""" Test update credential bad request """
LOG.debug("test_update_credBADReq - START")
- req_body = json.dumps(self.test_credential_data)
+ req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
""" Test update credential does not exist"""
LOG.debug("test_update_credentialDNE - START")
- rename_req_body = json.dumps({
+ rename_req_body = jsonutils.dumps({
'credential': {
'credential_name': 'cred3',
'user_name': 'RenamedUser',
""" Test delete credential """
LOG.debug("test_delete_credential - START")
- req_body = json.dumps(self.test_credential_data)
+ req_body = jsonutils.dumps(self.test_credential_data)
index_response = self.test_app.post(
self.credential_path, req_body,
content_type=self.contenttype)
},
},
}
- req_body = json.dumps(test_multi_port)
+ req_body = jsonutils.dumps(test_multi_port)
index_response = self.test_app.post(self.multiport_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
# under the License.
import httplib
-import json
import logging
import time
import urllib
import eventlet
from eventlet import timeout
+from quantum.openstack.common import jsonutils
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
_conn_str,
)
try:
if self.successful():
ret = []
- body = json.loads(self.value.body)
+ body = jsonutils.loads(self.value.body)
for node in body.get('results', []):
for role in node.get('roles', []):
if role.get('role') == 'api_provider':
#
# @author: Brad Hall, Nicira Networks, Inc.
-import json
import logging
from quantum.common import exceptions as exception
+from quantum.openstack.common import jsonutils
from quantum.plugins.nicira.nicira_nvp_plugin import NvpApiClient
"GET",
"/ws.v1/transport-zone?uuid=%s" % c.default_tz_uuid,
controller=c)
- result = json.loads(resp)
+ result = jsonutils.loads(resp)
if int(result["result_count"]) == 0:
msg.append("Unable to find zone \"%s\" for controller \"%s\"" %
(c.default_tz_uuid, c.name))
path = "/ws.v1/lswitch/%s" % net_id
try:
resp_obj = do_single_request("GET", path, controller=controller)
- network = json.loads(resp_obj)
+ network = jsonutils.loads(resp_obj)
except NvpApiClient.ResourceNotFound as e:
raise exception.NetworkNotFound(net_id=net_id)
except NvpApiClient.NvpApiException as e:
uri = "/ws.v1/lswitch"
try:
resp_obj = do_single_request("POST", uri,
- json.dumps(lswitch_obj),
+ jsonutils.dumps(lswitch_obj),
controller=controller)
except NvpApiClient.NvpApiException as e:
raise exception.QuantumException()
- r = json.loads(resp_obj)
+ r = jsonutils.loads(resp_obj)
d = {}
d["net-id"] = r["uuid"]
d["net-name"] = r["display_name"]
if "name" in kwargs:
lswitch_obj["display_name"] = kwargs["name"]
try:
- resp_obj = do_single_request(
- "PUT", uri, json.dumps(lswitch_obj), controller=controller)
+ resp_obj = do_single_request("PUT",
+ uri,
+ jsonutils.dumps(lswitch_obj),
+ controller=controller)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Network not found, Error: %s" % str(e))
raise exception.NetworkNotFound(net_id=network)
except NvpApiClient.NvpApiException as e:
raise exception.QuantumException()
- obj = json.loads(resp_obj)
+ obj = jsonutils.loads(resp_obj)
return obj
raise exception.QuantumException()
if not resp_obj:
return []
- lswitches = json.loads(resp_obj)["results"]
+ lswitches = jsonutils.loads(resp_obj)["results"]
for lswitch in lswitches:
net_id = lswitch["uuid"]
if net_id not in [x["net-id"] for x in networks]:
raise exception.QuantumException()
if not resp_obj:
return []
- lswitches = json.loads(resp_obj)["results"]
+ lswitches = jsonutils.loads(resp_obj)["results"]
nets = [{'net-id': lswitch["uuid"],
'net-name': lswitch["display_name"]}
for lswitch in lswitches]
try:
path = "/ws.v1/lswitch/%s/lport/%s/statistic" % (network_id, port_id)
resp = do_single_request("GET", path, controller=controller)
- stats = json.loads(resp)
+ stats = jsonutils.loads(resp)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Port not found, Error: %s" % str(e))
raise exception.PortNotFound(port_id=port_id, net_id=network_id)
raise exception.NetworkNotFound(net_id=network)
except NvpApiClient.NvpApiException as e:
raise exception.QuantumException()
- return json.loads(resp_obj)["results"]
+ return jsonutils.loads(resp_obj)["results"]
def delete_port(controller, network, port):
res = do_single_request("GET",
"/ws.v1/lswitch/%s/lport?fields=uuid" % ls_uuid,
controller=controller)
- res = json.loads(res)
+ res = jsonutils.loads(res)
for r in res["results"]:
do_single_request(
"DELETE",
uri += "relations=%s" % relations
try:
resp_obj = do_single_request("GET", uri, controller=controller)
- port = json.loads(resp_obj)
+ port = jsonutils.loads(resp_obj)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Port or Network not found, Error: %s" % str(e))
raise exception.PortNotFound(port_id=port, net_id=network)
lport_obj["type"] = type
try:
- resp_obj = do_single_request("PUT", uri,
- json.dumps(lport_obj), controller=controller)
+ resp_obj = do_single_request("PUT",
+ uri,
+ jsonutils.dumps(lport_obj),
+ controller=controller)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Port or Network not found, Error: %s" % str(e))
raise exception.PortNotFound(port_id=port, net_id=network)
except NvpApiClient.NvpApiException as e:
raise exception.QuantumException()
- result = json.dumps(resp_obj)
+ result = jsonutils.dumps(resp_obj)
return result
uri = "/ws.v1/lswitch/" + network + "/lport/" + port + "/attachment"
lport_obj = {"type": "NoAttachment"}
try:
- resp_obj = do_single_request(
- "PUT", uri, json.dumps(lport_obj), controller=controller)
+ resp_obj = do_single_request("PUT",
+ uri,
+ jsonutils.dumps(lport_obj),
+ controller=controller)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Port or Network not found, Error: %s" % str(e))
raise exception.PortNotFound(port_id=port, net_id=network)
except NvpApiClient.NvpApiException as e:
raise exception.QuantumException()
- return json.loads(resp_obj)
+ return jsonutils.loads(resp_obj)
def update_port(network, port_id, **params):
uri = "/ws.v1/lswitch/" + network + "/lport/" + port_id
try:
- resp_obj = do_single_request(
- "PUT", uri, json.dumps(lport_obj), controller=controller)
+ resp_obj = do_single_request("PUT",
+ uri,
+ jsonutils.dumps(lport_obj),
+ controller=controller)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Port or Network not found, Error: %s" % str(e))
raise exception.PortNotFound(port_id=port_id, net_id=network)
except NvpApiClient.NvpApiException as e:
raise exception.QuantumException()
- obj = json.loads(resp_obj)
+ obj = jsonutils.loads(resp_obj)
obj["port-op-status"] = get_port_status(controller, network, obj["uuid"])
return obj
path = "/ws.v1/lswitch/" + ls_uuid + "/lport"
try:
- resp_obj = do_single_request(
- "POST", path, json.dumps(lport_obj), controller=controller)
+ resp_obj = do_single_request("POST",
+ path,
+ jsonutils.dumps(lport_obj),
+ controller=controller)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Network not found, Error: %s" % str(e))
raise exception.NetworkNotFound(net_id=network)
except NvpApiClient.NvpApiException as e:
raise exception.QuantumException()
- result = json.loads(resp_obj)
+ result = jsonutils.loads(resp_obj)
result['port-op-status'] = get_port_status(controller, ls_uuid,
result['uuid'])
return result
"GET",
"/ws.v1/lswitch/%s/lport/%s/status" % (lswitch_id, port_id),
controller=controller)
- r = json.loads(r)
+ r = jsonutils.loads(r)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Port not found, Error: %s" % str(e))
raise exception.PortNotFound(port_id=port_id, net_id=lswitch_id)
# @author: Somik Behera, Nicira Networks, Inc.
# @author: Brad Hall, Nicira Networks, Inc.
-import json
import logging
import os
import unittest
from quantum.common import exceptions as exception
+from quantum.openstack.common import jsonutils
from quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin import NvpPlugin
from quantum.plugins.nicira.nicira_nvp_plugin import (
NvpApiClient,
"tags": [{"tag": "plugin-test"}]}
try:
resp_obj = self.quantum.api_client.request("POST",
- post_uri, json.dumps(body))
+ post_uri, jsonutils.dumps(body))
except NvpApiClient.NvpApiException as e:
print("Unknown API Error: %s" % str(e))
raise exception.QuantumException()
- return json.loads(resp_obj)["uuid"]
+ return jsonutils.loads(resp_obj)["uuid"]
def _delete_tz(self, uuid):
post_uri = "/ws.v1/transport-zone/%s" % uuid
#
# @author: Somik Behera, Nicira Networks, Inc.
-import json
import logging
import os
import unittest
from quantum.common import exceptions as exception
+from quantum.openstack.common import jsonutils
from quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin import NvpPlugin
from quantum.plugins.nicira.nicira_nvp_plugin import (
NvpApiClient,
post_uri = "/ws.v1/transport-zone"
body = {"display_name": name, "tags": [{"tag": "plugin-test"}]}
try:
- resp_obj = self.quantum.api_client.request(
- "POST", post_uri, json.dumps(body))
+ resp_obj = self.quantum.api_client.request("POST",
+ post_uri,
+ jsonutils.dumps(body))
except NvpApiClient.NvpApiException as e:
LOG.error("Unknown API Error: %s" % str(e))
raise exception.QuantumException()
- return json.loads(resp_obj)["uuid"]
+ return jsonutils.loads(resp_obj)["uuid"]
def _delete_tz(self, uuid):
post_uri = "/ws.v1/transport-zone/%s" % uuid
# under the License.
from abc import abstractmethod
-import json
from quantum.extensions import extensions
+from quantum.openstack.common import jsonutils
from quantum import wsgi
def _goose_handler(req, res):
#NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
- data = json.loads(res.body)
+ data = jsonutils.loads(res.body)
data['FOXNSOX:googoose'] = req.GET.get('chewing')
- res.body = json.dumps(data)
+ res.body = jsonutils.dumps(data)
return res
req_ext1 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
def _bands_handler(req, res):
#NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
- data = json.loads(res.body)
+ data = jsonutils.loads(res.body)
data['FOXNSOX:big_bands'] = 'Pig Bands!'
- res.body = json.dumps(data)
+ res.body = jsonutils.dumps(data)
return res
req_ext2 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
# under the License.
# @author: Salvatore Orlando, Citrix Systems
-import json
import unittest
from lxml import etree
import quantum.api.ports as ports
import quantum.api.versions as versions
from quantum.common.test_lib import test_config
+from quantum.openstack.common import jsonutils
import quantum.tests.unit._test_api as test_api
import quantum.tests.unit.testlib_api as testlib
def test_root_responds_with_versions_json(self):
body = self._test_root_responds_with_versions('application/json')
- data = json.loads(body)
+ data = jsonutils.loads(body)
self.assertEquals('versions', data.keys()[0])
def test_root_responds_with_versions_xml(self):
# License for the specific language governing permissions and limitations
# under the License.
-import json
import unittest
import routes
ExtensionMiddleware,
PluginAwareExtensionManager,
)
+from quantum.openstack.common import jsonutils
from quantum.plugins.sample.SamplePlugin import QuantumEchoPlugin
from quantum.tests.unit import BaseTest
from quantum.tests.unit.extension_stubs import (
response = test_app.get("/tweedles/some_id/custom_member_action")
self.assertEqual(200, response.status_int)
- self.assertEqual(json.loads(response.body)['member_action'], "value")
+ self.assertEqual(jsonutils.loads(response.body)['member_action'],
+ "value")
def test_resource_extension_for_get_custom_collection_action(self):
controller = self.ResourceExtensionController()
response = test_app.get("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
- self.assertEqual(json.loads(response.body)['collection'], "value")
+ self.assertEqual(jsonutils.loads(response.body)['collection'], "value")
def test_resource_extension_for_put_custom_collection_action(self):
controller = self.ResourceExtensionController()
response = test_app.put("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
- self.assertEqual(json.loads(response.body)['collection'], 'value')
+ self.assertEqual(jsonutils.loads(response.body)['collection'], 'value')
def test_resource_extension_for_post_custom_collection_action(self):
controller = self.ResourceExtensionController()
response = test_app.post("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
- self.assertEqual(json.loads(response.body)['collection'], 'value')
+ self.assertEqual(jsonutils.loads(response.body)['collection'], 'value')
def test_resource_extension_for_delete_custom_collection_action(self):
controller = self.ResourceExtensionController()
response = test_app.delete("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
- self.assertEqual(json.loads(response.body)['collection'], 'value')
+ self.assertEqual(jsonutils.loads(response.body)['collection'], 'value')
def test_resource_ext_for_formatted_req_on_custom_collection_action(self):
controller = self.ResourceExtensionController()
response = test_app.get("/tweedles/custom_collection_action.json")
self.assertEqual(200, response.status_int)
- self.assertEqual(json.loads(response.body)['collection'], "value")
+ self.assertEqual(jsonutils.loads(response.body)['collection'], "value")
def test_resource_ext_for_nested_resource_custom_collection_action(self):
controller = self.ResourceExtensionController()
"/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
- self.assertEqual(json.loads(response.body)['collection'], "value")
+ self.assertEqual(jsonutils.loads(response.body)['collection'], "value")
def test_returns_404_for_non_existant_extension(self):
test_app = setup_extensions_test_app(SimpleExtensionManager(None))
def test_extended_action_for_adding_extra_data(self):
action_name = 'FOXNSOX:add_tweedle'
action_params = dict(name='Beetle')
- req_body = json.dumps({action_name: action_params})
+ req_body = jsonutils.dumps({action_name: action_params})
response = self.extension_app.post('/dummy_resources/1/action',
req_body,
content_type='application/json')
def test_extended_action_for_deleting_extra_data(self):
action_name = 'FOXNSOX:delete_tweedle'
action_params = dict(name='Bailey')
- req_body = json.dumps({action_name: action_params})
+ req_body = jsonutils.dumps({action_name: action_params})
response = self.extension_app.post("/dummy_resources/1/action",
req_body,
content_type='application/json')
def test_returns_404_for_non_existant_action(self):
non_existant_action = 'blah_action'
action_params = dict(name="test")
- req_body = json.dumps({non_existant_action: action_params})
+ req_body = jsonutils.dumps({non_existant_action: action_params})
response = self.extension_app.post("/dummy_resources/1/action",
req_body,
def test_returns_404_for_non_existant_resource(self):
action_name = 'add_tweedle'
action_params = dict(name='Beetle')
- req_body = json.dumps({action_name: action_params})
+ req_body = jsonutils.dumps({action_name: action_params})
response = self.extension_app.post("/asdf/1/action", req_body,
content_type='application/json',
def test_extend_get_resource_response(self):
def extend_response_data(req, res):
- data = json.loads(res.body)
+ data = jsonutils.loads(res.body)
data['FOXNSOX:extended_key'] = req.GET.get('extended_key')
- res.body = json.dumps(data)
+ res.body = jsonutils.dumps(data)
return res
app = self._setup_app_with_request_handler(extend_response_data, 'GET')
response = app.get("/dummy_resources/1?extended_key=extended_data")
self.assertEqual(200, response.status_int)
- response_data = json.loads(response.body)
+ response_data = jsonutils.loads(response.body)
self.assertEqual('extended_data',
response_data['FOXNSOX:extended_key'])
self.assertEqual('knox', response_data['fort'])
response = app.get("/dummy_resources/1?chewing=newblue")
- response_data = json.loads(response.body)
+ response_data = jsonutils.loads(response.body)
self.assertEqual('newblue', response_data['FOXNSOX:googoose'])
self.assertEqual("Pig Bands!", response_data['FOXNSOX:big_bands'])
def test_edit_previously_uneditable_field(self):
def _update_handler(req, res):
- data = json.loads(res.body)
+ data = jsonutils.loads(res.body)
data['uneditable'] = req.params['uneditable']
- res.body = json.dumps(data)
+ res.body = jsonutils.dumps(data)
return res
base_app = TestApp(setup_base_app())
import webob.exc
from quantum.common import exceptions as exception
-from quantum.common import utils
+from quantum.openstack.common import jsonutils
LOG = logging.getLogger(__name__)
"""Default JSON request body serialization"""
def default(self, data):
- return utils.dumps(data)
+ return jsonutils.dumps(data)
class XMLDictSerializer(DictSerializer):
def _from_json(self, datastring):
try:
- return utils.loads(datastring)
+ return jsonutils.loads(datastring)
except ValueError:
msg = _("cannot understand JSON")
raise exception.MalformedRequestBody(reason=msg)
raise exception.InvalidContentType(content_type=content_type)
def _from_json(self, datastring):
- return utils.loads(datastring)
+ return jsonutils.loads(datastring)
def _from_xml(self, datastring):
xmldata = self.metadata.get('application/xml', {})
return result
def _to_json(self, data):
- return utils.dumps(data)
+ return jsonutils.dumps(data)
def _to_xml(self, data):
metadata = self.metadata.get('application/xml', {})