def vf_management_supported():
+ required_caps = (
+ ip_link_support.IpLinkConstants.IP_LINK_CAPABILITY_STATE,
+ ip_link_support.IpLinkConstants.IP_LINK_CAPABILITY_SPOOFCHK)
try:
vf_section = ip_link_support.IpLinkSupport.get_vf_mgmt_section()
- if not ip_link_support.IpLinkSupport.vf_mgmt_capability_supported(
- vf_section,
- ip_link_support.IpLinkConstants.IP_LINK_CAPABILITY_STATE):
- LOG.debug("ip link command does not support vf capability")
- return False
+ for cap in required_caps:
+ if not ip_link_support.IpLinkSupport.vf_mgmt_capability_supported(
+ vf_section, cap):
+ LOG.debug("ip link command does not support "
+ "vf capability '%(cap)s'", cap)
+ return False
except ip_link_support.UnsupportedIpLinkCommand:
LOG.exception(_LE("Unexpected exception while checking supported "
"ip link command"))
raise exc.InvalidPciSlotError(pci_slot=pci_slot)
return self.pci_dev_wrapper.set_vf_state(vf_index, state)
+ def set_device_spoofcheck(self, pci_slot, enabled):
+ """Set device spoofchecking
+
+ @param pci_slot: Virtual Function address
+ @param enabled: True to enable spoofcheck, False to disable
+ """
+ vf_index = self.pci_slot_map.get(pci_slot)
+ if vf_index is None:
+ raise exc.InvalidPciSlotError(pci_slot=pci_slot)
+ return self.pci_dev_wrapper.set_vf_spoofcheck(vf_index, enabled)
+
def get_pci_device(self, pci_slot):
"""Get mac address for given Virtual Function address
embedded_switch.set_device_state(pci_slot,
admin_state_up)
+ def set_device_spoofcheck(self, device_mac, pci_slot, enabled):
+ """Set device spoofcheck
+
+ Sets device spoofchecking (enabled or disabled)
+ @param device_mac: device mac
+ @param pci_slot: pci slot
+ @param enabled: device spoofchecking
+ """
+ embedded_switch = self._get_emb_eswitch(device_mac, pci_slot)
+ if embedded_switch:
+ embedded_switch.set_device_spoofcheck(pci_slot,
+ enabled)
+
def _discover_devices(self, device_mappings, exclude_devices):
"""Discover which Virtual functions to manage.
raise exc.IpCommandError(dev_name=self.dev_name,
reason=e)
+ def set_vf_spoofcheck(self, vf_index, enabled):
+ """sets vf spoofcheck
+
+ @param vf_index: vf index
+ @param enabled: True to enable spoof checking,
+ False to disable
+ """
+ setting = "on" if enabled else "off"
+
+ try:
+ self._as_root('', "link", ("set", self.dev_name, "vf",
+ str(vf_index), "spoofchk", setting))
+ except Exception as e:
+ raise exc.IpCommandError(dev_name=self.dev_name,
+ reason=str(e))
+
def _get_vf_link_show(self, vf_list, link_show_out):
"""Get link show output for VFs
from neutron.common import topics
from neutron.common import utils as n_utils
from neutron import context
-from neutron.i18n import _LE, _LI
+from neutron.i18n import _LE, _LI, _LW
from neutron.plugins.ml2.drivers.mech_sriov.agent.common import config # noqa
from neutron.plugins.ml2.drivers.mech_sriov.agent.common \
import exceptions as exc
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
- def treat_device(self, device, pci_slot, admin_state_up):
+ def treat_device(self, device, pci_slot, admin_state_up, spoofcheck=True):
if self.eswitch_mgr.device_exists(device, pci_slot):
+ try:
+ self.eswitch_mgr.set_device_spoofcheck(device, pci_slot,
+ spoofcheck)
+ except Exception:
+ LOG.warning(_LW("Failed to set spoofcheck for device %s"),
+ device)
+ LOG.info(_LI("Device %(device)s spoofcheck %(spoofcheck)s"),
+ {"device": device, "spoofcheck": spoofcheck})
+
try:
self.eswitch_mgr.set_device_state(device, pci_slot,
admin_state_up)
LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
{'device': device, 'details': device_details})
profile = device_details['profile']
+ spoofcheck = device_details.get('port_security_enabled', True)
self.treat_device(device_details['device'],
profile.get('pci_slot'),
- device_details['admin_state_up'])
+ device_details['admin_state_up'],
+ spoofcheck)
else:
LOG.info(_LI("Device with MAC %s not defined on plugin"),
device)
TYPE := { vlan | veth | vcan | dummy | ifb | macvlan | can }
"""
+ IP_LINK_HELP_NO_SPOOFCHK = IP_LINK_HELP_NO_STATE
+
IP_LINK_HELP_NO_VF = """Usage: ip link set DEVICE { up | down |
arp { on | off } |
dynamic { on | off } |
expected=False,
stderr=self.IP_LINK_HELP_NO_STATE)
+ def test_vf_mgmt_no_spoofchk(self):
+ self._test_capability(
+ ip_link.IpLinkConstants.IP_LINK_CAPABILITY_SPOOFCHK,
+ expected=False,
+ stderr=self.IP_LINK_HELP_NO_SPOOFCHK)
+
def test_vf_mgmt_no_vf(self):
self._test_capability(
ip_link.IpLinkConstants.IP_LINK_CAPABILITY_STATE,
self.emb_switch.set_device_state,
self.WRONG_PCI_SLOT, True)
+ def test_set_device_spoofcheck_ok(self):
+ with mock.patch("neutron.plugins.ml2.drivers.mech_sriov.agent.pci_lib."
+ "PciDeviceIPWrapper.set_vf_spoofcheck") as \
+ set_vf_spoofcheck_mock:
+ self.emb_switch.set_device_spoofcheck(self.PCI_SLOT, True)
+ self.assertTrue(set_vf_spoofcheck_mock.called)
+
+ def test_set_device_spoofcheck_fail(self):
+ with mock.patch("neutron.plugins.ml2.drivers.mech_sriov.agent.pci_lib."
+ "PciDeviceIPWrapper.set_vf_spoofcheck"):
+ self.assertRaises(exc.InvalidPciSlotError,
+ self.emb_switch.set_device_spoofcheck,
+ self.WRONG_PCI_SLOT, True)
+
def test_get_pci_device(self):
with mock.patch("neutron.plugins.ml2.drivers.mech_sriov.agent.pci_lib."
"PciDeviceIPWrapper.get_assigned_macs",
self.pci_wrapper.set_vf_state,
self.VF_INDEX,
True)
+
+ def test_set_vf_spoofcheck(self):
+ with mock.patch.object(self.pci_wrapper, "_execute"):
+ result = self.pci_wrapper.set_vf_spoofcheck(self.VF_INDEX,
+ True)
+ self.assertIsNone(result)
+
+ def test_set_vf_spoofcheck_fail(self):
+ with mock.patch.object(self.pci_wrapper,
+ "_execute") as mock_exec:
+ mock_exec.side_effect = Exception()
+ self.assertRaises(exc.IpCommandError,
+ self.pci_wrapper.set_vf_spoofcheck,
+ self.VF_INDEX,
+ True)
'network_type': 'vlan',
'segmentation_id': 100,
'profile': {'pci_slot': '1:2:3.0'},
- 'physical_network': 'physnet1'}
+ 'physical_network': 'physnet1',
+ 'port_security_enabled': False}
agent.plugin_rpc = mock.Mock()
agent.plugin_rpc.get_devices_details_list.return_value = [mock_details]
agent.eswitch_mgr = mock.Mock()
agent.eswitch_mgr.device_exists.return_value = True
agent.set_device_state = mock.Mock()
+ agent.set_device_spoofcheck = mock.Mock()
resync_needed = agent.treat_devices_added_updated(
set(['aa:bb:cc:dd:ee:ff']))
'aa:bb:cc:dd:ee:ff',
'1:2:3.0',
True)
+ agent.eswitch_mgr.set_device_spoofcheck.assert_called_with(
+ 'aa:bb:cc:dd:ee:ff',
+ '1:2:3.0',
+ False)
self.assertTrue(agent.plugin_rpc.update_device_up.called)
def test_treat_devices_added_updated_admin_state_up_false(self):