# under the License.
import binascii
+import collections
import netaddr
from oslo_log import log as logging
from neutron.common import constants as l3_constants
from neutron.common import exceptions
from neutron.common import utils as common_utils
-from neutron.i18n import _LE
+from neutron.i18n import _LE, _LW
LOG = logging.getLogger(__name__)
# xor-folding mask used for IPv6 rule index
MASK_30 = 0x3fffffff
+# Tracks the arp entry cache
+Arp_entry = collections.namedtuple(
+ 'Arp_entry', 'ip mac subnet_id operation')
+
class DvrLocalRouter(dvr_router_base.DvrRouterBase):
def __init__(self, agent, host, *args, **kwargs):
self.rtr_fip_subnet = None
self.dist_fip_count = None
self.fip_ns = None
+ self._pending_arp_set = set()
def get_floating_ips(self):
"""Filter Floating IPs to be hosted on this agent."""
if f['subnet_id'] == subnet_id:
return port
+ def _cache_arp_entry(self, ip, mac, subnet_id, operation):
+ """Cache the arp entries if device not ready."""
+ arp_entry_tuple = Arp_entry(ip=ip,
+ mac=mac,
+ subnet_id=subnet_id,
+ operation=operation)
+ self._pending_arp_set.add(arp_entry_tuple)
+
+ def _process_arp_cache_for_internal_port(self, subnet_id):
+ """Function to process the cached arp entries."""
+ arp_remove = set()
+ for arp_entry in self._pending_arp_set:
+ if subnet_id == arp_entry.subnet_id:
+ try:
+ state = self._update_arp_entry(
+ arp_entry.ip, arp_entry.mac,
+ arp_entry.subnet_id, arp_entry.operation)
+ except Exception:
+ state = False
+ if state:
+ # If the arp update was successful, then
+ # go ahead and add it to the remove set
+ arp_remove.add(arp_entry)
+
+ self._pending_arp_set -= arp_remove
+
+ def _delete_arp_cache_for_internal_port(self, subnet_id):
+ """Function to delete the cached arp entries."""
+ arp_delete = set()
+ for arp_entry in self._pending_arp_set:
+ if subnet_id == arp_entry.subnet_id:
+ arp_delete.add(arp_entry)
+ self._pending_arp_set -= arp_delete
+
def _update_arp_entry(self, ip, mac, subnet_id, operation):
"""Add or delete arp entry into router namespace for the subnet."""
port = self._get_internal_port(subnet_id)
# update arp entry only if the subnet is attached to the router
if not port:
- return
+ return False
try:
# TODO(mrsmith): optimize the calls below for bulk calls
interface_name = self.get_internal_device_name(port['id'])
device = ip_lib.IPDevice(interface_name, namespace=self.ns_name)
- if operation == 'add':
- device.neigh.add(ip, mac)
- elif operation == 'delete':
- device.neigh.delete(ip, mac)
+ if device.exists():
+ if operation == 'add':
+ device.neigh.add(ip, mac)
+ elif operation == 'delete':
+ device.neigh.delete(ip, mac)
+ return True
+ else:
+ if operation == 'add':
+ LOG.warn(_LW("Device %s does not exist so ARP entry "
+ "cannot be updated, will cache information "
+ "to be applied later when the device exists"),
+ device)
+ self._cache_arp_entry(ip, mac, subnet_id, operation)
+ return False
except Exception:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("DVR: Failed updating arp entry"))
p['mac_address'],
subnet_id,
'add')
+ self._process_arp_cache_for_internal_port(subnet_id)
@staticmethod
def _get_snat_idx(ip_cidr):
# DVR handling code for SNAT
interface_name = self.get_internal_device_name(port['id'])
self._snat_redirect_remove(sn_port, port, interface_name)
+ # Clean up the cached arp entries related to the port subnet
+ for subnet in port['subnets']:
+ self._delete_arp_cache_for_internal_port(subnet)
def internal_network_removed(self, port):
self._dvr_internal_network_removed(port)
# Test basic case
ports[0]['subnets'] = [{'id': subnet_id,
'cidr': '1.2.3.0/24'}]
- ri._set_subnet_arp_info(subnet_id)
+ with mock.patch.object(ri,
+ '_process_arp_cache_for_internal_port') as parp:
+ ri._set_subnet_arp_info(subnet_id)
+ self.assertEqual(1, parp.call_count)
self.mock_ip_dev.neigh.add.assert_called_once_with(
'1.2.3.4', '00:11:22:33:44:55')
ri._update_arp_entry(mock.ANY, mock.ANY, 'foo_subnet_id', 'add')
self.assertFalse(f.call_count)
+ def _setup_test_for_arp_entry_cache(self):
+ agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
+ router = l3_test_common.prepare_router_data(num_internal_ports=2)
+ router['distributed'] = True
+ ri = dvr_router.DvrLocalRouter(
+ agent, HOSTNAME, router['id'], router, **self.ri_kwargs)
+ subnet_id = l3_test_common.get_subnet_id(
+ ri.router[l3_constants.INTERFACE_KEY][0])
+ return ri, subnet_id
+
+ def test__update_arp_entry_calls_arp_cache_with_no_device(self):
+ ri, subnet_id = self._setup_test_for_arp_entry_cache()
+ state = True
+ with mock.patch.object(l3_agent.ip_lib, 'IPDevice') as rtrdev,\
+ mock.patch.object(ri, '_cache_arp_entry') as arp_cache:
+ rtrdev.return_value.exists.return_value = False
+ state = ri._update_arp_entry(
+ mock.ANY, mock.ANY, subnet_id, 'add')
+ self.assertFalse(state)
+ self.assertTrue(arp_cache.called)
+ arp_cache.assert_called_once_with(mock.ANY, mock.ANY,
+ subnet_id, 'add')
+ self.assertFalse(rtrdev.neigh.add.called)
+
+ def test__process_arp_cache_for_internal_port(self):
+ ri, subnet_id = self._setup_test_for_arp_entry_cache()
+ ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55',
+ subnet_id, 'add')
+ self.assertEqual(1, len(ri._pending_arp_set))
+ with mock.patch.object(ri, '_update_arp_entry') as update_arp:
+ update_arp.return_value = True
+ ri._process_arp_cache_for_internal_port(subnet_id)
+ self.assertEqual(0, len(ri._pending_arp_set))
+
+ def test__delete_arp_cache_for_internal_port(self):
+ ri, subnet_id = self._setup_test_for_arp_entry_cache()
+ ri._cache_arp_entry('1.7.23.11', '00:11:22:33:44:55',
+ subnet_id, 'add')
+ self.assertEqual(1, len(ri._pending_arp_set))
+ ri._delete_arp_cache_for_internal_port(subnet_id)
+ self.assertEqual(0, len(ri._pending_arp_set))
+
def test_del_arp_entry(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = l3_test_common.prepare_router_data(num_internal_ports=2)