From: Vincent Hou Date: Wed, 19 Jun 2013 10:48:15 +0000 (+0800) Subject: Fix the multi-backend storge issue for ZMQ. X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=a4f6ab0f3d9710713cd58ed9dddf12a8d8b2160b;p=openstack-build%2Fcinder-build.git Fix the multi-backend storge issue for ZMQ. This issue is not caused by the naming convention as described in Bug 1166899, but due to the incorrect node_topic registration in ZMQ. What have been done in this patch: *Import the latest impl_zmq.py from oslo. *Change the delimiter from "." to ":" between topic and host. "." will make node_topic an invalid key for the registration in ZMQ. *The node_topic should be registered correctly via topic = '.'.join((topic, CONF.rpc_zmq_host)) in impl_zmq.py. *Move init_host() in services.py downstairs to make sure the c-vol can be launched successfully for ZMQ. Fixed Bug 1166899. Change-Id: Id982ab9482f08d69bdc68d389fb41a7752efa168 --- diff --git a/cinder/openstack/common/rpc/__init__.py b/cinder/openstack/common/rpc/__init__.py index 3ffce8332..ff131c113 100644 --- a/cinder/openstack/common/rpc/__init__.py +++ b/cinder/openstack/common/rpc/__init__.py @@ -287,7 +287,7 @@ def queue_get_for(context, topic, host): Messages sent to the 'foo.' topic are sent to the nova-foo service on . """ - return '%s.%s' % (topic, host) if host else topic + return '%s:%s' % (topic, host) if host else topic _RPCIMPL = None diff --git a/cinder/openstack/common/rpc/impl_zmq.py b/cinder/openstack/common/rpc/impl_zmq.py index d3d3599e8..1d4aab768 100644 --- a/cinder/openstack/common/rpc/impl_zmq.py +++ b/cinder/openstack/common/rpc/impl_zmq.py @@ -30,7 +30,6 @@ from cinder.openstack.common import excutils from cinder.openstack.common.gettextutils import _ from cinder.openstack.common import importutils from cinder.openstack.common import jsonutils -from cinder.openstack.common import processutils as utils from cinder.openstack.common.rpc import common as rpc_common zmq = importutils.try_import('eventlet.green.zmq') @@ -85,8 +84,8 @@ matchmaker = None # memoized matchmaker object def _serialize(data): - """ - Serialization wrapper + """Serialization wrapper. + We prefer using JSON, but it cannot encode all types. Error if a developer passes us bad data. """ @@ -98,18 +97,15 @@ def _serialize(data): def _deserialize(data): - """ - Deserialization wrapper - """ + """Deserialization wrapper.""" LOG.debug(_("Deserializing: %s"), data) return jsonutils.loads(data) class ZmqSocket(object): - """ - A tiny wrapper around ZeroMQ to simplify the send/recv protocol - and connection management. + """A tiny wrapper around ZeroMQ. + Simplifies the send/recv protocol and connection management. Can be used as a Context (supports the 'with' statement). """ @@ -180,7 +176,7 @@ class ZmqSocket(object): return # We must unsubscribe, or we'll leak descriptors. - if len(self.subscriptions) > 0: + if self.subscriptions: for f in self.subscriptions: try: self.sock.setsockopt(zmq.UNSUBSCRIBE, f) @@ -199,26 +195,24 @@ class ZmqSocket(object): LOG.error("ZeroMQ socket could not be closed.") self.sock = None - def recv(self): + def recv(self, **kwargs): if not self.can_recv: raise RPCException(_("You cannot recv on this socket.")) - return self.sock.recv_multipart() + return self.sock.recv_multipart(**kwargs) - def send(self, data): + def send(self, data, **kwargs): if not self.can_send: raise RPCException(_("You cannot send on this socket.")) - self.sock.send_multipart(data) + self.sock.send_multipart(data, **kwargs) class ZmqClient(object): """Client for ZMQ sockets.""" - def __init__(self, addr, socket_type=None, bind=False): - if socket_type is None: - socket_type = zmq.PUSH - self.outq = ZmqSocket(addr, socket_type, bind=bind) + def __init__(self, addr): + self.outq = ZmqSocket(addr, zmq.PUSH, bind=False) - def cast(self, msg_id, topic, data, envelope=False): + def cast(self, msg_id, topic, data, envelope): msg_id = msg_id or 0 if not envelope: @@ -282,7 +276,7 @@ class InternalContext(object): except greenlet.GreenletExit: # ignore these since they are just from shutdowns pass - except rpc_common.ClientException, e: + except rpc_common.ClientException as e: LOG.debug(_("Expected exception during message handling (%s)") % e._exc_info[1]) return {'exc': @@ -356,16 +350,14 @@ class ConsumerBase(object): class ZmqBaseReactor(ConsumerBase): - """ - A consumer class implementing a - centralized casting broker (PULL-PUSH) - for RoundRobin requests. + """A consumer class implementing a centralized casting broker (PULL-PUSH). + + Used for RoundRobin requests. """ def __init__(self, conf): super(ZmqBaseReactor, self).__init__() - self.mapping = {} self.proxies = {} self.threads = [] self.sockets = [] @@ -373,9 +365,8 @@ class ZmqBaseReactor(ConsumerBase): self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) - def register(self, proxy, in_addr, zmq_type_in, out_addr=None, - zmq_type_out=None, in_bind=True, out_bind=True, - subscribe=None): + def register(self, proxy, in_addr, zmq_type_in, + in_bind=True, subscribe=None): LOG.info(_("Registering reactor")) @@ -391,21 +382,6 @@ class ZmqBaseReactor(ConsumerBase): LOG.info(_("In reactor registered")) - if not out_addr: - return - - if zmq_type_out not in (zmq.PUSH, zmq.PUB): - raise RPCException("Bad output socktype") - - # Items push out. - outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) - - self.mapping[inq] = outq - self.mapping[outq] = inq - self.sockets.append(outq) - - LOG.info(_("Out reactor registered")) - def consume_in_thread(self): def _consume(sock): LOG.info(_("Consuming socket")) @@ -430,10 +406,9 @@ class ZmqBaseReactor(ConsumerBase): class ZmqProxy(ZmqBaseReactor): - """ - A consumer class implementing a - topic-based proxy, forwarding to - IPC sockets. + """A consumer class implementing a topic-based proxy. + + Forwards to IPC sockets. """ def __init__(self, conf): @@ -446,11 +421,8 @@ class ZmqProxy(ZmqBaseReactor): def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir - #TODO(ewindisch): use zero-copy (i.e. references, not copying) - data = sock.recv() - topic = data[1] - - LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) + data = sock.recv(copy=False) + topic = data[1].bytes if topic.startswith('fanout~'): sock_type = zmq.PUB @@ -492,9 +464,7 @@ class ZmqProxy(ZmqBaseReactor): while(True): data = self.topic_proxy[topic].get() - out_sock.send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % - {'data': data}) + out_sock.send(data, copy=False) wait_sock_creation = eventlet.event.Event() eventlet.spawn(publisher, wait_sock_creation) @@ -507,37 +477,34 @@ class ZmqProxy(ZmqBaseReactor): try: self.topic_proxy[topic].put_nowait(data) - LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % - {'data': data}) except eventlet.queue.Full: LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) def consume_in_thread(self): - """Runs the ZmqProxy service""" + """Runs the ZmqProxy service.""" ipc_dir = CONF.rpc_zmq_ipc_dir consume_in = "tcp://%s:%s" % \ (CONF.rpc_zmq_bind_address, CONF.rpc_zmq_port) consumption_proxy = InternalContext(None) - if not os.path.isdir(ipc_dir): - try: - utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) - utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), - ipc_dir, run_as_root=True) - utils.execute('chmod', '750', ipc_dir, run_as_root=True) - except utils.ProcessExecutionError: + try: + os.makedirs(ipc_dir) + except os.error: + if not os.path.isdir(ipc_dir): with excutils.save_and_reraise_exception(): - LOG.error(_("Could not create IPC directory %s") % - (ipc_dir, )) - + LOG.error(_("Required IPC directory does not exist at" + " %s") % (ipc_dir, )) try: self.register(consumption_proxy, consume_in, - zmq.PULL, - out_bind=True) + zmq.PULL) except zmq.ZMQError: + if os.access(ipc_dir, os.X_OK): + with excutils.save_and_reraise_exception(): + LOG.error(_("Permission denied to IPC directory at" + " %s") % (ipc_dir, )) with excutils.save_and_reraise_exception(): LOG.error(_("Could not create ZeroMQ receiver daemon. " "Socket may already be in use.")) @@ -547,8 +514,9 @@ class ZmqProxy(ZmqBaseReactor): def unflatten_envelope(packenv): """Unflattens the RPC envelope. - Takes a list and returns a dictionary. - i.e. [1,2,3,4] => {1: 2, 3: 4} + + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} """ i = iter(packenv) h = {} @@ -561,10 +529,9 @@ def unflatten_envelope(packenv): class ZmqReactor(ZmqBaseReactor): - """ - A consumer class implementing a - consumer for messages. Can also be - used as a 1:1 proxy + """A consumer class implementing a consumer for messages. + + Can also be used as a 1:1 proxy """ def __init__(self, conf): @@ -574,11 +541,6 @@ class ZmqReactor(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) - if sock in self.mapping: - LOG.debug(_("ROUTER RELAY-OUT %(data)s") % { - 'data': data}) - self.mapping[sock].send(data) - return proxy = self.proxies[sock] @@ -622,7 +584,7 @@ class Connection(rpc_common.Connection): else: sock_type = zmq.PULL subscribe = None - topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + topic = '.'.join((topic, CONF.rpc_zmq_host)) if topic in self.topics: LOG.info(_("Skipping topic registration. Already registered.")) @@ -751,10 +713,9 @@ def _call(addr, context, topic, msg, timeout=None, def _multi_send(method, context, topic, msg, timeout=None, envelope=False, _msg_id=None): - """ - Wraps the sending of messages, - dispatches to the matchmaker and sends - message to all relevant hosts. + """Wraps the sending of messages. + + Dispatches to the matchmaker and sends message to all relevant hosts. """ conf = CONF LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) @@ -763,7 +724,7 @@ def _multi_send(method, context, topic, msg, timeout=None, LOG.debug(_("Sending message(s) to: %s"), queues) # Don't stack if we have no matchmaker results - if len(queues) == 0: + if not queues: LOG.warn(_("No matchmaker results. Not casting.")) # While not strictly a timeout, callers know how to handle # this exception and a timeout isn't too big a lie. @@ -807,12 +768,14 @@ def fanout_cast(conf, context, topic, msg, **kwargs): """Send a message to all listening and expect no reply.""" # NOTE(ewindisch): fanout~ is used because it avoid splitting on . # and acts as a non-subtle hint to the matchmaker and ZmqProxy. - _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) + LOG.error(_('topic is %s.') % topic) + if topic: + _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) def notify(conf, context, topic, msg, envelope): - """ - Send notification event. + """Send notification event. + Notifications are sent to topic-priority. This differs from the AMQP drivers which send to topic.priority. """ @@ -846,6 +809,11 @@ def _get_ctxt(): def _get_matchmaker(*args, **kwargs): global matchmaker if not matchmaker: - matchmaker = importutils.import_object( - CONF.rpc_zmq_matchmaker, *args, **kwargs) + mm = CONF.rpc_zmq_matchmaker + if mm.endswith('matchmaker.MatchMakerRing'): + mm.replace('matchmaker', 'matchmaker_ring') + LOG.warn(_('rpc_zmq_matchmaker = %(orig)s is deprecated; use' + ' %(new)s instead') % dict( + orig=CONF.rpc_zmq_matchmaker, new=mm)) + matchmaker = importutils.import_object(mm, *args, **kwargs) return matchmaker diff --git a/cinder/service.py b/cinder/service.py index 7eed1e338..73e3027ea 100644 --- a/cinder/service.py +++ b/cinder/service.py @@ -356,7 +356,6 @@ class Service(object): version_string = version.version_string() LOG.audit(_('Starting %(topic)s node (version %(version_string)s)'), {'topic': self.topic, 'version_string': version_string}) - self.manager.init_host() self.model_disconnected = False ctxt = context.get_admin_context() try: @@ -376,13 +375,14 @@ class Service(object): # Share this same connection for these Consumers self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) - node_topic = '%s.%s' % (self.topic, self.host) + node_topic = '%s:%s' % (self.topic, self.host) self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) # Consume from all consumers in a thread self.conn.consume_in_thread() + self.manager.init_host() if self.report_interval: pulse = utils.LoopingCall(self.report_state) diff --git a/cinder/tests/test_volume_rpcapi.py b/cinder/tests/test_volume_rpcapi.py index 223939673..79916b97e 100644 --- a/cinder/tests/test_volume_rpcapi.py +++ b/cinder/tests/test_volume_rpcapi.py @@ -94,7 +94,7 @@ class VolumeRpcAPITestCase(test.TestCase): host = kwargs['host'] else: host = kwargs['volume']['host'] - expected_topic = '%s.%s' % (CONF.volume_topic, host) + expected_topic = '%s:%s' % (CONF.volume_topic, host) self.fake_args = None self.fake_kwargs = None