def delete_port(self, port_name):
self.run_vsctl(["--", "--if-exists", "del-port", self.br_name,
- port_name])
+ port_name])
def set_db_attribute(self, table_name, record, column, value):
args = ["set", table_name, record, "%s=%s" % (column, value)]
is_delete_expr = kwargs.get('delete', False)
print "kwargs = %s" % kwargs
if not is_delete_expr:
- prefix = ("hard_timeout=%s,idle_timeout=%s,priority=%s"
- % (kwargs.get('hard_timeout', '0'),
- kwargs.get('idle_timeout', '0'),
- kwargs.get('priority', '1')))
+ prefix = ("hard_timeout=%s,idle_timeout=%s,priority=%s" %
+ (kwargs.get('hard_timeout', '0'),
+ kwargs.get('idle_timeout', '0'),
+ kwargs.get('priority', '1')))
flow_expr_arr.append(prefix)
elif 'priority' in kwargs:
raise Exception("Cannot match priority on flow deletion")
return utils.execute(["xe", "vif-param-get", "param-name=other-config",
"param-key=nicira-iface-id",
"uuid=%s" % xs_vif_uuid],
- root_helper=self.root_helper).strip()
+ root_helper=self.root_helper).strip()
# returns a VIF object for each VIF port
def get_vif_ports(self):
'1.1': [ControllerV11(plugin),
ControllerV11._serialization_metadata,
common.XML_NS_V11],
- }
+ }
return common.create_resource(version, controller_dict)
{
'param-name': 'id',
'required': True,
- },
- ]
+ },
+ ]
_serialization_metadata = {
"application/xml": {
"attributes": {
"attachment": ["id"],
- },
},
- }
+ },
+ }
@common.APIFaultWrapper([exception.NetworkNotFound,
exception.PortNotFound])
'code': code,
'message': wrapped_exc.explanation,
'detail': str(wrapped_exc.detail),
- },
- }
+ },
+ }
metadata = {'attributes': {fault_name: ['code']}}
return fault_data, metadata
'type': fault_name,
'message': wrapped_exc.explanation,
'detail': str(wrapped_exc.detail),
- },
- }
+ },
+ }
# Metadata not required for v11
return fault_data, None
class Quantum10HTTPError(webob.exc.HTTPClientError):
_fault_dict = {
- exceptions.NetworkNotFound: {
- 'code': 420,
- 'title': 'networkNotFound',
- 'explanation': _NETNOTFOUND_EXPL
- },
- exceptions.NetworkInUse: {
- 'code': 421,
- 'title': 'networkInUse',
- 'explanation': _NETINUSE_EXPL
- },
- exceptions.PortNotFound: {
- 'code': 430,
- 'title': 'portNotFound',
- 'explanation': _PORTNOTFOUND_EXPL
- },
- exceptions.StateInvalid: {
- 'code': 431,
- 'title': 'requestedStateInvalid',
- 'explanation': _STATEINVALID_EXPL
- },
- exceptions.PortInUse: {
- 'code': 432,
- 'title': 'portInUse',
- 'explanation': _PORTINUSE_EXPL
- },
- exceptions.AlreadyAttached: {
- 'code': 440,
- 'title': 'alreadyAttached',
- 'explanation': _ALREADYATTACHED_EXPL
- },
- exceptions.NotImplementedError: {
- 'code': 501,
- 'title': 'notImplemented',
- 'explanation': _NOTIMPLEMENTED_EXPL
- }
+ exceptions.NetworkNotFound: {
+ 'code': 420,
+ 'title': 'networkNotFound',
+ 'explanation': _NETNOTFOUND_EXPL
+ },
+ exceptions.NetworkInUse: {
+ 'code': 421,
+ 'title': 'networkInUse',
+ 'explanation': _NETINUSE_EXPL
+ },
+ exceptions.PortNotFound: {
+ 'code': 430,
+ 'title': 'portNotFound',
+ 'explanation': _PORTNOTFOUND_EXPL
+ },
+ exceptions.StateInvalid: {
+ 'code': 431,
+ 'title': 'requestedStateInvalid',
+ 'explanation': _STATEINVALID_EXPL
+ },
+ exceptions.PortInUse: {
+ 'code': 432,
+ 'title': 'portInUse',
+ 'explanation': _PORTINUSE_EXPL
+ },
+ exceptions.AlreadyAttached: {
+ 'code': 440,
+ 'title': 'alreadyAttached',
+ 'explanation': _ALREADYATTACHED_EXPL
+ },
+ exceptions.NotImplementedError: {
+ 'code': 501,
+ 'title': 'notImplemented',
+ 'explanation': _NOTIMPLEMENTED_EXPL
+ }
}
def __init__(self, inner_exc):
class Quantum11HTTPError(webob.exc.HTTPClientError):
_fault_dict = {
- exceptions.NetworkNotFound: {
- 'code': webob.exc.HTTPNotFound.code,
- 'title': webob.exc.HTTPNotFound.title,
- 'type': 'NetworkNotFound',
- 'explanation': _NETNOTFOUND_EXPL
- },
- exceptions.NetworkInUse: {
- 'code': webob.exc.HTTPConflict.code,
- 'title': webob.exc.HTTPConflict.title,
- 'type': 'NetworkInUse',
- 'explanation': _NETINUSE_EXPL
- },
- exceptions.PortNotFound: {
- 'code': webob.exc.HTTPNotFound.code,
- 'title': webob.exc.HTTPNotFound.title,
- 'type': 'PortNotFound',
- 'explanation': _PORTNOTFOUND_EXPL
- },
- exceptions.StateInvalid: {
- 'code': webob.exc.HTTPBadRequest.code,
- 'title': webob.exc.HTTPBadRequest.title,
- 'type': 'RequestedStateInvalid',
- 'explanation': _STATEINVALID_EXPL
- },
- exceptions.PortInUse: {
- 'code': webob.exc.HTTPConflict.code,
- 'title': webob.exc.HTTPConflict.title,
- 'type': 'PortInUse',
- 'explanation': _PORTINUSE_EXPL
- },
- exceptions.AlreadyAttached: {
- 'code': webob.exc.HTTPConflict.code,
- 'title': webob.exc.HTTPConflict.title,
- 'type': 'AlreadyAttached',
- 'explanation': _ALREADYATTACHED_EXPL
- }
+ exceptions.NetworkNotFound: {
+ 'code': webob.exc.HTTPNotFound.code,
+ 'title': webob.exc.HTTPNotFound.title,
+ 'type': 'NetworkNotFound',
+ 'explanation': _NETNOTFOUND_EXPL
+ },
+ exceptions.NetworkInUse: {
+ 'code': webob.exc.HTTPConflict.code,
+ 'title': webob.exc.HTTPConflict.title,
+ 'type': 'NetworkInUse',
+ 'explanation': _NETINUSE_EXPL
+ },
+ exceptions.PortNotFound: {
+ 'code': webob.exc.HTTPNotFound.code,
+ 'title': webob.exc.HTTPNotFound.title,
+ 'type': 'PortNotFound',
+ 'explanation': _PORTNOTFOUND_EXPL
+ },
+ exceptions.StateInvalid: {
+ 'code': webob.exc.HTTPBadRequest.code,
+ 'title': webob.exc.HTTPBadRequest.title,
+ 'type': 'RequestedStateInvalid',
+ 'explanation': _STATEINVALID_EXPL
+ },
+ exceptions.PortInUse: {
+ 'code': webob.exc.HTTPConflict.code,
+ 'title': webob.exc.HTTPConflict.title,
+ 'type': 'PortInUse',
+ 'explanation': _PORTINUSE_EXPL
+ },
+ exceptions.AlreadyAttached: {
+ 'code': webob.exc.HTTPConflict.code,
+ 'title': webob.exc.HTTPConflict.title,
+ 'type': 'AlreadyAttached',
+ 'explanation': _ALREADYATTACHED_EXPL
+ }
}
def __init__(self, inner_exc):
'1.1': [ControllerV11(plugin),
ControllerV11._serialization_metadata,
common.XML_NS_V11],
- }
+ }
return common.create_resource(version, controller_dict)
version = None
_network_ops_param_list = [
{'param-name': 'name', 'required': True},
- ]
+ ]
def _item(self, request, tenant_id, network_id,
net_details=True, port_details=False):
"network": ["id", "name"],
"port": ["id", "state"],
"attachment": ["id"],
- },
+ },
"plurals": {
"networks": "network",
"ports": "port",
- },
- }
+ },
+ }
version = "1.0"
"network": ["id", "name", "op-status"],
"port": ["id", "state", "op-status"],
"attachment": ["id"],
- },
+ },
"plurals": {
"networks": "network",
"ports": "port",
- },
- }
+ },
+ }
version = "1.1"
'1.1': [ControllerV11(plugin),
ControllerV11._serialization_metadata,
common.XML_NS_V11],
- }
+ }
return common.create_resource(version, controller_dict)
version = None
_port_ops_param_list = [
{'param-name': 'state', 'default-value': 'DOWN', 'required': False},
- ]
+ ]
def _items(self, request, tenant_id, network_id,
port_details=False):
"attributes": {
"port": ["id", "state"],
"attachment": ["id"],
- },
+ },
"plurals": {
"ports": "port",
- },
- }
+ },
+ }
version = "1.0"
"attributes": {
"port": ["id", "state", "op-status"],
"attachment": ["id"],
- },
+ },
"plurals": {
"ports": "port",
- },
- }
+ },
+ }
version = "1.1"
if self._resource not in ['network', 'port']:
return
- if ('tenant_id' in res_dict and
- res_dict['tenant_id'] != context.tenant_id and
- not context.is_admin):
+ if (('tenant_id' in res_dict and
+ res_dict['tenant_id'] != context.tenant_id and
+ not context.is_admin)):
msg = _("Specifying 'tenant_id' other than authenticated"
"tenant in request requires admin privileges")
raise webob.exc.HTTPBadRequest(msg)
res_dict['tenant_id'] = context.tenant_id
else:
msg = _("Running without keystyone AuthN requires "
- " that tenant_id is specified")
+ " that tenant_id is specified")
raise webob.exc.HTTPBadRequest(msg)
def _prepare_request_body(self, context, body, is_create,
# NOTE(jkoelker) To anyone wishing to add "proper" xml support
# this is where you do it
- serializers = {
+ serializers = {}
# 'application/xml': wsgi.XMLDictSerializer(metadata, XML_NS_V20),
- }
- deserializers = {
+ deserializers = {}
# 'application/xml': wsgi.XMLDeserializer(metadata),
- }
return wsgi_resource.Resource(controller, FAULT_MAP, deserializers,
serializers)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
- metadata = {'application/xml': {
- 'attributes': {
- 'resource': ['name', 'collection'],
- 'link': ['href', 'rel'],
- }
- }
- }
+ metadata = {'application/xml': {'attributes': {
+ 'resource': ['name', 'collection'],
+ 'link': ['href', 'rel']}}}
layout = []
for name, collection in self.resources.iteritems():
for resource in resources:
_map_resource(resources[resource], resource,
RESOURCE_ATTRIBUTE_MAP.get(resources[resource],
- dict()))
+ dict()))
super(APIRouter, self).__init__(mapper)
}
content_type = req.best_match_content_type()
- body = wsgi.Serializer(metadata=metadata). \
- serialize(response, content_type)
+ body = (wsgi.Serializer(metadata=metadata).
+ serialize(response, content_type))
response = webob.Response()
response.content_type = content_type
# Don't pass filter options, don't care about unused filters
port_list = plugin.get_all_ports(tenant_id, network['net-id'])
ports_data = [plugin.get_port_details(
- tenant_id, network['net-id'],
- port['port-id'])
+ tenant_id, network['net-id'],
+ port['port-id'])
for port in port_list]
network['net-ports'] = ports_data
'attachment': _filter_port_by_interface}
# port details are need for filtering
ports = [plugin.get_port_details(tenant_id, network_id,
- port['port-id'])
- for port in ports]
+ port['port-id']) for port in ports]
# filter ports
return _do_filtering(ports,
filters,
def _build_detail(self, network_data):
"""Return a detailed model of a network."""
return dict(network=dict(id=network_data['net-id'],
- name=network_data['net-name']))
+ name=network_data['net-name']))
def _build_port(self, port_data):
"""Return details about a specific logical port."""
def _build_port(self, port_data):
"""Return details about a specific logical port."""
op_status = port_data.get('port-op-status',
- OperationalStatus.UNKNOWN)
+ OperationalStatus.UNKNOWN)
port_dict = {'id': port_data['port-id'],
'state': port_data['port-state'],
'op-status': op_status}
if port_details:
port['port']['state'] = port_data['port-state']
port['port']['op-status'] = port_data.get('port-op-status',
- OperationalStatus.UNKNOWN)
+ OperationalStatus.
+ UNKNOWN)
if att_details and port_data['attachment']:
port['port']['attachment'] = dict(id=port_data['attachment'])
return port
"the Python logging module documentation for "
"details on logging configuration files.")
group.add_option('--log-date-format', metavar="FORMAT",
- default=DEFAULT_LOG_DATE_FORMAT,
- help="Format string for %(asctime)s in log records. "
- "Default: %default")
+ default=DEFAULT_LOG_DATE_FORMAT,
+ help="Format string for %(asctime)s in log records. "
+ "Default: %default")
group.add_option('--use-syslog', default=False,
- action="store_true",
- help="Output logs to syslog.")
+ action="store_true",
+ help="Output logs to syslog.")
group.add_option('--log-file', default=None, metavar="PATH",
- help="(Optional) Name of log file to output to. "
- "If not set, logging will go to stdout.")
+ help="(Optional) Name of log file to output to. "
+ "If not set, logging will go to stdout.")
group.add_option("--log-dir", default=None,
- help="(Optional) The directory to keep log files in "
- "(will be prepended to --logfile)")
+ help="(Optional) The directory to keep log files in "
+ "(will be prepended to --logfile)")
parser.add_option_group(group)
'/etc']
if 'plugin' in options:
- config_file_dirs = [
- os.path.join(x, 'quantum', 'plugins', options['plugin'])
- for x in config_file_dirs
- ]
+ config_file_dirs = [os.path.join(x, 'quantum',
+ 'plugins', options['plugin'])
+ for x in config_file_dirs]
if os.path.exists(os.path.join(root, 'plugins')):
plugins = [fix_path(os.path.join(root, 'plugins', p, 'etc'))
conf_file = find_config_file(options, args)
if not conf_file:
raise RuntimeError("Unable to locate any configuration file. "
- "Cannot load application %s" % app_name)
+ "Cannot load application %s" % app_name)
try:
conf = deploy.appconfig("config:%s" % conf_file, name=app_name)
return conf_file, conf
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
- from win32console import GetStdHandle, STD_OUT_HANDLE, \
- FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
- FOREGROUND_INTENSITY
+ from win32console import GetStdHandle, STD_OUT_HANDLE
+ from win32console import FOREGROUND_RED, FOREGROUND_BLUE
+ from win32console import FOREGROUND_GREEN, FOREGROUND_INTENSITY
red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
FOREGROUND_BLUE, FOREGROUND_INTENSITY)
self.stream = stream
class QuantumTestRunner(core.TextTestRunner):
def _makeResult(self):
return QuantumTestResult(self.stream,
- self.descriptions,
- self.verbosity,
- self.config)
+ self.descriptions,
+ self.verbosity,
+ self.config)
def run_tests(c=None):
return True
runner = QuantumTestRunner(stream=c.stream,
- verbosity=c.verbosity,
- config=c)
+ verbosity=c.verbosity,
+ config=c)
return not core.run(config=c, testRunner=runner)
# describes parameters used by different unit/functional tests
if addl_env:
env.update(addl_env)
obj = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ env=env)
result = None
if process_input is not None:
result = obj.communicate(process_input)
*only* deleted records are visible.
"""
if kwargs:
- LOG.warn(_('Arguments dropped when creating context: %s') %
- str(kwargs))
+ LOG.warn(_('Arguments dropped when creating '
+ 'context: %s') % str(kwargs))
self.user_id = user_id
self.tenant_id = tenant_id
'admin_state_up': network['admin_state_up'],
'status': network['status'],
'subnets': [subnet['id']
- for subnet in network['subnets']]}
+ for subnet in network['subnets']]}
return self._fields(res, fields)
self.op_status = op_status
def __repr__(self):
- return "<Network(%s,%s,%s,%s)>" % \
- (self.uuid, self.name, self.op_status, self.tenant_id)
+ return "<Network(%s,%s,%s,%s)>" % (self.uuid, self.name,
+ self.op_status, self.tenant_id)
port_id = sa.Column(sa.String(36), sa.ForeignKey('ports.id'))
address = sa.Column(sa.String(16), nullable=False, primary_key=True)
subnet_id = sa.Column(sa.String(36), sa.ForeignKey('subnets.id'),
- primary_key=True)
+ primary_key=True)
allocated = sa.Column(sa.Boolean(), nullable=False)
class Port(model_base.BASEV2, HasTenant):
"""Represents a port on a quantum v2 network"""
network_id = sa.Column(sa.String(36), sa.ForeignKey("networks.id"),
- nullable=False)
+ nullable=False)
fixed_ips = orm.relationship(IPAllocation, backref='ports')
mac_address = sa.Column(sa.String(32), nullable=False)
admin_state_up = sa.Column(sa.Boolean(), nullable=False)
"""Return a detailed info of a portprofile."""
if (portprofile_data['assignment'] is None):
return dict(portprofile=dict(
- id=portprofile_data['profile_id'],
- name=portprofile_data['profile_name'],
- qos_name=portprofile_data['qos_name'],
- ))
+ id=portprofile_data['profile_id'],
+ name=portprofile_data['profile_name'],
+ qos_name=portprofile_data['qos_name']))
else:
return dict(portprofile=dict(
- id=portprofile_data['profile_id'],
- name=portprofile_data['profile_name'],
- qos_name=portprofile_data['qos_name'],
- assignment=portprofile_data['assignment'],
- ))
+ id=portprofile_data['profile_id'],
+ name=portprofile_data['profile_name'],
+ qos_name=portprofile_data['qos_name'],
+ assignment=portprofile_data['assignment']))
{'param-name': 'credential_name', 'required': True},
{'param-name': 'user_name', 'required': True},
{'param-name': 'password', 'required': True},
- ]
+ ]
_serialization_metadata = {
"application/xml": {
# extended resources
for resource in self.ext_mgr.get_resources():
LOG.debug(_('Extended resource: %s'),
- resource.collection)
+ resource.collection)
for action, method in resource.collection_actions.iteritems():
path_prefix = ""
parent = resource.parent
LOG.warn(_('Loaded extension: %s'), alias)
if alias in self.extensions:
- raise exceptions.Error("Found duplicate extension: %s"
- % alias)
+ raise exceptions.Error("Found duplicate extension: %s" %
+ alias)
self.extensions[alias] = ext
"""Checks if plugin supports extension and implements the
extension contract."""
extension_is_valid = super(PluginAwareExtensionManager,
- self)._check_extension(extension)
+ self)._check_extension(extension)
return (extension_is_valid and
self._plugin_supports(extension) and
self._plugin_implements_interface(extension))
{'param-name': 'net_id_list', 'required': True},
{'param-name': 'status', 'required': True},
{'param-name': 'ports_desc', 'required': True},
- ]
+ ]
_serialization_metadata = {
"application/xml": {
_Novatenant_ops_param_list = [
{'param-name': 'novatenant_name', 'required': True},
- ]
+ ]
_schedule_host_ops_param_list = [
{'param-name': 'instance_id', 'required': True},
{'param-name': 'instance_desc', 'required': True},
- ]
+ ]
_serialization_metadata = {
"application/xml": {
self._plugin = plugin
self._portprofile_ops_param_list = [{
- 'param-name': 'portprofile_name',
- 'required': True}, {
- 'param-name': 'qos_name',
- 'required': True}, {
- 'param-name': 'assignment',
- 'required': False}]
+ 'param-name': 'portprofile_name',
+ 'required': True}, {
+ 'param-name': 'qos_name',
+ 'required': True}, {
+ 'param-name': 'assignment',
+ 'required': False}
+ ]
self._assignprofile_ops_param_list = [{
- 'param-name': 'network-id',
- 'required': True}, {
- 'param-name': 'port-id',
- 'required': True}]
+ 'param-name': 'network-id',
+ 'required': True}, {
+ 'param-name': 'port-id',
+ 'required': True}
+ ]
self._serialization_metadata = {
- "application/xml": {
- "attributes": {
- "portprofile": ["id", "name"],
+ "application/xml": {
+ "attributes": {
+ "portprofile": ["id", "name"],
+ },
},
- },
- }
+ }
def index(self, request, tenant_id):
""" Returns a list of portprofile ids """
def show(self, request, tenant_id, id):
""" Returns portprofile details for the given portprofile id """
try:
- portprofile = self._plugin.get_portprofile_details(
- tenant_id, id)
+ portprofile = self._plugin.get_portprofile_details(tenant_id, id)
builder = pprofiles_view.get_view_builder(request)
#build response with details
result = builder.build(portprofile, True)
req_params = req_body[self._resource_name]
except exc.HTTPError as exp:
return faults.Fault(exp)
- portprofile = self._plugin.\
- create_portprofile(tenant_id,
- req_params['portprofile_name'],
- req_params['qos_name'])
+ portprofile = \
+ self._plugin.create_portprofile(tenant_id,
+ req_params['portprofile_name'],
+ req_params['qos_name'])
builder = pprofiles_view.get_view_builder(request)
result = builder.build(portprofile)
return dict(portprofiles=result)
except exc.HTTPError as exp:
return faults.Fault(exp)
try:
- portprofile = self._plugin.\
- rename_portprofile(tenant_id,
- id, req_params['portprofile_name'])
+ portprofile = \
+ self._plugin.rename_portprofile(tenant_id, id,
+ req_params['portprofile_name'])
builder = pprofiles_view.get_view_builder(request)
result = builder.build(portprofile, True)
port_id = req_params['port-id'].strip()
try:
self._plugin.associate_portprofile(tenant_id,
- net_id, port_id,
- id)
+ net_id, port_id,
+ id)
return exc.HTTPOk()
except exception.PortProfileNotFound as exp:
return faults.Fault(faults.PortprofileNotFound(exp))
net_id = req_params['network-id'].strip()
port_id = req_params['port-id'].strip()
try:
- self._plugin. \
- disassociate_portprofile(tenant_id,
- net_id, port_id, id)
+ self._plugin.disassociate_portprofile(tenant_id,
+ net_id, port_id, id)
return exc.HTTPOk()
except exception.PortProfileNotFound as exp:
return faults.Fault(faults.PortprofileNotFound(exp))
"application/xml": {
"attributes": {
"stats": ["rx_bytes", "rx_packets", "rx_errors",
- "tx_bytes", "tx_packets", "tx_errors"],
- },
- },
+ "tx_bytes", "tx_packets", "tx_errors"]
+ }
}
+ }
def __init__(self, plugin):
self._resource_name = 'stats'
_qos_ops_param_list = [
{'param-name': 'qos_name', 'required': True},
{'param-name': 'qos_desc', 'required': True},
- ]
+ ]
_serialization_metadata = {
"application/xml": {
fix_path('~'),
os.path.join('/etc', project) if project else None,
'/etc'
- ]
+ ]
return filter(bool, cfg_dirs)
dest = self.dest
if group is not None:
dest = group.name + '_' + dest
- kwargs.update({
- 'dest': dest,
- 'metavar': self.metavar,
- 'help': self.help,
- })
+ kwargs.update({'dest': dest,
+ 'metavar': self.metavar,
+ 'help': self.help})
return kwargs
def _get_optparse_prefix(self, prefix, group):
self.__cache = {}
opts = [
- MultiStrOpt('config-file',
- default=self.default_config_files,
- metavar='PATH',
- help='Path to a config file to use. Multiple config '
- 'files can be specified, with values in later '
- 'files taking precedence. The default files '
- ' used are: %s' %
- (self.default_config_files, )),
+ MultiStrOpt('config-file',
+ default=self.default_config_files,
+ metavar='PATH',
+ help='Path to a config file to use. Multiple config '
+ 'files can be specified, with values in later '
+ 'files taking precedence. The default files '
+ ' used are: %s' %
+ (self.default_config_files)),
StrOpt('config-dir',
metavar='DIR',
help='Path to a config directory to pull *.conf '
'the file(s), if any, specified via --config-file, '
'hence over-ridden options in the directory take '
'precedence.'),
- ]
+ ]
self.register_cli_opts(opts)
def __clear_cache(f):
default, opt, override = [info[k] for k in sorted(info.keys())]
if opt.required:
- if (default is not None or
- override is not None):
+ if (default is not None or override is not None):
continue
if self._get(opt.name, group) is None:
short='v',
default=False,
help='Print more verbose output'),
- ]
+ ]
logging_cli_opts = [
StrOpt('log-config',
StrOpt('syslog-log-facility',
default='LOG_USER',
help='syslog facility to receive log lines')
- ]
+ ]
def __init__(self, **kwargs):
super(CommonConfigOpts, self).__init__(**kwargs)
return getattr(sys.modules[mod_str], class_str)
except (ImportError, ValueError, AttributeError), exc:
raise ImportError('Class %s cannot be found (%s)' %
- (class_str, str(exc)))
+ (class_str, str(exc)))
def import_object(import_str, *args, **kwargs):
'instance_desc': {
'user_id': user_id,
'project_id': project_id,
- },
},
- }
+ },
+ }
request_url = "/novatenants/" + project_id + "/schedule_host"
client = Client(HOST, PORT, USE_SSL, format='json', tenant=TENANT_ID,
"func": create_multiport,
"args": ["tenant-id",
"net-id-list (comma separated list of netword IDs)"],
- },
+ },
"list_extensions": {
"func": list_extensions,
"args": [],
- },
+ },
"schedule_host": {
"func": schedule_host,
"args": ["tenant-id", "instance-id"],
- },
- }
+ },
+}
def main():
class NoMoreNics(exceptions.QuantumException):
"""No more dynamic nics are available in the system"""
message = _("Unable to complete operation. No more dynamic nics are "
- "available in the system.")
+ "available in the system.")
class PortProfileLimit(exceptions.QuantumException):
class PortProfileBindingAlreadyExists(exceptions.QuantumException):
"""Binding cannot be created, since it already exists"""
message = _("PortProfileBinding for port profile %(pp_id)s to "
- "port %(port_id) already exists")
+ "port %(port_id) already exists")
class VlanIDNotFound(exceptions.QuantumException):
"""Error codes for API faults"""
_fault_names = {
- 400: "malformedRequest",
- 401: "unauthorized",
- 421: "PortprofileInUse",
- 450: "PortprofileNotFound",
- 451: "CredentialNotFound",
- 452: "QoSNotFound",
- 453: "NovatenantNotFound",
- 454: "MultiportNotFound",
- 470: "serviceUnavailable",
- 471: "pluginFault"}
+ 400: "malformedRequest",
+ 401: "unauthorized",
+ 421: "PortprofileInUse",
+ 450: "PortprofileNotFound",
+ 451: "CredentialNotFound",
+ 452: "QoSNotFound",
+ 453: "NovatenantNotFound",
+ 454: "MultiportNotFound",
+ 470: "serviceUnavailable",
+ 471: "pluginFault"
+ }
def __init__(self, exception):
"""Create a Fault for the given webob.exc.exception."""
return res
-def make_portprofile_dict(tenant_id, profile_id, profile_name,
- qos):
+def make_portprofile_dict(tenant_id, profile_id,
+ profile_name, qos):
"""Helper funciton"""
profile_associations = make_portprofile_assc_list(tenant_id,
profile_id)
def network_list(tenant_id):
session = get_session()
- return session.query(models.Network).\
- options(joinedload(models.Network.ports)). \
- filter_by(tenant_id=tenant_id).\
- all()
+ return (session.query(models.Network).
+ options(joinedload(models.Network.ports)).
+ filter_by(tenant_id=tenant_id).
+ all())
def network_id(net_name):
session = get_session()
try:
- return session.query(models.Network).\
- options(joinedload(models.Network.ports)). \
- filter_by(name=net_name).\
- all()
+ return (session.query(models.Network).
+ options(joinedload(models.Network.ports)).
+ filter_by(name=net_name).
+ all())
except exc.NoResultFound, e:
raise q_exc.NetworkNotFound(net_name=net_name)
def network_destroy(net_id):
session = get_session()
try:
- net = session.query(models.Network).\
- filter_by(uuid=net_id).\
- one()
+ net = (session.query(models.Network).
+ filter_by(uuid=net_id).
+ one())
session.delete(net)
session.flush()
return net
def validate_network_ownership(tenant_id, net_id):
session = get_session()
try:
- return session.query(models.Network).\
- filter_by(uuid=net_id).\
- filter_by(tenant_id=tenant_id).\
- one()
+ return (session.query(models.Network).
+ filter_by(uuid=net_id).
+ filter_by(tenant_id=tenant_id).
+ one())
except exc.NoResultFound, e:
raise q_exc.NetworkNotFound(net_id=net_id)
def port_list(net_id):
session = get_session()
- return session.query(models.Port).\
- options(joinedload(models.Port.network)). \
- filter_by(network_id=net_id).\
- all()
+ return (session.query(models.Port).
+ options(joinedload(models.Port.network)).
+ filter_by(network_id=net_id).
+ all())
def port_get(net_id, port_id):
network_get(net_id)
session = get_session()
try:
- return session.query(models.Port).\
- filter_by(uuid=port_id).\
- filter_by(network_id=net_id).\
- one()
+ return (session.query(models.Port).
+ filter_by(uuid=port_id).
+ filter_by(network_id=net_id).
+ one())
except exc.NoResultFound:
raise q_exc.PortNotFound(net_id=net_id, port_id=port_id)
# We are setting, not clearing, the attachment-id
if port['interface_id']:
raise q_exc.PortInUse(net_id=net_id, port_id=port_id,
- att_id=port['interface_id'])
+ att_id=port['interface_id'])
try:
- port = session.query(models.Port).\
- filter_by(interface_id=new_interface_id).\
- one()
+ port = (session.query(models.Port).
+ filter_by(interface_id=new_interface_id).
+ one())
raise q_exc.AlreadyAttached(net_id=net_id,
- port_id=port_id,
- att_id=new_interface_id,
- att_port_id=port['uuid'])
+ port_id=port_id,
+ att_id=new_interface_id,
+ att_port_id=port['uuid'])
except exc.NoResultFound:
# this is what should happen
pass
session = get_session()
try:
- port = session.query(models.Port).\
- filter_by(uuid=port_id).\
- filter_by(network_id=net_id).\
- one()
+ port = (session.query(models.Port).
+ filter_by(uuid=port_id).
+ filter_by(network_id=net_id).
+ one())
if port['interface_id']:
raise q_exc.PortInUse(net_id=net_id, port_id=port_id,
- att_id=port['interface_id'])
+ att_id=port['interface_id'])
session.delete(port)
session.flush()
return port
def port_get_by_id(port_id):
session = get_session()
try:
- return session.query(models.Port).\
- filter_by(uuid=port_id).one()
+ return (session.query(models.Port).
+ filter_by(uuid=port_id).one())
except exc.NoResultFound:
raise q_exc.PortNotFound(port_id=port_id)
filter_by(interface_id=new_interface_id).\
one()
raise q_exc.AlreadyAttached(port_id=port_id,
- att_id=new_interface_id,
- att_port_id=port['uuid'])
+ att_id=new_interface_id,
+ att_port_id=port['uuid'])
except exc.NoResultFound:
pass
port.interface_id = new_interface_id
def initialize():
'Establish database connection and load models'
options = {"sql_connection": "mysql://%s:%s@%s/%s" % (conf.DB_USER,
- conf.DB_PASS, conf.DB_HOST, conf.DB_NAME)}
+ conf.DB_PASS, conf.DB_HOST, conf.DB_NAME)}
db.configure_db(options)
LOG.debug("create_vlanids() called")
session = db.get_session()
try:
- vlanid = session.query(l2network_models.VlanID).\
- one()
+ vlanid = session.query(l2network_models.VlanID).one()
except exc.MultipleResultsFound:
pass
except exc.NoResultFound:
LOG.debug("get_all_vlanids() called")
session = db.get_session()
try:
- vlanids = session.query(l2network_models.VlanID).\
- all()
+ vlanids = session.query(l2network_models.VlanID).all()
return vlanids
except exc.NoResultFound:
return []
LOG.debug("is_vlanid_used() called")
session = db.get_session()
try:
- vlanid = session.query(l2network_models.VlanID).\
- filter_by(vlan_id=vlan_id).\
- one()
+ vlanid = (session.query(l2network_models.VlanID).
+ filter_by(vlan_id=vlan_id).one())
return vlanid["vlan_used"]
except exc.NoResultFound:
raise c_exc.VlanIDNotFound(vlan_id=vlan_id)
LOG.debug("release_vlanid() called")
session = db.get_session()
try:
- vlanid = session.query(l2network_models.VlanID).\
- filter_by(vlan_id=vlan_id).\
- one()
+ vlanid = (session.query(l2network_models.VlanID).
+ filter_by(vlan_id=vlan_id).one())
vlanid["vlan_used"] = False
session.merge(vlanid)
session.flush()
LOG.debug("delete_vlanid() called")
session = db.get_session()
try:
- vlanid = session.query(l2network_models.VlanID).\
- filter_by(vlan_id=vlan_id).\
- one()
+ vlanid = (session.query(l2network_models.VlanID).
+ filter_by(vlan_id=vlan_id).one())
session.delete(vlanid)
session.flush()
return vlanid
LOG.debug("reserve_vlanid() called")
session = db.get_session()
try:
- rvlan = session.query(l2network_models.VlanID).\
- filter_by(vlan_used=False).\
- first()
+ rvlan = (session.query(l2network_models.VlanID).
+ filter_by(vlan_used=False).first())
if not rvlan:
raise exc.NoResultFound
- rvlanid = session.query(l2network_models.VlanID).\
- filter_by(vlan_id=rvlan["vlan_id"]).\
- one()
+ rvlanid = (session.query(l2network_models.VlanID).
+ filter_by(vlan_id=rvlan["vlan_id"]).one())
rvlanid["vlan_used"] = True
session.merge(rvlanid)
session.flush()
LOG.debug("get_all_vlanids() called")
session = db.get_session()
try:
- vlanids = session.query(l2network_models.VlanID).\
- filter_by(vlan_used=True).\
- all()
+ vlanids = (session.query(l2network_models.VlanID).
+ filter_by(vlan_used=True).all())
return vlanids
except exc.NoResultFound:
return []
LOG.debug("get_all_vlan_bindings() called")
session = db.get_session()
try:
- bindings = session.query(l2network_models.VlanBinding).\
- all()
+ bindings = session.query(l2network_models.VlanBinding).all()
return bindings
except exc.NoResultFound:
return []
LOG.debug("get_vlan_binding() called")
session = db.get_session()
try:
- binding = session.query(l2network_models.VlanBinding).\
- filter_by(network_id=netid).\
- one()
+ binding = (session.query(l2network_models.VlanBinding).
+ filter_by(network_id=netid).one())
return binding
except exc.NoResultFound:
raise q_exc.NetworkNotFound(net_id=netid)
LOG.debug("add_vlan_binding() called")
session = db.get_session()
try:
- binding = session.query(l2network_models.VlanBinding).\
- filter_by(vlan_id=vlanid).\
- one()
+ binding = (session.query(l2network_models.VlanBinding).
+ filter_by(vlan_id=vlanid).one())
raise c_exc.NetworkVlanBindingAlreadyExists(vlan_id=vlanid,
network_id=netid)
except exc.NoResultFound:
LOG.debug("remove_vlan_binding() called")
session = db.get_session()
try:
- binding = session.query(l2network_models.VlanBinding).\
- filter_by(network_id=netid).\
- one()
+ binding = (session.query(l2network_models.VlanBinding).
+ filter_by(network_id=netid).one())
session.delete(binding)
session.flush()
return binding
LOG.debug("update_vlan_binding() called")
session = db.get_session()
try:
- binding = session.query(l2network_models.VlanBinding).\
- filter_by(network_id=netid).\
- one()
+ binding = (session.query(l2network_models.VlanBinding).
+ filter_by(network_id=netid).one())
if newvlanid:
binding["vlan_id"] = newvlanid
if newvlanname:
LOG.debug("get_all_portprofiles() called")
session = db.get_session()
try:
- pps = session.query(l2network_models.PortProfile).\
- all()
+ pps = session.query(l2network_models.PortProfile).all()
return pps
except exc.NoResultFound:
return []
LOG.debug("get_portprofile() called")
session = db.get_session()
try:
- pp = session.query(l2network_models.PortProfile).\
- filter_by(uuid=ppid).\
- one()
+ pp = (session.query(l2network_models.PortProfile).
+ filter_by(uuid=ppid).one())
return pp
except exc.NoResultFound:
raise c_exc.PortProfileNotFound(tenant_id=tenantid,
- portprofile_id=ppid)
+ portprofile_id=ppid)
def add_portprofile(tenantid, ppname, vlanid, qos):
LOG.debug("add_portprofile() called")
session = db.get_session()
try:
- pp = session.query(l2network_models.PortProfile).\
- filter_by(name=ppname).\
- one()
+ pp = (session.query(l2network_models.PortProfile).
+ filter_by(name=ppname).one())
raise c_exc.PortProfileAlreadyExists(tenant_id=tenantid,
- pp_name=ppname)
+ pp_name=ppname)
except exc.NoResultFound:
pp = l2network_models.PortProfile(ppname, vlanid, qos)
session.add(pp)
LOG.debug("remove_portprofile() called")
session = db.get_session()
try:
- pp = session.query(l2network_models.PortProfile).\
- filter_by(uuid=ppid).\
- one()
+ pp = (session.query(l2network_models.PortProfile).
+ filter_by(uuid=ppid).one())
session.delete(pp)
session.flush()
return pp
LOG.debug("update_portprofile() called")
session = db.get_session()
try:
- pp = session.query(l2network_models.PortProfile).\
- filter_by(uuid=ppid).\
- one()
+ pp = (session.query(l2network_models.PortProfile).
+ filter_by(uuid=ppid).one())
if newppname:
pp["name"] = newppname
if newvlanid:
return pp
except exc.NoResultFound:
raise c_exc.PortProfileNotFound(tenant_id=tenantid,
- portprofile_id=ppid)
+ portprofile_id=ppid)
def get_all_pp_bindings():
LOG.debug("get_all_pp_bindings() called")
session = db.get_session()
try:
- bindings = session.query(l2network_models.PortProfileBinding).\
- all()
+ bindings = session.query(l2network_models.PortProfileBinding).all()
return bindings
except exc.NoResultFound:
return []
LOG.debug("get_pp_binding() called")
session = db.get_session()
try:
- binding = session.query(l2network_models.PortProfileBinding).\
- filter_by(portprofile_id=ppid).\
- one()
+ binding = (session.query(l2network_models.PortProfileBinding).
+ filter_by(portprofile_id=ppid).one())
return binding
except exc.NoResultFound:
return []
LOG.debug("add_pp_binding() called")
session = db.get_session()
try:
- binding = session.query(l2network_models.PortProfileBinding).\
- filter_by(portprofile_id=ppid).\
- one()
+ binding = (session.query(l2network_models.PortProfileBinding).
+ filter_by(portprofile_id=ppid).one())
raise c_exc.PortProfileBindingAlreadyExists(pp_id=ppid,
port_id=portid)
except exc.NoResultFound:
LOG.debug("remove_pp_binding() called")
session = db.get_session()
try:
- binding = session.query(l2network_models.PortProfileBinding).\
- filter_by(portprofile_id=ppid).\
- filter_by(port_id=portid).\
- one()
+ binding = (session.query(l2network_models.PortProfileBinding).
+ filter_by(portprofile_id=ppid).filter_by(port_id=portid).
+ one())
session.delete(binding)
session.flush()
return binding
pass
-def update_pp_binding(tenantid, ppid, newtenantid=None, newportid=None,
- newdefault=None):
+def update_pp_binding(tenantid, ppid, newtenantid=None,
+ newportid=None, newdefault=None):
"""Updates port profile binding"""
LOG.debug("update_pp_binding() called")
session = db.get_session()
try:
- binding = session.query(l2network_models.PortProfileBinding).\
- filter_by(portprofile_id=ppid).\
- one()
+ binding = (session.query(l2network_models.PortProfileBinding).
+ filter_by(portprofile_id=ppid).one())
if newtenantid:
binding["tenant_id"] = newtenantid
if newportid:
return binding
except exc.NoResultFound:
raise c_exc.PortProfileNotFound(tenant_id=tenantid,
- portprofile_id=ppid)
+ portprofile_id=ppid)
def get_all_qoss(tenant_id):
LOG.debug("get_all_qoss() called")
session = db.get_session()
try:
- qoss = session.query(l2network_models.QoS).\
- filter_by(tenant_id=tenant_id).\
- all()
+ qoss = (session.query(l2network_models.QoS).
+ filter_by(tenant_id=tenant_id).all())
return qoss
except exc.NoResultFound:
return []
LOG.debug("get_qos() called")
session = db.get_session()
try:
- qos = session.query(l2network_models.QoS).\
- filter_by(tenant_id=tenant_id).\
- filter_by(qos_id=qos_id).\
- one()
+ qos = (session.query(l2network_models.QoS).
+ filter_by(tenant_id=tenant_id).
+ filter_by(qos_id=qos_id).one())
return qos
except exc.NoResultFound:
raise c_exc.QosNotFound(qos_id=qos_id,
LOG.debug("add_qos() called")
session = db.get_session()
try:
- qos = session.query(l2network_models.QoS).\
- filter_by(tenant_id=tenant_id).\
- filter_by(qos_name=qos_name).\
- one()
+ qos = (session.query(l2network_models.QoS).
+ filter_by(tenant_id=tenant_id).
+ filter_by(qos_name=qos_name).one())
raise c_exc.QosNameAlreadyExists(qos_name=qos_name,
- tenant_id=tenant_id)
+ tenant_id=tenant_id)
except exc.NoResultFound:
qos = l2network_models.QoS(tenant_id, qos_name, qos_desc)
session.add(qos)
"""Removes a qos to tenant association"""
session = db.get_session()
try:
- qos = session.query(l2network_models.QoS).\
- filter_by(tenant_id=tenant_id).\
- filter_by(qos_id=qos_id).\
- one()
+ qos = (session.query(l2network_models.QoS).
+ filter_by(tenant_id=tenant_id).
+ filter_by(qos_id=qos_id).one())
session.delete(qos)
session.flush()
return qos
"""Updates a qos to tenant association"""
session = db.get_session()
try:
- qos = session.query(l2network_models.QoS).\
- filter_by(tenant_id=tenant_id).\
- filter_by(qos_id=qos_id).\
- one()
+ qos = (session.query(l2network_models.QoS).
+ filter_by(tenant_id=tenant_id).
+ filter_by(qos_id=qos_id).one())
if new_qos_name:
qos["qos_name"] = new_qos_name
session.merge(qos)
"""Lists all the creds for a tenant"""
session = db.get_session()
try:
- creds = session.query(l2network_models.Credential).\
- filter_by(tenant_id=tenant_id).\
- all()
+ creds = (session.query(l2network_models.Credential).
+ filter_by(tenant_id=tenant_id).all())
return creds
except exc.NoResultFound:
return []
"""Lists the creds for given a cred_id and tenant_id"""
session = db.get_session()
try:
- cred = session.query(l2network_models.Credential).\
- filter_by(tenant_id=tenant_id).\
- filter_by(credential_id=credential_id).\
- one()
+ cred = (session.query(l2network_models.Credential).
+ filter_by(tenant_id=tenant_id).
+ filter_by(credential_id=credential_id).one())
return cred
except exc.NoResultFound:
raise c_exc.CredentialNotFound(credential_id=credential_id,
- tenant_id=tenant_id)
+ tenant_id=tenant_id)
def get_credential_name(tenant_id, credential_name):
"""Lists the creds for given a cred_name and tenant_id"""
session = db.get_session()
try:
- cred = session.query(l2network_models.Credential).\
- filter_by(tenant_id=tenant_id).\
- filter_by(credential_name=credential_name).\
- one()
+ cred = (session.query(l2network_models.Credential).
+ filter_by(tenant_id=tenant_id).
+ filter_by(credential_name=credential_name).one())
return cred
except exc.NoResultFound:
raise c_exc.CredentialNameNotFound(credential_name=credential_name,
- tenant_id=tenant_id)
+ tenant_id=tenant_id)
def add_credential(tenant_id, credential_name, user_name, password):
"""Adds a qos to tenant association"""
session = db.get_session()
try:
- cred = session.query(l2network_models.Credential).\
- filter_by(tenant_id=tenant_id).\
- filter_by(credential_name=credential_name).\
- one()
+ cred = (session.query(l2network_models.Credential).
+ filter_by(tenant_id=tenant_id).
+ filter_by(credential_name=credential_name).one())
raise c_exc.CredentialAlreadyExists(credential_name=credential_name,
tenant_id=tenant_id)
except exc.NoResultFound:
- cred = l2network_models.Credential(tenant_id,
- credential_name, user_name, password)
+ cred = l2network_models.Credential(tenant_id, credential_name,
+ user_name, password)
session.add(cred)
session.flush()
return cred
"""Removes a credential from a tenant"""
session = db.get_session()
try:
- cred = session.query(l2network_models.Credential).\
- filter_by(tenant_id=tenant_id).\
- filter_by(credential_id=credential_id).\
- one()
+ cred = (session.query(l2network_models.Credential).
+ filter_by(tenant_id=tenant_id).
+ filter_by(credential_id=credential_id).one())
session.delete(cred)
session.flush()
return cred
"""Updates a credential for a tenant"""
session = db.get_session()
try:
- cred = session.query(l2network_models.Credential).\
- filter_by(tenant_id=tenant_id).\
- filter_by(credential_id=credential_id).\
- one()
+ cred = (session.query(l2network_models.Credential).
+ filter_by(tenant_id=tenant_id).
+ filter_by(credential_id=credential_id).one())
if new_user_name:
cred["user_name"] = new_user_name
if new_password:
return cred
except exc.NoResultFound:
raise c_exc.CredentialNotFound(credential_id=credential_id,
- tenant_id=tenant_id)
+ tenant_id=tenant_id)
Includes attributes from joins."""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
- if not k[0] == '_'])
+ if not k[0] == '_'])
local.update(joined)
return local.iteritems()
self.vlan_used = False
def __repr__(self):
- return "<VlanID(%d,%s)>" % \
- (self.vlan_id, self.vlan_used)
+ return "<VlanID(%d,%s)>" % (self.vlan_id, self.vlan_used)
class VlanBinding(BASE, L2NetworkBase):
self.network_id = network_id
def __repr__(self):
- return "<VlanBinding(%d,%s,%s)>" % \
- (self.vlan_id, self.vlan_name, self.network_id)
+ return "<VlanBinding(%d,%s,%s)>" % (self.vlan_id,
+ self.vlan_name,
+ self.network_id)
class PortProfile(BASE, L2NetworkBase):
self.qos = qos
def __repr__(self):
- return "<PortProfile(%s,%s,%d,%s)>" % \
- (self.uuid, self.name, self.vlan_id, self.qos)
+ return "<PortProfile(%s,%s,%d,%s)>" % (self.uuid,
+ self.name,
+ self.vlan_id,
+ self.qos)
class PortProfileBinding(BASE, L2NetworkBase):
id = Column(Integer, primary_key=True, autoincrement=True)
tenant_id = Column(String(255))
- port_id = Column(String(255), ForeignKey("ports.uuid"),
- nullable=False)
+ port_id = Column(String(255), ForeignKey("ports.uuid"), nullable=False)
portprofile_id = Column(String(255), ForeignKey("portprofiles.uuid"),
nullable=False)
default = Column(Boolean)
self.default = default
def __repr__(self):
- return "<PortProfile Binding(%s,%s,%s,%s)>" % \
- (self.tenant_id, self.port_id, self.portprofile_id, self.default)
+ return "<PortProfile Binding(%s,%s,%s,%s)>" % (self.tenant_id,
+ self.port_id,
+ self.portprofile_id,
+ self.default)
class QoS(BASE, L2NetworkBase):
self.qos_desc = qos_desc
def __repr__(self):
- return "<QoS(%s,%s,%s,%s)>" % \
- (self.qos_id, self.tenant_id, self.qos_name, self.qos_desc)
+ return "<QoS(%s,%s,%s,%s)>" % (self.qos_id, self.tenant_id,
+ self.qos_name, self.qos_desc)
class Credential(BASE, L2NetworkBase):
self.password = password
def __repr__(self):
- return "<Credentials(%s,%s,%s,%s,%s)>" % \
- (self.credential_id, self.tenant_id, self.credential_name,
- self.user_name, self.password)
+ return "<Credentials(%s,%s,%s,%s,%s)>" % (self.credential_id,
+ self.tenant_id,
+ self.credential_name,
+ self.user_name,
+ self.password)
Includes attributes from joins."""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
- if not k[0] == '_'])
+ if not k[0] == '_'])
local.update(joined)
return local.iteritems()
def __repr__(self):
return "<Port(%s,%s,%s,%s)>" % (self.uuid, self.network_id,
- self.state, self.interface_id)
+ self.state, self.interface_id)
class Network(BASE, QuantumBase):
self.name = name
def __repr__(self):
- return "<Network(%s,%s,%s)>" % \
- (self.uuid, self.name, self.tenant_id)
+ return "<Network(%s,%s,%s)>" % (self.uuid, self.name, self.tenant_id)
LOG.debug("get_all_nexusport_bindings() called")
session = db.get_session()
try:
- bindings = session.query(nexus_models.NexusPortBinding).\
- all()
+ bindings = session.query(nexus_models.NexusPortBinding).all()
return bindings
except exc.NoResultFound:
return []
LOG.debug("get_nexusport_binding() called")
session = db.get_session()
try:
- binding = session.query(nexus_models.NexusPortBinding).\
- filter_by(vlan_id=vlan_id).\
- all()
+ binding = (session.query(nexus_models.NexusPortBinding).
+ filter_by(vlan_id=vlan_id).all())
return binding
except exc.NoResultFound:
raise c_exc.NexusPortBindingNotFound(vlan_id=vlan_id)
LOG.debug("remove_nexusport_binding() called")
session = db.get_session()
try:
- binding = session.query(nexus_models.NexusPortBinding).\
- filter_by(vlan_id=vlan_id).\
- all()
+ binding = (session.query(nexus_models.NexusPortBinding).
+ filter_by(vlan_id=vlan_id).all())
for bind in binding:
session.delete(bind)
session.flush()
LOG.debug("update_nexusport_binding called")
session = db.get_session()
try:
- binding = session.query(nexus_models.NexusPortBinding).\
- filter_by(port_id=port_id).\
- one()
+ binding = (session.query(nexus_models.NexusPortBinding).
+ filter_by(port_id=port_id).one())
if new_vlan_id:
binding["vlan_id"] = new_vlan_id
session.merge(binding)
self.vlan_id = vlan_id
def __repr__(self):
- return "<NexusPortBinding (%s,%d)>" % \
- (self.port_id, self.vlan_id)
+ return "<NexusPortBinding (%s,%d)>" % (self.port_id, self.vlan_id)
LOG.debug("get_all_services_bindings() called")
session = db.get_session()
try:
- bindings = session.query(services_models.ServicesBinding).\
- all()
+ bindings = session.query(services_models.ServicesBinding).all()
return bindings
except exc.NoResultFound:
return []
LOG.debug("get_service_bindings() called")
session = db.get_session()
try:
- bindings = session.query(services_models.ServicesBinding).\
- filter_by(service_id=service_id).\
- one()
+ bindings = (session.query(services_models.ServicesBinding).
+ filter_by(service_id=service_id).one())
return bindings
except exc.NoResultFound:
return []
LOG.debug("remove_services_binding() called")
session = db.get_session()
try:
- binding = session.query(services_models.ServicesBinding).\
- filter_by(service_id=service_id).\
- all()
+ binding = (session.query(services_models.ServicesBinding).
+ filter_by(service_id=service_id).all())
for bind in binding:
session.delete(bind)
session.flush()
self.sbnet_id = sbnet_id
def __repr__(self):
- return "<ServicesBinding (%s,%d)>" % \
- (self.service_id, self.mngnet_id, self.nbnet_id, self.sbnet_id)
+ return "<ServicesBinding (%s,%d)>" % (self.service_id, self.mngnet_id,
+ self.nbnet_id, self.sbnet_id)
LOG.debug("db get_all_portbindings() called")
session = db.get_session()
try:
- port_bindings = session.query(ucs_models.PortBinding).\
- all()
+ port_bindings = session.query(ucs_models.PortBinding).all()
return port_bindings
except exc.NoResultFound:
return []
LOG.debug("get_portbinding() called")
session = db.get_session()
try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
+ port_binding = (session.query(ucs_models.PortBinding).
+ filter_by(port_id=port_id).one())
return port_binding
except exc.NoResultFound:
raise c_exc.PortVnicNotFound(port_id=port_id)
def add_portbinding(port_id, blade_intf_dn, portprofile_name,
- vlan_name, vlan_id, qos):
+ vlan_name, vlan_id, qos):
"""Adds a port binding"""
LOG.debug("add_portbinding() called")
session = db.get_session()
try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
+ port_binding = (session.query(ucs_models.PortBinding).
+ filter_by(port_id=port_id).one())
raise c_exc.PortVnicBindingAlreadyExists(port_id=port_id)
except exc.NoResultFound:
port_binding = ucs_models.PortBinding(port_id, blade_intf_dn,
LOG.debug("db remove_portbinding() called")
session = db.get_session()
try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
+ port_binding = (session.query(ucs_models.PortBinding).
+ filter_by(port_id=port_id).one())
session.delete(port_binding)
session.flush()
return port_binding
LOG.debug("db update_portbinding() called")
session = db.get_session()
try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
+ port_binding = (session.query(ucs_models.PortBinding).
+ filter_by(port_id=port_id).one())
if blade_intf_dn:
port_binding.blade_intf_dn = blade_intf_dn
if portprofile_name:
LOG.debug("db update_portbinding_instance_id() called")
session = db.get_session()
try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
+ port_binding = (session.query(ucs_models.PortBinding).
+ filter_by(port_id=port_id).one())
port_binding.instance_id = instance_id
session.merge(port_binding)
session.flush()
LOG.debug("db update_portbinding_vif_id() called")
session = db.get_session()
try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(port_id=port_id).\
- one()
+ port_binding = (session.query(ucs_models.PortBinding).
+ filter_by(port_id=port_id).one())
port_binding.vif_id = vif_id
session.merge(port_binding)
session.flush()
LOG.debug("get_portbinding_dn() called")
session = db.get_session()
try:
- port_binding = session.query(ucs_models.PortBinding).\
- filter_by(blade_intf_dn=blade_intf_dn).\
- one()
+ port_binding = (session.query(ucs_models.PortBinding).
+ filter_by(blade_intf_dn=blade_intf_dn).one())
return port_binding
except exc.NoResultFound:
return []
self.qos = qos
def __repr__(self):
- return "<PortProfile Binding(%s,%s,%s,%s,%s,%s)>" % \
- (self.port_id, self.blade_intf_dn, self.portprofile_name,
- self.vlan_name, self.vlan_id, self.qos)
+ return "<PortProfile Binding(%s,%s,%s,%s,%s,%s)>" % (
+ self.port_id, self.blade_intf_dn, self.portprofile_name,
+ self.vlan_name, self.vlan_id, self.qos)
portprofile = cdb.get_portprofile(tenant_id, portprofile_id)
except Exception:
raise cexc.PortProfileNotFound(tenant_id=tenant_id,
- portprofile_id=portprofile_id)
+ portprofile_id=portprofile_id)
cdb.remove_pp_binding(tenant_id, port_id, portprofile_id)
if args and isinstance(args[-1], dict):
kwargs.update(args.pop())
- return getattr(self._plugins[plugin_key], function_name)(*args,
- **kwargs)
+ return getattr(self._plugins[plugin_key],
+ function_name)(*args, **kwargs)
def get_all_networks(self, args):
"""Not implemented for this model"""
cfg.StrOpt('quantum_default_tenant_id',
default="default",
help='Default tenant id when creating quantum networks'),
- ]
+]
FLAGS = flags.FLAGS
cfg.StrOpt('quantum_default_tenant_id',
default="default",
help='Default tenant id when creating quantum networks'),
- ]
+]
FLAGS = flags.FLAGS
FLAGS.register_opts(quantum_opts)
from quantum.plugins.cisco.db import l2network_db as cdb
from quantum.plugins.cisco.l2network_segmentation_base import (
L2NetworkSegmentationMgrBase,
- )
+)
LOG = logging.getLogger(__name__)
"func": insert_inpath_service,
"args": ["tenant_id", "service_image_id", "management_net_name",
"northbound_net_name", "southbound_net_name"],
- },
+ },
"delete_service": {
"func": delete_service,
"args": ["tenant_id", "service_instance_id"],
- },
+ },
"connect_vm": {
"func": connect_vm,
"args": ["tenant_id", "vm_image_id", "service_instance_id"],
- },
+ },
"disconnect_vm": {
"func": disconnect_vm,
"args": ["vm_instance_id"],
- },
- }
+ },
+}
if __name__ == "__main__":
novatenant,
portprofile,
qos,
- )
+)
from quantum.extensions.extensions import (
ExtensionMiddleware,
PluginAwareExtensionManager,
- )
+)
from quantum.manager import QuantumManager
from quantum.openstack.common import jsonutils
from quantum.plugins.cisco.db import api as db
'portprofile': {
'portprofile_name': 'cisco_test_portprofile',
'qos_name': 'test-qos1',
- },
- }
+ },
+ }
self.tenant_id = "test_tenant"
self.network_name = "test_network"
options = {}
req_body1 = jsonutils.dumps(self.test_port_profile)
create_response1 = self.test_app.post(
self.profile_path, req_body1,
- content_type=self.contenttype)
+ content_type=self.contenttype
+ )
req_body2 = jsonutils.dumps({
'portprofile': {
'portprofile_name': 'cisco_test_portprofile2',
'qos_name': 'test-qos2',
- },
- })
+ },
+ })
create_response2 = self.test_app.post(
self.profile_path, req_body2,
content_type=self.contenttype)
'portprofile': {
'portprofile_name': 'cisco_rename_portprofile',
'qos_name': 'test-qos1',
- },
- }
+ },
+ }
rename_req_body = jsonutils.dumps(rename_port_profile)
rename_path_temp = (self.portprofile_path +
resp_body['portprofiles']['portprofile']['id'])
'portprofile': {
'portprofile_name': 'cisco_rename_portprofile',
'qos_name': 'test-qos1',
- },
- }
+ },
+ }
rename_req_body = jsonutils.dumps(rename_port_profile)
update_path_temp = self.portprofile_path + portprofile_id
update_path = str(update_path_temp)
'portprofile': {
'network-id': net_id,
'port-id': port_id,
- },
- }
+ },
+ }
req_assign_body = jsonutils.dumps(test_port_assign_data)
associate_path_temp = (
self.portprofile_path +
'portprofile': {
'network-id': '001',
'port-id': '1',
- },
- }
+ },
+ }
req_assign_body = jsonutils.dumps(test_port_assign_data)
associate_path = (self.portprofile_path +
portprofile_id +
'portprofile': {
'network-id': net_id,
'port-id': port_id,
- },
- }
+ },
+ }
req_assign_body = jsonutils.dumps(test_port_assign_data)
associate_path_temp = (self.portprofile_path +
resp_body['portprofiles']['portprofile']['id'] +
'project_id': 'demo',
'user_id': 'root',
'vif_id': '23432423',
- },
},
- }
+ },
+ }
self.test_associate_data = {
'novatenant': {
'instance_id': 1,
'instance_desc': {
'project_id': 'demo',
'user_id': 'root',
- },
},
- }
+ },
+ }
self._l2network_plugin = l2network_plugin.L2Network()
def test_schedule_host(self):
'qos_desc': {
'PPS': 50,
'TTL': 5,
- },
},
- }
+ },
+ }
self._l2network_plugin = l2network_plugin.L2Network()
def test_create_qos(self):
'qos_desc': {
'PPS': 50,
'TTL': 5,
- },
},
- })
+ },
+ })
create_resp2 = self.test_app.post(self.qos_path, req_body2,
content_type=self.contenttype)
index_response = self.test_app.get(self.qos_path)
'qos_desc': {
'PPS': 50,
'TTL': 5,
- },
},
- })
+ },
+ })
rename_path_temp = (self.qos_second_path +
resp_body['qoss']['qos']['id'])
rename_path = str(rename_path_temp)
self.assertEqual(200, rename_response.status_int)
rename_resp_dict = wsgi.Serializer().deserialize(rename_response.body,
self.contenttype)
- self.assertEqual(
- rename_resp_dict['qoss']['qos']['name'],
- 'cisco_rename_qos')
+ self.assertEqual(rename_resp_dict['qoss']['qos']['name'],
+ 'cisco_rename_qos')
self.tearDownQos(rename_path)
LOG.debug("test_update_qos - END")
'qos_desc': {
'PPS': 50,
'TTL': 5,
- },
},
- })
+ },
+ })
rename_path_temp = self.qos_second_path + qos_id
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
'qos_desc': {
'PPS': 50,
'TTL': 5,
- },
},
- })
+ },
+ })
index_response = self.test_app.post(self.qos_path, req_body,
content_type=self.contenttype)
resp_body = wsgi.Serializer().deserialize(index_response.body,
parent_resource = dict(member_name="tenant",
collection_name="extensions/csco/tenants")
- controller = credential.CredentialController(
- QuantumManager.get_plugin())
+ controller = credential.CredentialController(QuantumManager.
+ get_plugin())
res_ext = extensions.ResourceExtension('credentials', controller,
parent=parent_resource)
- self.test_app = setup_extensions_test_app(
- SimpleExtensionManager(res_ext))
+ self.test_app = setup_extensions_test_app(SimpleExtensionManager(
+ res_ext))
self.contenttype = 'application/json'
self.credential_path = '/extensions/csco/tenants/tt/credentials'
self.cred_second_path = '/extensions/csco/tenants/tt/credentials/'
'credential_name': 'cred8',
'user_name': 'newUser2',
'password': 'newPasswd1',
- },
- }
+ },
+ }
self._l2network_plugin = l2network_plugin.L2Network()
def test_list_credentials(self):
'credential_name': 'cred9',
'user_name': 'newUser2',
'password': 'newPasswd2',
- },
- })
+ },
+ })
create_response2 = self.test_app.post(
self.credential_path, req_body2,
content_type=self.contenttype)
'credential_name': 'cred3',
'user_name': 'RenamedUser',
'password': 'Renamedpassword',
- },
- })
+ },
+ })
rename_path_temp = (self.cred_second_path +
resp_body['credentials']['credential']['id'])
rename_path = str(rename_path_temp)
'credential_name': 'cred3',
'user_name': 'RenamedUser',
'password': 'Renamedpassword',
- },
- })
+ },
+ })
rename_path_temp = self.cred_second_path + credential_id
rename_path = str(rename_path_temp)
rename_response = self.test_app.put(rename_path, rename_req_body,
'net_id_list': '1',
'status': 'test-qos1',
'ports_desc': 'Port Descr',
- },
- }
+ },
+ }
self.tenant_id = "test_tenant"
self.network_name = "test_network"
options = {}
'status': 'ACTIVE',
'ports_desc': {
'key': 'value',
- },
},
- }
+ },
+ }
req_body = jsonutils.dumps(test_multi_port)
index_response = self.test_app.post(self.multiport_path, req_body,
content_type=self.contenttype)
return bindings
except Exception, exc:
raise Exception("Failed to delete nexus port binding: %s"
- % str(exc))
+ % str(exc))
def update_nexusport_binding(self, port_id, new_vlan_id):
"""update nexus port binding"""
LOG.debug("Deleted service binding: %s" % res.service_id)
except Exception, exc:
raise Exception("Failed to delete service binding: %s"
- % str(exc))
+ % str(exc))
class L2networkDB(object):
"""Update a vlan binding"""
try:
res = l2network_db.update_vlan_binding(network_id, vlan_id,
- vlan_name)
+ vlan_name)
LOG.debug("Updating vlan binding for vlan: %s" % res.vlan_id)
vlan_dict = {}
vlan_dict["vlan-id"] = str(res.vlan_id)
net1 = self.dbtest.create_network(self.tenant_id, "plugin_test1")
self.assertTrue(net1["net-name"] == "plugin_test1")
net = self.dbtest.update_network(self.tenant_id, net1["net-id"],
- name="plugin_test1_renamed")
+ name="plugin_test1_renamed")
self.assertTrue(net["net-name"] == "plugin_test1_renamed")
self.teardown_network_port()
tenant_id, net_id)
LOG.debug("test_delete_network_not_found - END")
- def test_delete_networkInUse(
- self, tenant_id='test_tenant', instance_tenant_id='nova',
- nova_user_id='novaadmin', instance_id=10,
- vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):
+ def test_delete_networkInUse(self, tenant_id='test_tenant',
+ instance_tenant_id='nova',
+ nova_user_id='novaadmin', instance_id=10,
+ vif_id='fe701ddf-26a2-'
+ '42ea-b9e6-7313d1c522cc'):
"""
Tests deletion of a Virtual Network when Network is in Use.
"""
LOG.debug("test_plug_interface - START")
- new_net_dict = self._l2network_plugin.create_network(
- tenant_id, self.network_name)
- port_dict = self._l2network_plugin.create_port(
- tenant_id, new_net_dict[const.NET_ID], self.state)
+ new_net_dict = self._l2network_plugin.create_network(tenant_id,
+ self.network_name)
+ port_dict = self._l2network_plugin.create_port(tenant_id,
+ new_net_dict[const.
+ NET_ID], self.state)
instance_desc = {'project_id': tenant_id,
'user_id': nova_user_id}
host_list = self._l2network_plugin.schedule_host(instance_tenant_id,
self.tearDownNetwork(tenant_id, new_net_dict[const.NET_ID])
LOG.debug("test_plug_interface_portDNE - END")
- def test_plug_interface_portInUse(
- self, tenant_id='test_tenant', instance_tenant_id='nova',
- nova_user_id='novaadmin', instance_id=10,
- vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc',
- remote_interface='new_interface'):
+ def test_plug_interface_portInUse(self, tenant_id='test_tenant',
+ instance_tenant_id='nova',
+ nova_user_id='novaadmin',
+ instance_id=10,
+ vif_id='fe701ddf-26a2-42ea-'
+ 'b9e6-7313d1c522cc',
+ remote_interface='new_interface'):
"""
Tests attachment of new interface to the port when there is an
existing attachment
LOG.debug("test_plug_interface_portInUse - END")
- def test_unplug_interface(
- self, tenant_id='test_tenant', instance_tenant_id='nova',
- nova_user_id='novaadmin', instance_id=10,
- vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):
+ def test_unplug_interface(self, tenant_id='test_tenant',
+ instance_tenant_id='nova',
+ nova_user_id='novaadmin', instance_id=10,
+ vif_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):
"""
Tests detaachment of an interface to a port
"""
port_profile_dict = self._l2network_plugin.create_portprofile(
tenant_id, self.profile_name, self.qos)
port_profile_id = port_profile_dict['profile_id']
- new_net_dict = self._l2network_plugin.create_network(
- tenant_id, 'test_network')
+ new_net_dict = self._l2network_plugin.create_network(tenant_id,
+ 'test_network')
port_dict = self._l2network_plugin.create_port(
tenant_id, new_net_dict[const.NET_ID],
const.PORT_UP)
LOG.debug("test_disassociate_portprofile - END")
def test_disassociate_portprofileDNE(self, tenant_id='test_tenant',
- net_id='0005', port_id='p00005', profile_id='pr0005'):
+ net_id='0005', port_id='p00005',
+ profile_id='pr0005'):
"""
Tests disassociation of a port-profile when network does not exist
"""
self.tearDownNetwork(tenant_id, network_dict_id)
def tearDownNetworkPortInterface(self, tenant_id, instance_tenant_id,
- instance_id, instance_desc, network_dict_id,
- port_id):
+ instance_id, instance_desc,
+ network_dict_id, port_id):
"""
Tear down Network Port Interface
"""
const.PROFILE_ASSOCIATIONS: profile_associations,
const.PROFILE_VLAN_ID: None,
const.PROFILE_QOS: qos,
- }
+ }
return res
def _make_portprofile_assc_list(self, tenant_id, profile_id):
self.net_id,
vlan_name(self.net_id),
vlan_id,
- ])
+ ])
cdb.add_vlan_binding(vlan_id, vlan_name(self.net_id), self.net_id)
for network in networks:
tenant_id,
self.net_id,
{'name': new_net_name},
- ])
+ ])
for network in networks:
self.assertEqual(network[const.NET_ID], self.net_id)
LOG = logging.getLogger('quantum.tests.test_ucs_driver')
-CREATE_VLAN_OUTPUT = ("<configConfMos cookie=\"cookie_placeholder\" "
-"inHierarchical=\"true\"> <inConfigs><pair key=\"fabric/lan/net-New Vlan\"> "
-"<fabricVlan defaultNet=\"no\" dn=\"fabric/lan/net-New Vlan\" id=\"200\" "
-"name=\"New Vlan\" status=\"created\"></fabricVlan> </pair> </inConfigs> "
-"</configConfMos>")
-
-CREATE_PROFILE_OUTPUT = ("<configConfMos cookie=\"cookie_placeholder\" "
-"inHierarchical=\"true\"> <inConfigs><pair key=\"fabric/lan/profiles/vnic-"
-"New Profile\"> <vnicProfile descr=\"Profile created by Cisco OpenStack "
-"Quantum Plugin\" dn=\"fabric/lan/profiles/vnic-New Profile\" maxPorts="
-"\"64\" name=\"New Profile\" nwCtrlPolicyName=\"\" pinToGroupName=\"\" "
-"qosPolicyName=\"\" status=\"created\"> <vnicEtherIf defaultNet=\"yes\" "
-"name=\"New Vlan\" rn=\"if-New Vlan\" > </vnicEtherIf> </vnicProfile> "
-"</pair> </inConfigs> </configConfMos>")
-
-CHANGE_VLAN_OUTPUT = ("<configConfMos cookie=\"cookie_placeholder\" "
-"inHierarchical=\"true\"> <inConfigs><pair key=\""
-"fabric/lan/profiles/vnic-New Profile\"> <vnicProfile descr=\"Profile "
-"created by Cisco OpenStack Quantum Plugin\" "
-"dn=\"fabric/lan/profiles/vnic-New Profile\" maxPorts=\"64\" "
-"name=\"New Profile\" nwCtrlPolicyName=\"\" pinToGroupName=\"\" "
-"qosPolicyName=\"\" status=\"created,modified\"><vnicEtherIf "
-"rn=\"if-Old Vlan\" status=\"deleted\"> </vnicEtherIf> "
-"<vnicEtherIf defaultNet=\"yes\" name=\"New Vlan\" rn=\"if-New Vlan\" > "
-"</vnicEtherIf> </vnicProfile> </pair></inConfigs> </configConfMos>")
-
-DELETE_VLAN_OUTPUT = ("<configConfMos cookie=\"cookie_placeholder\" "
-"inHierarchical=\"true\"> <inConfigs><pair key=\"fabric/lan/net-New Vlan\"> "
-"<fabricVlan dn=\"fabric/lan/net-New Vlan\" status=\"deleted\"> "
-"</fabricVlan> </pair> </inConfigs></configConfMos>")
-
-DELETE_PROFILE_OUTPUT = ("<configConfMos cookie=\"cookie_placeholder\" "
-"inHierarchical=\"false\"> <inConfigs><pair key=\""
-"fabric/lan/profiles/vnic-New Profile\"> <vnicProfile "
-"dn=\"fabric/lan/profiles/vnic-New Profile\" status=\"deleted\"> "
-"</vnicProfile></pair> </inConfigs> </configConfMos>")
-
-ASSOCIATE_PROFILE_OUTPUT = ("<configConfMos cookie=\"cookie_placeholder\" "
-"inHierarchical=\"true\"> <inConfigs> <pair key="
-"\"fabric/lan/profiles/vnic-New Profile/cl-New Profile Client\">"
-" <vmVnicProfCl dcName=\".*\" descr=\"\" dn=\"fabric/lan/profiles/vnic-"
-"New Profile/cl-New Profile Client\"name=\"New Profile Client\" "
-"orgPath=\".*\" status=\"created\" swName=\"default$\"> </vmVnicProfCl>"
-"</pair> </inConfigs> </configConfMos>")
+CREATE_VLAN_OUTPUT = ('<configConfMos cookie="cookie_placeholder" '
+ 'inHierarchical="true"> <inConfigs><pair '
+ 'key="fabric/lan/net-New Vlan"> '
+ '<fabricVlan defaultNet="no" '
+ 'dn="fabric/lan/net-New Vlan" id="200" '
+ 'name="New Vlan" status="created"></fabricVlan> '
+ '</pair> </inConfigs> </configConfMos>')
+
+CREATE_PROFILE_OUTPUT = ('<configConfMos cookie="cookie_placeholder" '
+ 'inHierarchical="true"> <inConfigs><pair '
+ 'key="fabric/lan/profiles/vnic-'
+ 'New Profile"> <vnicProfile descr="Profile created '
+ 'by Cisco OpenStack Quantum Plugin" '
+ 'dn="fabric/lan/profiles/vnic-New Profile" maxPorts='
+ '"64" name="New Profile" nwCtrlPolicyName="" '
+ 'pinToGroupName="" qosPolicyName="" status="created">'
+ ' <vnicEtherIf defaultNet="yes" name="New Vlan" '
+ 'rn="if-New Vlan" > </vnicEtherIf> </vnicProfile> '
+ '</pair> </inConfigs> </configConfMos>')
+
+CHANGE_VLAN_OUTPUT = ('<configConfMos cookie="cookie_placeholder" '
+ 'inHierarchical="true"> <inConfigs><pair key="'
+ 'fabric/lan/profiles/vnic-New Profile"> '
+ '<vnicProfile descr="Profile '
+ 'created by Cisco OpenStack Quantum Plugin" '
+ 'dn="fabric/lan/profiles/vnic-New Profile" maxPorts="64"'
+ ' name="New Profile" nwCtrlPolicyName="" '
+ 'pinToGroupName="" qosPolicyName="" '
+ 'status="created,modified"><vnicEtherIf '
+ 'rn="if-Old Vlan" status="deleted"> </vnicEtherIf> '
+ '<vnicEtherIf defaultNet="yes" name="New Vlan" '
+ 'rn="if-New Vlan" > </vnicEtherIf> </vnicProfile> '
+ '</pair></inConfigs> </configConfMos>')
+
+DELETE_VLAN_OUTPUT = ('<configConfMos cookie="cookie_placeholder" '
+ 'inHierarchical="true"> <inConfigs><pair '
+ 'key="fabric/lan/net-New Vlan"> '
+ '<fabricVlan dn="fabric/lan/net-New Vlan" '
+ 'status="deleted"> </fabricVlan> '
+ '</pair> </inConfigs></configConfMos>')
+
+DELETE_PROFILE_OUTPUT = ('<configConfMos cookie="cookie_placeholder" '
+ 'inHierarchical="false"> <inConfigs><pair key="'
+ 'fabric/lan/profiles/vnic-New Profile"> <vnicProfile '
+ 'dn="fabric/lan/profiles/vnic-New Profile" '
+ 'status="deleted"> </vnicProfile></pair> '
+ '</inConfigs> </configConfMos>')
+
+ASSOCIATE_PROFILE_OUTPUT = ('<configConfMos cookie="cookie_placeholder" '
+ 'inHierarchical="true"> <inConfigs> <pair key='
+ '"fabric/lan/profiles/vnic-New Profile/cl-New '
+ 'Profile Client"> <vmVnicProfCl dcName=".*" '
+ 'descr="" dn="fabric/lan/profiles/vnic-'
+ 'New Profile/cl-New Profile Client"name="New '
+ 'Profile Client" orgPath=".*" status="created" '
+ 'swName="default$"> </vmVnicProfCl>'
+ '</pair> </inConfigs> </configConfMos>')
class TestUCSDriver(unittest.TestCase):
self.assertEqual(vlan_details, expected_output)
LOG.debug("test_create_vlan - END")
- def test_create_profile_post_data(
- self, expected_output=CREATE_PROFILE_OUTPUT):
+ def test_create_profile_post_data(self,
+ expected_output=CREATE_PROFILE_OUTPUT):
"""
Tests creation of profile post Data
"""
self.assertEqual(profile_details, expected_output)
LOG.debug("test_create_profile_post - END")
- def test_change_vlan_profile_data(
- self, expected_output=CHANGE_VLAN_OUTPUT):
+ def test_change_vlan_profile_data(self,
+ expected_output=CHANGE_VLAN_OUTPUT):
"""
Tests creation of change vlan in profile post Data
"""
self.assertEqual(profile_delete_details, expected_output)
LOG.debug("test_create_profile_post - END")
- def test_create_profile_client_data(
- self, expected_output=ASSOCIATE_PROFILE_OUTPUT):
+ def test_create_profile_client_data(self,
+ expected_output=
+ ASSOCIATE_PROFILE_OUTPUT):
"""
Tests creation of profile client post Data
"""
from quantum.plugins.cisco import l2network_plugin_configuration as conf
from quantum.plugins.cisco.segmentation.l2network_vlan_mgr import (
L2NetworkVLANMgr,
- )
+)
logging.basicConfig(level=logging.WARN)
from quantum.common import exceptions as exc
from quantum.plugins.cisco.l2device_inventory_base import (
L2NetworkDeviceInventoryBase,
- )
+)
from quantum.plugins.cisco.common import cisco_constants as const
from quantum.plugins.cisco.common import cisco_credentials as cred
from quantum.plugins.cisco.common import cisco_exceptions as cexc
from quantum.plugins.cisco.db import ucs_db as udb
from quantum.plugins.cisco.ucs import (
cisco_ucs_inventory_configuration as conf,
- )
+)
from quantum.plugins.cisco.ucs import cisco_ucs_network_driver
# need to change it, and also load the state from the DB for
# other associations
intf_data = blade_intf_data[blade_intf]
- if (intf_data[const.BLADE_INTF_RESERVATION] ==
- const.BLADE_INTF_UNRESERVED):
+ if ((intf_data[const.BLADE_INTF_RESERVATION] == const.
+ BLADE_INTF_UNRESERVED)):
unreserved_counter -= 1
intf_data[const.BLADE_INTF_RESERVATION] = (
const.BLADE_INTF_RESERVED)
dynamicnic_details = {
const.DEVICENAME: device_name,
const.UCSPROFILE: profile_name,
- }
+ }
LOG.debug(("Found reserved dynamic nic: %s"
"associated with port %s") %
(intf_data, port_id))
blade_data = ucsm[chassis_id][blade_id]
blade_intf_data = blade_data[const.BLADE_INTF_DATA]
for blade_intf in blade_intf_data.keys():
- if (not blade_intf_data[blade_intf][const.PORTID] or
- not blade_intf_data[blade_intf][const.TENANTID]):
+ if ((not blade_intf_data[blade_intf][const.PORTID] or
+ not blade_intf_data[blade_intf][const.TENANTID])):
continue
intf_data = blade_intf_data[blade_intf]
if (intf_data[const.BLADE_INTF_RESERVATION] ==
for chassis_id in ucsm.keys():
for blade_id in ucsm[chassis_id]:
blade_data = ucsm[chassis_id][blade_id]
- if (blade_data[const.BLADE_UNRESERVED_INTF_COUNT] >
- unreserved_interface_count):
+ if ((blade_data[const.BLADE_UNRESERVED_INTF_COUNT] >
+ unreserved_interface_count)):
unreserved_interface_count = (
blade_data[const.BLADE_UNRESERVED_INTF_COUNT])
least_reserved_blade_ucsm = ucsm_ip
const.LEAST_RSVD_BLADE_CHASSIS: least_reserved_blade_chassis,
const.LEAST_RSVD_BLADE_ID: least_reserved_blade_id,
const.LEAST_RSVD_BLADE_DATA: least_reserved_blade_data,
- }
+ }
LOG.debug("Found dynamic nic %s available for reservation",
least_reserved_blade_dict)
return least_reserved_blade_dict
const.RESERVED_NIC_HOSTNAME: host_name,
const.RESERVED_NIC_NAME: dev_eth_name,
const.BLADE_INTF_DN: blade_intf,
- }
+ }
port_binding = udb.add_portbinding(port_id, blade_intf, None,
None, None, None)
udb.update_portbinding(port_id,
const.DEVICE_IP: [ucsm_ip],
const.UCS_INVENTORY: self,
const.LEAST_RSVD_BLADE_DICT: least_reserved_blade_dict,
- }
+ }
return device_params
def delete_port(self, args):
rsvd_info = self._get_rsvd_blade_intf_by_port(tenant_id, port_id)
if not rsvd_info:
LOG.warn("UCSInventory: Port not found: net_id: %s, port_id: %s" %
- (net_id, port_id))
+ (net_id, port_id))
return {const.DEVICE_IP: []}
device_params = {
const.DEVICE_IP: [rsvd_info[const.UCSM_IP]],
const.CHASSIS_ID: rsvd_info[const.CHASSIS_ID],
const.BLADE_ID: rsvd_info[const.BLADE_ID],
const.BLADE_INTF_DN: rsvd_info[const.BLADE_INTF_DN],
- }
+ }
return device_params
def update_port(self, args):
const.UCS_INVENTORY: self,
const.LEAST_RSVD_BLADE_DICT:
least_reserved_blade_dict,
- }
+ }
return device_params
METHOD = "POST"
URL = "/nuova"
-CREATE_VLAN = "<configConfMos cookie=\"" + COOKIE_VALUE + \
-"\" inHierarchical=\"true\"> <inConfigs>" \
-"<pair key=\"fabric/lan/net-" + VLAN_NAME + \
-"\"> <fabricVlan defaultNet=\"no\" " \
-"dn=\"fabric/lan/net-" + VLAN_NAME + \
-"\" id=\"" + VLAN_ID + "\" name=\"" + \
-VLAN_NAME + "\" status=\"created\">" \
-"</fabricVlan> </pair> </inConfigs> </configConfMos>"
-
-CREATE_PROFILE = "<configConfMos cookie=\"" + COOKIE_VALUE + \
-"\" inHierarchical=\"true\"> <inConfigs>" \
-"<pair key=\"fabric/lan/profiles/vnic-" + PROFILE_NAME + \
-"\"> <vnicProfile descr=\"Profile created by " \
-"Cisco OpenStack Quantum Plugin\" " \
-"dn=\"fabric/lan/profiles/vnic-" + PROFILE_NAME + \
-"\" maxPorts=\"64\" name=\"" + PROFILE_NAME + \
-"\" nwCtrlPolicyName=\"\" pinToGroupName=\"\" " \
-"qosPolicyName=\"\" status=\"created\"> " \
-"<vnicEtherIf defaultNet=\"yes\" name=\"" + VLAN_NAME + \
-"\" rn=\"if-" + VLAN_NAME + "\" > </vnicEtherIf> " \
-"</vnicProfile> </pair> </inConfigs> </configConfMos>"
-
-ASSOCIATE_PROFILE = "<configConfMos cookie=\"" + COOKIE_VALUE + \
-"\" inHierarchical=\"true\"> <inConfigs> <pair " \
-"key=\"fabric/lan/profiles/vnic-" + PROFILE_NAME + \
-"/cl-" + PROFILE_CLIENT + "\"> <vmVnicProfCl dcName=\".*\" " \
-"descr=\"\" dn=\"fabric/lan/profiles/vnic-" + \
-PROFILE_NAME + "/cl-" + PROFILE_CLIENT + \
-"\"name=\"" + PROFILE_CLIENT + "\" orgPath=\".*\" " \
-"status=\"created\" swName=\"default$\"> </vmVnicProfCl>" \
-"</pair> </inConfigs> </configConfMos>"
-
-CHANGE_VLAN_IN_PROFILE = "<configConfMos cookie=\"" + COOKIE_VALUE + \
-"\" inHierarchical=\"true\"> <inConfigs>" \
-"<pair key=\"fabric/lan/profiles/vnic-" + \
-PROFILE_NAME + "\"> <vnicProfile descr=\"Profile " \
-"created by Cisco OpenStack Quantum Plugin\" " \
-"dn=\"fabric/lan/profiles/vnic-" + \
-PROFILE_NAME + "\" maxPorts=\"64\" name=\"" + \
-PROFILE_NAME + "\" nwCtrlPolicyName=\"\" " \
-"pinToGroupName=\"\" qosPolicyName=\"\" " \
-"status=\"created,modified\">" \
-"<vnicEtherIf rn=\"if-" + OLD_VLAN_NAME + \
-"\" status=\"deleted\"> </vnicEtherIf> <vnicEtherIf " \
-"defaultNet=\"yes\" name=\"" + \
-VLAN_NAME + "\" rn=\"if-" + VLAN_NAME + \
-"\" > </vnicEtherIf> </vnicProfile> </pair>" \
-"</inConfigs> </configConfMos>"
-
-DELETE_VLAN = "<configConfMos cookie=\"" + COOKIE_VALUE + \
-"\" inHierarchical=\"true\"> <inConfigs>" \
-"<pair key=\"fabric/lan/net-" + VLAN_NAME + \
-"\"> <fabricVlan dn=\"fabric/lan/net-" + VLAN_NAME + \
-"\" status=\"deleted\"> </fabricVlan> </pair> </inConfigs>" \
-"</configConfMos>"
-
-DELETE_PROFILE = "<configConfMos cookie=\"" + COOKIE_VALUE + \
-"\" inHierarchical=\"false\"> <inConfigs>" \
-"<pair key=\"fabric/lan/profiles/vnic-" + PROFILE_NAME + \
-"\"> <vnicProfile dn=\"fabric/lan/profiles/vnic-" + \
-PROFILE_NAME + "\" status=\"deleted\"> </vnicProfile>" \
-"</pair> </inConfigs> </configConfMos>"
-
-GET_BLADE_INTERFACE_STATE = "<configScope cookie=\"" + COOKIE_VALUE + \
- "\" dn=\"" + BLADE_DN_VALUE + "\" inClass=\"dcxVIf\" " + \
- "inHierarchical=\"false\" inRecursive=\"false\"> " + \
- "<inFilter> </inFilter> </configScope>"
-
-GET_BLADE_INTERFACE = "<configResolveClass cookie=\"" + COOKIE_VALUE + \
- "\" classId=\"vnicEther\"" + \
- " inHierarchical=\"false\">" + \
- " <inFilter> <eq class=\"vnicEther\" property=\"equipmentDn\"" + \
- " value=\"sys/chassis-" + CHASSIS_VALUE + "/blade-" + \
- BLADE_VALUE + "/adaptor-1/host-eth-?\"/> " + \
- "</inFilter> </configResolveClass>"
+CREATE_VLAN = ('<configConfMos cookie="' + COOKIE_VALUE +
+ '" inHierarchical="true"> <inConfigs>'
+ '<pair key="fabric/lan/net-"' + VLAN_NAME +
+ '"> <fabricVlan defaultNet="no" '
+ 'dn="fabric/lan/net-' + VLAN_NAME +
+ '" id="' + VLAN_ID + '" name="' +
+ VLAN_NAME + '" status="created">'
+ '</fabricVlan> </pair> </inConfigs> </configConfMos>')
+
+CREATE_PROFILE = ('<configConfMos cookie="' + COOKIE_VALUE +
+ '" inHierarchical="true"> <inConfigs>'
+ '<pair key="fabric/lan/profiles/vnic-' + PROFILE_NAME +
+ '"> <vnicProfile descr="Profile created by '
+ 'Cisco OpenStack Quantum Plugin" '
+ 'dn="fabric/lan/profiles/vnic' + PROFILE_NAME +
+ '" maxPorts="64" name="' + PROFILE_NAME +
+ '" nwCtrlPolicyName="" pinToGroupName="" '
+ 'qosPolicyName="" status="created"> '
+ '<vnicEtherIf defaultNet="yes" name="' + VLAN_NAME +
+ '" rn="if' + VLAN_NAME + '" > </vnicEtherIf> '
+ '</vnicProfile> </pair> </inConfigs> </configConfMos>')
+
+ASSOCIATE_PROFILE = ('<configConfMos cookie="' + COOKIE_VALUE +
+ '" inHierarchical="true"> <inConfigs> <pair '
+ 'key="fabric/lan/profiles/vnic' + PROFILE_NAME +
+ '/cl' + PROFILE_CLIENT + '"> <vmVnicProfCl dcName=".*" '
+ 'descr="" dn="fabric/lan/profiles/vnic' +
+ PROFILE_NAME + '/cl' + PROFILE_CLIENT +
+ '"name="' + PROFILE_CLIENT + '" orgPath=".*" '
+ 'status="created" swName="default$"> </vmVnicProfCl>'
+ '</pair> </inConfigs> </configConfMos>')
+
+CHANGE_VLAN_IN_PROFILE = ('<configConfMos cookie="' + COOKIE_VALUE +
+ '" inHierarchical="true"> <inConfigs'
+ '<pair key="fabric/lan/profiles/vnic' +
+ PROFILE_NAME + '"> <vnicProfile descr="Profile'
+ 'created by Cisco OpenStack Quantum Plugin"'
+ 'dn="fabric/lan/profiles/vnic' +
+ PROFILE_NAME + '" maxPorts="64" name="' +
+ PROFILE_NAME + '" nwCtrlPolicyName=""'
+ 'pinToGroupName="" qosPolicyName=""'
+ 'status="created,modified"'
+ '<vnicEtherIf rn="if' + OLD_VLAN_NAME +
+ '" status="deleted"> </vnicEtherIf> <vnicEtherIf'
+ 'defaultNet="yes" name="' +
+ VLAN_NAME + '" rn="if' + VLAN_NAME +
+ '" > </vnicEtherIf> </vnicProfile> </pair'
+ '</inConfigs> </configConfMos>')
+
+DELETE_VLAN = ('<configConfMos cookie="' + COOKIE_VALUE +
+ '" inHierarchical="true"> <inConfigs'
+ '<pair key="fabric/lan/net' + VLAN_NAME +
+ '"> <fabricVlan dn="fabric/lan/net' + VLAN_NAME +
+ '" status="deleted"> </fabricVlan> </pair> </inConfigs'
+ '</configConfMos')
+
+DELETE_PROFILE = ('<configConfMos cookie="' + COOKIE_VALUE +
+ '" inHierarchical="false"> <inConfigs'
+ '<pair key="fabric/lan/profiles/vnic' + PROFILE_NAME +
+ '"> <vnicProfile dn="fabric/lan/profiles/vnic' +
+ PROFILE_NAME + '" status="deleted"> </vnicProfile'
+ '</pair> </inConfigs> </configConfMos')
+
+GET_BLADE_INTERFACE_STATE = ('<configScope cookie="' + COOKIE_VALUE +
+ '" dn="' + BLADE_DN_VALUE + '" inClass="dcxVIf"' +
+ 'inHierarchical="false" inRecursive="false">' +
+ '<inFilter> </inFilter> </configScope')
+
+GET_BLADE_INTERFACE = ('<configResolveClass cookie="' + COOKIE_VALUE +
+ '" classId="vnicEther"' +
+ ' inHierarchical="false"' +
+ ' <inFilter> <eq class="vnicEther" ' +
+ 'property="equipmentDn"' +
+ ' value="sys/chassis' + CHASSIS_VALUE + '/blade' +
+ BLADE_VALUE + '/adaptor-1/host-eth-?"/>' +
+ '</inFilter> </configResolveClass')
# TODO (Sumit): Assumes "adaptor-1", check if this has to be discovered too
-GET_BLADE_INTERFACES = "<configResolveChildren cookie=\"" + \
- COOKIE_VALUE + "\" inDn=\"sys/chassis-" + \
- CHASSIS_VALUE + "/blade-" + BLADE_VALUE + \
- "/adaptor-1\"" + \
- " inHierarchical=\"false\"> <inFilter> </inFilter>" + \
- " </configResolveChildren>"
+GET_BLADE_INTERFACES = ('<configResolveChildren cookie="' +
+ COOKIE_VALUE + '" inDn="sys/chassis' +
+ CHASSIS_VALUE + '/blade' + BLADE_VALUE +
+ '/adaptor-1"' +
+ ' inHierarchical="false"> <inFilter> </inFilter' +
+ ' </configResolveChildren')
class CiscoUCSMDriver():
response = self._post_data(ucsm_ip, ucsm_username, ucsm_password, data)
elements = (
et.XML(response).find("outConfigs").findall("adaptorHostEthIf")
- )
+ )
blade_interfaces = {}
for element in elements:
dist_name = element.get("dn", default=None)
const.BLADE_INTF_INST_TYPE: None,
const.BLADE_INTF_RHEL_DEVICE_NAME:
self._get_rhel_device_name(order),
- }
+ }
blade_interfaces[dist_name] = blade_interface
return blade_interfaces
def _get_blade_interface_state(self, blade_intf, ucsm_ip,
- ucsm_username, ucsm_password):
+ ucsm_username, ucsm_password):
"""Create command"""
- data = (
- self._get_blade_intf_st_post_data(blade_intf[const.BLADE_INTF_DN])
- )
+ data = (self._get_blade_intf_st_post_data(
+ blade_intf[const.BLADE_INTF_DN]))
response = self._post_data(ucsm_ip, ucsm_username, ucsm_password, data)
elements = et.XML(response).find("outConfigs").findall("dcxVIf")
for element in elements:
self._get_blade_interface_state(blade_interfaces[blade_intf],
ucsm_ip, ucsm_username,
ucsm_password)
- if blade_interfaces[blade_intf][const.BLADE_INTF_INST_TYPE] != \
- const.BLADE_INTF_DYNAMIC:
+ if ((blade_interfaces[blade_intf][const.BLADE_INTF_INST_TYPE] !=
+ const.BLADE_INTF_DYNAMIC)):
blade_interfaces.pop(blade_intf)
return blade_interfaces
return udb.update_portbinding(port_id, vlan_name=new_vlan_name,
vlan_id=conf.DEFAULT_VLAN_ID)
- def create_multiport(self, tenant_id, net_id_list, ports_num, port_id_list,
- **kwargs):
+ def create_multiport(self, tenant_id, net_id_list, ports_num,
+ port_id_list, **kwargs):
"""
Creates a port on the specified Virtual Network.
"""
const.NET_NAME: net_name,
const.NET_PORTS: [],
const.NET_OP_STATUS: new_network[const.OPSTATUS],
- }
+ }
return new_net_dict
def delete_network(self, tenant_id, net_id):
LOG.debug("Creating subinterface %s for VLAN %s on interface %s" %
(interface, vlan_id, self.physical_interface))
if utils.execute(['ip', 'link', 'add', 'link',
- self.physical_interface,
- 'name', interface, 'type', 'vlan', 'id',
- vlan_id], root_wrapper=self.root_helper):
+ self.physical_interface,
+ 'name', interface, 'type', 'vlan', 'id',
+ vlan_id], root_wrapper=self.root_helper):
return
if utils.execute(['ip', 'link', 'set',
- interface, 'up'], root_wrapper=self.root_helper):
+ interface, 'up'], root_wrapper=self.root_helper):
return
LOG.debug("Done creating subinterface %s" % interface)
return interface
root_wrapper=self.root_helper):
return
if utils.execute(['brctl', 'setfd', bridge_name,
- str(0)], root_wrapper=self.root_helper):
+ str(0)], root_wrapper=self.root_helper):
return
if utils.execute(['brctl', 'stp', bridge_name,
- 'off'], root_wrapper=self.root_helper):
+ 'off'], root_wrapper=self.root_helper):
return
if utils.execute(['ip', 'link', 'set', bridge_name,
- 'up'], root_wrapper=self.root_helper):
+ 'up'], root_wrapper=self.root_helper):
return
LOG.debug("Done starting bridge %s for subinterface %s" %
(bridge_name, interface))
bridge_name))
if current_bridge_name:
if utils.execute(['brctl', 'delif', current_bridge_name,
- tap_device_name], root_wrapper=self.root_helper):
+ tap_device_name], root_wrapper=self.root_helper):
return False
self.ensure_vlan_bridge(network_id, vlan_id)
usagestr = "%prog [OPTIONS] <config file>"
parser = OptionParser(usage=usagestr)
parser.add_option("-v", "--verbose", dest="verbose",
- action="store_true", default=False, help="turn on verbose logging")
+ action="store_true", default=False,
+ help="turn on verbose logging")
options, args = parser.parse_args()
const.NET_ID: net_id,
const.NET_NAME: net_name,
const.NET_OP_STATUS: op_status,
- }
+ }
if ports:
res[const.NET_PORTS] = ports
return res
const.PORT_OP_STATUS: op_status,
const.NET_ID: port[const.NETWORKID],
const.ATTACHMENT: port[const.INTERFACEID],
- }
+ }
LOG = logging.getLogger(__name__)
CONF_FILE = find_config_file({'plugin': 'linuxbridge'}, None,
- "linuxbridge_conf.ini")
+ "linuxbridge_conf.ini")
CONF = config.parse(CONF_FILE)
start = CONF.VLANS.vlan_start
end = CONF.VLANS.vlan_end
try:
- vlanid = session.query(l2network_models.VlanID).\
- one()
+ vlanid = session.query(l2network_models.VlanID).one()
except exc.MultipleResultsFound:
"""
TODO (Sumit): Salvatore rightly points out that this will not handle
from quantum.plugins.linuxbridge import LinuxBridgePlugin
from quantum.plugins.linuxbridge.agent import (
linuxbridge_quantum_agent as linux_agent,
- )
+)
from quantum.plugins.linuxbridge.common import constants as lconst
from quantum.plugins.linuxbridge.db import l2network_db as cdb
class LinuxBridgeAgentTest(unittest.TestCase):
- def test_add_gateway_interface(
- self, tenant_id="test_tenant", network_name="test_network",
- interface_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc',
- mac_address='fe:16:3e:51:60:dd'):
+ def test_add_gateway_interface(self, tenant_id="test_tenant",
+ network_name="test_network",
+ interface_id='fe701ddf-26a2-42ea-'
+ 'b9e6-7313d1c522cc',
+ mac_address='fe:16:3e:51:60:dd'):
LOG.debug("test_tap_gateway_interface - START")
new_network = (
LOG.debug("test_add_gateway_interface - END")
- def test_add_tap_interface(
- self, tenant_id="test_tenant", network_name="test_network",
- interface_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc',
- mac_address='fe:16:3e:51:60:dd'):
+ def test_add_tap_interface(self, tenant_id="test_tenant",
+ network_name="test_network",
+ interface_id='fe701ddf-26a2-42ea-'
+ 'b9e6-7313d1c522cc',
+ mac_address='fe:16:3e:51:60:dd'):
LOG.debug("test_add_tap_interface - START")
new_network = (
LOG.debug("test_add_tap_interface -END")
- def test_remove_interface(
- self, tenant_id="test_tenant", network_name="test_network",
- interface_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc',
- mac_address='fe:16:3e:51:60:dd'):
+ def test_remove_interface(self, tenant_id="test_tenant",
+ network_name="test_network",
+ interface_id='fe701ddf-26a2-42ea-'
+ 'b9e6-7313d1c522cc',
+ mac_address='fe:16:3e:51:60:dd'):
LOG.debug("test_remove_interface - START")
new_network = (
LOG.debug("test_remove_interface -END")
- def test_ensure_vlan_bridge(
- self, tenant_id="test_tenant",
- network_name="test_network",
- interface_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):
+ def test_ensure_vlan_bridge(self, tenant_id="test_tenant",
+ network_name="test_network",
+ interface_id='fe701ddf-26a2-42ea-'
+ 'b9e6-7313d1c522cc'):
LOG.debug("test_ensure_vlan_bridge - START")
new_network = (
self._linuxbridge_quantum_agent.linux_br.ensure_vlan_bridge(
new_network[lconst.NET_ID], str(vlan_id))
list_quantum_bridges = (self._linuxbridge_quantum_agent.linux_br.
- get_all_quantum_bridges())
+ get_all_quantum_bridges())
self.assertTrue(bridge_name in list_quantum_bridges)
list_interface = (self._linuxbridge_quantum_agent.linux_br.
get_interfaces_on_bridge(bridge_name))
LOG.debug("test_ensure_vlan_bridge -END")
- def test_delete_vlan_bridge(
- self, tenant_id="test_tenant", network_name="test_network",
- interface_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):
+ def test_delete_vlan_bridge(self, tenant_id="test_tenant",
+ network_name="test_network",
+ interface_id='fe701ddf-26a2-42ea-'
+ 'b9e6-7313d1c522cc'):
LOG.debug("test_delete_vlan_bridge - START")
new_network = (
LOG.debug("test_delete_vlan_bridge - END")
- def test_process_deleted_networks(
- self, tenant_id="test_tenant", network_name="test_network",
- interface_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc'):
+ def test_process_deleted_networks(self, tenant_id="test_tenant",
+ network_name="test_network",
+ interface_id='fe701ddf-26a2-42ea-'
+ 'b9e6-7313d1c522cc'):
LOG.debug("test_delete_vlan_bridge - START")
new_network = (
self.assertEquals(self.device_exists(bridge_name), False)
LOG.debug("test_delete_vlan_bridge - END")
- def test_process_unplugged_tap_interface(
- self, tenant_id="test_tenant", network_name="test_network",
- interface_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc',
- mac_address='fe:16:3e:51:60:dd'):
+ def test_process_unplugged_tap_interface(self, tenant_id="test_tenant",
+ network_name="test_network",
+ interface_id='fe701ddf-26a2-'
+ '42ea-b9e6-'
+ '7313d1c522cc',
+ mac_address='fe:16:3e:51:60:dd'):
LOG.debug("test_process_unplugged_tap_interface - START")
new_network = (
self._linuxbridge_quantum_agent.process_port_binding(
new_port[lconst.PORT_ID], new_network[lconst.NET_ID],
interface_id, str(vlan_id))
- list_interface = self._linuxbridge_quantum_agent.linux_br.\
- get_interfaces_on_bridge(bridge_name)
+ list_interface = (self._linuxbridge_quantum_agent.linux_br.
+ get_interfaces_on_bridge(bridge_name))
self._linuxbridge_plugin.unplug_interface(tenant_id,
new_network[lconst.NET_ID],
new_port[lconst.PORT_ID])
self._linuxbridge_quantum_agent.process_unplugged_interfaces(
plugged_interface)
list_interface = (self._linuxbridge_quantum_agent.linux_br.
- get_interfaces_on_bridge(bridge_name))
+ get_interfaces_on_bridge(bridge_name))
self.assertFalse(device_name in list_interface)
for interface in list_interface:
self._linuxbridge_quantum_agent.linux_br.remove_interface(
LOG.debug("test_test_process_unplugged_tap_interface -END")
- def test_process_unplugged_interface_empty(
- self, tenant_id="test_tenant", network_name="test_network"):
+ def test_process_unplugged_interface_empty(self, tenant_id="test_tenant",
+ network_name="test_network"):
""" test to unplug not plugged port. It should not raise exception
"""
LOG.debug("test_process_unplugged_interface_empty - START")
LOG.debug("test_process_unplugged_interface_empty -END")
- def test_process_unplugged_gw_interface(
- self, tenant_id="test_tenant", network_name="test_network",
- interface_id='fe701ddf-26a2-42ea-b9e6-7313d1c522cc',
- mac_address='fe:16:3e:51:60:dd'):
+ def test_process_unplugged_gw_interface(self, tenant_id="test_tenant",
+ network_name="test_network",
+ interface_id='fe701ddf-26a2-42ea-'
+ 'b9e6-7313d1c522cc',
+ mac_address='fe:16:3e:51:60:dd'):
LOG.debug("test_process_unplugged_gw_interface - START")
new_network = (
self._linuxbridge_quantum_agent.process_unplugged_interfaces(
plugged_interface)
list_interface = (self._linuxbridge_quantum_agent.linux_br.
- get_interfaces_on_bridge(bridge_name))
+ get_interfaces_on_bridge(bridge_name))
self.assertFalse(device_name in list_interface)
for interface in list_interface:
self._linuxbridge_quantum_agent.linux_br.remove_interface(
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.client_eventlet \
- import NvpApiClientEventlet
+ import NvpApiClientEventlet
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.request_eventlet \
- import NvpGenericRequestEventlet
+ import NvpGenericRequestEventlet
LOG = logging.getLogger("NVPApiHelper")
307: zero,
400: zero,
500: zero,
- }
+ }
class NvpApiException(Exception):
from quantum.common import exceptions as exception
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.client_eventlet \
- import (
- DEFAULT_CONCURRENT_CONNECTIONS,
- DEFAULT_FAILOVER_TIME,
+ import (
+ DEFAULT_CONCURRENT_CONNECTIONS,
+ DEFAULT_FAILOVER_TIME,
)
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.request_eventlet \
- import (
- DEFAULT_REQUEST_TIMEOUT,
- DEFAULT_HTTP_TIMEOUT,
- DEFAULT_RETRIES,
- DEFAULT_REDIRECTS,
+ import (
+ DEFAULT_REQUEST_TIMEOUT,
+ DEFAULT_HTTP_TIMEOUT,
+ DEFAULT_RETRIES,
+ DEFAULT_REDIRECTS,
)
plugin_config = {
'failover_time': failover_time,
'concurrent_connections': concurrent_connections,
- }
+ }
LOG.info('parse_config(): plugin_config == "%s"' % plugin_config)
cluster = NVPCluster('cluster1')
"net-ifaces": remote_vifs,
"net-name": result["display_name"],
"net-op-status": "UP",
- }
+ }
LOG.debug("get_network_details() completed for tenant %s: %s" %
(tenant_id, d))
return d
'net-id': netw_id,
'net-name': result["display_name"],
'net-op-status': "UP",
- }
+ }
def get_all_ports(self, tenant_id, netw_id, **kwargs):
"""
if not nvplib.check_tenant(self.controller, netw_id, tenant_id):
raise exception.NetworkNotFound(net_id=netw_id)
result = nvplib.create_port(tenant_id, netw_id, port_init_state,
- **params)
+ **params)
d = {
"port-id": result["uuid"],
"port-op-status": result["port-op-status"],
- }
+ }
LOG.debug("create_port() completed for tenant %s: %s" % (tenant_id, d))
return d
'port-id': portw_id,
'port-state': result["admin_status_enabled"],
'port-op-status': result["port-op-status"],
- }
+ }
LOG.debug("returning updated port %s: " % port)
return port
if not nvplib.check_tenant(self.controller, netw_id, tenant_id):
raise exception.NetworkNotFound(net_id=netw_id)
port = nvplib.get_port(self.controller, netw_id, portw_id,
- "LogicalPortAttachment")
+ "LogicalPortAttachment")
state = "ACTIVE" if port["admin_status_enabled"] else "DOWN"
op_status = nvplib.get_port_status(self.controller, netw_id, portw_id)
"port-id": portw_id, "attachment": vif_uuid,
"net-id": netw_id, "port-state": state,
"port-op-status": op_status,
- }
+ }
LOG.debug("Port details for tenant %s: %s" % (tenant_id, d))
return d
"""
if not nvplib.check_tenant(self.controller, netw_id, tenant_id):
raise exception.NetworkNotFound(net_id=netw_id)
- result = nvplib.plug_interface(self.controller, netw_id, portw_id,
- "VifAttachment", attachment=remote_interface_id)
+ result = nvplib.plug_interface(self.controller, netw_id,
+ portw_id, "VifAttachment",
+ attachment=remote_interface_id)
LOG.debug("plug_interface() completed for %s: %s" %
(tenant_id, result))
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
_conn_str,
- )
+)
import quantum.plugins.nicira.nicira_nvp_plugin.api_client.client as client
import quantum.plugins.nicira.nicira_nvp_plugin.api_client.request_eventlet
from quantum.openstack.common import jsonutils
from quantum.plugins.nicira.nicira_nvp_plugin.api_client.common import (
_conn_str,
- )
+)
import quantum.plugins.nicira.nicira_nvp_plugin.api_client.request as request
import quantum.plugins.nicira.nicira_nvp_plugin.api_client.client_eventlet
response.body = response.read()
response.headers = response.getheaders()
LOG.info("Request '%s' complete: %s (%0.2f seconds)"
- % (self._request_str(conn, url), response.status,
- time.time() - issued_time))
+ % (self._request_str(conn, url), response.status,
+ time.time() - issued_time))
if response.status not in [httplib.MOVED_PERMANENTLY,
httplib.TEMPORARY_REDIRECT]:
break
api_providers = [(result.hostname, result.port, use_https)]
client_eventlet = (
quantum.plugins.nicira.nicira_nvp_plugin.api_client.client_eventlet
- )
+ )
api_client = client_eventlet.NvpApiClientEventlet(
api_providers, self._api_client.user, self._api_client.password,
use_https=use_https)
LOG.debug('req: %s' % type(req))
if isinstance(req, httplib.HTTPResponse):
- if (req.status == httplib.UNAUTHORIZED
- or req.status == httplib.FORBIDDEN):
+ if ((req.status == httplib.UNAUTHORIZED
+ or req.status == httplib.FORBIDDEN)):
self._api_client.need_login = True
if attempt <= self._retries:
continue
from quantum.plugins.nicira.nicira_nvp_plugin import nvplib
from quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin import (
NvpPlugin as QuantumManager,
- )
+)
logging.basicConfig(level=logging.INFO)
"""Help for CLI"""
print "\nNVP Plugin Commands:"
for key in COMMANDS.keys():
- print " %s %s" % (key,
- " ".join(["<%s>" % y for y in COMMANDS[key]["args"]]))
+ print " %s %s" %
+ (key, " ".join(["<%s>" % y for y in COMMANDS[key]["args"]]))
def build_args(cmd, cmdargs, arglist):
del arglist[0]
except:
LOG.error("Not enough arguments for \"%s\" (expected: %d, got: %d)" % (
- cmd, len(cmdargs), len(orig_arglist)))
- print "Usage:\n %s %s" % (cmd,
- " ".join(["<%s>" % y for y in COMMANDS[cmd]["args"]]))
+ cmd, len(cmdargs), len(orig_arglist)))
+ print "Usage:\n %s %s" %
+ (cmd, " ".join(["<%s>" % y for y in COMMANDS[cmd]["args"]]))
sys.exit()
if len(arglist) > 0:
LOG.error("Too many arguments for \"%s\" (expected: %d, got: %d)" % (
- cmd, len(cmdargs), len(orig_arglist)))
- print "Usage:\n %s %s" % (cmd,
- " ".join(["<%s>" % y for y in COMMANDS[cmd]["args"]]))
+ cmd, len(cmdargs), len(orig_arglist)))
+ print "Usage:\n %s %s" %
+ (cmd, " ".join(["<%s>" % y for y in COMMANDS[cmd]["args"]]))
sys.exit()
return args
COMMANDS = {
- "check_config": {
- "need_login": True,
- "func": check_config,
- "args": []},
- }
+ "check_config": {
+ "need_login": True,
+ "func": check_config,
+ "args": []
+ },
+}
def main():
usagestr = "Usage: %prog [OPTIONS] <command> [args]"
PARSER = OptionParser(usage=usagestr)
PARSER.add_option("-v", "--verbose", dest="verbose",
- action="store_true", default=False, help="turn on verbose logging")
- PARSER.add_option("-c", "--configfile", dest="configfile",
- type="string", default="/etc/quantum/plugins/nvp/nvp.ini",
- help="nvp plugin config file path (nvp.ini)")
+ action="store_true", default=False,
+ help="turn on verbose logging")
+ PARSER.add_option("-c", "--configfile", dest="configfile", type="string",
+ default="/etc/quantum/plugins/nvp/nvp.ini",
+ help="nvp plugin config file path (nvp.ini)")
options, args = PARSER.parse_args()
loglevel = logging.INFO
controller = kwargs["controller"]
transport_zone = kwargs.get("transport_zone",
- controller.default_tz_uuid)
+ controller.default_tz_uuid)
transport_type = kwargs.get("transport_type", "gre")
lswitch_obj = {
"display_name": net_name,
- "transport_zones": [
- {
- "zone_uuid": transport_zone,
- "transport_type": transport_type,
- },
- ],
+ "transport_zones": [{
+ "zone_uuid": transport_zone,
+ "transport_type": transport_type,
+ }],
"tags": [{"tag": tenant_id, "scope": "os_tid"}],
- }
+ }
net = create_lswitch(controller, lswitch_obj)
net['net-op-status'] = "UP"
if filters and "attachment" in filters:
uri += "&attachment_vif_uuid=%s" % filters["attachment"]
try:
- resp_obj = do_single_request("GET", uri,
- controller=controller)
+ resp_obj = do_single_request("GET", uri, controller=controller)
except NvpApiClient.ResourceNotFound as e:
LOG.error("Network not found, Error: %s" % str(e))
raise exception.NetworkNotFound(net_id=network)
def delete_all_ports(controller, ls_uuid):
- res = do_single_request("GET",
- "/ws.v1/lswitch/%s/lport?fields=uuid" % ls_uuid,
- controller=controller)
+ res = do_single_request("GET", "/ws.v1/lswitch/%s/lport?fields=uuid" %
+ ls_uuid, controller=controller)
res = jsonutils.loads(res)
for r in res["results"]:
do_single_request(
from quantum.plugins.nicira.nicira_nvp_plugin.QuantumPlugin import (
NVPCluster,
parse_config,
- )
+)
class ConfigParserTest(unittest.TestCase):
from quantum.plugins.nicira.nicira_nvp_plugin import (
NvpApiClient,
nvplib,
- )
+)
logging.basicConfig(level=logging.DEBUG)
body = {"display_name": name,
"tags": [{"tag": "plugin-test"}]}
try:
- resp_obj = self.quantum.api_client.request("POST",
- post_uri, jsonutils.dumps(body))
+ resp_obj = self.quantum.api_client.request("POST", post_uri,
+ jsonutils.dumps(body))
except NvpApiClient.NvpApiException as e:
print("Unknown API Error: %s" % str(e))
raise exception.QuantumException()
def test_update_network(self):
resp = self.quantum.create_network("quantum-test-tenant",
- "quantum-Private-TenantA")
+ "quantum-Private-TenantA")
net_id = resp["net-id"]
try:
resp = self.quantum.update_network("quantum-test-tenant", net_id,
from quantum.plugins.nicira.nicira_nvp_plugin.api_client import (
client_eventlet as nace,
request_eventlet as nare,
- )
+)
logging.basicConfig(level=logging.DEBUG)
from quantum.plugins.nicira.nicira_nvp_plugin import (
NvpApiClient,
nvplib,
- )
+)
logging.basicConfig(level=logging.DEBUG)
{
"ip_address": "172.168.17.5",
"mac_address": "10:9a:dd:61:4e:89",
- },
+ },
{
"ip_address": "172.168.17.6",
"mac_address": "10:9a:dd:61:4e:88",
- },
- ]
+ },
+ ]
resp = self.quantum.create_port("quantum-test-tenant", net_id,
"ACTIVE", **params)
port_id = resp["port-id"]
old_vic = resp["attachment"]
self.assertTrue(old_vic == "None")
self.quantum.plug_interface("quantum-test-tenant", net_id, port_id,
- "nova-instance-test-%s" % os.getpid())
+ "nova-instance-test-%s" % os.getpid())
resp = self.quantum.get_port_details("quantum-test-tenant", net_id,
port_id)
new_vic = resp["attachment"]
self.assertTrue(True)
def test_create_multi_port_attachment(self):
- resp = self.quantum.create_custom_network(
- "quantum-test-tenant", "quantum-Private-TenantA",
- self.BRIDGE_TZ_UUID, self.quantum.controller)
+ resp = self.quantum.create_custom_network("quantum-test-tenant",
+ "quantum-Private-TenantA",
+ self.BRIDGE_TZ_UUID,
+ self.quantum.controller)
net_id = resp["net-id"]
resp = self.quantum.create_port("quantum-test-tenant", net_id,
self.assertTrue(old_vic == "None")
self.quantum.plug_interface("quantum-test-tenant", net_id, port_id1,
- "nova-instance-test-%s" % os.getpid())
+ "nova-instance-test-%s" % os.getpid())
resp = self.quantum.get_port_details("quantum-test-tenant", net_id,
port_id1)
new_vic = resp["attachment"]
self.assertTrue(old_vic2 == "None")
self.quantum.plug_interface("quantum-test-tenant", net_id, port_id2,
- "nova-instance-test2-%s" % os.getpid())
+ "nova-instance-test2-%s" % os.getpid())
resp = self.quantum.get_port_details("quantum-test-tenant", net_id,
port_id2)
new_vic = resp["attachment"]
# Make sure we only get the filtered ones back
ports = self.quantum.get_all_ports("quantum-test-tenant", net_id,
- filter_opts={"attachment": "attachment2"})
+ filter_opts={"attachment":
+ "attachment2"})
self.assertTrue(len(ports) == 1)
self.assertTrue(ports[0]["port-id"] == port_id2)
if old_b != new_b:
if old_b is not None:
LOG.info("Removing binding to net-id = %s for %s"
- % (old_b, str(p)))
+ % (old_b, str(p)))
self.port_unbound(p, True)
if p.vif_id in all_bindings:
all_bindings[p.vif_id].op_status = OP_STATUS_DOWN
lsw_id = lsw_id_bindings[new_net_uuid]
self.port_bound(p, new_net_uuid, lsw_id)
all_bindings[p.vif_id].op_status = OP_STATUS_UP
- LOG.info("Port " + str(p) + " on net-id = "
- + new_net_uuid + " bound to " +
- str(self.local_vlan_map[new_net_uuid]))
+ LOG.info("Port %s on net-id = %s bound to %s " % (
+ str(p), new_net_uuid,
+ str(self.local_vlan_map[new_net_uuid])))
for vif_id in disappeared_vif_ports_ids:
LOG.info("Port Disappeared: " + vif_id)
usagestr = "%prog [OPTIONS] <config file>"
parser = OptionParser(usage=usagestr)
parser.add_option("-v", "--verbose", dest="verbose",
- action="store_true", default=False, help="turn on verbose logging")
+ action="store_true", default=False,
+ help="turn on verbose logging")
options, args = parser.parse_args()
self.vlan_id = vlan_id
def __repr__(self):
- return "<VlanBinding(%s,%s)>" % \
- (self.vlan_id, self.network_id)
+ return "<VlanBinding(%s,%s)>" % (self.vlan_id, self.network_id)
class TunnelIP(BASE):
'net-id': net_id,
'net-name': net_name,
'net-op-status': op_status,
- }
+ }
if ports:
res['net-ports'] = ports
return res
'port-op-status': op_status,
'net-id': port.network_id,
'attachment': port.interface_id,
- }
+ }
def get_all_ports(self, tenant_id, net_id, **kwargs):
ids = []
LVM = ovs_quantum_agent.LocalVLANMapping(LV_ID, LS_ID, LV_IDS)
VIF_ID = '404deaec-5d37-11e1-a64b-000c29d5f0a8'
VIF_MAC = '3c:09:24:1e:78:23'
-VIF_PORT = ovs_lib.VifPort('port', 'ofport', VIF_ID, VIF_MAC,
- 'switch')
+VIF_PORT = ovs_lib.VifPort('port', 'ofport',
+ VIF_ID, VIF_MAC, 'switch')
class DummyPort:
self.TUN_OFPORT = 'PATCH_TUN_OFPORT'
self.mox.StubOutClassWithMocks(ovs_lib, 'OVSBridge')
- self.mock_int_bridge = ovs_lib.OVSBridge(self.INT_BRIDGE,
- 'sudo')
+ self.mock_int_bridge = ovs_lib.OVSBridge(self.INT_BRIDGE, 'sudo')
self.mock_int_bridge.delete_port('patch-tun')
self.mock_int_bridge.add_patch_port(
'patch-tun', 'patch-int').AndReturn(self.TUN_OFPORT)
self.mock_int_bridge.remove_all_flows()
self.mock_int_bridge.add_flow(priority=1, actions='normal')
- self.mock_tun_bridge = ovs_lib.OVSBridge(self.TUN_BRIDGE,
- 'sudo')
+ self.mock_tun_bridge = ovs_lib.OVSBridge(self.TUN_BRIDGE, 'sudo')
self.mock_tun_bridge.reset_bridge()
self.mock_tun_bridge.add_patch_port(
'patch-int', 'patch-tun').AndReturn(self.INT_OFPORT)
from quantum.plugins.openvswitch.ovs_quantum_plugin import (
NoFreeVLANException,
VlanMap,
- )
+)
class VlanMapTest(unittest.TestCase):
"param-name=other-config",
"param-key=nicira-iface-id",
"uuid=%s" % xs_vif_uuid],
- root_helper=self.root_helper).strip()
+ root_helper=self.root_helper).strip()
def _vifport(self, name, external_ids):
ofport = self.db_get_val("Interface", name, "ofport")
usagestr = "%prog [OPTIONS] <config file>"
parser = OptionParser(usage=usagestr)
parser.add_option("-v", "--verbose", dest="verbose",
- action="store_true", default=False, help="turn on verbose logging")
+ action="store_true", default=False,
+ help="turn on verbose logging")
options, args = parser.parse_args()
"""Specific filter for the dnsmasq call (which includes env)"""
def match(self, userargs):
- if (userargs[0].startswith("FLAGFILE=") and
- userargs[1].startswith("NETWORK_ID=") and
- userargs[2] == "dnsmasq"):
+ if ((userargs[0].startswith("FLAGFILE=") and
+ userargs[1].startswith("NETWORK_ID=") and
+ userargs[2] == "dnsmasq")):
return True
return False
# 'ip', 'tuntap'
filters.CommandFilter("/usr/sbin/ip", "root"),
filters.CommandFilter("/sbin/ip", "root"),
- ]
+]
# "xe", "vif-param-get", ...
filters.CommandFilter("/usr/bin/xe", "root"),
filters.CommandFilter("/usr/sbin/xe", "root"),
- ]
+]
# "xe", "vif-param-get", ...
filters.CommandFilter("/usr/bin/xe", "root"),
filters.CommandFilter("/usr/sbin/xe", "root"),
- ]
+]
FILTERS_MODULES = ['quantum.rootwrap.linuxbridge-agent',
'quantum.rootwrap.openvswitch-agent',
- 'quantum.rootwrap.ryu-agent',
- ]
+ 'quantum.rootwrap.ryu-agent']
def load_filters():
# Create a network and a port
network_id = self._create_network(fmt)
port_id = self._create_port(network_id, "ACTIVE", fmt)
- show_network_req = testlib.show_network_detail_request(
- self.tenant_id, network_id, fmt)
+ show_network_req = testlib.show_network_detail_request(self.tenant_id,
+ network_id,
+ fmt)
show_network_res = show_network_req.get_response(self.api)
self.assertEqual(show_network_res.status_int, 200)
network_data = self._deserialize_net_response(content_type,
def _test_list_ports_networknotfound(self, fmt):
LOG.debug("_test_list_ports_networknotfound"
- " - fmt:%s - START", fmt)
+ " - fmt:%s - START", fmt)
list_port_req = testlib.port_list_request(self.tenant_id,
"A_BAD_ID", fmt)
list_port_res = list_port_req.get_response(self.api)
port_data = self._deserialize_port_response(content_type,
show_port_res)
self.assert_port(id=port_id, state=port_state,
- port_data=port_data['port'])
+ port_data=port_data['port'])
LOG.debug("_test_show_port - fmt:%s - END", fmt)
def _test_show_port_detail(self, fmt):
def get_actions(self):
return [extensions.ActionExtension('dummy_resources',
- 'FOXNSOX:add_tweedle',
- self._add_tweedle_handler),
+ 'FOXNSOX:add_tweedle',
+ self._add_tweedle_handler),
extensions.ActionExtension('dummy_resources',
- 'FOXNSOX:delete_tweedle',
- self._delete_tweedle_handler)]
+ 'FOXNSOX:delete_tweedle',
+ self._delete_tweedle_handler)]
def get_request_extensions(self):
request_exts = []
def test_with_helper(self):
result = utils.execute(["ls", self.test_file],
- self.root_helper)
+ self.root_helper)
self.assertEqual(result, "ls %s\n" % self.test_file)
def test_stderr(self):
test_api.NETS: nets.ControllerV10._serialization_metadata,
test_api.PORTS: ports.ControllerV10._serialization_metadata,
test_api.ATTS: atts.ControllerV10._serialization_metadata,
- })
+ }
+ )
self._successful_create_code = exc.HTTPOk.code
self._network_not_found_code = 420
self._network_in_use_code = 421
test_api.NETS: nets.ControllerV11._serialization_metadata,
test_api.PORTS: ports.ControllerV11._serialization_metadata,
test_api.ATTS: atts.ControllerV11._serialization_metadata,
- })
+ }
+ )
self._successful_create_code = exc.HTTPAccepted.code
self._network_not_found_code = exc.HTTPNotFound.code
self._network_in_use_code = exc.HTTPConflict.code
test_api.NETS: nets.ControllerV11._serialization_metadata,
test_api.PORTS: ports.ControllerV11._serialization_metadata,
test_api.ATTS: atts.ControllerV11._serialization_metadata,
- })
+ }
+ )
self._successful_create_code = exc.HTTPAccepted.code
self.net_op_status = test_config.get('default_net_op_status',
'UNKNOWN')
def deserialize(self, content_type, response):
ctype = 'application/%s' % content_type
- data = self._deserializers[ctype].\
- deserialize(response.body)['body']
+ data = self._deserializers[ctype].deserialize(response.body)['body']
return data
def _create_network(self, fmt, name, admin_status_up):
ExtensionManager,
ExtensionMiddleware,
PluginAwareExtensionManager,
- )
+)
from quantum.openstack.common import jsonutils
from quantum.plugins.sample.SamplePlugin import QuantumEchoPlugin
from quantum.tests.unit import BaseTest
StubBaseAppController,
StubExtension,
StubPlugin,
- )
+)
import quantum.tests.unit.extensions
from quantum import wsgi
self.assertEqual(response.json['uneditable'], "original_value")
ext_app = self._setup_app_with_request_handler(_update_handler,
- 'PUT')
+ 'PUT')
ext_response = ext_app.put("/dummy_resources/1",
{'uneditable': "new_value"})
self.assertEqual(ext_response.json['uneditable'], "new_value")
def test_reset_bridge(self):
utils.execute(["ovs-vsctl", self.TO, "--",
"--if-exists", "del-br", self.BR_NAME],
- root_helper=self.root_helper)
+ root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "add-br", self.BR_NAME],
root_helper=self.root_helper)
self.mox.ReplayAll()
pname = "tap5"
utils.execute(["ovs-vsctl", self.TO, "--", "--if-exists",
"del-port", self.BR_NAME, pname],
- root_helper=self.root_helper)
+ root_helper=self.root_helper)
self.mox.ReplayAll()
self.br.delete_port(pname)
"hard_timeout=0,idle_timeout=0,"
"priority=2,dl_src=ca:fe:de:ad:be:ef"
",actions=strip_vlan,output:0"],
- root_helper=self.root_helper)
+ root_helper=self.root_helper)
utils.execute(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=1,actions=normal"],
- root_helper=self.root_helper)
+ root_helper=self.root_helper)
utils.execute(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=2,actions=drop"],
- root_helper=self.root_helper)
+ root_helper=self.root_helper)
utils.execute(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=2,in_port=%s,actions=drop" % ofport],
- root_helper=self.root_helper)
+ root_helper=self.root_helper)
utils.execute(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=4,in_port=%s,dl_vlan=%s,"
"actions=strip_vlan,set_tunnel:%s,normal"
% (ofport, vid, lsw_id)],
- root_helper=self.root_helper)
+ root_helper=self.root_helper)
utils.execute(["ovs-ofctl", "add-flow", self.BR_NAME,
"hard_timeout=0,idle_timeout=0,"
"priority=3,tun_id=%s,actions="
self.mox.ReplayAll()
self.br.add_flow(priority=2, dl_src="ca:fe:de:ad:be:ef",
- actions="strip_vlan,output:0")
+ actions="strip_vlan,output:0")
self.br.add_flow(priority=1, actions="normal")
self.br.add_flow(priority=2, actions="drop")
self.br.add_flow(priority=2, in_port=ofport, actions="drop")
self.br.add_flow(priority=4, in_port=ofport, dl_vlan=vid,
- actions="strip_vlan,set_tunnel:%s,normal" %
- (lsw_id))
+ actions="strip_vlan,set_tunnel:%s,normal" %
+ (lsw_id))
self.br.add_flow(priority=3, tun_id=lsw_id,
- actions="mod_vlan_vid:%s,output:%s" %
- (vid, ofport))
+ actions="mod_vlan_vid:%s,output:%s" %
+ (vid, ofport))
self.mox.VerifyAll()
def test_get_port_ofport(self):
ofport = "6"
utils.execute(["ovs-vsctl", self.TO, "get",
"Interface", pname, "ofport"],
- root_helper=self.root_helper).AndReturn(ofport)
+ root_helper=self.root_helper).AndReturn(ofport)
self.mox.ReplayAll()
self.assertEqual(self.br.get_port_ofport(pname), ofport)
def test_count_flows(self):
utils.execute(["ovs-ofctl", "dump-flows", self.BR_NAME],
- root_helper=self.root_helper).\
- AndReturn("ignore\nflow-1\n")
+ root_helper=self.root_helper).AndReturn('ignore'
+ '\nflow-1\n')
self.mox.ReplayAll()
# counts the number of flows as total lines of output - 2
lsw_id = 40
vid = 39
utils.execute(["ovs-ofctl", "del-flows", self.BR_NAME,
- "in_port=" + ofport], root_helper=self.root_helper)
+ "in_port=" + ofport], root_helper=self.root_helper)
utils.execute(["ovs-ofctl", "del-flows", self.BR_NAME,
- "tun_id=%s" % lsw_id], root_helper=self.root_helper)
+ "tun_id=%s" % lsw_id], root_helper=self.root_helper)
utils.execute(["ovs-ofctl", "del-flows", self.BR_NAME,
- "dl_vlan=%s" % vid], root_helper=self.root_helper)
+ "dl_vlan=%s" % vid], root_helper=self.root_helper)
self.mox.ReplayAll()
self.br.delete_flows(in_port=ofport)
ofport = "6"
utils.execute(["ovs-vsctl", self.TO, "add-port",
- self.BR_NAME, pname], root_helper=self.root_helper)
+ self.BR_NAME, pname], root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "set", "Interface",
- pname, "type=gre"], root_helper=self.root_helper)
+ pname, "type=gre"], root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "set", "Interface",
- pname, "options:remote_ip=" + ip],
+ pname, "options:remote_ip=" + ip],
root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "set", "Interface",
- pname, "options:in_key=flow"],
+ pname, "options:in_key=flow"],
root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "set", "Interface",
- pname, "options:out_key=flow"],
+ pname, "options:out_key=flow"],
root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "get",
- "Interface", pname, "ofport"],
+ "Interface", pname, "ofport"],
root_helper=self.root_helper).AndReturn(ofport)
self.mox.ReplayAll()
ofport = "6"
utils.execute(["ovs-vsctl", self.TO, "add-port",
- self.BR_NAME, pname], root_helper=self.root_helper)
+ self.BR_NAME, pname], root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "set", "Interface",
- pname, "type=patch"], root_helper=self.root_helper)
+ pname, "type=patch"], root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "set",
- "Interface", pname, "options:peer=" + peer],
+ "Interface", pname, "options:peer=" + peer],
root_helper=self.root_helper)
utils.execute(["ovs-vsctl", self.TO, "get",
- "Interface", pname, "ofport"],
+ "Interface", pname, "ofport"],
root_helper=self.root_helper).AndReturn(ofport)
self.mox.ReplayAll()
% (vif_id, mac))
utils.execute(["ovs-vsctl", self.TO, "get",
- "Interface", pname, "external_ids"],
+ "Interface", pname, "external_ids"],
root_helper=self.root_helper).AndReturn(external_ids)
utils.execute(["ovs-vsctl", self.TO, "get",
- "Interface", pname, "ofport"],
+ "Interface", pname, "ofport"],
root_helper=self.root_helper).AndReturn(ofport)
if is_xen:
utils.execute(["xe", "vif-param-get", "param-name=other-config",
- "param-key=nicira-iface-id", "uuid=" + vif_id],
+ "param-key=nicira-iface-id", "uuid=" + vif_id],
root_helper=self.root_helper).AndReturn(vif_id)
self.mox.ReplayAll()
def test_clear_db_attribute(self):
pname = "tap77"
utils.execute(["ovs-vsctl", self.TO, "clear", "Port",
- pname, "tag"], root_helper=self.root_helper)
+ pname, "tag"], root_helper=self.root_helper)
self.mox.ReplayAll()
self.br.clear_db_attribute("Port", pname, "tag")
self.mox.VerifyAll()
with open(self.mailmap, 'w') as mm_fh:
mm_fh.write("Foo Bar <email@foo.com> Foo Bar <email@bar.com>\n")
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
- parse_mailmap(self.mailmap))
+ parse_mailmap(self.mailmap))
def test_mailmap_with_firstname(self):
with open(self.mailmap, 'w') as mm_fh:
mm_fh.write("Foo <email@foo.com> Foo <email@bar.com>\n")
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
- parse_mailmap(self.mailmap))
+ parse_mailmap(self.mailmap))
def test_mailmap_with_noname(self):
with open(self.mailmap, 'w') as mm_fh:
mm_fh.write("<email@foo.com> <email@bar.com>\n")
self.assertEqual({'<email@bar.com>': '<email@foo.com>'},
- parse_mailmap(self.mailmap))
+ parse_mailmap(self.mailmap))
def tearDown(self):
if os.path.exists(self.mailmap):
}
self.body_deserializers.update(body_deserializers or {})
- self.headers_deserializer = headers_deserializer or \
- RequestHeadersDeserializer()
+ self.headers_deserializer = (headers_deserializer or
+ RequestHeadersDeserializer())
def deserialize(self, request):
"""Extract necessary pieces of the request.
"""WSGI method that controls (de)serialization and method dispatch."""
LOG.info("%(method)s %(url)s" % {"method": request.method,
- "url": request.url})
+ "url": request.url})
try:
action, args, accept = self.deserializer.deserialize(request)
HAS_EASY_INSTALL = bool(run_command(['which', 'easy_install'],
check_exit_code=False).strip())
HAS_VIRTUALENV = bool(run_command(['which', 'virtualenv'],
- check_exit_code=False).strip())
+ check_exit_code=False).strip())
def check_dependencies():
"""Make sure virtualenv is in the path."""
if not HAS_VIRTUALENV:
- raise Exception('Virtualenv not found. ' +
- 'Try installing python-virtualenv')
+ raise Exception('Virtualenv not found. '
+ 'Try installing python-virtualenv')
print 'done.'