# License for the specific language governing permissions and limitations
# under the License.
+from oslo import messaging
+
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
-class AgentNotifierApi(n_rpc.RpcProxy):
+class AgentNotifierApi(object):
'''Agent side of the openvswitch rpc API.
API version history:
'''
- BASE_RPC_API_VERSION = '1.0'
-
def __init__(self, topic):
- super(AgentNotifierApi, self).__init__(
- topic=topic, default_version=self.BASE_RPC_API_VERSION)
+ self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
topics.DELETE)
self.topic_tunnel_update = topics.get_topic_name(topic,
constants.TUNNEL,
topics.UPDATE)
+ target = messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(target)
def network_delete(self, context, network_id):
- self.fanout_cast(context,
- self.make_msg('network_delete',
- network_id=network_id),
- topic=self.topic_network_delete)
+ cctxt = self.client.prepare(topic=self.topic_network_delete,
+ fanout=True)
+ cctxt.cast(context, 'network_delete', network_id=network_id)
def port_update(self, context, port, network_type, segmentation_id,
physical_network):
- self.fanout_cast(context,
- self.make_msg('port_update',
- port=port,
- network_type=network_type,
- segmentation_id=segmentation_id,
- physical_network=physical_network),
- topic=self.topic_port_update)
+ cctxt = self.client.prepare(topic=self.topic_port_update,
+ fanout=True)
+ cctxt.cast(context, 'port_update', port=port,
+ network_type=network_type, segmentation_id=segmentation_id,
+ physical_network=physical_network)
def port_delete(self, context, port_id):
- self.fanout_cast(context,
- self.make_msg('port_delete',
- port_id=port_id),
- topic=self.topic_port_delete)
+ cctxt = self.client.prepare(topic=self.topic_port_delete,
+ fanout=True)
+ cctxt.cast(context, 'port_delete', port_id=port_id)
def tunnel_update(self, context, tunnel_ip, tunnel_id):
- self.fanout_cast(context,
- self.make_msg('tunnel_update',
- tunnel_ip=tunnel_ip,
- tunnel_id=tunnel_id),
- topic=self.topic_tunnel_update)
+ cctxt = self.client.prepare(topic=self.topic_tunnel_update,
+ fanout=True)
+ cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip,
+ tunnel_id=tunnel_id)
import mock
from neutron.agent import rpc as agent_rpc
-from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common import context
from neutron.plugins.hyperv import agent_notifier_api as ana
class rpcHyperVApiTestCase(base.BaseTestCase):
- def _test_hyperv_neutron_api_legacy(
- self, rpcapi, topic, method, rpc_method, **kwargs):
- # NOTE(russellb) This version of the test method is used for interfaces
- # not yet converted away from using the RpcProxy compatibility class.
- ctxt = context.RequestContext('fake_user', 'fake_project')
- expected_retval = 'foo' if rpc_method == 'call' else None
- expected_version = kwargs.pop('version', None)
- expected_msg = rpcapi.make_msg(method, **kwargs)
-
- proxy = n_rpc.RpcProxy
- with mock.patch.object(proxy, rpc_method) as rpc_method_mock:
- rpc_method_mock.return_value = expected_retval
- retval = getattr(rpcapi, method)(ctxt, **kwargs)
-
- self.assertEqual(retval, expected_retval)
- additional_args = {}
- if topic:
- additional_args['topic'] = topic
- if expected_version:
- additional_args['version'] = expected_version
- expected = [
- mock.call(ctxt, expected_msg, **additional_args)
- ]
- rpc_method_mock.assert_has_calls(expected)
-
def _test_hyperv_neutron_api(
self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
+ fanout = kwargs.pop('fanout', False)
with contextlib.nested(
mock.patch.object(rpcapi.client, rpc_method),
prepare_args = {}
if expected_version:
prepare_args['version'] = expected_version
+ if fanout:
+ prepare_args['fanout'] = True
+ if topic:
+ prepare_args['topic'] = topic
prepare_mock.assert_called_once_with(**prepare_args)
rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
def test_delete_network(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
- self._test_hyperv_neutron_api_legacy(
+ self._test_hyperv_neutron_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.NETWORK,
topics.DELETE),
- 'network_delete', rpc_method='fanout_cast',
+ 'network_delete', rpc_method='cast', fanout=True,
network_id='fake_request_spec')
def test_port_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
- self._test_hyperv_neutron_api_legacy(
+ self._test_hyperv_neutron_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.PORT,
topics.UPDATE),
- 'port_update', rpc_method='fanout_cast',
+ 'port_update', rpc_method='cast', fanout=True,
port='fake_port',
network_type='fake_network_type',
segmentation_id='fake_segmentation_id',
def test_port_delete(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
- self._test_hyperv_neutron_api_legacy(
+ self._test_hyperv_neutron_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
topics.PORT,
topics.DELETE),
- 'port_delete', rpc_method='fanout_cast',
+ 'port_delete', rpc_method='cast', fanout=True,
port_id='port_id')
def test_tunnel_update(self):
rpcapi = ana.AgentNotifierApi(topics.AGENT)
- self._test_hyperv_neutron_api_legacy(
+ self._test_hyperv_neutron_api(
rpcapi,
topics.get_topic_name(
topics.AGENT,
constants.TUNNEL,
topics.UPDATE),
- 'tunnel_update', rpc_method='fanout_cast',
+ 'tunnel_update', rpc_method='cast', fanout=True,
tunnel_ip='fake_ip', tunnel_id='fake_id')
def test_device_details(self):