]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Add rpc agent api and callbacks to resources_rpc
authorJakub Libosvar <libosvar@redhat.com>
Mon, 3 Aug 2015 15:48:02 +0000 (15:48 +0000)
committerIhar Hrachyshka <ihrachys@redhat.com>
Sat, 8 Aug 2015 08:27:12 +0000 (10:27 +0200)
This patch also refactors existing test cases for server side rpc
classes in order to test code in generic manner. Finally, we remove
notify() and get_resource() from consumers or producers modules
respectively in order to remove circular dependencies. The notificitaion
driver will send events directly using RPC api class instead of going
through registry.

Co-Authored-By: Miguel Angel Ajo <mangelajo@redhat.com>
Partially-Implements: blueprint quantum-qos-api
Change-Id: I9120748505856acc7aa8d15d896697dd8487bb02

neutron/api/rpc/handlers/resources_rpc.py
neutron/common/topics.py
neutron/objects/base.py
neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py
neutron/tests/unit/objects/test_base.py

index eed2dfde0763aeeda900885805dac5e9ba589326..dd20eb3c60bbb3db11b5feab1597815d132e4a13 100755 (executable)
@@ -17,12 +17,14 @@ from oslo_log import helpers as log_helpers
 from oslo_log import log as logging
 import oslo_messaging
 
-from neutron.api.rpc.callbacks.producer import registry
+from neutron.api.rpc.callbacks.consumer import registry as cons_registry
+from neutron.api.rpc.callbacks.producer import registry as prod_registry
 from neutron.api.rpc.callbacks import resources
 from neutron.common import constants
 from neutron.common import exceptions
 from neutron.common import rpc as n_rpc
 from neutron.common import topics
+from neutron.objects import base as obj_base
 
 
 LOG = logging.getLogger(__name__)
@@ -83,9 +85,7 @@ class ResourcesPullRpcApi(object):
             raise ResourceNotFound(resource_type=resource_type,
                                    resource_id=resource_id)
 
-        obj = resource_type_cls.obj_from_primitive(primitive)
-        obj.obj_reset_changes()
-        return obj
+        return resource_type_cls.clean_obj_from_primitive(primitive)
 
 
 class ResourcesPullRpcCallback(object):
@@ -103,11 +103,73 @@ class ResourcesPullRpcCallback(object):
         version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
 
     def pull(self, context, resource_type, version, resource_id):
-        _validate_resource_type(resource_type)
-
-        obj = registry.pull(resource_type, resource_id, context=context)
+        obj = prod_registry.pull(resource_type, resource_id, context=context)
         if obj:
-            # don't request a backport for the latest known version
+            #TODO(QoS): Remove in the future with new version of
+            #           versionedobjects containing
+            #           https://review.openstack.org/#/c/207998/
             if version == obj.VERSION:
                 version = None
             return obj.obj_to_primitive(target_version=version)
+
+
+def _object_topic(obj):
+    resource_type = resources.get_resource_type(obj)
+    return topics.RESOURCE_TOPIC_PATTERN % {
+        'resource_type': resource_type, 'version': obj.VERSION}
+
+
+class ResourcesPushRpcApi(object):
+    """Plugin-side RPC for plugin-to-agents interaction.
+
+    This interface is designed to push versioned object updates to interested
+    agents using fanout topics.
+
+    This class implements the caller side of an rpc interface.  The receiver
+    side can be found below: ResourcesPushRpcCallback.
+    """
+
+    def __init__(self):
+        target = oslo_messaging.Target(
+            version='1.0',
+            namespace=constants.RPC_NAMESPACE_RESOURCES)
+        self.client = n_rpc.get_client(target)
+
+    def _prepare_object_fanout_context(self, obj):
+        """Prepare fanout context, one topic per object type."""
+        obj_topic = _object_topic(obj)
+        return self.client.prepare(fanout=True, topic=obj_topic)
+
+    @log_helpers.log_method_call
+    def push(self, context, resource, event_type):
+        resource_type = resources.get_resource_type(resource)
+        _validate_resource_type(resource_type)
+        cctxt = self._prepare_object_fanout_context(resource)
+        #TODO(QoS): Push notifications for every known version once we have
+        #           multiple of those
+        dehydrated_resource = resource.obj_to_primitive()
+        cctxt.cast(context, 'push',
+                   resource=dehydrated_resource,
+                   event_type=event_type)
+
+
+class ResourcesPushRpcCallback(object):
+    """Agent-side RPC for plugin-to-agents interaction.
+
+    This class implements the receiver for notification about versioned objects
+    resource updates used by neutron.api.rpc.callbacks. You can find the
+    caller side in ResourcesPushRpcApi.
+    """
+    # History
+    #   1.0 Initial version
+
+    target = oslo_messaging.Target(version='1.0',
+                                   namespace=constants.RPC_NAMESPACE_RESOURCES)
+
+    def push(self, context, resource, event_type):
+        resource_obj = obj_base.NeutronObject.clean_obj_from_primitive(
+            resource)
+        LOG.debug("Resources notification (%(event_type)s): %(resource)s",
+                  {'event_type': event_type, 'resource': repr(resource_obj)})
+        resource_type = resources.get_resource_type(resource_obj)
+        cons_registry.push(resource_type, resource_obj, event_type)
index 18acbcb7bacc1a3101eafec94b57c9ad6333ff56..d0cc55a57e3b3d5c24e046617a9716a0a7c01bac 100644 (file)
@@ -38,6 +38,8 @@ DHCP_AGENT = 'dhcp_agent'
 METERING_AGENT = 'metering_agent'
 LOADBALANCER_AGENT = 'n-lbaas_agent'
 
