# Copyright (c) 2015 Mellanox Technologies, Ltd
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
from oslo_log import helpers as log_helpers
from oslo_log import log as logging
import oslo_messaging

from neutron._i18n import _
from neutron.api.rpc.callbacks.consumer import registry as cons_registry
from neutron.api.rpc.callbacks.producer import registry as prod_registry
from neutron.api.rpc.callbacks import resources
from neutron.common import constants
from neutron.common import exceptions
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.objects import base as obj_base
# Module-level logger, named after this module via __name__.
LOG = logging.getLogger(__name__)
class ResourcesRpcError(exceptions.NeutronException):
    """Base class for the errors raised by the resources RPC interface."""
class InvalidResourceTypeClass(ResourcesRpcError):
    """Raised for a type rejected by resources.is_valid_resource_type()."""

    # Formatted with the ``resource_type`` keyword given to the constructor.
    message = _("Invalid resource type %(resource_type)s")
class ResourceNotFound(ResourcesRpcError):
    """Raised by ResourcesPullRpcApi.pull when the reply carries no object."""

    # NOTE(review): the tail of this message was cut off in the reviewed
    # listing; "not found" is a reconstruction -- confirm against upstream.
    message = _("Resource %(resource_id)s of type %(resource_type)s "
                "not found")
def _validate_resource_type(resource_type):
    """Ensure ``resource_type`` is a valid resource type.

    :param resource_type: resource type name to check
    :raises InvalidResourceTypeClass: if
        resources.is_valid_resource_type() rejects the name
    """
    if resources.is_valid_resource_type(resource_type):
        return
    raise InvalidResourceTypeClass(resource_type=resource_type)
def resource_type_versioned_topic(resource_type):
    """Build the versioned topic name for ``resource_type``.

    The name is produced by filling topics.RESOURCE_TOPIC_PATTERN with the
    resource type and the VERSION attribute of its resource class.

    :param resource_type: resource type name
    :returns: the formatted topic string
    :raises InvalidResourceTypeClass: if ``resource_type`` is not valid
    """
    _validate_resource_type(resource_type)
    resource_cls = resources.get_resource_cls(resource_type)
    topic_fields = {
        'resource_type': resource_type,
        'version': resource_cls.VERSION,
    }
    return topics.RESOURCE_TOPIC_PATTERN % topic_fields
class ResourcesPullRpcApi(object):
    """Agent-side RPC (stub) for agent-to-plugin interaction.

    This class implements the client side of an rpc interface. The server side
    can be found below: ResourcesPullRpcCallback. For more information on
    this RPC interface, see doc/source/devref/rpc_callbacks.rst.

    Instantiating the class always yields the same shared object, which
    carries a single RPC client in its ``client`` attribute.
    """

    def __new__(cls):
        # Singleton: the instance and its RPC client are built on first use
        # only and cached on the class; later calls return the cached object.
        if not hasattr(cls, '_instance'):
            cls._instance = super(ResourcesPullRpcApi, cls).__new__(cls)
            target = oslo_messaging.Target(
                topic=topics.PLUGIN, version='1.0',
                namespace=constants.RPC_NAMESPACE_RESOURCES)
            cls._instance.client = n_rpc.get_client(target)
        return cls._instance

    @log_helpers.log_method_call
    def pull(self, context, resource_type, resource_id):
        """Fetch one resource over RPC and rebuild it as an object.

        :param context: request context forwarded with the RPC call
        :param resource_type: resource type name; must be a valid type
        :param resource_id: identifier of the resource to fetch
        :returns: the object produced by the resource class'
            clean_obj_from_primitive() from the reply
        :raises InvalidResourceTypeClass: if ``resource_type`` is not valid
        :raises ResourceNotFound: if the reply is empty
        """
        _validate_resource_type(resource_type)

        # we've already validated the resource type, so we are pretty sure the
        # class is there => no need to validate it specifically
        resource_type_cls = resources.get_resource_cls(resource_type)

        cctxt = self.client.prepare()
        primitive = cctxt.call(
            context, 'pull',
            resource_type=resource_type,
            version=resource_type_cls.VERSION,
            resource_id=resource_id)

        # NOTE(review): this guard was missing from the reviewed listing;
        # without it the return below would be unreachable.
        if not primitive:
            raise ResourceNotFound(resource_type=resource_type,
                                   resource_id=resource_id)

        return resource_type_cls.clean_obj_from_primitive(primitive)
