from neutron.common import topics
from neutron.common import utils as q_utils
from neutron import context
-from neutron.extensions import securitygroup as ext_sg
from neutron.openstack.common import log as logging
from neutron.openstack.common import loopingcall
-from neutron.openstack.common.rpc import common as rpc_common
from neutron.openstack.common.rpc import dispatcher
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.common import config # noqa
self.sg_agent = OVSSecurityGroupAgent(self.context,
self.plugin_rpc,
root_helper)
+ # Stores port update notifications for processing in main rpc loop
+ self.updated_ports = set()
# Initialize iteration counter
self.iter_num = 0
LOG.debug(_("Network %s not used on agent."), network_id)
def port_update(self, context, **kwargs):
- LOG.debug(_("port_update received"))
port = kwargs.get('port')
- # Validate that port is on OVS
- vif_port = self.int_br.get_vif_port_by_id(port['id'])
- if not vif_port:
- return
-
- if ext_sg.SECURITYGROUPS in port:
- self.sg_agent.refresh_firewall()
- network_type = kwargs.get('network_type')
- segmentation_id = kwargs.get('segmentation_id')
- physical_network = kwargs.get('physical_network')
- self.treat_vif_port(vif_port, port['id'], port['network_id'],
- network_type, physical_network,
- segmentation_id, port['admin_state_up'])
- try:
- if port['admin_state_up']:
- # update plugin about port status
- self.plugin_rpc.update_device_up(self.context, port['id'],
- self.agent_id,
- cfg.CONF.host)
- else:
- # update plugin about port status
- self.plugin_rpc.update_device_down(self.context, port['id'],
- self.agent_id,
- cfg.CONF.host)
- except rpc_common.Timeout:
- LOG.error(_("RPC timeout while updating port %s"), port['id'])
+ # Put the port identifier in the updated_ports set.
+ # Even if full port details might be provided to this call,
+ # they are not used since there is no guarantee the notifications
+ # are processed in the same order as the relevant API requests
+ self.updated_ports.add(port['id'])
+ LOG.debug(_("port_update message processed for port %s"), port['id'])
def tunnel_update(self, context, **kwargs):
LOG.debug(_("tunnel_update received"))
int_veth.link.set_mtu(self.veth_mtu)
phys_veth.link.set_mtu(self.veth_mtu)
- def update_ports(self, registered_ports):
- ports = self.int_br.get_vif_port_set()
- if ports == registered_ports:
- return
- self.int_br_device_count = len(ports)
- added = ports - registered_ports
- removed = registered_ports - ports
- return {'current': ports,
- 'added': added,
- 'removed': removed}
+ def scan_ports(self, registered_ports, updated_ports=None):
+ cur_ports = self.int_br.get_vif_port_set()
+ self.int_br_device_count = len(cur_ports)
+ port_info = {'current': cur_ports}
+ if updated_ports:
+ # Some updated ports might have been removed in the
+ # meanwhile, and therefore should not be processed.
+ # In this case the updated port won't be found among
+ # current ports.
+ updated_ports &= cur_ports
+ if updated_ports:
+ port_info['updated'] = updated_ports
+
+ # FIXME(salv-orlando): It's not really necessary to return early
+ # if nothing has changed.
+ if cur_ports == registered_ports:
+ # No added or removed ports to set, just return here
+ return port_info
+
+ port_info['added'] = cur_ports - registered_ports
+ # Remove all the known ports not found on the integration bridge
+ port_info['removed'] = registered_ports - cur_ports
+ return port_info
def update_ancillary_ports(self, registered_ports):
ports = set()
self.tun_br.delete_port(port_name)
self.tun_br_ofports[tunnel_type].pop(remote_ip, None)
- def treat_devices_added(self, devices):
+ def treat_devices_added_or_updated(self, devices):
resync = False
- self.sg_agent.prepare_devices_filter(devices)
for device in devices:
- LOG.info(_("Port %s added"), device)
+ LOG.debug(_("Processing port:%s"), device)
try:
+ # TODO(salv-orlando): Provide bulk API for retrieving
+ # details for all devices in one call
details = self.plugin_rpc.get_device_details(self.context,
device,
self.agent_id)
details['physical_network'],
details['segmentation_id'],
details['admin_state_up'])
-
# update plugin about port status
- self.plugin_rpc.update_device_up(self.context,
- device,
- self.agent_id,
- cfg.CONF.host)
+ if details.get('admin_state_up'):
+ LOG.debug(_("Setting status for %s to UP"), device)
+ self.plugin_rpc.update_device_up(
+ self.context, device, self.agent_id, cfg.CONF.host)
+ else:
+ LOG.debug(_("Setting status for %s to DOWN"), device)
+ self.plugin_rpc.update_device_down(
+ self.context, device, self.agent_id, cfg.CONF.host)
else:
LOG.debug(_("Device %s not defined on plugin"), device)
if (port and int(port.ofport) != -1):
def process_network_ports(self, port_info):
resync_a = False
resync_b = False
- if 'added' in port_info:
+ # If there is an exception while processing security groups ports
+ # will not be wired anyway, and a resync will be triggered
+ self.sg_agent.prepare_devices_filter(port_info.get('added', set()))
+ # TODO(salv-orlando): Optimize by avoiding unnecessary applying
+ # filters twice to the same ports, and unnecessary calls to the
+ # plugin (eg: when there are no IP address changes)
+ if port_info.get('updated'):
+ self.sg_agent.refresh_firewall()
+ # VIF wiring needs to be performed always for 'new' devices.
+ # For updated ports, re-wiring is not needed in most cases, but needs
+ # to be performed anyway when the admin state of a device is changed.
+ # TODO(salv-orlando): Optimize for avoiding unnecessary VIF
+ # processing for updated ports whose admin state is left unchanged
+ # A device might be both in the 'added' and 'updated'
+ # list at the same time; avoid processing it twice.
+ devices_added_updated = (port_info.get('added', set()) |
+ port_info.get('updated', set()))
+ if devices_added_updated:
start = time.time()
- resync_a = self.treat_devices_added(port_info['added'])
+ resync_a = self.treat_devices_added_or_updated(
+ devices_added_updated)
LOG.debug(_("process_network_ports - iteration:%(iter_num)d -"
- "treat_devices_added completed in %(elapsed).3f"),
+ "treat_devices_added_or_updated completed "
+ "in %(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
if 'removed' in port_info:
resync = True
return resync
+ def _agent_has_updates(self, polling_manager):
+ return (polling_manager.is_polling_required or
+ self.updated_ports)
+
+ def _port_info_has_changes(self, port_info):
+ return (port_info.get('added') or
+ port_info.get('removed') or
+ port_info.get('updated'))
+
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.AlwaysPoll()
sync = True
ports = set()
+ updated_ports_copy = set()
ancillary_ports = set()
tunnel_sync = True
while True:
- try:
- start = time.time()
- port_stats = {'regular': {'added': 0, 'removed': 0},
- 'ancillary': {'added': 0, 'removed': 0}}
- LOG.debug(_("Agent rpc_loop - iteration:%d started"),
- self.iter_num)
- if sync:
- LOG.info(_("Agent out of sync with plugin!"))
- ports.clear()
- ancillary_ports.clear()
- sync = False
- polling_manager.force_polling()
-
- # Notify the plugin of tunnel IP
- if self.enable_tunneling and tunnel_sync:
- LOG.info(_("Agent tunnel out of sync with plugin!"))
+ start = time.time()
+ port_stats = {'regular': {'added': 0,
+ 'updated': 0,
+ 'removed': 0},
+ 'ancillary': {'added': 0,
+ 'removed': 0}}
+ LOG.debug(_("Agent rpc_loop - iteration:%d started"),
+ self.iter_num)
+ if sync:
+ LOG.info(_("Agent out of sync with plugin!"))
+ ports.clear()
+ ancillary_ports.clear()
+ sync = False
+ polling_manager.force_polling()
+ # Notify the plugin of tunnel IP
+ if self.enable_tunneling and tunnel_sync:
+ LOG.info(_("Agent tunnel out of sync with plugin!"))
+ try:
tunnel_sync = self.tunnel_sync()
- if polling_manager.is_polling_required:
+ except Exception:
+ LOG.exception(_("Error while synchronizing tunnels"))
+ tunnel_sync = True
+ if self._agent_has_updates(polling_manager):
+ try:
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
- port_info = self.update_ports(ports)
+ # Save updated ports dict to perform rollback in
+ # case resync would be needed, and then clear
+ # self.updated_ports. As the greenthread should not yield
+ # between these two statements, this will be thread-safe
+ updated_ports_copy = self.updated_ports
+ self.updated_ports = set()
+ port_info = self.scan_ports(ports, updated_ports_copy)
+ ports = port_info['current']
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d - "
"port information retrieved. "
"Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# notify plugin about port deltas
- if port_info:
- LOG.debug(_("Agent loop has new devices!"))
+ if self._port_info_has_changes(port_info):
+ LOG.debug(_("Starting to process devices in:%s"),
+ port_info)
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info)
LOG.debug(_("Agent rpc_loop - iteration:%(iter_num)d -"
"ports processed. Elapsed:%(elapsed).3f"),
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
- ports = port_info['current']
port_stats['regular']['added'] = (
len(port_info.get('added', [])))
+ port_stats['regular']['updated'] = (
+ len(port_info.get('updated', [])))
port_stats['regular']['removed'] = (
len(port_info.get('removed', [])))
# Treat ancillary devices if they exist
sync = sync | rc
polling_manager.polling_completed()
-
- except Exception:
- LOG.exception(_("Error in agent event loop"))
- sync = True
- tunnel_sync = True
+ except Exception:
+ LOG.exception(_("Error while processing VIF ports"))
+ # Put the ports back in self.updated_port
+ self.updated_ports |= updated_ports_copy
+ sync = True
# sleep till end of polling interval
elapsed = (time.time() - start)
from neutron.agent.linux import ovs_lib
from neutron.agent.linux import utils
from neutron.common import constants as n_const
-from neutron.openstack.common.rpc import common as rpc_common
from neutron.plugins.common import constants as p_const
from neutron.plugins.openvswitch.agent import ovs_neutron_agent
from neutron.plugins.openvswitch.common import constants
self.agent.port_dead(mock.Mock())
self.assertTrue(add_flow_func.called)
- def mock_update_ports(self, vif_port_set=None, registered_ports=None):
+ def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
+ updated_ports=None):
with mock.patch.object(self.agent.int_br, 'get_vif_port_set',
return_value=vif_port_set):
- return self.agent.update_ports(registered_ports)
+ return self.agent.scan_ports(registered_ports, updated_ports)
- def test_update_ports_returns_none_for_unchanged_ports(self):
- self.assertIsNone(self.mock_update_ports())
+ def test_scan_ports_returns_current_only_for_unchanged_ports(self):
+ vif_port_set = set([1, 3])
+ registered_ports = set([1, 3])
+ expected = {'current': vif_port_set}
+ actual = self.mock_scan_ports(vif_port_set, registered_ports)
+ self.assertEqual(expected, actual)
- def test_update_ports_returns_port_changes(self):
+ def test_scan_ports_returns_port_changes(self):
vif_port_set = set([1, 3])
registered_ports = set([1, 2])
expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
- actual = self.mock_update_ports(vif_port_set, registered_ports)
+ actual = self.mock_scan_ports(vif_port_set, registered_ports)
+ self.assertEqual(expected, actual)
+
+ def _test_scan_ports_with_updated_ports(self, updated_ports):
+ vif_port_set = set([1, 3, 4])
+ registered_ports = set([1, 2, 4])
+ expected = dict(current=vif_port_set, added=set([3]),
+ removed=set([2]), updated=set([4]))
+ actual = self.mock_scan_ports(vif_port_set, registered_ports,
+ updated_ports)
+ self.assertEqual(expected, actual)
+
+ def test_scan_ports_finds_known_updated_ports(self):
+ self._test_scan_ports_with_updated_ports(set([4]))
+
+ def test_scan_ports_ignores_unknown_updated_ports(self):
+ # the port '5' was not seen on current ports. Hence it has either
+ # never been wired or already removed and should be ignored
+ self._test_scan_ports_with_updated_ports(set([4, 5]))
+
+ def test_scan_ports_ignores_updated_port_if_removed(self):
+ vif_port_set = set([1, 3])
+ registered_ports = set([1, 2])
+ updated_ports = set([1, 2])
+ expected = dict(current=vif_port_set, added=set([3]),
+ removed=set([2]), updated=set([1]))
+ actual = self.mock_scan_ports(vif_port_set, registered_ports,
+ updated_ports)
+ self.assertEqual(expected, actual)
+
+ def test_scan_ports_no_vif_changes_returns_updated_port_only(self):
+ vif_port_set = set([1, 2, 3])
+ registered_ports = set([1, 2, 3])
+ updated_ports = set([2])
+ expected = dict(current=vif_port_set, updated=set([2]))
+ actual = self.mock_scan_ports(vif_port_set, registered_ports,
+ updated_ports)
self.assertEqual(expected, actual)
def test_treat_devices_added_returns_true_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
side_effect=Exception()):
- self.assertTrue(self.agent.treat_devices_added([{}]))
+ self.assertTrue(self.agent.treat_devices_added_or_updated([{}]))
- def _mock_treat_devices_added(self, details, port, func_name):
- """Mock treat devices added.
+ def _mock_treat_devices_added_updated(self, details, port, func_name):
+ """Mock treat devices added or updated.
:param details: the details to return for the device
:param port: the port that get_vif_port_by_id should return
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
return_value=port),
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
mock.patch.object(self.agent, func_name)
- ) as (get_dev_fn, get_vif_func, upd_dev_up, func):
- self.assertFalse(self.agent.treat_devices_added([{}]))
+ ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func):
+ self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
return func.called
- def test_treat_devices_added_ignores_invalid_ofport(self):
+ def test_treat_devices_added_updated_ignores_invalid_ofport(self):
port = mock.Mock()
port.ofport = -1
- self.assertFalse(self._mock_treat_devices_added(mock.MagicMock(), port,
- 'port_dead'))
+ self.assertFalse(self._mock_treat_devices_added_updated(
+ mock.MagicMock(), port, 'port_dead'))
- def test_treat_devices_added_marks_unknown_port_as_dead(self):
+ def test_treat_devices_added_updated_marks_unknown_port_as_dead(self):
port = mock.Mock()
port.ofport = 1
- self.assertTrue(self._mock_treat_devices_added(mock.MagicMock(), port,
- 'port_dead'))
+ self.assertTrue(self._mock_treat_devices_added_updated(
+ mock.MagicMock(), port, 'port_dead'))
- def test_treat_devices_added_updates_known_port(self):
+ def test_treat_devices_added_updated_updates_known_port(self):
details = mock.MagicMock()
details.__contains__.side_effect = lambda x: True
- self.assertTrue(self._mock_treat_devices_added(details,
- mock.Mock(),
- 'treat_vif_port'))
+ self.assertTrue(self._mock_treat_devices_added_updated(
+ details, mock.Mock(), 'treat_vif_port'))
+
+ def test_treat_devices_added_updated_put_port_down(self):
+ fake_details_dict = {'admin_state_up': False,
+ 'port_id': 'xxx',
+ 'device': 'xxx',
+ 'network_id': 'yyy',
+ 'physical_network': 'foo',
+ 'segmentation_id': 'bar',
+ 'network_type': 'baz'}
+ with contextlib.nested(
+ mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
+ return_value=fake_details_dict),
+ mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
+ return_value=mock.MagicMock()),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
+ mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
+ mock.patch.object(self.agent, 'treat_vif_port')
+ ) as (get_dev_fn, get_vif_func, upd_dev_up,
+ upd_dev_down, treat_vif_port):
+ self.assertFalse(self.agent.treat_devices_added_or_updated([{}]))
+ self.assertTrue(treat_vif_port.called)
+ self.assertTrue(upd_dev_down.called)
def test_treat_devices_removed_returns_true_for_missing_device(self):
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
def test_treat_devices_removed_ignores_missing_port(self):
self._mock_treat_devices_removed(False)
+ def _test_process_network_ports(self, port_info):
+ with contextlib.nested(
+ mock.patch.object(self.agent.sg_agent, "prepare_devices_filter"),
+ mock.patch.object(self.agent.sg_agent, "refresh_firewall"),
+ mock.patch.object(self.agent, "treat_devices_added_or_updated",
+ return_value=False),
+ mock.patch.object(self.agent, "treat_devices_removed",
+ return_value=False)
+ ) as (prep_dev_filter, refresh_fw,
+ device_added_updated, device_removed):
+ self.assertFalse(self.agent.process_network_ports(port_info))
+ prep_dev_filter.assert_called_once_with(port_info['added'])
+ if port_info.get('updated'):
+ self.assertEqual(1, refresh_fw.call_count)
+ device_added_updated.assert_called_once_with(
+ port_info['added'] | port_info.get('updated', set()))
+ device_removed.assert_called_once_with(port_info['removed'])
+
def test_process_network_ports(self):
- reply = {'current': set(['tap0']),
- 'removed': set(['eth0']),
- 'added': set(['eth1'])}
- with mock.patch.object(self.agent, 'treat_devices_added',
- return_value=False) as device_added:
- with mock.patch.object(self.agent, 'treat_devices_removed',
- return_value=False) as device_removed:
- self.assertFalse(self.agent.process_network_ports(reply))
- self.assertTrue(device_added.called)
- self.assertTrue(device_removed.called)
+ self._test_process_network_ports(
+ {'current': set(['tap0']),
+ 'removed': set(['eth0']),
+ 'added': set(['eth1'])})
+
+ def test_process_network_port_with_updated_ports(self):
+ self._test_process_network_ports(
+ {'current': set(['tap0', 'tap1']),
+ 'updated': set(['tap1', 'eth1']),
+ 'removed': set(['eth0']),
+ 'added': set(['eth1'])})
def test_report_state(self):
with mock.patch.object(self.agent.state_rpc,
recl_fn.assert_called_with("123")
def test_port_update(self):
- with contextlib.nested(
- mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
- mock.patch.object(self.agent, "treat_vif_port"),
- mock.patch.object(self.agent.plugin_rpc, "update_device_up"),
- mock.patch.object(self.agent.plugin_rpc, "update_device_down")
- ) as (getvif_fn, treatvif_fn, updup_fn, upddown_fn):
- port = {"id": "123",
- "network_id": "124",
- "admin_state_up": False}
- getvif_fn.return_value = "vif_port_obj"
- self.agent.port_update("unused_context",
- port=port,
- network_type="vlan",
- segmentation_id="1",
- physical_network="physnet")
- treatvif_fn.assert_called_with("vif_port_obj", "123",
- "124", "vlan", "physnet",
- "1", False)
- upddown_fn.assert_called_with(self.agent.context,
- "123", self.agent.agent_id,
- cfg.CONF.host)
-
- port["admin_state_up"] = True
- self.agent.port_update("unused_context",
- port=port,
- network_type="vlan",
- segmentation_id="1",
- physical_network="physnet")
- updup_fn.assert_called_with(self.agent.context,
- "123", self.agent.agent_id,
- cfg.CONF.host)
-
- def test_port_update_plugin_rpc_failed(self):
- port = {'id': 1,
- 'network_id': 1,
- 'admin_state_up': True}
- with contextlib.nested(
- mock.patch.object(ovs_neutron_agent.LOG, 'error'),
- mock.patch.object(self.agent.int_br, "get_vif_port_by_id"),
- mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
- mock.patch.object(self.agent, 'port_bound'),
- mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
- mock.patch.object(self.agent, 'port_dead')
- ) as (log, _, device_up, _, device_down, _):
- device_up.side_effect = rpc_common.Timeout
- self.agent.port_update(mock.Mock(), port=port)
- self.assertTrue(device_up.called)
- self.assertEqual(log.call_count, 1)
-
- log.reset_mock()
- port['admin_state_up'] = False
- device_down.side_effect = rpc_common.Timeout
- self.agent.port_update(mock.Mock(), port=port)
- self.assertTrue(device_down.called)
- self.assertEqual(log.call_count, 1)
+ port = {"id": "123",
+ "network_id": "124",
+ "admin_state_up": False}
+ self.agent.port_update("unused_context",
+ port=port,
+ network_type="vlan",
+ segmentation_id="1",
+ physical_network="physnet")
+ self.assertEqual(set(['123']), self.agent.updated_ports)
def test_setup_physical_bridges(self):
with contextlib.nested(