+RESOURCE_TOPIC_PATTERN = "neutron-vo-%(resource_type)s-%(version)s"
+
 
 def get_topic_name(prefix, table, operation, host=None):
     """Create a topic name.
index f10966106baa61e5dd5b371e357659ea1494cd52..230f53dcdeeab76cd05c8609663bb11d74f1c5a1 100644 (file)
@@ -48,6 +48,12 @@ class NeutronObject(obj_base.VersionedObject,
     def to_dict(self):
         return dict(self.items())
 
+    @classmethod
+    def clean_obj_from_primitive(cls, primitive, context=None):
+        obj = cls.obj_from_primitive(primitive, context)
+        obj.obj_reset_changes()
+        return obj
+
     @classmethod
     def get_by_id(cls, context, id):
         raise NotImplementedError()
index f7b52201f6f509e590a75b1bebfda7b0b71802e2..9a6ccd4a6f077968ab09a217ff3516eaa578592e 100755 (executable)
 
 import mock
 from oslo_utils import uuidutils
+from oslo_versionedobjects import base as obj_base
+from oslo_versionedobjects import fields as obj_fields
+import testtools
 
 from neutron.api.rpc.callbacks import resources
 from neutron.api.rpc.handlers import resources_rpc
+from neutron.common import topics
 from neutron import context
-from neutron.objects.qos import policy
+from neutron.objects import base as objects_base
 from neutron.tests import base
 
 
+@obj_base.VersionedObjectRegistry.register
+class FakeResource(objects_base.NeutronObject):
+
+    fields = {
+        'id': obj_fields.UUIDField(),
+        'field': obj_fields.StringField()
+    }
+
+    @classmethod
+    def get_objects(cls, context, **kwargs):
+        return list()
+
+
 class ResourcesRpcBaseTestCase(base.BaseTestCase):
 
     def setUp(self):
         super(ResourcesRpcBaseTestCase, self).setUp()
         self.context = context.get_admin_context()
 
-    def _create_test_policy_dict(self):
+    def _create_test_dict(self):
         return {'id': uuidutils.generate_uuid(),
-                'tenant_id': uuidutils.generate_uuid(),
-                'name': 'test',
-                'description': 'test',
-                'shared': False}
+                'field': 'foo'}
+
+    def _create_test_resource(self, **kwargs):
+        resource = FakeResource(self.context, **kwargs)
+        resource.obj_reset_changes()
+        return resource
+
 
-    def _create_test_policy(self, policy_dict):
-        policy_obj = policy.QosPolicy(self.context, **policy_dict)
-        policy_obj.obj_reset_changes()
-        return policy_obj
+class _ValidateResourceTypeTestCase(base.BaseTestCase):
+    def setUp(self):
+        super(_ValidateResourceTypeTestCase, self).setUp()
+        self.is_valid_mock = mock.patch.object(
+            resources_rpc.resources, 'is_valid_resource_type').start()
+
+    def test_valid_type(self):
+        self.is_valid_mock.return_value = True
+        resources_rpc._validate_resource_type('foo')
+
+    def test_invalid_type(self):
+        self.is_valid_mock.return_value = False
+        with testtools.ExpectedException(
+                resources_rpc.InvalidResourceTypeClass):
+            resources_rpc._validate_resource_type('foo')
 
 
 class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
 
     def setUp(self):
         super(ResourcesPullRpcApiTestCase, self).setUp()
-        self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client')
-        self.client = self.client_p.start()
+        mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
+        mock.patch.object(resources_rpc, '_validate_resource_type').start()
+        mock.patch('neutron.api.rpc.callbacks.resources.get_resource_cls',
+                   return_value=FakeResource).start()
         self.rpc = resources_rpc.ResourcesPullRpcApi()
-        self.mock_cctxt = self.rpc.client.prepare.return_value
+        self.cctxt_mock = self.rpc.client.prepare.return_value
 
     def test_is_singleton(self):
         self.assertEqual(id(self.rpc),
                          id(resources_rpc.ResourcesPullRpcApi()))
 
     def test_pull(self):
-        policy_dict = self._create_test_policy_dict()
-        expected_policy_obj = self._create_test_policy(policy_dict)
-        qos_policy_id = policy_dict['id']
-        self.mock_cctxt.call.return_value = (
-            expected_policy_obj.obj_to_primitive())
-        pull_result = self.rpc.pull(
-            self.context, resources.QOS_POLICY, qos_policy_id)
-        self.mock_cctxt.call.assert_called_once_with(
-            self.context, 'pull', resource_type=resources.QOS_POLICY,
-            version=policy.QosPolicy.VERSION, resource_id=qos_policy_id)
-        self.assertEqual(expected_policy_obj, pull_result)
-
-    def test_pull_invalid_resource_type_cls(self):
-        self.assertRaises(
-            resources_rpc.InvalidResourceTypeClass, self.rpc.pull,
-            self.context, 'foo_type', 'foo_id')
+        resource_dict = self._create_test_dict()
+        expected_obj = self._create_test_resource(**resource_dict)
+        resource_id = resource_dict['id']
+        self.cctxt_mock.call.return_value = expected_obj.obj_to_primitive()
+
+        result = self.rpc.pull(
+            self.context, FakeResource.obj_name(), resource_id)
+
+        self.cctxt_mock.call.assert_called_once_with(
+            self.context, 'pull', resource_type='FakeResource',
+            version=FakeResource.VERSION, resource_id=resource_id)
+        self.assertEqual(expected_obj, result)
 
     def test_pull_resource_not_found(self):
-        policy_dict = self._create_test_policy_dict()
-        qos_policy_id = policy_dict['id']
-        self.mock_cctxt.call.return_value = None
-        self.assertRaises(
-            resources_rpc.ResourceNotFound, self.rpc.pull,
-            self.context, resources.QOS_POLICY, qos_policy_id)
+        resource_dict = self._create_test_dict()
+        resource_id = resource_dict['id']
+        self.cctxt_mock.call.return_value = None
+        with testtools.ExpectedException(resources_rpc.ResourceNotFound):
+            self.rpc.pull(self.context, FakeResource.obj_name(),
+                          resource_id)
 
 
 class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
@@ -87,45 +116,91 @@ class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
     def setUp(self):
         super(ResourcesPullRpcCallbackTestCase, self).setUp()
         self.callbacks = resources_rpc.ResourcesPullRpcCallback()
+        self.resource_dict = self._create_test_dict()
+        self.resource_obj = self._create_test_resource(**self.resource_dict)
 
     def test_pull(self):
-        policy_dict = self._create_test_policy_dict()
-        policy_obj = self._create_test_policy(policy_dict)
-        qos_policy_id = policy_dict['id']
-        with mock.patch.object(resources_rpc.registry, 'pull',
-                               return_value=policy_obj) as registry_mock:
+        with mock.patch.object(
+                resources_rpc.prod_registry, 'pull',
+                return_value=self.resource_obj) as registry_mock:
             primitive = self.callbacks.pull(
-                self.context, resource_type=resources.QOS_POLICY,
-                version=policy.QosPolicy.VERSION,
-                resource_id=qos_policy_id)
-            registry_mock.assert_called_once_with(
-                resources.QOS_POLICY,
-                qos_policy_id, context=self.context)
-        self.assertEqual(policy_dict, primitive['versioned_object.data'])
-        self.assertEqual(policy_obj.obj_to_primitive(), primitive)
-
-    @mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
+                self.context, resource_type=FakeResource.obj_name(),
+                version=FakeResource.VERSION,
+                resource_id=self.resource_dict['id'])
+        registry_mock.assert_called_once_with(
+            'FakeResource', self.resource_dict['id'], context=self.context)
+        self.assertEqual(self.resource_dict,
+                         primitive['versioned_object.data'])
+        self.assertEqual(self.resource_obj.obj_to_primitive(), primitive)
+
+    @mock.patch.object(FakeResource, 'obj_to_primitive')
     def test_pull_no_backport_for_latest_version(self, to_prim_mock):
-        policy_dict = self._create_test_policy_dict()
-        policy_obj = self._create_test_policy(policy_dict)
-        qos_policy_id = policy_dict['id']
-        with mock.patch.object(resources_rpc.registry, 'pull',
-                               return_value=policy_obj):
+        with mock.patch.object(resources_rpc.prod_registry, 'pull',
+                               return_value=self.resource_obj):
             self.callbacks.pull(
-                self.context, resource_type=resources.QOS_POLICY,
-                version=policy.QosPolicy.VERSION,
-                resource_id=qos_policy_id)
-            to_prim_mock.assert_called_with(target_version=None)
+                self.context, resource_type=FakeResource.obj_name(),
+                version=FakeResource.VERSION,
+                resource_id=self.resource_obj.id)
+        to_prim_mock.assert_called_with(target_version=None)
 
-    @mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
+    @mock.patch.object(FakeResource, 'obj_to_primitive')
     def test_pull_backports_to_older_version(self, to_prim_mock):
-        policy_dict = self._create_test_policy_dict()
-        policy_obj = self._create_test_policy(policy_dict)
-        qos_policy_id = policy_dict['id']
-        with mock.patch.object(resources_rpc.registry, 'pull',
-                               return_value=policy_obj):
+        with mock.patch.object(resources_rpc.prod_registry, 'pull',
+                               return_value=self.resource_obj):
             self.callbacks.pull(
-                self.context, resource_type=resources.QOS_POLICY,
+                self.context, resource_type=FakeResource.obj_name(),
                 version='0.9',  # less than initial version 1.0
-                resource_id=qos_policy_id)
+                resource_id=self.resource_dict['id'])
             to_prim_mock.assert_called_with(target_version='0.9')
+
+
+class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
+
+    def setUp(self):
+        super(ResourcesPushRpcApiTestCase, self).setUp()
+        mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
+        mock.patch.object(resources_rpc, '_validate_resource_type').start()
+        self.rpc = resources_rpc.ResourcesPushRpcApi()
+        self.cctxt_mock = self.rpc.client.prepare.return_value
+        resource_dict = self._create_test_dict()
+        self.resource_obj = self._create_test_resource(**resource_dict)
+
+    def test__prepare_object_fanout_context(self):
+        expected_topic = topics.RESOURCE_TOPIC_PATTERN % {
+            'resource_type': resources.get_resource_type(self.resource_obj),
+            'version': self.resource_obj.VERSION}
+
+        observed = self.rpc._prepare_object_fanout_context(self.resource_obj)
+
+        self.rpc.client.prepare.assert_called_once_with(
+            fanout=True, topic=expected_topic)
+        self.assertEqual(self.cctxt_mock, observed)
+
+    def test_push(self):
+        self.rpc.push(
+            self.context, self.resource_obj, 'TYPE')
+
+        self.cctxt_mock.cast.assert_called_once_with(
+            self.context, 'push',
+            resource=self.resource_obj.obj_to_primitive(),
+            event_type='TYPE')
+
+
+class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase):
+
+    def setUp(self):
+        super(ResourcesPushRpcCallbackTestCase, self).setUp()
+        mock.patch.object(resources_rpc, '_validate_resource_type').start()
+        mock.patch.object(
+            resources_rpc.resources,
+            'get_resource_cls', return_value=FakeResource).start()
+        resource_dict = self._create_test_dict()
+        self.resource_obj = self._create_test_resource(**resource_dict)
+        self.resource_prim = self.resource_obj.obj_to_primitive()
+        self.callbacks = resources_rpc.ResourcesPushRpcCallback()
+
+    @mock.patch.object(resources_rpc.cons_registry, 'push')
+    def test_push(self, reg_push_mock):
+        self.callbacks.push(self.context, self.resource_prim, 'TYPE')
+        reg_push_mock.assert_called_once_with(self.resource_obj.obj_name(),
+                                              self.resource_obj, 'TYPE')
index 84bdb13be233ed50b9cc3db16e91e46409b6f74e..14e8b1d1733821aeaeb3d718d383412d3ccf49d8 100644 (file)
@@ -26,6 +26,8 @@ from neutron.tests import base as test_base
 
 
 SQLALCHEMY_COMMIT = 'sqlalchemy.engine.Connection._commit_impl'
+OBJECTS_BASE_OBJ_FROM_PRIMITIVE = ('oslo_versionedobjects.base.'
+                                   'VersionedObject.obj_from_primitive')
 
 
 class FakeModel(object):
@@ -214,6 +216,13 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
         delete_mock.assert_called_once_with(
             self.context, self._test_class.db_model, self.db_obj['id'])
 
+    @mock.patch(OBJECTS_BASE_OBJ_FROM_PRIMITIVE)
+    def test_clean_obj_from_primitive(self, get_prim_m):
+        expected_obj = get_prim_m.return_value
+        observed_obj = self._test_class.clean_obj_from_primitive('foo', 'bar')
+        self.assertIs(expected_obj, observed_obj)
+        self.assertTrue(observed_obj.obj_reset_changes.called)
+
 
 class BaseDbObjectTestCase(_BaseObjectTestCase):