# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
from oslo.config import cfg
+from oslo import messaging
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.common import rpc as n_rpc
LOG = logging.getLogger(__name__)
-class AgentNotifierApi(n_rpc.RpcProxy,
- sg_rpc.SecurityGroupAgentRpcApiMixin):
+class AgentNotifierApi(sg_rpc.SecurityGroupAgentRpcApiMixin):
"""Agent side of the Embedded Switch RPC API.
API version history:
1.1 - Added get_active_networks_info, create_dhcp_port,
and update_dhcp_port methods.
"""
- BASE_RPC_API_VERSION = '1.1'
-
def __init__(self, topic):
- super(AgentNotifierApi, self).__init__(
- topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.topic = topic
self.topic_network_delete = topics.get_topic_name(topic,
topics.NETWORK,
self.topic_port_update = topics.get_topic_name(topic,
topics.PORT,
topics.UPDATE)
+ target = messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(target)
def network_delete(self, context, network_id):
LOG.debug(_("Sending delete network message"))
- self.fanout_cast(context,
- self.make_msg('network_delete',
- network_id=network_id),
- topic=self.topic_network_delete)
+ cctxt = self.client.prepare(topic=self.topic_network_delete,
+ fanout=True)
+ cctxt.cast(context, 'network_delete', network_id=network_id)
def port_update(self, context, port, physical_network,
network_type, vlan_id):
'segmentation_id': vlan_id}
if cfg.CONF.AGENT.rpc_support_old_agents:
kwargs['vlan_id'] = vlan_id
- msg = self.make_msg('port_update', **kwargs)
- self.fanout_cast(context, msg,
- topic=self.topic_port_update)
+ cctxt = self.client.prepare(topic=self.topic_port_update, fanout=True)
+ cctxt.cast(context, 'port_update', **kwargs)
import contextlib
import mock
-import fixtures
from oslo.config import cfg
from neutron.agent import rpc as agent_rpc
class rpcApiTestCase(base.BaseTestCase):
- def _test_mlnx_api_legacy(self, rpcapi, topic, method, rpc_method,
- expected_msg=None, **kwargs):
- # NOTE(russellb) This method can be removed once the AgentNotifierApi
- # has been converted to no longer use the RpcProxy class.
- ctxt = context.RequestContext('fake_user', 'fake_project')
- expected_retval = 'foo' if rpc_method == 'call' else None
- expected_kwargs = {}
- if topic:
- expected_kwargs['topic'] = topic
- if 'version' in kwargs:
- expected_kwargs['version'] = kwargs.pop('version')
- if not expected_msg:
- expected_msg = rpcapi.make_msg(method, **kwargs)
-
- self.fake_args = None
- self.fake_kwargs = None
-
- def _fake_rpc_method(*args, **kwargs):
- self.fake_args = args
- self.fake_kwargs = kwargs
- if expected_retval:
- return expected_retval
-
- self.useFixture(fixtures.MonkeyPatch(
- 'neutron.common.rpc.RpcProxy.' + rpc_method,
- _fake_rpc_method))
-
- retval = getattr(rpcapi, method)(ctxt, **kwargs)
-
- self.assertEqual(expected_retval, retval)
- expected_args = [ctxt, expected_msg]
-
- # skip the first argument which is 'self'
- for arg, expected_arg in zip(self.fake_args[1:], expected_args):
- self.assertEqual(expected_arg, arg)
- self.assertEqual(expected_kwargs, self.fake_kwargs)
-
def _test_mlnx_api(self, rpcapi, topic, method, rpc_method, **kwargs):
ctxt = context.RequestContext('fake_user', 'fake_project')
expected_retval = 'foo' if rpc_method == 'call' else None
expected_version = kwargs.pop('version', None)
+ fanout = kwargs.pop('fanout', False)
with contextlib.nested(
mock.patch.object(rpcapi.client, rpc_method),
prepare_args = {}
if expected_version:
prepare_args['version'] = expected_version
+ if fanout:
+ prepare_args['fanout'] = True
+ if topic:
+ prepare_args['topic'] = topic
prepare_mock.assert_called_once_with(**prepare_args)
+ if method == 'port_update':
+ kwargs['segmentation_id'] = kwargs['vlan_id']
+ if not cfg.CONF.AGENT.rpc_support_old_agents:
+ del kwargs['vlan_id']
+
self.assertEqual(retval, expected_retval)
rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
def test_delete_network(self):
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
- self._test_mlnx_api_legacy(
+ self._test_mlnx_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
topics.NETWORK,
topics.DELETE),
- 'network_delete', rpc_method='fanout_cast',
+ 'network_delete', rpc_method='cast', fanout=True,
network_id='fake_request_spec')
def test_port_update(self):
cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
- expected_msg = rpcapi.make_msg('port_update',
- port='fake_port',
- network_type='vlan',
- physical_network='fake_net',
- segmentation_id='fake_vlan_id')
- self._test_mlnx_api_legacy(
+ self._test_mlnx_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
- 'port_update', rpc_method='fanout_cast',
- expected_msg=expected_msg,
+ 'port_update', rpc_method='cast', fanout=True,
port='fake_port',
network_type='vlan',
physical_network='fake_net',
vlan_id='fake_vlan_id')
- def test_port_update_ib(self):
- cfg.CONF.set_override('rpc_support_old_agents', False, 'AGENT')
- rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
- expected_msg = rpcapi.make_msg('port_update',
- port='fake_port',
- network_type='ib',
- physical_network='fake_net',
- segmentation_id='fake_vlan_id')
- self._test_mlnx_api_legacy(
- rpcapi,
- topics.get_topic_name(topics.AGENT,
- topics.PORT,
- topics.UPDATE),
- 'port_update', rpc_method='fanout_cast',
- expected_msg=expected_msg,
- port='fake_port',
- network_type='ib',
- physical_network='fake_net',
- vlan_id='fake_vlan_id')
-
def test_port_update_old_agent(self):
cfg.CONF.set_override('rpc_support_old_agents', True, 'AGENT')
rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT)
- expected_msg = rpcapi.make_msg('port_update',
- port='fake_port',
- network_type='vlan',
- physical_network='fake_net',
- segmentation_id='fake_vlan_id',
- vlan_id='fake_vlan_id')
- self._test_mlnx_api_legacy(
+ self._test_mlnx_api(
rpcapi,
topics.get_topic_name(topics.AGENT,
topics.PORT,
topics.UPDATE),
- 'port_update', rpc_method='fanout_cast',
- expected_msg=expected_msg,
+ 'port_update', rpc_method='cast', fanout=True,
port='fake_port',
network_type='vlan',
physical_network='fake_net',