]> review.fuel-infra Code Review - openstack-build/neutron-build.git/commitdiff
Notification for network/subnet/port create/delete/update.
authorYong Sheng Gong <gongysh@cn.ibm.com>
Wed, 25 Jul 2012 14:55:33 +0000 (22:55 +0800)
committerYong Sheng Gong <gongysh@cn.ibm.com>
Thu, 2 Aug 2012 07:26:33 +0000 (15:26 +0800)
blueprint quantum-notifications

Add host configuration into conf, which is determined by socket.gethostname()
function by default. Host name is part of publiser id, which is in the format
of 'network'.$host.

We add create/update/delete start and end notification for each kind of resource.
By default, the notification do nothing since the notifier driver is no_op_notifier.
We can define it in quantum.conf.

Change-Id: Ibc5eacac7a324584e6ccff120f573444932a88ef

etc/quantum.conf
quantum/api/v2/base.py
quantum/tests/unit/test_api_v2.py

index 7a563e38fddb0de6de352b44ef826620bdbce4d4..247bdf96eb7c9c0cb9b1d421b9b969245f8a922e 100644 (file)
@@ -47,3 +47,112 @@ api_paste_config = api-paste.ini
 
 # default driver to use for quota checks
 # quota_driver = quantum.quota.ConfDriver
+
+# ============ Notification System Options =====================
+
+# Notifications can be sent when network/subnet/port are create, updated or deleted.
+# There are four methods of sending notifications, logging (via the
+# log_file directive), rpc (via a message queue),
+# noop (no notifications sent, the default) or list of them
+
+# Defined in notifier api
+# notification_driver = quantum.openstack.common.notifier.no_op_notifier
+# default_notification_level = INFO
+# myhost = myhost.com
+# default_publisher_id = $myhost
+
+# Defined in rabbit_notifier for rpc way
+# notification_topics = notifications
+
+# Defined in list_notifier
+# list_notifier_drivers = quantum.openstack.common.notifier.no_op_notifier
+
+# Defined in rpc __init__
+# The messaging module to use, defaults to kombu.
+# rpc_backend =quantum.openstack.common.notifier.rpc.impl_kombu
+# Size of RPC thread pool
+# rpc_thread_pool_size = 64,
+# Size of RPC connection pool
+# rpc_conn_pool_size = 30
+# Seconds to wait for a response from call or multicall
+# rpc_response_timeout = 60
+# Seconds to wait before a cast expires (TTL). Only supported by impl_zmq.
+# rpc_cast_timeout = 30
+# Modules of exceptions that are permitted to be recreated
+# upon receiving exception data from an rpc call.
+# allowed_rpc_exception_modules = quantum.openstack.common.exception, nova.exception
+# AMQP exchange to connect to if using RabbitMQ or Qpid
+# control_exchange = nova
+# If passed, use a fake RabbitMQ provider
+# fake_rabbit = False
+
+# Configuration options if sending notifications via kombu rpc (these are
+# the defaults)
+# SSL version to use (valid only if SSL enabled)
+# kombu_ssl_version =
+# SSL key file (valid only if SSL enabled)
+# kombu_ssl_keyfile =
+# SSL cert file (valid only if SSL enabled)
+# kombu_ssl_certfile =
+# SSL certification authority file (valid only if SSL enabled)'
+# kombu_ssl_ca_certs =
+# IP address of the RabbitMQ installation
+# rabbit_host = localhost
+# Password of the RabbitMQ server
+# rabbit_password = guest
+# Port where RabbitMQ server is running/listening
+# rabbit_port = 5672
+# User ID used for RabbitMQ connections
+# rabbit_userid = guest
+# Location of a virtual RabbitMQ installation.
+# rabbit_virtual_host = /
+# Maximum retries with trying to connect to RabbitMQ
+# (the default of 0 implies an infinite retry count)
+# rabbit_max_retries = 0
+# RabbitMQ connection retry interval
+# rabbit_retry_interval = 1
+
+# QPID
+# rpc_backend=quantum.openstack.common.rpc.impl_qpid
+# Qpid broker hostname
+# qpid_hostname = localhost
+# Qpid broker port
+# qpid_port = 5672
+# Username for qpid connection
+# qpid_username = ''
+# Password for qpid connection
+# qpid_password = ''
+# Space separated list of SASL mechanisms to use for auth
+# qpid_sasl_mechanisms = ''
+# Automatically reconnect
+# qpid_reconnect = True
+# Reconnection timeout in seconds
+# qpid_reconnect_timeout = 0
+# Max reconnections before giving up
+# qpid_reconnect_limit = 0
+# Minimum seconds between reconnection attempts
+# qpid_reconnect_interval_min = 0
+# Maximum seconds between reconnection attempts
+# qpid_reconnect_interval_max = 0
+# Equivalent to setting max and min to the same value
+# qpid_reconnect_interval = 0
+# Seconds between connection keepalive heartbeats
+# qpid_heartbeat = 5
+# Transport to use, either 'tcp' or 'ssl'
+# qpid_protocol = tcp
+# Disable Nagle algorithm
+# qpid_tcp_nodelay = True
+
+# ZMQ
+# rpc_backend=quantum.openstack.common.rpc.impl_zmq
+# ZeroMQ bind address. Should be a wildcard (*), an ethernet interface, or IP.
+# The "host" option should point or resolve to this address.
+# rpc_zmq_bind_address = *
+# MatchMaker driver
+# rpc_zmq_matchmaker = openstack.common.rpc.matchmaker.MatchMakerLocalhost
+# ZeroMQ receiver listening port
+# rpc_zmq_port = 9501
+# Number of ZeroMQ contexts, defaults to 1
+# rpc_zmq_contexts = 1
+# Directory for holding IPC sockets
+# rpc_zmq_ipc_dir = /var/run/openstack
index c093a8e9e5b6682102c031f52ba7aa02d6296b85..f43bb470abe0db12b495d47f84cd37215c264930 100644 (file)
 # limitations under the License.
 
 import logging
