from neutron.agent.linux import utils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
+from neutron.plugins.common import constants as p_const
+# TODO(JLH) Should we remove the explicit include of the ovs plugin here
from neutron.plugins.openvswitch.common import constants
LOG = logging.getLogger(__name__)
self.deferred_flows = {'add': '', 'mod': '', 'del': ''}
def add_tunnel_port(self, port_name, remote_ip, local_ip,
- tunnel_type=constants.TYPE_GRE,
+ tunnel_type=p_const.TYPE_GRE,
vxlan_udp_port=constants.VXLAN_UDP_PORT):
vsctl_command = ["--", "--may-exist", "add-port", self.br_name,
port_name]
vsctl_command.extend(["--", "set", "Interface", port_name,
"type=%s" % tunnel_type])
- if tunnel_type == constants.TYPE_VXLAN:
+ if tunnel_type == p_const.TYPE_VXLAN:
# Only set the VXLAN UDP port if it's not the default
if vxlan_udp_port != constants.VXLAN_UDP_PORT:
vsctl_command.append("options:dst_port=%s" % vxlan_udp_port)
TCP = "tcp"
UDP = "udp"
ICMP = "icmp"
+
+# Network Type constants
+TYPE_FLAT = 'flat'
+TYPE_GRE = 'gre'
+TYPE_LOCAL = 'local'
+TYPE_VXLAN = 'vxlan'
+TYPE_VLAN = 'vlan'
+TYPE_NONE = 'none'
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import dispatcher
+from neutron.plugins.common import constants as p_const
from neutron.plugins.hyperv.agent import utils
from neutron.plugins.hyperv.agent import utilsfactory
from neutron.plugins.hyperv.common import constants
return dispatcher.RpcDispatcher([self])
def _get_vswitch_name(self, network_type, physical_network):
- if network_type != constants.TYPE_LOCAL:
+ if network_type != p_const.TYPE_LOCAL:
vswitch_name = self._get_vswitch_for_physical_network(
physical_network)
else:
vswitch_name = self._get_vswitch_name(network_type, physical_network)
- if network_type in [constants.TYPE_VLAN, constants.TYPE_FLAT]:
+ if network_type in [p_const.TYPE_VLAN, p_const.TYPE_FLAT]:
#Nothing to do
pass
- elif network_type == constants.TYPE_LOCAL:
+ elif network_type == p_const.TYPE_LOCAL:
#TODO(alexpilotti): Check that the switch type is private
#or create it if not existing
pass
self._utils.connect_vnic_to_vswitch(map['vswitch_name'], port_id)
- if network_type == constants.TYPE_VLAN:
+ if network_type == p_const.TYPE_VLAN:
LOG.info(_('Binding VLAN ID %(segmentation_id)s '
'to switch port %(port_id)s'),
dict(segmentation_id=segmentation_id, port_id=port_id))
self._utils.set_vswitch_port_vlan_id(
segmentation_id,
port_id)
- elif network_type == constants.TYPE_FLAT:
+ elif network_type == p_const.TYPE_FLAT:
#Nothing to do
pass
- elif network_type == constants.TYPE_LOCAL:
+ elif network_type == p_const.TYPE_LOCAL:
#Nothing to do
pass
else:
# Special vlan_id value in ovs_vlan_allocations table indicating flat network
FLAT_VLAN_ID = -1
-
-# Values for network_type
-TYPE_LOCAL = 'local'
-TYPE_FLAT = 'flat'
-TYPE_VLAN = 'vlan'
-TYPE_NVGRE = 'gre'
-TYPE_NONE = 'none'
def _set_tenant_network_type(self):
tenant_network_type = cfg.CONF.HYPERV.tenant_network_type
- if tenant_network_type not in [constants.TYPE_LOCAL,
- constants.TYPE_FLAT,
- constants.TYPE_VLAN,
- constants.TYPE_NONE]:
+ if tenant_network_type not in [svc_constants.TYPE_LOCAL,
+ svc_constants.TYPE_FLAT,
+ svc_constants.TYPE_VLAN,
+ svc_constants.TYPE_NONE]:
msg = _(
"Invalid tenant_network_type: %s. "
"Agent terminated!") % tenant_network_type
def _create_network_providers_map(self):
self._network_providers_map = {
- constants.TYPE_LOCAL: LocalNetworkProvider(),
- constants.TYPE_FLAT: FlatNetworkProvider(),
- constants.TYPE_VLAN: VlanNetworkProvider()
+ svc_constants.TYPE_LOCAL: LocalNetworkProvider(),
+ svc_constants.TYPE_FLAT: FlatNetworkProvider(),
+ svc_constants.TYPE_VLAN: VlanNetworkProvider()
}
def _process_provider_create(self, context, session, attrs):
network_type = attrs.get(provider.NETWORK_TYPE)
network_type_set = attributes.is_attr_set(network_type)
if not network_type_set:
- if self._tenant_network_type == constants.TYPE_NONE:
+ if self._tenant_network_type == svc_constants.TYPE_NONE:
raise q_exc.TenantNetworksDisabled()
network_type = self._tenant_network_type
attrs[provider.NETWORK_TYPE] = network_type
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
+from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.common import config # noqa
from neutron.plugins.linuxbridge.common import constants as lconst
network_type,
physical_network,
segmentation_id):
- if network_type == lconst.TYPE_VXLAN:
+ if network_type == p_const.TYPE_VXLAN:
if self.vxlan_mode == lconst.VXLAN_NONE:
LOG.error(_("Unable to add vxlan interface for network %s"),
network_id)
LOG.error(_("No mapping for physical network %s"),
physical_network)
return
- if network_type == lconst.TYPE_FLAT:
+ if network_type == p_const.TYPE_FLAT:
return self.ensure_flat_bridge(network_id, physical_interface)
- elif network_type == lconst.TYPE_VLAN:
+ elif network_type == p_const.TYPE_VLAN:
return self.ensure_vlan_bridge(network_id, physical_interface,
segmentation_id)
else:
return False
bridge_name = self.get_bridge_name(network_id)
- if network_type == lconst.TYPE_LOCAL:
+ if network_type == p_const.TYPE_LOCAL:
self.ensure_local_bridge(network_id)
elif not self.ensure_physical_in_bridge(network_id,
network_type,
if not segment:
return
- if segment.network_type != lconst.TYPE_VXLAN:
+ if segment.network_type != p_const.TYPE_VXLAN:
return
interface = self.agent.br_mgr.get_vxlan_device_name(
if not segment:
return
- if segment.network_type != lconst.TYPE_VXLAN:
+ if segment.network_type != p_const.TYPE_VXLAN:
return
interface = self.agent.br_mgr.get_vxlan_device_name(
if not segment:
return
- if segment.network_type != lconst.TYPE_VXLAN:
+ if segment.network_type != p_const.TYPE_VXLAN:
return
interface = self.agent.br_mgr.get_vxlan_device_name(
configurations = {'interface_mappings': interface_mappings}
if self.br_mgr.vxlan_mode is not lconst.VXLAN_NONE:
configurations['tunneling_ip'] = self.br_mgr.local_ip
- configurations['tunnel_types'] = [lconst.TYPE_VXLAN]
+ configurations['tunnel_types'] = [p_const.TYPE_VXLAN]
configurations['l2_population'] = cfg.CONF.VXLAN.l2_population
self.agent_state = {
'binary': 'neutron-linuxbridge-agent',
# @author: Sumit Naiksatam, Cisco Systems, Inc.
+from neutron.plugins.common import constants as p_const
+
+
FLAT_VLAN_ID = -1
LOCAL_VLAN_ID = -2
-# Values for network_type
-TYPE_FLAT = 'flat'
-TYPE_VLAN = 'vlan'
-TYPE_VXLAN = 'vxlan'
-TYPE_LOCAL = 'local'
-TYPE_NONE = 'none'
-
# Supported VXLAN features
VXLAN_NONE = 'not_supported'
VXLAN_MCAST = 'multicast_flooding'
def interpret_vlan_id(vlan_id):
"""Return (network_type, segmentation_id) tuple for encoded vlan_id."""
if vlan_id == LOCAL_VLAN_ID:
- return (TYPE_LOCAL, None)
+ return (p_const.TYPE_LOCAL, None)
elif vlan_id == FLAT_VLAN_ID:
- return (TYPE_FLAT, None)
+ return (p_const.TYPE_FLAT, None)
else:
- return (TYPE_VLAN, vlan_id)
+ return (p_const.TYPE_VLAN, vlan_id)
self._parse_network_vlan_ranges()
db.sync_network_states(self.network_vlan_ranges)
self.tenant_network_type = cfg.CONF.VLANS.tenant_network_type
- if self.tenant_network_type not in [constants.TYPE_LOCAL,
- constants.TYPE_VLAN,
- constants.TYPE_NONE]:
+ if self.tenant_network_type not in [svc_constants.TYPE_LOCAL,
+ svc_constants.TYPE_VLAN,
+ svc_constants.TYPE_NONE]:
LOG.error(_("Invalid tenant_network_type: %s. "
"Service terminated!"),
self.tenant_network_type)
def _extend_network_dict_provider(self, context, network):
binding = db.get_network_binding(context.session, network['id'])
if binding.vlan_id == constants.FLAT_VLAN_ID:
- network[provider.NETWORK_TYPE] = constants.TYPE_FLAT
+ network[provider.NETWORK_TYPE] = svc_constants.TYPE_FLAT
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
elif binding.vlan_id == constants.LOCAL_VLAN_ID:
- network[provider.NETWORK_TYPE] = constants.TYPE_LOCAL
+ network[provider.NETWORK_TYPE] = svc_constants.TYPE_LOCAL
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
else:
- network[provider.NETWORK_TYPE] = constants.TYPE_VLAN
+ network[provider.NETWORK_TYPE] = svc_constants.TYPE_VLAN
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = binding.vlan_id
if not network_type_set:
msg = _("provider:network_type required")
raise q_exc.InvalidInput(error_message=msg)
- elif network_type == constants.TYPE_FLAT:
+ elif network_type == svc_constants.TYPE_FLAT:
if segmentation_id_set:
msg = _("provider:segmentation_id specified for flat network")
raise q_exc.InvalidInput(error_message=msg)
else:
segmentation_id = constants.FLAT_VLAN_ID
- elif network_type == constants.TYPE_VLAN:
+ elif network_type == svc_constants.TYPE_VLAN:
if not segmentation_id_set:
msg = _("provider:segmentation_id required")
raise q_exc.InvalidInput(error_message=msg)
{'min_id': q_const.MIN_VLAN_TAG,
'max_id': q_const.MAX_VLAN_TAG})
raise q_exc.InvalidInput(error_message=msg)
- elif network_type == constants.TYPE_LOCAL:
+ elif network_type == svc_constants.TYPE_LOCAL:
if physical_network_set:
msg = _("provider:physical_network specified for local "
"network")
msg = _("provider:network_type %s not supported") % network_type
raise q_exc.InvalidInput(error_message=msg)
- if network_type in [constants.TYPE_VLAN, constants.TYPE_FLAT]:
+ if network_type in [svc_constants.TYPE_VLAN, svc_constants.TYPE_FLAT]:
if physical_network_set:
if physical_network not in self.network_vlan_ranges:
msg = (_("Unknown provider:physical_network %s") %
if not network_type:
# tenant network
network_type = self.tenant_network_type
- if network_type == constants.TYPE_NONE:
+ if network_type == svc_constants.TYPE_NONE:
raise q_exc.TenantNetworksDisabled()
- elif network_type == constants.TYPE_VLAN:
+ elif network_type == svc_constants.TYPE_VLAN:
physical_network, vlan_id = db.reserve_network(session)
else: # TYPE_LOCAL
vlan_id = constants.LOCAL_VLAN_ID
else:
# provider network
- if network_type in [constants.TYPE_VLAN, constants.TYPE_FLAT]:
+ if network_type in [svc_constants.TYPE_VLAN,
+ svc_constants.TYPE_FLAT]:
db.reserve_specific_network(session, physical_network,
vlan_id)
# no reservation needed for TYPE_LOCAL
from neutron.common import exceptions as exc
from neutron.db import model_base
from neutron.openstack.common import log
+from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import driver_api as api
LOG = log.getLogger(__name__)
-TYPE_FLAT = 'flat'
-
flat_opts = [
cfg.ListOpt('flat_networks',
default=[],
self.flat_networks)
def get_type(self):
- return TYPE_FLAT
+ return p_const.TYPE_FLAT
def initialize(self):
LOG.info(_("ML2 FlatTypeDriver initialization complete"))
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.openstack.common import log
+from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
LOG = log.getLogger(__name__)
-TYPE_GRE = 'gre'
-
gre_opts = [
cfg.ListOpt('tunnel_id_ranges',
default=[],
class GreTypeDriver(type_tunnel.TunnelTypeDriver):
def get_type(self):
- return TYPE_GRE
+ return p_const.TYPE_GRE
def initialize(self):
self.gre_id_ranges = []
self._parse_tunnel_ranges(
cfg.CONF.ml2_type_gre.tunnel_id_ranges,
self.gre_id_ranges,
- TYPE_GRE
+ p_const.TYPE_GRE
)
self._sync_gre_allocations()
LOG.debug(_("Allocating gre tunnel id %(gre_id)s"),
{'gre_id': alloc.gre_id})
alloc.allocated = True
- return {api.NETWORK_TYPE: TYPE_GRE,
+ return {api.NETWORK_TYPE: p_const.TYPE_GRE,
api.PHYSICAL_NETWORK: None,
api.SEGMENTATION_ID: alloc.gre_id}
from neutron.common import exceptions as exc
from neutron.openstack.common import log
+from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import driver_api as api
LOG = log.getLogger(__name__)
-TYPE_LOCAL = 'local'
-
class LocalTypeDriver(api.TypeDriver):
"""Manage state for local networks with ML2.
LOG.info(_("ML2 LocalTypeDriver initialization complete"))
def get_type(self):
- return TYPE_LOCAL
+ return p_const.TYPE_LOCAL
def initialize(self):
pass
def allocate_tenant_segment(self, session):
# No resources to allocate
- return {api.NETWORK_TYPE: TYPE_LOCAL}
+ return {api.NETWORK_TYPE: p_const.TYPE_LOCAL}
def release_segment(self, session, segment):
# No resources to release
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.openstack.common import log
+from neutron.plugins.common import constants as p_const
from neutron.plugins.common import utils as plugin_utils
from neutron.plugins.ml2 import driver_api as api
LOG = log.getLogger(__name__)
-TYPE_VLAN = 'vlan'
-
vlan_opts = [
cfg.ListOpt('network_vlan_ranges',
default=[],
session.delete(alloc)
def get_type(self):
- return TYPE_VLAN
+ return p_const.TYPE_VLAN
def initialize(self):
self._sync_vlan_allocations()
{'vlan_id': alloc.vlan_id,
'physical_network': alloc.physical_network})
alloc.allocated = True
- return {api.NETWORK_TYPE: TYPE_VLAN,
+ return {api.NETWORK_TYPE: p_const.TYPE_VLAN,
api.PHYSICAL_NETWORK: alloc.physical_network,
api.SEGMENTATION_ID: alloc.vlan_id}
from neutron.db import api as db_api
from neutron.db import model_base
from neutron.openstack.common import log
+from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_tunnel
LOG = log.getLogger(__name__)
-TYPE_VXLAN = 'vxlan'
VXLAN_UDP_PORT = 4789
MAX_VXLAN_VNI = 16777215
class VxlanTypeDriver(type_tunnel.TunnelTypeDriver):
def get_type(self):
- return TYPE_VXLAN
+ return p_const.TYPE_VXLAN
def initialize(self):
self.vxlan_vni_ranges = []
self._parse_tunnel_ranges(
cfg.CONF.ml2_type_vxlan.vni_ranges,
self.vxlan_vni_ranges,
- TYPE_VXLAN
+ p_const.TYPE_VXLAN
)
self._sync_vxlan_allocations()
LOG.debug(_("Allocating vxlan tunnel vni %(vxlan_vni)s"),
{'vxlan_vni': alloc.vxlan_vni})
alloc.allocated = True
- return {api.NETWORK_TYPE: TYPE_VXLAN,
+ return {api.NETWORK_TYPE: p_const.TYPE_VXLAN,
api.PHYSICAL_NETWORK: None,
api.SEGMENTATION_ID: alloc.vxlan_vni}
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
+from neutron.plugins.common import constants as p_const
from neutron.plugins.mlnx.agent import utils
from neutron.plugins.mlnx.common import config # noqa
from neutron.plugins.mlnx.common import constants
net_map = self.network_map[network_id]
net_map['ports'].append({'port_id': port_id, 'port_mac': port_mac})
- if network_type in (constants.TYPE_VLAN,
+ if network_type in (p_const.TYPE_VLAN,
constants.TYPE_IB):
LOG.info(_('Binding Segmentation ID %(seg_id)s'
'to eSwitch for vNIC mac_address %(mac)s'),
network_id, network_type,
physical_network, segmentation_id):
LOG.info(_("Provisioning network %s"), network_id)
- if network_type == constants.TYPE_VLAN:
+ if network_type == p_const.TYPE_VLAN:
LOG.debug(_("Creating VLAN Network"))
elif network_type == constants.TYPE_IB:
LOG.debug(_("Creating IB Network"))
FLAT_VLAN_ID = -1
# Values for network_type
-TYPE_LOCAL = 'local'
-TYPE_FLAT = 'flat'
-TYPE_VLAN = 'vlan'
TYPE_IB = 'ib'
-TYPE_NONE = 'none'
VIF_TYPE_DIRECT = 'mlnx_direct'
VIF_TYPE_HOSTDEV = 'hostdev'
def _extend_network_dict_provider(self, context, network):
binding = db.get_network_binding(context.session, network['id'])
network[provider.NETWORK_TYPE] = binding.network_type
- if binding.network_type == constants.TYPE_FLAT:
+ if binding.network_type == svc_constants.TYPE_FLAT:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
- elif binding.network_type == constants.TYPE_LOCAL:
+ elif binding.network_type == svc_constants.TYPE_LOCAL:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
else:
def _set_tenant_network_type(self):
self.tenant_network_type = cfg.CONF.MLNX.tenant_network_type
- if self.tenant_network_type not in [constants.TYPE_VLAN,
+ if self.tenant_network_type not in [svc_constants.TYPE_VLAN,
constants.TYPE_IB,
- constants.TYPE_LOCAL,
- constants.TYPE_NONE]:
+ svc_constants.TYPE_LOCAL,
+ svc_constants.TYPE_NONE]:
LOG.error(_("Invalid tenant_network_type: %s. "
"Service terminated!"),
self.tenant_network_type)
if not network_type_set:
msg = _("provider:network_type required")
raise q_exc.InvalidInput(error_message=msg)
- elif network_type == constants.TYPE_FLAT:
+ elif network_type == svc_constants.TYPE_FLAT:
self._process_flat_net(segmentation_id_set)
segmentation_id = constants.FLAT_VLAN_ID
- elif network_type in [constants.TYPE_VLAN, constants.TYPE_IB]:
+ elif network_type in [svc_constants.TYPE_VLAN, constants.TYPE_IB]:
self._process_vlan_net(segmentation_id, segmentation_id_set)
- elif network_type == constants.TYPE_LOCAL:
+ elif network_type == svc_constants.TYPE_LOCAL:
self._process_local_net(physical_network_set,
segmentation_id_set)
segmentation_id = constants.LOCAL_VLAN_ID
def _process_net_type(self, network_type,
physical_network,
physical_network_set):
- if network_type in [constants.TYPE_VLAN,
+ if network_type in [svc_constants.TYPE_VLAN,
constants.TYPE_IB,
- constants.TYPE_FLAT]:
+ svc_constants.TYPE_FLAT]:
if physical_network_set:
if physical_network not in self.network_vlan_ranges:
msg = _("Unknown provider:physical_network "
return physical_network
def _check_port_binding_for_net_type(self, vnic_type, net_type):
- if net_type == constants.TYPE_VLAN:
+ if net_type == svc_constants.TYPE_VLAN:
return vnic_type in (constants.VIF_TYPE_DIRECT,
constants.VIF_TYPE_HOSTDEV)
elif net_type == constants.TYPE_IB:
if not network_type:
# tenant network
network_type = self.tenant_network_type
- if network_type == constants.TYPE_NONE:
+ if network_type == svc_constants.TYPE_NONE:
raise q_exc.TenantNetworksDisabled()
- elif network_type in [constants.TYPE_VLAN, constants.TYPE_IB]:
+ elif network_type in [svc_constants.TYPE_VLAN,
+ constants.TYPE_IB]:
physical_network, vlan_id = db.reserve_network(session)
else: # TYPE_LOCAL
vlan_id = constants.LOCAL_VLAN_ID
else:
# provider network
- if network_type in [constants.TYPE_VLAN,
+ if network_type in [svc_constants.TYPE_VLAN,
constants.TYPE_IB,
- constants.TYPE_FLAT]:
+ svc_constants.TYPE_FLAT]:
db.reserve_specific_network(session,
physical_network,
vlan_id)
from neutron.openstack.common import loopingcall
from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
+from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.common import config # noqa
from neutron.plugins.openvswitch.common import constants
self.setup_integration_br()
self.setup_physical_bridges(bridge_mappings)
self.local_vlan_map = {}
- self.tun_br_ofports = {constants.TYPE_GRE: {},
- constants.TYPE_VXLAN: {}}
+ self.tun_br_ofports = {p_const.TYPE_GRE: {},
+ p_const.TYPE_VXLAN: {}}
self.polling_interval = polling_interval
self.minimize_polling = minimize_polling
self.iter_num = 0
def _check_ovs_version(self):
- if constants.TYPE_VXLAN in self.tunnel_types:
+ if p_const.TYPE_VXLAN in self.tunnel_types:
check_ovs_version(constants.MINIMUM_OVS_VXLAN_VERSION,
self.root_helper)
"net-id=%(net_uuid)s - tunneling disabled"),
{'network_type': network_type,
'net_uuid': net_uuid})
- elif network_type == constants.TYPE_FLAT:
+ elif network_type == p_const.TYPE_FLAT:
if physical_network in self.phys_brs:
# outbound
br = self.phys_brs[physical_network]
"physical_network %(physical_network)s"),
{'net_uuid': net_uuid,
'physical_network': physical_network})
- elif network_type == constants.TYPE_VLAN:
+ elif network_type == p_const.TYPE_VLAN:
if physical_network in self.phys_brs:
# outbound
br = self.phys_brs[physical_network]
"physical_network %(physical_network)s"),
{'net_uuid': net_uuid,
'physical_network': physical_network})
- elif network_type == constants.TYPE_LOCAL:
+ elif network_type == p_const.TYPE_LOCAL:
# no flows needed for local networks
pass
else:
# Try to remove tunnel ports if not used by other networks
for ofport in lvm.tun_ofports:
self.cleanup_tunnel_port(ofport, lvm.network_type)
- elif lvm.network_type == constants.TYPE_FLAT:
+ elif lvm.network_type == p_const.TYPE_FLAT:
if lvm.physical_network in self.phys_brs:
# outbound
br = self.phys_brs[lvm.physical_network]
br = self.int_br
br.delete_flows(in_port=self.int_ofports[lvm.physical_network],
dl_vlan=0xffff)
- elif lvm.network_type == constants.TYPE_VLAN:
+ elif lvm.network_type == p_const.TYPE_VLAN:
if lvm.physical_network in self.phys_brs:
# outbound
br = self.phys_brs[lvm.physical_network]
br = self.int_br
br.delete_flows(in_port=self.int_ofports[lvm.physical_network],
dl_vlan=lvm.segmentation_id)
- elif lvm.network_type == constants.TYPE_LOCAL:
+ elif lvm.network_type == p_const.TYPE_LOCAL:
# no flows needed for local networks
pass
else:
# If enable_tunneling is TRUE, set tunnel_type to default to GRE
if config.OVS.enable_tunneling and not kwargs['tunnel_types']:
- kwargs['tunnel_types'] = [constants.TYPE_GRE]
+ kwargs['tunnel_types'] = [p_const.TYPE_GRE]
# Verify the tunnel_types specified are valid
for tun in kwargs['tunnel_types']:
# See the License for the specific language governing permissions and
# limitations under the License.
+from neutron.plugins.common import constants as p_const
+
+
# Special vlan_id value in ovs_vlan_allocations table indicating flat network
FLAT_VLAN_ID = -1
TUNNEL = 'tunnel'
# Values for network_type
-TYPE_FLAT = 'flat'
-TYPE_VLAN = 'vlan'
-TYPE_GRE = 'gre'
-TYPE_LOCAL = 'local'
-TYPE_VXLAN = 'vxlan'
-TYPE_NONE = 'none'
VXLAN_UDP_PORT = 4789
# Name prefixes for veth device pair linking the integration bridge
MINIMUM_OVS_VXLAN_VERSION = "1.10"
# The different types of tunnels
-TUNNEL_NETWORK_TYPES = [TYPE_GRE, TYPE_VXLAN]
+TUNNEL_NETWORK_TYPES = [p_const.TYPE_GRE, p_const.TYPE_VXLAN]
# Various tables for tunneling flows
PATCH_LV_TO_TUN = 1
UCAST_TO_TUN = 20
FLOOD_TO_TUN = 21
# Map tunnel types to tables number
-TUN_TABLE = {TYPE_GRE: GRE_TUN_TO_LV, TYPE_VXLAN: VXLAN_TUN_TO_LV}
+TUN_TABLE = {p_const.TYPE_GRE: GRE_TUN_TO_LV,
+ p_const.TYPE_VXLAN: VXLAN_TUN_TO_LV}
# The default respawn interval for the ovsdb monitor
DEFAULT_OVSDBMON_RESPAWN = 30
self._parse_network_vlan_ranges()
ovs_db_v2.sync_vlan_allocations(self.network_vlan_ranges)
self.tenant_network_type = cfg.CONF.OVS.tenant_network_type
- if self.tenant_network_type not in [constants.TYPE_LOCAL,
- constants.TYPE_VLAN,
- constants.TYPE_GRE,
- constants.TYPE_VXLAN,
- constants.TYPE_NONE]:
+ if self.tenant_network_type not in [svc_constants.TYPE_LOCAL,
+ svc_constants.TYPE_VLAN,
+ svc_constants.TYPE_GRE,
+ svc_constants.TYPE_VXLAN,
+ svc_constants.TYPE_NONE]:
LOG.error(_("Invalid tenant_network_type: %s. "
"Server terminated!"),
self.tenant_network_type)
self.enable_tunneling = cfg.CONF.OVS.enable_tunneling
self.tunnel_type = None
if self.enable_tunneling:
- self.tunnel_type = cfg.CONF.OVS.tunnel_type or constants.TYPE_GRE
+ self.tunnel_type = (cfg.CONF.OVS.tunnel_type or
+ svc_constants.TYPE_GRE)
elif cfg.CONF.OVS.tunnel_type:
self.tunnel_type = cfg.CONF.OVS.tunnel_type
self.enable_tunneling = True
if binding.network_type in constants.TUNNEL_NETWORK_TYPES:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = binding.segmentation_id
- elif binding.network_type == constants.TYPE_FLAT:
+ elif binding.network_type == svc_constants.TYPE_FLAT:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = None
- elif binding.network_type == constants.TYPE_VLAN:
+ elif binding.network_type == svc_constants.TYPE_VLAN:
network[provider.PHYSICAL_NETWORK] = binding.physical_network
network[provider.SEGMENTATION_ID] = binding.segmentation_id
- elif binding.network_type == constants.TYPE_LOCAL:
+ elif binding.network_type == svc_constants.TYPE_LOCAL:
network[provider.PHYSICAL_NETWORK] = None
network[provider.SEGMENTATION_ID] = None
if not network_type_set:
msg = _("provider:network_type required")
raise q_exc.InvalidInput(error_message=msg)
- elif network_type == constants.TYPE_FLAT:
+ elif network_type == svc_constants.TYPE_FLAT:
if segmentation_id_set:
msg = _("provider:segmentation_id specified for flat network")
raise q_exc.InvalidInput(error_message=msg)
else:
segmentation_id = constants.FLAT_VLAN_ID
- elif network_type == constants.TYPE_VLAN:
+ elif network_type == svc_constants.TYPE_VLAN:
if not segmentation_id_set:
msg = _("provider:segmentation_id required")
raise q_exc.InvalidInput(error_message=msg)
if not segmentation_id_set:
msg = _("provider:segmentation_id required")
raise q_exc.InvalidInput(error_message=msg)
- elif network_type == constants.TYPE_LOCAL:
+ elif network_type == svc_constants.TYPE_LOCAL:
if physical_network_set:
msg = _("provider:physical_network specified for local "
"network")
msg = _("provider:network_type %s not supported") % network_type
raise q_exc.InvalidInput(error_message=msg)
- if network_type in [constants.TYPE_VLAN, constants.TYPE_FLAT]:
+ if network_type in [svc_constants.TYPE_VLAN, svc_constants.TYPE_FLAT]:
if physical_network_set:
if physical_network not in self.network_vlan_ranges:
msg = _("Unknown provider:physical_network "
if not network_type:
# tenant network
network_type = self.tenant_network_type
- if network_type == constants.TYPE_NONE:
+ if network_type == svc_constants.TYPE_NONE:
raise q_exc.TenantNetworksDisabled()
- elif network_type == constants.TYPE_VLAN:
+ elif network_type == svc_constants.TYPE_VLAN:
(physical_network,
segmentation_id) = ovs_db_v2.reserve_vlan(session)
elif network_type in constants.TUNNEL_NETWORK_TYPES:
# no reservation needed for TYPE_LOCAL
else:
# provider network
- if network_type in [constants.TYPE_VLAN, constants.TYPE_FLAT]:
+ if network_type in [svc_constants.TYPE_VLAN,
+ svc_constants.TYPE_FLAT]:
ovs_db_v2.reserve_specific_vlan(session, physical_network,
segmentation_id)
elif network_type in constants.TUNNEL_NETWORK_TYPES:
if binding.network_type in constants.TUNNEL_NETWORK_TYPES:
ovs_db_v2.release_tunnel(session, binding.segmentation_id,
self.tunnel_id_ranges)
- elif binding.network_type in [constants.TYPE_VLAN,
- constants.TYPE_FLAT]:
+ elif binding.network_type in [svc_constants.TYPE_VLAN,
+ svc_constants.TYPE_FLAT]:
ovs_db_v2.release_vlan(session, binding.physical_network,
binding.segmentation_id,
self.network_vlan_ranges)
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.openstack.common.rpc import common as rpc_common
+from neutron.plugins.common import constants as p_const
from neutron.plugins.linuxbridge.agent import linuxbridge_neutron_agent
from neutron.plugins.linuxbridge.common import constants as lconst
from neutron.tests import base
def test_ensure_physical_in_bridge_invalid(self):
result = self.linux_bridge.ensure_physical_in_bridge('network_id',
- lconst.TYPE_VLAN,
+ p_const.TYPE_VLAN,
'physnetx',
7)
self.assertFalse(result)
with mock.patch.object(self.linux_bridge,
'ensure_flat_bridge') as flat_bridge_func:
self.linux_bridge.ensure_physical_in_bridge(
- 'network_id', lconst.TYPE_FLAT, 'physnet1', None)
+ 'network_id', p_const.TYPE_FLAT, 'physnet1', None)
self.assertTrue(flat_bridge_func.called)
def test_ensure_physical_in_bridge_vlan(self):
with mock.patch.object(self.linux_bridge,
'ensure_vlan_bridge') as vlan_bridge_func:
self.linux_bridge.ensure_physical_in_bridge(
- 'network_id', lconst.TYPE_VLAN, 'physnet1', 7)
+ 'network_id', p_const.TYPE_VLAN, 'physnet1', 7)
self.assertTrue(vlan_bridge_func.called)
def test_ensure_physical_in_bridge_vxlan(self):
def test_ensure_physical_in_bridge(self):
self.assertFalse(
- self.lbm.ensure_physical_in_bridge("123", lconst.TYPE_VLAN,
+ self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_VLAN,
"phys", "1")
)
with mock.patch.object(self.lbm, "ensure_flat_bridge") as flbr_fn:
self.assertTrue(
- self.lbm.ensure_physical_in_bridge("123", lconst.TYPE_FLAT,
+ self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_FLAT,
"physnet1", None)
)
self.assertTrue(flbr_fn.called)
with mock.patch.object(self.lbm, "ensure_vlan_bridge") as vlbr_fn:
self.assertTrue(
- self.lbm.ensure_physical_in_bridge("123", lconst.TYPE_VLAN,
+ self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_VLAN,
"physnet1", "1")
)
self.assertTrue(vlbr_fn.called)
with mock.patch.object(self.lbm, "ensure_vxlan_bridge") as vlbr_fn:
self.lbm.vxlan_mode = lconst.VXLAN_MCAST
self.assertTrue(
- self.lbm.ensure_physical_in_bridge("123", lconst.TYPE_VXLAN,
+ self.lbm.ensure_physical_in_bridge("123", p_const.TYPE_VXLAN,
"physnet1", "1")
)
self.assertTrue(vlbr_fn.called)
with mock.patch.object(self.lbm, "device_exists") as de_fn:
de_fn.return_value = False
self.assertFalse(
- self.lbm.add_tap_interface("123", lconst.TYPE_VLAN,
+ self.lbm.add_tap_interface("123", p_const.TYPE_VLAN,
"physnet1", "1", "tap1")
)
exec_fn.return_value = False
get_br.return_value = True
self.assertTrue(self.lbm.add_tap_interface("123",
- lconst.TYPE_LOCAL,
+ p_const.TYPE_LOCAL,
"physnet1", None,
"tap1"))
en_fn.assert_called_with("123")
get_br.return_value = False
exec_fn.return_value = True
self.assertFalse(self.lbm.add_tap_interface("123",
- lconst.TYPE_LOCAL,
+ p_const.TYPE_LOCAL,
"physnet1", None,
"tap1"))
"ensure_physical_in_bridge") as ens_fn:
ens_fn.return_value = False
self.assertFalse(self.lbm.add_tap_interface("123",
- lconst.TYPE_VLAN,
+ p_const.TYPE_VLAN,
"physnet1", "1",
"tap1"))
def test_add_interface(self):
with mock.patch.object(self.lbm, "add_tap_interface") as add_tap:
- self.lbm.add_interface("123", lconst.TYPE_VLAN, "physnet-1",
+ self.lbm.add_interface("123", p_const.TYPE_VLAN, "physnet-1",
"1", "234")
- add_tap.assert_called_with("123", lconst.TYPE_VLAN, "physnet-1",
+ add_tap.assert_called_with("123", p_const.TYPE_VLAN, "physnet-1",
"1", "tap234")
def test_delete_vlan_bridge(self):
self.lb_rpc.port_update("unused_context", port=port,
vlan_id="1", physical_network="physnet1")
self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], lconst.TYPE_VLAN,
+ addif_fn.assert_called_with(port["network_id"], p_const.TYPE_VLAN,
"physnet1", "1", port["id"])
self.lb_rpc.port_update("unused_context", port=port,
- network_type=lconst.TYPE_VLAN,
+ network_type=p_const.TYPE_VLAN,
segmentation_id="2",
physical_network="physnet1")
self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], lconst.TYPE_VLAN,
+ addif_fn.assert_called_with(port["network_id"], p_const.TYPE_VLAN,
"physnet1", "2", port["id"])
self.lb_rpc.port_update("unused_context", port=port,
vlan_id=lconst.FLAT_VLAN_ID,
physical_network="physnet1")
self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], lconst.TYPE_FLAT,
+ addif_fn.assert_called_with(port["network_id"], p_const.TYPE_FLAT,
"physnet1", None, port["id"])
self.lb_rpc.port_update("unused_context", port=port,
- network_type=lconst.TYPE_FLAT,
+ network_type=p_const.TYPE_FLAT,
segmentation_id=None,
physical_network="physnet1")
self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], lconst.TYPE_FLAT,
+ addif_fn.assert_called_with(port["network_id"], p_const.TYPE_FLAT,
"physnet1", None, port["id"])
self.lb_rpc.port_update("unused_context", port=port,
vlan_id=lconst.LOCAL_VLAN_ID,
physical_network=None)
self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], lconst.TYPE_LOCAL,
+ addif_fn.assert_called_with(port["network_id"], p_const.TYPE_LOCAL,
None, None, port["id"])
self.lb_rpc.port_update("unused_context", port=port,
- network_type=lconst.TYPE_LOCAL,
+ network_type=p_const.TYPE_LOCAL,
segmentation_id=None,
physical_network=None)
self.assertFalse(reffw_fn.called)
- addif_fn.assert_called_with(port["network_id"], lconst.TYPE_LOCAL,
+ addif_fn.assert_called_with(port["network_id"], p_const.TYPE_LOCAL,
None, None, port["id"])
addif_fn.return_value = True
self.lb_rpc.port_update("unused_context", port=port,
- network_type=lconst.TYPE_LOCAL,
+ network_type=p_const.TYPE_LOCAL,
segmentation_id=None,
physical_network=None)
rpc_obj.update_device_up.assert_called_with(
addif_fn.return_value = False
self.lb_rpc.port_update("unused_context", port=port,
- network_type=lconst.TYPE_LOCAL,
+ network_type=p_const.TYPE_LOCAL,
segmentation_id=None,
physical_network=None)
rpc_obj.update_device_down.assert_called_with(
from neutron.common import exceptions as exc
from neutron.db import api as db
+from neutron.plugins.common import constants as p_const
from neutron.plugins.ml2 import db as ml2_db
from neutron.plugins.ml2 import driver_api as api
from neutron.plugins.ml2.drivers import type_vxlan
self.addCleanup(db.clear_db)
def test_vxlan_tunnel_type(self):
- self.assertEqual(self.driver.get_type(), type_vxlan.TYPE_VXLAN)
+ self.assertEqual(self.driver.get_type(), p_const.TYPE_VXLAN)
def test_validate_provider_segment(self):
segment = {api.NETWORK_TYPE: 'vxlan',
from neutron.agent.linux import utils
from neutron.common import constants as n_const
from neutron.openstack.common.rpc import common as rpc_common
+from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
from neutron.plugins.openvswitch.common import constants
from neutron.tests import base
self.addCleanup(cfg.CONF.reset)
# An ip address is required for tunneling but there is no default,
# verify this for both gre and vxlan tunnels.
- cfg.CONF.set_override('tunnel_types', [constants.TYPE_GRE],
+ cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE],
group='AGENT')
with testtools.ExpectedException(ValueError):
ovs_neutron_agent.create_agent_config_map(cfg.CONF)
- cfg.CONF.set_override('tunnel_types', [constants.TYPE_VXLAN],
+ cfg.CONF.set_override('tunnel_types', [p_const.TYPE_VXLAN],
group='AGENT')
with testtools.ExpectedException(ValueError):
ovs_neutron_agent.create_agent_config_map(cfg.CONF)
cfg.CONF.set_override('enable_tunneling', True, group='OVS')
cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
cfgmap = ovs_neutron_agent.create_agent_config_map(cfg.CONF)
- self.assertEqual(cfgmap['tunnel_types'], [constants.TYPE_GRE])
+ self.assertEqual(cfgmap['tunnel_types'], [p_const.TYPE_GRE])
def test_create_agent_config_map_fails_no_local_ip(self):
self.addCleanup(cfg.CONF.reset)
def test_create_agent_config_map_multiple_tunnel_types(self):
self.addCleanup(cfg.CONF.reset)
cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
- cfg.CONF.set_override('tunnel_types', [constants.TYPE_GRE,
- constants.TYPE_VXLAN], group='AGENT')
+ cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE,
+ p_const.TYPE_VXLAN], group='AGENT')
cfgmap = ovs_neutron_agent.create_agent_config_map(cfg.CONF)
self.assertEqual(cfgmap['tunnel_types'],
- [constants.TYPE_GRE, constants.TYPE_VXLAN])
+ [p_const.TYPE_GRE, p_const.TYPE_VXLAN])
class TestOvsNeutronAgent(base.BaseTestCase):
mock.patch.object(ovs_neutron_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_error_fn):
ofport = self.agent.setup_tunnel_port(
- 'gre-1', 'remote_ip', constants.TYPE_GRE)
+ 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
- 'gre-1', 'remote_ip', self.agent.local_ip, constants.TYPE_GRE,
+ 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port)
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
- {'type': constants.TYPE_GRE, 'ip': 'remote_ip'})
+ {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
def test_setup_tunnel_port_error_not_int(self):
mock.patch.object(ovs_neutron_agent.LOG, 'error')
) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
ofport = self.agent.setup_tunnel_port(
- 'gre-1', 'remote_ip', constants.TYPE_GRE)
+ 'gre-1', 'remote_ip', p_const.TYPE_GRE)
add_tunnel_port_fn.assert_called_once_with(
- 'gre-1', 'remote_ip', self.agent.local_ip, constants.TYPE_GRE,
+ 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
self.agent.vxlan_udp_port)
log_exc_fn.assert_called_once_with(
_("ofport should have a value that can be "
"interpreted as an integer"))
log_error_fn.assert_called_once_with(
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
- {'type': constants.TYPE_GRE, 'ip': 'remote_ip'})
+ {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
self.assertEqual(ofport, 0)
from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.openstack.common import log
+from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
from neutron.plugins.openvswitch.common import constants
from neutron.tests import base
LVM_VLAN = ovs_neutron_agent.LocalVLANMapping(
LV_ID, 'vlan', 'net1', LS_ID, VIF_PORTS)
-TUN_OFPORTS = {constants.TYPE_GRE: {'ip1': '11', 'ip2': '12'}}
+TUN_OFPORTS = {p_const.TYPE_GRE: {'ip1': '11', 'ip2': '12'}}
BCAST_MAC = "01:00:00:00:00:00/01:00:00:00:00:00"
UCAST_MAC = "00:00:00:00:00:00/01:00:00:00:00:00"
self._verify_mock_calls()
def test_provision_local_vlan(self):
- ofports = ','.join(TUN_OFPORTS[constants.TYPE_GRE].values())
+ ofports = ','.join(TUN_OFPORTS[p_const.TYPE_GRE].values())
self.mock_tun_bridge_expected += [
mock.call.mod_flow(table=constants.FLOOD_TO_TUN,
priority=1,
self.VETH_MTU)
a.available_local_vlans = set([LV_ID])
a.tun_br_ofports = TUN_OFPORTS
- a.provision_local_vlan(NET_UUID, constants.TYPE_GRE, None, LS_ID)
+ a.provision_local_vlan(NET_UUID, p_const.TYPE_GRE, None, LS_ID)
self._verify_mock_calls()
def test_provision_local_vlan_flat(self):
a.phys_brs['net1'] = self.mock_map_tun_bridge
a.phys_ofports['net1'] = self.MAP_TUN_OFPORT
a.int_ofports['net1'] = self.INT_OFPORT
- a.provision_local_vlan(NET_UUID, constants.TYPE_FLAT, 'net1', LS_ID)
+ a.provision_local_vlan(NET_UUID, p_const.TYPE_FLAT, 'net1', LS_ID)
self._verify_mock_calls()
def test_provision_local_vlan_flat_fail(self):
'10.0.0.1', self.NET_MAPPING,
'sudo', 2, ['gre'],
self.VETH_MTU)
- a.provision_local_vlan(NET_UUID, constants.TYPE_FLAT, 'net2', LS_ID)
+ a.provision_local_vlan(NET_UUID, p_const.TYPE_FLAT, 'net2', LS_ID)
self._verify_mock_calls()
def test_provision_local_vlan_vlan(self):
a.phys_brs['net1'] = self.mock_map_tun_bridge
a.phys_ofports['net1'] = self.MAP_TUN_OFPORT
a.int_ofports['net1'] = self.INT_OFPORT
- a.provision_local_vlan(NET_UUID, constants.TYPE_VLAN, 'net1', LS_ID)
+ a.provision_local_vlan(NET_UUID, p_const.TYPE_VLAN, 'net1', LS_ID)
self._verify_mock_calls()
def test_provision_local_vlan_vlan_fail(self):
'10.0.0.1', self.NET_MAPPING,
'sudo', 2, ['gre'],
self.VETH_MTU)
- a.provision_local_vlan(NET_UUID, constants.TYPE_VLAN, 'net2', LS_ID)
+ a.provision_local_vlan(NET_UUID, p_const.TYPE_VLAN, 'net2', LS_ID)
self._verify_mock_calls()
def test_reclaim_local_vlan(self):
self.VETH_MTU)
a.tunnel_update(
mock.sentinel.ctx, tunnel_id='1', tunnel_ip='10.0.10.1',
- tunnel_type=constants.TYPE_GRE)
+ tunnel_type=p_const.TYPE_GRE)
self._verify_mock_calls()
def test_tunnel_update_self(self):