plugin-api
db_layer
rpc_api
+ rpc_callbacks
layer3
l2_agents
quality_of_service
--- /dev/null
+=================================
+Neutron Messaging Callback System
+=================================
+
+Neutron already has a callback system [link-to: callbacks.rst] for
+in-process resource callbacks where publishers and subscribers are able
+to publish, subscribe and extend resources.
+
+This system is different, and is intended to be used for inter-process
+callbacks, via the messaging fanout mechanisms.
+
+In Neutron, agents may need to subscribe to specific resource details which
+may change over time. And the purpose of this messaging callback system
+is to allow agent subscription to those resources without the need to extend
+modify existing RPC calls, or creating new RPC messages.
+
+A few resource which can benefit of this system:
+
+* security groups members
+* security group rules,
+* QoS policies.
+
+Using a remote publisher/subscriber pattern, the information about such
+resources could be published using fanout queues to all interested nodes,
+minimizing messaging requests from agents to server since the agents
+get subscribed for their whole lifecycle (unless they unsubscribe).
+
+Within an agent, there could be multiple subscriber callbacks to the same
+resource events, the resources updates would be dispatched to the subscriber
+callbacks from a single message. Any update would come in a single message,
+doing only a single oslo versioned objects deserialization on each receiving
+agent.
+
+This publishing/subscription mechanism is highly dependent on the format
+of the resources passed around. This is why the library only allows
+versioned objects to be published and subscribed. Oslo versioned objects
+allow object version down/up conversion. #[vo_mkcompat]_ #[vo_mkcptests]_
+
+For the VO's versioning schema look here: #[vo_versioning]_
+
+
+
+versioned_objects serialization/deserialization with the
+obj_to_primitive(target_version=..) and primitive_to_obj() #[ov_serdes]_
+methods is used internally to convert/retrieve objects before/after messaging.
+
+Considering rolling upgrades, there are several scenarios to look at:
+
+* publisher (generally neutron-server or a service) and subscriber (agent)
+ know the same version of the objects, so they serialize, and deserialize
+ without issues.
+
+* publisher knows (and sends) an older version of the object, subscriber
+ will get the object updated to latest version on arrival before any
+ callback is called.
+
+* publisher sends a newer version of the object, subscriber won't be able
+ to deserialize the object, in this case (PLEASE DISCUSS), we can think of two
+ strategies:
+
+a) During upgrades, we pin neutron-server to a compatible version for resource
+ fanout updates, and server sends both the old, and the newer version to
+ different topic, queues. Old agents receive the updates on the old version
+ topic, new agents receive updates on the new version topic.
+ When the whole system upgraded, we un-pin the compatible version fanout.
+
+ A variant of this could be using a single fanout queue, and sending the
+ pinned version of the object to all. Newer agents can deserialize to the
+ latest version and upgrade any fields internally. Again at the end, we
+ unpin the version and restart the service.
+
+b) The subscriber will rpc call the publisher to start publishing also a downgraded
+ version of the object on every update on a separate queue. The complication
+ of this version, is the need to ignore new version objects as long as we keep
+ receiving the downgraded ones, and otherwise resend the request to send the
+ downgraded objects after a certain timeout (thinking of the case where the
+ request for downgraded queue is done, but the publisher restarted).
+ This approach is more complicated to implement, but more automated from the
+ administrator point of view. We may want to look into it as a second step
+ from a
+
+c) The subscriber will send a registry.get_info for the latest specific version
+ he knows off. This can have scalability issues during upgrade as any outdated
+ agent will require a flow of two messages (request, and response). This is
+ indeed very bad at scale if you have hundreds or thousands of agents.
+
+Option a seems like a reasonable strategy, similar to what nova does now with
+versioned objects.
+
+Serialized versioned objects look like::
+
+ {'versioned_object.version': '1.0',
+ 'versioned_object.name': 'QoSProfile',
+ 'versioned_object.data': {'rules': [
+ {'versioned_object.version': '1.0',
+ 'versioned_object.name': 'QoSRule',
+ 'versioned_object.data': {'name': u'a'},
+ 'versioned_object.namespace': 'versionedobjects'}
+ ],
+ 'uuid': u'abcde',
+ 'name': u'aaa'},
+ 'versioned_object.namespace': 'versionedobjects'}
+
+Topic names for the fanout queues
+=================================
+
+if we adopted option a:
+neutron-<resouce_type>_<resource_id>-<vo_version>
+[neutron-<resouce_type>_<resource_id>-<vo_version_compat>]
+
+if we adopted option b for rolling upgrades:
+neutron-<resource_type>-<resource_id>
+neutron-<resource_type>-<resource_id>-<vo_version>
+
+for option c, just:
+neutron-<resource_type>-<resource_id>
+
+Subscribing to resources
+========================
+
+Imagine that you have agent A, which just got to handle a new port, which
+has an associated security group, and QoS policy.
+
+The agent code processing port updates may look like::
+
+ from neutron.rpc_resources import events
+ from neutron.rpc_resources import resources
+ from neutron.rpc_resources import registry
+
+
+ def process_resource_updates(resource_type, resource_id, resource_list, action_type):
+
+ # send to the right handler which will update any control plane
+ # details related to the updated resource...
+
+
+ def port_update(...):
+
+ # here we extract sg_id and qos_policy_id from port..
+
+ registry.subscribe(resources.SG_RULES, sg_id,
+ callback=process_resource_updates)
+ sg_rules = registry.get_info(resources.SG_RULES, sg_id)
+
+ registry.subscribe(resources.SG_MEMBERS, sg_id,
+ callback=process_resource_updates)
+ sg_members = registry.get_info(resources.SG_MEMBERS, sg_id)
+
+ registry.subscribe(resources.QOS_RULES, qos_policy_id,
+ callback=process_resource_updates)
+ qos_rules = registry.get_info(resources.QOS_RULES, qos_policy_id,
+ callback=process_resource_updates)
+
+ cleanup_subscriptions()
+
+
+ def cleanup_subscriptions()
+ sg_ids = determine_unreferenced_sg_ids()
+ qos_policy_id = determine_unreferenced_qos_policy_ids()
+ registry.unsubscribe_info(resource.SG_RULES, sg_ids)
+ registry.unsubscribe_info(resource.SG_MEMBERS, sg_ids)
+ registry.unsubscribe_info(resource.QOS_RULES, qos_policy_id)
+
+Another unsubscription strategy could be to lazily unsubscribe resources when
+we receive updates for them, and we discover that they are not needed anymore.
+
+Deleted resources are automatically unsubscribed as we receive the delete event.
+
+NOTE(irenab): this could be extended to core resources like ports, making use
+of the standard neutron in-process callbacks at server side and propagating
+AFTER_UPDATE events, for example, but we may need to wait until those callbacks
+are used with proper versioned objects.
+
+
+Unsubscribing to resources
+==========================
+
+There are a few options to unsubscribe registered callbacks:
+
+* unsubscribe_resource_id(): it selectively unsubscribes an specific
+ resource type + id.
+* unsubscribe_resource_type(): it unsubscribes from an specific resource type,
+ any ID.
+* unsubscribe_all(): it unsubscribes all subscribed resources and ids.
+
+
+Sending resource updates
+========================
+
+On the server side, resource updates could come from anywhere, a service plugin,
+an extension, anything that updates the resource and that it's of any interest
+to the agents.
+
+The server/publisher side may look like::
+
+ from neutron.rpc_resources import events
+ from neutron.rpc_resources import resources
+ from neutron.rpc_resources import registry as rpc_registry
+
+ def add_qos_x_rule(...):
+ update_the_db(...)
+ send_rpc_updates_on_qos_policy(qos_policy_id)
+
+ def del_qos_x_rule(...):
+ update_the_db(...)
+ send_rpc_deletion_of_qos_policy(qos_policy_id)
+
+ def send_rpc_updates_on_qos_policy(qos_policy_id):
+ rules = get_qos_policy_rules_versioned_object(qos_policy_id)
+ rpc_registry.notify(resources.QOS_RULES, qos_policy_id, rules, events.UPDATE)
+
+ def send_rpc_deletion_of_qos_policy(qos_policy_id):
+ rpc_registry.notify(resources.QOS_RULES, qos_policy_id, None, events.DELETE)
+
+ # This part is added for the registry mechanism, to be able to request
+ # older versions of the notified objects if any oudated agent requires
+ # them.
+ def retrieve_older_version_callback(qos_policy_id, version):
+ return get_qos_policy_rules_versioned_object(qos_policy_id, version)
+
+ rpc_registry.register_retrieve_callback(resource.QOS_RULES,
+ retrieve_older_version_callback)
+
+References
+==========
+.. [#ov_serdes] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L621
+.. [#vo_mkcompat] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L460
+.. [#vo_mkcptests] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L111
+.. [#vo_versioning] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L236
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+UPDATED = 'updated'
+DELETED = 'deleted'
+
+VALID = (
+ UPDATED,
+ DELETED
+)
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.api.rpc.callbacks import resource_manager
+
+# TODO(ajo): consider adding locking
+CALLBACK_MANAGER = None
+
+
+def _get_resources_callback_manager():
+ global CALLBACK_MANAGER
+ if CALLBACK_MANAGER is None:
+ CALLBACK_MANAGER = resource_manager.ResourcesCallbacksManager()
+ return CALLBACK_MANAGER
+
+
+#resource implementation callback registration functions
+def get_info(resource_type, resource_id, **kwargs):
+ """Get information about resource type with resource id.
+
+ The function will check the providers for an specific remotable
+ resource and get the resource.
+
+ :returns: an oslo versioned object.
+ """
+ callback = _get_resources_callback_manager().get_callback(resource_type)
+ if callback:
+ return callback(resource_type, resource_id, **kwargs)
+
+
+def register_provider(callback, resource_type):
+ _get_resources_callback_manager().register(callback, resource_type)
+
+
+# resource RPC callback for pub/sub
+#Agent side
+def subscribe(callback, resource_type, resource_id):
+ #TODO(QoS): we have to finish the real update notifications
+ raise NotImplementedError("we should finish update notifications")
+
+
+def unsubscribe(callback, resource_type, resource_id):
+ #TODO(QoS): we have to finish the real update notifications
+ raise NotImplementedError("we should finish update notifications")
+
+
+def unsubscribe_all():
+ #TODO(QoS): we have to finish the real update notifications
+ raise NotImplementedError("we should finish update notifications")
+
+
+#Server side
+def notify(resource_type, event, obj):
+ #TODO(QoS): we have to finish the real update notifications
+ raise NotImplementedError("we should finish update notifications")
+
+
+def clear():
+ _get_resources_callback_manager().clear()
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+
+from oslo_log import log as logging
+
+from neutron.api.rpc.callbacks import resources
+from neutron.callbacks import exceptions
+
+LOG = logging.getLogger(__name__)
+
+
+class ResourcesCallbacksManager(object):
+ """A callback system that allows information providers in a loose manner.
+ """
+
+ def __init__(self):
+ self.clear()
+
+ def register(self, callback, resource):
+ """register callback for a resource .
+
+ One callback can be register to a resource
+
+ :param callback: the callback. It must raise or return a dict.
+ :param resource: the resource. It must be a valid resource.
+ """
+ LOG.debug("register: %(callback)s %(resource)s",
+ {'callback': callback, 'resource': resource})
+ if resource not in resources.VALID:
+ raise exceptions.Invalid(element='resource', value=resource)
+
+ self._callbacks[resource] = callback
+
+ def unregister(self, resource):
+ """Unregister callback from the registry.
+
+ :param callback: the callback.
+ :param resource: the resource.
+ """
+ LOG.debug("Unregister: %(resource)s",
+ {'resource': resource})
+ if resource not in resources.VALID:
+ raise exceptions.Invalid(element='resource', value=resource)
+ self._callbacks[resource] = None
+
+ def clear(self):
+ """Brings the manager to a clean slate."""
+ self._callbacks = collections.defaultdict(dict)
+
+ def get_callback(self, resource):
+ """Return the callback if found, None otherwise.
+
+ :param resource: the resource. It must be a valid resource.
+ """
+ if resource not in resources.VALID:
+ raise exceptions.Invalid(element='resource', value=resource)
+
+ return self._callbacks[resource]
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+QOS_POLICY = 'qos-policy'
+QOS_RULE = 'qos-rule'
+
+VALID = (
+ QOS_POLICY,
+ QOS_RULE,
+)
# License for the specific language governing permissions and limitations
# under the License.
+from neutron import manager
+
+from neutron.api.rpc.callbacks import registry as rpc_registry
+from neutron.api.rpc.callbacks import resources
from neutron.extensions import qos
+from neutron.i18n import _LW
+from neutron.plugins.common import constants
+
+from oslo_log import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+#TODO(QoS): remove this stub when db is ready
+def _get_qos_policy_cb_stub(resource, policy_id, **kwargs):
+ """Hardcoded stub for testing until we get the db working."""
+ qos_policy = {
+ "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
+ "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+ "name": "10Mbit",
+ "description": "This policy limits the ports to 10Mbit max.",
+ "shared": False,
+ "rules": [{
+ "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+ "max_kbps": "10000",
+ "max_burst_kbps": "0",
+ "type": "bandwidth_limit"
+ }]
+ }
+ return qos_policy
+
+
+def _get_qos_policy_cb(resource, policy_id, **kwargs):
+ qos_plugin = manager.NeutronManager.get_service_plugins().get(
+ constants.QOS)
+ context = kwargs.get('context')
+ if context is None:
+ LOG.warning(_LW(
+ 'Received %(resource)s %(policy_id)s without context'),
+ {'resource': resource, 'policy_id': policy_id}
+ )
+ return
+
+ qos_policy = qos_plugin.get_qos_policy(context, policy_id)
+ return qos_policy
+
+
+#TODO(QoS): remove this stub when db is ready
+def _get_qos_bandwidth_limit_rule_cb_stub(resource, rule_id, **kwargs):
+ """Hardcoded for testing until we get the db working."""
+ bandwidth_limit = {
+ "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+ "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+ "max_kbps": "10000",
+ "max_burst_kbps": "0",
+ }
+ return bandwidth_limit
+
+
+def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
+ qos_plugin = manager.NeutronManager.get_service_plugins().get(
+ constants.QOS)
+ context = kwargs.get('context')
+ if context is None:
+ LOG.warning(_LW(
+ 'Received %(resource)s %(rule_id,)s without context '),
+ {'resource': resource, 'rule_id,': rule_id}
+ )
+ return
+
+ bandwidth_limit = qos_plugin.get_qos_bandwidth_limit_rule(
+ context,
+ rule_id)
+ return bandwidth_limit
class QoSPlugin(qos.QoSPluginBase):
def __init__(self):
super(QoSPlugin, self).__init__()
- #self.register_rpc()
+ self.register_resource_providers()
#self.register_port_callbacks()
#self.register_net_callbacks()
-
- def register_rpc(self):
- # RPC support
- # TODO(ajo): register ourselves to the generic RPC framework
- # so we will provide QoS information for ports and
- # networks.
- pass
+ self._inline_test()
+
+ def _inline_test(self):
+ #TODO(gampel) remove inline unitesting
+ self.ctx = None
+ kwargs = {'context': self.ctx}
+ qos_policy = rpc_registry.get_info(
+ resources.QOS_POLICY,
+ "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+ **kwargs)
+
+ LOG.debug("qos_policy test : %s)",
+ qos_policy)
+
+ def register_resource_providers(self):
+ rpc_registry.register_provider(
+ _get_qos_bandwidth_limit_rule_cb_stub,
+ resources.QOS_RULE)
+
+ rpc_registry.register_provider(
+ _get_qos_policy_cb_stub,
+ resources.QOS_POLICY)
def register_port_callbacks(self):
# TODO(qos): Register the callbacks to properly manage
--- /dev/null
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from neutron.api.rpc.callbacks import registry as rpc_registry
+from neutron.api.rpc.callbacks import resources
+
+
+from neutron.tests import base
+
+
+class ResourcesCallbackRequestTestCase(base.BaseTestCase):
+
+ def setUp(self):
+ super(ResourcesCallbackRequestTestCase, self).setUp()
+ self.resource_id = '46ebaec0-0570-43ac-82f6-60d2b03168c4'
+ self.qos_rule_id = '5f126d84-551a-4dcf-bb01-0e9c0df0c793'
+
+ def test_resource_callback_request(self):
+
+ #TODO(QoS) convert it to the version object format
+ def _get_qos_policy_cb(resource, policy_id, **kwargs):
+ qos_policy = {
+ "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
+ "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+ "name": "10Mbit",
+ "description": "This policy limits the ports to 10Mbit max.",
+ "shared": False,
+ "rules": [{
+ "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+ "max_kbps": "10000",
+ "max_burst_kbps": "0",
+ "type": "bnadwidth_limit"
+ }]
+ }
+ return qos_policy
+
+ #TODO(QoS) convert it to the version object format
+ def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
+ bandwidth_limit = {
+ "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
+ "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
+ "max_kbps": "10000",
+ "max_burst_kbps": "0",
+ }
+ return bandwidth_limit
+
+ rpc_registry.register_provider(
+ _get_qos_bandwidth_limit_rule_cb,
+ resources.QOS_RULE)
+
+ rpc_registry.register_provider(
+ _get_qos_policy_cb,
+ resources.QOS_POLICY)
+
+ self.ctx = None
+ kwargs = {'context': self.ctx}
+
+ qos_policy = rpc_registry.get_info(
+ resources.QOS_POLICY,
+ self.resource_id,
+ **kwargs)
+ self.assertEqual(self.resource_id, qos_policy['id'])
+
+ qos_rule = rpc_registry.get_info(
+ resources.QOS_RULE,
+ self.qos_rule_id,
+ **kwargs)
+ self.assertEqual(self.qos_rule_id, qos_rule['id'])