Common utilities and helper functions for Openstack Networking Plugins.
"""
+import hashlib
import webob.exc
+from oslo_log import log as logging
+import six
+
from neutron.api.v2 import attributes
+from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
+from neutron.i18n import _LI
from neutron.plugins.common import constants as p_const
+LOG = logging.getLogger(__name__)
+
def is_valid_vlan_tag(vlan):
return p_const.MIN_VLAN_TAG <= vlan <= p_const.MAX_VLAN_TAG
port.get('port', {}),
check_allow_post=check_allow_post)
return core_plugin.create_port(context, {'port': port_data})
+
+
+def get_interface_name(name, prefix='', postfix='',
+ max_len=n_const.DEVICE_NAME_MAX_LEN):
+ """Construct an interface name based on the prefix, postfix and name.
+
+ The interface name can not exceed the maximum length passed in. Longer
+ names are hashed to help ensure uniqueness.
+ """
+ requested_name = prefix + name + postfix
+
+ if len(requested_name) <= max_len:
+ return requested_name
+ # We can't just truncate because interfaces may be distinguished
+ # by an ident at the end. A hash over the name should be unique.
+ # Leave part of the interface name on for easier identification
+ hashlen = 6
+
+ if (len(prefix + postfix) + hashlen) > max_len:
+ raise ValueError("Too long pre- and postfix provided. New name would "
+ "exceed given length for an interface name.")
+
+ namelen = max_len - len(prefix) - hashlen - len(postfix)
+ if isinstance(name, six.text_type):
+ hashed_name = hashlib.sha1(name.encode('utf-8'))
+ else:
+ hashed_name = hashlib.sha1(name)
+ new_name = ('%(prefix)s%(truncated)s%(hash)s%(postfix)s' %
+ {'prefix': prefix, 'truncated': name[0:namelen],
+ 'hash': hashed_name.hexdigest()[0:hashlen],
+ 'postfix': postfix})
+ LOG.info(_LI("The requested interface name %(requested_name)s exceeds the "
+ "%(limit)d character limitation. It was shortened to "
+ "%(new_name)s to fit."),
+ {'requested_name': requested_name,
+ 'limit': max_len, 'new_name': new_name})
+ return new_name
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron.plugins.common import constants as p_const
+from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.l2pop.rpc_manager \
import l2population_rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.linuxbridge.agent import arp_protect
bridge_name = BRIDGE_NAME_PREFIX + network_id[0:11]
return bridge_name
- def get_subinterface_name(self, physical_interface, vlan_id):
+ def get_vlan_device_name(self, physical_interface, vlan_id):
if not vlan_id:
- LOG.warning(_LW("Invalid VLAN ID, will lead to incorrect "
- "subinterface name"))
- subinterface_name = '%s.%s' % (physical_interface, vlan_id)
- return subinterface_name
+ raise ValueError("No VLAN ID specified!")
+
+ vlan_len = len(str(vlan_id))
+ if vlan_len > 4:
+ raise ValueError("Invalid VLAN ID! ID exceeds 4 digits!")
+
+ vlan_postfix = '.%s' % vlan_id
+
+ # Handling for too long physical_interface names:
+ # Ensure that vlan devices that belong to the same logical network
+ # use the same naming pattern despite the hashing algorithm that is
+ # used in such cases. E.g.
+ # Interface name: "very_long_name" should NOT result in
+ # "veryHASHED.1111" and "very_loHASHED.1" but rather in
+ # "veryHASHED.1111" and "veryHASHED.1". This can be accomplished with
+ # requesting a smaller device name length for small vlan ids.
+ max_len = constants.DEVICE_NAME_MAX_LEN - (4 - vlan_len)
+ vlan_name = p_utils.get_interface_name(physical_interface,
+ postfix=vlan_postfix,
+ max_len=max_len)
+ return vlan_name
def get_tap_device_name(self, interface_id):
if not interface_id:
def ensure_vlan(self, physical_interface, vlan_id):
"""Create a vlan unless it already exists."""
- interface = self.get_subinterface_name(physical_interface, vlan_id)
- if not ip_lib.device_exists(interface):
+ vlan_device = self.get_vlan_device_name(physical_interface, vlan_id)
+ if not ip_lib.device_exists(vlan_device):
LOG.debug("Creating subinterface %(interface)s for "
"VLAN %(vlan_id)s on interface "
"%(physical_interface)s",
- {'interface': interface, 'vlan_id': vlan_id,
+ {'interface': vlan_device, 'vlan_id': vlan_id,
'physical_interface': physical_interface})
if utils.execute(['ip', 'link', 'add', 'link',
physical_interface,
- 'name', interface, 'type', 'vlan', 'id',
+ 'name', vlan_device, 'type', 'vlan', 'id',
vlan_id], run_as_root=True):
return
if utils.execute(['ip', 'link', 'set',
- interface, 'up'], run_as_root=True):
+ vlan_device, 'up'], run_as_root=True):
return
- LOG.debug("Done creating subinterface %s", interface)
- return interface
+ LOG.debug("Done creating subinterface %s", vlan_device)
+ return vlan_device
def ensure_vxlan(self, segmentation_id):
"""Create a vxlan unless it already exists."""
# under the License.
import collections
-import hashlib
import signal
import sys
import time
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron.plugins.common import constants as p_const
+from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants
self.tun_br.setup_default_table(self.patch_int_ofport,
self.arp_responder_enabled)
- def get_peer_name(self, prefix, name):
- """Construct a peer name based on the prefix and name.
-
- The peer name can not exceed the maximum length allowed for a linux
- device. Longer names are hashed to help ensure uniqueness.
- """
- if len(prefix + name) <= n_const.DEVICE_NAME_MAX_LEN:
- return prefix + name
- # We can't just truncate because bridges may be distinguished
- # by an ident at the end. A hash over the name should be unique.
- # Leave part of the bridge name on for easier identification
- hashlen = 6
- namelen = n_const.DEVICE_NAME_MAX_LEN - len(prefix) - hashlen
- if isinstance(name, six.text_type):
- hashed_name = hashlib.sha1(name.encode('utf-8'))
- else:
- hashed_name = hashlib.sha1(name)
- new_name = ('%(prefix)s%(truncated)s%(hash)s' %
- {'prefix': prefix, 'truncated': name[0:namelen],
- 'hash': hashed_name.hexdigest()[0:hashlen]})
- LOG.warning(_LW("Creating an interface named %(name)s exceeds the "
- "%(limit)d character limitation. It was shortened to "
- "%(new_name)s to fit."),
- {'name': name, 'limit': n_const.DEVICE_NAME_MAX_LEN,
- 'new_name': new_name})
- return new_name
-
def setup_physical_bridges(self, bridge_mappings):
'''Setup the physical network bridges.
self.phys_brs[physical_network] = br
# interconnect physical and integration bridges using veth/patchs
- int_if_name = self.get_peer_name(constants.PEER_INTEGRATION_PREFIX,
- bridge)
- phys_if_name = self.get_peer_name(constants.PEER_PHYSICAL_PREFIX,
- bridge)
+ int_if_name = p_utils.get_interface_name(bridge,
+ prefix=constants.PEER_INTEGRATION_PREFIX)
+ phys_if_name = p_utils.get_interface_name(bridge,
+ prefix=constants.PEER_PHYSICAL_PREFIX)
# Interface type of port for physical and integration bridges must
# be same, so check only one of them.
int_type = self.int_br.db_get_val("Interface", int_if_name, "type")
--- /dev/null
+# Copyright (c) 2015 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import hashlib
+import mock
+
+
+from neutron.plugins.common import utils
+from neutron.tests import base
+
+
+LONG_NAME1 = "A_REALLY_LONG_INTERFACE_NAME1"
+LONG_NAME2 = "A_REALLY_LONG_INTERFACE_NAME2"
+SHORT_NAME = "SHORT"
+
+MOCKED_HASH = "mockedhash"
+
+
+class MockSHA(object):
+ def hexdigest(self):
+ return MOCKED_HASH
+
+
+class TestUtils(base.BaseTestCase):
+
+ @mock.patch.object(hashlib, 'sha1', return_value=MockSHA())
+ def test_get_interface_name(self, mock_sha1):
+ prefix = "pre-"
+ postfix = ".1111"
+
+ if_default_long = utils.get_interface_name(LONG_NAME1)
+ if_default_short = utils.get_interface_name(SHORT_NAME)
+
+ if_prefix_long = utils.get_interface_name(LONG_NAME1, prefix=prefix)
+ if_prefix_short = utils.get_interface_name(SHORT_NAME, prefix=prefix)
+
+ if_postfix_long = utils.get_interface_name(LONG_NAME1, postfix=postfix)
+ if_postfix_short = utils.get_interface_name(SHORT_NAME,
+ postfix=postfix)
+
+ if_prefix_postfix_long = utils.get_interface_name(LONG_NAME1,
+ prefix=prefix,
+ postfix=postfix)
+ if_prefix_postfix_short = utils.get_interface_name(SHORT_NAME,
+ prefix=prefix,
+ postfix=postfix)
+
+ # Each combination is a tuple of the following values:
+ # the calculated name, the expected name"
+ combinations = [(if_default_long, "A_REALLY_mocked"),
+ (if_default_short, "SHORT"),
+ (if_prefix_long, "pre-A_REAmocked"),
+ (if_prefix_short, "pre-SHORT"),
+ (if_postfix_long, "A_REmocked.1111"),
+ (if_postfix_short, "SHORT.1111"),
+ (if_prefix_postfix_long, "pre-mocked.1111"),
+ (if_prefix_postfix_short, "pre-SHORT.1111")]
+
+ for if_new_name, if_expected_new_name in combinations:
+ self.assertEqual(if_new_name, if_expected_new_name)
+
+ def test_get_interface_uniqueness(self):
+ prefix = "prefix-"
+ if_prefix1 = utils.get_interface_name(LONG_NAME1, prefix=prefix)
+ if_prefix2 = utils.get_interface_name(LONG_NAME2, prefix=prefix)
+ self.assertNotEqual(if_prefix1, if_prefix2)
+
+ def test_get_interface_long_post_and_prefix(self):
+ """Prefix and postfix alone overcome the max character limit."""
+
+ long_prefix = "long_pre"
+ long_postfix = "long_pos"
+ much_too_long_prefix = "much_too_long_prefix"
+ much_too_long_postfix = "much_too_long_postfix"
+
+ self.assertEqual("long_preSHORT", utils.get_interface_name(SHORT_NAME,
+ prefix=long_prefix))
+ self.assertEqual("SHORTlong_pos", utils.get_interface_name(SHORT_NAME,
+ postfix=long_postfix))
+ self.assertRaises(ValueError, utils.get_interface_name, SHORT_NAME,
+ prefix=long_prefix, postfix=long_postfix)
+
+ self.assertRaises(ValueError, utils.get_interface_name, SHORT_NAME,
+ prefix=much_too_long_prefix)
+ self.assertRaises(ValueError, utils.get_interface_name, SHORT_NAME,
+ postfix=much_too_long_postfix)
+
+ @mock.patch.object(hashlib, 'sha1', return_value=MockSHA())
+ def test_get_interface_max_len(self, mock_sha1):
+ self.assertTrue(len(utils.get_interface_name(LONG_NAME1)) == 15)
+ self.assertTrue(len(utils.get_interface_name(LONG_NAME1, max_len=10))
+ == 10)
from neutron.common import constants
from neutron.common import exceptions
from neutron.plugins.common import constants as p_const
+from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
import constants as lconst
from neutron.plugins.ml2.drivers.linuxbridge.agent \
self.assertEqual(self.lbm.get_bridge_name(nw_id),
"brq")
- def test_get_subinterface_name(self):
- self.assertEqual(self.lbm.get_subinterface_name("eth0", "0"),
- "eth0.0")
- self.assertEqual(self.lbm.get_subinterface_name("eth0", ""),
- "eth0.")
+ def test_get_vlan_device_name(self):
+ with mock.patch.object(p_utils, "get_interface_name",
+ return_value="eth0.1") as mock_get_vlan:
+ self.assertEqual(self.lbm.get_vlan_device_name("eth0", 1),
+ "eth0.1")
+ mock_get_vlan.assert_called_with("eth0", postfix=".1", max_len=12)
+
+ with mock.patch.object(p_utils, "get_interface_name",
+ return_value="eth0.1111") as mock_get_vlan:
+ self.assertEqual(self.lbm.get_vlan_device_name("eth0", 1111),
+ "eth0.1111")
+ mock_get_vlan.assert_called_with("eth0", postfix=".1111",
+ max_len=15)
+ invalid_vlan = 11111
+ self.assertRaises(ValueError, self.lbm.get_vlan_device_name, "eth0",
+ invalid_vlan)
+ self.assertRaises(ValueError, self.lbm.get_vlan_device_name, "eth0", 0)
+ self.assertRaises(ValueError, self.lbm.get_vlan_device_name, "eth0",
+ None)
def test_get_tap_device_name(self):
if_id = "123456789101112"
self.assertEqual(self.agent.phys_ofports["physnet1"],
"phy_ofport")
- def test_get_peer_name(self):
- bridge1 = "A_REALLY_LONG_BRIDGE_NAME1"
- bridge2 = "A_REALLY_LONG_BRIDGE_NAME2"
- self.agent.use_veth_interconnection = True
- self.assertEqual(len(self.agent.get_peer_name('int-', bridge1)),
- n_const.DEVICE_NAME_MAX_LEN)
- self.assertEqual(len(self.agent.get_peer_name('int-', bridge2)),
- n_const.DEVICE_NAME_MAX_LEN)
- self.assertNotEqual(self.agent.get_peer_name('int-', bridge1),
- self.agent.get_peer_name('int-', bridge2))
-
def test_setup_tunnel_br(self):
self.tun_br = mock.Mock()
with mock.patch.object(self.agent.int_br,