Common utilities and helper functions for Openstack Networking Plugins.
"""
-import hashlib
import webob.exc
-from oslo_log import log as logging
-import six
-
from neutron.api.v2 import attributes
-from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
-from neutron.i18n import _LI
from neutron.plugins.common import constants as p_const
-LOG = logging.getLogger(__name__)
-
def is_valid_vlan_tag(vlan):
return p_const.MIN_VLAN_TAG <= vlan <= p_const.MAX_VLAN_TAG
port.get('port', {}),
check_allow_post=check_allow_post)
return core_plugin.create_port(context, {'port': port_data})
-
-
-def get_interface_name(name, prefix='', postfix='',
- max_len=n_const.DEVICE_NAME_MAX_LEN):
- """Construct an interface name based on the prefix, postfix and name.
-
- The interface name can not exceed the maximum length passed in. Longer
- names are hashed to help ensure uniqueness.
- """
- requested_name = prefix + name + postfix
-
- if len(requested_name) <= max_len:
- return requested_name
- # We can't just truncate because interfaces may be distinguished
- # by an ident at the end. A hash over the name should be unique.
- # Leave part of the interface name on for easier identification
- hashlen = 6
-
- if (len(prefix + postfix) + hashlen) > max_len:
- raise ValueError("Too long pre- and postfix provided. New name would "
- "exceed given length for an interface name.")
-
- namelen = max_len - len(prefix) - hashlen - len(postfix)
- if isinstance(name, six.text_type):
- hashed_name = hashlib.sha1(name.encode('utf-8'))
- else:
- hashed_name = hashlib.sha1(name)
- new_name = ('%(prefix)s%(truncated)s%(hash)s%(postfix)s' %
- {'prefix': prefix, 'truncated': name[0:namelen],
- 'hash': hashed_name.hexdigest()[0:hashlen],
- 'postfix': postfix})
- LOG.info(_LI("The requested interface name %(requested_name)s exceeds the "
- "%(limit)d character limitation. It was shortened to "
- "%(new_name)s to fit."),
- {'requested_name': requested_name,
- 'limit': max_len, 'new_name': new_name})
- return new_name
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron.plugins.common import constants as p_const
-from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.l2pop.rpc_manager \
import l2population_rpc as l2pop_rpc
from neutron.plugins.ml2.drivers.linuxbridge.agent import arp_protect
bridge_name = BRIDGE_NAME_PREFIX + network_id[0:11]
return bridge_name
- def get_vlan_device_name(self, physical_interface, vlan_id):
+ def get_subinterface_name(self, physical_interface, vlan_id):
if not vlan_id:
- raise ValueError("No VLAN ID specified!")
-
- vlan_len = len(str(vlan_id))
- if vlan_len > 4:
- raise ValueError("Invalid VLAN ID! ID exceeds 4 digits!")
-
- vlan_postfix = '.%s' % vlan_id
-
- # Handling for too long physical_interface names:
- # Ensure that vlan devices that belong to the same logical network
- # use the same naming pattern despite the hashing algorithm that is
- # used in such cases. E.g.
- # Interface name: "very_long_name" should NOT result in
- # "veryHASHED.1111" and "very_loHASHED.1" but rather in
- # "veryHASHED.1111" and "veryHASHED.1". This can be accomplished with
- # requesting a smaller device name length for small vlan ids.
- max_len = constants.DEVICE_NAME_MAX_LEN - (4 - vlan_len)
- vlan_name = p_utils.get_interface_name(physical_interface,
- postfix=vlan_postfix,
- max_len=max_len)
- return vlan_name
+ LOG.warning(_LW("Invalid VLAN ID, will lead to incorrect "
+ "subinterface name"))
+ subinterface_name = '%s.%s' % (physical_interface, vlan_id)
+ return subinterface_name
def get_tap_device_name(self, interface_id):
if not interface_id:
def ensure_vlan(self, physical_interface, vlan_id):
"""Create a vlan unless it already exists."""
- vlan_device = self.get_vlan_device_name(physical_interface, vlan_id)
- if not ip_lib.device_exists(vlan_device):
+ interface = self.get_subinterface_name(physical_interface, vlan_id)
+ if not ip_lib.device_exists(interface):
LOG.debug("Creating subinterface %(interface)s for "
"VLAN %(vlan_id)s on interface "
"%(physical_interface)s",
- {'interface': vlan_device, 'vlan_id': vlan_id,
+ {'interface': interface, 'vlan_id': vlan_id,
'physical_interface': physical_interface})
if utils.execute(['ip', 'link', 'add', 'link',
physical_interface,
- 'name', vlan_device, 'type', 'vlan', 'id',
+ 'name', interface, 'type', 'vlan', 'id',
vlan_id], run_as_root=True):
return
if utils.execute(['ip', 'link', 'set',
- vlan_device, 'up'], run_as_root=True):
+ interface, 'up'], run_as_root=True):
return
- LOG.debug("Done creating subinterface %s", vlan_device)
- return vlan_device
+ LOG.debug("Done creating subinterface %s", interface)
+ return interface
def ensure_vxlan(self, segmentation_id):
"""Create a vxlan unless it already exists."""
# under the License.
import collections
+import hashlib
import signal
import sys
import time
from neutron import context
from neutron.i18n import _LE, _LI, _LW
from neutron.plugins.common import constants as p_const
-from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.l2pop.rpc_manager import l2population_rpc
from neutron.plugins.ml2.drivers.openvswitch.agent.common \
import constants
self.tun_br.setup_default_table(self.patch_int_ofport,
self.arp_responder_enabled)
+ def get_peer_name(self, prefix, name):
+ """Construct a peer name based on the prefix and name.
+
+ The peer name can not exceed the maximum length allowed for a linux
+ device. Longer names are hashed to help ensure uniqueness.
+ """
+ if len(prefix + name) <= n_const.DEVICE_NAME_MAX_LEN:
+ return prefix + name
+ # We can't just truncate because bridges may be distinguished
+ # by an ident at the end. A hash over the name should be unique.
+ # Leave part of the bridge name on for easier identification
+ hashlen = 6
+ namelen = n_const.DEVICE_NAME_MAX_LEN - len(prefix) - hashlen
+ if isinstance(name, six.text_type):
+ hashed_name = hashlib.sha1(name.encode('utf-8'))
+ else:
+ hashed_name = hashlib.sha1(name)
+ new_name = ('%(prefix)s%(truncated)s%(hash)s' %
+ {'prefix': prefix, 'truncated': name[0:namelen],
+ 'hash': hashed_name.hexdigest()[0:hashlen]})
+ LOG.warning(_LW("Creating an interface named %(name)s exceeds the "
+ "%(limit)d character limitation. It was shortened to "
+ "%(new_name)s to fit."),
+ {'name': name, 'limit': n_const.DEVICE_NAME_MAX_LEN,
+ 'new_name': new_name})
+ return new_name
+
def setup_physical_bridges(self, bridge_mappings):
'''Setup the physical network bridges.
self.phys_brs[physical_network] = br
# interconnect physical and integration bridges using veth/patchs
- int_if_name = p_utils.get_interface_name(bridge,
- prefix=constants.PEER_INTEGRATION_PREFIX)
- phys_if_name = p_utils.get_interface_name(bridge,
- prefix=constants.PEER_PHYSICAL_PREFIX)
+ int_if_name = self.get_peer_name(constants.PEER_INTEGRATION_PREFIX,
+ bridge)
+ phys_if_name = self.get_peer_name(constants.PEER_PHYSICAL_PREFIX,
+ bridge)
# Interface type of port for physical and integration bridges must
# be same, so check only one of them.
int_type = self.int_br.db_get_val("Interface", int_if_name, "type")
+++ /dev/null
-# Copyright (c) 2015 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import hashlib
-import mock
-
-
-from neutron.plugins.common import utils
-from neutron.tests import base
-
-
-LONG_NAME1 = "A_REALLY_LONG_INTERFACE_NAME1"
-LONG_NAME2 = "A_REALLY_LONG_INTERFACE_NAME2"
-SHORT_NAME = "SHORT"
-
-MOCKED_HASH = "mockedhash"
-
-
-class MockSHA(object):
- def hexdigest(self):
- return MOCKED_HASH
-
-
-class TestUtils(base.BaseTestCase):
-
- @mock.patch.object(hashlib, 'sha1', return_value=MockSHA())
- def test_get_interface_name(self, mock_sha1):
- prefix = "pre-"
- postfix = ".1111"
-
- if_default_long = utils.get_interface_name(LONG_NAME1)
- if_default_short = utils.get_interface_name(SHORT_NAME)
-
- if_prefix_long = utils.get_interface_name(LONG_NAME1, prefix=prefix)
- if_prefix_short = utils.get_interface_name(SHORT_NAME, prefix=prefix)
-
- if_postfix_long = utils.get_interface_name(LONG_NAME1, postfix=postfix)
- if_postfix_short = utils.get_interface_name(SHORT_NAME,
- postfix=postfix)
-
- if_prefix_postfix_long = utils.get_interface_name(LONG_NAME1,
- prefix=prefix,
- postfix=postfix)
- if_prefix_postfix_short = utils.get_interface_name(SHORT_NAME,
- prefix=prefix,
- postfix=postfix)
-
- # Each combination is a tuple of the following values:
- # the calculated name, the expected name"
- combinations = [(if_default_long, "A_REALLY_mocked"),
- (if_default_short, "SHORT"),
- (if_prefix_long, "pre-A_REAmocked"),
- (if_prefix_short, "pre-SHORT"),
- (if_postfix_long, "A_REmocked.1111"),
- (if_postfix_short, "SHORT.1111"),
- (if_prefix_postfix_long, "pre-mocked.1111"),
- (if_prefix_postfix_short, "pre-SHORT.1111")]
-
- for if_new_name, if_expected_new_name in combinations:
- self.assertEqual(if_new_name, if_expected_new_name)
-
- def test_get_interface_uniqueness(self):
- prefix = "prefix-"
- if_prefix1 = utils.get_interface_name(LONG_NAME1, prefix=prefix)
- if_prefix2 = utils.get_interface_name(LONG_NAME2, prefix=prefix)
- self.assertNotEqual(if_prefix1, if_prefix2)
-
- def test_get_interface_long_post_and_prefix(self):
- """Prefix and postfix alone overcome the max character limit."""
-
- long_prefix = "long_pre"
- long_postfix = "long_pos"
- much_too_long_prefix = "much_too_long_prefix"
- much_too_long_postfix = "much_too_long_postfix"
-
- self.assertEqual("long_preSHORT", utils.get_interface_name(SHORT_NAME,
- prefix=long_prefix))
- self.assertEqual("SHORTlong_pos", utils.get_interface_name(SHORT_NAME,
- postfix=long_postfix))
- self.assertRaises(ValueError, utils.get_interface_name, SHORT_NAME,
- prefix=long_prefix, postfix=long_postfix)
-
- self.assertRaises(ValueError, utils.get_interface_name, SHORT_NAME,
- prefix=much_too_long_prefix)
- self.assertRaises(ValueError, utils.get_interface_name, SHORT_NAME,
- postfix=much_too_long_postfix)
-
- @mock.patch.object(hashlib, 'sha1', return_value=MockSHA())
- def test_get_interface_max_len(self, mock_sha1):
- self.assertTrue(len(utils.get_interface_name(LONG_NAME1)) == 15)
- self.assertTrue(len(utils.get_interface_name(LONG_NAME1, max_len=10))
- == 10)
from neutron.common import constants
from neutron.common import exceptions
from neutron.plugins.common import constants as p_const
-from neutron.plugins.common import utils as p_utils
from neutron.plugins.ml2.drivers.linuxbridge.agent.common \
import constants as lconst
from neutron.plugins.ml2.drivers.linuxbridge.agent \
self.assertEqual(self.lbm.get_bridge_name(nw_id),
"brq")
- def test_get_vlan_device_name(self):
- with mock.patch.object(p_utils, "get_interface_name",
- return_value="eth0.1") as mock_get_vlan:
- self.assertEqual(self.lbm.get_vlan_device_name("eth0", 1),
- "eth0.1")
- mock_get_vlan.assert_called_with("eth0", postfix=".1", max_len=12)
-
- with mock.patch.object(p_utils, "get_interface_name",
- return_value="eth0.1111") as mock_get_vlan:
- self.assertEqual(self.lbm.get_vlan_device_name("eth0", 1111),
- "eth0.1111")
- mock_get_vlan.assert_called_with("eth0", postfix=".1111",
- max_len=15)
- invalid_vlan = 11111
- self.assertRaises(ValueError, self.lbm.get_vlan_device_name, "eth0",
- invalid_vlan)
- self.assertRaises(ValueError, self.lbm.get_vlan_device_name, "eth0", 0)
- self.assertRaises(ValueError, self.lbm.get_vlan_device_name, "eth0",
- None)
+ def test_get_subinterface_name(self):
+ self.assertEqual(self.lbm.get_subinterface_name("eth0", "0"),
+ "eth0.0")
+ self.assertEqual(self.lbm.get_subinterface_name("eth0", ""),
+ "eth0.")
def test_get_tap_device_name(self):
if_id = "123456789101112"
self.assertEqual(self.agent.phys_ofports["physnet1"],
"phy_ofport")
+ def test_get_peer_name(self):
+ bridge1 = "A_REALLY_LONG_BRIDGE_NAME1"
+ bridge2 = "A_REALLY_LONG_BRIDGE_NAME2"
+ self.agent.use_veth_interconnection = True
+ self.assertEqual(len(self.agent.get_peer_name('int-', bridge1)),
+ n_const.DEVICE_NAME_MAX_LEN)
+ self.assertEqual(len(self.agent.get_peer_name('int-', bridge2)),
+ n_const.DEVICE_NAME_MAX_LEN)
+ self.assertNotEqual(self.agent.get_peer_name('int-', bridge1),
+ self.agent.get_peer_name('int-', bridge2))
+
def test_setup_tunnel_br(self):
self.tun_br = mock.Mock()
with mock.patch.object(self.agent.int_br,