+import socket
+
 import webob.exc
 
 from quantum.api.v2 import attributes
 from quantum.api.v2 import resource as wsgi_resource
 from quantum.common import exceptions
 from quantum.common import utils
+from quantum.openstack.common import cfg
+from quantum.openstack.common.notifier import api as notifier_api
 from quantum import policy
 from quantum import quota
 
@@ -40,6 +44,14 @@ FAULT_MAP = {exceptions.NotFound: webob.exc.HTTPNotFound,
 QUOTAS = quota.QUOTAS
 
 
+def _get_hostname():
+    return socket.gethostname()
+
+
+# Register the configuration options
+cfg.CONF.register_opt(cfg.StrOpt('host', default=_get_hostname()))
+
+
 def fields(request):
     """
     Extracts the list of fields to return
@@ -111,6 +123,7 @@ class Controller(object):
         self._policy_attrs = [name for (name, info) in self._attr_info.items()
                               if 'required_by_policy' in info
                               and info['required_by_policy']]
+        self._publisher_id = notifier_api.publisher_id('network')
 
     def _is_visible(self, attr):
         attr_val = self._attr_info.get(attr)
@@ -189,6 +202,11 @@ class Controller(object):
 
     def create(self, request, body=None):
         """Creates a new instance of the requested entity"""
+        notifier_api.notify(request.context,
+                            self._publisher_id,
+                            self._resource + '.create.start',
+                            notifier_api.INFO,
+                            body)
         body = self._prepare_request_body(request.context, body, True,
                                           allow_bulk=True)
         action = "create_%s" % self._resource
@@ -229,10 +247,21 @@ class Controller(object):
         obj_creator = getattr(self._plugin, action)
         kwargs = {self._resource: body}
         obj = obj_creator(request.context, **kwargs)
-        return {self._resource: self._view(obj)}
+        result = {self._resource: self._view(obj)}
+        notifier_api.notify(request.context,
+                            self._publisher_id,
+                            self._resource + '.create.end',
+                            notifier_api.INFO,
+                            result)
+        return result
 
     def delete(self, request, id):
         """Deletes the specified entity"""
+        notifier_api.notify(request.context,
+                            self._publisher_id,
+                            self._resource + '.delete.start',
+                            notifier_api.INFO,
+                            {self._resource + '_id': id})
         action = "delete_%s" % self._resource
 
         # Check authz
@@ -246,9 +275,21 @@ class Controller(object):
 
         obj_deleter = getattr(self._plugin, action)
         obj_deleter(request.context, id)
+        notifier_api.notify(request.context,
+                            self._publisher_id,
+                            self._resource + '.delete.end',
+                            notifier_api.INFO,
+                            {self._resource + '_id': id})
 
     def update(self, request, id, body=None):
         """Updates the specified entity's attributes"""
+        payload = body.copy()
+        payload['id'] = id
+        notifier_api.notify(request.context,
+                            self._publisher_id,
+                            self._resource + '.update.start',
+                            notifier_api.INFO,
+                            payload)
         body = self._prepare_request_body(request.context, body, False)
         action = "update_%s" % self._resource
 
@@ -264,7 +305,13 @@ class Controller(object):
         obj_updater = getattr(self._plugin, action)
         kwargs = {self._resource: body}
         obj = obj_updater(request.context, id, **kwargs)
-        return {self._resource: self._view(obj)}
+        result = {self._resource: self._view(obj)}
+        notifier_api.notify(request.context,
+                            self._publisher_id,
+                            self._resource + '.update.end',
+                            notifier_api.INFO,
+                            result)
+        return result
 
     def _populate_tenant_id(self, context, res_dict, is_create):
 
index ec887bc10404fb915a991188f00ea72c733e929a..1bf680f23b2058e57ba621ad92907fe5bc28be65 100644 (file)
@@ -32,6 +32,7 @@ from quantum import context
 from quantum.extensions.extensions import PluginAwareExtensionManager
 from quantum.manager import QuantumManager
 from quantum.openstack.common import cfg
+from quantum.openstack.common.notifier import api as notifer_api
 
 
 LOG = logging.getLogger(__name__)
@@ -755,6 +756,51 @@ class V2Views(unittest.TestCase):
         self._view(keys, 'subnets', 'subnet')
 
 
+class NotificationTest(APIv2TestBase):
+    def _resource_op_notifier(self, opname, resource, expected_errors=False):
+        initial_input = {resource: {'name': 'myname'}}
+        instance = self.plugin.return_value
+        instance.get_networks.return_value = initial_input
+        expected_code = exc.HTTPCreated.code
+        with mock.patch.object(notifer_api, 'notify') as mynotifier:
+            if opname == 'create':
+                initial_input[resource]['tenant_id'] = _uuid()
+                res = self.api.post_json(
+                    _get_path('networks'), initial_input, expected_errors)
+            if opname == 'update':
+                res = self.api.put_json(
+                    _get_path('networks', id=_uuid()),
+                    initial_input, expect_errors=expected_errors)
+                expected_code = exc.HTTPOk.code
+            if opname == 'delete':
+                initial_input[resource]['tenant_id'] = _uuid()
+                res = self.api.delete(
+                    _get_path('networks', id=_uuid()),
+                    expect_errors=expected_errors)
+                expected_code = exc.HTTPNoContent.code
+            expected = [mock.call(mock.ANY,
+                                  'network.' + cfg.CONF.host,
+                                  resource + "." + opname + ".start",
+                                  'INFO',
+                                  mock.ANY),
+                        mock.call(mock.ANY,
+                                  'network.' + cfg.CONF.host,
+                                  resource + "." + opname + ".end",
+                                  'INFO',
+                                  mock.ANY)]
+            self.assertEqual(expected, mynotifier.call_args_list)
+        self.assertEqual(res.status_int, expected_code)
+
+    def test_network_create_notifer(self):
+        self._resource_op_notifier('create', 'network')
+
+    def test_network_delete_notifer(self):
+        self._resource_op_notifier('delete', 'network')
+
+    def test_network_update_notifer(self):
+        self._resource_op_notifier('update', 'network')
+
+
 class QuotaTest(APIv2TestBase):
     def test_create_network_quota(self):
         cfg.CONF.set_override('quota_network', 1, group='QUOTAS')