pm.disable()
-class DhcpPluginApi(n_rpc.RpcProxy):
+class DhcpPluginApi(object):
"""Agent side of the dhcp rpc API.
API version history:
"""
- BASE_RPC_API_VERSION = '1.1'
-
def __init__(self, topic, context, use_namespaces):
- super(DhcpPluginApi, self).__init__(
- topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.context = context
self.host = cfg.CONF.host
self.use_namespaces = use_namespaces
+ target = messaging.Target(topic=topic, version='1.1')
+ self.client = n_rpc.get_client(target)
def get_active_networks_info(self):
"""Make a remote process call to retrieve all network info."""
- networks = self.call(self.context,
- self.make_msg('get_active_networks_info',
- host=self.host))
+ cctxt = self.client.prepare()
+ networks = cctxt.call(self.context, 'get_active_networks_info',
+ host=self.host)
return [dhcp.NetModel(self.use_namespaces, n) for n in networks]
def get_network_info(self, network_id):
"""Make a remote process call to retrieve network info."""
- network = self.call(self.context,
- self.make_msg('get_network_info',
- network_id=network_id,
- host=self.host))
+ cctxt = self.client.prepare()
+ network = cctxt.call(self.context, 'get_network_info',
+ network_id=network_id, host=self.host)
if network:
return dhcp.NetModel(self.use_namespaces, network)
def get_dhcp_port(self, network_id, device_id):
"""Make a remote process call to get the dhcp port."""
- port = self.call(self.context,
- self.make_msg('get_dhcp_port',
- network_id=network_id,
- device_id=device_id,
- host=self.host))
+ cctxt = self.client.prepare()
+ port = cctxt.call(self.context, 'get_dhcp_port',
+ network_id=network_id, device_id=device_id,
+ host=self.host)
if port:
return dhcp.DictModel(port)
def create_dhcp_port(self, port):
"""Make a remote process call to create the dhcp port."""
- port = self.call(self.context,
- self.make_msg('create_dhcp_port',
- port=port,
- host=self.host))
+ cctxt = self.client.prepare()
+ port = cctxt.call(self.context, 'create_dhcp_port',
+ port=port, host=self.host)
if port:
return dhcp.DictModel(port)
def update_dhcp_port(self, port_id, port):
"""Make a remote process call to update the dhcp port."""
- port = self.call(self.context,
- self.make_msg('update_dhcp_port',
- port_id=port_id,
- port=port,
- host=self.host))
+ cctxt = self.client.prepare()
+ port = cctxt.call(self.context, 'update_dhcp_port',
+ port_id=port_id, port=port, host=self.host)
if port:
return dhcp.DictModel(port)
def release_dhcp_port(self, network_id, device_id):
"""Make a remote process call to release the dhcp port."""
- return self.call(self.context,
- self.make_msg('release_dhcp_port',
- network_id=network_id,
- device_id=device_id,
- host=self.host))
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'release_dhcp_port',
+ network_id=network_id, device_id=device_id,
+ host=self.host)
def release_port_fixed_ip(self, network_id, device_id, subnet_id):
"""Make a remote process call to release a fixed_ip on the port."""
- return self.call(self.context,
- self.make_msg('release_port_fixed_ip',
- network_id=network_id,
- subnet_id=subnet_id,
- device_id=device_id,
- host=self.host))
+ cctxt = self.client.prepare()
+ return cctxt.call(self.context, 'release_port_fixed_ip',
+ network_id=network_id, subnet_id=subnet_id,
+ device_id=device_id, host=self.host)
class NetworkCache(object):
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
import copy
import sys
import uuid
from neutron.common import config as common_config
from neutron.common import constants as const
from neutron.common import exceptions
+from neutron import context
from neutron.tests import base
class TestDhcpPluginApiProxy(base.BaseTestCase):
- def setUp(self):
- super(TestDhcpPluginApiProxy, self).setUp()
- self.proxy = dhcp_agent.DhcpPluginApi('foo', {}, None)
- self.proxy.host = 'foo'
+ def _test_dhcp_api(self, method, **kwargs):
+ ctxt = context.get_admin_context()
+ proxy = dhcp_agent.DhcpPluginApi('foo', ctxt, None)
+ proxy.host = 'foo'
+
+ with contextlib.nested(
+ mock.patch.object(proxy.client, 'call'),
+ mock.patch.object(proxy.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = proxy.client
+ rpc_mock.return_value = kwargs.pop('return_value', [])
+
+ prepare_args = {}
+ if 'version' in kwargs:
+ prepare_args['version'] = kwargs.pop('version')
+
+ retval = getattr(proxy, method)(**kwargs)
+ self.assertEqual(retval, rpc_mock.return_value)
+
+ prepare_mock.assert_called_once_with(**prepare_args)
+ kwargs['host'] = proxy.host
+ rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
- self.call_p = mock.patch.object(self.proxy, 'call')
- self.call = self.call_p.start()
- self.make_msg_p = mock.patch.object(self.proxy, 'make_msg')
- self.make_msg = self.make_msg_p.start()
+ def test_get_active_networks_info(self):
+ self._test_dhcp_api('get_active_networks_info')
def test_get_network_info(self):
- self.call.return_value = dict(a=1)
- retval = self.proxy.get_network_info('netid')
- self.assertEqual(retval.a, 1)
- self.assertTrue(self.call.called)
- self.make_msg.assert_called_once_with('get_network_info',
- network_id='netid',
- host='foo')
+ self._test_dhcp_api('get_network_info', network_id='fake_id',
+ return_value=None)
def test_get_dhcp_port(self):
- self.call.return_value = dict(a=1)
- retval = self.proxy.get_dhcp_port('netid', 'devid')
- self.assertEqual(retval.a, 1)
- self.assertTrue(self.call.called)
- self.make_msg.assert_called_once_with('get_dhcp_port',
- network_id='netid',
- device_id='devid',
- host='foo')
-
- def test_get_dhcp_port_none(self):
- self.call.return_value = None
- self.assertIsNone(self.proxy.get_dhcp_port('netid', 'devid'))
-
- def test_get_active_networks_info(self):
- self.proxy.get_active_networks_info()
- self.make_msg.assert_called_once_with('get_active_networks_info',
- host='foo')
+ self._test_dhcp_api('get_dhcp_port', network_id='fake_id',
+ device_id='fake_id_2', return_value=None)
def test_create_dhcp_port(self):
- port_body = (
- {'port':
- {'name': '', 'admin_state_up': True,
- 'network_id': fake_network.id,
- 'tenant_id': fake_network.tenant_id,
- 'fixed_ips': [{'subnet_id': fake_fixed_ip1.subnet_id}],
- 'device_id': mock.ANY}})
-
- self.proxy.create_dhcp_port(port_body)
- self.make_msg.assert_called_once_with('create_dhcp_port',
- port=port_body,
- host='foo')
-
- def test_create_dhcp_port_none(self):
- self.call.return_value = None
- port_body = (
- {'port':
- {'name': '', 'admin_state_up': True,
- 'network_id': fake_network.id,
- 'tenant_id': fake_network.tenant_id,
- 'fixed_ips': [{'subnet_id': fake_fixed_ip1.subnet_id}],
- 'device_id': mock.ANY}})
- self.assertIsNone(self.proxy.create_dhcp_port(port_body))
-
- def test_update_dhcp_port_none(self):
- self.call.return_value = None
- port_body = {'port': {'fixed_ips':
- [{'subnet_id': fake_fixed_ip1.subnet_id}]}}
- self.assertIsNone(self.proxy.update_dhcp_port(fake_port1.id,
- port_body))
+ self._test_dhcp_api('create_dhcp_port', port='fake_port',
+ return_value=None)
def test_update_dhcp_port(self):
- port_body = {'port': {'fixed_ips':
- [{'subnet_id': fake_fixed_ip1.subnet_id}]}}
- self.proxy.update_dhcp_port(fake_port1.id, port_body)
- self.make_msg.assert_called_once_with('update_dhcp_port',
- port_id=fake_port1.id,
- port=port_body,
- host='foo')
+ self._test_dhcp_api('update_dhcp_port', port_id='fake_id',
+ port='fake_port', return_value=None)
def test_release_dhcp_port(self):
- self.proxy.release_dhcp_port('netid', 'devid')
- self.assertTrue(self.call.called)
- self.make_msg.assert_called_once_with('release_dhcp_port',
- network_id='netid',
- device_id='devid',
- host='foo')
+ self._test_dhcp_api('release_dhcp_port', network_id='fake_id',
+ device_id='fake_id_2')
def test_release_port_fixed_ip(self):
- self.proxy.release_port_fixed_ip('netid', 'devid', 'subid')
- self.assertTrue(self.call.called)
- self.make_msg.assert_called_once_with('release_port_fixed_ip',
- network_id='netid',
- subnet_id='subid',
- device_id='devid',
- host='foo')
+ self._test_dhcp_api('release_port_fixed_ip', network_id='fake_id',
+ device_id='fake_id_2', subnet_id='fake_id_3')
class TestNetworkCache(base.BaseTestCase):