LQUEUE_RESOURCE = "lqueue"
GWSERVICE_RESOURCE = "gateway-service"
QUANTUM_VERSION = "2013.1"
+# Other constants for NVP resource
+MAX_DISPLAY_NAME_LEN = 40
# Constants for NAT rules
MATCH_KEYS = ["destination_ip_addresses", "destination_port_max",
"destination_port_min", "source_ip_addresses",
return uri_path
+def _check_and_truncate_name(display_name):
+ if display_name and len(display_name) > MAX_DISPLAY_NAME_LEN:
+ LOG.debug(_("Specified name:'%s' exceeds maximum length. "
+ "It will be truncated on NVP"), display_name)
+ return display_name[:MAX_DISPLAY_NAME_LEN]
+ return display_name
+
+
def get_cluster_version(cluster):
"""Return major/minor version #"""
# Get control-cluster nodes
cluster.default_tz_uuid),
"transport_type": (nvp_binding_type or
DEF_TRANSPORT_TYPE)}
- lswitch_obj = {"display_name": display_name,
+ lswitch_obj = {"display_name": _check_and_truncate_name(display_name),
"transport_zones": [transport_zone_config],
"tags": [{"tag": tenant_id, "scope": "os_tid"}]}
if nvp_binding_type == 'bridge' and vlan_id:
def update_lswitch(cluster, lswitch_id, display_name,
tenant_id=None, **kwargs):
uri = _build_uri_path(LSWITCH_RESOURCE, resource_id=lswitch_id)
- # TODO(salvatore-orlando): Make sure this operation does not remove
- # any other important tag set on the lswtich object
- lswitch_obj = {"display_name": display_name,
+ lswitch_obj = {"display_name": _check_and_truncate_name(display_name),
"tags": [{"tag": tenant_id, "scope": "os_tid"}]}
if "tags" in kwargs:
lswitch_obj["tags"].extend(kwargs["tags"])
"device_id": device['interface_name'],
"type": "L2Gateway"} for device in devices]
gwservice_obj = {
- "display_name": display_name,
+ "display_name": _check_and_truncate_name(display_name),
"tags": tags,
"gateways": gateways,
"type": "L2GatewayServiceConfig"
with the NVP controller
"""
tags = [{"tag": tenant_id, "scope": "os_tid"}]
+ display_name = _check_and_truncate_name(display_name)
lrouter_obj = {
"display_name": display_name,
"tags": tags,
if not display_name:
# Nothing to update
return gwservice_obj
- gwservice_obj["display_name"] = display_name
+ gwservice_obj["display_name"] = _check_and_truncate_name(display_name)
try:
return json.loads(do_single_request("PUT",
_build_uri_path(GWSERVICE_RESOURCE,
# Nothing to update
return lrouter_obj
# It seems that this is faster than the doing an if on display_name
- lrouter_obj["display_name"] = display_name or lrouter_obj["display_name"]
+ lrouter_obj["display_name"] = (_check_and_truncate_name(display_name) or
+ lrouter_obj["display_name"])
if nexthop:
nh_element = lrouter_obj["routing_config"].get(
"default_route_next_hop")
display_name, device_id, admin_status_enabled,
mac_address=None, fixed_ips=None, port_security_enabled=None,
security_profiles=None, queue_id=None):
-
# device_id can be longer than 40 so we rehash it
hashed_device_id = hashlib.sha1(device_id).hexdigest()
lport_obj = dict(
admin_status_enabled=admin_status_enabled,
- display_name=display_name,
+ display_name=_check_and_truncate_name(display_name),
tags=[dict(scope='os_tid', tag=tenant_id),
dict(scope='q_port_id', tag=quantum_port_id),
dict(scope='vm_id', tag=hashed_device_id)])
-
_configure_extensions(lport_obj, mac_address, fixed_ips,
port_security_enabled, security_profiles,
queue_id)
""" Creates a logical port on the assigned logical switch """
# device_id can be longer than 40 so we rehash it
hashed_device_id = hashlib.sha1(device_id).hexdigest()
+ display_name = _check_and_truncate_name(display_name)
lport_obj = dict(
admin_status_enabled=admin_status_enabled,
display_name=display_name,
'ip_prefix': '0.0.0.0/0'}],
'logical_port_ingress_rules': []}
try:
+ display_name = _check_and_truncate_name(security_profile.get('name'))
body = mk_body(
- tags=tags, display_name=security_profile.get('name'),
+ tags=tags, display_name=display_name,
logical_port_ingress_rules=dhcp['logical_port_ingress_rules'],
logical_port_egress_rules=dhcp['logical_port_egress_rules'])
rsp = do_request(HTTP_POST, path, body, cluster=cluster)
LOG = logging.getLogger(__name__)
+MAX_NAME_LEN = 40
+
+
+def _validate_name(name):
+ if name and len(name) > MAX_NAME_LEN:
+ raise Exception("Logical switch name exceeds %d characters",
+ MAX_NAME_LEN)
+
+
+def _validate_resource(body):
+ _validate_name(body.get('display_name'))
class FakeClient:
_fake_lqueue_dict = {}
_fake_gatewayservice_dict = {}
+ _validators = {
+ LSWITCH_RESOURCE: _validate_resource,
+ LSWITCH_LPORT_RESOURCE: _validate_resource,
+ LROUTER_LPORT_RESOURCE: _validate_resource,
+ SECPROF_RESOURCE: _validate_resource,
+ LQUEUE_RESOURCE: _validate_resource,
+ GWSERVICE_RESOURCE: _validate_resource
+ }
+
def __init__(self, fake_files_path):
self.fake_files_path = fake_files_path
with open("%s/%s" % (self.fake_files_path, response_file)) as f:
response_template = f.read()
add_resource = getattr(self, '_add_%s' % res_type)
+ body_json = json.loads(body)
+ val_func = self._validators.get(res_type)
+ if val_func:
+ val_func(body_json)
args = [body]
if len(uuids):
args.append(uuids[0])
is_attachment = True
res_type = res_type[:res_type.index('attachment')]
res_dict = getattr(self, '_fake_%s_dict' % res_type)
+ body_json = json.loads(body)
+ val_func = self._validators.get(res_type)
+ if val_func:
+ val_func(body_json)
resource = res_dict[uuids[-1]]
if not is_attachment:
- resource.update(json.loads(body))
+ resource.update(body_json)
else:
relations = resource.get("_relations", {})
body_2 = json.loads(body)
self.assertEqual(res['port']['fixed_ips'],
data['port']['fixed_ips'])
+ def test_create_port_name_exceeds_40_chars(self):
+ name = 'this_is_a_port_whose_name_is_longer_than_40_chars'
+ with self.port(name=name) as port:
+ # Assert the Quantum name is not truncated
+ self.assertEqual(name, port['port']['name'])
+
class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
NiciraPluginV2TestCase):
# tenant must see a single network
self.assertEqual(len(res['networks']), 1)
+ def test_create_network_name_exceeds_40_chars(self):
+ name = 'this_is_a_network_whose_name_is_longer_than_40_chars'
+ with self.network(name=name) as net:
+ # Assert Quantum name is not truncated
+ self.assertEqual(net['network']['name'], name)
+
class NiciraPortSecurityTestCase(psec.PortSecurityDBTestCase):
class TestNiciraSecurityGroup(ext_sg.TestSecurityGroups,
NiciraSecurityGroupsTestCase):
- pass
+
+ def test_create_security_group_name_exceeds_40_chars(self):
+ name = 'this_is_a_secgroup_whose_name_is_longer_than_40_chars'
+ with self.security_group(name=name) as sg:
+ # Assert Quantum name is not truncated
+ self.assertEqual(sg['security_group']['name'], name)
class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
def _nvp_metadata_teardown(self):
cfg.CONF.set_override('enable_metadata_access_network', False, 'NVP')
+ def test_create_router_name_exceeds_40_chars(self):
+ name = 'this_is_a_router_whose_name_is_longer_than_40_chars'
+ with self.router(name=name) as rtr:
+ # Assert Quantum name is not truncated
+ self.assertEqual(rtr['router']['name'], name)
+
def test_router_add_interface_subnet_with_metadata_access(self):
self._nvp_metadata_setup()
self.test_router_add_interface_subnet()
self.assertEqual(q['qos_queue']['qos_marking'], 'untrusted')
self.assertFalse(q['qos_queue']['default'])
+ def test_create_qos_queue_name_exceeds_40_chars(self):
+ name = 'this_is_a_queue_whose_name_is_longer_than_40_chars'
+ with self.qos_queue(name=name) as queue:
+ # Assert Quantum name is not truncated
+ self.assertEqual(queue['qos_queue']['name'], name)
+
def test_create_qos_queue_default(self):
with self.qos_queue(default=True) as q:
self.assertTrue(q['qos_queue']['default'])
cfg.CONF.set_override('api_extensions_path', ext_path)
super(TestNiciraNetworkGateway, self).setUp()
+ def test_create_network_gateway_name_exceeds_40_chars(self):
+ name = 'this_is_a_gateway_whose_name_is_longer_than_40_chars'
+ with self._network_gateway(name=name) as nw_gw:
+ # Assert Quantum name is not truncated
+ self.assertEqual(nw_gw[self.resource]['name'], name)
+
def test_list_network_gateways(self):
with self._network_gateway(name='test-gw-1') as gw1:
with self._network_gateway(name='test_gw_2') as gw2: