From: Russell Bryant Date: Wed, 19 Nov 2014 17:32:38 +0000 (+0000) Subject: Drop RpcProxy usage from VPNaaS code X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=2161b1aea123bb23b393f01da4a2232704ecae75;p=openstack-build%2Fneutron-build.git Drop RpcProxy usage from VPNaaS code Drop the usage of the RpcProxy compatibility class from the VPNaaS code. The equivalent direct usage of oslo.messaging APIs is now used instead. Part of blueprint drop-rpc-compat. Change-Id: I4ff0bfe0b5e909bfe088f4059d85aa6366526dad --- diff --git a/neutron/services/vpn/device_drivers/cisco_ipsec.py b/neutron/services/vpn/device_drivers/cisco_ipsec.py index 0eb3871d1..7776a3457 100644 --- a/neutron/services/vpn/device_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/device_drivers/cisco_ipsec.py @@ -67,24 +67,26 @@ class CsrUnknownMappingError(exceptions.NeutronException): "attribute %(attr)s of %(resource)s") -class CiscoCsrIPsecVpnDriverApi(n_rpc.RpcProxy): +class CiscoCsrIPsecVpnDriverApi(object): """RPC API for agent to plugin messaging.""" + def __init__(self, topic): + target = messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) + def get_vpn_services_on_host(self, context, host): """Get list of vpnservices on this host. The vpnservices including related ipsec_site_connection, ikepolicy, ipsecpolicy, and Cisco info on this host. """ - return self.call(context, - self.make_msg('get_vpn_services_on_host', - host=host)) + cctxt = self.client.prepare() + return cctxt.call(context, 'get_vpn_services_on_host', host=host) def update_status(self, context, status): """Update status for all VPN services and connections.""" - return self.cast(context, - self.make_msg('update_status', - status=status)) + cctxt = self.client.prepare() + return cctxt.call(context, 'update_status', status=status) @six.add_metaclass(abc.ABCMeta) @@ -117,7 +119,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): self.conn.create_consumer(node_topic, self.endpoints, fanout=False) self.conn.consume_in_threads() self.agent_rpc = ( - CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0')) + CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC)) self.periodic_report = loopingcall.FixedIntervalLoopingCall( self.report_status, context) self.periodic_report.start( diff --git a/neutron/services/vpn/device_drivers/ipsec.py b/neutron/services/vpn/device_drivers/ipsec.py index a7579e27d..fb393c1b0 100644 --- a/neutron/services/vpn/device_drivers/ipsec.py +++ b/neutron/services/vpn/device_drivers/ipsec.py @@ -442,9 +442,12 @@ class OpenSwanProcess(BaseSwanProcess): self.connection_status = {} -class IPsecVpnDriverApi(n_rpc.RpcProxy): +class IPsecVpnDriverApi(object): """IPSecVpnDriver RPC api.""" - IPSEC_PLUGIN_VERSION = '1.0' + + def __init__(self, topic): + target = messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) def get_vpn_services_on_host(self, context, host): """Get list of vpnservices. @@ -452,10 +455,8 @@ class IPsecVpnDriverApi(n_rpc.RpcProxy): The vpnservices including related ipsec_site_connection, ikepolicy and ipsecpolicy on this host """ - return self.call(context, - self.make_msg('get_vpn_services_on_host', - host=host), - version=self.IPSEC_PLUGIN_VERSION) + cctxt = self.client.prepare() + return cctxt.call(context, 'get_vpn_services_on_host', host=host) def update_status(self, context, status): """Update local status. @@ -463,10 +464,8 @@ class IPsecVpnDriverApi(n_rpc.RpcProxy): This method call updates status attribute of VPNServices. """ - return self.cast(context, - self.make_msg('update_status', - status=status), - version=self.IPSEC_PLUGIN_VERSION) + cctxt = self.client.prepare() + return cctxt.call(context, 'update_status', status=status) @six.add_metaclass(abc.ABCMeta) @@ -504,7 +503,7 @@ class IPsecDriver(device_drivers.DeviceDriver): self.endpoints = [self] self.conn.create_consumer(node_topic, self.endpoints, fanout=False) self.conn.consume_in_threads() - self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0') + self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC) self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall( self.report_status, self.context) self.process_status_cache_check.start( diff --git a/neutron/services/vpn/service_drivers/__init__.py b/neutron/services/vpn/service_drivers/__init__.py index d66e37850..b2f0bfdcb 100644 --- a/neutron/services/vpn/service_drivers/__init__.py +++ b/neutron/services/vpn/service_drivers/__init__.py @@ -15,6 +15,7 @@ import abc +from oslo import messaging import six from neutron.common import rpc as n_rpc @@ -71,13 +72,14 @@ class VpnDriver(object): pass -class BaseIPsecVpnAgentApi(n_rpc.RpcProxy): +class BaseIPsecVpnAgentApi(object): """Base class for IPSec API to agent.""" def __init__(self, topic, default_version, driver): self.topic = topic self.driver = driver - super(BaseIPsecVpnAgentApi, self).__init__(topic, default_version) + target = messaging.Target(topic=topic, version=default_version) + self.client = n_rpc.get_client(target) def _agent_notification(self, context, method, router_id, version=None, **kwargs): @@ -100,10 +102,8 @@ class BaseIPsecVpnAgentApi(n_rpc.RpcProxy): 'host': l3_agent.host, 'method': method, 'args': kwargs}) - self.cast( - context, self.make_msg(method, **kwargs), - version=version, - topic='%s.%s' % (self.topic, l3_agent.host)) + cctxt = self.client.prepare(server=l3_agent.host, version=version) + cctxt.cast(context, method, **kwargs) def vpnservice_updated(self, context, router_id, **kwargs): """Send update event of vpnservices.""" diff --git a/neutron/services/vpn/service_drivers/cisco_ipsec.py b/neutron/services/vpn/service_drivers/cisco_ipsec.py index 2fafc21be..d88ccb8ec 100644 --- a/neutron/services/vpn/service_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/service_drivers/cisco_ipsec.py @@ -107,9 +107,8 @@ class CiscoCsrIPsecVpnAgentApi(service_drivers.BaseIPsecVpnAgentApi, 'method': method, 'args': kwargs, 'router': router_id}) - self.cast(context, self.make_msg(method, **kwargs), - version=version, - topic='%s.%s' % (self.topic, host)) + cctxt = self.client.prepare(server=host, version=version) + cctxt.cast(context, method, **kwargs) class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver): diff --git a/neutron/tests/unit/services/vpn/service_drivers/test_cisco_ipsec.py b/neutron/tests/unit/services/vpn/service_drivers/test_cisco_ipsec.py index 2c18cc8e1..436a458ab 100644 --- a/neutron/tests/unit/services/vpn/service_drivers/test_cisco_ipsec.py +++ b/neutron/tests/unit/services/vpn/service_drivers/test_cisco_ipsec.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib + import mock from oslo.config import cfg @@ -352,15 +354,20 @@ class TestCiscoIPsecDriver(testlib_api.SqlTestCase): self.context = n_ctx.Context('some_user', 'some_tenant') def _test_update(self, func, args, additional_info=None): - with mock.patch.object(self.driver.agent_rpc, 'cast') as cast: + with contextlib.nested( + mock.patch.object(self.driver.agent_rpc.client, 'cast'), + mock.patch.object(self.driver.agent_rpc.client, 'prepare'), + ) as ( + rpc_mock, prepare_mock + ): + prepare_mock.return_value = self.driver.agent_rpc.client func(self.context, *args) - cast.assert_called_once_with( - self.context, - {'args': additional_info, - 'namespace': None, - 'method': 'vpnservice_updated'}, - version='1.0', - topic='cisco_csr_ipsec_agent.fake_host') + + prepare_args = {'server': 'fake_host', 'version': '1.0'} + prepare_mock.assert_called_once_with(**prepare_args) + + rpc_mock.assert_called_once_with(self.context, 'vpnservice_updated', + reason=mock.ANY) def test_create_ipsec_site_connection(self): self._test_update(self.driver.create_ipsec_site_connection, diff --git a/neutron/tests/unit/services/vpn/service_drivers/test_ipsec.py b/neutron/tests/unit/services/vpn/service_drivers/test_ipsec.py index b2aab3a6b..9f100fd60 100644 --- a/neutron/tests/unit/services/vpn/service_drivers/test_ipsec.py +++ b/neutron/tests/unit/services/vpn/service_drivers/test_ipsec.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import contextlib + import mock from oslo.config import cfg @@ -240,15 +242,19 @@ class TestIPsecDriver(base.BaseTestCase): def _test_update(self, func, args): ctxt = n_ctx.Context('', 'somebody') - with mock.patch.object(self.driver.agent_rpc, 'cast') as cast: + with contextlib.nested( + mock.patch.object(self.driver.agent_rpc.client, 'cast'), + mock.patch.object(self.driver.agent_rpc.client, 'prepare'), + ) as ( + rpc_mock, prepare_mock + ): + prepare_mock.return_value = self.driver.agent_rpc.client func(ctxt, *args) - cast.assert_called_once_with( - ctxt, - {'args': {}, - 'namespace': None, - 'method': 'vpnservice_updated'}, - version='1.0', - topic='ipsec_agent.fake_host') + + prepare_args = {'server': 'fake_host', 'version': '1.0'} + prepare_mock.assert_called_once_with(**prepare_args) + + rpc_mock.assert_called_once_with(ctxt, 'vpnservice_updated') def test_create_ipsec_site_connection(self): self._test_update(self.driver.create_ipsec_site_connection,