return method(context, 'report_state', **kwargs)
-class PluginApi(n_rpc.RpcProxy):
+class PluginApi(object):
'''Agent side of the rpc API.
API version history:
the device port
'''
- BASE_RPC_API_VERSION = '1.1'
-
def __init__(self, topic):
- super(PluginApi, self).__init__(
- topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ target = messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(target)
def get_device_details(self, context, device, agent_id, host=None):
- return self.call(context,
- self.make_msg('get_device_details', device=device,
- agent_id=agent_id, host=host))
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'get_device_details', device=device,
+ agent_id=agent_id, host=host)
def get_devices_details_list(self, context, devices, agent_id, host=None):
res = []
try:
- res = self.call(context,
- self.make_msg('get_devices_details_list',
- devices=devices,
- agent_id=agent_id,
- host=host),
- version='1.3')
+ cctxt = self.client.prepare(version='1.3')
+ res = cctxt.call(context, 'get_devices_details_list',
+ devices=devices, agent_id=agent_id, host=host)
except messaging.UnsupportedVersion:
# If the server has not been upgraded yet, a DVR-enabled agent
# may not work correctly, however it can function in 'degraded'
# mode, in that DVR routers may not be in the system yet, and
# it might be not necessary to retrieve info about the host.
LOG.warn(_LW('DVR functionality requires a server upgrade.'))
+ cctxt = self.client.prepare()
res = [
- self.call(context,
- self.make_msg('get_device_details', device=device,
- agent_id=agent_id, host=host))
+ self.get_device_details(context, device, agent_id, host)
for device in devices
]
return res
def update_device_down(self, context, device, agent_id, host=None):
- return self.call(context,
- self.make_msg('update_device_down', device=device,
- agent_id=agent_id, host=host))
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'update_device_down', device=device,
+ agent_id=agent_id, host=host)
def update_device_up(self, context, device, agent_id, host=None):
- return self.call(context,
- self.make_msg('update_device_up', device=device,
- agent_id=agent_id, host=host))
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'update_device_up', device=device,
+ agent_id=agent_id, host=host)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
- return self.call(context,
- self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip,
- tunnel_type=tunnel_type))
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
+ tunnel_type=tunnel_type)
def security_group_rules_for_devices(self, context, devices):
LOG.debug("Get security group rules "
"for devices via rpc %r", devices)
- return self.call(context,
- self.make_msg('security_group_rules_for_devices',
- devices=devices),
- version='1.1')
+ cctxt = self.client.prepare(version='1.1')
+ return cctxt.call(context, 'security_group_rules_for_devices',
+ devices=devices)
def security_group_info_for_devices(self, context, devices):
LOG.debug("Get security group information for devices via rpc %r",
devices)
- return self.call(context,
- self.make_msg('security_group_info_for_devices',
- devices=devices),
- version='1.2')
+ cctxt = self.client.prepare(version='1.2')
+ return cctxt.call(context, 'security_group_info_for_devices',
+ devices=devices)
class SecurityGroupAgentRpcCallbackMixin(object):
self.refresh_firewall(updated_devices)
+# NOTE(russellb) This class has been conditionally converted to use the
+# oslo.messaging APIs because it's a mix-in used in different places. The
+# conditional usage is temporary until the whole code base has been converted
+# to stop using the RpcProxy compatibility class.
class SecurityGroupAgentRpcApiMixin(object):
def _get_security_group_topic(self):
"""Notify rule updated security groups."""
if not security_groups:
return
- self.fanout_cast(context,
- self.make_msg('security_groups_rule_updated',
- security_groups=security_groups),
- version=SG_RPC_VERSION,
- topic=self._get_security_group_topic())
+ if hasattr(self, 'client'):
+ cctxt = self.client.prepare(version=SG_RPC_VERSION,
+ topic=self._get_security_group_topic(),
+ fanout=True)
+ cctxt.cast(context, 'security_groups_rule_updated',
+ security_groups=security_groups)
+ else:
+ self.fanout_cast(context,
+ self.make_msg('security_groups_rule_updated',
+ security_groups=security_groups),
+ version=SG_RPC_VERSION,
+ topic=self._get_security_group_topic())
def security_groups_member_updated(self, context, security_groups):
"""Notify member updated security groups."""
if not security_groups:
return
- self.fanout_cast(context,
- self.make_msg('security_groups_member_updated',
- security_groups=security_groups),
- version=SG_RPC_VERSION,
- topic=self._get_security_group_topic())
+ if hasattr(self, 'client'):
+ cctxt = self.client.prepare(version=SG_RPC_VERSION,
+ topic=self._get_security_group_topic(),
+ fanout=True)
+ cctxt.cast(context, 'security_groups_member_updated',
+ security_groups=security_groups)
+ else:
+ self.fanout_cast(context,
+ self.make_msg('security_groups_member_updated',
+ security_groups=security_groups),
+ version=SG_RPC_VERSION,
+ topic=self._get_security_group_topic())
def security_groups_provider_updated(self, context):
"""Notify provider updated security groups."""
- self.fanout_cast(context,
- self.make_msg('security_groups_provider_updated'),
- version=SG_RPC_VERSION,
- topic=self._get_security_group_topic())
+ if hasattr(self, 'client'):
+ cctxt = self.client.prepare(version=SG_RPC_VERSION,
+ topic=self._get_security_group_topic(),
+ fanout=True)
+ cctxt.cast(context, 'security_groups_member_updated')
+ else:
+ self.fanout_cast(context,
+ self.make_msg('security_groups_provider_updated'),
+ version=SG_RPC_VERSION,
+ topic=self._get_security_group_topic())
class NECPluginApi(agent_rpc.PluginApi):
- BASE_RPC_API_VERSION = '1.0'
def update_ports(self, context, agent_id, datapath_id,
port_added, port_removed):
LOG.info(_("Update ports: added=%(added)s, "
"removed=%(removed)s"),
{'added': port_added, 'removed': port_removed})
- self.call(context,
- self.make_msg('update_ports',
- topic=topics.AGENT,
- agent_id=agent_id,
- datapath_id=datapath_id,
- port_added=port_added,
- port_removed=port_removed))
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'update_ports',
+ agent_id=agent_id,
+ datapath_id=datapath_id,
+ port_added=port_added,
+ port_removed=port_removed)
class NECAgentRpcCallback(n_rpc.RpcCallback):
sg_rpc.SecurityGroupServerRpcApiMixin):
def get_ofp_rest_api_addr(self, context):
LOG.debug(_("Get Ryu rest API address"))
- return self.call(context,
- self.make_msg('get_ofp_rest_api'))
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'get_ofp_rest_api')
class RyuSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
Unit Tests for hyperv neutron rpc
"""
+import contextlib
import mock
from neutron.agent import rpc as agent_rpc
class rpcHyperVApiTestCase(base.BaseTestCase):
- def _test_hyperv_neutron_api(
+ def _test_hyperv_neutron_api_legacy(
self, rpcapi, topic, method, rpc_method, **kwargs):
+ # NOTE(russellb) This version of the test method is used for interfaces
+ # not yet converted away from using the RpcProxy compatibility class.
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
]
rpc_method_mock.assert_has_calls(expected)
+ def _test_hyperv_neutron_api(
+ self, rpcapi, topic, method, rpc_method, **kwargs):
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ expected_retval = 'foo' if rpc_method == 'call' else None
+ expected_version = kwargs.pop('version', None)
+
+ with contextlib.nested(
+ mock.patch.object(rpcapi.client, rpc_method),
+ mock.patch.object(rpcapi.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = rpcapi.client
+ rpc_mock.return_value = expected_retval
+ retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+ self.assertEqual(retval, expected_retval)
+
+ prepare_args = {}
+ if expected_version:
+ prepare_args['version'] = expected_version
+ prepare_mock.assert_called_once_with(**prepare_args)
+
+ rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
+
def test_delete_network(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
- self._test_hyperv_neutron_api(
+ self._test_hyperv_neutron_api_legacy(
rpcapi,
topics.get_topic_name(
topics.AGENT,
def test_port_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
- self._test_hyperv_neutron_api(
+ self._test_hyperv_neutron_api_legacy(
rpcapi,
topics.get_topic_name(
topics.AGENT,
def test_port_delete(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
- self._test_hyperv_neutron_api(
+ self._test_hyperv_neutron_api_legacy(
rpcapi,
topics.get_topic_name(
topics.AGENT,
def test_tunnel_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
- self._test_hyperv_neutron_api(
+ self._test_hyperv_neutron_api_legacy(
rpcapi,
topics.get_topic_name(
topics.AGENT,
"""
import collections
+import contextlib
import mock
class RpcApiTestCase(base.BaseTestCase):
- def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs):
+ def _test_rpc_api_legacy(self, rpcapi, topic, method, rpc_method,
+ **kwargs):
+ # NOTE(russellb) This can be removed once AgentNotifierApi has been
+ # converted over to no longer use the RpcProxy compatibility class.
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
]
rpc_method_mock.assert_has_calls(expected)
+ def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs):
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ expected_retval = 'foo' if rpc_method == 'call' else None
+ expected_version = kwargs.pop('version', None)
+
+ with contextlib.nested(
+ mock.patch.object(rpcapi.client, rpc_method),
+ mock.patch.object(rpcapi.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = rpcapi.client
+ rpc_mock.return_value = expected_retval
+ retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+ prepare_args = {}
+ if expected_version:
+ prepare_args['version'] = expected_version
+ prepare_mock.assert_called_once_with(**prepare_args)
+
+ self.assertEqual(retval, expected_retval)
+ rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
+
def test_delete_network(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
- self._test_rpc_api(rpcapi,
- topics.get_topic_name(topics.AGENT,
- topics.NETWORK,
- topics.DELETE),
- 'network_delete', rpc_method='fanout_cast',
- network_id='fake_request_spec')
+ self._test_rpc_api_legacy(
+ rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.NETWORK,
+ topics.DELETE),
+ 'network_delete', rpc_method='fanout_cast',
+ network_id='fake_request_spec')
def test_port_update(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
- self._test_rpc_api(rpcapi,
- topics.get_topic_name(topics.AGENT,
- topics.PORT,
- topics.UPDATE),
- 'port_update', rpc_method='fanout_cast',
- port='fake_port',
- network_type='fake_network_type',
- segmentation_id='fake_segmentation_id',
- physical_network='fake_physical_network')
+ self._test_rpc_api_legacy(
+ rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.PORT,
+ topics.UPDATE),
+ 'port_update', rpc_method='fanout_cast',
+ port='fake_port',
+ network_type='fake_network_type',
+ segmentation_id='fake_segmentation_id',
+ physical_network='fake_physical_network')
def test_tunnel_update(self):
rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT)
- self._test_rpc_api(rpcapi,
- topics.get_topic_name(topics.AGENT,
- type_tunnel.TUNNEL,
- topics.UPDATE),
- 'tunnel_update', rpc_method='fanout_cast',
- tunnel_ip='fake_ip', tunnel_type='gre')
+ self._test_rpc_api_legacy(
+ rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ type_tunnel.TUNNEL,
+ topics.UPDATE),
+ 'tunnel_update', rpc_method='fanout_cast',
+ tunnel_ip='fake_ip', tunnel_type='gre')
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
Unit Tests for Mellanox RPC (major reuse of linuxbridge rpc unit tests)
"""
+import contextlib
+import mock
+
import fixtures
from oslo.config import cfg
class rpcApiTestCase(base.BaseTestCase):
- def _test_mlnx_api(self, rpcapi, topic, method, rpc_method,
- expected_msg=None, **kwargs):
+ def _test_mlnx_api_legacy(self, rpcapi, topic, method, rpc_method,
+ expected_msg=None, **kwargs):
+ # NOTE(russellb) This method can be removed once the AgentNotifierApi
+ # has been converted to no longer use the RpcProxy class.
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_kwargs = {}
self.assertEqual(expected_arg, arg)
self.assertEqual(expected_kwargs, self.fake_kwargs)
+ def _test_mlnx_api(self, rpcapi, topic, method, rpc_method, **kwargs):
+ ctxt = context.RequestContext('fake_user', 'fake_project')
+ expected_retval = 'foo' if rpc_method == 'call' else None
+ expected_version = kwargs.pop('version', None)
+
+ with contextlib.nested(
+ mock.patch.object(rpcapi.client, rpc_method),
+ mock.patch.object(rpcapi.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = rpcapi.client
+ rpc_mock.return_value = expected_retval
+ retval = getattr(rpcapi, method)(ctxt, **kwargs)
+
+ prepare_args = {}
+ if expected_version:
+ prepare_args['version'] = expected_version
+ prepare_mock.assert_called_once_with(**prepare_args)
+
+ self.assertEqual(retval, expected_retval)
+ rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
+
def test_delete_network(self):
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
- self._test_mlnx_api(rpcapi,
- topics.get_topic_name(topics.AGENT,
- topics.NETWORK,
- topics.DELETE),
- 'network_delete', rpc_method='fanout_cast',
- network_id='fake_request_spec')
+ self._test_mlnx_api_legacy(
+ rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.NETWORK,
+ topics.DELETE),
+ 'network_delete', rpc_method='fanout_cast',
+ network_id='fake_request_spec')
def test_port_update(self):
cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
network_type='vlan',
physical_network='fake_net',
segmentation_id='fake_vlan_id')
- self._test_mlnx_api(rpcapi,
- topics.get_topic_name(topics.AGENT,
- topics.PORT,
- topics.UPDATE),
- 'port_update', rpc_method='fanout_cast',
- expected_msg=expected_msg,
- port='fake_port',
- network_type='vlan',
- physical_network='fake_net',
- vlan_id='fake_vlan_id')
+ self._test_mlnx_api_legacy(
+ rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.PORT,
+ topics.UPDATE),
+ 'port_update', rpc_method='fanout_cast',
+ expected_msg=expected_msg,
+ port='fake_port',
+ network_type='vlan',
+ physical_network='fake_net',
+ vlan_id='fake_vlan_id')
def test_port_update_ib(self):
cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
network_type='ib',
physical_network='fake_net',
segmentation_id='fake_vlan_id')
- self._test_mlnx_api(rpcapi,
- topics.get_topic_name(topics.AGENT,
- topics.PORT,
- topics.UPDATE),
- 'port_update', rpc_method='fanout_cast',
- expected_msg=expected_msg,
- port='fake_port',
- network_type='ib',
- physical_network='fake_net',
- vlan_id='fake_vlan_id')
+ self._test_mlnx_api_legacy(
+ rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.PORT,
+ topics.UPDATE),
+ 'port_update', rpc_method='fanout_cast',
+ expected_msg=expected_msg,
+ port='fake_port',
+ network_type='ib',
+ physical_network='fake_net',
+ vlan_id='fake_vlan_id')
def test_port_update_old_agent(self):
cfg.CONF.set_override('rpc_support_old_agents', True, 'AGENT')
physical_network='fake_net',
segmentation_id='fake_vlan_id',
vlan_id='fake_vlan_id')
- self._test_mlnx_api(rpcapi,
- topics.get_topic_name(topics.AGENT,
- topics.PORT,
- topics.UPDATE),
- 'port_update', rpc_method='fanout_cast',
- expected_msg=expected_msg,
- port='fake_port',
- network_type='vlan',
- physical_network='fake_net',
- vlan_id='fake_vlan_id')
+ self._test_mlnx_api_legacy(
+ rpcapi,
+ topics.get_topic_name(topics.AGENT,
+ topics.PORT,
+ topics.UPDATE),
+ 'port_update', rpc_method='fanout_cast',
+ expected_msg=expected_msg,
+ port='fake_port',
+ network_type='vlan',
+ physical_network='fake_net',
+ vlan_id='fake_vlan_id')
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
class TestNecAgentPluginApi(TestNecAgentBase):
- def _test_plugin_api(self, expected_failure=False):
+ def test_plugin_api(self):
with contextlib.nested(
- mock.patch.object(nec_neutron_agent.NECPluginApi, 'make_msg'),
- mock.patch.object(nec_neutron_agent.NECPluginApi, 'call'),
- mock.patch.object(nec_neutron_agent, 'LOG')
- ) as (make_msg, apicall, log):
+ mock.patch.object(self.agent.plugin_rpc.client, 'prepare'),
+ mock.patch.object(self.agent.plugin_rpc.client, 'call'),
+ ) as (mock_prepare, mock_call):
+ mock_prepare.return_value = self.agent.plugin_rpc.client
+
agent_id = 'nec-q-agent.dummy-host'
- if expected_failure:
- apicall.side_effect = Exception()
+ port_added = [{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
+ {'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}]
+ port_removed = ['id-3', 'id-4', 'id-5']
self.agent.plugin_rpc.update_ports(
mock.sentinel.ctx, agent_id, OVS_DPID_0X,
- # port_added
- [{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
- {'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
- # port_removed
- ['id-3', 'id-4', 'id-5'])
-
- make_msg.assert_called_once_with(
- 'update_ports', topic='q-agent-notifier',
- agent_id=agent_id, datapath_id=OVS_DPID_0X,
- port_added=[{'id': 'id-1', 'mac': 'mac-1', 'port_no': '1'},
- {'id': 'id-2', 'mac': 'mac-2', 'port_no': '2'}],
- port_removed=['id-3', 'id-4', 'id-5'])
-
- apicall.assert_called_once_with(mock.sentinel.ctx,
- make_msg.return_value)
-
- self.assertTrue(log.info.called)
- if expected_failure:
- self.assertTrue(log.warn.called)
+ port_added, port_removed)
- def test_plugin_api(self):
- self._test_plugin_api()
+ mock_call.assert_called_once_with(
+ mock.sentinel.ctx, 'update_ports',
+ agent_id=agent_id, datapath_id=OVS_DPID_0X,
+ port_added=port_added, port_removed=port_removed)
class TestNecAgentMain(base.BaseTestCase):
class TestRyuPluginApi(RyuAgentTestCase):
def test_get_ofp_rest_api_addr(self):
+ rpcapi = self.mod_agent.RyuPluginApi('foo')
with contextlib.nested(
- mock.patch(self._AGENT_NAME + '.RyuPluginApi.make_msg',
- return_value='msg'),
- mock.patch(self._AGENT_NAME + '.RyuPluginApi.call',
- return_value='10.0.0.1')
- ) as (mock_msg, mock_call):
- api = self.mod_agent.RyuPluginApi('topics')
- addr = api.get_ofp_rest_api_addr('context')
-
- self.assertEqual(addr, '10.0.0.1')
- mock_msg.assert_has_calls([
- mock.call('get_ofp_rest_api')
- ])
- mock_call.assert_has_calls([
- mock.call('context', 'msg')
- ])
+ mock.patch.object(rpcapi.client, 'call'),
+ mock.patch.object(rpcapi.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = rpcapi.client
+ rpc_mock.return_value = 'return'
+ addr = rpcapi.get_ofp_rest_api_addr('context')
+
+ self.assertEqual('return', addr)
+ rpc_mock.assert_called_once_with('context', 'get_ofp_rest_api')
class TestVifPortSet(RyuAgentTestCase):
agent = rpc.PluginApi('fake_topic')
ctxt = context.RequestContext('fake_user', 'fake_project')
expect_val = 'foo'
- with mock.patch('neutron.common.rpc.RpcProxy.call') as rpc_call:
- rpc_call.return_value = expect_val
+ with contextlib.nested(
+ mock.patch.object(agent.client, 'call'),
+ mock.patch.object(agent.client, 'prepare'),
+ ) as (
+ mock_call, mock_prepare
+ ):
+ mock_prepare.return_value = agent.client
+ mock_call.return_value = expect_val
func_obj = getattr(agent, method)
if method == 'tunnel_sync':
actual_val = func_obj(ctxt, 'fake_tunnel_ip')
ctxt = context.RequestContext('fake_user', 'fake_project')
expect_val_get_device_details = 'foo'
expect_val = [expect_val_get_device_details]
- with mock.patch('neutron.common.rpc.RpcProxy.call') as rpc_call:
- rpc_call.side_effect = [messaging.UnsupportedVersion('1.2'),
+ with contextlib.nested(
+ mock.patch.object(agent.client, 'call'),
+ mock.patch.object(agent.client, 'prepare'),
+ ) as (
+ mock_call, mock_prepare
+ ):
+ mock_prepare.return_value = agent.client
+ mock_call.side_effect = [messaging.UnsupportedVersion('1.2'),
expect_val_get_device_details]
func_obj = getattr(agent, 'get_devices_details_list')
actual_val = func_obj(ctxt, ['fake_device'], 'fake_agent_id')
class SecurityGroupServerRpcApiTestCase(base.BaseTestCase):
- def setUp(self):
- super(SecurityGroupServerRpcApiTestCase, self).setUp()
- self.rpc = FakeSGRpcApi('fake_topic')
- self.rpc.call = mock.Mock()
-
def test_security_group_rules_for_devices(self):
- self.rpc.security_group_rules_for_devices(None, ['fake_device'])
- self.rpc.call.assert_has_calls(
- [mock.call(None,
- {'args':
- {'devices': ['fake_device']},
- 'method': 'security_group_rules_for_devices',
- 'namespace': None},
- version='1.1')])
+ rpcapi = FakeSGRpcApi('fake_topic')
+
+ with contextlib.nested(
+ mock.patch.object(rpcapi.client, 'call'),
+ mock.patch.object(rpcapi.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = rpcapi.client
+ rpcapi.security_group_rules_for_devices('context', ['fake_device'])
+
+ rpc_mock.assert_called_once_with(
+ 'context',
+ 'security_group_rules_for_devices',
+ devices=['fake_device'])
class FakeSGNotifierAPI(n_rpc.RpcProxy,