--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 VMware, Inc.
+# All Rights Reserved
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.openstack.common import log
+
+LOG = log.getLogger(__name__)
+MAX_DISPLAY_NAME_LEN = 40
+
+
+def check_and_truncate(display_name):
+ if display_name and len(display_name) > MAX_DISPLAY_NAME_LEN:
+ LOG.debug(_("Specified name:'%s' exceeds maximum length. "
+ "It will be truncated on NVP"), display_name)
+ return display_name[:MAX_DISPLAY_NAME_LEN]
+ return display_name or ''
from neutron.common import exceptions as exception
from neutron.openstack.common import excutils
from neutron.openstack.common import log
-from neutron.plugins.nicira.common import (
- exceptions as nvp_exc)
+from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import NvpApiClient
from neutron.version import version_info
GWSERVICE_RESOURCE = "gateway-service"
# Current neutron version
NEUTRON_VERSION = version_info.release_string()
-# Other constants for NVP resource
-MAX_DISPLAY_NAME_LEN = 40
# Constants for NAT rules
MATCH_KEYS = ["destination_ip_addresses", "destination_port_max",
"destination_port_min", "source_ip_addresses",
# To fit it into an NVP tag we need to hash it, however device_id
# used for ports associated to VM's are small enough so let's skip the
# hashing
- if len(device_id) > MAX_DISPLAY_NAME_LEN or obfuscate:
+ if len(device_id) > utils.MAX_DISPLAY_NAME_LEN or obfuscate:
return hashlib.sha1(device_id).hexdigest()
else:
return device_id
return uri_path
-def _check_and_truncate_name(display_name):
- if display_name and len(display_name) > MAX_DISPLAY_NAME_LEN:
- LOG.debug(_("Specified name:'%s' exceeds maximum length. "
- "It will be truncated on NVP"), display_name)
- return display_name[:MAX_DISPLAY_NAME_LEN]
- return display_name or ''
-
-
def get_cluster_version(cluster):
"""Return major/minor version #."""
# Get control-cluster nodes
neutron_net_id=None,
shared=None,
**kwargs):
- lswitch_obj = {"display_name": _check_and_truncate_name(display_name),
+ lswitch_obj = {"display_name": utils.check_and_truncate(display_name),
"transport_zones": transport_zones_config,
"tags": [{"tag": tenant_id, "scope": "os_tid"},
{"tag": NEUTRON_VERSION, "scope": "quantum"}]}
def update_lswitch(cluster, lswitch_id, display_name,
tenant_id=None, **kwargs):
uri = _build_uri_path(LSWITCH_RESOURCE, resource_id=lswitch_id)
- lswitch_obj = {"display_name": _check_and_truncate_name(display_name),
+ lswitch_obj = {"display_name": utils.check_and_truncate(display_name),
"tags": [{"tag": tenant_id, "scope": "os_tid"},
{"tag": NEUTRON_VERSION, "scope": "quantum"}]}
if "tags" in kwargs:
"device_id": device['interface_name'],
"type": "L2Gateway"} for device in devices]
gwservice_obj = {
- "display_name": _check_and_truncate_name(display_name),
+ "display_name": utils.check_and_truncate(display_name),
"tags": tags,
"gateways": gateways,
"type": "L2GatewayServiceConfig"
def _prepare_lrouter_body(name, tenant_id, router_type,
distributed=None, **kwargs):
body = {
- "display_name": _check_and_truncate_name(name),
+ "display_name": utils.check_and_truncate(name),
"tags": [{"tag": tenant_id, "scope": "os_tid"},
{"tag": NEUTRON_VERSION, "scope": "quantum"}],
"routing_config": {
if not display_name:
# Nothing to update
return gwservice_obj
- gwservice_obj["display_name"] = _check_and_truncate_name(display_name)
+ gwservice_obj["display_name"] = utils.check_and_truncate(display_name)
return do_request("PUT", _build_uri_path(GWSERVICE_RESOURCE,
resource_id=gateway_id),
json.dumps(gwservice_obj), cluster=cluster)
# Nothing to update
return lrouter_obj
# It seems that this is faster than the doing an if on display_name
- lrouter_obj["display_name"] = (_check_and_truncate_name(display_name) or
+ lrouter_obj["display_name"] = (utils.check_and_truncate(display_name) or
lrouter_obj["display_name"])
if nexthop:
nh_element = lrouter_obj["routing_config"].get(
mac_learning_enabled=None, allowed_address_pairs=None):
lport_obj = dict(
admin_status_enabled=admin_status_enabled,
- display_name=_check_and_truncate_name(display_name),
+ display_name=utils.check_and_truncate(display_name),
tags=[dict(scope='os_tid', tag=tenant_id),
dict(scope='q_port_id', tag=neutron_port_id),
dict(scope='vm_id', tag=device_id_to_vm_id(device_id)),
security_profiles=None, queue_id=None,
mac_learning_enabled=None, allowed_address_pairs=None):
"""Creates a logical port on the assigned logical switch."""
- display_name = _check_and_truncate_name(display_name)
+ display_name = utils.check_and_truncate(display_name)
lport_obj = dict(
admin_status_enabled=admin_status_enabled,
display_name=display_name,
{'ethertype': 'IPv6'}]}
tags = [dict(scope='os_tid', tag=tenant_id),
dict(scope='quantum', tag=NEUTRON_VERSION)]
- display_name = _check_and_truncate_name(security_profile.get('name'))
+ display_name = utils.check_and_truncate(security_profile.get('name'))
body = mk_body(
tags=tags, display_name=display_name,
logical_port_ingress_rules=(
from neutron.common import exceptions
from neutron.plugins.nicira.common import config # noqa
from neutron.plugins.nicira.common import exceptions as nvp_exc
+from neutron.plugins.nicira.common import utils
from neutron.plugins.nicira import nvp_cluster
from neutron.plugins.nicira import NvpApiClient
from neutron.plugins.nicira import nvplib
def test_check_and_truncate_name_with_none(self):
name = None
- result = nvplib._check_and_truncate_name(name)
+ result = utils.check_and_truncate(name)
self.assertEqual('', result)
def test_check_and_truncate_name_with_short_name(self):
name = 'foo_port_name'
- result = nvplib._check_and_truncate_name(name)
+ result = utils.check_and_truncate(name)
self.assertEqual(name, result)
def test_check_and_truncate_name_long_name(self):
name = 'this_is_a_port_whose_name_is_longer_than_40_chars'
- result = nvplib._check_and_truncate_name(name)
- self.assertEqual(len(result), nvplib.MAX_DISPLAY_NAME_LEN)
+ result = utils.check_and_truncate(name)
+ self.assertEqual(len(result), utils.MAX_DISPLAY_NAME_LEN)
def _nicira_method(method_name, module_name='nvplib'):