]> review.fuel-infra Code Review - openstack-build/cinder-build.git/commitdiff
Fix the multi-backend storge issue for ZMQ.
authorVincent Hou <sbhou@cn.ibm.com>
Wed, 19 Jun 2013 10:48:15 +0000 (18:48 +0800)
committerVincent Hou <sbhou@cn.ibm.com>
Fri, 26 Jul 2013 04:05:41 +0000 (00:05 -0400)
This issue is not caused by the naming convention as described in Bug 1166899,
but due to the incorrect node_topic registration in ZMQ.

What have been done in this patch:
*Import the latest impl_zmq.py from oslo.
*Change the delimiter from "." to ":" between topic and host. "." will make
node_topic an invalid key for the registration in ZMQ.
*The node_topic should be registered correctly via
topic = '.'.join((topic, CONF.rpc_zmq_host)) in impl_zmq.py.
*Move init_host() in services.py downstairs to make sure the c-vol can be
launched successfully for ZMQ.

Fixed Bug 1166899.

Change-Id: Id982ab9482f08d69bdc68d389fb41a7752efa168

cinder/openstack/common/rpc/__init__.py
cinder/openstack/common/rpc/impl_zmq.py
cinder/service.py
cinder/tests/test_volume_rpcapi.py

index 3ffce8332599cefe75bb40527a43ff3a1f59d7e3..ff131c1132796e61b1c83a28238538073b32e532 100644 (file)
@@ -287,7 +287,7 @@ def queue_get_for(context, topic, host):
     Messages sent to the 'foo.<host>' topic are sent to the nova-foo service on
     <host>.
     """
-    return '%s.%s' % (topic, host) if host else topic
+    return '%s:%s' % (topic, host) if host else topic
 
 
 _RPCIMPL = None
index d3d3599e8f450d3bee02cfc46618e1bb9698c04c..1d4aab768c7b89cf3f51f828b6b968434ae39ab9 100644 (file)
@@ -30,7 +30,6 @@ from cinder.openstack.common import excutils
 from cinder.openstack.common.gettextutils import _
 from cinder.openstack.common import importutils
 from cinder.openstack.common import jsonutils
-from cinder.openstack.common import processutils as utils
 from cinder.openstack.common.rpc import common as rpc_common
 
 zmq = importutils.try_import('eventlet.green.zmq')
@@ -85,8 +84,8 @@ matchmaker = None  # memoized matchmaker object
 
 
 def _serialize(data):
-    """
-    Serialization wrapper
+    """Serialization wrapper.
+
     We prefer using JSON, but it cannot encode all types.
     Error if a developer passes us bad data.
     """
@@ -98,18 +97,15 @@ def _serialize(data):
 
 
 def _deserialize(data):
-    """
-    Deserialization wrapper
-    """
+    """Deserialization wrapper."""
     LOG.debug(_("Deserializing: %s"), data)
     return jsonutils.loads(data)
 
 
 class ZmqSocket(object):
-    """
-    A tiny wrapper around ZeroMQ to simplify the send/recv protocol
-    and connection management.
+    """A tiny wrapper around ZeroMQ.
 
+    Simplifies the send/recv protocol and connection management.
     Can be used as a Context (supports the 'with' statement).
     """
 
@@ -180,7 +176,7 @@ class ZmqSocket(object):
             return
 
         # We must unsubscribe, or we'll leak descriptors.
-        if len(self.subscriptions) > 0:
+        if self.subscriptions:
             for f in self.subscriptions:
                 try:
                     self.sock.setsockopt(zmq.UNSUBSCRIBE, f)
@@ -199,26 +195,24 @@ class ZmqSocket(object):
             LOG.error("ZeroMQ socket could not be closed.")
         self.sock = None
 
-    def recv(self):
+    def recv(self, **kwargs):
         if not self.can_recv:
             raise RPCException(_("You cannot recv on this socket."))
-        return self.sock.recv_multipart()
+        return self.sock.recv_multipart(**kwargs)
 
-    def send(self, data):
+    def send(self, data, **kwargs):
         if not self.can_send:
             raise RPCException(_("You cannot send on this socket."))
-        self.sock.send_multipart(data)
+        self.sock.send_multipart(data, **kwargs)
 
 
 class ZmqClient(object):
     """Client for ZMQ sockets."""
 
-    def __init__(self, addr, socket_type=None, bind=False):
-        if socket_type is None:
-            socket_type = zmq.PUSH
-        self.outq = ZmqSocket(addr, socket_type, bind=bind)
+    def __init__(self, addr):
+        self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
 
-    def cast(self, msg_id, topic, data, envelope=False):
+    def cast(self, msg_id, topic, data, envelope):
         msg_id = msg_id or 0
 
         if not envelope:
@@ -282,7 +276,7 @@ class InternalContext(object):
         except greenlet.GreenletExit:
             # ignore these since they are just from shutdowns
             pass
-        except rpc_common.ClientException, e:
+        except rpc_common.ClientException as e:
             LOG.debug(_("Expected exception during message handling (%s)") %
                       e._exc_info[1])
             return {'exc':
@@ -356,16 +350,14 @@ class ConsumerBase(object):
 
 
 class ZmqBaseReactor(ConsumerBase):
-    """
-    A consumer class implementing a
-    centralized casting broker (PULL-PUSH)
-    for RoundRobin requests.
+    """A consumer class implementing a centralized casting broker (PULL-PUSH).
+
+    Used for RoundRobin requests.
     """
 
     def __init__(self, conf):
         super(ZmqBaseReactor, self).__init__()
 
