from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import config as common_config
from neutron.common import constants as q_const
+from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils as q_utils
DEAD_VLAN_TAG = str(q_const.MAX_VLAN_TAG + 1)
+class DeviceListRetrievalError(exceptions.NeutronException):
+ message = _("Unable to retrieve port details for devices: %(devices)s "
+ "because of error: %(error)s")
+
+
# A class to represent a VIF (i.e., a port that has 'iface-id' and 'vif-mac'
# attributes set).
class LocalVLANMapping:
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
def treat_devices_added_or_updated(self, devices, ovs_restarted):
+ skipped_devices = []
try:
devices_details_list = self.plugin_rpc.get_devices_details_list(
self.context,
devices,
self.agent_id)
except Exception as e:
- LOG.debug("Unable to get port details for %(devices)s: %(e)s",
- {'devices': devices, 'e': e})
- # resync is needed
- return True
+ raise DeviceListRetrievalError(devices=devices, error=e)
for details in devices_details_list:
device = details['device']
LOG.debug("Processing port: %s", device)
port = self.int_br.get_vif_port_by_id(device)
if not port:
- # The port has disappeared and should not be processed
- # There is no need to put the port DOWN in the plugin as
- # it never went up in the first place
+ # The port disappeared and cannot be processed
LOG.info(_("Port %s was not found on the integration bridge "
"and will therefore not be processed"), device)
+ skipped_devices.append(device)
continue
if 'port_id' in details:
details['admin_state_up'],
ovs_restarted)
# update plugin about port status
+ # FIXME(salv-orlando): Failures while updating device status
+ # must be handled appropriately. Otherwise this might prevent
+ # neutron server from sending network-vif-* events to the nova
+ # API server, thus possibly preventing instance spawn.
if details.get('admin_state_up'):
LOG.debug(_("Setting status for %s to UP"), device)
self.plugin_rpc.update_device_up(
LOG.warn(_("Device %s not defined on plugin"), device)
if (port and port.ofport != -1):
self.port_dead(port)
- return False
+ return skipped_devices
def treat_ancillary_devices_added(self, devices):
try:
devices,
self.agent_id)
except Exception as e:
- LOG.debug("Unable to get port details for "
- "%(devices)s: %(e)s", {'devices': devices, 'e': e})
- # resync is needed
- return True
+ raise DeviceListRetrievalError(devices=devices, error=e)
for details in devices_details_list:
device = details['device']
device,
self.agent_id,
cfg.CONF.host)
- return False
def treat_devices_removed(self, devices):
resync = False
port_info.get('updated', set()))
if devices_added_updated:
start = time.time()
- resync_a = self.treat_devices_added_or_updated(
- devices_added_updated, ovs_restarted)
- LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
- "treat_devices_added_or_updated completed "
- "in %(elapsed).3f"),
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
+ try:
+ skipped_devices = self.treat_devices_added_or_updated(
+ devices_added_updated, ovs_restarted)
+ LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
+ "treat_devices_added_or_updated completed. "
+ "Skipped %(num_skipped)d devices of "
+ "%(num_current)d devices currently available. "
+ "Time elapsed: %(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'num_skipped': len(skipped_devices),
+ 'num_current': len(port_info['current']),
+ 'elapsed': time.time() - start})
+ # Update the list of current ports storing only those which
+ # have been actually processed.
+ port_info['current'] = (port_info['current'] -
+ set(skipped_devices))
+ except DeviceListRetrievalError:
+ # Need to resync as there was an error with server
+ # communication.
+ LOG.exception(_("process_network_ports - iteration:%d - "
+ "failure while retrieving port details "
+ "from server"), self.iter_num)
+ resync_a = True
if 'removed' in port_info:
start = time.time()
resync_b = self.treat_devices_removed(port_info['removed'])
resync_b = False
if 'added' in port_info:
start = time.time()
- resync_a = self.treat_ancillary_devices_added(port_info['added'])
- LOG.debug(_("process_ancillary_network_ports - iteration: "
- "%(iter_num)d - treat_ancillary_devices_added "
- "completed in %(elapsed).3f"),
- {'iter_num': self.iter_num,
- 'elapsed': time.time() - start})
+ try:
+ self.treat_ancillary_devices_added(port_info['added'])
+ LOG.debug(_("process_ancillary_network_ports - iteration: "
+ "%(iter_num)d - treat_ancillary_devices_added "
+ "completed in %(elapsed).3f"),
+ {'iter_num': self.iter_num,
+ 'elapsed': time.time() - start})
+ except DeviceListRetrievalError:
+ # Need to resync as there was an error with server
+ # communication.
+ LOG.exception(_("process_ancillary_network_ports - "
+ "iteration:%d - failure while retrieving "
+ "port details from server"), self.iter_num)
+ resync_a = True
if 'removed' in port_info:
start = time.time()
resync_b = self.treat_ancillary_devices_removed(
self.updated_ports = set()
reg_ports = (set() if ovs_restarted else ports)
port_info = self.scan_ports(reg_ports, updated_ports_copy)
- ports = port_info['current']
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
"port information retrieved. "
"Elapsed:%(elapsed).3f"),
len(port_info.get('updated', [])))
port_stats['regular']['removed'] = (
len(port_info.get('removed', [])))
+ ports = port_info['current']
# Treat ancillary devices if they exist
if self.ancillary_brs:
port_info = self.update_ancillary_ports(
vif_port_set, registered_ports, port_tags_dict=port_tags_dict)
self.assertEqual(expected, actual)
- def test_treat_devices_added_returns_true_for_missing_device(self):
+ def test_treat_devices_added_returns_raises_for_missing_device(self):
with contextlib.nested(
mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list',
side_effect=Exception()),
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=mock.Mock())):
- self.assertTrue(self.agent.treat_devices_added_or_updated([{}],
- False))
+ self.assertRaises(
+ ovs_neutron_agent.DeviceListRetrievalError,
+ self.agent.treat_devices_added_or_updated, [{}], False)
def _mock_treat_devices_added_updated(self, details, port, func_name):
"""Mock treat devices added or updated.
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
- self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
- False))
+ skip_devs = self.agent.treat_devices_added_or_updated([{}], False)
+ # The function should not raise
+ self.assertFalse(skip_devs)
return func.called
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
) as (get_dev_fn, get_vif_func):
self.assertFalse(get_dev_fn.called)
- def test_treat_devices_added__updated_updates_known_port(self):
+ def test_treat_devices_added_updated_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
self.assertTrue(self._mock_treat_devices_added_updated(
details, mock.Mock(), 'treat_vif_port'))
+ def test_treat_devices_added_updated_skips_if_port_not_found(self):
+ dev_mock = mock.MagicMock()
+ dev_mock.__getitem__.return_value = 'the_skipped_one'
+ with contextlib.nested(
+ mock.patch.object(self.agent.plugin_rpc,
+ 'get_devices_details_list',
+ return_value=[dev_mock]),
+ mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+ return_value=None),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
+ mock.patch.object(self.agent, 'treat_vif_port')
+ ) as (get_dev_fn, get_vif_func, upd_dev_up,
+ upd_dev_down, treat_vif_port):
+ skip_devs = self.agent.treat_devices_added_or_updated([{}], False)
+ # The function should return False for resync and no device
+ # processed
+ self.assertEqual(['the_skipped_one'], skip_devs)
+ self.assertFalse(treat_vif_port.called)
+ self.assertFalse(upd_dev_down.called)
+ self.assertFalse(upd_dev_up.called)
+
def test_treat_devices_added_updated_put_port_down(self):
fake_details_dict = {'admin_state_up': False,
'port_id': 'xxx',
mock.patch.object(self.agent, 'treat_vif_port')
) as (get_dev_fn, get_vif_func, upd_dev_up,
upd_dev_down, treat_vif_port):
- self.assertFalse(self.agent.treat_devices_added_or_updated([{}],
- False))
+ skip_devs = self.agent.treat_devices_added_or_updated([{}], False)
+ # The function should return False for resync
+ self.assertFalse(skip_devs)
self.assertTrue(treat_vif_port.called)
self.assertTrue(upd_dev_down.called)
with contextlib.nested(
mock.patch.object(self.agent.sg_agent, "setup_port_filters"),
mock.patch.object(self.agent, "treat_devices_added_or_updated",
- return_value=False),
+ return_value=[]),
mock.patch.object(self.agent, "treat_devices_removed",
return_value=False)
) as (setup_port_filters, device_added_updated, device_removed):