try:
return func(*args, **kwargs)
except Exception as e:
- if errors != None and type(e) in errors:
+ if errors is not None and type(e) in errors:
raise faults.QuantumHTTPError(e)
# otherwise just re-raise
raise
action = self.action_prefix + action
action = action.replace('{tenant_id}', self.tenant)
- if type(params) is dict:
+ if isinstance(params, dict):
action += '?' + urllib.urlencode(params)
if body:
body = self.serialize(body)
headers[AUTH_TOKEN_HEADER] = self.auth_token
# Open connection and send request, handling SSL certs
certs = {'key_file': self.key_file, 'cert_file': self.cert_file}
- certs = dict((x, certs[x]) for x in certs if certs[x] != None)
+ certs = dict((x, certs[x]) for x in certs if certs[x] is not None)
if self.use_ssl and len(certs):
conn = connection_type(self.host, self.port, **certs)
"""
if data is None:
return None
- elif type(data) is dict:
+ elif isinstance(data, dict):
return Serializer().serialize(data, self.content_type())
else:
raise Exception("unable to serialize object of type = '%s'" \
if self.IsDirty(name):
self.ParseNewFlags()
val = gflags.FlagValues.__getattr__(self, name)
- if type(val) is str:
+ if isinstance(val, str):
tmpl = string.Template(val)
context = [self, self.__dict__['__extra_context']]
return tmpl.substitute(StrWrapper(context))
xmlns = metadata.get('xmlns', None)
if xmlns:
result.setAttribute('xmlns', xmlns)
- if type(data) is list:
+ if isinstance(data, list):
collections = metadata.get('list_collections', {})
if nodename in collections:
metadata = collections[nodename]
for item in data:
node = self._to_xml_node(doc, metadata, singular, item)
result.appendChild(node)
- elif type(data) is dict:
+ elif isinstance(data, dict):
collections = metadata.get('dict_collections', {})
if nodename in collections:
metadata = collections[nodename]
import string
import struct
import time
-import types
from quantum.common import flags
from quantum.common import exceptions as exception
def to_primitive(value):
- if type(value) is type([]) or type(value) is type((None,)):
+ if isinstance(value, (list, tuple)):
o = []
for v in value:
o.append(to_primitive(v))
return o
- elif type(value) is type({}):
+ elif isinstance(value, dict):
o = {}
for k, v in value.iteritems():
o[k] = to_primitive(v)
Useful for JSON-decoded stuff and config file parsing
"""
- if type(subject) == type(bool):
+ if isinstance(subject, bool):
return subject
if hasattr(subject, 'startswith'): # str or unicode...
if subject.strip().lower() in ('true', 'on', '1'):
obj = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
result = None
- if process_input != None:
+ if process_input is not None:
result = obj.communicate(process_input)
else:
result = obj.communicate()
raise exception.Error('Invalid backend: %s' % backend_name)
backend = self.__backends[backend_name]
- if type(backend) == type(tuple()):
+ if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
def _build_detail(self, portprofile_data):
"""Return a detailed info of a portprofile."""
- if (portprofile_data['assignment'] == None):
+ if (portprofile_data['assignment'] is None):
return dict(portprofile=dict(id=portprofile_data['profile_id'],
name=portprofile_data['profile_name'],
qos_name=portprofile_data['qos_name']))
network = db.network_get(net_id)
port = db.port_get(net_id, port_id)
attachment_id = port[const.INTERFACEID]
- if attachment_id == None:
+ if attachment_id is None:
raise cexc.InvalidAttach(port_id=port_id, net_id=net_id,
att_id=remote_interface_id)
attachment_id = attachment_id[:const.UUID_LENGTH]
network = db.network_get(net_id)
port = db.port_get(net_id, port_id)
attachment_id = port[const.INTERFACEID]
- if attachment_id == None:
+ if attachment_id is None:
raise exc.InvalidDetach(port_id=port_id, net_id=net_id,
att_id=remote_interface_id)
self._invoke_device_plugins(self._func_name(), [tenant_id, net_id,
def _invoke_plugin(self, plugin_key, function_name, args, kwargs):
"""Invoke only the device plugin"""
# If the last param is a dict, add it to kwargs
- if args and type(args[-1]) is dict:
+ if args and isinstance(args[-1], dict):
kwargs.update(args.pop())
return getattr(self._plugins[plugin_key], function_name)(*args,
service_args.append(image_name)
counter = 0
flag = False
- while flag == False and counter <= 5:
+ while not flag and counter <= 5:
counter = counter + 1
time.sleep(2.5)
process = subprocess.Popen(service_args, \
service_args.append(image_name)
counter = 0
flag = False
- while flag == False and counter <= 10:
+ while not flag and counter <= 10:
counter = counter + 1
time.sleep(2.5)
process = subprocess.Popen(service_args, \
self.assertTrue(len(vlanids) > 0)
vlanid = l2network_db.reserve_vlanid()
used = l2network_db.is_vlanid_used(vlanid)
- self.assertTrue(used == True)
+ self.assertTrue(used)
used = l2network_db.release_vlanid(vlanid)
- self.assertTrue(used == False)
+ self.assertFalse(used)
#counting on default teardown here to clear db
def teardown_network(self):
self.assertTrue(port[0]["int-id"] == "vif1.1")
self.dbtest.unplug_interface(net1["net-id"], port1["port-id"])
port = self.dbtest.get_port(net1["net-id"], port1["port-id"])
- self.assertTrue(port[0]["int-id"] == None)
+ self.assertTrue(port[0]["int-id"] is None)
self.teardown_network_port()
def testh_joined_test(self):
self.assertEqual(port_dict[const.PORTID], new_port[const.UUID])
profile_name = self._cisco_ucs_plugin.\
_get_profile_name(port_dict[const.PORTID])
- self.assertTrue(profile_name != None)
+ self.assertTrue(profile_name is not None)
self.tear_down_network_port(
self.tenant_id, new_net_dict[const.NET_ID],
port_dict[const.PORTID])
for blade_intf in blade_intf_data.keys():
tmp = deepcopy(blade_intf_data[blade_intf])
intf_data = blade_intf_data[blade_intf]
- if intf_data[const.BLADE_INTF_RESERVATION] == \
- const.BLADE_INTF_RESERVED and \
- intf_data[const.TENANTID] == tenant_id and \
- intf_data[const.INSTANCE_ID] == None:
+ if (intf_data[const.BLADE_INTF_RESERVATION] ==
+ const.BLADE_INTF_RESERVED and
+ intf_data[const.TENANTID] == tenant_id and
+ intf_data[const.INSTANCE_ID] is None):
intf_data[const.INSTANCE_ID] = instance_id
host_name = self._get_host_name(ucsm_ip,
chassis_id,
def acquire(self, network_id):
for x in xrange(2, 4094):
- if self.vlans[x] == None:
+ if self.vlans[x] is None:
self.vlans[x] = network_id
# LOG.debug("VlanMap::acquire %s -> %s" % (x, network_id))
return x
def __init__(self, configfile=None):
config = ConfigParser.ConfigParser()
- if configfile == None:
+ if configfile is None:
if os.path.exists(CONF_FILE):
configfile = CONF_FILE
else:
configfile = find_config(os.path.abspath(
os.path.dirname(__file__)))
- if configfile == None:
+ if configfile is None:
raise Exception("Configuration file \"%s\" doesn't exist" %
(configfile))
LOG.debug("Using configuration file: %s" % configfile)
def testReleaseVlan(self):
vlan_id = self.vmap.acquire("foobar")
self.vmap.release("foobar")
- self.assertTrue(self.vmap.get(vlan_id) == None)
+ self.assertTrue(self.vmap.get(vlan_id) is None)
self.assertTrue(port[0]["attachment"] == "vif1.1")
self.dbtest.unplug_interface(net1["id"], port1["id"])
port = self.dbtest.get_port(net1["id"], port1["id"])
- self.assertTrue(port[0]["attachment"] == None)
+ self.assertTrue(port[0]["attachment"] is None)
result.setAttribute('xmlns', xmlns)
#TODO(bcwaldon): accomplish this without a type-check
- if type(data) is list:
+ if isinstance(data, list):
collections = metadata.get('list_collections', {})
if nodename in collections:
metadata = collections[nodename]
node = self._to_xml_node(doc, metadata, singular, item)
result.appendChild(node)
#TODO(bcwaldon): accomplish this without a type-check
- elif type(data) is dict:
+ elif isinstance(data, dict):
collections = metadata.get('dict_collections', {})
if nodename in collections:
metadata = collections[nodename]
LOG.info(_("HTTP exception thrown: %s"), unicode(ex))
action_result = Fault(ex, self._xmlns)
- if type(action_result) is dict or action_result is None:
+ if isinstance(action_result, dict) or action_result is None:
response = self.serializer.serialize(action_result,
accept,
action=action)
arg_dict['request'] = req
result = method(**arg_dict)
- if type(result) is dict:
+ if isinstance(result, dict):
content_type = req.best_match_content_type()
default_xmlns = self.get_default_xmlns(req)
body = self._serialize(result, content_type, default_xmlns)
xmlns = metadata.get('xmlns', None)
if xmlns:
result.setAttribute('xmlns', xmlns)
- if type(data) is list:
+ if isinstance(data, list):
collections = metadata.get('list_collections', {})
if nodename in collections:
metadata = collections[nodename]
for item in data:
node = self._to_xml_node(doc, metadata, singular, item)
result.appendChild(node)
- elif type(data) is dict:
+ elif isinstance(data, dict):
collections = metadata.get('dict_collections', {})
if nodename in collections:
metadata = collections[nodename]