--------------------------
- [N320] Validate that LOG messages, except debug ones, have translations
+- [N321] Validate that jsonutils module is used instead of json
Creating Unit Tests
-------------------
yield (0, msg)
+def use_jsonutils(logical_line, filename):
+ msg = "N321: jsonutils.%(fun)s must be used instead of json.%(fun)s"
+
+ # Some files in the tree are not meant to be run from inside Neutron
+ # itself, so we should not complain about them not using jsonutils
+ json_check_skipped_patterns = [
+ "neutron/plugins/openvswitch/agent/xenapi/etc/xapi.d/plugins/netwrap",
+ ]
+
+ for pattern in json_check_skipped_patterns:
+ if pattern in filename:
+ return
+
+ if "json." in logical_line:
+ json_funcs = ['dumps(', 'dump(', 'loads(', 'load(']
+ for f in json_funcs:
+ pos = logical_line.find('json.%s' % f)
+ if pos != -1:
+ yield (pos, msg % {'fun': f[:-1]})
+
+
def factory(register):
register(validate_log_translations)
+ register(use_jsonutils)
from neutron.common import exceptions
from neutron.common import utils
from neutron.openstack.common import excutils
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch.db import consistency_db as cdb
def get_capabilities(self):
try:
body = self.rest_call('GET', CAPABILITIES_PATH)[2]
- self.capabilities = json.loads(body)
+ self.capabilities = jsonutils.loads(body)
except Exception:
LOG.exception(_("Couldn't retrieve capabilities. "
"Newer API calls won't be supported."))
def rest_call(self, action, resource, data='', headers={}, timeout=False,
reconnect=False, hash_handler=None):
uri = self.base_uri + resource
- body = json.dumps(data)
+ body = jsonutils.dumps(data)
if not headers:
headers = {}
headers['Content-type'] = 'application/json'
if hash_value is not None:
hash_handler.put_hash(hash_value)
try:
- respdata = json.loads(respstr)
+ respdata = jsonutils.loads(respstr)
except ValueError:
# response was not JSON, ignore the exception
pass
from six import moves
from wsgiref import simple_server
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
class TestNetworkCtrl(object):
request_data = environ.get('wsgi.input').read(content_len)
if request_data:
try:
- request_data = json.loads(request_data)
+ request_data = jsonutils.loads(request_data)
except Exception:
# OK for it not to be json! Ignore it
pass
print('%s %s' % (method, uri))
if request_data:
print('%s' %
- json.dumps(request_data, sort_keys=True, indent=4))
+ jsonutils.dumps(
+ request_data, sort_keys=True, indent=4))
status, body = self.request_handler(method, uri, None)
body_data = None
if body:
try:
- body_data = json.loads(body)
+ body_data = jsonutils.loads(body)
except Exception:
# OK for it not to be json! Ignore it
pass
if self.debug:
if self.debug_env:
print('%s: %s' % ('Response',
- json.dumps(body_data, sort_keys=True, indent=4)))
+ jsonutils.dumps(
+ body_data, sort_keys=True, indent=4)))
return body
return simple_server.make_server(self.host, self.port, app)
#
# @author: Hareesh Puthalath, Cisco Systems, Inc.
-import json
import logging
+from neutron.openstack.common import jsonutils
from neutron.plugins.cisco.cfg_agent.device_drivers import devicedriver_api
LOG = logging.getLogger(__name__)
# Datetime values causes json decoding errors. So removing it locally
if my_device_params.get('created_at'):
del my_device_params['created_at']
- LOG.debug(json.dumps(my_device_params, sort_keys=True, indent=4))
+ LOG.debug(jsonutils.dumps(my_device_params, sort_keys=True, indent=4))
###### Public Functions ########
def router_added(self, ri):
def internal_network_added(self, ri, port):
LOG.debug("DummyDriver internal_network_added() called.")
- LOG.debug("Int port data: " + json.dumps(port, sort_keys=True,
+ LOG.debug("Int port data: " + jsonutils.dumps(port, sort_keys=True,
indent=4))
def internal_network_removed(self, ri, port):
def external_gateway_added(self, ri, ex_gw_port):
LOG.debug("DummyDriver external_gateway_added() called.")
- LOG.debug("Ext port data: " + json.dumps(ex_gw_port, sort_keys=True,
- indent=4))
+ LOG.debug("Ext port data: " + jsonutils.dumps(ex_gw_port,
+ sort_keys=True,
+ indent=4))
def external_gateway_removed(self, ri, ex_gw_port):
LOG.debug("DummyDriver external_gateway_removed() called.")
import requests
import requests.exceptions
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.ml2.drivers.cisco.apic import exceptions as cexc
@staticmethod
def _make_data(key, **attrs):
"""Build the body for a msg out of a key and some attributes."""
- return json.dumps({key: {'attributes': attrs}})
+ return jsonutils.dumps({key: {'attributes': attrs}})
def _api_url(self, api):
"""Create the URL for a generic API."""
import requests
from neutron.openstack.common import excutils
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.nec.common import config
from neutron.plugins.nec.common import exceptions as nexc
{'host': self.host, 'port': self.port,
'method': method, 'action': action, 'body': body})
if type(body) is dict:
- body = json.dumps(body)
+ body = jsonutils.dumps(body)
try:
res = self._get_response(method, action, body)
data = res.text
# Try to decode JSON data if possible.
try:
- data = json.loads(data)
+ data = jsonutils.loads(data)
except (ValueError, TypeError):
pass
"""Intermidiate NVSD Library."""
from neutron.openstack.common import excutils
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
import neutron.plugins.oneconvergence.lib.exception as nvsdexception
from neutron.plugins.oneconvergence.lib import plugin_helper
uri = NETWORKS_URI % tenant_id
- response = self.send_request("POST", uri, body=json.dumps(network_obj),
+ response = self.send_request("POST", uri,
+ body=jsonutils.dumps(network_obj),
resource='network', tenant_id=tenant_id)
nvsd_net = response.json()
uri = NETWORK_URI % (tenant_id, network_id)
self.send_request("PUT", uri,
- body=json.dumps(network_update),
+ body=jsonutils.dumps(network_update),
resource='network', tenant_id=tenant_id,
resource_id=network_id)
uri = SUBNETS_URI % (tenant_id, network_id)
- self.send_request("POST", uri, body=json.dumps(subnet),
+ self.send_request("POST", uri, body=jsonutils.dumps(subnet),
resource='subnet', tenant_id=tenant_id)
LOG.debug(_("Subnet %(id)s created under tenant %(tenant_id)s"),
uri = SUBNET_URI % (tenant_id, network_id, subnet_id)
self.send_request("PUT", uri,
- body=json.dumps(subnet_update),
+ body=jsonutils.dumps(subnet_update),
resource='subnet', tenant_id=tenant_id,
resource_id=subnet_id)
path = PORTS_URI % (tenant_id, network_id)
- self.send_request("POST", path, body=json.dumps(lport),
+ self.send_request("POST", path, body=jsonutils.dumps(lport),
resource='port', tenant_id=tenant_id)
LOG.debug(_("Port %(id)s created under tenant %(tenant_id)s"),
uri = PORT_URI % (tenant_id, network_id, port_id)
- self.send_request("PUT", uri, body=json.dumps(lport),
+ self.send_request("PUT", uri, body=jsonutils.dumps(lport),
resource='port', tenant_id=tenant_id,
resource_id=port_id)
uri = FLOATING_IPS_URI % tenant_id
- self.send_request("POST", uri, body=json.dumps(floating_ip),
+ self.send_request("POST", uri, body=jsonutils.dumps(floating_ip),
resource='floating_ip',
tenant_id=tenant_id)
uri = FLOATING_IP_URI % (tenant_id, floating_ip_id)
self.send_request("PUT", uri,
- body=json.dumps(floating_ip_update['floatingip']),
+ body=jsonutils.dumps(
+ floating_ip_update['floatingip']),
resource='floating_ip',
tenant_id=tenant_id,
resource_id=floating_ip_id)
uri = ROUTERS_URI % tenant_id
- self.send_request("POST", uri, body=json.dumps(router),
+ self.send_request("POST", uri, body=jsonutils.dumps(router),
resource='router',
tenant_id=tenant_id)
uri = ROUTER_URI % (tenant_id, router_id)
self.send_request("PUT", uri,
- body=json.dumps(router),
+ body=jsonutils.dumps(router),
resource='router', tenant_id=tenant_id,
resource_id=router_id)
import requests
from six.moves.urllib import parse
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
import neutron.plugins.oneconvergence.lib.exception as exception
login_url = parse.urljoin(self.api_url,
"/pluginhandler/ocplugin/authmgmt/login")
- data = json.dumps({"user_name": self._user, "passwd": self._password})
+ data = jsonutils.dumps({"user_name": self._user,
+ "passwd": self._password})
attempts = 0
LOG.debug(_("Login Successful %(uri)s "
"%(status)s"), {'uri': self.api_url,
'status': response.status_code})
- self.auth_token = json.loads(response.content)["session_uuid"]
+ self.auth_token = jsonutils.loads(response.content)["session_uuid"]
LOG.debug(_("AuthToken = %s"), self.auth_token)
else:
LOG.error(_("login failed"))
import httplib
import urllib
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.vmware.api_client import request
try:
if self.successful():
ret = []
- body = json.loads(self.value.body)
+ body = jsonutils.loads(self.value.body)
for node in body.get('results', []):
for role in node.get('roles', []):
if role.get('role') == 'api_provider':
# under the License.
from neutron.common import exceptions as exception
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
try:
res = cluster.api_client.request(*args)
if res:
- return json.loads(res)
+ return jsonutils.loads(res)
except api_exc.ResourceNotFound:
raise exception.NotFound()
except api_exc.ReadOnlyMode:
:param kwargs: the key/value pirs to be dumped into a json string.
:returns: a json string.
"""
- return json.dumps(kwargs, ensure_ascii=False)
+ return jsonutils.dumps(kwargs, ensure_ascii=False)
# under the License.
#
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
}
return nsxlib.do_request(
HTTP_POST, nsxlib._build_uri_path(GWSERVICE_RESOURCE),
- json.dumps(gwservice_obj), cluster=cluster)
+ jsonutils.dumps(gwservice_obj), cluster=cluster)
def plug_l2_gw_service(cluster, lswitch_id, lport_id,
return nsxlib.do_request(HTTP_PUT,
nsxlib._build_uri_path(GWSERVICE_RESOURCE,
resource_id=gateway_id),
- json.dumps(gwservice_obj), cluster=cluster)
+ jsonutils.dumps(gwservice_obj), cluster=cluster)
def delete_l2_gw_service(cluster, gateway_id):
try:
return nsxlib.do_request(
HTTP_POST, nsxlib._build_uri_path(TRANSPORTNODE_RESOURCE),
- json.dumps(body), cluster=cluster)
+ jsonutils.dumps(body), cluster=cluster)
except api_exc.InvalidSecurityCertificate:
raise nsx_exc.InvalidSecurityCertificate()
HTTP_PUT,
nsxlib._build_uri_path(TRANSPORTNODE_RESOURCE,
resource_id=gateway_id),
- json.dumps(body), cluster=cluster)
+ jsonutils.dumps(body), cluster=cluster)
except api_exc.InvalidSecurityCertificate:
raise nsx_exc.InvalidSecurityCertificate()
# under the License.
from neutron.common import exceptions as exception
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
}
return nsxlib.do_request(HTTP_POST,
nsxlib._build_uri_path(LSERVICESNODE_RESOURCE),
- json.dumps(lsn_obj),
+ jsonutils.dumps(lsn_obj),
cluster=cluster)["uuid"]
parent_resource_id=lsn_id,
resource_id=lsn_port_id,
extra_action=conf),
- json.dumps(hosts_obj),
+ jsonutils.dumps(hosts_obj),
cluster=cluster)
return nsxlib.do_request(HTTP_POST,
nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
parent_resource_id=lsn_id),
- json.dumps(port_obj),
+ jsonutils.dumps(port_obj),
cluster=cluster)["uuid"]
parent_resource_id=lsn_id,
resource_id=lsn_port_id,
is_attachment=True),
- json.dumps(patch_obj),
+ jsonutils.dumps(patch_obj),
cluster=cluster)
except api_exc.Conflict:
# This restriction might be lifted at some point
nsxlib._build_uri_path(LSERVICESNODE_RESOURCE,
resource_id=lsn_id,
extra_action=action),
- json.dumps(lsn_obj),
+ jsonutils.dumps(lsn_obj),
cluster=cluster)
nsxlib._build_uri_path(LSERVICESNODE_RESOURCE,
resource_id=lsn_id,
extra_action=action),
- json.dumps({"enabled": is_enabled}),
+ jsonutils.dumps({"enabled": is_enabled}),
cluster=cluster)
nsxlib.do_request(HTTP_PUT,
nsxlib._build_uri_path(LSERVICESNODEPORT_RESOURCE,
parent_resource_id=lsn_id,
resource_id=lsn_port_id,
extra_action=action),
- json.dumps(obj),
+ jsonutils.dumps(obj),
cluster=cluster)
resource_id=lsn_port_id,
extra_action=extra_action,
filters={"action": action}),
- json.dumps(host_obj),
+ jsonutils.dumps(host_obj),
cluster=cluster)
from neutron.common import constants
from neutron.common import exceptions
from neutron.openstack.common import excutils
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log
from neutron.plugins.vmware.common import utils
from neutron.plugins.vmware import nsxlib
:param kwargs: the key/value pirs to be dumped into a json string.
:returns: a json string.
"""
- return json.dumps(kwargs, ensure_ascii=False)
+ return jsonutils.dumps(kwargs, ensure_ascii=False)
def query_security_profiles(cluster, fields=None, filters=None):
return nsxlib.do_request(
HTTP_PUT,
nsxlib._build_uri_path(SECPROF_RESOURCE, resource_id=spid),
- json.dumps({"display_name": utils.check_and_truncate(name)}),
+ jsonutils.dumps({"display_name": utils.check_and_truncate(name)}),
cluster=cluster)
from neutron.common import constants
from neutron.common import exceptions as exception
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
if "tags" in kwargs:
lswitch_obj["tags"].extend(kwargs["tags"])
uri = nsxlib._build_uri_path(LSWITCH_RESOURCE)
- lswitch = nsxlib.do_request(HTTP_POST, uri, json.dumps(lswitch_obj),
+ lswitch = nsxlib.do_request(HTTP_POST, uri, jsonutils.dumps(lswitch_obj),
cluster=cluster)
LOG.debug(_("Created logical switch: %s"), lswitch['uuid'])
return lswitch
if tags:
lswitch_obj['tags'] = tags
try:
- return nsxlib.do_request(HTTP_PUT, uri, json.dumps(lswitch_obj),
+ return nsxlib.do_request(HTTP_PUT, uri, jsonutils.dumps(lswitch_obj),
cluster=cluster)
except exception.NotFound as e:
LOG.error(_("Network not found, Error: %s"), str(e))
path = "/ws.v1/lswitch/" + lswitch_uuid + "/lport/" + lport_uuid
try:
- result = nsxlib.do_request(HTTP_PUT, path, json.dumps(lport_obj),
+ result = nsxlib.do_request(HTTP_PUT, path, jsonutils.dumps(lport_obj),
cluster=cluster)
LOG.debug(_("Updated logical port %(result)s "
"on logical switch %(uuid)s"),
path = nsxlib._build_uri_path(LSWITCHPORT_RESOURCE,
parent_resource_id=lswitch_uuid)
- result = nsxlib.do_request(HTTP_POST, path, json.dumps(lport_obj),
+ result = nsxlib.do_request(HTTP_POST, path, jsonutils.dumps(lport_obj),
cluster=cluster)
LOG.debug(_("Created logical port %(result)s on logical switch %(uuid)s"),
nsxlib._build_uri_path(LSWITCHPORT_RESOURCE,
lport_id, lswitch_id,
is_attachment=True),
- json.dumps(att_obj),
+ jsonutils.dumps(att_obj),
cluster=cluster)
import httplib2
from oslo.config import cfg
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.services.firewall.agents.varmour import varmour_utils as va_utils
def rest_api(self, method, url, body=None, headers=None):
url = REST_URL_PREFIX + url
if body:
- body_data = json.dumps(body)
+ body_data = jsonutils.dumps(body)
else:
body_data = ''
if not headers:
if resp.status == 200:
return {'status': resp.status,
'reason': resp.reason,
- 'body': json.loads(resp_str)}
+ 'body': jsonutils.loads(resp_str)}
except Exception:
LOG.error(_('vArmourRestAPI: Could not establish HTTP connection'))
from neutron.db.loadbalancer import loadbalancer_db as lb_db
from neutron.extensions import loadbalancer
from neutron.openstack.common import excutils
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers import abstract_driver
if binary:
body = data
else:
- body = json.dumps(data)
+ body = jsonutils.dumps(data)
debug_data = 'binary' if binary else body
debug_data = debug_data if debug_data else 'EMPTY'
respstr = response.read()
respdata = respstr
try:
- respdata = json.loads(respstr)
+ respdata = jsonutils.loads(respstr)
except ValueError:
# response was not JSON, ignore the exception
pass
# @author: Kevin Benton, <kevin.benton@bigswitch.com>
#
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch import servermanager
def request(self, action, uri, body, headers):
# Only handle network update requests
if 'network' in uri and 'tenant' in uri and 'ports' not in uri:
- req = json.loads(body)
+ req = jsonutils.loads(body)
if 'network' not in req or 'floatingips' not in req['network']:
msg = _("No floating IPs in request"
"uri=%(uri)s, body=%(body)s") % {'uri': uri,
from oslo.config import cfg
import requests
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.plugins.nec.common import config
from neutron.plugins.nec.common import exceptions as nexc
from neutron.plugins.nec.common import ofc_client
headers=headers)
def test_do_request_200_json_value(self):
- self._test_do_request(200, json.dumps([1, 2, 3]), [1, 2, 3])
+ self._test_do_request(200, jsonutils.dumps([1, 2, 3]), [1, 2, 3])
def test_do_request_200_string(self):
self._test_do_request(200, 'abcdef', 'abcdef')
def test_do_request_with_path_prefix(self):
config.CONF.set_override('path_prefix', '/dummy', group='OFC')
- self._test_do_request(200, json.dumps([1, 2, 3]), [1, 2, 3],
+ self._test_do_request(200, jsonutils.dumps([1, 2, 3]), [1, 2, 3],
path_prefix='/dummy')
def test_do_request_returns_404(self):
exc_checks)
def test_do_request_error_json_body(self):
- resbody = json.dumps({'err_code': 40022,
+ resbody = jsonutils.dumps({'err_code': 40022,
'err_msg': 'This is an error.'})
errmsg = _("An OFC exception has occurred: Operation on OFC failed")
exc_checks = {'status': 400, 'err_code': 40022,
import mock
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.plugins.oneconvergence.lib import nvsdlib
from neutron.tests import base
return_value=resp) as send_request:
uri = NETWORKS_URI % TEST_TENANT
net = self.nvsdlib.create_network(network_obj)
- send_request.assert_called_once_with("POST", uri,
- body=json.dumps(network_obj),
- resource='network',
- tenant_id=TEST_TENANT)
+ send_request.assert_called_once_with(
+ "POST", uri,
+ body=jsonutils.dumps(network_obj),
+ resource='network',
+ tenant_id=TEST_TENANT)
self.assertEqual(net, {'id': 'uuid'})
def test_update_network(self):
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.update_network(network, update_network)
send_request.assert_called_once_with(
- "PUT", uri, body=json.dumps(update_network),
+ "PUT", uri, body=jsonutils.dumps(update_network),
resource='network', tenant_id=TEST_TENANT,
resource_id=TEST_NET)
"admin_state_up": True,
"network_id": TEST_NET,
"status": 'ACTIVE'}
- send_request.assert_called_once_with("POST", path,
- body=json.dumps(expected),
- resource='port',
- tenant_id=TEST_TENANT)
+ send_request.assert_called_once_with(
+ "POST", path,
+ body=jsonutils.dumps(expected),
+ resource='port',
+ tenant_id=TEST_TENANT)
def test_update_port(self):
port = {'id': TEST_PORT,
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.update_port(TEST_TENANT, port, port_update)
- send_request.assert_called_once_with("PUT", uri,
- body=json.dumps(port_update),
- resource='port',
- resource_id='test-port',
- tenant_id=TEST_TENANT)
+ send_request.assert_called_once_with(
+ "PUT", uri,
+ body=jsonutils.dumps(port_update),
+ resource='port',
+ resource_id='test-port',
+ tenant_id=TEST_TENANT)
def test_delete_port(self):
port = {'network_id': TEST_NET,
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.create_subnet(subnet)
send_request.assert_called_once_with("POST", uri,
- body=json.dumps(subnet),
+ body=jsonutils.dumps(subnet),
resource='subnet',
tenant_id=TEST_TENANT)
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.update_subnet(subnet, subnet_update)
send_request.assert_called_once_with(
- "PUT", uri, body=json.dumps(subnet_update), resource='subnet',
+ "PUT", uri,
+ body=jsonutils.dumps(subnet_update), resource='subnet',
tenant_id=TEST_TENANT, resource_id=TEST_SUBNET)
def test_delete_subnet(self):
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.create_floatingip(floatingip)
- send_request.assert_called_once_with("POST", uri,
- body=json.dumps(floatingip),
- resource='floating_ip',
- tenant_id=TEST_TENANT)
+ send_request.assert_called_once_with(
+ "POST", uri,
+ body=jsonutils.dumps(floatingip),
+ resource='floating_ip',
+ tenant_id=TEST_TENANT)
def test_update_floatingip(self):
floatingip = {'id': TEST_FIP,
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.update_floatingip(floatingip, floatingip_update)
send_request.assert_called_once_with(
- "PUT", uri, body=json.dumps(floatingip_update['floatingip']),
+ "PUT", uri,
+ body=jsonutils.dumps(floatingip_update['floatingip']),
resource='floating_ip', tenant_id=TEST_TENANT,
resource_id=TEST_FIP)
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.create_router(router)
send_request.assert_called_once_with(
- "POST", uri, body=json.dumps(router), resource='router',
+ "POST", uri, body=jsonutils.dumps(router), resource='router',
tenant_id=TEST_TENANT)
def test_update_router(self):
with mock.patch.object(self.nvsdlib, 'send_request') as send_request:
self.nvsdlib.update_router(router)
send_request.assert_called_once_with(
- "PUT", uri, body=json.dumps(router),
+ "PUT", uri, body=jsonutils.dumps(router),
resource='router', tenant_id=TEST_TENANT,
resource_id=TEST_ROUTER)
import mock
import requests
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.plugins.oneconvergence.lib import config # noqa
from neutron.plugins.oneconvergence.lib import plugin_helper as client
from neutron.tests import base
def get_response(self, *args, **kwargs):
response = mock.Mock()
response.status_code = requests.codes.ok
- response.content = json.dumps({'session_uuid': 'new_auth_token'})
+ response.content = jsonutils.dumps({'session_uuid': 'new_auth_token'})
return response
def test_login(self):
login_url = ('http://127.0.0.1:8082/pluginhandler/ocplugin/'
'authmgmt/login')
headers = {'Content-Type': 'application/json'}
- data = json.dumps({"user_name": "ocplugin", "passwd": "oc123"})
+ data = jsonutils.dumps({"user_name": "ocplugin", "passwd": "oc123"})
timeout = 30.0
with mock.patch.object(self.nvsdcontroller, 'do_request',
from neutron import context
from neutron.extensions import loadbalancer
from neutron import manager
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.plugins.common import constants
from neutron.services.loadbalancer.drivers.radware import driver
from neutron.services.loadbalancer.drivers.radware import exceptions as r_exc
def _get_handler(resource):
if resource == GET_200[2]:
if rest_call_function_mock.TEMPLATES_MISSING:
- data = json.loads('[]')
+ data = jsonutils.loads('[]')
else:
- data = json.loads(
+ data = jsonutils.loads(
'[{"name":"openstack_l2_l3"},{"name":"openstack_l4"}]'
)
return 200, '', '', data
if resource in GET_200:
return 200, '', '', ''
else:
- data = json.loads('{"complete":"True", "success": "True"}')
+ data = jsonutils.loads('{"complete":"True", "success": "True"}')
return 202, '', '', data
def _post_handler(resource, binary):
if re.search(r'/api/workflow/.+/action/.+', resource):
- data = json.loads('{"uri":"some_uri"}')
+ data = jsonutils.loads('{"uri":"some_uri"}')
return 202, '', '', data
elif re.search(r'/api/service\?name=.+', resource):
- data = json.loads('{"links":{"actions":{"provision":"someuri"}}}')
+ data = jsonutils.loads('{"links":{"actions":{"provision":"someuri"}}}')
return 201, '', '', data
elif binary:
return 201, '', '', ''
self.assertEqual(
0, len(list(checks.validate_log_translations(ok,
ok, 'f'))))
+
+ def test_use_jsonutils(self):
+ def __get_msg(fun):
+ msg = ("N321: jsonutils.%(fun)s must be used instead of "
+ "json.%(fun)s" % {'fun': fun})
+ return [(0, msg)]
+
+ for method in ('dump', 'dumps', 'load', 'loads'):
+ self.assertEqual(
+ __get_msg(method),
+ list(checks.use_jsonutils("json.%s(" % method,
+ "./neutron/common/rpc.py")))
+
+ self.assertEqual(0,
+ len(list(checks.use_jsonutils("jsonx.%s(" % method,
+ "./neutron/common/rpc.py"))))
+
+ self.assertEqual(0,
+ len(list(checks.use_jsonutils("json.%sx(" % method,
+ "./neutron/common/rpc.py"))))
+
+ self.assertEqual(0,
+ len(list(checks.use_jsonutils(
+ "json.%s" % method,
+ "./neutron/plugins/openvswitch/agent/xenapi/etc/xapi.d/"
+ "plugins/netwrap"))))
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import policy as common_policy
from neutron import policy
from neutron.tests import base
def _test_set_rules_with_deprecated_policy(self, input_rules,
expected_rules):
- policy._set_rules(json.dumps(input_rules))
+ policy._set_rules(jsonutils.dumps(input_rules))
# verify deprecated policy has been removed
for pol in input_rules.keys():
self.assertNotIn(pol, common_policy._rules)
import six.moves.urllib.parse as urlparse
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
from neutron.openstack.common import uuidutils
from neutron.plugins.vmware.api_client import exception as api_exc
return (tag_filter, attr_filter, page_len, page_cursor)
def _add_lswitch(self, body):
- fake_lswitch = json.loads(body)
+ fake_lswitch = jsonutils.loads(body)
fake_lswitch['uuid'] = uuidutils.generate_uuid()
self._fake_lswitch_dict[fake_lswitch['uuid']] = fake_lswitch
# put the tenant_id and the zone_uuid in the main dict
return fake_lswitch
def _build_lrouter(self, body, uuid=None):
- fake_lrouter = json.loads(body)
+ fake_lrouter = jsonutils.loads(body)
if uuid:
fake_lrouter['uuid'] = uuid
fake_lrouter['tenant_id'] = self._get_tag(fake_lrouter, 'os_tid')
return fake_lrouter
def _add_lqueue(self, body):
- fake_lqueue = json.loads(body)
+ fake_lqueue = jsonutils.loads(body)
fake_lqueue['uuid'] = uuidutils.generate_uuid()
self._fake_lqueue_dict[fake_lqueue['uuid']] = fake_lqueue
return fake_lqueue
def _add_lswitch_lport(self, body, ls_uuid):
- fake_lport = json.loads(body)
+ fake_lport = jsonutils.loads(body)
new_uuid = uuidutils.generate_uuid()
fake_lport['uuid'] = new_uuid
# put the tenant_id and the ls_uuid in the main dict
return fake_lport
def _build_lrouter_lport(self, body, new_uuid=None, lr_uuid=None):
- fake_lport = json.loads(body)
+ fake_lport = jsonutils.loads(body)
if new_uuid:
fake_lport['uuid'] = new_uuid
# put the tenant_id and the le_uuid in the main dict
'q_port_id')
# replace ip_address with its json dump
if 'ip_addresses' in fake_lport:
- ip_addresses_json = json.dumps(fake_lport['ip_addresses'])
+ ip_addresses_json = jsonutils.dumps(fake_lport['ip_addresses'])
fake_lport['ip_addresses_json'] = ip_addresses_json
return fake_lport
return fake_lport
def _add_securityprofile(self, body):
- fake_securityprofile = json.loads(body)
+ fake_securityprofile = jsonutils.loads(body)
fake_securityprofile['uuid'] = uuidutils.generate_uuid()
fake_securityprofile['tenant_id'] = self._get_tag(
fake_securityprofile, 'os_tid')
return fake_securityprofile
def _add_lrouter_nat(self, body, lr_uuid):
- fake_nat = json.loads(body)
+ fake_nat = jsonutils.loads(body)
new_uuid = uuidutils.generate_uuid()
fake_nat['uuid'] = new_uuid
fake_nat['lr_uuid'] = lr_uuid
self._fake_lrouter_nat_dict[fake_nat['uuid']] = fake_nat
if 'match' in fake_nat:
- match_json = json.dumps(fake_nat['match'])
+ match_json = jsonutils.dumps(fake_nat['match'])
fake_nat['match_json'] = match_json
return fake_nat
def _add_gatewayservice(self, body):
- fake_gwservice = json.loads(body)
+ fake_gwservice = jsonutils.loads(body)
fake_gwservice['uuid'] = str(uuidutils.generate_uuid())
fake_gwservice['tenant_id'] = self._get_tag(
fake_gwservice, 'os_tid')
return False
def _build_item(resource):
- item = json.loads(response_template % resource)
+ item = jsonutils.loads(response_template % resource)
if relations:
for relation in relations:
self._build_relation(resource, item,
for item in res_dict.itervalues():
if 'tags' in item:
- item['tags_json'] = json.dumps(item['tags'])
+ item['tags_json'] = jsonutils.dumps(item['tags'])
if resource_type in (self.LSWITCH_LPORT_RESOURCE,
self.LSWITCH_LPORT_ATT,
self.LSWITCH_LPORT_STATUS):
response_dict['page_cursor'] = next_cursor
if do_result_count:
response_dict['result_count'] = total_items
- return json.dumps(response_dict)
+ return jsonutils.dumps(response_dict)
def _show(self, resource_type, response_file,
uuid1, uuid2=None, relations=None):
res_dict = getattr(self, '_fake_%s_dict' % resource_type)
for item in res_dict.itervalues():
if 'tags' in item:
- item['tags_json'] = json.dumps(item['tags'])
+ item['tags_json'] = jsonutils.dumps(item['tags'])
# replace sec prof rules with their json dump
def jsonify_rules(rule_key):
if rule_key in item:
- rules_json = json.dumps(item[rule_key])
+ rules_json = jsonutils.dumps(item[rule_key])
item['%s_json' % rule_key] = rules_json
jsonify_rules('logical_port_egress_rules')
jsonify_rules('logical_port_ingress_rules')
- items = [json.loads(response_template % res_dict[res_uuid])
+ items = [jsonutils.loads(response_template % res_dict[res_uuid])
for res_uuid in res_dict if res_uuid == target_uuid]
if items:
- return json.dumps(items[0])
+ return jsonutils.dumps(items[0])
raise api_exc.ResourceNotFound()
def handle_get(self, url):
with open("%s/%s" % (self.fake_files_path, response_file)) as f:
response_template = f.read()
add_resource = getattr(self, '_add_%s' % res_type)
- body_json = json.loads(body)
+ body_json = jsonutils.loads(body)
val_func = self._validators.get(res_type)
if val_func:
val_func(body_json)
is_attachment = True
res_type = res_type[:res_type.index('attachment')]
res_dict = getattr(self, '_fake_%s_dict' % res_type)
- body_json = json.loads(body)
+ body_json = jsonutils.loads(body)
val_func = self._validators.get(res_type)
if val_func:
val_func(body_json)
resource.update(body_json)
else:
relations = resource.get("_relations", {})
- body_2 = json.loads(body)
+ body_2 = jsonutils.loads(body)
resource['att_type'] = body_2['type']
relations['LogicalPortAttachment'] = body_2
resource['_relations'] = relations
self.LROUTER_RESOURCE)
res_dict_2 = getattr(self, '_fake_%s_dict' % res_type_2)
body_2['peer_port_uuid'] = uuids[-1]
- resource_2 = res_dict_2[json.loads(body)['peer_port_uuid']]
+ resource_2 = \
+ res_dict_2[jsonutils.loads(body)['peer_port_uuid']]
relations_2 = resource_2.get("_relations")
if not relations_2:
relations_2 = {}
lr_uuid = None
lp_uuid = uuids[1]
response = response_template % self._fill_attachment(
- json.loads(body), ls_uuid, lr_uuid, lp_uuid)
+ jsonutils.loads(body), ls_uuid, lr_uuid, lp_uuid)
return response
def handle_delete(self, url):
import mock
from neutron.common import exceptions
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.plugins.vmware.api_client import exception as api_exc
from neutron.plugins.vmware.common import exceptions as nsx_exc
from neutron.plugins.vmware.common import utils
lsnlib.lsn_for_network_create(self.cluster, net_id)
self.mock_request.assert_called_once_with(
"POST", "/ws.v1/lservices-node",
- json.dumps(obj), cluster=self.cluster)
+ jsonutils.dumps(obj), cluster=self.cluster)
def test_lsn_for_network_get(self):
net_id = "foo_network_id"
'/ws.v1/lservices-node/%s/lport/%s/%s' % (lsn_id,
lsn_port_id,
lsn_type),
- json.dumps({'hosts': hosts_data}),
+ jsonutils.dumps({'hosts': hosts_data}),
cluster=self.cluster)
def test_lsn_port_dhcp_entries_update(self):
}
self.mock_request.assert_called_once_with(
"POST", "/ws.v1/lservices-node/%s/lport" % lsn_id,
- json.dumps(port_obj), cluster=self.cluster)
+ jsonutils.dumps(port_obj), cluster=self.cluster)
def test_lsn_port_delete(self):
lsn_id = "foo_lsn_id"
"PUT",
("/ws.v1/lservices-node/%s/lport/%s/"
"attachment") % (lsn_id, lsn_port_id),
- json.dumps({"peer_port_uuid": lswitch_port_id,
- "type": "PatchAttachment"}),
+ jsonutils.dumps({"peer_port_uuid": lswitch_port_id,
+ "type": "PatchAttachment"}),
cluster=self.cluster)
def test_lsn_port_plug_network_raise_conflict(self):
]
self.mock_request.assert_has_calls([
mock.call("PUT", "/ws.v1/lservices-node/%s/dhcp" % lsn_id,
- json.dumps({"enabled": is_enabled}),
+ jsonutils.dumps({"enabled": is_enabled}),
cluster=self.cluster),
mock.call("PUT",
("/ws.v1/lservices-node/%s/"
"lport/%s/dhcp") % (lsn_id, lsn_port_id),
- json.dumps({"options": opt_array}),
+ jsonutils.dumps({"options": opt_array}),
cluster=self.cluster)
])
self.mock_request.assert_has_calls([
mock.call("PUT",
"/ws.v1/lservices-node/%s/metadata-proxy" % lsn_id,
- json.dumps(lsn_obj),
+ jsonutils.dumps(lsn_obj),
cluster=self.cluster),
])
"POST",
("/ws.v1/lservices-node/%s/lport/"
"%s/%s?action=%s") % (lsn_id, lsn_port_id, extra_action, action),
- json.dumps(host), cluster=self.cluster)
+ jsonutils.dumps(host), cluster=self.cluster)
def test_lsn_port_dhcp_host_add(self):
host = {
from neutron.common import exceptions as n_exc
from neutron import context
from neutron.extensions import l3
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import log
from neutron.plugins.vmware.api_client import client
from neutron.plugins.vmware.api_client import exception as api_exc
self.nsx_cache._lswitches)
self.nsx_cache._lswitches[lswitch['uuid']] = (
{'data': lswitch,
- 'hash': hash(json.dumps(lswitch))})
+ 'hash': hash(jsonutils.dumps(lswitch))})
for lswitchport in LSWITCHPORTS:
self.nsx_cache._uuid_dict_mappings[lswitchport['uuid']] = (
self.nsx_cache._lswitchports)
self.nsx_cache._lswitchports[lswitchport['uuid']] = (
{'data': lswitchport,
- 'hash': hash(json.dumps(lswitchport))})
+ 'hash': hash(jsonutils.dumps(lswitchport))})
for lrouter in LROUTERS:
self.nsx_cache._uuid_dict_mappings[lrouter['uuid']] = (
self.nsx_cache._lrouters)
self.nsx_cache._lrouters[lrouter['uuid']] = (
{'data': lrouter,
- 'hash': hash(json.dumps(lrouter))})
+ 'hash': hash(jsonutils.dumps(lrouter))})
super(CacheTestCase, self).setUp()
def test_get_lswitches(self):
ctx = context.get_admin_context()
# Generate 4 networks, 1 port per network, and 4 routers
with self._populate_data(ctx, net_size=4, port_size=1, router_size=4):
- fake_lswitches = json.loads(
+ fake_lswitches = jsonutils.loads(
self.fc.handle_get('/ws.v1/lswitch'))['results']
- fake_lrouters = json.loads(
+ fake_lrouters = jsonutils.loads(
self.fc.handle_get('/ws.v1/lrouter'))['results']
- fake_lswitchports = json.loads(
+ fake_lswitchports = jsonutils.loads(
self.fc.handle_get('/ws.v1/lswitch/*/lport'))['results']
return_values = [
# Chunk 0 - lswitches
import copy
-from neutron.openstack.common import jsonutils as json
+from neutron.openstack.common import jsonutils
from neutron.openstack.common import uuidutils
from neutron.plugins.vmware.vshield.common import exceptions
'moduleName': 'vShield Edge',
'errorData': None
}
- return (header, json.dumps(response))
+ return (header, jsonutils.dumps(response))
self._job_idx = self._job_idx + 1
job_id = "jobdata-%d" % self._job_idx
# The lswitch is created via VCNS API so the fake nsx_api will not
# see it. Added to fake nsx_api here.
if self._fake_nsx_api:
- lswitch = self._fake_nsx_api._add_lswitch(json.dumps(lsconfig))
+ lswitch = \
+ self._fake_nsx_api._add_lswitch(jsonutils.dumps(lsconfig))
else:
lswitch = lsconfig
lswitch['uuid'] = uuidutils.generate_uuid()