From f8e2c112829157a0ec12cfd1afa1d1a7ab9becb4 Mon Sep 17 00:00:00 2001 From: Yong Sheng Gong Date: Wed, 25 Jul 2012 22:55:33 +0800 Subject: [PATCH] Notification for network/subnet/port create/delete/update. blueprint quantum-notifications Add host configuration into conf, which is determined by socket.gethostname() function by default. Host name is part of publiser id, which is in the format of 'network'.$host. We add create/update/delete start and end notification for each kind of resource. By default, the notification do nothing since the notifier driver is no_op_notifier. We can define it in quantum.conf. Change-Id: Ibc5eacac7a324584e6ccff120f573444932a88ef --- etc/quantum.conf | 109 ++++++++++++++++++++++++++++++ quantum/api/v2/base.py | 51 +++++++++++++- quantum/tests/unit/test_api_v2.py | 46 +++++++++++++ 3 files changed, 204 insertions(+), 2 deletions(-) diff --git a/etc/quantum.conf b/etc/quantum.conf index 7a563e38f..247bdf96e 100644 --- a/etc/quantum.conf +++ b/etc/quantum.conf @@ -47,3 +47,112 @@ api_paste_config = api-paste.ini # default driver to use for quota checks # quota_driver = quantum.quota.ConfDriver + +# ============ Notification System Options ===================== + +# Notifications can be sent when network/subnet/port are create, updated or deleted. +# There are four methods of sending notifications, logging (via the +# log_file directive), rpc (via a message queue), +# noop (no notifications sent, the default) or list of them + +# Defined in notifier api +# notification_driver = quantum.openstack.common.notifier.no_op_notifier +# default_notification_level = INFO +# myhost = myhost.com +# default_publisher_id = $myhost + +# Defined in rabbit_notifier for rpc way +# notification_topics = notifications + +# Defined in list_notifier +# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier + +# Defined in rpc __init__ +# The messaging module to use, defaults to kombu. +# rpc_backend =quantum.openstack.common.notifier.rpc.impl_kombu +# Size of RPC thread pool +# rpc_thread_pool_size = 64, +# Size of RPC connection pool +# rpc_conn_pool_size = 30 +# Seconds to wait for a response from call or multicall +# rpc_response_timeout = 60 +# Seconds to wait before a cast expires (TTL). Only supported by impl_zmq. +# rpc_cast_timeout = 30 +# Modules of exceptions that are permitted to be recreated +# upon receiving exception data from an rpc call. +# allowed_rpc_exception_modules = quantum.openstack.common.exception, nova.exception +# AMQP exchange to connect to if using RabbitMQ or Qpid +# control_exchange = nova +# If passed, use a fake RabbitMQ provider +# fake_rabbit = False + +# Configuration options if sending notifications via kombu rpc (these are +# the defaults) +# SSL version to use (valid only if SSL enabled) +# kombu_ssl_version = +# SSL key file (valid only if SSL enabled) +# kombu_ssl_keyfile = +# SSL cert file (valid only if SSL enabled) +# kombu_ssl_certfile = +# SSL certification authority file (valid only if SSL enabled)' +# kombu_ssl_ca_certs = +# IP address of the RabbitMQ installation +# rabbit_host = localhost +# Password of the RabbitMQ server +# rabbit_password = guest +# Port where RabbitMQ server is running/listening +# rabbit_port = 5672 +# User ID used for RabbitMQ connections +# rabbit_userid = guest +# Location of a virtual RabbitMQ installation. +# rabbit_virtual_host = / +# Maximum retries with trying to connect to RabbitMQ +# (the default of 0 implies an infinite retry count) +# rabbit_max_retries = 0 +# RabbitMQ connection retry interval +# rabbit_retry_interval = 1 + +# QPID +# rpc_backend=quantum.openstack.common.rpc.impl_qpid +# Qpid broker hostname +# qpid_hostname = localhost +# Qpid broker port +# qpid_port = 5672 +# Username for qpid connection +# qpid_username = '' +# Password for qpid connection +# qpid_password = '' +# Space separated list of SASL mechanisms to use for auth +# qpid_sasl_mechanisms = '' +# Automatically reconnect +# qpid_reconnect = True +# Reconnection timeout in seconds +# qpid_reconnect_timeout = 0 +# Max reconnections before giving up +# qpid_reconnect_limit = 0 +# Minimum seconds between reconnection attempts +# qpid_reconnect_interval_min = 0 +# Maximum seconds between reconnection attempts +# qpid_reconnect_interval_max = 0 +# Equivalent to setting max and min to the same value +# qpid_reconnect_interval = 0 +# Seconds between connection keepalive heartbeats +# qpid_heartbeat = 5 +# Transport to use, either 'tcp' or 'ssl' +# qpid_protocol = tcp +# Disable Nagle algorithm +# qpid_tcp_nodelay = True + +# ZMQ +# rpc_backend=quantum.openstack.common.rpc.impl_zmq +# ZeroMQ bind address. Should be a wildcard (*), an ethernet interface, or IP. +# The "host" option should point or resolve to this address. +# rpc_zmq_bind_address = * +# MatchMaker driver +# rpc_zmq_matchmaker = openstack.common.rpc.matchmaker.MatchMakerLocalhost +# ZeroMQ receiver listening port +# rpc_zmq_port = 9501 +# Number of ZeroMQ contexts, defaults to 1 +# rpc_zmq_contexts = 1 +# Directory for holding IPC sockets +# rpc_zmq_ipc_dir = /var/run/openstack diff --git a/quantum/api/v2/base.py b/quantum/api/v2/base.py index c093a8e9e..f43bb470a 100644 --- a/quantum/api/v2/base.py +++ b/quantum/api/v2/base.py @@ -14,12 +14,16 @@ # limitations under the License. import logging +import socket + import webob.exc from quantum.api.v2 import attributes from quantum.api.v2 import resource as wsgi_resource from quantum.common import exceptions from quantum.common import utils +from quantum.openstack.common import cfg +from quantum.openstack.common.notifier import api as notifier_api from quantum import policy from quantum import quota @@ -40,6 +44,14 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound, QUOTAS = quota.QUOTAS +def _get_hostname(): + return socket.gethostname() + + +# Register the configuration options +cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname())) + + def fields(request): """ Extracts the list of fields to return @@ -111,6 +123,7 @@ class Controller(object): self._policy_attrs = [name for (name, info) in self._attr_info.items() if 'required_by_policy' in info and info['required_by_policy']] + self._publisher_id = notifier_api.publisher_id('network') def _is_visible(self, attr): attr_val = self._attr_info.get(attr) @@ -189,6 +202,11 @@ class Controller(object): def create(self, request, body=None): """Creates a new instance of the requested entity""" + notifier_api.notify(request.context, + self._publisher_id, + self._resource + '.create.start', + notifier_api.INFO, + body) body = self._prepare_request_body(request.context, body, True, allow_bulk=True) action = "create_%s" % self._resource @@ -229,10 +247,21 @@ class Controller(object): obj_creator = getattr(self._plugin, action) kwargs = {self._resource: body} obj = obj_creator(request.context, **kwargs) - return {self._resource: self._view(obj)} + result = {self._resource: self._view(obj)} + notifier_api.notify(request.context, + self._publisher_id, + self._resource + '.create.end', + notifier_api.INFO, + result) + return result def delete(self, request, id): """Deletes the specified entity""" + notifier_api.notify(request.context, + self._publisher_id, + self._resource + '.delete.start', + notifier_api.INFO, + {self._resource + '_id': id}) action = "delete_%s" % self._resource # Check authz @@ -246,9 +275,21 @@ class Controller(object): obj_deleter = getattr(self._plugin, action) obj_deleter(request.context, id) + notifier_api.notify(request.context, + self._publisher_id, + self._resource + '.delete.end', + notifier_api.INFO, + {self._resource + '_id': id}) def update(self, request, id, body=None): """Updates the specified entity's attributes""" + payload = body.copy() + payload['id'] = id + notifier_api.notify(request.context, + self._publisher_id, + self._resource + '.update.start', + notifier_api.INFO, + payload) body = self._prepare_request_body(request.context, body, False) action = "update_%s" % self._resource @@ -264,7 +305,13 @@ class Controller(object): obj_updater = getattr(self._plugin, action) kwargs = {self._resource: body} obj = obj_updater(request.context, id, **kwargs) - return {self._resource: self._view(obj)} + result = {self._resource: self._view(obj)} + notifier_api.notify(request.context, + self._publisher_id, + self._resource + '.update.end', + notifier_api.INFO, + result) + return result def _populate_tenant_id(self, context, res_dict, is_create): diff --git a/quantum/tests/unit/test_api_v2.py b/quantum/tests/unit/test_api_v2.py index ec887bc10..1bf680f23 100644 --- a/quantum/tests/unit/test_api_v2.py +++ b/quantum/tests/unit/test_api_v2.py @@ -32,6 +32,7 @@ from quantum import context from quantum.extensions.extensions import PluginAwareExtensionManager from quantum.manager import QuantumManager from quantum.openstack.common import cfg +from quantum.openstack.common.notifier import api as notifer_api LOG = logging.getLogger(__name__) @@ -755,6 +756,51 @@ class V2Views(unittest.TestCase): self._view(keys, 'subnets', 'subnet') +class NotificationTest(APIv2TestBase): + def _resource_op_notifier(self, opname, resource, expected_errors=False): + initial_input = {resource: {'name': 'myname'}} + instance = self.plugin.return_value + instance.get_networks.return_value = initial_input + expected_code = exc.HTTPCreated.code + with mock.patch.object(notifer_api, 'notify') as mynotifier: + if opname == 'create': + initial_input[resource]['tenant_id'] = _uuid() + res = self.api.post_json( + _get_path('networks'), initial_input, expected_errors) + if opname == 'update': + res = self.api.put_json( + _get_path('networks', id=_uuid()), + initial_input, expect_errors=expected_errors) + expected_code = exc.HTTPOk.code + if opname == 'delete': + initial_input[resource]['tenant_id'] = _uuid() + res = self.api.delete( + _get_path('networks', id=_uuid()), + expect_errors=expected_errors) + expected_code = exc.HTTPNoContent.code + expected = [mock.call(mock.ANY, + 'network.' + cfg.CONF.host, + resource + "." + opname + ".start", + 'INFO', + mock.ANY), + mock.call(mock.ANY, + 'network.' + cfg.CONF.host, + resource + "." + opname + ".end", + 'INFO', + mock.ANY)] + self.assertEqual(expected, mynotifier.call_args_list) + self.assertEqual(res.status_int, expected_code) + + def test_network_create_notifer(self): + self._resource_op_notifier('create', 'network') + + def test_network_delete_notifer(self): + self._resource_op_notifier('delete', 'network') + + def test_network_update_notifer(self): + self._resource_op_notifier('update', 'network') + + class QuotaTest(APIv2TestBase): def test_create_network_quota(self): cfg.CONF.set_override('quota_network', 1, group='QUOTAS') -- 2.45.2