from neutron.common import constants as constants
from neutron.common import ipv6_utils
from neutron.common import log
-from neutron.i18n import _LI
+from neutron.i18n import _LE, _LI
from neutron.services.metering.drivers import abstract_driver
continue
for label_id, label in rm.metering_labels.items():
- chain = iptables_manager.get_chain_name(WRAP_NAME + LABEL +
- label_id, wrap=False)
+ try:
+ chain = iptables_manager.get_chain_name(WRAP_NAME +
+ LABEL +
+ label_id,
+ wrap=False)
- chain_acc = rm.iptables_manager.get_traffic_counters(
- chain, wrap=False, zero=True)
+ chain_acc = rm.iptables_manager.get_traffic_counters(
+ chain, wrap=False, zero=True)
+ except RuntimeError:
+ LOG.exception(_LE('Failed to get traffic counters, '
+ 'router: %s'), router)
+ continue
if not chain_acc:
continue
wrap=False)]
self.v4filter_inst.assert_has_calls(calls)
+
+ def test_get_traffic_counters_with_missing_chain(self):
+ for r in TEST_ROUTERS:
+ rm = iptables_driver.RouterWithMetering(self.metering.conf, r)
+ rm.metering_labels = {r['_metering_labels'][0]['id']: 'fake'}
+ self.metering.routers[r['id']] = rm
+
+ mocked_method = self.iptables_cls.return_value.get_traffic_counters
+ mocked_method.side_effect = [{'pkts': 1, 'bytes': 8},
+ RuntimeError('Failed to find the chain')]
+
+ counters = self.metering.get_traffic_counters(None, TEST_ROUTERS)
+ expected_label_id = TEST_ROUTERS[0]['_metering_labels'][0]['id']
+ self.assertIn(expected_label_id, counters)
+ self.assertEqual(1, counters[expected_label_id]['pkts'])
+ self.assertEqual(8, counters[expected_label_id]['bytes'])