from neutron.agent.linux import ip_lib
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
+from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.extensions import flavor
from neutron.openstack.common import importutils
# from linux IF_NAMESIZE
DEV_NAME_LEN = 14
- DEV_NAME_PREFIX = 'tap'
+ DEV_NAME_PREFIX = n_const.TAP_DEVICE_PREFIX
def __init__(self, conf):
self.conf = conf
class OVSInterfaceDriver(LinuxInterfaceDriver):
"""Driver for creating an internal interface on an OVS bridge."""
- DEV_NAME_PREFIX = 'tap'
+ DEV_NAME_PREFIX = n_const.TAP_DEVICE_PREFIX
def __init__(self, conf):
super(OVSInterfaceDriver, self).__init__(conf)
def _get_tap_name(self, dev_name, prefix=None):
if self.conf.ovs_use_veth:
- dev_name = dev_name.replace(prefix or self.DEV_NAME_PREFIX, 'tap')
+ dev_name = dev_name.replace(prefix or self.DEV_NAME_PREFIX,
+ n_const.TAP_DEVICE_PREFIX)
return dev_name
def _ovs_add_port(self, bridge, device_name, port_id, mac_address,
self.root_helper,
namespace=namespace):
ip = ip_lib.IPWrapper(self.root_helper)
- tap_name = device_name.replace(prefix or 'tap', 'tap')
+ tap_name = device_name.replace(prefix or n_const.TAP_DEVICE_PREFIX,
+ n_const.TAP_DEVICE_PREFIX)
# Create ns_dev in a namespace if one is configured.
root_dev, ns_dev = ip.add_veth(tap_name, device_name,
class IVSInterfaceDriver(LinuxInterfaceDriver):
"""Driver for creating an internal interface on an IVS bridge."""
- DEV_NAME_PREFIX = 'tap'
+ DEV_NAME_PREFIX = n_const.TAP_DEVICE_PREFIX
def __init__(self, conf):
super(IVSInterfaceDriver, self).__init__(conf)
self.DEV_NAME_PREFIX = 'ns-'
def _get_tap_name(self, dev_name, prefix=None):
- dev_name = dev_name.replace(prefix or self.DEV_NAME_PREFIX, 'tap')
+ dev_name = dev_name.replace(prefix or self.DEV_NAME_PREFIX,
+ n_const.TAP_DEVICE_PREFIX)
return dev_name
def _ivs_add_port(self, device_name, port_id, mac_address):
ip = ip_lib.IPWrapper(self.root_helper)
# Enable agent to define the prefix
- if prefix:
- tap_name = device_name.replace(prefix, 'tap')
- else:
- tap_name = device_name.replace(self.DEV_NAME_PREFIX, 'tap')
+ tap_name = device_name.replace(prefix or self.DEV_NAME_PREFIX,
+ n_const.TAP_DEVICE_PREFIX)
# Create ns_veth in a namespace if one is configured.
root_veth, ns_veth = ip.add_veth(tap_name, device_name,
namespace2=namespace)
class OVSHybridIptablesFirewallDriver(IptablesFirewallDriver):
- OVS_HYBRID_TAP_PREFIX = 'tap'
+ OVS_HYBRID_TAP_PREFIX = constants.TAP_DEVICE_PREFIX
def _port_chain_name(self, port, direction):
return iptables_manager.get_chain_name(
# Linux interface max length
DEVICE_NAME_MAX_LEN = 15
+
+# Device names start with "tap"
+TAP_DEVICE_PREFIX = 'tap'
class SecurityGroupServerRpcMixin(sg_db_rpc.SecurityGroupServerRpcMixin):
def get_port_from_device(self, device):
- port_id = re.sub(r"^tap", "", device)
+ port_id = re.sub(r"^%s" % const.TAP_DEVICE_PREFIX, "", device)
port = self.get_port_and_sgs(port_id)
if port:
port['device'] = device
PLUGIN_VERSION = 0.88
AGENT_OWNER_PREFIX = "network:"
NOS_DRIVER = 'neutron.plugins.brocade.nos.nosdriver.NOSdriver'
-TAP_PREFIX_LEN = 3
SWITCH_OPTS = [cfg.StrOpt('address', default='',
help=_('The address of the host to SSH to')),
device = kwargs.get('device')
LOG.debug(_("Device %(device)s details requested from %(agent_id)s"),
{'device': device, 'agent_id': agent_id})
- port = brocade_db.get_port(rpc_context, device[TAP_PREFIX_LEN:])
+ port = brocade_db.get_port(rpc_context,
+ device[len(q_const.TAP_DEVICE_PREFIX):])
if port:
entry = {'device': device,
'vlan_id': port.vlan_id,
# Doing what other plugins are doing
session = db.get_session()
port = brocade_db.get_port_from_device(
- session, device[TAP_PREFIX_LEN:])
+ session, device[len(q_const.TAP_DEVICE_PREFIX):])
# TODO(shiv): need to extend the db model to include device owners
# make it appears that the device owner is of type network
LOG = logging.getLogger(__name__)
BRIDGE_NAME_PREFIX = "brq"
-TAP_INTERFACE_PREFIX = "tap"
BRIDGE_FS = "/sys/devices/virtual/net/"
BRIDGE_NAME_PLACEHOLDER = "bridge_name"
BRIDGE_INTERFACES_FS = BRIDGE_FS + BRIDGE_NAME_PLACEHOLDER + "/brif/"
if not interface_id:
LOG.warning(_("Invalid Interface ID, will lead to incorrect "
"tap device name"))
- tap_device_name = TAP_INTERFACE_PREFIX + interface_id[0:11]
+ tap_device_name = constants.TAP_DEVICE_PREFIX + interface_id[0:11]
return tap_device_name
def get_vxlan_device_name(self, segmentation_id):
try:
if_list = os.listdir(bridge_interface_path)
return len([interface for interface in if_list if
- interface.startswith(TAP_INTERFACE_PREFIX)])
+ interface.startswith(constants.TAP_DEVICE_PREFIX)])
except OSError:
return 0
def get_tap_devices(self):
devices = set()
for device in os.listdir(BRIDGE_FS):
- if device.startswith(TAP_INTERFACE_PREFIX):
+ if device.startswith(constants.TAP_DEVICE_PREFIX):
devices.add(device)
return devices
LOG = logging.getLogger(__name__)
-# Device names start with "tap"
-TAP_PREFIX_LEN = 3
-
class LinuxBridgeRpcCallbacks(n_rpc.RpcCallback):
+ # Device names start with "tap"
# history
# 1.1 Support Security Group RPC
# 1.2 Support get_devices_details_list
@classmethod
def get_port_from_device(cls, device):
- port = db.get_port_from_device(device[TAP_PREFIX_LEN:])
+ port = db.get_port_from_device(device[len(q_const.TAP_DEVICE_PREFIX):])
if port:
port['device'] = device
return port
# providernet.py?
TYPE_MULTI_SEGMENT = 'multi-segment'
-TAP_DEVICE_PREFIX = 'tap'
-TAP_DEVICE_PREFIX_LENGTH = 3
-
class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
dvr_mac_db.DVRDbMixin,
# REVISIT(rkukura): Consider calling into MechanismDrivers to
# process device names, or having MechanismDrivers supply list
# of device prefixes to strip.
- if device.startswith(TAP_DEVICE_PREFIX):
- return device[TAP_DEVICE_PREFIX_LENGTH:]
+ if device.startswith(const.TAP_DEVICE_PREFIX):
+ return device[len(const.TAP_DEVICE_PREFIX):]
else:
# REVISIT(irenab): Consider calling into bound MD to
# handle the get_device_details RPC, then remove the 'else' clause
LOG = logging.getLogger(__name__)
-#to be compatible with Linux Bridge Agent on Network Node
-TAP_PREFIX_LEN = 3
-
class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
external_net_db.External_net_db_mixin,
services get device either by linux bridge plugin
device name convention or by mac address
"""
- port = db.get_port_from_device(device[TAP_PREFIX_LEN:])
+ port = db.get_port_from_device(
+ device[len(q_const.TAP_DEVICE_PREFIX):])
if port:
port['device'] = device
else:
# License for the specific language governing permissions and limitations
# under the License.
+from neutron.common import constants as n_const
+
class OFPort(object):
def __init__(self, port_name, ofport):
PORT_NAME_LEN = 14
PORT_NAME_PREFIXES = [
- "tap", # common cases, including ovs_use_veth=True
+ n_const.TAP_DEVICE_PREFIX, # common cases, including ovs_use_veth=True
"qvo", # nova hybrid interface driver
"qr-", # l3-agent INTERNAL_DEV_PREFIX (ovs_use_veth=False)
"qg-", # l3-agent EXTERNAL_DEV_PREFIX (ovs_use_veth=False)
use "tap" prefix throughout the agent and plugin for simplicity.
Some care should be taken when talking to the switch.
"""
- return ("tap" + interface_id)[0:PORT_NAME_LEN]
+ return (n_const.TAP_DEVICE_PREFIX + interface_id)[0:PORT_NAME_LEN]
def _normalize_port_name(name):
"""
for pref in PORT_NAME_PREFIXES:
if name.startswith(pref):
- return "tap" + name[len(pref):]
+ return n_const.TAP_DEVICE_PREFIX + name[len(pref):]
return name
def test_get_tap_device_name(self):
if_id = "123456789101112"
self.assertEqual(self.lbm.get_tap_device_name(if_id),
- "tap" + if_id[0:11])
+ constants.TAP_DEVICE_PREFIX + if_id[0:11])
if_id = ""
self.assertEqual(self.lbm.get_tap_device_name(if_id),
- "tap")
+ constants.TAP_DEVICE_PREFIX)
def test_get_vxlan_device_name(self):
vn_id = constants.MAX_VXLAN_VNI
import mock
+from neutron.common import constants as n_const
from neutron.plugins.ofagent.agent import ports
from neutron.tests import base
self.assertFalse(p2.is_neutron_port())
def test_neutron_port(self):
- for pref in ['qvo', 'qr-', 'qg-', 'tap']:
+ for pref in ['qvo', 'qr-', 'qg-', n_const.TAP_DEVICE_PREFIX]:
name = pref + '03b9a237-0b'
p1 = ports.Port(port_name=name, ofport=999)
ryu_ofp_port = mock.Mock(port_no=999)