"""Make a remote process call to retrieve all network info."""
networks = self.call(self.context,
self.make_msg('get_active_networks_info',
- host=self.host),
- topic=self.topic)
+ host=self.host))
return [dhcp.NetModel(self.use_namespaces, n) for n in networks]
def get_network_info(self, network_id):
network = self.call(self.context,
self.make_msg('get_network_info',
network_id=network_id,
- host=self.host),
- topic=self.topic)
+ host=self.host))
if network:
return dhcp.NetModel(self.use_namespaces, network)
self.make_msg('get_dhcp_port',
network_id=network_id,
device_id=device_id,
- host=self.host),
- topic=self.topic)
+ host=self.host))
if port:
return dhcp.DictModel(port)
port = self.call(self.context,
self.make_msg('create_dhcp_port',
port=port,
- host=self.host),
- topic=self.topic)
+ host=self.host))
if port:
return dhcp.DictModel(port)
self.make_msg('update_dhcp_port',
port_id=port_id,
port=port,
- host=self.host),
- topic=self.topic)
+ host=self.host))
if port:
return dhcp.DictModel(port)
self.make_msg('release_dhcp_port',
network_id=network_id,
device_id=device_id,
- host=self.host),
- topic=self.topic)
+ host=self.host))
def release_port_fixed_ip(self, network_id, device_id, subnet_id):
"""Make a remote process call to release a fixed_ip on the port."""
network_id=network_id,
subnet_id=subnet_id,
device_id=device_id,
- host=self.host),
- topic=self.topic)
+ host=self.host))
class NetworkCache(object):
"""Make a remote process call to retrieve the sync data for routers."""
return self.call(context,
self.make_msg('sync_routers', host=self.host,
- router_ids=router_ids),
- topic=self.topic)
+ router_ids=router_ids))
def get_external_network_id(self, context):
"""Make a remote process call to retrieve the external network id.
"""
return self.call(context,
self.make_msg('get_external_network_id',
- host=self.host),
- topic=self.topic)
+ host=self.host))
def update_floatingip_statuses(self, context, router_id, fip_statuses):
"""Call the plugin update floating IPs's operational status."""
self.make_msg('update_floatingip_statuses',
router_id=router_id,
fip_statuses=fip_statuses),
- topic=self.topic,
version='1.1')
def get_ports_by_subnet(self, context, subnet_id):
agent_state},
time=timeutils.strtime())
if use_call:
- return self.call(context, msg, topic=self.topic)
+ return self.call(context, msg)
else:
- return self.cast(context, msg, topic=self.topic)
+ return self.cast(context, msg)
class PluginApi(n_rpc.RpcProxy):
def get_device_details(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('get_device_details', device=device,
- agent_id=agent_id, host=host),
- topic=self.topic)
+ agent_id=agent_id, host=host))
def get_devices_details_list(self, context, devices, agent_id, host=None):
res = []
devices=devices,
agent_id=agent_id,
host=host),
- topic=self.topic, version='1.3')
+ version='1.3')
except messaging.UnsupportedVersion:
# If the server has not been upgraded yet, a DVR-enabled agent
# may not work correctly, however it can function in 'degraded'
res = [
self.call(context,
self.make_msg('get_device_details', device=device,
- agent_id=agent_id, host=host),
- topic=self.topic)
+ agent_id=agent_id, host=host))
for device in devices
]
return res
def update_device_down(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_down', device=device,
- agent_id=agent_id, host=host),
- topic=self.topic)
+ agent_id=agent_id, host=host))
def update_device_up(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_up', device=device,
- agent_id=agent_id, host=host),
- topic=self.topic)
+ agent_id=agent_id, host=host))
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
return self.call(context,
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip,
- tunnel_type=tunnel_type),
- topic=self.topic)
+ tunnel_type=tunnel_type))
return self.call(context,
self.make_msg('security_group_rules_for_devices',
devices=devices),
- version=SG_RPC_VERSION,
- topic=self.topic)
+ version=SG_RPC_VERSION)
class SecurityGroupAgentRpcCallbackMixin(object):
'router_id': router_id})
self.fanout_cast(
context, self.make_msg(method,
- router_id=router_id),
- topic=self.topic)
+ router_id=router_id))
def _notification(self, context, method, routers):
"""Notify all the agents that are hosting the routers."""
plugin, constants.L3_AGENT_SCHEDULER_EXT_ALIAS):
self._agent_notification(context, method, routers)
else:
- self.fanout_cast(context, self.make_msg(method, routers=routers),
- topic=self.topic)
+ self.fanout_cast(context, self.make_msg(method, routers=routers))
def router_deleted(self, context, router_id):
self._notification_fanout(context, 'router_deleted', router_id)
return self.call(context,
self.make_msg('get_dvr_mac_address_by_host',
host=host),
- version=self.DVR_RPC_VERSION,
- topic=self.topic)
+ version=self.DVR_RPC_VERSION)
@log.log
def get_dvr_mac_address_list(self, context):
return self.call(context,
self.make_msg('get_dvr_mac_address_list'),
- version=self.DVR_RPC_VERSION,
- topic=self.topic)
+ version=self.DVR_RPC_VERSION)
@log.log
def get_compute_ports_on_host_by_subnet(self, context, host, subnet):
self.make_msg('get_compute_ports_on_host_by_subnet',
host=host,
subnet=subnet),
- version=self.DVR_RPC_VERSION,
- topic=self.topic)
+ version=self.DVR_RPC_VERSION)
@log.log
def get_subnet_for_dvr(self, context, subnet):
return self.call(context,
self.make_msg('get_subnet_for_dvr',
subnet=subnet),
- version=self.DVR_RPC_VERSION,
- topic=self.topic)
+ version=self.DVR_RPC_VERSION)
class DVRServerRpcCallbackMixin(object):
def sdnve_info(self, context, info):
return self.call(context,
- self.make_msg('sdnve_info', info=info),
- topic=self.topic)
+ self.make_msg('sdnve_info', info=info))
class SdnveNeutronAgent(n_rpc.RpcCallback):
def get_ofp_rest_api_addr(self, context):
LOG.debug(_("Get Ryu rest API address"))
return self.call(context,
- self.make_msg('get_ofp_rest_api'),
- topic=self.topic)
+ self.make_msg('get_ofp_rest_api'))
class RyuSecurityGroupAgent(sg_rpc.SecurityGroupAgentRpcMixin):
"""Make a RPC to set the status of a firewall."""
return self.call(context,
self.make_msg('set_firewall_status', host=self.host,
- firewall_id=firewall_id, status=status),
- topic=self.topic)
+ firewall_id=firewall_id, status=status))
def firewall_deleted(self, context, firewall_id):
"""Make a RPC to indicate that the firewall resources are deleted."""
return self.call(context,
self.make_msg('firewall_deleted', host=self.host,
- firewall_id=firewall_id),
- topic=self.topic)
+ firewall_id=firewall_id))
class FWaaSAgentRpcCallbackMixin(object):
return self.call(context,
self.make_msg('get_firewalls_for_tenant',
- host=self.host),
- topic=self.topic)
+ host=self.host))
def get_tenants_with_firewalls(self, context, **kwargs):
"""Get all Tenants that have Firewalls configured from plugin."""
return self.call(context,
self.make_msg('get_tenants_with_firewalls',
- host=self.host),
- topic=self.topic)
+ host=self.host))
class FWaaSL3AgentRpcCallback(api.FWaaSAgentRpcCallbackMixin):
return self.fanout_cast(
context,
self.make_msg('create_firewall', firewall=firewall,
- host=self.host),
- topic=self.topic
+ host=self.host)
)
def update_firewall(self, context, firewall):
return self.fanout_cast(
context,
self.make_msg('update_firewall', firewall=firewall,
- host=self.host),
- topic=self.topic
+ host=self.host)
)
def delete_firewall(self, context, firewall):
return self.fanout_cast(
context,
self.make_msg('delete_firewall', firewall=firewall,
- host=self.host),
- topic=self.topic
+ host=self.host)
)
def get_ready_devices(self):
return self.call(
self.context,
- self.make_msg('get_ready_devices', host=self.host),
- topic=self.topic
+ self.make_msg('get_ready_devices', host=self.host)
)
def pool_destroyed(self, pool_id):
return self.call(
self.context,
- self.make_msg('pool_destroyed', pool_id=pool_id),
- topic=self.topic
+ self.make_msg('pool_destroyed', pool_id=pool_id)
)
def pool_deployed(self, pool_id):
return self.call(
self.context,
- self.make_msg('pool_deployed', pool_id=pool_id),
- topic=self.topic
+ self.make_msg('pool_deployed', pool_id=pool_id)
)
def get_logical_device(self, pool_id):
self.make_msg(
'get_logical_device',
pool_id=pool_id
- ),
- topic=self.topic
+ )
)
def update_status(self, obj_type, obj_id, status):
return self.call(
self.context,
self.make_msg('update_status', obj_type=obj_type, obj_id=obj_id,
- status=status),
- topic=self.topic
+ status=status)
)
def plug_vip_port(self, port_id):
return self.call(
self.context,
- self.make_msg('plug_vip_port', port_id=port_id, host=self.host),
- topic=self.topic
+ self.make_msg('plug_vip_port', port_id=port_id, host=self.host)
)
def unplug_vip_port(self, port_id):
return self.call(
self.context,
- self.make_msg('unplug_vip_port', port_id=port_id, host=self.host),
- topic=self.topic
+ self.make_msg('unplug_vip_port', port_id=port_id, host=self.host)
)
def update_pool_stats(self, pool_id, stats):
pool_id=pool_id,
stats=stats,
host=self.host
- ),
- topic=self.topic
+ )
)
"""
return self.call(context,
self.make_msg('get_vpn_services_on_host',
- host=host),
- topic=self.topic)
+ host=host))
def update_status(self, context, status):
"""Update status for all VPN services and connections."""
return self.cast(context,
self.make_msg('update_status',
- status=status),
- topic=self.topic)
+ status=status))
@six.add_metaclass(abc.ABCMeta)
return self.call(context,
self.make_msg('get_vpn_services_on_host',
host=host),
- version=self.IPSEC_PLUGIN_VERSION,
- topic=self.topic)
+ version=self.IPSEC_PLUGIN_VERSION)
def update_status(self, context, status):
"""Update local status.
return self.cast(context,
self.make_msg('update_status',
status=status),
- version=self.IPSEC_PLUGIN_VERSION,
- topic=self.topic)
+ version=self.IPSEC_PLUGIN_VERSION)
@six.add_metaclass(abc.ABCMeta)
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
+ additional_args = {}
+ if topic:
+ additional_args['topic'] = topic
if expected_version:
- expected = [
- mock.call(ctxt, expected_msg, topic=topic,
- version=expected_version)]
- else:
- expected = [
- mock.call(ctxt, expected_msg, topic=topic)
- ]
+ additional_args['version'] = expected_version
+ expected = [
+ mock.call(ctxt, expected_msg, **additional_args)
+ ]
rpc_method_mock.assert_has_calls(expected)
def test_delete_network(self):
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_neutron_api(
- rpcapi, topics.PLUGIN,
+ rpcapi, None,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_neutron_api(
- rpcapi, topics.PLUGIN,
+ rpcapi, None,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id', host='fake_host',
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_neutron_api(
- rpcapi, topics.PLUGIN,
+ rpcapi, None,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
self._test_hyperv_neutron_api(
- rpcapi, topics.PLUGIN,
+ rpcapi, None,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip',
tunnel_type=None)
expected_msg=None, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
- expected_kwargs = {'topic': topic}
+ expected_kwargs = {}
+ if topic:
+ expected_kwargs['topic'] = topic
if 'version' in kwargs:
expected_kwargs['version'] = kwargs.pop('version')
if not expected_msg:
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_lb_api(rpcapi, topics.PLUGIN,
+ self._test_lb_api(rpcapi, None,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_lb_api(rpcapi, topics.PLUGIN,
+ self._test_lb_api(rpcapi, None,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id', host='fake_host',
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_lb_api(rpcapi, topics.PLUGIN,
+ self._test_lb_api(rpcapi, None,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_lb_api(rpcapi, topics.PLUGIN,
+ self._test_lb_api(rpcapi, None,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
retval = getattr(rpcapi, method)(ctxt, **kwargs)
self.assertEqual(retval, expected_retval)
+ additional_args = {}
+ if topic:
+ additional_args['topic'] = topic
if expected_version:
- expected = [
- mock.call(ctxt, expected_msg, topic=topic,
- version=expected_version)]
- else:
- expected = [
- mock.call(ctxt, expected_msg, topic=topic)
- ]
+ additional_args['version'] = expected_version
+ expected = [
+ mock.call(ctxt, expected_msg, **additional_args)
+ ]
rpc_method_mock.assert_has_calls(expected)
def test_delete_network(self):
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_rpc_api(rpcapi, topics.PLUGIN,
+ self._test_rpc_api(rpcapi, None,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_rpc_api(rpcapi, topics.PLUGIN,
+ self._test_rpc_api(rpcapi, None,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id', host='fake_host',
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_rpc_api(rpcapi, topics.PLUGIN,
+ self._test_rpc_api(rpcapi, None,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_rpc_api(rpcapi, topics.PLUGIN,
+ self._test_rpc_api(rpcapi, None,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip',
tunnel_type=None)
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_rpc_api(rpcapi, topics.PLUGIN,
+ self._test_rpc_api(rpcapi, None,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
expected_msg=None, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
- expected_kwargs = {'topic': topic}
+ expected_kwargs = {}
+ if topic:
+ expected_kwargs['topic'] = topic
if 'version' in kwargs:
expected_kwargs['version'] = kwargs.pop('version')
if not expected_msg:
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_mlnx_api(rpcapi, topics.PLUGIN,
+ self._test_mlnx_api(rpcapi, None,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_mlnx_api(rpcapi, topics.PLUGIN,
+ self._test_mlnx_api(rpcapi, None,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device1'],
agent_id='fake_agent_id', host='fake_host',
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_mlnx_api(rpcapi, topics.PLUGIN,
+ self._test_mlnx_api(rpcapi, None,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_mlnx_api(rpcapi, topics.PLUGIN,
+ self._test_mlnx_api(rpcapi, None,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def _test_ovs_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if method == 'call' else None
- expected_kwargs = {'topic': topic}
+ expected_kwargs = {}
+ if topic:
+ expected_kwargs['topic'] = topic
if 'version' in kwargs:
expected_kwargs['version'] = kwargs.pop('version')
expected_msg = rpcapi.make_msg(method, **kwargs)
def test_device_details(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_ovs_api(rpcapi, topics.PLUGIN,
+ self._test_ovs_api(rpcapi, None,
'get_device_details', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_devices_details_list(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_ovs_api(rpcapi, topics.PLUGIN,
+ self._test_ovs_api(rpcapi, None,
'get_devices_details_list', rpc_method='call',
devices=['fake_device1', 'fake_device2'],
agent_id='fake_agent_id', host='fake_host',
def test_update_device_down(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_ovs_api(rpcapi, topics.PLUGIN,
+ self._test_ovs_api(rpcapi, None,
'update_device_down', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
def test_tunnel_sync(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_ovs_api(rpcapi, topics.PLUGIN,
+ self._test_ovs_api(rpcapi, None,
'tunnel_sync', rpc_method='call',
tunnel_ip='fake_tunnel_ip',
tunnel_type=None)
def test_update_device_up(self):
rpcapi = agent_rpc.PluginApi(topics.PLUGIN)
- self._test_ovs_api(rpcapi, topics.PLUGIN,
+ self._test_ovs_api(rpcapi, None,
'update_device_up', rpc_method='call',
device='fake_device',
agent_id='fake_agent_id',
mock.call('get_ofp_rest_api')
])
mock_call.assert_has_calls([
- mock.call('context', 'msg', topic='topics')
+ mock.call('context', 'msg')
])
mock_call.assert_called_once_with(
mock.sentinel.context,
- mock_make_msg.return_value,
- topic='topic')
+ mock_make_msg.return_value)
def test_firewall_deleted(self):
with contextlib.nested(
mock_call.assert_called_once_with(
mock.sentinel.context,
- mock_make_msg.return_value,
- topic='topic')
+ mock_make_msg.return_value)
self.assertEqual(rv, self.mock_fanoutcast.return_value)
self.mock_fanoutcast.assert_called_once_with(
mock.sentinel.context,
- self.mock_msg.return_value,
- topic='topic'
+ self.mock_msg.return_value
)
self.mock_msg.assert_called_once_with(
self.make_msg.assert_called_once_with('get_ready_devices', host='host')
self.mock_call.assert_called_once_with(
mock.sentinel.context,
- self.make_msg.return_value,
- topic='topic'
+ self.make_msg.return_value
)
def test_get_logical_device(self):
self.mock_call.assert_called_once_with(
mock.sentinel.context,
- self.make_msg.return_value,
- topic='topic'
+ self.make_msg.return_value
)
def test_pool_destroyed(self):
self.mock_call.assert_called_once_with(
mock.sentinel.context,
- self.make_msg.return_value,
- topic='topic'
+ self.make_msg.return_value
)
def test_pool_deployed(self):
self.mock_call.assert_called_once_with(
mock.sentinel.context,
- self.make_msg.return_value,
- topic='topic'
+ self.make_msg.return_value
)
def test_update_status(self):
self.mock_call.assert_called_once_with(
mock.sentinel.context,
self.make_msg.return_value,
- topic='topic'
)
def test_plug_vip_port(self):
self.mock_call.assert_called_once_with(
mock.sentinel.context,
- self.make_msg.return_value,
- topic='topic'
+ self.make_msg.return_value
)
def test_unplug_vip_port(self):
self.mock_call.assert_called_once_with(
mock.sentinel.context,
- self.make_msg.return_value,
- topic='topic'
+ self.make_msg.return_value
)
def test_update_pool_stats(self):
self.mock_call.assert_called_once_with(
mock.sentinel.context,
- self.make_msg.return_value,
- topic='topic'
+ self.make_msg.return_value
)
set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
- self.mock_fanout.assert_called_with(self.ctx, expected,
- topic=self.topic)
+ self.mock_fanout.assert_called_with(self.ctx, expected)
def test_remove_metering_label_rpc_call(self):
expected = {'args':
with self.router(tenant_id=self.tenant_id, set_context=True):
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
- self.mock_fanout.assert_called_with(self.ctx, expected,
- topic=self.topic)
+ self.mock_fanout.assert_called_with(self.ctx, expected)
expected['method'] = 'remove_metering_label'
- self.mock_fanout.assert_called_with(self.ctx, expected,
- topic=self.topic)
+ self.mock_fanout.assert_called_with(self.ctx, expected)
def test_remove_one_metering_label_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
self.mock_uuid.return_value = second_uuid
with self.metering_label(tenant_id=self.tenant_id,
set_context=True):
- self.mock_fanout.assert_called_with(self.ctx, expected_add,
- topic=self.topic)
- self.mock_fanout.assert_called_with(self.ctx, expected_remove,
- topic=self.topic)
+ self.mock_fanout.assert_called_with(self.ctx, expected_add)
+ self.mock_fanout.assert_called_with(self.ctx, expected_remove)
def test_update_metering_label_rules_rpc_call(self):
second_uuid = 'e27fe2df-376e-4ac7-ae13-92f050a21f84'
self.mock_uuid.return_value = second_uuid
with self.metering_label_rule(l['id'], direction='egress'):
self.mock_fanout.assert_called_with(self.ctx,
- expected_add,
- topic=self.topic)
+ expected_add)
self.mock_fanout.assert_called_with(self.ctx,
- expected_del,
- topic=self.topic)
+ expected_del)
def test_delete_metering_label_does_not_clear_router_tenant_id(self):
tenant_id = '654f6b9d-0f36-4ae5-bd1b-01616794ca60'
{'agent_state': expected_agent_state})
self.assertIsInstance(call.call_args[0][1]['args']['time'],
str)
- self.assertEqual(call.call_args[1]['topic'], topic)
def test_plugin_report_state_cast(self):
topic = 'test'
{'agent_state': expected_agent_state})
self.assertIsInstance(cast.call_args[0][1]['args']['time'],
str)
- self.assertEqual(cast.call_args[1]['topic'], topic)
class AgentRPCMethods(base.BaseTestCase):
{'devices': ['fake_device']},
'method': 'security_group_rules_for_devices',
'namespace': None},
- version=sg_rpc.SG_RPC_VERSION,
- topic='fake_topic')])
+ version=sg_rpc.SG_RPC_VERSION)])
class FakeSGNotifierAPI(n_rpc.RpcProxy,