import eventlet
from oslo.config import cfg
-import pyudev
from neutron.agent import l2population_rpc as l2pop_rpc
from neutron.agent.linux import ip_lib
# Store network mapping to segments
self.network_map = {}
- self.udev = pyudev.Context()
- monitor = pyudev.Monitor.from_netlink(self.udev)
- monitor.filter_by('net')
-
def device_exists(self, device):
"""Check if ethernet device exists."""
try:
LOG.debug(_("Done deleting vxlan interface %s"), interface)
def update_devices(self, registered_devices):
- devices = self.udev_get_tap_devices()
+ devices = self.get_tap_devices()
if devices == registered_devices:
return
added = devices - registered_devices
'added': added,
'removed': removed}
- def udev_get_tap_devices(self):
+ def get_tap_devices(self):
devices = set()
- for device in self.udev.list_devices(subsystem='net'):
- name = self.udev_get_name(device)
- if self.is_tap_device(name):
- devices.add(name)
+ for device in os.listdir(BRIDGE_FS):
+ if device.startswith(TAP_INTERFACE_PREFIX):
+ devices.add(device)
return devices
- def is_tap_device(self, name):
- return name.startswith(TAP_INTERFACE_PREFIX)
-
- def udev_get_name(self, device):
- return device.sys_name
-
def check_vxlan_support(self):
kernel_version = dist_version.LooseVersion(platform.release())
if cfg.CONF.VXLAN.l2_population and (
# Check port exists on node
port = kwargs.get('port')
tap_device_name = self.agent.br_mgr.get_tap_device_name(port['id'])
- devices = self.agent.br_mgr.udev_get_tap_devices()
+ devices = self.agent.br_mgr.get_tap_devices()
if tap_device_name not in devices:
return
def _report_state(self):
try:
- devices = len(self.br_mgr.udev_get_tap_devices())
+ devices = len(self.br_mgr.get_tap_devices())
self.agent_state.get('configurations')['devices'] = devices
self.state_rpc.report_state(self.context,
self.agent_state)
self.assertTrue(exec_fn.called)
def test_update_devices(self):
- with mock.patch.object(self.lbm, "udev_get_tap_devices") as gt_fn:
+ with mock.patch.object(self.lbm, "get_tap_devices") as gt_fn:
gt_fn.return_value = set(["dev1"])
self.assertIsNone(self.lbm.update_devices(set(["dev1"])))
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_tap_device_name"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
- "udev_get_tap_devices"),
+ "get_tap_devices"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_bridge_name"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"plugin_rpc", create=True),
mock.patch.object(self.lb_rpc.sg_agent,
"refresh_firewall", create=True)
- ) as (get_tap_fn, udev_fn, getbr_fn, remif_fn,
+ ) as (get_tap_fn, get_tap_devs_fn, getbr_fn, remif_fn,
addif_fn, rpc_obj, reffw_fn):
get_tap_fn.return_value = "tap123"
- udev_fn.return_value = ["tap123", "tap124"]
+ get_tap_devs_fn.return_value = set(["tap123", "tap124"])
port = {"admin_state_up": True,
"id": "1234-5678",
"network_id": "123-123"}
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_tap_device_name"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
- "udev_get_tap_devices"),
+ "get_tap_devices"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
"get_bridge_name"),
mock.patch.object(self.lb_rpc.agent.br_mgr,
mock.patch.object(self.lb_rpc.agent,
"plugin_rpc", create=True),
mock.patch.object(linuxbridge_neutron_agent.LOG, 'error'),
- ) as (get_tap_fn, udev_fn, _, _, _, _, plugin_rpc, log):
+ ) as (get_tap_fn, get_tap_devs_fn, _, _, _, _, plugin_rpc, log):
get_tap_fn.return_value = "tap123"
- udev_fn.return_value = ["tap123", "tap124"]
+ get_tap_devs_fn.return_value = set(["tap123", "tap124"])
port = {"admin_state_up": True,
"id": "1234-5678",
"network_id": "123-123"}