From: Jakub Libosvar Date: Mon, 3 Aug 2015 15:48:02 +0000 (+0000) Subject: Add rpc agent api and callbacks to resources_rpc X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=ac3e1e1256402ab014902239a93ecceff76637d1;p=openstack-build%2Fneutron-build.git Add rpc agent api and callbacks to resources_rpc This patch also refactors existing test cases for server side rpc classes in order to test code in generic manner. Finally, we remove notify() and get_resource() from consumers or producers modules respectively in order to remove circular dependencies. The notificitaion driver will send events directly using RPC api class instead of going through registry. Co-Authored-By: Miguel Angel Ajo Partially-Implements: blueprint quantum-qos-api Change-Id: I9120748505856acc7aa8d15d896697dd8487bb02 --- diff --git a/neutron/api/rpc/handlers/resources_rpc.py b/neutron/api/rpc/handlers/resources_rpc.py index eed2dfde0..dd20eb3c6 100755 --- a/neutron/api/rpc/handlers/resources_rpc.py +++ b/neutron/api/rpc/handlers/resources_rpc.py @@ -17,12 +17,14 @@ from oslo_log import helpers as log_helpers from oslo_log import log as logging import oslo_messaging -from neutron.api.rpc.callbacks.producer import registry +from neutron.api.rpc.callbacks.consumer import registry as cons_registry +from neutron.api.rpc.callbacks.producer import registry as prod_registry from neutron.api.rpc.callbacks import resources from neutron.common import constants from neutron.common import exceptions from neutron.common import rpc as n_rpc from neutron.common import topics +from neutron.objects import base as obj_base LOG = logging.getLogger(__name__) @@ -83,9 +85,7 @@ class ResourcesPullRpcApi(object): raise ResourceNotFound(resource_type=resource_type, resource_id=resource_id) - obj = resource_type_cls.obj_from_primitive(primitive) - obj.obj_reset_changes() - return obj + return resource_type_cls.clean_obj_from_primitive(primitive) class ResourcesPullRpcCallback(object): @@ -103,11 +103,73 @@ class ResourcesPullRpcCallback(object): version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES) def pull(self, context, resource_type, version, resource_id): - _validate_resource_type(resource_type) - - obj = registry.pull(resource_type, resource_id, context=context) + obj = prod_registry.pull(resource_type, resource_id, context=context) if obj: - # don't request a backport for the latest known version + #TODO(QoS): Remove in the future with new version of + # versionedobjects containing + # https://review.openstack.org/#/c/207998/ if version == obj.VERSION: version = None return obj.obj_to_primitive(target_version=version) + + +def _object_topic(obj): + resource_type = resources.get_resource_type(obj) + return topics.RESOURCE_TOPIC_PATTERN % { + 'resource_type': resource_type, 'version': obj.VERSION} + + +class ResourcesPushRpcApi(object): + """Plugin-side RPC for plugin-to-agents interaction. + + This interface is designed to push versioned object updates to interested + agents using fanout topics. + + This class implements the caller side of an rpc interface. The receiver + side can be found below: ResourcesPushRpcCallback. + """ + + def __init__(self): + target = oslo_messaging.Target( + version='1.0', + namespace=constants.RPC_NAMESPACE_RESOURCES) + self.client = n_rpc.get_client(target) + + def _prepare_object_fanout_context(self, obj): + """Prepare fanout context, one topic per object type.""" + obj_topic = _object_topic(obj) + return self.client.prepare(fanout=True, topic=obj_topic) + + @log_helpers.log_method_call + def push(self, context, resource, event_type): + resource_type = resources.get_resource_type(resource) + _validate_resource_type(resource_type) + cctxt = self._prepare_object_fanout_context(resource) + #TODO(QoS): Push notifications for every known version once we have + # multiple of those + dehydrated_resource = resource.obj_to_primitive() + cctxt.cast(context, 'push', + resource=dehydrated_resource, + event_type=event_type) + + +class ResourcesPushRpcCallback(object): + """Agent-side RPC for plugin-to-agents interaction. + + This class implements the receiver for notification about versioned objects + resource updates used by neutron.api.rpc.callbacks. You can find the + caller side in ResourcesPushRpcApi. + """ + # History + # 1.0 Initial version + + target = oslo_messaging.Target(version='1.0', + namespace=constants.RPC_NAMESPACE_RESOURCES) + + def push(self, context, resource, event_type): + resource_obj = obj_base.NeutronObject.clean_obj_from_primitive( + resource) + LOG.debug("Resources notification (%(event_type)s): %(resource)s", + {'event_type': event_type, 'resource': repr(resource_obj)}) + resource_type = resources.get_resource_type(resource_obj) + cons_registry.push(resource_type, resource_obj, event_type) diff --git a/neutron/common/topics.py b/neutron/common/topics.py index 18acbcb7b..d0cc55a57 100644 --- a/neutron/common/topics.py +++ b/neutron/common/topics.py @@ -38,6 +38,8 @@ DHCP_AGENT = 'dhcp_agent' METERING_AGENT = 'metering_agent' LOADBALANCER_AGENT = 'n-lbaas_agent' +RESOURCE_TOPIC_PATTERN = "neutron-vo-%(resource_type)s-%(version)s" + def get_topic_name(prefix, table, operation, host=None): """Create a topic name. diff --git a/neutron/objects/base.py b/neutron/objects/base.py index f10966106..230f53dcd 100644 --- a/neutron/objects/base.py +++ b/neutron/objects/base.py @@ -48,6 +48,12 @@ class NeutronObject(obj_base.VersionedObject, def to_dict(self): return dict(self.items()) + @classmethod + def clean_obj_from_primitive(cls, primitive, context=None): + obj = cls.obj_from_primitive(primitive, context) + obj.obj_reset_changes() + return obj + @classmethod def get_by_id(cls, context, id): raise NotImplementedError() diff --git a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py index f7b52201f..9a6ccd4a6 100755 --- a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py @@ -15,71 +15,100 @@ import mock from oslo_utils import uuidutils +from oslo_versionedobjects import base as obj_base +from oslo_versionedobjects import fields as obj_fields +import testtools from neutron.api.rpc.callbacks import resources from neutron.api.rpc.handlers import resources_rpc +from neutron.common import topics from neutron import context -from neutron.objects.qos import policy +from neutron.objects import base as objects_base from neutron.tests import base +@obj_base.VersionedObjectRegistry.register +class FakeResource(objects_base.NeutronObject): + + fields = { + 'id': obj_fields.UUIDField(), + 'field': obj_fields.StringField() + } + + @classmethod + def get_objects(cls, context, **kwargs): + return list() + + class ResourcesRpcBaseTestCase(base.BaseTestCase): def setUp(self): super(ResourcesRpcBaseTestCase, self).setUp() self.context = context.get_admin_context() - def _create_test_policy_dict(self): + def _create_test_dict(self): return {'id': uuidutils.generate_uuid(), - 'tenant_id': uuidutils.generate_uuid(), - 'name': 'test', - 'description': 'test', - 'shared': False} + 'field': 'foo'} + + def _create_test_resource(self, **kwargs): + resource = FakeResource(self.context, **kwargs) + resource.obj_reset_changes() + return resource + - def _create_test_policy(self, policy_dict): - policy_obj = policy.QosPolicy(self.context, **policy_dict) - policy_obj.obj_reset_changes() - return policy_obj +class _ValidateResourceTypeTestCase(base.BaseTestCase): + def setUp(self): + super(_ValidateResourceTypeTestCase, self).setUp() + self.is_valid_mock = mock.patch.object( + resources_rpc.resources, 'is_valid_resource_type').start() + + def test_valid_type(self): + self.is_valid_mock.return_value = True + resources_rpc._validate_resource_type('foo') + + def test_invalid_type(self): + self.is_valid_mock.return_value = False + with testtools.ExpectedException( + resources_rpc.InvalidResourceTypeClass): + resources_rpc._validate_resource_type('foo') class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase): def setUp(self): super(ResourcesPullRpcApiTestCase, self).setUp() - self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client') - self.client = self.client_p.start() + mock.patch.object(resources_rpc.n_rpc, 'get_client').start() + mock.patch.object(resources_rpc, '_validate_resource_type').start() + mock.patch('neutron.api.rpc.callbacks.resources.get_resource_cls', + return_value=FakeResource).start() self.rpc = resources_rpc.ResourcesPullRpcApi() - self.mock_cctxt = self.rpc.client.prepare.return_value + self.cctxt_mock = self.rpc.client.prepare.return_value def test_is_singleton(self): self.assertEqual(id(self.rpc), id(resources_rpc.ResourcesPullRpcApi())) def test_pull(self): - policy_dict = self._create_test_policy_dict() - expected_policy_obj = self._create_test_policy(policy_dict) - qos_policy_id = policy_dict['id'] - self.mock_cctxt.call.return_value = ( - expected_policy_obj.obj_to_primitive()) - pull_result = self.rpc.pull( - self.context, resources.QOS_POLICY, qos_policy_id) - self.mock_cctxt.call.assert_called_once_with( - self.context, 'pull', resource_type=resources.QOS_POLICY, - version=policy.QosPolicy.VERSION, resource_id=qos_policy_id) - self.assertEqual(expected_policy_obj, pull_result) - - def test_pull_invalid_resource_type_cls(self): - self.assertRaises( - resources_rpc.InvalidResourceTypeClass, self.rpc.pull, - self.context, 'foo_type', 'foo_id') + resource_dict = self._create_test_dict() + expected_obj = self._create_test_resource(**resource_dict) + resource_id = resource_dict['id'] + self.cctxt_mock.call.return_value = expected_obj.obj_to_primitive() + + result = self.rpc.pull( + self.context, FakeResource.obj_name(), resource_id) + + self.cctxt_mock.call.assert_called_once_with( + self.context, 'pull', resource_type='FakeResource', + version=FakeResource.VERSION, resource_id=resource_id) + self.assertEqual(expected_obj, result) def test_pull_resource_not_found(self): - policy_dict = self._create_test_policy_dict() - qos_policy_id = policy_dict['id'] - self.mock_cctxt.call.return_value = None - self.assertRaises( - resources_rpc.ResourceNotFound, self.rpc.pull, - self.context, resources.QOS_POLICY, qos_policy_id) + resource_dict = self._create_test_dict() + resource_id = resource_dict['id'] + self.cctxt_mock.call.return_value = None + with testtools.ExpectedException(resources_rpc.ResourceNotFound): + self.rpc.pull(self.context, FakeResource.obj_name(), + resource_id) class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase): @@ -87,45 +116,91 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase): def setUp(self): super(ResourcesPullRpcCallbackTestCase, self).setUp() self.callbacks = resources_rpc.ResourcesPullRpcCallback() + self.resource_dict = self._create_test_dict() + self.resource_obj = self._create_test_resource(**self.resource_dict) def test_pull(self): - policy_dict = self._create_test_policy_dict() - policy_obj = self._create_test_policy(policy_dict) - qos_policy_id = policy_dict['id'] - with mock.patch.object(resources_rpc.registry, 'pull', - return_value=policy_obj) as registry_mock: + with mock.patch.object( + resources_rpc.prod_registry, 'pull', + return_value=self.resource_obj) as registry_mock: primitive = self.callbacks.pull( - self.context, resource_type=resources.QOS_POLICY, - version=policy.QosPolicy.VERSION, - resource_id=qos_policy_id) - registry_mock.assert_called_once_with( - resources.QOS_POLICY, - qos_policy_id, context=self.context) - self.assertEqual(policy_dict, primitive['versioned_object.data']) - self.assertEqual(policy_obj.obj_to_primitive(), primitive) - - @mock.patch.object(policy.QosPolicy, 'obj_to_primitive') + self.context, resource_type=FakeResource.obj_name(), + version=FakeResource.VERSION, + resource_id=self.resource_dict['id']) + registry_mock.assert_called_once_with( + 'FakeResource', self.resource_dict['id'], context=self.context) + self.assertEqual(self.resource_dict, + primitive['versioned_object.data']) + self.assertEqual(self.resource_obj.obj_to_primitive(), primitive) + + @mock.patch.object(FakeResource, 'obj_to_primitive') def test_pull_no_backport_for_latest_version(self, to_prim_mock): - policy_dict = self._create_test_policy_dict() - policy_obj = self._create_test_policy(policy_dict) - qos_policy_id = policy_dict['id'] - with mock.patch.object(resources_rpc.registry, 'pull', - return_value=policy_obj): + with mock.patch.object(resources_rpc.prod_registry, 'pull', + return_value=self.resource_obj): self.callbacks.pull( - self.context, resource_type=resources.QOS_POLICY, - version=policy.QosPolicy.VERSION, - resource_id=qos_policy_id) - to_prim_mock.assert_called_with(target_version=None) + self.context, resource_type=FakeResource.obj_name(), + version=FakeResource.VERSION, + resource_id=self.resource_obj.id) + to_prim_mock.assert_called_with(target_version=None) - @mock.patch.object(policy.QosPolicy, 'obj_to_primitive') + @mock.patch.object(FakeResource, 'obj_to_primitive') def test_pull_backports_to_older_version(self, to_prim_mock): - policy_dict = self._create_test_policy_dict() - policy_obj = self._create_test_policy(policy_dict) - qos_policy_id = policy_dict['id'] - with mock.patch.object(resources_rpc.registry, 'pull', - return_value=policy_obj): + with mock.patch.object(resources_rpc.prod_registry, 'pull', + return_value=self.resource_obj): self.callbacks.pull( - self.context, resource_type=resources.QOS_POLICY, + self.context, resource_type=FakeResource.obj_name(), version='0.9', # less than initial version 1.0 - resource_id=qos_policy_id) + resource_id=self.resource_dict['id']) to_prim_mock.assert_called_with(target_version='0.9') + + +class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase): + + def setUp(self): + super(ResourcesPushRpcApiTestCase, self).setUp() + mock.patch.object(resources_rpc.n_rpc, 'get_client').start() + mock.patch.object(resources_rpc, '_validate_resource_type').start() + self.rpc = resources_rpc.ResourcesPushRpcApi() + self.cctxt_mock = self.rpc.client.prepare.return_value + resource_dict = self._create_test_dict() + self.resource_obj = self._create_test_resource(**resource_dict) + + def test__prepare_object_fanout_context(self): + expected_topic = topics.RESOURCE_TOPIC_PATTERN % { + 'resource_type': resources.get_resource_type(self.resource_obj), + 'version': self.resource_obj.VERSION} + + observed = self.rpc._prepare_object_fanout_context(self.resource_obj) + + self.rpc.client.prepare.assert_called_once_with( + fanout=True, topic=expected_topic) + self.assertEqual(self.cctxt_mock, observed) + + def test_push(self): + self.rpc.push( + self.context, self.resource_obj, 'TYPE') + + self.cctxt_mock.cast.assert_called_once_with( + self.context, 'push', + resource=self.resource_obj.obj_to_primitive(), + event_type='TYPE') + + +class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase): + + def setUp(self): + super(ResourcesPushRpcCallbackTestCase, self).setUp() + mock.patch.object(resources_rpc, '_validate_resource_type').start() + mock.patch.object( + resources_rpc.resources, + 'get_resource_cls', return_value=FakeResource).start() + resource_dict = self._create_test_dict() + self.resource_obj = self._create_test_resource(**resource_dict) + self.resource_prim = self.resource_obj.obj_to_primitive() + self.callbacks = resources_rpc.ResourcesPushRpcCallback() + + @mock.patch.object(resources_rpc.cons_registry, 'push') + def test_push(self, reg_push_mock): + self.callbacks.push(self.context, self.resource_prim, 'TYPE') + reg_push_mock.assert_called_once_with(self.resource_obj.obj_name(), + self.resource_obj, 'TYPE') diff --git a/neutron/tests/unit/objects/test_base.py b/neutron/tests/unit/objects/test_base.py index 84bdb13be..14e8b1d17 100644 --- a/neutron/tests/unit/objects/test_base.py +++ b/neutron/tests/unit/objects/test_base.py @@ -26,6 +26,8 @@ from neutron.tests import base as test_base SQLALCHEMY_COMMIT = 'sqlalchemy.engine.Connection._commit_impl' +OBJECTS_BASE_OBJ_FROM_PRIMITIVE = ('oslo_versionedobjects.base.' + 'VersionedObject.obj_from_primitive') class FakeModel(object): @@ -214,6 +216,13 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase): delete_mock.assert_called_once_with( self.context, self._test_class.db_model, self.db_obj['id']) + @mock.patch(OBJECTS_BASE_OBJ_FROM_PRIMITIVE) + def test_clean_obj_from_primitive(self, get_prim_m): + expected_obj = get_prim_m.return_value + observed_obj = self._test_class.clean_obj_from_primitive('foo', 'bar') + self.assertIs(expected_obj, observed_obj) + self.assertTrue(observed_obj.obj_reset_changes.called) + class BaseDbObjectTestCase(_BaseObjectTestCase):