class GatewayConflictWithAllocationPools(InUse):
message = _("Gateway ip %(ip_address)s conflicts with "
"allocation pool %(pool)s")
+
+
+class NetworkVlanRangeError(QuantumException):
+ message = _("Invalid network VLAN range: '%(range)s' - '%(error)s'")
--- /dev/null
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Cisco Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+Common utilities and helper functions for Openstack Networking Plugins.
+"""
+
+from quantum.common import exceptions as q_exc
+
+
+def parse_network_vlan_range(network_vlan_range):
+ """Interpret a string as network[:vlan_begin:vlan_end]."""
+ entry = network_vlan_range.strip()
+ if ':' in entry:
+ try:
+ network, vlan_min, vlan_max = entry.split(':')
+ vlan_min, vlan_max = int(vlan_min), int(vlan_max)
+ except ValueError as ex:
+ raise q_exc.NetworkVlanRangeError(range=entry, error=ex)
+ return network, (vlan_min, vlan_max)
+ else:
+ return entry, None
+
+
+def parse_network_vlan_ranges(network_vlan_ranges_cfg_entries):
+ """Interpret a list of strings as network[:vlan_begin:vlan_end] entries."""
+ networks = {}
+ for entry in network_vlan_ranges_cfg_entries:
+ network, vlan_range = parse_network_vlan_range(entry)
+ if vlan_range:
+ networks.setdefault(network, []).append(vlan_range)
+ else:
+ networks.setdefault(network, [])
+ return networks
from quantum.extensions import providernet as provider
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
+from quantum.plugins.common import utils as plugin_utils
from quantum.plugins.hyperv import agent_notifier_api
from quantum.plugins.hyperv.common import constants
from quantum.plugins.hyperv import db as hyperv_db
return policy.check(context, action, resource)
def _parse_network_vlan_ranges(self):
- self._network_vlan_ranges = {}
- for entry in cfg.CONF.HYPERV.network_vlan_ranges:
- entry = entry.strip()
- if ':' in entry:
- try:
- physical_network, vlan_min, vlan_max = entry.split(':')
- self._add_network_vlan_range(physical_network.strip(),
- int(vlan_min),
- int(vlan_max))
- except ValueError as ex:
- msg = _(
- "Invalid network VLAN range: "
- "'%(range)s' - %(e)s. Agent terminated!"), \
- {'range': entry, 'e': ex}
- raise q_exc.InvalidInput(error_message=msg)
- else:
- self._add_network(entry)
+ self._network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
+ cfg.CONF.HYPERV.network_vlan_ranges)
LOG.info(_("Network VLAN ranges: %s"), self._network_vlan_ranges)
- def _add_network_vlan_range(self, physical_network, vlan_min, vlan_max):
- self._add_network(physical_network)
- self._network_vlan_ranges[physical_network].append(
- (vlan_min, vlan_max))
-
- def _add_network(self, physical_network):
- if physical_network not in self._network_vlan_ranges:
- self._network_vlan_ranges[physical_network] = []
-
def _check_vlan_id_in_range(self, physical_network, vlan_id):
for r in self._network_vlan_ranges[physical_network]:
if vlan_id >= r[0] and vlan_id <= r[1]:
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
+from quantum.plugins.common import utils as plugin_utils
from quantum.plugins.linuxbridge.common import constants
from quantum.plugins.linuxbridge.db import l2network_db_v2 as db
from quantum import policy
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotify
def _parse_network_vlan_ranges(self):
- self.network_vlan_ranges = {}
- for entry in cfg.CONF.VLANS.network_vlan_ranges:
- if ':' in entry:
- try:
- physical_network, vlan_min, vlan_max = entry.split(':')
- self._add_network_vlan_range(physical_network,
- int(vlan_min),
- int(vlan_max))
- except ValueError as ex:
- LOG.error(_("Invalid network VLAN range: "
- "'%(entry)s' - %(ex)s. "
- "Service terminated!"),
- {'entry': entry, 'ex': ex})
- sys.exit(1)
- else:
- self._add_network(entry)
- LOG.debug(_("Network VLAN ranges: %s"), self.network_vlan_ranges)
+ try:
+ self.network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
+ cfg.CONF.VLANS.network_vlan_ranges)
+ except Exception as ex:
+ LOG.error(_("%s. Agent terminated!"), ex)
+ sys.exit(1)
+ LOG.info(_("Network VLAN ranges: %s"), self.network_vlan_ranges)
def _check_view_auth(self, context, resource, action):
return policy.check(context, action, resource)
- def _add_network_vlan_range(self, physical_network, vlan_min, vlan_max):
- self._add_network(physical_network)
- self.network_vlan_ranges[physical_network].append((vlan_min, vlan_max))
-
- def _add_network(self, physical_network):
- if physical_network not in self.network_vlan_ranges:
- self.network_vlan_ranges[physical_network] = []
-
# REVISIT(rkukura) Use core mechanism for attribute authorization
# when available.
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
from quantum.openstack.common.rpc import proxy
+from quantum.plugins.common import utils as plugin_utils
from quantum.plugins.openvswitch.common import config # noqa
from quantum.plugins.openvswitch.common import constants
from quantum.plugins.openvswitch import ovs_db_v2
self.conn.consume_in_thread()
def _parse_network_vlan_ranges(self):
- self.network_vlan_ranges = {}
- for entry in cfg.CONF.OVS.network_vlan_ranges:
- entry = entry.strip()
- if ':' in entry:
- try:
- physical_network, vlan_min, vlan_max = entry.split(':')
- self._add_network_vlan_range(physical_network.strip(),
- int(vlan_min),
- int(vlan_max))
- except ValueError as ex:
- LOG.error(_("Invalid network VLAN range: "
- "'%(range)s' - %(e)s. Agent terminated!"),
- {'range': entry, 'e': ex})
- sys.exit(1)
- else:
- self._add_network(entry)
+ try:
+ self.network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
+ cfg.CONF.OVS.network_vlan_ranges)
+ except Exception as ex:
+ LOG.error(_("%s. Agent terminated!"), ex)
+ sys.exit(1)
LOG.info(_("Network VLAN ranges: %s"), self.network_vlan_ranges)
- def _add_network_vlan_range(self, physical_network, vlan_min, vlan_max):
- self._add_network(physical_network)
- self.network_vlan_ranges[physical_network].append((vlan_min, vlan_max))
-
- def _add_network(self, physical_network):
- if physical_network not in self.network_vlan_ranges:
- self.network_vlan_ranges[physical_network] = []
-
def _parse_tunnel_id_ranges(self):
for entry in cfg.CONF.OVS.tunnel_id_ranges:
entry = entry.strip()
import testtools
+from quantum.common import exceptions as q_exc
from quantum.common import utils
+from quantum.plugins.common import utils as plugin_utils
from quantum.tests import base
def test_parse_mappings_succeeds_for_no_mappings(self):
self.assertEqual(self.parse(['']), {})
+
+
+class UtilTestParseVlanRanges(base.BaseTestCase):
+ _err_prefix = "Invalid network VLAN range: '"
+ _err_too_few = "' - 'need more than 2 values to unpack'"
+ _err_too_many = "' - 'too many values to unpack'"
+ _err_not_int = "' - 'invalid literal for int() with base 10: '%s''"
+
+ def _range_too_few_err(self, nv_range):
+ return self._err_prefix + nv_range + self._err_too_few
+
+ def _range_too_many_err(self, nv_range):
+ return self._err_prefix + nv_range + self._err_too_many
+
+ def _vlan_not_int_err(self, nv_range, vlan):
+ return self._err_prefix + nv_range + (self._err_not_int % vlan)
+
+
+class TestParseOneVlanRange(UtilTestParseVlanRanges):
+ def parse_one(self, cfg_entry):
+ return plugin_utils.parse_network_vlan_range(cfg_entry)
+
+ def test_parse_one_net_no_vlan_range(self):
+ config_str = "net1"
+ expected_networks = ("net1", None)
+ self.assertEqual(self.parse_one(config_str), expected_networks)
+
+ def test_parse_one_net_and_vlan_range(self):
+ config_str = "net1:100:199"
+ expected_networks = ("net1", (100, 199))
+ self.assertEqual(self.parse_one(config_str), expected_networks)
+
+ def test_parse_one_net_incomplete_range(self):
+ config_str = "net1:100"
+ expected_msg = self._range_too_few_err(config_str)
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.parse_one, config_str)
+ self.assertEqual(str(err), expected_msg)
+
+ def test_parse_one_net_range_too_many(self):
+ config_str = "net1:100:150:200"
+ expected_msg = self._range_too_many_err(config_str)
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.parse_one, config_str)
+ self.assertEqual(str(err), expected_msg)
+
+ def test_parse_one_net_vlan1_not_int(self):
+ config_str = "net1:foo:199"
+ expected_msg = self._vlan_not_int_err(config_str, 'foo')
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.parse_one, config_str)
+ self.assertEqual(str(err), expected_msg)
+
+ def test_parse_one_net_vlan2_not_int(self):
+ config_str = "net1:100:bar"
+ expected_msg = self._vlan_not_int_err(config_str, 'bar')
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.parse_one, config_str)
+ self.assertEqual(str(err), expected_msg)
+
+
+class TestParseVlanRangeList(UtilTestParseVlanRanges):
+ def parse_list(self, cfg_entries):
+ return plugin_utils.parse_network_vlan_ranges(cfg_entries)
+
+ def test_parse_list_one_net_no_vlan_range(self):
+ config_list = ["net1"]
+ expected_networks = {"net1": []}
+ self.assertEqual(self.parse_list(config_list), expected_networks)
+
+ def test_parse_list_one_net_vlan_range(self):
+ config_list = ["net1:100:199"]
+ expected_networks = {"net1": [(100, 199)]}
+ self.assertEqual(self.parse_list(config_list), expected_networks)
+
+ def test_parse_two_nets_no_vlan_range(self):
+ config_list = ["net1",
+ "net2"]
+ expected_networks = {"net1": [],
+ "net2": []}
+ self.assertEqual(self.parse_list(config_list), expected_networks)
+
+ def test_parse_two_nets_range_and_no_range(self):
+ config_list = ["net1:100:199",
+ "net2"]
+ expected_networks = {"net1": [(100, 199)],
+ "net2": []}
+ self.assertEqual(self.parse_list(config_list), expected_networks)
+
+ def test_parse_two_nets_no_range_and_range(self):
+ config_list = ["net1",
+ "net2:200:299"]
+ expected_networks = {"net1": [],
+ "net2": [(200, 299)]}
+ self.assertEqual(self.parse_list(config_list), expected_networks)
+
+ def test_parse_two_nets_bad_vlan_range1(self):
+ config_list = ["net1:100",
+ "net2:200:299"]
+ expected_msg = self._range_too_few_err(config_list[0])
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.parse_list, config_list)
+ self.assertEqual(str(err), expected_msg)
+
+ def test_parse_two_nets_vlan_not_int2(self):
+ config_list = ["net1:100:199",
+ "net2:200:0x200"]
+ expected_msg = self._vlan_not_int_err(config_list[1], '0x200')
+ err = self.assertRaises(q_exc.NetworkVlanRangeError,
+ self.parse_list, config_list)
+ self.assertEqual(str(err), expected_msg)
+
+ def test_parse_two_nets_and_append_1_2(self):
+ config_list = ["net1:100:199",
+ "net1:1000:1099",
+ "net2:200:299"]
+ expected_networks = {"net1": [(100, 199),
+ (1000, 1099)],
+ "net2": [(200, 299)]}
+ self.assertEqual(self.parse_list(config_list), expected_networks)
+
+ def test_parse_two_nets_and_append_1_3(self):
+ config_list = ["net1:100:199",
+ "net2:200:299",
+ "net1:1000:1099"]
+ expected_networks = {"net1": [(100, 199),
+ (1000, 1099)],
+ "net2": [(200, 299)]}
+ self.assertEqual(self.parse_list(config_list), expected_networks)