# ovs datapath types
OVS_DATAPATH_SYSTEM = 'system'
OVS_DATAPATH_NETDEV = 'netdev'
+
+MAX_DEVICE_RETRIES = 5
port_moves.append(name)
return port_moves
- def _get_port_info(self, registered_ports, cur_ports):
+ def _get_port_info(self, registered_ports, cur_ports,
+ readd_registered_ports):
port_info = {'current': cur_ports}
# FIXME(salv-orlando): It's not really necessary to return early
# if nothing has changed.
- if cur_ports == registered_ports:
- # No added or removed ports to set, just return here
+ if not readd_registered_ports and cur_ports == registered_ports:
return port_info
- port_info['added'] = cur_ports - registered_ports
- # Remove all the known ports not found on the integration bridge
+
+ if readd_registered_ports:
+ port_info['added'] = cur_ports
+ else:
+ port_info['added'] = cur_ports - registered_ports
+ # Update port_info with ports not found on the integration bridge
port_info['removed'] = registered_ports - cur_ports
return port_info
- def scan_ports(self, registered_ports, updated_ports=None):
+ def scan_ports(self, registered_ports, sync, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
- port_info = self._get_port_info(registered_ports, cur_ports)
+ port_info = self._get_port_info(registered_ports, cur_ports, sync)
if updated_ports is None:
updated_ports = set()
updated_ports.update(self.check_changed_vlans())
port_info['updated'] = updated_ports
return port_info
- def scan_ancillary_ports(self, registered_ports):
+ def scan_ancillary_ports(self, registered_ports, sync):
cur_ports = set()
for bridge in self.ancillary_brs:
cur_ports |= bridge.get_vif_port_set()
- return self._get_port_info(registered_ports, cur_ports)
+ return self._get_port_info(registered_ports, cur_ports, sync)
def check_changed_vlans(self):
"""Return ports which have lost their vlan tag.
ancillary_ports = set()
tunnel_sync = True
ovs_restarted = False
+ consecutive_resyncs = 0
while self._check_and_handle_signal():
port_info = {}
ancillary_port_info = {}
self.iter_num)
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
- ports.clear()
- ancillary_ports.clear()
- sync = False
polling_manager.force_polling()
+ consecutive_resyncs = consecutive_resyncs + 1
+ if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
+ LOG.warn(_LW("Clearing cache of registered ports, retrials"
+ " to resync were > %s"),
+ constants.MAX_DEVICE_RETRIES)
+ ports.clear()
+ ancillary_ports.clear()
+ sync = False
+ consecutive_resyncs = 0
+ else:
+ consecutive_resyncs = 0
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
updated_ports_copy = self.updated_ports
self.updated_ports = set()
reg_ports = (set() if ovs_restarted else ports)
- port_info = self.scan_ports(reg_ports, updated_ports_copy)
+ port_info = self.scan_ports(reg_ports, sync,
+ updated_ports_copy)
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
-
# Treat ancillary devices if they exist
if self.ancillary_brs:
ancillary_port_info = self.scan_ancillary_ports(
- ancillary_ports)
+ ancillary_ports, sync)
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"ancillary port info retrieved. "
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
+ sync = False
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or
self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
def mock_scan_ports(self, vif_port_set=None, registered_ports=None,
- updated_ports=None, port_tags_dict=None):
+ updated_ports=None, port_tags_dict=None, sync=False):
if port_tags_dict is None: # Because empty dicts evaluate as False.
port_tags_dict = {}
with mock.patch.object(self.agent.int_br,
mock.patch.object(self.agent.int_br,
'get_port_tag_dict',
return_value=port_tags_dict):
- return self.agent.scan_ports(registered_ports, updated_ports)
+ return self.agent.scan_ports(registered_ports, sync, updated_ports)
def test_scan_ports_returns_current_only_for_unchanged_ports(self):
vif_port_set = set([1, 3])
actual = self.mock_scan_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
+ def test_scan_ports_returns_port_changes_with_sync(self):
+ vif_port_set = set([1, 3])
+ registered_ports = set([1, 2])
+ expected = dict(current=vif_port_set, added=vif_port_set,
+ removed=set([2]))
+ actual = self.mock_scan_ports(vif_port_set, registered_ports,
+ sync=True)
+ self.assertEqual(expected, actual)
+
def _test_scan_ports_with_updated_ports(self, updated_ports):
vif_port_set = set([1, 3, 4])
registered_ports = set([1, 2, 4])
pass
scan_ports.assert_has_calls([
- mock.call(set(), set()),
- mock.call(set(), set())
+ mock.call(set(), True, set()),
+ mock.call(set(), False, set())
])
process_network_ports.assert_has_calls([
mock.call(reply2, False),
self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])
def mock_scan_ancillary_ports(self, vif_port_set=None,
- registered_ports=None):
+ registered_ports=None, sync=False):
bridges = ['br-int', 'br-ex']
ancillary = ['br-ex']
return_value=vif_port_set):
self.agent = self.mod_agent.OVSNeutronAgent(self._bridge_classes(),
**self.kwargs)
- return self.agent.scan_ancillary_ports(registered_ports)
+ return self.agent.scan_ancillary_ports(registered_ports, sync)
def test_scan_ancillary_ports_returns_cur_only_for_unchanged_ports(self):
vif_port_set = set([1, 2])
actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports)
self.assertEqual(expected, actual)
+ def test_scan_ancillary_ports_returns_port_changes_with_sync(self):
+ vif_port_set = set([1, 3])
+ registered_ports = set([1, 2])
+ expected = dict(current=vif_port_set, added=vif_port_set,
+ removed=set([2]))
+ actual = self.mock_scan_ancillary_ports(vif_port_set, registered_ports,
+ sync=True)
+ self.assertEqual(expected, actual)
+
class AncillaryBridgesTestOFCtl(AncillaryBridgesTest,
ovs_test_base.OVSOFCtlTestBase):
log_exception.assert_called_once_with(
"Error while processing VIF ports")
scan_ports.assert_has_calls([
- mock.call(set(), set()),
- mock.call(set(['tap0']), set())
+ mock.call(set(), True, set()),
+ mock.call(set(['tap0']), False, set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),