if network:
return dhcp.NetModel(self.use_namespaces, network)
- def get_dhcp_port(self, network_id, device_id):
- """Make a remote process call to get the dhcp port."""
- cctxt = self.client.prepare()
- port = cctxt.call(self.context, 'get_dhcp_port',
- network_id=network_id, device_id=device_id,
- host=self.host)
- if port:
- return dhcp.DictModel(port)
-
def create_dhcp_port(self, port):
"""Make a remote process call to create the dhcp port."""
cctxt = self.client.prepare(version='1.1')
# 1.0 - Initial version.
# 1.1 - Added get_active_networks_info, create_dhcp_port,
# and update_dhcp_port methods.
+ # 1.2 - Removed get_dhcp_port. When removing a method (Making a
+ # backwards incompatible change) you would normally bump the
+ # major version. However, since the method was unused in the
+ # RPC client for many releases, it should be OK to bump the
+ # minor release instead and claim RPC compatibility with the
+ # last few client versions.
target = oslo_messaging.Target(
namespace=constants.RPC_NAMESPACE_DHCP_PLUGIN,
- version='1.1')
+ version='1.2')
def _get_active_networks(self, context, **kwargs):
"""Retrieve and return a list of the active networks."""
network['ports'] = plugin.get_ports(context, filters=filters)
return network
- def get_dhcp_port(self, context, **kwargs):
- """Allocate a DHCP port for the host and return port information.
-
- This method will re-use an existing port if one already exists. When a
- port is re-used, the fixed_ip allocation will be updated to the current
- network state. If an expected failure occurs, a None port is returned.
-
- """
- host = kwargs.get('host')
- network_id = kwargs.get('network_id')
- device_id = kwargs.get('device_id')
- # There could be more than one dhcp server per network, so create
- # a device id that combines host and network ids
-
- LOG.debug('Port %(device_id)s for %(network_id)s requested from '
- '%(host)s', {'device_id': device_id,
- 'network_id': network_id,
- 'host': host})
- plugin = manager.NeutronManager.get_plugin()
- retval = None
-
- filters = dict(network_id=[network_id])
- subnets = dict([(s['id'], s) for s in
- plugin.get_subnets(context, filters=filters)])
-
- dhcp_enabled_subnet_ids = [s['id'] for s in
- subnets.values() if s['enable_dhcp']]
-
- try:
- filters = dict(network_id=[network_id], device_id=[device_id])
- ports = plugin.get_ports(context, filters=filters)
- if ports:
- # Ensure that fixed_ips cover all dhcp_enabled subnets.
- port = ports[0]
- for fixed_ip in port['fixed_ips']:
- if fixed_ip['subnet_id'] in dhcp_enabled_subnet_ids:
- dhcp_enabled_subnet_ids.remove(fixed_ip['subnet_id'])
- port['fixed_ips'].extend(
- [dict(subnet_id=s) for s in dhcp_enabled_subnet_ids])
-
- retval = plugin.update_port(context, port['id'],
- dict(port=port))
-
- except n_exc.NotFound as e:
- LOG.warning(e)
-
- if retval is None:
- # No previous port exists, so create a new one.
- LOG.debug('DHCP port %(device_id)s on network %(network_id)s '
- 'does not exist on %(host)s',
- {'device_id': device_id,
- 'network_id': network_id,
- 'host': host})
- try:
- network = plugin.get_network(context, network_id)
- except n_exc.NetworkNotFound:
- LOG.warn(_LW("Network %s could not be found, it might have "
- "been deleted concurrently."), network_id)
- return
-
- port_dict = dict(
- admin_state_up=True,
- device_id=device_id,
- network_id=network_id,
- tenant_id=network['tenant_id'],
- mac_address=attributes.ATTR_NOT_SPECIFIED,
- name='',
- device_owner=constants.DEVICE_OWNER_DHCP,
- fixed_ips=[dict(subnet_id=s) for s in dhcp_enabled_subnet_ids])
-
- retval = self._port_action(plugin, context, {'port': port_dict},
- 'create_port')
- if not retval:
- return
-
- # Convert subnet_id to subnet dict
- for fixed_ip in retval['fixed_ips']:
- subnet_id = fixed_ip.pop('subnet_id')
- fixed_ip['subnet'] = subnets[subnet_id]
-
- return retval
-
def release_dhcp_port(self, context, **kwargs):
"""Release the port currently being used by a DHCP agent."""
host = kwargs.get('host')
self._test_dhcp_api('get_network_info', network_id='fake_id',
return_value=None)
- def test_get_dhcp_port(self):
- self._test_dhcp_api('get_dhcp_port', network_id='fake_id',
- device_id='fake_id_2', return_value=None)
-
def test_create_dhcp_port(self):
self._test_dhcp_api('create_dhcp_port', port='fake_port',
return_value=None, version='1.1')
port = port or fake_port1
plugin = mock.Mock()
plugin.create_dhcp_port.return_value = port or fake_port1
- plugin.get_dhcp_port.return_value = port or fake_port1
self.ensure_device_is_ready.return_value = device_is_ready
self.mock_driver.get_device_name.return_value = 'tap12345678-12'
True, dict(id=FAKE_NETWORK_UUID,
tenant_id='aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaa'))
- fake_port = dhcp.DictModel(
- dict(id='12345678-1234-aaaa-1234567890ab',
- mac_address='aa:bb:cc:dd:ee:ff'))
-
with mock.patch('neutron.agent.linux.interface.NullDriver') as dvr_cls:
mock_driver = mock.MagicMock()
mock_driver.get_device_name.return_value = 'tap12345678-12'
dvr_cls.return_value = mock_driver
plugin = mock.Mock()
- plugin.get_dhcp_port.return_value = fake_port
dh = dhcp.DeviceManager(cfg.CONF, plugin)
dh.destroy(fake_net, 'tap12345678-12')
dvr_cls.return_value = mock_driver
plugin = mock.Mock()
- plugin.get_dhcp_port.return_value = fake_port
dh = dhcp.DeviceManager(cfg.CONF, plugin)
dh.get_interface_name(fake_net, fake_port)
self.assertEqual(retval['subnets'], subnet_retval)
self.assertEqual(retval['ports'], port_retval)
- def _test_get_dhcp_port_helper(self, port_retval, other_expectations=[],
- update_port=None, create_port=None):
- subnets_retval = [dict(id='a', enable_dhcp=True),
- dict(id='b', enable_dhcp=False)]
-
- self.plugin.get_subnets.return_value = subnets_retval
- if port_retval:
- self.plugin.get_ports.return_value = [port_retval]
- else:
- self.plugin.get_ports.return_value = []
- if isinstance(update_port, n_exc.NotFound):
- self.plugin.update_port.side_effect = update_port
- else:
- self.plugin.update_port.return_value = update_port
- self.plugin.create_port.return_value = create_port
-
- retval = self.callbacks.get_dhcp_port(mock.Mock(),
- network_id='netid',
- device_id='devid',
- host='host')
-
- expected = [mock.call.get_subnets(mock.ANY,
- filters=dict(network_id=['netid'])),
- mock.call.get_ports(mock.ANY,
- filters=dict(network_id=['netid'],
- device_id=['devid']))]
-
- expected.extend(other_expectations)
- self.plugin.assert_has_calls(expected)
- return retval
-
def test_update_dhcp_port_verify_port_action_port_dict(self):
port = {'port': {'network_id': 'foo_network_id',
'device_owner': constants.DEVICE_OWNER_DHCP,
self.plugin.assert_has_calls(
mock.call.update_port(mock.ANY, 'foo_port_id', expected_port))
- def test_get_dhcp_port_existing(self):
- port_retval = dict(id='port_id', fixed_ips=[dict(subnet_id='a')])
- expectations = [
- mock.call.update_port(mock.ANY, 'port_id', dict(port=port_retval))]
-
- self._test_get_dhcp_port_helper(port_retval, expectations,
- update_port=port_retval)
- self.assertEqual(len(self.log.mock_calls), 1)
-
- def _test_get_dhcp_port_create_new(self, update_port=None):
- self.plugin.get_network.return_value = dict(tenant_id='tenantid')
- create_spec = dict(tenant_id='tenantid', device_id='devid',
- network_id='netid', name='',
- admin_state_up=True,
- device_owner=constants.DEVICE_OWNER_DHCP,
- mac_address=mock.ANY)
- create_retval = create_spec.copy()
- create_retval['id'] = 'port_id'
- create_retval['fixed_ips'] = [dict(subnet_id='a', enable_dhcp=True)]
-
- create_spec['fixed_ips'] = [dict(subnet_id='a')]
-
- expectations = [
- mock.call.get_network(mock.ANY, 'netid'),
- mock.call.create_port(mock.ANY, dict(port=create_spec))]
-
- retval = self._test_get_dhcp_port_helper(None, expectations,
- update_port=update_port,
- create_port=create_retval)
- self.assertEqual(create_retval, retval)
- self.assertEqual(len(self.log.mock_calls), 2)
-
- def test_get_dhcp_port_create_new(self):
- self._test_get_dhcp_port_create_new()
-
- def test_get_dhcp_port_create_new_with_failure_on_port_update(self):
- self._test_get_dhcp_port_create_new(
- update_port=n_exc.PortNotFound(port_id='foo'))
-
def test_release_dhcp_port(self):
port_retval = dict(id='port_id', fixed_ips=[dict(subnet_id='a')])
self.plugin.get_ports.return_value = [port_retval]