from oslo_log import log as logging
import oslo_messaging
-from neutron.api.rpc.callbacks.producer import registry
+from neutron.api.rpc.callbacks.consumer import registry as cons_registry
+from neutron.api.rpc.callbacks.producer import registry as prod_registry
from neutron.api.rpc.callbacks import resources
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
+from neutron.objects import base as obj_base
LOG = logging.getLogger(__name__)
raise ResourceNotFound(resource_type=resource_type,
resource_id=resource_id)
- obj = resource_type_cls.obj_from_primitive(primitive)
- obj.obj_reset_changes()
- return obj
+ return resource_type_cls.clean_obj_from_primitive(primitive)
class ResourcesPullRpcCallback(object):
version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)
def pull(self, context, resource_type, version, resource_id):
- _validate_resource_type(resource_type)
-
- obj = registry.pull(resource_type, resource_id, context=context)
+ obj = prod_registry.pull(resource_type, resource_id, context=context)
if obj:
- # don't request a backport for the latest known version
+ #TODO(QoS): Remove in the future with new version of
+ # versionedobjects containing
+ # https://review.openstack.org/#/c/207998/
if version == obj.VERSION:
version = None
return obj.obj_to_primitive(target_version=version)
+
+
+def _object_topic(obj):
+ resource_type = resources.get_resource_type(obj)
+ return topics.RESOURCE_TOPIC_PATTERN % {
+ 'resource_type': resource_type, 'version': obj.VERSION}
+
+
+class ResourcesPushRpcApi(object):
+ """Plugin-side RPC for plugin-to-agents interaction.
+
+ This interface is designed to push versioned object updates to interested
+ agents using fanout topics.
+
+ This class implements the caller side of an rpc interface. The receiver
+ side can be found below: ResourcesPushRpcCallback.
+ """
+
+ def __init__(self):
+ target = oslo_messaging.Target(
+ version='1.0',
+ namespace=constants.RPC_NAMESPACE_RESOURCES)
+ self.client = n_rpc.get_client(target)
+
+ def _prepare_object_fanout_context(self, obj):
+ """Prepare fanout context, one topic per object type."""
+ obj_topic = _object_topic(obj)
+ return self.client.prepare(fanout=True, topic=obj_topic)
+
+ @log_helpers.log_method_call
+ def push(self, context, resource, event_type):
+ resource_type = resources.get_resource_type(resource)
+ _validate_resource_type(resource_type)
+ cctxt = self._prepare_object_fanout_context(resource)
+ #TODO(QoS): Push notifications for every known version once we have
+ # multiple of those
+ dehydrated_resource = resource.obj_to_primitive()
+ cctxt.cast(context, 'push',
+ resource=dehydrated_resource,
+ event_type=event_type)
+
+
+class ResourcesPushRpcCallback(object):
+ """Agent-side RPC for plugin-to-agents interaction.
+
+ This class implements the receiver for notification about versioned objects
+ resource updates used by neutron.api.rpc.callbacks. You can find the
+ caller side in ResourcesPushRpcApi.
+ """
+ # History
+ # 1.0 Initial version
+
+ target = oslo_messaging.Target(version='1.0',
+ namespace=constants.RPC_NAMESPACE_RESOURCES)
+
+ def push(self, context, resource, event_type):
+ resource_obj = obj_base.NeutronObject.clean_obj_from_primitive(
+ resource)
+ LOG.debug("Resources notification (%(event_type)s): %(resource)s",
+ {'event_type': event_type, 'resource': repr(resource_obj)})
+ resource_type = resources.get_resource_type(resource_obj)
+ cons_registry.push(resource_type, resource_obj, event_type)
import mock
from oslo_utils import uuidutils
+from oslo_versionedobjects import base as obj_base
+from oslo_versionedobjects import fields as obj_fields
+import testtools
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
+from neutron.common import topics
from neutron import context
-from neutron.objects.qos import policy
+from neutron.objects import base as objects_base
from neutron.tests import base
+@obj_base.VersionedObjectRegistry.register
+class FakeResource(objects_base.NeutronObject):
+
+ fields = {
+ 'id': obj_fields.UUIDField(),
+ 'field': obj_fields.StringField()
+ }
+
+ @classmethod
+ def get_objects(cls, context, **kwargs):
+ return list()
+
+
class ResourcesRpcBaseTestCase(base.BaseTestCase):
def setUp(self):
super(ResourcesRpcBaseTestCase, self).setUp()
self.context = context.get_admin_context()
- def _create_test_policy_dict(self):
+ def _create_test_dict(self):
return {'id': uuidutils.generate_uuid(),
- 'tenant_id': uuidutils.generate_uuid(),
- 'name': 'test',
- 'description': 'test',
- 'shared': False}
+ 'field': 'foo'}
+
+ def _create_test_resource(self, **kwargs):
+ resource = FakeResource(self.context, **kwargs)
+ resource.obj_reset_changes()
+ return resource
+
- def _create_test_policy(self, policy_dict):
- policy_obj = policy.QosPolicy(self.context, **policy_dict)
- policy_obj.obj_reset_changes()
- return policy_obj
+class _ValidateResourceTypeTestCase(base.BaseTestCase):
+ def setUp(self):
+ super(_ValidateResourceTypeTestCase, self).setUp()
+ self.is_valid_mock = mock.patch.object(
+ resources_rpc.resources, 'is_valid_resource_type').start()
+
+ def test_valid_type(self):
+ self.is_valid_mock.return_value = True
+ resources_rpc._validate_resource_type('foo')
+
+ def test_invalid_type(self):
+ self.is_valid_mock.return_value = False
+ with testtools.ExpectedException(
+ resources_rpc.InvalidResourceTypeClass):
+ resources_rpc._validate_resource_type('foo')
class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase):
def setUp(self):
super(ResourcesPullRpcApiTestCase, self).setUp()
- self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client')
- self.client = self.client_p.start()
+ mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
+ mock.patch.object(resources_rpc, '_validate_resource_type').start()
+ mock.patch('neutron.api.rpc.callbacks.resources.get_resource_cls',
+ return_value=FakeResource).start()
self.rpc = resources_rpc.ResourcesPullRpcApi()
- self.mock_cctxt = self.rpc.client.prepare.return_value
+ self.cctxt_mock = self.rpc.client.prepare.return_value
def test_is_singleton(self):
self.assertEqual(id(self.rpc),
id(resources_rpc.ResourcesPullRpcApi()))
def test_pull(self):
- policy_dict = self._create_test_policy_dict()
- expected_policy_obj = self._create_test_policy(policy_dict)
- qos_policy_id = policy_dict['id']
- self.mock_cctxt.call.return_value = (
- expected_policy_obj.obj_to_primitive())
- pull_result = self.rpc.pull(
- self.context, resources.QOS_POLICY, qos_policy_id)
- self.mock_cctxt.call.assert_called_once_with(
- self.context, 'pull', resource_type=resources.QOS_POLICY,
- version=policy.QosPolicy.VERSION, resource_id=qos_policy_id)
- self.assertEqual(expected_policy_obj, pull_result)
-
- def test_pull_invalid_resource_type_cls(self):
- self.assertRaises(
- resources_rpc.InvalidResourceTypeClass, self.rpc.pull,
- self.context, 'foo_type', 'foo_id')
+ resource_dict = self._create_test_dict()
+ expected_obj = self._create_test_resource(**resource_dict)
+ resource_id = resource_dict['id']
+ self.cctxt_mock.call.return_value = expected_obj.obj_to_primitive()
+
+ result = self.rpc.pull(
+ self.context, FakeResource.obj_name(), resource_id)
+
+ self.cctxt_mock.call.assert_called_once_with(
+ self.context, 'pull', resource_type='FakeResource',
+ version=FakeResource.VERSION, resource_id=resource_id)
+ self.assertEqual(expected_obj, result)
def test_pull_resource_not_found(self):
- policy_dict = self._create_test_policy_dict()
- qos_policy_id = policy_dict['id']
- self.mock_cctxt.call.return_value = None
- self.assertRaises(
- resources_rpc.ResourceNotFound, self.rpc.pull,
- self.context, resources.QOS_POLICY, qos_policy_id)
+ resource_dict = self._create_test_dict()
+ resource_id = resource_dict['id']
+ self.cctxt_mock.call.return_value = None
+ with testtools.ExpectedException(resources_rpc.ResourceNotFound):
+ self.rpc.pull(self.context, FakeResource.obj_name(),
+ resource_id)
class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase):
def setUp(self):
super(ResourcesPullRpcCallbackTestCase, self).setUp()
self.callbacks = resources_rpc.ResourcesPullRpcCallback()
+ self.resource_dict = self._create_test_dict()
+ self.resource_obj = self._create_test_resource(**self.resource_dict)
def test_pull(self):
- policy_dict = self._create_test_policy_dict()
- policy_obj = self._create_test_policy(policy_dict)
- qos_policy_id = policy_dict['id']
- with mock.patch.object(resources_rpc.registry, 'pull',
- return_value=policy_obj) as registry_mock:
+ with mock.patch.object(
+ resources_rpc.prod_registry, 'pull',
+ return_value=self.resource_obj) as registry_mock:
primitive = self.callbacks.pull(
- self.context, resource_type=resources.QOS_POLICY,
- version=policy.QosPolicy.VERSION,
- resource_id=qos_policy_id)
- registry_mock.assert_called_once_with(
- resources.QOS_POLICY,
- qos_policy_id, context=self.context)
- self.assertEqual(policy_dict, primitive['versioned_object.data'])
- self.assertEqual(policy_obj.obj_to_primitive(), primitive)
-
- @mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
+ self.context, resource_type=FakeResource.obj_name(),
+ version=FakeResource.VERSION,
+ resource_id=self.resource_dict['id'])
+ registry_mock.assert_called_once_with(
+ 'FakeResource', self.resource_dict['id'], context=self.context)
+ self.assertEqual(self.resource_dict,
+ primitive['versioned_object.data'])
+ self.assertEqual(self.resource_obj.obj_to_primitive(), primitive)
+
+ @mock.patch.object(FakeResource, 'obj_to_primitive')
def test_pull_no_backport_for_latest_version(self, to_prim_mock):
- policy_dict = self._create_test_policy_dict()
- policy_obj = self._create_test_policy(policy_dict)
- qos_policy_id = policy_dict['id']
- with mock.patch.object(resources_rpc.registry, 'pull',
- return_value=policy_obj):
+ with mock.patch.object(resources_rpc.prod_registry, 'pull',
+ return_value=self.resource_obj):
self.callbacks.pull(
- self.context, resource_type=resources.QOS_POLICY,
- version=policy.QosPolicy.VERSION,
- resource_id=qos_policy_id)
- to_prim_mock.assert_called_with(target_version=None)
+ self.context, resource_type=FakeResource.obj_name(),
+ version=FakeResource.VERSION,
+ resource_id=self.resource_obj.id)
+ to_prim_mock.assert_called_with(target_version=None)
- @mock.patch.object(policy.QosPolicy, 'obj_to_primitive')
+ @mock.patch.object(FakeResource, 'obj_to_primitive')
def test_pull_backports_to_older_version(self, to_prim_mock):
- policy_dict = self._create_test_policy_dict()
- policy_obj = self._create_test_policy(policy_dict)
- qos_policy_id = policy_dict['id']
- with mock.patch.object(resources_rpc.registry, 'pull',
- return_value=policy_obj):
+ with mock.patch.object(resources_rpc.prod_registry, 'pull',
+ return_value=self.resource_obj):
self.callbacks.pull(
- self.context, resource_type=resources.QOS_POLICY,
+ self.context, resource_type=FakeResource.obj_name(),
version='0.9', # less than initial version 1.0
- resource_id=qos_policy_id)
+ resource_id=self.resource_dict['id'])
to_prim_mock.assert_called_with(target_version='0.9')
+
+
+class ResourcesPushRpcApiTestCase(ResourcesRpcBaseTestCase):
+
+ def setUp(self):
+ super(ResourcesPushRpcApiTestCase, self).setUp()
+ mock.patch.object(resources_rpc.n_rpc, 'get_client').start()
+ mock.patch.object(resources_rpc, '_validate_resource_type').start()
+ self.rpc = resources_rpc.ResourcesPushRpcApi()
+ self.cctxt_mock = self.rpc.client.prepare.return_value
+ resource_dict = self._create_test_dict()
+ self.resource_obj = self._create_test_resource(**resource_dict)
+
+ def test__prepare_object_fanout_context(self):
+ expected_topic = topics.RESOURCE_TOPIC_PATTERN % {
+ 'resource_type': resources.get_resource_type(self.resource_obj),
+ 'version': self.resource_obj.VERSION}
+
+ observed = self.rpc._prepare_object_fanout_context(self.resource_obj)
+
+ self.rpc.client.prepare.assert_called_once_with(
+ fanout=True, topic=expected_topic)
+ self.assertEqual(self.cctxt_mock, observed)
+
+ def test_push(self):
+ self.rpc.push(
+ self.context, self.resource_obj, 'TYPE')
+
+ self.cctxt_mock.cast.assert_called_once_with(
+ self.context, 'push',
+ resource=self.resource_obj.obj_to_primitive(),
+ event_type='TYPE')
+
+
+class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase):
+
+ def setUp(self):
+ super(ResourcesPushRpcCallbackTestCase, self).setUp()
+ mock.patch.object(resources_rpc, '_validate_resource_type').start()
+ mock.patch.object(
+ resources_rpc.resources,
+ 'get_resource_cls', return_value=FakeResource).start()
+ resource_dict = self._create_test_dict()
+ self.resource_obj = self._create_test_resource(**resource_dict)
+ self.resource_prim = self.resource_obj.obj_to_primitive()
+ self.callbacks = resources_rpc.ResourcesPushRpcCallback()
+
+ @mock.patch.object(resources_rpc.cons_registry, 'push')
+ def test_push(self, reg_push_mock):
+ self.callbacks.push(self.context, self.resource_prim, 'TYPE')
+ reg_push_mock.assert_called_once_with(self.resource_obj.obj_name(),
+ self.resource_obj, 'TYPE')