-        self.mapping = {}
         self.proxies = {}
         self.threads = []
         self.sockets = []
@@ -373,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase):
 
         self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size)
 
-    def register(self, proxy, in_addr, zmq_type_in, out_addr=None,
-                 zmq_type_out=None, in_bind=True, out_bind=True,
-                 subscribe=None):
+    def register(self, proxy, in_addr, zmq_type_in,
+                 in_bind=True, subscribe=None):
 
         LOG.info(_("Registering reactor"))
 
@@ -391,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase):
 
         LOG.info(_("In reactor registered"))
 
-        if not out_addr:
-            return
-
-        if zmq_type_out not in (zmq.PUSH, zmq.PUB):
-            raise RPCException("Bad output socktype")
-
-        # Items push out.
-        outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind)
-
-        self.mapping[inq] = outq
-        self.mapping[outq] = inq
-        self.sockets.append(outq)
-
-        LOG.info(_("Out reactor registered"))
-
     def consume_in_thread(self):
         def _consume(sock):
             LOG.info(_("Consuming socket"))
@@ -430,10 +406,9 @@ class ZmqBaseReactor(ConsumerBase):
 
 
 class ZmqProxy(ZmqBaseReactor):
-    """
-    A consumer class implementing a
-    topic-based proxy, forwarding to
-    IPC sockets.
+    """A consumer class implementing a topic-based proxy.
+
+    Forwards to IPC sockets.
     """
 
     def __init__(self, conf):
@@ -446,11 +421,8 @@ class ZmqProxy(ZmqBaseReactor):
     def consume(self, sock):
         ipc_dir = CONF.rpc_zmq_ipc_dir
 
-        #TODO(ewindisch): use zero-copy (i.e. references, not copying)
-        data = sock.recv()
-        topic = data[1]
-
-        LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
+        data = sock.recv(copy=False)
+        topic = data[1].bytes
 
         if topic.startswith('fanout~'):
             sock_type = zmq.PUB
@@ -492,9 +464,7 @@ class ZmqProxy(ZmqBaseReactor):
 
                 while(True):
                     data = self.topic_proxy[topic].get()
-                    out_sock.send(data)
-                    LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
-                              {'data': data})
+                    out_sock.send(data, copy=False)
 
             wait_sock_creation = eventlet.event.Event()
             eventlet.spawn(publisher, wait_sock_creation)
@@ -507,37 +477,34 @@ class ZmqProxy(ZmqBaseReactor):
 
         try:
             self.topic_proxy[topic].put_nowait(data)
-            LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
-                      {'data': data})
         except eventlet.queue.Full:
             LOG.error(_("Local per-topic backlog buffer full for topic "
                         "%(topic)s. Dropping message.") % {'topic': topic})
 
     def consume_in_thread(self):
-        """Runs the ZmqProxy service"""
+        """Runs the ZmqProxy service."""
         ipc_dir = CONF.rpc_zmq_ipc_dir
         consume_in = "tcp://%s:%s" % \
             (CONF.rpc_zmq_bind_address,
              CONF.rpc_zmq_port)
         consumption_proxy = InternalContext(None)
 
-        if not os.path.isdir(ipc_dir):
-            try:
-                utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
-                utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
-                              ipc_dir, run_as_root=True)
-                utils.execute('chmod', '750', ipc_dir, run_as_root=True)
-            except utils.ProcessExecutionError:
+        try:
+            os.makedirs(ipc_dir)
+        except os.error:
+            if not os.path.isdir(ipc_dir):
                 with excutils.save_and_reraise_exception():
-                    LOG.error(_("Could not create IPC directory %s") %
-                              (ipc_dir, ))
-
+                    LOG.error(_("Required IPC directory does not exist at"
+                                " %s") % (ipc_dir, ))
         try:
             self.register(consumption_proxy,
                           consume_in,
-                          zmq.PULL,
-                          out_bind=True)
+                          zmq.PULL)
         except zmq.ZMQError:
+            if os.access(ipc_dir, os.X_OK):
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_("Permission denied to IPC directory at"
+                                " %s") % (ipc_dir, ))
             with excutils.save_and_reraise_exception():
                 LOG.error(_("Could not create ZeroMQ receiver daemon. "
                             "Socket may already be in use."))
@@ -547,8 +514,9 @@ class ZmqProxy(ZmqBaseReactor):
 
 def unflatten_envelope(packenv):
     """Unflattens the RPC envelope.