class ResourcesPullRpcCallback(object):
    """Plugin-side RPC (implementation) for agent-to-plugin interaction.

    This class implements the server side of an rpc interface. The client side
    can be found above: ResourcesPullRpcApi. For more information on
    this RPC interface, see doc/source/devref/rpc_callbacks.rst.
    """

    # History
    #   1.0 Initial version

    target = oslo_messaging.Target(
        version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES)

    def pull(self, context, resource_type, version, resource_id):
        """Look up a resource and return it in primitive form.

        :param context: request context, passed on to the producer registry
        :param resource_type: resource type name to look up
        :param version: object version requested by the caller, used as the
            ``target_version`` of the primitive
        :param resource_id: identifier of the resource to look up
        :returns: the result of ``obj_to_primitive()`` on the object found,
            or None when the producer registry yields nothing
        """
        obj = prod_registry.pull(resource_type, resource_id, context=context)
        # NOTE(review): this guard was missing from the reviewed listing; it
        # is reconstructed so that a missing resource yields an empty reply,
        # which ResourcesPullRpcApi.pull turns into ResourceNotFound.
        if not obj:
            return None
        return obj.obj_to_primitive(target_version=version)
class ResourcesPushRpcApi(object):
    """Plugin-side RPC for plugin-to-agents interaction.

    This interface is designed to push versioned object updates to interested
    agents using fanout topics.

    This class implements the caller side of an rpc interface. The receiver
    side can be found below: ResourcesPushRpcCallback.
    """

    def __init__(self):
        # NOTE(review): one argument line was missing from the reviewed
        # listing; version='1.0' matches the other targets in this module --
        # confirm against upstream.
        target = oslo_messaging.Target(
            version='1.0',
            namespace=constants.RPC_NAMESPACE_RESOURCES)
        self.client = n_rpc.get_client(target)

    def _prepare_object_fanout_context(self, obj):
        """Prepare fanout context, one topic per object type."""
        obj_topic = resource_type_versioned_topic(obj.obj_name())
        return self.client.prepare(fanout=True, topic=obj_topic)

    @log_helpers.log_method_call
    def push(self, context, resource, event_type):
        """Cast a resource update to the fanout topic of its type.

        :param context: request context forwarded with the RPC cast
        :param resource: object to push; sent as ``obj_to_primitive()`` output
        :param event_type: event type forwarded unchanged to the receivers
        :raises InvalidResourceTypeClass: if the type of ``resource`` is not
            a valid resource type
        """
        resource_type = resources.get_resource_type(resource)
        _validate_resource_type(resource_type)
        cctxt = self._prepare_object_fanout_context(resource)
        # TODO(QoS): Push notifications for every known version once we have
        #            more than one.
        dehydrated_resource = resource.obj_to_primitive()
        cctxt.cast(context, 'push',
                   resource=dehydrated_resource,
                   event_type=event_type)
class ResourcesPushRpcCallback(object):
    """Agent-side RPC for plugin-to-agents interaction.

    This class implements the receiver for notification about versioned objects
    resource updates used by neutron.api.rpc.callbacks. You can find the
    caller side in ResourcesPushRpcApi.
    """

    # History
    #   1.0 Initial version

    target = oslo_messaging.Target(version='1.0',
                                   namespace=constants.RPC_NAMESPACE_RESOURCES)

    def push(self, context, resource, event_type):
        """Rebuild a pushed resource and hand it to the consumer registry.

        :param context: request context of the incoming message (not used
            by this method)
        :param resource: primitive form of the pushed object
        :param event_type: event type forwarded unchanged to the registry
        """
        resource_obj = obj_base.NeutronObject.clean_obj_from_primitive(
            resource)
        LOG.debug("Resources notification (%(event_type)s): %(resource)s",
                  {'event_type': event_type, 'resource': repr(resource_obj)})
        resource_type = resources.get_resource_type(resource_obj)
        cons_registry.push(resource_type, resource_obj, event_type)