getattr(driver, action)()
return True
- except Exception, e:
+ except Exception:
self.needs_resync = True
LOG.exception(_('Unable to %s dhcp.'), action)
LOG.info(_("DHCP agent started"))
-def main():
- eventlet.monkey_patch()
+def register_options():
cfg.CONF.register_opts(DhcpAgent.OPTS)
config.register_agent_state_opts_helper(cfg.CONF)
config.register_root_helper(cfg.CONF)
cfg.CONF.register_opts(DhcpLeaseRelay.OPTS)
cfg.CONF.register_opts(dhcp.OPTS)
cfg.CONF.register_opts(interface.OPTS)
+
+
+def main():
+ eventlet.monkey_patch()
+ register_options()
cfg.CONF(project='quantum')
config.setup_logging(cfg.CONF)
server = quantum_service.Service.create(
def __init__(self, pidfile, procname, root_helper='sudo'):
try:
self.fd = os.open(pidfile, os.O_CREAT | os.O_RDWR)
- except IOError, e:
+ except IOError:
LOG.exception(_("Failed to open pidfile: %s"), pidfile)
sys.exit(1)
self.pidfile = pidfile
cmd = ['cat', '/proc/%s/cmdline' % pid]
try:
return self.procname in utils.execute(cmd, self.root_helper)
- except RuntimeError, e:
+ except RuntimeError:
return False
pid = os.fork()
if pid > 0:
sys.exit(0)
- except OSError, e:
+ except OSError:
LOG.exception(_('Fork failed'))
sys.exit(1)
with open(file_name, 'r') as f:
try:
return converter and converter(f.read()) or f.read()
- except ValueError, e:
+ except ValueError:
msg = _('Unable to convert value in %s')
- except IOError, e:
+ except IOError:
msg = _('Unable to access %s')
LOG.debug(msg % file_name)
cmd = ['cat', '/proc/%s/cmdline' % pid]
try:
return self.network.id in utils.execute(cmd, self.root_helper)
- except RuntimeError, e:
+ except RuntimeError:
return False
@property
try:
with open(file_name, 'r') as f:
return int(f.read())
- except IOError, e:
+ except IOError:
msg = _('Unable to access %s')
- except ValueError, e:
+ except ValueError:
msg = _('Unable to convert value in %s')
LOG.debug(msg, file_name)
cmd = ['cat', '/proc/%s/cmdline' % pid]
try:
return self.uuid in utils.execute(cmd, self.root_helper)
- except RuntimeError, e:
+ except RuntimeError:
return False
import inspect
import os
-from oslo.config import cfg
-
from quantum.agent.linux import utils
from quantum.openstack.common import lockutils
from quantum.openstack.common import log as logging
else:
return webob.exc.HTTPNotFound()
- except Exception, e:
+ except Exception:
LOG.exception(_("Unexpected error."))
msg = _('An unknown error has occurred. '
'Please try your request again.')
return self._proxy_request(req.remote_addr,
req.path_info,
req.query_string)
- except Exception, e:
+ except Exception:
LOG.exception(_("Unexpected error."))
msg = _('An unknown error has occurred. '
'Please try your request again.')
unplug_device(conf, device)
ip.garbage_collect_namespace()
- except Exception, e:
+ except Exception:
LOG.exception(_('Error unable to destroy namespace: %s'), namespace)
# License for the specific language governing permissions and limitations
# under the License.
-import eventlet
-
from quantum.common import topics
from quantum.openstack.common import log as logging
-from quantum.openstack.common.notifier import api
-from quantum.openstack.common.notifier import rpc_notifier
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
from quantum.openstack.common import timeutils
-from quantum.openstack.common import uuidutils
LOG = logging.getLogger(__name__)
import webob.exc
from quantum.api.v2 import attributes
-from quantum.common import constants
from quantum.common import exceptions
import quantum.extensions
from quantum.manager import QuantumManager
from paste import deploy
from quantum.api.v2 import attributes
-from quantum.common import constants
from quantum.common import utils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
"""
product_name = "quantum"
logging.setup(product_name)
- log_root = logging.getLogger(product_name).logger
LOG.info(_("Logging enabled!"))
"""
from quantum.openstack.common.exception import Error
-from quantum.openstack.common.exception import InvalidContentType
+from quantum.openstack.common.exception import InvalidContentType # noqa
from quantum.openstack.common.exception import OpenstackException
# under the License.
#
-from oslo.config import cfg
import sqlalchemy as sa
from sqlalchemy import exc as sa_exc
from sqlalchemy import orm
from sqlalchemy.orm import exc
-from sqlalchemy.sql import expression as expr
-import webob.exc as w_exc
from quantum.api.v2 import attributes
from quantum.common import exceptions as q_exc
-from quantum.db import db_base_plugin_v2
from quantum.db import model_base
from quantum.db import models_v2
from quantum.extensions import loadbalancer
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
from quantum.plugins.common import constants
-from quantum import policy
LOG = logging.getLogger(__name__)
tenant_id = self._get_tenant_id_for_create(context, v)
with context.session.begin(subtransactions=True):
- pool = None
try:
qry = context.session.query(Pool)
- pool = qry.filter_by(id=v['pool_id']).one()
+ qry.filter_by(id=v['pool_id']).one()
except exc.NoResultFound:
raise loadbalancer.PoolNotFound(pool_id=v['pool_id'])
from alembic import op
import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
from quantum.db import migration
from alembic import op
import sqlalchemy as sa
-from sqlalchemy.dialects import mysql
from quantum.db import migration
migration_for_plugins = ['*']
-from alembic import op
-import sqlalchemy as sa
-
-from quantum.db import migration
-
def upgrade(active_plugin=None, options=None):
"""A no-op migration for marking the Grizzly release."""
# @author: Mark McClain, DreamHost
import os
-import sys
from alembic import command as alembic_command
from alembic import config as alembic_config
from alembic import util as alembic_util
from oslo.config import cfg
-from quantum import manager
_core_opts = [
cfg.StrOpt('core_plugin',
import sqlalchemy as sa
from sqlalchemy import event
-from sqlalchemy.orm import exc
from quantum.common import exceptions as qexception
from quantum.db import model_base
# @author: Kaiwei Fan, VMware, Inc
import sqlalchemy as sa
-from sqlalchemy.orm import exc
from quantum.db import model_base
from quantum.extensions import routerservicetype as rst
# update network on network controller
try:
self._send_update_network(orig_net)
- except RemoteRestError as e:
+ except RemoteRestError:
# rollback creation of subnet
super(QuantumRestProxyV2, self).delete_subnet(context,
subnet['id'])
# update network on network controller
try:
self._send_update_network(orig_net)
- except RemoteRestError as e:
+ except RemoteRestError:
# rollback updation of subnet
super(QuantumRestProxyV2, self).update_subnet(context, id,
orig_subnet)
# update network on network controller
try:
self._send_update_network(orig_net)
- except RemoteRestError as e:
+ except RemoteRestError:
# TODO (Sumit): rollback deletion of subnet
raise
# update network on network controller
try:
self._send_update_network(orig_net)
- except RemoteRestError as e:
+ except RemoteRestError:
# rollback updation of subnet
super(QuantumRestProxyV2, self).update_floatingip(context, id,
orig_fl_ip)
# update network on network controller
try:
self._send_update_network(orig_net)
- except RemoteRestError as e:
+ except RemoteRestError:
# TODO(Sumit): rollback deletion of floating IP
raise
assert id_ <= len(self.matches), 'remove_id: id > len()'
self.matches.pop(id_)
- def remove_match(self, prior, method_regexp, uri_regexp):
- for i in self.matches:
- if (i[0], i[1], i[2]) == (method_regexp, uri_regexp, idstr):
- self.remove_id(i)
- break
-
def request_handler(self, method, uri, body):
retstatus = self.default_status
retbody = self.default_response
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import dhcp_rpc_base
-from quantum.db import l3_db
from quantum.db import l3_rpc_base
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
import logging as LOG
-from oslo.config import cfg
-
from quantum.plugins.cisco.common import config
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.common import cisco_exceptions as cexc
@staticmethod
def put_credential(cred_name, username, password):
"""Set the username and password"""
- credential = cdb.add_credential(TENANT, cred_name, username, password)
+ cdb.add_credential(TENANT, cred_name, username, password)
@staticmethod
def get_username(cred_name):
@staticmethod
def get_credential(cred_name):
"""Get the username and password"""
- credential = cdb.get_credential_name(TENANT, cred_name)
+ cdb.get_credential_name(TENANT, cred_name)
return {const.USERNAME: const.CREDENTIAL_USERNAME,
const.PASSWORD: const.CREDENTIAL_PASSWORD}
import logging
from quantum.plugins.cisco.common import cisco_constants as const
-from quantum.plugins.cisco.db import l2network_db as cdb
LOG = logging.getLogger(__name__)
options(joinedload(models.Network.ports)).
filter_by(name=net_name).
all())
- except exc.NoResultFound, e:
+ except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_name=net_name)
options(joinedload(models.Network.ports)). \
filter_by(uuid=net_id).\
one()
- except exc.NoResultFound, e:
+ except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=net_id)
filter_by(uuid=net_id).
filter_by(tenant_id=tenant_id).
one())
- except exc.NoResultFound, e:
+ except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=net_id)
# under the License.
# @author: Rohit Agarwalla, Cisco Systems, Inc.
-from sqlalchemy import Column, Integer, String, ForeignKey, Boolean
-from sqlalchemy.orm import relation, object_mapper
+from sqlalchemy import Column, Integer, String, Boolean
+from sqlalchemy.orm import object_mapper
from quantum.openstack.common import uuidutils
-from quantum.plugins.cisco.db import models
from quantum.plugins.cisco.db.models import BASE
from quantum.plugins.cisco.common import cisco_exceptions as c_exc
from quantum.plugins.cisco.common import config
from quantum.plugins.cisco.db import network_models_v2
-from quantum.plugins.cisco.db import nexus_models_v2
+from quantum.plugins.cisco.db import nexus_models_v2 # noqa
from quantum.plugins.openvswitch import ovs_models_v2
#
# @author: Rohit Agarwalla, Cisco Systems, Inc.
-from sqlalchemy import Column, Integer, String, ForeignKey, Boolean
-from sqlalchemy.orm import relation, object_mapper
+from sqlalchemy import Column, Integer, String, Boolean
+from sqlalchemy.orm import object_mapper
from quantum.db import model_base
-from quantum.db import models_v2 as models
+from quantum.db import models_v2 as models # noqa
from quantum.openstack.common import uuidutils
# @author: Rohit Agarwalla, Cisco Systems, Inc.
#
-from copy import deepcopy
import inspect
import logging
from novaclient.v1_1 import client as nova_client
from oslo.config import cfg
-from quantum.db import l3_db
from quantum.manager import QuantumManager
from quantum.openstack.common import importutils
from quantum.plugins.cisco.common import cisco_constants as const
args = [ovs_output[0]['tenant_id'], id, {'vlan_id': vlan_id},
{'net_admin_state': ovs_output[0]['admin_state_up']},
{'vlan_ids': vlanids}]
- nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,
- self._func_name(),
- args)
+ self._invoke_plugin_per_device(const.NEXUS_PLUGIN,
+ self._func_name(),
+ args)
return ovs_output[0]
def delete_network(self, context, id):
instance_id = port['port']['device_id']
device_owner = port['port']['device_owner']
- if conf.CISCO_TEST.host is not None:
- host = conf.CISCO_TEST.host
- elif device_owner == 'network:dhcp':
- return ovs_output[0]
- elif instance_id:
+ create_net = (conf.CISCO_TEST.host is None and
+ device_owner != 'network:dhcp' and
+ instance_id)
+ if create_net:
net_id = port['port']['network_id']
tenant_id = port['port']['tenant_id']
self._invoke_nexus_for_net_create(
ovs_output = self._invoke_plugin_per_device(const.VSWITCH_PLUGIN,
self._func_name(),
args)
- nexus_output = self._invoke_plugin_per_device(const.NEXUS_PLUGIN,
- self._func_name(),
- n_args)
+ self._invoke_plugin_per_device(const.NEXUS_PLUGIN,
+ self._func_name(),
+ n_args)
return ovs_output[0]
except:
# TODO (asomya): Check if we need to perform any rollback here
from quantum.openstack.common import importutils
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.common import cisco_exceptions as cexc
-from quantum.plugins.cisco.common import cisco_utils as cutil
-from quantum.plugins.cisco.common import config
+from quantum.plugins.cisco.common import config # noqa
from quantum.plugins.cisco.db import network_db_v2 as cdb
LOG = logging.getLogger(__name__)
"""Delete a QoS level"""
LOG.debug(_("delete_qos() called"))
try:
- qos_level = cdb.get_qos(tenant_id, qos_id)
+ cdb.get_qos(tenant_id, qos_id)
except Exception:
raise cexc.QosNotFound(tenant_id=tenant_id,
qos_id=qos_id)
"""Rename QoS level"""
LOG.debug(_("rename_qos() called"))
try:
- qos_level = cdb.get_qos(tenant_id, qos_id)
+ cdb.get_qos(tenant_id, qos_id)
except Exception:
raise cexc.QosNotFound(tenant_id=tenant_id,
qos_id=qos_id)
Implements a Nexus-OS NETCONF over SSHv2 API Client
"""
-import eventlet
import logging
from ncclient import manager
from quantum.openstack.common import importutils
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.common import cisco_credentials_v2 as cred
-from quantum.plugins.cisco.common import cisco_exceptions as excep
from quantum.plugins.cisco.common import config as conf
from quantum.plugins.cisco.db import network_db_v2 as cdb
from quantum.plugins.cisco.db import nexus_db_v2 as nxos_db
import webob
from webtest import TestApp
-from quantum import api as server
from quantum.api import extensions
from quantum.api.extensions import (
ExtensionMiddleware,
that tests the database api method calls
"""
-import logging as LOG
-
from quantum.openstack.common import log as logging
-from quantum.plugins.cisco.common import cisco_constants as const
import quantum.plugins.cisco.db.api as db
import quantum.plugins.cisco.db.l2network_db as l2network_db
import quantum.plugins.cisco.db.nexus_db_v2 as nexus_db
def testb_getall_nexusportbindings(self):
"""get all nexus port binding"""
- binding1 = self.dbtest.create_nexusportbinding("port1", 10)
- binding2 = self.dbtest.create_nexusportbinding("port2", 10)
+ self.dbtest.create_nexusportbinding("port1", 10)
+ self.dbtest.create_nexusportbinding("port2", 10)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
for bind in bindings:
def testc_delete_nexusportbinding(self):
"""delete nexus port binding"""
- binding1 = self.dbtest.create_nexusportbinding("port1", 10)
+ self.dbtest.create_nexusportbinding("port1", 10)
self.dbtest.delete_nexusportbinding(10)
bindings = self.dbtest.get_all_nexusportbindings()
count = 0
from quantum.common import topics
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
-# NOTE: quota_db cannot be removed, it is for db model
-from quantum.db import quota_db
+from quantum.db import quota_db # noqa
from quantum.extensions import portbindings
from quantum.extensions import providernet as provider
from quantum.openstack.common import log as logging
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import dispatcher
-# NOTE (e0ne): this import is needed for config init
-from quantum.plugins.linuxbridge.common import config
+from quantum.plugins.linuxbridge.common import config # noqa
from quantum.plugins.linuxbridge.common import constants as lconst
from quantum.db import securitygroups_db as sg_db
from quantum import manager
from quantum.openstack.common import log as logging
-# NOTE (e0ne): this import is needed for config init
-from quantum.plugins.linuxbridge.common import config
+from quantum.plugins.linuxbridge.common import config # noqa
from quantum.plugins.linuxbridge.common import constants
from quantum.plugins.linuxbridge.db import l2network_models_v2
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
-from quantum.common import utils
from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.db import api as db_api
from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db
from quantum.db import l3_rpc_base
-# NOTE: quota_db cannot be removed, it is for db model
-from quantum.db import quota_db
+from quantum.db import quota_db # noqa
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
from quantum.extensions import providernet as provider
-from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.extensions.flavor import (FLAVOR_NETWORK, FLAVOR_ROUTER)
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
-# NOTE (e0ne): this import is needed for config init
-from quantum.plugins.metaplugin.common import config
+from quantum.plugins.metaplugin.common import config # noqa
from quantum.plugins.metaplugin import meta_db_v2
from quantum.plugins.metaplugin.meta_models_v2 import (NetworkFlavor,
RouterFlavor)
protocol = rule['protocol']
port_range_max = rule['port_range_max']
rule_id = rule['id']
- ethertype = rule['ethertype']
security_group_id = rule['security_group_id']
remote_group_id = rule['remote_group_id']
remote_ip_prefix = rule['remote_ip_prefix'] # watch out. not validated
tenant_id = rule['tenant_id']
port_range_min = rule['port_range_min']
- external_id = rule['external_id']
# construct a corresponding rule
tp_src_start = tp_src_end = None
tp_dst_start, tp_dst_end = port_range_min, port_range_max
# protocol
- if rule['protocol'] == 'tcp':
+ if protocol == 'tcp':
nw_proto = 6
- elif rule['protocol'] == 'udp':
+ elif protocol == 'udp':
nw_proto = 17
- elif rule['protocol'] == 'icmp':
+ elif protocol == 'icmp':
nw_proto = 1
# extract type and code from reporposed fields
icmp_type = rule['from_port']
from webob import exc as w_exc
from quantum.common import exceptions as q_exc
-from quantum.common.utils import find_config_file
from quantum.db import api as db
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
from quantum.db import securitygroups_db
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
-from quantum.plugins.midonet import config
+from quantum.plugins.midonet import config # noqa
from quantum.plugins.midonet import midonet_lib
datapath_id=datapath_id,
port_added=port_added,
port_removed=port_removed))
- except Exception as e:
+ except Exception:
LOG.warn(_("update_ports() failed."))
return
from oslo.config import cfg
from quantum.agent.common import config
-# import rpc config options
-from quantum.openstack.common import rpc
+from quantum.openstack.common import rpc # noqa
from quantum import scheduler
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
from quantum.openstack.common import log as logging
-# NOTE (e0ne): this import is needed for config init
-from quantum.plugins.nec.common import config
+from quantum.plugins.nec.common import config # noqa
from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import models as nmodels
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
- sg_id for port, sg_id in port_and_sgs if sg_id]
+ sg_id for port_, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
def create_tenant(self, description, tenant_id=None):
ofc_tenant_id = self._generate_pfc_id(tenant_id)
body = {'id': ofc_tenant_id}
- res = self.client.post('/tenants', body=body)
+ self.client.post('/tenants', body=body)
return '/tenants/' + ofc_tenant_id
def delete_tenant(self, ofc_tenant_id):
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from quantum.api.rpc.agentnotifiers import l3_rpc_agent_api
-from quantum.common import constants as q_const
-from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
-from quantum import context
from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db
from quantum.db import l3_rpc_base
-#NOTE(amotoki): quota_db cannot be removed, it is for db model
-from quantum.db import quota_db
+from quantum.db import quota_db # noqa
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
-from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
"""
LOG.debug(_("NECPluginV2RPCCallbacks.update_ports() called, "
"kwargs=%s ."), kwargs)
- topic = kwargs['topic']
datapath_id = kwargs['datapath_id']
session = rpc_context.session
for p in kwargs.get('port_added', []):
ofc_pf_id = self._get_ofc_id(context, "ofc_packet_filter", filter_id)
ofc_pf_id = self.driver.convert_ofc_filter_id(context, ofc_pf_id)
- res = self.driver.delete_filter(ofc_pf_id)
+ self.driver.delete_filter(ofc_pf_id)
self._del_ofc_item(context, "ofc_packet_filter", filter_id)
from quantum.db import l3_db
from quantum.db import models_v2
from quantum.db import portsecurity_db
-# NOTE: quota_db cannot be removed, it is for db model
-from quantum.db import quota_db
+from quantum.db import quota_db # noqa
from quantum.db import securitygroups_db
from quantum.extensions import l3
from quantum.extensions import portsecurity as psec
from quantum.openstack.common import log as logging
from quantum.openstack.common import uuidutils
from quantum.plugins.nicira.nicira_nvp_plugin.extensions import nvp_networkgw
-from quantum import policy
LOG = logging.getLogger(__name__)
from quantum.openstack.common import log as logging
from quantum.openstack.common import loopingcall
from quantum.openstack.common.rpc import dispatcher
-from quantum.plugins.openvswitch.common import config
+from quantum.plugins.openvswitch.common import config # noqa
from quantum.plugins.openvswitch.common import constants
from sqlalchemy.orm import exc
-from oslo.config import cfg
-
from quantum.common import exceptions as q_exc
import quantum.db.api as db
from quantum.db import models_v2
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
- sg_id for port, sg_id in port_and_sgs if sg_id]
+ sg_id for port_, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address']
from quantum.db import dhcp_rpc_base
from quantum.db import extraroute_db
from quantum.db import l3_rpc_base
-# NOTE: quota_db cannot be removed, it is for db model
-from quantum.db import quota_db
+from quantum.db import quota_db # noqa
from quantum.db import securitygroups_rpc_base as sg_db_rpc
from quantum.extensions import portbindings
from quantum.extensions import providernet as provider
-from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
-from quantum.plugins.openvswitch.common import config
+from quantum.plugins.openvswitch.common import config # noqa
from quantum.plugins.openvswitch.common import constants
from quantum.plugins.openvswitch import ovs_db_v2
from quantum import policy
to the Network Operating System by PLUMgrid called NOS
"""
-import sys
-
from oslo.config import cfg
from quantum.db import api as db
self.snippets = plumgrid_nos_snippets.DataNOSPLUMgrid()
# TODO: (Edgar) These are placeholders for next PLUMgrid release
- nos_username = cfg.CONF.PLUMgridNOS.username
- nos_password = cfg.CONF.PLUMgridNOS.password
+ cfg.CONF.PLUMgridNOS.username
+ cfg.CONF.PLUMgridNOS.password
self.rest_conn = rest_connection.RestConnection(nos_plumgrid,
nos_port, timeout)
if self.rest_conn is None:
self._network_admin_state(network)
tenant_id = self._get_tenant_id_for_create(context, network["network"])
- # Get initial network details
- original_net = super(QuantumPluginPLUMgridV2, self).get_network(
- context, net_id)
-
with context.session.begin(subtransactions=True):
# Plugin DB - Network Update
new_network = super(
with context.session.begin(subtransactions=True):
# Plugin DB - Network Delete
- net_deleted = super(QuantumPluginPLUMgridV2,
- self).delete_network(context, net_id)
+ super(QuantumPluginPLUMgridV2, self).delete_network(context,
+ net_id)
try:
nos_url = self.snippets.BASE_NOS_URL + net_id
raise plum_excep.PLUMgridException(err_message)
ret = (resp.status, resp.reason, resp_str)
- except urllib2.HTTPError, e:
+ except urllib2.HTTPError:
LOG.error(_('PLUMgrid_NOS_Server: %(action)s failure, %(e)r'))
ret = 0, None, None, None
conn.close()
from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log
from quantum.openstack.common.rpc import dispatcher
-from quantum.plugins.ryu.common import config
+from quantum.plugins.ryu.common import config # noqa
LOG = log.getLogger(__name__)
plugin = manager.QuantumManager.get_plugin()
port_dict = plugin._make_port_dict(port)
port_dict[ext_sg.SECURITYGROUPS] = [
- sg_id for port, sg_id in port_and_sgs if sg_id]
+ sg_id for port_, sg_id in port_and_sgs if sg_id]
port_dict['security_group_rules'] = []
port_dict['security_group_source_groups'] = []
port_dict['fixed_ips'] = [ip['ip_address'] for ip in port['fixed_ips']]
from quantum.db import l3_rpc_base
from quantum.db import models_v2
from quantum.db import securitygroups_rpc_base as sg_db_rpc
-from quantum.extensions import securitygroup as ext_sg
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
-from quantum.plugins.ryu.common import config
+from quantum.plugins.ryu.common import config # noqa
from quantum.plugins.ryu.db import api_v2 as db_api_v2
namespace = get_ns_name(pool_id)
ns = ip_lib.IPWrapper(self.root_helper, namespace)
pid_path = self._get_state_file_path(pool_id, 'pid')
- sock_path = self._get_state_file_path(pool_id, 'sock')
# kill the process
kill_pids_in_file(self.root_helper, pid_path)
# check if the configuration of l3 agent is compatible
# with the router
- router_ids = [router_id[0] for router_id in router_ids]
+ router_ids = [router_id_[0] for router_id_ in router_ids]
routers = plugin.get_routers(context, filters={'id': router_ids})
to_removed_ids = []
for router in routers:
self.assertEqual(result,
"0 arg Now is the time for all good men to \
come to the aid of their party.")
- except Exception, ex:
+ except Exception:
LOG.exception("Losing in rootwrap test")
def tearDown(self):
self.httpPatch = patch('httplib.HTTPConnection', create=True,
new=HTTPConnectionMock)
self.addCleanup(self.httpPatch.stop)
- MockHTTPConnection = self.httpPatch.start()
+ self.httpPatch.start()
super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name)
with self.subnet(cidr='10.0.10.0/24') as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
with self.port() as private_port:
- with self.router() as r:
+ with self.router():
res = self._create_floatingip(
'json',
public_sub['subnet']['network_id'],
def test_router_remove_interface_wrong_port_returns_404(self):
with self.router() as r:
- with self.subnet(cidr='10.0.10.0/24') as s:
+ with self.subnet(cidr='10.0.10.0/24'):
with self.port(no_delete=True) as p:
self._router_interface_action('add',
r['router']['id'],
import mock
from quantum import context
-from quantum.db import api as db
from quantum.manager import QuantumManager
from quantum.plugins.cisco.common import cisco_constants as const
-from quantum.plugins.cisco.db import network_db_v2
-from quantum.plugins.cisco.db import network_models_v2
+from quantum.plugins.cisco.db import network_db_v2 # noqa
from quantum.tests.unit import test_db_plugin
LOG = logging.getLogger(__name__)
from quantum.db import api as db
from quantum.openstack.common import importutils
from quantum.plugins.cisco.common import cisco_constants as const
-from quantum.plugins.cisco.db import network_models_v2
from quantum.plugins.cisco.nexus import cisco_nexus_plugin_v2
from quantum.tests import base
import contextlib
import logging
-import mock
import os
import testtools
-from oslo.config import cfg
import webob.exc
from quantum import context
from quantum.api.extensions import ExtensionMiddleware
from quantum.api.extensions import PluginAwareExtensionManager
-from quantum.api.v2 import attributes
-from quantum.api.v2.router import APIRouter
from quantum.common import config
-from quantum.common import exceptions as q_exc
-from quantum.common.test_lib import test_config
-from quantum.db import api as db
from quantum.db.loadbalancer import loadbalancer_db as ldb
import quantum.extensions
from quantum.extensions import loadbalancer
-from quantum.manager import QuantumManager
from quantum.plugins.common import constants
from quantum.plugins.services.agent_loadbalancer import (
plugin as loadbalancer_plugin
)
from quantum.tests.unit import test_db_plugin
-from quantum.tests.unit import test_extensions
-from quantum.tests.unit import testlib_api
-from quantum.tests.unit.testlib_api import create_request
-from quantum import wsgi
LOG = logging.getLogger(__name__)
""" Test loadbalancer db plugin via extension and directly """
with self.subnet() as subnet:
with self.pool(name="pool1") as pool:
- with self.vip(name='vip1', subnet=subnet, pool=pool) as vip:
+ with self.vip(name='vip1', subnet=subnet, pool=pool):
vip_data = {
'name': 'vip1',
'pool_id': pool['pool']['id'],
self.assertEqual(res['vip'][k], v)
def test_delete_vip(self):
- with self.pool() as pool:
+ with self.pool():
with self.vip(no_delete=True) as vip:
req = self.new_delete_request('vips',
vip['vip']['id'])
def test_delete_pool(self):
with self.pool(no_delete=True) as pool:
with self.member(no_delete=True,
- pool_id=pool['pool']['id']) as member:
+ pool_id=pool['pool']['id']):
req = self.new_delete_request('pools',
pool['pool']['id'])
res = req.get_response(self.ext_api)
Unit tests for Windows Hyper-V virtual switch quantum driver
"""
-import sys
-
import mock
from oslo.config import cfg
from oslo.config import cfg
-#NOTE this import loads tests required options
-from quantum.plugins.linuxbridge.common import config
+from quantum.plugins.linuxbridge.common import config # noqa
from quantum.tests import base
def test_ensure_physical_in_bridge_flat(self):
with mock.patch.object(self.linux_bridge,
'ensure_flat_bridge') as flat_bridge_func:
- result = self.linux_bridge.ensure_physical_in_bridge(
+ self.linux_bridge.ensure_physical_in_bridge(
'network_id', 'physnet1', lconst.FLAT_VLAN_ID)
self.assertTrue(flat_bridge_func.called)
def test_ensure_physical_in_bridge_vlan(self):
with mock.patch.object(self.linux_bridge,
'ensure_vlan_bridge') as vlan_bridge_func:
- result = self.linux_bridge.ensure_physical_in_bridge(
+ self.linux_bridge.ensure_physical_in_bridge(
'network_id', 'physnet1', 7)
self.assertTrue(vlan_bridge_func.called)
from quantum import context
from quantum.db import api as db
-from quantum.db import models_v2
from quantum.extensions.flavor import (FLAVOR_NETWORK, FLAVOR_ROUTER)
from quantum.openstack.common import uuidutils
from quantum.plugins.metaplugin.meta_quantum_plugin import FlavorNotFound
self.mox = mox.Mox()
self.stubs = stubout.StubOutForTesting()
- args = ['--config-file', etcdir('quantum.conf.test')]
self.client_cls_p = mock.patch('quantumclient.v2_0.client.Client')
client_cls = self.client_cls_p.start()
self.client_inst = mock.Mock()
import uuid
import mock
-from webob import exc as w_exc
-import quantum.common.test_lib as test_lib
-import quantum.tests.unit.midonet as midonet
import quantum.tests.unit.test_db_plugin as test_plugin
super(TestMidonetNetworksV2, self).test_update_network()
def test_list_networks(self):
- bridge = self._setup_bridge_mock()
+ self._setup_bridge_mock()
with self.network(name='net1') as net1:
req = self.new_list_request('networks')
res = self.deserialize('json', req.get_response(self.api))
def test_list_networks_with_pagination_reverse_emulated(self):
pass
- def test_list_networks_with_parameters(self):
- pass
-
- def test_list_networks_with_parameters_invalid_values(self):
- pass
-
def test_list_networks_with_sort_emulated(self):
pass
def test_create_two_subnets_same_cidr_returns_400(self):
pass
- def test_create_two_subnets_same_cidr_returns_400(self):
- pass
-
def test_create_subnet_bad_V4_cidr(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_V4_cidr()
from quantum.openstack.common import uuidutils
from quantum.plugins.nec.common import exceptions as nexc
from quantum.plugins.nec.db import api as ndb
-from quantum.plugins.nec.db import models as nmodels
+from quantum.plugins.nec.db import models as nmodels # noqa
from quantum.tests import base
from quantum.openstack.common import uuidutils
from quantum.plugins.nec.common import config
from quantum.plugins.nec.db import api as ndb
-from quantum.plugins.nec.db import models as nmodels
+from quantum.plugins.nec.db import models as nmodels # noqa
from quantum.plugins.nec import ofc_manager
from quantum.tests import base
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
-from quantum.plugins.nec.db import api as ndb
+from quantum.plugins.nec.db import api as ndb # noqa
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
t_id = 'dummy'
ofc_t_id, ofc_n_id = generate_random_ids(2)
ofc_n_id = '/networks/%s' % ofc_n_id
- ret = self.driver.convert_ofc_network_id(self.ctx, ofc_n_id, t_id)
+ self.driver.convert_ofc_network_id(self.ctx, ofc_n_id, t_id)
def test_convert_filter_id(self):
ofc_f_id = generate_random_ids(1)
from oslo.config import cfg
-from quantum.plugins.nicira.nicira_nvp_plugin.common import config
+from quantum.plugins.nicira.nicira_nvp_plugin.common import config # noqa
from quantum.tests import base
import mock
from oslo.config import cfg
-import testtools
from webob import exc
import webtest
self.assertEqual(net_del_res.status_int, 204)
def test_list_networks_with_shared(self):
- with self.network(name='net1') as net1:
- with self.network(name='net2', shared=True) as net2:
+ with self.network(name='net1'):
+ with self.network(name='net2', shared=True):
req = self.new_list_request('networks')
res = self.deserialize('json', req.get_response(self.api))
self.assertEqual(len(res['networks']), 2)
self._nvp_metadata_setup()
with self.router() as r:
with self.subnet() as s:
- body = self._router_interface_action('add',
- r['router']['id'],
- s['subnet']['id'],
- None)
+ self._router_interface_action('add',
+ r['router']['id'],
+ s['subnet']['id'],
+ None)
r_ports = self._list('ports')['ports']
self.assertEqual(len(r_ports), 2)
ips = []
meta_cidr = netaddr.IPNetwork('169.254.0.0/16')
self.assertTrue(any([ip in meta_cidr for ip in ips]))
# Needed to avoid 409
- body = self._router_interface_action('remove',
- r['router']['id'],
- s['subnet']['id'],
- None)
+ self._router_interface_action('remove',
+ r['router']['id'],
+ s['subnet']['id'],
+ None)
self._nvp_metadata_teardown()
def test_metadatata_network_removed_with_router_interface_remove(self):
subnets = self._list('subnets')['subnets']
with self.subnet() as s:
with self.port(subnet=s, device_id='1234',
- device_owner='network:dhcp') as p:
+ device_owner='network:dhcp'):
subnets = self._list('subnets')['subnets']
self.assertEqual(len(subnets), 1)
self.assertEquals(subnets[0]['host_routes'][0]['nexthop'],
from oslo.config import cfg
-#NOTE this import loads tests required options
-from quantum.plugins.openvswitch.common import config
+from quantum.plugins.openvswitch.common import config # noqa
from quantum.tests import base
self.assertEqual(port.switch.br_name, self.BR_NAME)
# test __str__
- foo = str(port)
+ str(port)
self.mox.VerifyAll()
self.assertEqual(ovs_lib.get_bridge_for_iface(root_helper, iface), br)
self.mox.VerifyAll()
- def test_iface_to_br(self):
+ def test_iface_to_br_handles_ovs_vsctl_exception(self):
iface = 'tap0'
- br = 'br-int'
root_helper = 'sudo'
utils.execute(["ovs-vsctl", self.TO, "iface-to-br", iface],
root_helper=root_helper).AndRaise(Exception)
'ca:fe:de:ad:be:ef', 'br')
port2 = ovs_lib.VifPort('tap5678', 2, uuidutils.generate_uuid(),
'ca:ee:de:ad:be:ef', 'br')
- ports = [port1, port2]
self.mox.StubOutWithMock(self.br, 'get_vif_ports')
self.br.get_vif_ports().AndReturn([port1, port2])
self.mox.StubOutWithMock(self.br, 'delete_port')
from quantum.agent.linux import ip_lib
from quantum.agent.linux import ovs_lib
from quantum.agent.linux import utils
-from quantum.agent import rpc
from quantum.openstack.common import log
from quantum.plugins.openvswitch.agent import ovs_quantum_agent
from quantum.plugins.openvswitch.common import constants
def testConstruct(self):
self.mox.ReplayAll()
- b = ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
- self.TUN_BRIDGE,
- '10.0.0.1', self.NET_MAPPING,
- 'sudo', 2, True)
+ ovs_quantum_agent.OVSQuantumAgent(self.INT_BRIDGE,
+ self.TUN_BRIDGE,
+ '10.0.0.1', self.NET_MAPPING,
+ 'sudo', 2, True)
self.mox.VerifyAll()
def testProvisionLocalVlan(self):
from oslo.config import cfg
-#NOTE this import loads tests required options
-from quantum.plugins.ryu.common import config
+from quantum.plugins.ryu.common import config # noqa
from quantum.tests import base
from contextlib import nested
import operator
-from oslo.config import cfg
-
from quantum.db import api as db
-# NOTE: this import is needed for correct plugin code work
-from quantum.plugins.ryu.common import config
+from quantum.plugins.ryu.common import config # noqa
from quantum.plugins.ryu.db import api_v2 as db_api_v2
-# NOTE: this import is needed for correct plugin code work
-from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
+from quantum.plugins.ryu.db import models_v2 as ryu_models_v2 # noqa
from quantum.tests.unit import test_db_plugin as test_plugin
# See the License for the specific language governing permissions and
# limitations under the License.
-import mock
-
-# NOTE: this import is needed for correct plugin code work
-from quantum.plugins.ryu.db import models_v2 as ryu_models_v2
+from quantum.plugins.ryu.db import models_v2 as ryu_models_v2 # noqa
from quantum.tests.unit.ryu import fake_ryu
from quantum.tests.unit import test_db_plugin as test_plugin
from quantum.api.v2 import attributes
from quantum.extensions import securitygroup as ext_sg
from quantum import manager
-from quantum.plugins.ryu.db import api_v2 as api_db_v2
from quantum.tests.unit.ryu import fake_ryu
from quantum.tests.unit import test_extension_security_group as test_sg
from quantum.tests.unit import test_security_groups_rpc as test_sg_rpc
import contextlib
import mock
from oslo.config import cfg
-import testtools
from quantum.plugins.services.agent_loadbalancer import agent
from quantum.tests import base
def test_get_logical_device_inactive(self):
with self.pool() as pool:
with self.vip(pool=pool) as vip:
- with self.member(pool_id=vip['vip']['pool_id']) as member:
+ with self.member(pool_id=vip['vip']['pool_id']):
self.assertRaises(
Exception,
self.callbacks.get_logical_device,
with self.pool() as pool:
with self.vip(pool=pool) as vip:
- with self.member(pool_id=vip['vip']['pool_id']) as member:
+ with self.member(pool_id=vip['vip']['pool_id']):
ctx = context.get_admin_context()
func(ctx, port_id=vip['vip']['port_id'], **kwargs)
def test_delete_quantum_ports(self):
ports = ['tap1234', 'tap5678', 'tap09ab']
port_found = [True, False, True]
- delete_ports = [p for p, found
- in itertools.izip(ports, port_found) if found]
with contextlib.nested(
mock.patch.object(ip_lib, 'device_exists',
side_effect=port_found),
# under the License.
import mock
-from oslo.config import cfg
from quantum.agent import rpc
from quantum.openstack.common import context
call_to_patch = 'quantum.openstack.common.rpc.create_connection'
with mock.patch(call_to_patch) as create_connection:
- conn = rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
+ rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')])
create_connection.assert_has_calls(expected)
import mock
from oslo.config import cfg
-from testtools import matchers
import webob
from webob import exc
import webtest
from quantum.api.v2 import base as v2_base
from quantum.api.v2 import router
from quantum.common import config
-from quantum.common import constants
from quantum.common import exceptions as q_exc
from quantum import context
from quantum.manager import QuantumManager
from quantum.openstack.common import uuidutils
from quantum.tests import base
from quantum.tests.unit import testlib_api
-from quantum import wsgi
ROOTDIR = os.path.dirname(os.path.dirname(__file__))
from oslo.config import cfg
-from quantum.common import config
+from quantum.common import config # noqa
from quantum.tests import base
import mock
from oslo.config import cfg
-import sqlalchemy as sa
import testtools
from testtools import matchers
import webob.exc
def _create_port(self, fmt, net_id, expected_res_status=None,
arg_list=None, **kwargs):
- content_type = 'application/' + fmt
data = {'port': {'network_id': net_id,
'tenant_id': self._tenant_id}}
self.assertEqual(port['port']['id'], sport['port']['id'])
def test_delete_port(self):
- port_id = None
with self.port() as port:
- port_id = port['port']['id']
- req = self.new_show_request('port', self.fmt, port['port']['id'])
- res = req.get_response(self.api)
- self.assertEqual(res.status_int, 404)
+ req = self.new_show_request('port', self.fmt, port['port']['id'])
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int, 404)
def test_delete_port_public_network(self):
with self.network(shared=True) as network:
set_context=True)
port = self.deserialize(self.fmt, port_res)
- port_id = port['port']['id']
# delete the port
self._delete('ports', port['port']['id'])
# Todo: verify!!!
kwargs = {"mac_address": mac}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
- port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_mac_generation(self):
cfg.CONF.set_override('base_mac', "12:34:56:00:00:00")
with self.port() as port:
mac = port['port']['mac_address']
- # check that MAC address matches base MAC
- base_mac = cfg.CONF.base_mac
self.assertTrue(mac.startswith("12:34:56"))
def test_mac_generation_4octet(self):
cfg.CONF.set_override('base_mac', "12:34:56:78:00:00")
with self.port() as port:
mac = port['port']['mac_address']
- # check that MAC address matches base MAC
- base_mac = cfg.CONF.base_mac
self.assertTrue(mac.startswith("12:34:56:78"))
def test_bad_mac_format(self):
'ip_address': ips[0]['ip_address']}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
- port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 409)
def test_requested_subnet_delete(self):
'ip_address': '1011.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
- port = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_requested_split(self):
'ip_address': '10.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
- port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_fixed_ip_invalid_subnet_id(self):
'ip_address': '10.0.0.5'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
- port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_fixed_ip_invalid_ip(self):
'ip_address': '10.0.0.55555'}]}
net_id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
- port2 = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, 400)
def test_requested_ips_only(self):
matchers.GreaterThan(datetime.timedelta(seconds=10)))
def test_port_delete_holds_ip(self):
- plugin = QuantumManager.get_plugin()
base_class = db_base_plugin_v2.QuantumDbPluginV2
with mock.patch.object(base_class, '_hold_ip') as hold_ip:
with self.subnet() as subnet:
def test_create_public_network_no_admin_tenant(self):
name = 'public_net'
- keys = [('subnets', []), ('name', name), ('admin_state_up', True),
- ('status', 'ACTIVE'), ('shared', True)]
with testtools.ExpectedException(
webob.exc.HTTPClientError) as ctx_manager:
with self.network(name=name,
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
- network_id = network['network']['id']
subnet = self._make_subnet(self.fmt, network, gateway_ip,
cidr, ip_version=4)
self._create_port(self.fmt,
def test_delete_subnet_port_exists_owned_by_other(self):
with self.subnet() as subnet:
- with self.port(subnet=subnet) as port:
+ with self.port(subnet=subnet):
id = subnet['subnet']['id']
req = self.new_delete_request('subnets', id)
res = req.get_response(self.api)
res = self._create_network(fmt=self.fmt, name='net',
admin_state_up=True)
network = self.deserialize(self.fmt, res)
- subnet = self._make_subnet(self.fmt, network, gateway_ip,
- cidr, ip_version=4)
+ self._make_subnet(self.fmt, network, gateway_ip, cidr, ip_version=4)
req = self.new_delete_request('networks', network['network']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 204)
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
- subnet = self._test_create_subnet(expected=expected,
- gateway_ip=gateway)
+ self._test_create_subnet(expected=expected, gateway_ip=gateway)
# Gateway is last IP in range
gateway = '10.0.0.254'
allocation_pools = [{'start': '10.0.0.1',
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
- subnet = self._test_create_subnet(expected=expected,
- gateway_ip=gateway)
+ self._test_create_subnet(expected=expected, gateway_ip=gateway)
# Gateway is first in subnet
gateway = '10.0.0.1'
allocation_pools = [{'start': '10.0.0.2',
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
- subnet = self._test_create_subnet(expected=expected,
- gateway_ip=gateway)
+ self._test_create_subnet(expected=expected,
+ gateway_ip=gateway)
def test_create_force_subnet_gw_values(self):
cfg.CONF.set_override('force_gateway_on_subnet', True)
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 201)
port = self.deserialize(self.fmt, res)
- port_id = port['port']['id']
# delete the port
self._delete('ports', port['port']['id'])
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(res.status_int, 201)
port = self.deserialize(self.fmt, res)
- port_id = port['port']['id']
# delete the port
self._delete('ports', port['port']['id'])
cfg.CONF.set_override('dhcp_lease_duration', 120)
expectations = [
mock.call.update_port(mock.ANY, 'port_id', dict(port=port_retval))]
- retval = self._test_get_dhcp_port_helper(port_retval, expectations,
- update_port=port_retval)
+ self._test_get_dhcp_port_helper(port_retval, expectations,
+ update_port=port_retval)
self.assertEqual(len(self.log.mock_calls), 1)
def test_get_dhcp_port_create_new(self):
with mock.patch('quantum.agent.linux.ip_lib.IpNetnsCommand') as ns:
cmd.run(parsed_args)
ns.assert_has_calls([mock.call.execute(mock.ANY)])
- fake_port = {'port':
- {'device_owner': DEVICE_OWNER_NETWORK_PROBE,
- 'admin_state_up': True,
- 'network_id': 'fake_net',
- 'tenant_id': 'fake_tenant',
- 'fixed_ips': [{'subnet_id': 'fake_subnet'}],
- 'device_id': socket.gethostname()}}
expected = [mock.call.list_ports(),
mock.call.list_ports(
network_id='fake_net',
class TestDhcpAgent(base.BaseTestCase):
def setUp(self):
super(TestDhcpAgent, self).setUp()
- cfg.CONF.register_opts(dhcp_agent.DeviceManager.OPTS)
- cfg.CONF.register_opts(dhcp_agent.DhcpAgent.OPTS)
- cfg.CONF.register_opts(dhcp_agent.DhcpLeaseRelay.OPTS)
+ dhcp_agent.register_options()
cfg.CONF.set_override('interface_driver',
'quantum.agent.linux.interface.NullDriver')
self.driver_cls_p = mock.patch(
mock.call().wait()])
def test_run_completes_single_pass(self):
- with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
+ with mock.patch('quantum.agent.dhcp_agent.DeviceManager'):
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
attrs_to_mock = dict(
[(a, mock.DEFAULT) for a in
[mock.call.start()])
def test_ns_name(self):
- with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
+ with mock.patch('quantum.agent.dhcp_agent.DeviceManager'):
mock_net = mock.Mock(id='foo')
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
self.assertEqual(dhcp._ns_name(mock_net), 'qdhcp-foo')
def test_ns_name_disabled_namespace(self):
- with mock.patch('quantum.agent.dhcp_agent.DeviceManager') as dev_mgr:
+ with mock.patch('quantum.agent.dhcp_agent.DeviceManager'):
cfg.CONF.set_override('use_namespaces', False)
mock_net = mock.Mock(id='foo')
dhcp = dhcp_agent.DhcpAgent(HOSTNAME)
fake_port = FakeModel('12345678-1234-aaaa-1234567890ab',
mac_address='aa:bb:cc:dd:ee:ff')
- expected_driver_calls = [mock.call(cfg.CONF),
- mock.call().get_device_name(fake_network),
- mock.call().unplug('tap12345678-12')]
-
with mock.patch('quantum.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
#mock_driver.DEV_NAME_LEN = (
fake_port = FakeModel('12345678-1234-aaaa-1234567890ab',
mac_address='aa:bb:cc:dd:ee:ff')
- expected_driver_calls = [mock.call(cfg.CONF),
- mock.call().get_device_name(fake_network),
- mock.call().unplug('tap12345678-12')]
-
with mock.patch('quantum.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
mock_driver.get_device_name.return_value = 'tap12345678-12'
fake_port = FakeModel('12345678-1234-aaaa-1234567890ab',
mac_address='aa:bb:cc:dd:ee:ff')
- expected_driver_calls = [mock.call(cfg.CONF),
- mock.call().get_device_name(fake_network),
- mock.call().unplug('tap12345678-12')]
-
with mock.patch('quantum.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
mock_driver.get_device_name.return_value = 'tap12345678-12'
exists.return_value = False
self.unlink.side_effect = OSError
- relay = dhcp_agent.DhcpLeaseRelay(None)
+ dhcp_agent.DhcpLeaseRelay(None)
self.unlink.assert_called_once_with(
cfg.CONF.dhcp_lease_relay_socket)
with mock.patch('os.path.exists') as exists:
exists.return_value = False
- relay = dhcp_agent.DhcpLeaseRelay(None)
+ dhcp_agent.DhcpLeaseRelay(None)
self.unlink.assert_called_once_with(
cfg.CONF.dhcp_lease_relay_socket)
with mock.patch('os.path.exists') as exists:
exists.return_value = True
with testtools.ExpectedException(OSError):
- relay = dhcp_agent.DhcpLeaseRelay(None)
+ dhcp_agent.DhcpLeaseRelay(None)
self.unlink.assert_called_once_with(
cfg.CONF.dhcp_lease_relay_socket)
self.assertTrue(log.called)
def test_handler_other_exception(self):
- network_id = 'cccccccc-cccc-cccc-cccc-cccccccccccc'
- ip_address = '192.168.x.x'
- lease_remaining = 120
-
- json_rep = jsonutils.dumps(
- dict(network_id=network_id,
- lease_remaining=lease_remaining,
- ip_address=ip_address))
handler = mock.Mock()
mock_sock = mock.Mock()
mock_sock.recv.side_effect = Exception
import os
import mock
-from oslo.config import cfg
import webob.exc
from quantum.api.v2 import attributes as attr
# anything that is below 200 or above 400 so we can't actually check
# it. It throws webtest.AppError instead.
try:
- response = (
- test_app.get("/tweedles/some_id/notimplemented_function"))
+ test_app.get("/tweedles/some_id/notimplemented_function")
# Shouldn't be reached
self.assertTrue(False)
except webtest.AppError:
from quantum.openstack.common.notifier import api as notifier_api
from quantum.openstack.common.notifier import test_notifier
from quantum.openstack.common import uuidutils
-from quantum.tests import base
from quantum.tests.unit import test_api_v2
from quantum.tests.unit import test_db_plugin
from quantum.tests.unit import test_extensions
# @author: Mark McClain, DreamHost
import os
+import sys
import mock
import testtools
self.os.O_CREAT = os.O_CREAT
self.os.O_RDWR = os.O_RDWR
- p = daemon.Pidfile('thefile', 'python')
+ daemon.Pidfile('thefile', 'python')
self.os.open.assert_called_once_with('thefile', os.O_CREAT | os.O_RDWR)
self.fcntl.flock.assert_called_once_with(FAKE_FD, self.fcntl.LOCK_EX)
def test_init_open_fail(self):
self.os.open.side_effect = IOError
- with mock.patch.object(daemon.sys, 'stderr') as stderr:
+ with mock.patch.object(daemon.sys, 'stderr'):
with testtools.ExpectedException(SystemExit):
- p = daemon.Pidfile('thefile', 'python')
+ daemon.Pidfile('thefile', 'python')
sys.assert_has_calls([
mock.call.stderr.write(mock.ANY),
mock.call.exit(1)]
def test_fork_error(self):
self.os.fork.side_effect = lambda: OSError(1)
- with mock.patch.object(daemon.sys, 'stderr') as stderr:
+ with mock.patch.object(daemon.sys, 'stderr'):
with testtools.ExpectedException(SystemExit):
d = daemon.Daemon('pidfile', 'stdin')
d._fork()
self.pidfile.return_value.is_running.return_value = True
d = daemon.Daemon('pidfile')
- with mock.patch.object(daemon.sys, 'stderr') as stderr:
+ with mock.patch.object(daemon.sys, 'stderr'):
with mock.patch.object(d, 'daemonize') as daemonize:
with testtools.ExpectedException(SystemExit):
d.start()
active.__get__ = mock.Mock(return_value=True)
manager = ep.ProcessManager(self.conf, 'uuid', namespace='ns')
- with mock.patch.object(ep, 'ip_lib') as ip_lib:
+ with mock.patch.object(ep, 'ip_lib'):
manager.enable(callback)
self.assertFalse(callback.called)
# under the License.
import mock
-from oslo.config import cfg
from quantum.agent.common import config
from quantum.agent.dhcp_agent import DeviceManager
class TestBase(base.BaseTestCase):
def setUp(self):
super(TestBase, self).setUp()
- root_helper_opt = [
- cfg.StrOpt('root_helper', default='sudo'),
- ]
self.conf = config.setup_conf()
self.conf.register_opts(interface.OPTS)
config.register_root_helper(self.conf)
self.conf.set_override('network_device_mtu', 9000)
self._test_plug(mtu=9000)
- def test_unplug(self):
- self.device_exists.return_value = True
- with mock.patch('quantum.agent.linux.interface.LOG.debug') as log:
- br = interface.BridgeInterfaceDriver(self.conf)
- br.unplug('tap0')
- log.assert_called_once()
- self.execute.assert_has_calls(
- [mock.call(['ip', 'link', 'delete', 'tap0'], 'sudo')])
-
def test_unplug_no_device(self):
self.device_exists.return_value = False
self.ip_dev().link.delete.side_effect = RuntimeError
ip = ip_lib.IPWrapper('sudo')
with mock.patch.object(ip.netns, 'exists') as ns_exists:
ns_exists.return_value = False
- ns = ip.ensure_namespace('ns')
+ ip.ensure_namespace('ns')
self.execute.assert_has_calls(
[mock.call([], 'netns', ('add', 'ns'), 'sudo', None)])
ip_dev.assert_has_calls([mock.call('lo', 'sudo', 'ns'),
self.assertEqual(ns.namespace, 'ns')
def test_delete_namespace(self):
- with mock.patch('quantum.agent.linux.utils.execute') as execute:
+ with mock.patch('quantum.agent.linux.utils.execute'):
self.netns_cmd.delete('ns')
self._assert_sudo([], ('delete', 'ns'), force_root_namespace=True)
import webtest
from quantum.api import extensions
-from quantum.api.v2 import attributes
from quantum.common import config
from quantum.extensions import loadbalancer
from quantum import manager
from quantum.openstack.common import uuidutils
from quantum.plugins.common import constants
-from quantum.tests import base
from quantum.tests.unit import test_api_v2
from quantum.tests.unit import test_extensions
from quantum.tests.unit import testlib_api
webob.exc.HTTPInternalServerError)
def test_proxy_request_other_code(self):
- with testtools.ExpectedException(Exception) as e:
+ with testtools.ExpectedException(Exception):
self._proxy_request_test_helper(302)
def test_sign_instance_id(self):
with mock.patch('os.path.isdir') as isdir:
with mock.patch('os.makedirs') as makedirs:
isdir.return_value = False
- p = agent.UnixDomainMetadataProxy(mock.Mock())
+ agent.UnixDomainMetadataProxy(mock.Mock())
isdir.assert_called_once_with('/the')
makedirs.assert_called_once_with('/the', 0755)
with mock.patch('os.path.isdir') as isdir:
with mock.patch('os.unlink') as unlink:
isdir.return_value = True
- p = agent.UnixDomainMetadataProxy(mock.Mock())
+ agent.UnixDomainMetadataProxy(mock.Mock())
isdir.assert_called_once_with('/the')
unlink.assert_called_once_with('/the/path')
exists.return_value = False
unlink.side_effect = OSError
- p = agent.UnixDomainMetadataProxy(mock.Mock())
+ agent.UnixDomainMetadataProxy(mock.Mock())
isdir.assert_called_once_with('/the')
unlink.assert_called_once_with('/the/path')
unlink.side_effect = OSError
with testtools.ExpectedException(OSError):
- p = agent.UnixDomainMetadataProxy(mock.Mock())
+ agent.UnixDomainMetadataProxy(mock.Mock())
isdir.assert_called_once_with('/the')
unlink.assert_called_once_with('/the/path')
with mock.patch('eventlet.monkey_patch') as eventlet:
with mock.patch.object(agent, 'config') as config:
with mock.patch.object(agent, 'cfg') as cfg:
- with mock.patch.object(utils, 'cfg') as utils_cfg:
+ with mock.patch.object(utils, 'cfg'):
agent.main()
self.assertTrue(eventlet.called)
def test_proxy_request_network_exception(self):
self.handler.network_id = 'network_id'
- resp = mock.Mock(status=500)
+ mock.Mock(status=500)
with mock.patch('httplib2.Http') as mock_http:
mock_http.return_value.request.side_effect = Exception
class TestProxyDaemon(base.BaseTestCase):
def test_init(self):
- with mock.patch('quantum.agent.linux.daemon.Pidfile') as pf:
+ with mock.patch('quantum.agent.linux.daemon.Pidfile'):
pd = ns_proxy.ProxyDaemon('pidfile', 9697, 'net_id', 'router_id')
self.assertEqual(pd.router_id, 'router_id')
self.assertEqual(pd.network_id, 'net_id')
def test_run(self):
- with mock.patch('quantum.agent.linux.daemon.Pidfile') as pf:
+ with mock.patch('quantum.agent.linux.daemon.Pidfile'):
with mock.patch('quantum.wsgi.Server') as Server:
pd = ns_proxy.ProxyDaemon('pidfile', 9697, 'net_id',
'router_id')
"""Test of Policy Engine For Quantum"""
-import contextlib
-import os
-import shutil
import StringIO
import urllib2
self.assertIsNone(cxt_dict['user_id'])
self.assertIsNone(cxt_dict['tenant_id'])
try:
- session = cxt.session
+ cxt.session
except Exception:
pass
else:
f = filters.IpNetnsExecFilter('/sbin/ip', 'root')
self.assertFalse(f.match(['ip', 'link', 'list']))
- def test_match_filter_recurses_exec_command_filter(self):
+ def test_match_filter_recurses_exec_command_filter_matches(self):
filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
filters.IpFilter('/sbin/ip', 'root')]
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'link', 'list']
self.assertIsNotNone(wrapper.match_filter(filter_list, args))
- def test_match_filter_recurses_exec_command_filter(self):
+ def test_match_filter_recurses_exec_command_filter_does_not_match(self):
filter_list = [filters.IpNetnsExecFilter('/sbin/ip', 'root'),
filters.IpFilter('/sbin/ip', 'root')]
args = ['ip', 'netns', 'exec', 'foo', 'ip', 'netns', 'exec', 'bar',
uattrs['router_id'] = self._router_id
with testtools.ExpectedException(
webexc.HTTPClientError) as ctx_manager:
- newobj = self._do_request(
+ self._do_request(
'PUT',
_get_path('lb/{0}s/{1}'.format(res, obj['id'])), data)
self.assertEqual(ctx_manager.exception.code, 400)
else:
- newobj = self._do_request(
+ self._do_request(
'PUT',
_get_path('lb/{0}s/{1}'.format(res, obj['id'])), data)
updated = self._do_request(
import webtest
from quantum.api import extensions
-from quantum.api.v2 import attributes
from quantum import context
from quantum.db import api as db_api
from quantum.db import servicetype_db
from quantum.extensions import servicetype
from quantum import manager
from quantum.plugins.common import constants
-from quantum.tests import base
from quantum.tests.unit import dummy_plugin as dp
from quantum.tests.unit import test_api_v2
from quantum.tests.unit import test_db_plugin
def _test_service_type_update(self, env=None,
expected_status=webexc.HTTPOk.code):
svc_type_name = 'updated'
- tenant_id = 'fake'
- if env and 'quantum.context' in env:
- tenant_id = env['quantum.context'].tenant_id
data = {self.resource_name: {'name': svc_type_name}}
svc_type_id = _uuid()
python run_tests.py functional.test_stores
"""
-import gettext
import os
import sys
discover
distribute>=0.6.24
fixtures>=0.3.12
+flake8
mock>=1.0b1
mox==0.5.3
nose
nosehtmloutput
nosexcover
openstack.nose_plugin
-pep8==1.3.3
python-subunit
sphinx>=1.1.2
testrepository>=0.0.13
downloadcache = ~/cache/pip
[testenv:pep8]
-# E711/E712 comparison to False should be 'if cond is False:' or 'if not cond:'
-# query = query.filter(Component.disabled == False)
-# E125 continuation line does not distinguish itself from next logical line
-
commands =
- pep8 --repeat --show-source --ignore=E125,E711,E712 --exclude=.venv,.tox,dist,doc,openstack,*egg .
- pep8 --repeat --show-source --ignore=E125 --filename=quantum* bin
+ flake8
[testenv:i18n]
commands = python ./tools/check_i18n.py ./quantum ./tools/i18n_cfg.py
[testenv:venv]
commands = {posargs}
+
+[flake8]
+# E711/E712 comparison to False should be 'if cond is False:' or 'if not cond:'
+# query = query.filter(Component.disabled == False)
+# E125 continuation line does not distinguish itself from next logical line
+# H hacking.py - automatic checks of rules in HACKING.rst
+ignore = E711,E712,E125,H
+show-source = true
+builtins = _
+exclude=.venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,tools