-       Takes a list and returns a dictionary.
-       i.e. [1,2,3,4] => {1: 2, 3: 4}
+
+    Takes a list and returns a dictionary.
+    i.e. [1,2,3,4] => {1: 2, 3: 4}
     """
     i = iter(packenv)
     h = {}
@@ -561,10 +529,9 @@ def unflatten_envelope(packenv):
 
 
 class ZmqReactor(ZmqBaseReactor):
-    """
-    A consumer class implementing a
-    consumer for messages. Can also be
-    used as a 1:1 proxy
+    """A consumer class implementing a consumer for messages.
+
+    Can also be used as a 1:1 proxy
     """
 
     def __init__(self, conf):
@@ -574,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor):
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
         LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
-        if sock in self.mapping:
-            LOG.debug(_("ROUTER RELAY-OUT %(data)s") % {
-                'data': data})
-            self.mapping[sock].send(data)
-            return
 
         proxy = self.proxies[sock]
 
@@ -622,7 +584,7 @@ class Connection(rpc_common.Connection):
         else:
             sock_type = zmq.PULL
             subscribe = None
-            topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+            topic = '.'.join((topic, CONF.rpc_zmq_host))
 
         if topic in self.topics:
             LOG.info(_("Skipping topic registration. Already registered."))
@@ -751,10 +713,9 @@ def _call(addr, context, topic, msg, timeout=None,
 
 def _multi_send(method, context, topic, msg, timeout=None,
                 envelope=False, _msg_id=None):
-    """
-    Wraps the sending of messages,
-    dispatches to the matchmaker and sends
-    message to all relevant hosts.
+    """Wraps the sending of messages.
+
+    Dispatches to the matchmaker and sends message to all relevant hosts.
     """
     conf = CONF
     LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
@@ -763,7 +724,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
     LOG.debug(_("Sending message(s) to: %s"), queues)
 
     # Don't stack if we have no matchmaker results
-    if len(queues) == 0:
+    if not queues:
         LOG.warn(_("No matchmaker results. Not casting."))
         # While not strictly a timeout, callers know how to handle
         # this exception and a timeout isn't too big a lie.
@@ -807,12 +768,14 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
     """Send a message to all listening and expect no reply."""
     # NOTE(ewindisch): fanout~ is used because it avoid splitting on .
     # and acts as a non-subtle hint to the matchmaker and ZmqProxy.
-    _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
+    LOG.error(_('topic is %s.') % topic)
+    if topic:
+        _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
 
 
 def notify(conf, context, topic, msg, envelope):
-    """
-    Send notification event.
+    """Send notification event.
+
     Notifications are sent to topic-priority.
     This differs from the AMQP drivers which send to topic.priority.
     """
@@ -846,6 +809,11 @@ def _get_ctxt():
 def _get_matchmaker(*args, **kwargs):
     global matchmaker
     if not matchmaker:
-        matchmaker = importutils.import_object(
-            CONF.rpc_zmq_matchmaker, *args, **kwargs)
+        mm = CONF.rpc_zmq_matchmaker
+        if mm.endswith('matchmaker.MatchMakerRing'):
+            mm.replace('matchmaker', 'matchmaker_ring')
+            LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use'
+                       ' %(new)s instead') % dict(
+                     orig=CONF.rpc_zmq_matchmaker, new=mm))
+        matchmaker = importutils.import_object(mm, *args, **kwargs)
     return matchmaker
index 7eed1e3382e40f76f06dfaa2e0d0382b5190ed52..73e3027ea01dd6285fc9d379d499fac7e3e110ca 100644 (file)
@@ -356,7 +356,6 @@ class Service(object):
         version_string = version.version_string()
         LOG.audit(_('Starting %(topic)s node (version %(version_string)s)'),
                   {'topic': self.topic, 'version_string': version_string})
-        self.manager.init_host()
         self.model_disconnected = False
         ctxt = context.get_admin_context()
         try:
@@ -376,13 +375,14 @@ class Service(object):
         # Share this same connection for these Consumers
         self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
 
-        node_topic = '%s.%s' % (self.topic, self.host)
+        node_topic = '%s:%s' % (self.topic, self.host)
         self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
 
         self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
 
         # Consume from all consumers in a thread
         self.conn.consume_in_thread()
+        self.manager.init_host()
 
         if self.report_interval:
             pulse = utils.LoopingCall(self.report_state)
index 2239396739adc17d12c8db5ea07219fb025fb513..79916b97ef47281b8f591cf3dd0818250b0a3c1a 100644 (file)
@@ -94,7 +94,7 @@ class VolumeRpcAPITestCase(test.TestCase):
             host = kwargs['host']
         else:
             host = kwargs['volume']['host']
-        expected_topic = '%s.%s' % (CONF.volume_topic, host)
+        expected_topic = '%s:%s' % (CONF.volume_topic, host)
 
         self.fake_args = None
         self.fake_kwargs = None