utils.execute(cmd, self.root_helper)
def release_lease(self, mac_address, removed_ips):
+ pass
+
+ def _release_lease(self, mac_address, ip):
"""Release a DHCP lease."""
- for ip in removed_ips or []:
- cmd = ['dhcp_release', self.interface_name, ip, mac_address]
- if self.network.namespace:
- ip_wrapper = ip_lib.IPWrapper(self.root_helper,
- self.network.namespace)
- ip_wrapper.netns.execute(cmd)
- else:
- utils.execute(cmd, self.root_helper)
+ cmd = ['dhcp_release', self.interface_name, ip, mac_address]
+ if self.network.namespace:
+ ip_wrapper = ip_lib.IPWrapper(self.root_helper,
+ self.network.namespace)
+ ip_wrapper.netns.execute(cmd)
+ else:
+ utils.execute(cmd, self.root_helper)
def reload_allocations(self):
"""Rebuild the dnsmasq config and signal the dnsmasq to reload."""
'turned off DHCP: %s'), self.network.id)
return
+ self._release_unused_leases()
self._output_hosts_file()
self._output_opts_file()
if self.active:
utils.replace_file(name, buf.getvalue())
return name
+ def _read_hosts_file_leases(self, filename):
+ leases = set()
+ if os.path.exists(filename):
+ with open(filename) as f:
+ for l in f.readlines():
+ host = l.strip().split(',')
+ leases.add((host[2], host[0]))
+ return leases
+
+ def _release_unused_leases(self):
+ filename = self.get_conf_file_name('host')
+ old_leases = self._read_hosts_file_leases(filename)
+
+ new_leases = set()
+ for port in self.network.ports:
+ for alloc in port.fixed_ips:
+ new_leases.add((alloc.ip_address, port.mac_address))
+
+ for ip, mac in old_leases - new_leases:
+ self._release_lease(mac, ip)
+
def _output_opts_file(self):
"""Write a dnsmasq compatible options file."""
self.safe.assert_called_once_with('/foo/opts', expected)
- def test_release_lease(self):
- dm = dhcp.Dnsmasq(self.conf, FakeDualNetwork(), version=float(2.59))
- dm.release_lease(mac_address=FakePort2.mac_address,
- removed_ips=[FakePort2.fixed_ips[0].ip_address])
- exp_args = ['ip', 'netns', 'exec', 'qdhcp-ns', 'dhcp_release',
- dm.interface_name, FakePort2.fixed_ips[0].ip_address,
- FakePort2.mac_address]
- self.execute.assert_called_once_with(exp_args, root_helper='sudo',
- check_exit_code=True)
-
def test_output_opts_file_pxe_2port_1net(self):
expected = """
tag:tag0,option:dns-server,8.8.8.8
mock.call(exp_opt_name, exp_opt_data)])
mock_open.assert_called_once_with('/proc/5/cmdline', 'r')
+ def test_release_unused_leases(self):
+ dnsmasq = dhcp.Dnsmasq(self.conf, FakeDualNetwork())
+
+ ip1 = '192.168.1.2'
+ mac1 = '00:00:80:aa:bb:cc'
+ ip2 = '192.168.1.3'
+ mac2 = '00:00:80:cc:bb:aa'
+
+ old_leases = set([(ip1, mac1), (ip2, mac2)])
+ dnsmasq._read_hosts_file_leases = mock.Mock(return_value=old_leases)
+ dnsmasq._output_hosts_file = mock.Mock()
+ dnsmasq._release_lease = mock.Mock()
+ dnsmasq.network.ports = []
+
+ dnsmasq._release_unused_leases()
+
+ dnsmasq._release_lease.assert_has_calls([mock.call(mac1, ip1),
+ mock.call(mac2, ip2)],
+ any_order=True)
+
+ def test_release_unused_leases_one_lease(self):
+ dnsmasq = dhcp.Dnsmasq(self.conf, FakeDualNetwork())
+
+ ip1 = '192.168.0.2'
+ mac1 = '00:00:80:aa:bb:cc'
+ ip2 = '192.168.0.3'
+ mac2 = '00:00:80:cc:bb:aa'
+
+ old_leases = set([(ip1, mac1), (ip2, mac2)])
+ dnsmasq._read_hosts_file_leases = mock.Mock(return_value=old_leases)
+ dnsmasq._output_hosts_file = mock.Mock()
+ dnsmasq._release_lease = mock.Mock()
+ dnsmasq.network.ports = [FakePort1()]
+
+ dnsmasq._release_unused_leases()
+
+ dnsmasq._release_lease.assert_has_calls([mock.call(mac2, ip2)],
+ any_order=True)
+
+ def test_read_hosts_file_leases(self):
+ filename = '/path/to/file'
+ with mock.patch('os.path.exists') as mock_exists:
+ mock_exists.return_value = True
+ with mock.patch('__builtin__.open') as mock_open:
+ mock_open.return_value.__enter__ = lambda s: s
+ mock_open.return_value.__exit__ = mock.Mock()
+ lines = ["00:00:80:aa:bb:cc,inst-name,192.168.0.1"]
+ mock_open.return_value.readlines.return_value = lines
+
+ dnsmasq = dhcp.Dnsmasq(self.conf, FakeDualNetwork())
+ leases = dnsmasq._read_hosts_file_leases(filename)
+
+ self.assertEqual(set([("192.168.0.1", "00:00:80:aa:bb:cc")]), leases)
+ mock_exists.assert_called_once_with(filename)
+ mock_open.assert_called_once_with(filename)
+
def test_make_subnet_interface_ip_map(self):
with mock.patch('neutron.agent.linux.ip_lib.IPDevice') as ip_dev:
ip_dev.return_value.addr.list.return_value = [