from neutron.agent.ovsdb import api as ovsdb
from neutron.common import exceptions
from neutron.i18n import _LE, _LI, _LW
-from neutron.plugins.common import constants
+from neutron.plugins.common import constants as p_const
# Default timeout for ovs-vsctl command
DEFAULT_OVS_VSCTL_TIMEOUT = 10
return DeferredOVSBridge(self, **kwargs)
def add_tunnel_port(self, port_name, remote_ip, local_ip,
- tunnel_type=constants.TYPE_GRE,
- vxlan_udp_port=constants.VXLAN_UDP_PORT,
+ tunnel_type=p_const.TYPE_GRE,
+ vxlan_udp_port=p_const.VXLAN_UDP_PORT,
dont_fragment=True):
attrs = [('type', tunnel_type)]
# TODO(twilson) This is an OrderedDict solely to make a test happy
options = collections.OrderedDict()
vxlan_uses_custom_udp_port = (
- tunnel_type == constants.TYPE_VXLAN and
- vxlan_udp_port != constants.VXLAN_UDP_PORT
+ tunnel_type == p_const.TYPE_VXLAN and
+ vxlan_udp_port != p_const.VXLAN_UDP_PORT
)
if vxlan_uses_custom_udp_port:
options['dst_port'] = vxlan_udp_port
DHCP_RESPONSE_PORT = 68
-MIN_VLAN_TAG = 1
-MAX_VLAN_TAG = 4094
-
-# For GRE Tunnel
-MIN_GRE_ID = 1
-MAX_GRE_ID = 2 ** 32 - 1
-
-# For VXLAN Tunnel
-MIN_VXLAN_VNI = 1
-MAX_VXLAN_VNI = 2 ** 24 - 1
-
FLOODING_ENTRY = ('00:00:00:00:00:00', '0.0.0.0')
AGENT_TYPE_DHCP = 'DHCP agent'
from neutron.common import constants as q_const
-
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
LOG = logging.getLogger(__name__)
SYNCHRONIZED_PREFIX = 'neutron-'
cfg.CONF.log_opt_values(log, std_logging.DEBUG)
-def is_valid_vlan_tag(vlan):
- return q_const.MIN_VLAN_TAG <= vlan <= q_const.MAX_VLAN_TAG
-
-
-def is_valid_gre_id(gre_id):
- return q_const.MIN_GRE_ID <= gre_id <= q_const.MAX_GRE_ID
-
-
-def is_valid_vxlan_vni(vni):
- return q_const.MIN_VXLAN_VNI <= vni <= q_const.MAX_VXLAN_VNI
-
-
def get_random_mac(base_mac):
mac = [int(base_mac[0], 16), int(base_mac[1], 16),
int(base_mac[2], 16), random.randint(0x00, 0xff),
from neutron.extensions import portbindings
from neutron.openstack.common import uuidutils
from neutron.plugins.common import constants as p_const
-from neutron.plugins.ml2.drivers import type_vxlan
# Migration targets
""")
elif tunnel_type == p_const.TYPE_VXLAN:
if not vxlan_udp_port:
- vxlan_udp_port = type_vxlan.VXLAN_UDP_PORT
+ vxlan_udp_port = p_const.VXLAN_UDP_PORT
engine.execute("""
INSERT INTO ml2_vxlan_allocations
SELECT tunnel_id as vxlan_vni, allocated
"""A Vlan Bitmap class to handle allocation/de-allocation of vlan ids."""
from six import moves
-from neutron.common import constants
from neutron.plugins.brocade.db import models as brocade_db
+from neutron.plugins.common import constants as p_const
-
-MIN_VLAN = constants.MIN_VLAN_TAG + 1
-MAX_VLAN = constants.MAX_VLAN_TAG
+MIN_VLAN = p_const.MIN_VLAN_TAG + 1
+MAX_VLAN = p_const.MAX_VLAN_TAG
class VlanBitmap(object):
from sqlalchemy import sql
from neutron.api.v2 import attributes
-from neutron.common import constants
from neutron.common import exceptions as n_exc
import neutron.db.api as db
from neutron.db import models_v2
from neutron.plugins.cisco.common import cisco_exceptions as c_exc
from neutron.plugins.cisco.common import config as c_conf
from neutron.plugins.cisco.db import n1kv_models_v2
+from neutron.plugins.common import constants as p_const
+
LOG = logging.getLogger(__name__)
seg_min, seg_max = self._get_segment_range(net_p['segment_range'])
if segment_type == c_const.NETWORK_TYPE_VLAN:
if not ((seg_min <= seg_max) and
- ((seg_min in range(constants.MIN_VLAN_TAG,
+ ((seg_min in range(p_const.MIN_VLAN_TAG,
c_const.N1KV_VLAN_RESERVED_MIN) and
- seg_max in range(constants.MIN_VLAN_TAG,
+ seg_max in range(p_const.MIN_VLAN_TAG,
c_const.N1KV_VLAN_RESERVED_MIN)) or
(seg_min in range(c_const.N1KV_VLAN_RESERVED_MAX + 1,
- constants.MAX_VLAN_TAG) and
+ p_const.MAX_VLAN_TAG) and
seg_max in range(c_const.N1KV_VLAN_RESERVED_MAX + 1,
- constants.MAX_VLAN_TAG)))):
+ p_const.MAX_VLAN_TAG)))):
msg = (_("Segment range is invalid, select from "
"%(min)s-%(nmin)s, %(nmax)s-%(max)s") %
- {"min": constants.MIN_VLAN_TAG,
+ {"min": p_const.MIN_VLAN_TAG,
"nmin": c_const.N1KV_VLAN_RESERVED_MIN - 1,
"nmax": c_const.N1KV_VLAN_RESERVED_MAX + 1,
- "max": constants.MAX_VLAN_TAG - 1})
+ "max": p_const.MAX_VLAN_TAG - 1})
LOG.error(msg)
raise n_exc.InvalidInput(error_message=msg)
profiles = _get_network_profiles(
from neutron.common import exceptions as n_exc
from neutron.common import rpc as n_rpc
from neutron.common import topics
-from neutron.common import utils
from neutron.db import agents_db
from neutron.db import agentschedulers_db
from neutron.db import db_base_plugin_v2
from neutron.plugins.cisco.extensions import n1kv
from neutron.plugins.cisco.n1kv import n1kv_client
from neutron.plugins.common import constants as svc_constants
+from neutron.plugins.common import utils
LOG = logging.getLogger(__name__)
# License for the specific language governing permissions and limitations
# under the License.
-# service type constants:
+# Service type constants:
CORE = "CORE"
DUMMY = "DUMMY"
LOADBALANCER = "LOADBALANCER"
L3_ROUTER_NAT = "L3_ROUTER_NAT"
-#maps extension alias to service type
+# Maps extension alias to service type
EXT_TO_SERVICE_MAPPING = {
'dummy': DUMMY,
'lbaas': LOADBALANCER,
PENDING_UPDATE
)
-# FWaaS firewall rule action
-FWAAS_ALLOW = "allow"
-FWAAS_DENY = "deny"
-
-# L3 Protocol name constants
-TCP = "tcp"
-UDP = "udp"
-ICMP = "icmp"
-
# Network Type constants
TYPE_FLAT = 'flat'
TYPE_GRE = 'gre'
TYPE_NONE = 'none'
# Values for network_type
+
+# For VLAN Network
+MIN_VLAN_TAG = 1
+MAX_VLAN_TAG = 4094
+
+# For GRE Tunnel
+MIN_GRE_ID = 1
+MAX_GRE_ID = 2 ** 32 - 1
+
+# For VXLAN Tunnel
+MIN_VXLAN_VNI = 1
+MAX_VXLAN_VNI = 2 ** 24 - 1
VXLAN_UDP_PORT = 4789
# Network Type MTU overhead
"""
from neutron.common import exceptions as n_exc
-from neutron.common import utils
from neutron.plugins.common import constants as p_const
+def is_valid_vlan_tag(vlan):
+ return p_const.MIN_VLAN_TAG <= vlan <= p_const.MAX_VLAN_TAG
+
+
+def is_valid_gre_id(gre_id):
+ return p_const.MIN_GRE_ID <= gre_id <= p_const.MAX_GRE_ID
+
+
+def is_valid_vxlan_vni(vni):
+ return p_const.MIN_VXLAN_VNI <= vni <= p_const.MAX_VXLAN_VNI
+
+
def verify_tunnel_range(tunnel_range, tunnel_type):
"""Raise an exception for invalid tunnel range or malformed range."""
- mappings = {p_const.TYPE_GRE: utils.is_valid_gre_id,
- p_const.TYPE_VXLAN: utils.is_valid_vxlan_vni}
+ mappings = {p_const.TYPE_GRE: is_valid_gre_id,
+ p_const.TYPE_VXLAN: is_valid_vxlan_vni}
if tunnel_type in mappings:
for ident in tunnel_range:
if not mappings[tunnel_type](ident):
def verify_vlan_range(vlan_range):
"""Raise an exception for invalid tags or malformed range."""
for vlan_tag in vlan_range:
- if not utils.is_valid_vlan_tag(vlan_tag):
+ if not is_valid_vlan_tag(vlan_tag):
raise n_exc.NetworkVlanRangeError(
vlan_range=vlan_range,
error=_("%s is not a valid VLAN tag") % vlan_tag)
return tap_device_name
def get_vxlan_device_name(self, segmentation_id):
- if 0 <= int(segmentation_id) <= constants.MAX_VXLAN_VNI:
+ if 0 <= int(segmentation_id) <= p_const.MAX_VXLAN_VNI:
return VXLAN_INTERFACE_PREFIX + str(segmentation_id)
else:
LOG.warning(_LW("Invalid Segmentation ID: %s, will lead to "
return False
test_iface = None
- for seg_id in moves.xrange(1, constants.MAX_VXLAN_VNI + 1):
+ for seg_id in moves.xrange(1, p_const.MAX_VXLAN_VNI + 1):
if not ip_lib.device_exists(
self.get_vxlan_device_name(seg_id)):
test_iface = self.ensure_vxlan(seg_id)
from six import moves
import sqlalchemy as sa
-from neutron.common import constants as q_const
from neutron.common import exceptions as exc
-from neutron.common import utils
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.i18n import _LE, _LI, _LW
" for VLAN provider network") % physical_network)
raise exc.InvalidInput(error_message=msg)
if segmentation_id:
- if not utils.is_valid_vlan_tag(segmentation_id):
+ if not plugin_utils.is_valid_vlan_tag(segmentation_id):
msg = (_("segmentation_id out of range (%(min)s through "
"%(max)s)") %
- {'min': q_const.MIN_VLAN_TAG,
- 'max': q_const.MAX_VLAN_TAG})
+ {'min': p_const.MIN_VLAN_TAG,
+ 'max': p_const.MAX_VLAN_TAG})
raise exc.InvalidInput(error_message=msg)
elif segmentation_id:
msg = _("segmentation_id requires physical_network for VLAN "
LOG = log.getLogger(__name__)
-VXLAN_UDP_PORT = 4789
-MAX_VXLAN_VNI = 16777215
-
vxlan_opts = [
cfg.ListOpt('vni_ranges',
default=[],
# determine current configured allocatable vnis
vxlan_vnis = set()
for tun_min, tun_max in self.tunnel_ranges:
- if tun_max + 1 - tun_min > MAX_VXLAN_VNI:
+ if tun_max + 1 - tun_min > p_const.MAX_VXLAN_VNI:
LOG.error(_LE("Skipping unreasonable VXLAN VNI range "
"%(tun_min)s:%(tun_max)s"),
{'tun_min': tun_min, 'tun_max': tun_max})
return (session.query(VxlanEndpoints).
filter_by(ip_address=ip).first())
- def add_endpoint(self, ip, host, udp_port=VXLAN_UDP_PORT):
+ def add_endpoint(self, ip, host, udp_port=p_const.VXLAN_UDP_PORT):
LOG.debug("add_vxlan_endpoint() called for ip %s", ip)
session = db_api.get_session()
try:
cfg.CONF.import_group('AGENT', 'neutron.plugins.openvswitch.common.config')
# A placeholder for dead vlans.
-DEAD_VLAN_TAG = q_const.MAX_VLAN_TAG + 1
+DEAD_VLAN_TAG = p_const.MAX_VLAN_TAG + 1
class DeviceListRetrievalError(exceptions.NeutronException):
super(OVSNeutronAgent, self).__init__()
self.use_veth_interconnection = use_veth_interconnection
self.veth_mtu = veth_mtu
- self.available_local_vlans = set(moves.xrange(q_const.MIN_VLAN_TAG,
- q_const.MAX_VLAN_TAG))
+ self.available_local_vlans = set(moves.xrange(p_const.MIN_VLAN_TAG,
+ p_const.MAX_VLAN_TAG))
self.use_call = True
self.tunnel_types = tunnel_types or []
self.l2_pop = l2_population
class TestGreTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
base.BaseTestCase):
- TUN_MIN = constants.MIN_GRE_ID
- TUN_MAX = constants.MAX_GRE_ID
+ TUN_MIN = p_const.MIN_GRE_ID
+ TUN_MAX = p_const.MAX_GRE_ID
TYPE = p_const.TYPE_GRE
class TestVxlanTunnelRangeVerifyValid(TestParseTunnelRangesMixin,
base.BaseTestCase):
- TUN_MIN = constants.MIN_VXLAN_VNI
- TUN_MAX = constants.MAX_VXLAN_VNI
+ TUN_MIN = p_const.MIN_VXLAN_VNI
+ TUN_MAX = p_const.MAX_VXLAN_VNI
TYPE = p_const.TYPE_VXLAN
constants.TAP_DEVICE_PREFIX)
def test_get_vxlan_device_name(self):
- vn_id = constants.MAX_VXLAN_VNI
+ vn_id = p_const.MAX_VXLAN_VNI
self.assertEqual(self.lbm.get_vxlan_device_name(vn_id),
"vxlan-" + str(vn_id))
self.assertIsNone(self.lbm.get_vxlan_device_name(vn_id + 1))