"attribute %(attr)s of %(resource)s")
-class CiscoCsrIPsecVpnDriverApi(n_rpc.RpcProxy):
+class CiscoCsrIPsecVpnDriverApi(object):
"""RPC API for agent to plugin messaging."""
+ def __init__(self, topic):
+ target = messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(target)
+
def get_vpn_services_on_host(self, context, host):
"""Get list of vpnservices on this host.
The vpnservices including related ipsec_site_connection,
ikepolicy, ipsecpolicy, and Cisco info on this host.
"""
- return self.call(context,
- self.make_msg('get_vpn_services_on_host',
- host=host))
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'get_vpn_services_on_host', host=host)
def update_status(self, context, status):
"""Update status for all VPN services and connections."""
- return self.cast(context,
- self.make_msg('update_status',
- status=status))
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'update_status', status=status)
@six.add_metaclass(abc.ABCMeta)
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
self.agent_rpc = (
- CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
+ CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC))
self.periodic_report = loopingcall.FixedIntervalLoopingCall(
self.report_status, context)
self.periodic_report.start(
self.connection_status = {}
-class IPsecVpnDriverApi(n_rpc.RpcProxy):
+class IPsecVpnDriverApi(object):
"""IPSecVpnDriver RPC api."""
- IPSEC_PLUGIN_VERSION = '1.0'
+
+ def __init__(self, topic):
+ target = messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(target)
def get_vpn_services_on_host(self, context, host):
"""Get list of vpnservices.
The vpnservices including related ipsec_site_connection,
ikepolicy and ipsecpolicy on this host
"""
- return self.call(context,
- self.make_msg('get_vpn_services_on_host',
- host=host),
- version=self.IPSEC_PLUGIN_VERSION)
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'get_vpn_services_on_host', host=host)
def update_status(self, context, status):
"""Update local status.
This method call updates status attribute of
VPNServices.
"""
- return self.cast(context,
- self.make_msg('update_status',
- status=status),
- version=self.IPSEC_PLUGIN_VERSION)
+ cctxt = self.client.prepare()
+ return cctxt.call(context, 'update_status', status=status)
@six.add_metaclass(abc.ABCMeta)
self.endpoints = [self]
self.conn.create_consumer(node_topic, self.endpoints, fanout=False)
self.conn.consume_in_threads()
- self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
+ self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC)
self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
self.report_status, self.context)
self.process_status_cache_check.start(
import abc
+from oslo import messaging
import six
from neutron.common import rpc as n_rpc
pass
-class BaseIPsecVpnAgentApi(n_rpc.RpcProxy):
+class BaseIPsecVpnAgentApi(object):
"""Base class for IPSec API to agent."""
def __init__(self, topic, default_version, driver):
self.topic = topic
self.driver = driver
- super(BaseIPsecVpnAgentApi, self).__init__(topic, default_version)
+ target = messaging.Target(topic=topic, version=default_version)
+ self.client = n_rpc.get_client(target)
def _agent_notification(self, context, method, router_id,
version=None, **kwargs):
'host': l3_agent.host,
'method': method,
'args': kwargs})
- self.cast(
- context, self.make_msg(method, **kwargs),
- version=version,
- topic='%s.%s' % (self.topic, l3_agent.host))
+ cctxt = self.client.prepare(server=l3_agent.host, version=version)
+ cctxt.cast(context, method, **kwargs)
def vpnservice_updated(self, context, router_id, **kwargs):
"""Send update event of vpnservices."""
'method': method,
'args': kwargs,
'router': router_id})
- self.cast(context, self.make_msg(method, **kwargs),
- version=version,
- topic='%s.%s' % (self.topic, host))
+ cctxt = self.client.prepare(server=host, version=version)
+ cctxt.cast(context, method, **kwargs)
class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
+
import mock
from oslo.config import cfg
self.context = n_ctx.Context('some_user', 'some_tenant')
def _test_update(self, func, args, additional_info=None):
- with mock.patch.object(self.driver.agent_rpc, 'cast') as cast:
+ with contextlib.nested(
+ mock.patch.object(self.driver.agent_rpc.client, 'cast'),
+ mock.patch.object(self.driver.agent_rpc.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = self.driver.agent_rpc.client
func(self.context, *args)
- cast.assert_called_once_with(
- self.context,
- {'args': additional_info,
- 'namespace': None,
- 'method': 'vpnservice_updated'},
- version='1.0',
- topic='cisco_csr_ipsec_agent.fake_host')
+
+ prepare_args = {'server': 'fake_host', 'version': '1.0'}
+ prepare_mock.assert_called_once_with(**prepare_args)
+
+ rpc_mock.assert_called_once_with(self.context, 'vpnservice_updated',
+ reason=mock.ANY)
def test_create_ipsec_site_connection(self):
self._test_update(self.driver.create_ipsec_site_connection,
# License for the specific language governing permissions and limitations
# under the License.
+import contextlib
+
import mock
from oslo.config import cfg
def _test_update(self, func, args):
ctxt = n_ctx.Context('', 'somebody')
- with mock.patch.object(self.driver.agent_rpc, 'cast') as cast:
+ with contextlib.nested(
+ mock.patch.object(self.driver.agent_rpc.client, 'cast'),
+ mock.patch.object(self.driver.agent_rpc.client, 'prepare'),
+ ) as (
+ rpc_mock, prepare_mock
+ ):
+ prepare_mock.return_value = self.driver.agent_rpc.client
func(ctxt, *args)
- cast.assert_called_once_with(
- ctxt,
- {'args': {},
- 'namespace': None,
- 'method': 'vpnservice_updated'},
- version='1.0',
- topic='ipsec_agent.fake_host')
+
+ prepare_args = {'server': 'fake_host', 'version': '1.0'}
+ prepare_mock.assert_called_once_with(**prepare_args)
+
+ rpc_mock.assert_called_once_with(ctxt, 'vpnservice_updated')
def test_create_ipsec_site_connection(self):
self._test_update(self.driver.create_ipsec_site_connection,