Check that the range beginning and end tags are valid values 1-4094.
Supply two global constants for min/max vlan tags and update all
local usage of these values to use the global constants.
Fixes: Bug #1169266
Change-Id: I054a8bebd16d95ea40414e3cecb6d24a970c730f
UDP_PROTOCOL = 17
DHCP_RESPONSE_PORT = 68
+MIN_VLAN_TAG = 1
+MAX_VLAN_TAG = 4094
+
EXT_NS = '_extension_ns'
XML_NS_V20 = 'http://openstack.org/quantum/api/v2.0'
XSI_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance"
class NetworkVlanRangeError(QuantumException):
- message = _("Invalid network VLAN range: '%(range)s' - '%(error)s'")
+ message = _("Invalid network VLAN range: '%(vlan_range)s' - '%(error)s'")
+
+ def __init__(self, **kwargs):
+ # Convert vlan_range tuple to 'start:end' format for display
+ if isinstance(kwargs['vlan_range'], tuple):
+ kwargs['vlan_range'] = "%d:%d" % kwargs['vlan_range']
+ super(NetworkVlanRangeError, self).__init__(**kwargs)
from eventlet.green import subprocess
from oslo.config import cfg
+from quantum.common import constants as q_const
from quantum.openstack.common import log as logging
def log_opt_values(log):
cfg.CONF.log_opt_values(log, std_logging.DEBUG)
+
+
+def is_valid_vlan_tag(vlan):
+ return q_const.MIN_VLAN_TAG <= vlan <= q_const.MAX_VLAN_TAG
"""A Vlan Bitmap class to handle allocation/de-allocation of vlan ids."""
+
+from quantum.common import constants
from quantum.plugins.brocade.db import models as brocade_db
-MIN_VLAN = 2
-MAX_VLAN = 4094
+MIN_VLAN = constants.MIN_VLAN_TAG + 1
+MAX_VLAN = constants.MAX_VLAN_TAG
class VlanBitmap(object):
"""
from quantum.common import exceptions as q_exc
+from quantum.common import utils
+
+
+def verify_vlan_range(vlan_range):
+ """Raise an exception for invalid tags or malformed range."""
+ for vlan_tag in vlan_range:
+ if not utils.is_valid_vlan_tag(vlan_tag):
+ raise q_exc.NetworkVlanRangeError(
+ vlan_range=vlan_range,
+ error=_("%s is not a valid VLAN tag") % vlan_tag)
+ if vlan_range[1] < vlan_range[0]:
+ raise q_exc.NetworkVlanRangeError(
+ vlan_range=vlan_range,
+ error=_("End of VLAN range is less than start of VLAN range"))
def parse_network_vlan_range(network_vlan_range):
if ':' in entry:
try:
network, vlan_min, vlan_max = entry.split(':')
- vlan_min, vlan_max = int(vlan_min), int(vlan_max)
+ vlan_range = (int(vlan_min), int(vlan_max))
except ValueError as ex:
- raise q_exc.NetworkVlanRangeError(range=entry, error=ex)
- return network, (vlan_min, vlan_max)
+ raise q_exc.NetworkVlanRangeError(vlan_range=entry, error=ex)
+ verify_vlan_range(vlan_range)
+ return network, vlan_range
else:
return entry, None
# Special vlan_id value in ovs_vlan_allocations table indicating flat network
FLAT_VLAN_ID = -1
-VLAN_ID_MIN = 1
-VLAN_ID_MAX = 4096
# Values for network_type
TYPE_LOCAL = 'local'
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
+from quantum.common import utils
from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.db import api as db_api
if not segmentation_id_set:
msg = _("provider:segmentation_id required")
raise q_exc.InvalidInput(error_message=msg)
- if segmentation_id < 1 or segmentation_id > 4094:
- msg = _("provider:segmentation_id out of range "
- "(1 through 4094)")
+ if not utils.is_valid_vlan_tag(segmentation_id):
+ msg = (_("provider:segmentation_id out of range "
+ "(%(min_id)s through %(max_id)s)") %
+ {'min_id': q_const.MIN_VLAN_TAG,
+ 'max_id': q_const.MAX_VLAN_TAG})
raise q_exc.InvalidInput(error_message=msg)
elif network_type == constants.TYPE_LOCAL:
if physical_network_set:
LOCAL_VLAN_ID = -2
FLAT_VLAN_ID = -1
-VLAN_ID_MIN = 1
-VLAN_ID_MAX = 4096
# Values for network_type
TYPE_LOCAL = 'local'
from quantum.agent import securitygroups_rpc as sg_rpc
from quantum.api.v2 import attributes
+from quantum.common import constants as q_const
from quantum.common import exceptions as q_exc
from quantum.common import topics
+from quantum.common import utils
from quantum.db import agents_db
from quantum.db import db_base_plugin_v2
from quantum.db import l3_db
if not segmentation_id_set:
msg = _("provider:segmentation_id required")
raise q_exc.InvalidInput(error_message=msg)
- if segmentation_id < 1 or segmentation_id > 4094:
- msg = _("provider:segmentation_id out of range "
- "(1 through 4094)")
+ if not utils.is_valid_vlan_tag(segmentation_id):
+ msg = (_("provider:segmentation_id out of range "
+ "(%(min_id)s through %(max_id)s)") %
+ {'min_id': q_const.MIN_VLAN_TAG,
+ 'max_id': q_const.MAX_VLAN_TAG})
raise q_exc.InvalidInput(error_message=msg)
def _process_local_net(self, physical_network_set, segmentation_id_set):
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
+from quantum.common import utils
from quantum import context as q_context
from quantum.db import agents_db
from quantum.db import agentschedulers_db
err_msg = _("Segmentation ID must be specified with "
"vlan network type")
elif (segmentation_id_set and
- (segmentation_id < 1 or segmentation_id > 4094)):
- err_msg = _("%s out of range (1 to 4094)") % segmentation_id
+ not utils.is_valid_vlan_tag(segmentation_id)):
+ err_msg = (_("%(segmentation_id)s out of range "
+ "(%(min_id)s through %(max_id)s)") %
+ {'segmentation_id': segmentation_id,
+ 'min_id': constants.MIN_VLAN_TAG,
+ 'max_id': constants.MAX_VLAN_TAG})
else:
# Verify segment is not already allocated
binding = nicira_db.get_network_binding_by_vlanid(
physical_network=physical_network)
elif network_type == NetworkTypes.L3_EXT:
if (segmentation_id_set and
- (segmentation_id < 1 or segmentation_id > 4094)):
- err_msg = _("%s out of range (1 to 4094)") % segmentation_id
+ not utils.is_valid_vlan_tag(segmentation_id)):
+ err_msg = (_("%(segmentation_id)s out of range "
+ "(%(min_id)s through %(max_id)s)") %
+ {'segmentation_id': segmentation_id,
+ 'min_id': constants.MIN_VLAN_TAG,
+ 'max_id': constants.MAX_VLAN_TAG})
else:
err_msg = _("%(net_type_param)s %(net_type_value)s not "
"supported") % {'net_type_param': pnet.NETWORK_TYPE,
LOG = logging.getLogger(__name__)
# A placeholder for dead vlans.
-DEAD_VLAN_TAG = "4095"
+DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1)
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
modifying, or stripping VLAN tags as necessary.
'''
- # Lower bound on available vlans.
- MIN_VLAN_TAG = 1
-
- # Upper bound on available vlans.
- MAX_VLAN_TAG = 4094
-
# history
# 1.0 Initial version
# 1.1 Support Security Group RPC
:param enable_tunneling: if True enable GRE networks.
'''
self.root_helper = root_helper
- self.available_local_vlans = set(
- xrange(OVSQuantumAgent.MIN_VLAN_TAG,
- OVSQuantumAgent.MAX_VLAN_TAG))
+ self.available_local_vlans = set(xrange(q_const.MIN_VLAN_TAG,
+ q_const.MAX_VLAN_TAG))
self.int_br = self.setup_integration_br(integ_br)
self.setup_physical_bridges(bridge_mappings)
self.local_vlan_map = {}
from quantum.common import exceptions as q_exc
from quantum.common import rpc as q_rpc
from quantum.common import topics
+from quantum.common import utils
from quantum.db import agents_db
from quantum.db import agentschedulers_db
from quantum.db import db_base_plugin_v2
if not segmentation_id_set:
msg = _("provider:segmentation_id required")
raise q_exc.InvalidInput(error_message=msg)
- if segmentation_id < 1 or segmentation_id > 4094:
- msg = _("provider:segmentation_id out of range "
- "(1 through 4094)")
+ if not utils.is_valid_vlan_tag(segmentation_id):
+ msg = (_("provider:segmentation_id out of range "
+ "(%(min_id)s through %(max_id)s)") %
+ {'min_id': q_const.MIN_VLAN_TAG,
+ 'max_id': q_const.MAX_VLAN_TAG})
raise q_exc.InvalidInput(error_message=msg)
elif network_type == constants.TYPE_GRE:
if not self.enable_tunneling:
import random
+from quantum.common import constants as q_const
from quantum.db import api as db_api
from quantum.openstack.common import uuidutils
from quantum.plugins.nec.common import exceptions as nexc
port_id = uuidutils.generate_uuid()
datapath_id = hex(random.randint(0, 0xffffffff))
port_no = random.randint(1, 100)
- vlan_id = random.randint(0, 4095)
+ vlan_id = random.randint(q_const.MIN_VLAN_TAG, q_const.MAX_VLAN_TAG)
mac = ':'.join(["%02x" % random.randint(0, 0xff) for x in range(6)])
none = uuidutils.generate_uuid()
return port_id, datapath_id, port_no, vlan_id, mac, none
_err_too_few = "' - 'need more than 2 values to unpack'"
_err_too_many = "' - 'too many values to unpack'"
_err_not_int = "' - 'invalid literal for int() with base 10: '%s''"
+ _err_bad_vlan = "' - '%s is not a valid VLAN tag'"
+ _err_range = "' - 'End of VLAN range is less than start of VLAN range'"
def _range_too_few_err(self, nv_range):
return self._err_prefix + nv_range + self._err_too_few
def _vlan_not_int_err(self, nv_range, vlan):
return self._err_prefix + nv_range + (self._err_not_int % vlan)
+ def _nrange_invalid_vlan(self, nv_range, n):
+ vlan = nv_range.split(':')[n]
+ v_range = ':'.join(nv_range.split(':')[1:])
+ return self._err_prefix + v_range + (self._err_bad_vlan % vlan)
+
+ def _vrange_invalid_vlan(self, v_range_tuple, n):
+ vlan = v_range_tuple[n - 1]
+ v_range_str = '%d:%d' % v_range_tuple
+ return self._err_prefix + v_range_str + (self._err_bad_vlan % vlan)
+
+ def _vrange_invalid(self, v_range_tuple):
+ v_range_str = '%d:%d' % v_range_tuple
+ return self._err_prefix + v_range_str + self._err_range
+
+
+class TestVlanRangeVerifyValid(UtilTestParseVlanRanges):
+ def verify_range(self, vlan_range):
+ return plugin_utils.verify_vlan_range(vlan_range)
+
+ def test_range_valid_ranges(self):
+ self.assertEqual(self.verify_range((1, 2)), None)
+ self.assertEqual(self.verify_range((1, 1999)), None)
+ self.assertEqual(self.verify_range((100, 100)), None)
+ self.assertEqual(self.verify_range((100, 200)), None)
+ self.assertEqual(self.verify_range((4001, 4094)), None)
+ self.assertEqual(self.verify_range((1, 4094)), None)
+
+ def check_one_vlan_invalid(self, bad_range, which):
+ expected_msg = self._vrange_invalid_vlan(bad_range, which)
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.verify_range, bad_range)
+ self.assertEqual(str(err), expected_msg)
+
+ def test_range_first_vlan_invalid_negative(self):
+ self.check_one_vlan_invalid((-1, 199), 1)
+
+ def test_range_first_vlan_invalid_zero(self):
+ self.check_one_vlan_invalid((0, 199), 1)
+
+ def test_range_first_vlan_invalid_limit_plus_one(self):
+ self.check_one_vlan_invalid((4095, 199), 1)
+
+ def test_range_first_vlan_invalid_too_big(self):
+ self.check_one_vlan_invalid((9999, 199), 1)
+
+ def test_range_second_vlan_invalid_negative(self):
+ self.check_one_vlan_invalid((299, -1), 2)
+
+ def test_range_second_vlan_invalid_zero(self):
+ self.check_one_vlan_invalid((299, 0), 2)
+
+ def test_range_second_vlan_invalid_limit_plus_one(self):
+ self.check_one_vlan_invalid((299, 4095), 2)
+
+ def test_range_second_vlan_invalid_too_big(self):
+ self.check_one_vlan_invalid((299, 9999), 2)
+
+ def test_range_both_vlans_invalid_01(self):
+ self.check_one_vlan_invalid((-1, 0), 1)
+
+ def test_range_both_vlans_invalid_02(self):
+ self.check_one_vlan_invalid((0, 4095), 1)
+
+ def test_range_both_vlans_invalid_03(self):
+ self.check_one_vlan_invalid((4095, 9999), 1)
+
+ def test_range_both_vlans_invalid_04(self):
+ self.check_one_vlan_invalid((9999, -1), 1)
+
+ def test_range_reversed(self):
+ bad_range = (95, 10)
+ expected_msg = self._vrange_invalid(bad_range)
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.verify_range, bad_range)
+ self.assertEqual(str(err), expected_msg)
+
class TestParseOneVlanRange(UtilTestParseVlanRanges):
def parse_one(self, cfg_entry):
self.parse_one, config_str)
self.assertEqual(str(err), expected_msg)
+ def test_parse_one_net_and_max_range(self):
+ config_str = "net1:1:4094"
+ expected_networks = ("net1", (1, 4094))
+ self.assertEqual(self.parse_one(config_str), expected_networks)
+
+ def test_parse_one_net_range_bad_vlan1(self):
+ config_str = "net1:9000:150"
+ expected_msg = self._nrange_invalid_vlan(config_str, 1)
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.parse_one, config_str)
+ self.assertEqual(str(err), expected_msg)
+
+ def test_parse_one_net_range_bad_vlan2(self):
+ config_str = "net1:4000:4999"
+ expected_msg = self._nrange_invalid_vlan(config_str, 2)
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.parse_one, config_str)
+ self.assertEqual(str(err), expected_msg)
+
class TestParseVlanRangeList(UtilTestParseVlanRanges):
def parse_list(self, cfg_entries):