From: Michael Still Date: Thu, 18 Oct 2012 18:43:12 +0000 (-0700) Subject: Update common. X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=1ea9588c4df4ec5d36c2d56c66204138d6b68361;p=openstack-build%2Fcinder-build.git Update common. policy.py is not updated as it causes test failures. I will investigate that separately. Change-Id: I9936137c1ab5b368ba6c3c35c423568b38bf913f --- diff --git a/cinder/openstack/common/rpc/impl_zmq.py b/cinder/openstack/common/rpc/impl_zmq.py index 570431778..f3f3b9e90 100644 --- a/cinder/openstack/common/rpc/impl_zmq.py +++ b/cinder/openstack/common/rpc/impl_zmq.py @@ -58,9 +58,6 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port'), - cfg.IntOpt('rpc_zmq_port_pub', default=9502, - help='ZeroMQ fanout publisher port'), - cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), @@ -209,7 +206,7 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data): - self.outq.send([str(topic), str(msg_id), str('cast'), + self.outq.send([str(msg_id), str(topic), str('cast'), _serialize(data)]) def close(self): @@ -302,9 +299,6 @@ class ConsumerBase(object): else: return [result] - def consume(self, sock): - raise NotImplementedError() - def process(self, style, target, proxy, ctx, data): # Method starting with - are # processed internally. (non-valid method name) @@ -417,17 +411,12 @@ class ZmqProxy(ZmqBaseReactor): zmq.PUB, bind=True) self.sockets.append(self.topic_proxy['zmq_replies']) - self.topic_proxy['fanout~'] = \ - ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address, - CONF.rpc_zmq_port_pub), zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['fanout~']) - def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - topic, msg_id, style, in_msg = data + msg_id, topic, style, in_msg = data topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -435,11 +424,6 @@ class ZmqProxy(ZmqBaseReactor): # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB - - # This doesn't change what is in the message, - # it only specifies that these messages go to - # the generic fanout topic. - topic = 'fanout~' elif topic.startswith('zmq_replies'): sock_type = zmq.PUB inside = _deserialize(in_msg) @@ -450,32 +434,23 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic].send(data) LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) -class CallbackReactor(ZmqBaseReactor): - """ - A consumer class passing messages to a callback - """ - - def __init__(self, conf, callback): - self._cb = callback - super(CallbackReactor, self).__init__(conf) - - def consume(self, sock): - data = sock.recv() - self._cb(data[3]) - - class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -496,7 +471,7 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - topic, msg_id, style, in_msg = data + msg_id, topic, style, in_msg = data ctx, request = _deserialize(in_msg) ctx = RpcContext.unmarshal(ctx) @@ -513,26 +488,6 @@ class Connection(rpc_common.Connection): def __init__(self, conf): self.reactor = ZmqReactor(conf) - def _consume_fanout(self, reactor, topic, proxy, bind=False): - for topic, host in matchmaker.queues("publishers~%s" % (topic, )): - inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) - reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) - - def declare_topic_consumer(self, topic, callback=None, - queue_name=None): - """declare_topic_consumer is a private method, but - it is being used by Quantum (Folsom). - This has been added compatibility. - """ - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )): - return - - reactor = CallbackReactor(CONF, callback) - self._consume_fanout(reactor, topic, None, bind=False) - def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] @@ -540,35 +495,22 @@ class Connection(rpc_common.Connection): LOG.info(_("Create Consumer for topic (%(topic)s)") % {'topic': topic}) - # Consume direct-push fanout messages (relay to local consumers) + # Subscription scenarios if fanout: - # If we're not in here, we can't receive direct fanout messages - if CONF.rpc_zmq_host in matchmaker.queues(topic): - # Consume from all remote publishers. - self._consume_fanout(self.reactor, topic, proxy) - else: - LOG.warn("This service cannot receive direct PUSH fanout " - "messages without being known by the matchmaker.") - return - - # Configure consumer for direct pushes. - subscribe = (topic, fanout)[type(fanout) == str] + subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB topic = 'fanout~' + topic - - inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, ) else: sock_type = zmq.PULL subscribe = None - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) - # Consume messages from local rpc-zmq-receiver daemon. self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/cinder/openstack/common/rpc/matchmaker.py b/cinder/openstack/common/rpc/matchmaker.py index ffe4870aa..2c0aac5bb 100644 --- a/cinder/openstack/common/rpc/matchmaker.py +++ b/cinder/openstack/common/rpc/matchmaker.py @@ -132,14 +132,6 @@ class FanoutBinding(Binding): return False -class PublisherBinding(Binding): - """Match on publishers keys, where key starts with 'publishers.' string.""" - def test(self, key): - if key.startswith('publishers~'): - return True - return False - - class StubExchange(Exchange): """Exchange that does nothing.""" def run(self, key): @@ -190,23 +182,6 @@ class RoundRobinRingExchange(RingExchange): return [(key + '.' + host, host)] -class PublisherRingExchange(RingExchange): - """Fanout Exchange based on a hashmap.""" - def __init__(self, ring=None): - super(PublisherRingExchange, self).__init__(ring) - - def run(self, key): - # Assume starts with "publishers~", strip it for lookup. - nkey = key.split('publishers~')[1:][0] - if not self._ring_has(nkey): - LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile") % (nkey, ) - ) - return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) - - class FanoutRingExchange(RingExchange): """Fanout Exchange based on a hashmap.""" def __init__(self, ring=None): @@ -221,8 +196,7 @@ class FanoutRingExchange(RingExchange): "see ringfile") % (nkey, ) ) return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey] + - ['localhost']) + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) class LocalhostExchange(Exchange): @@ -253,7 +227,6 @@ class MatchMakerRing(MatchMakerBase): """ def __init__(self, ring=None): super(MatchMakerRing, self).__init__() - self.add_binding(PublisherBinding(), PublisherRingExchange(ring)) self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) @@ -266,7 +239,6 @@ class MatchMakerLocalhost(MatchMakerBase): """ def __init__(self): super(MatchMakerLocalhost, self).__init__() - self.add_binding(PublisherBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), LocalhostExchange()) @@ -281,7 +253,6 @@ class MatchMakerStub(MatchMakerBase): def __init__(self): super(MatchMakerLocalhost, self).__init__() - self.add_binding(PublisherBinding(), StubExchange()) self.add_binding(FanoutBinding(), StubExchange()) self.add_binding(DirectBinding(), StubExchange()) self.add_binding(TopicBinding(), StubExchange()) diff --git a/cinder/openstack/common/rpc/service.py b/cinder/openstack/common/rpc/service.py index a35fd6ad4..1738b3d77 100644 --- a/cinder/openstack/common/rpc/service.py +++ b/cinder/openstack/common/rpc/service.py @@ -20,6 +20,7 @@ from cinder.openstack.common.gettextutils import _ from cinder.openstack.common import log as logging from cinder.openstack.common import rpc +from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher from cinder.openstack.common import service @@ -46,15 +47,15 @@ class Service(service.Service): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) - rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager]) + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) # Share this same connection for these Consumers - self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) + self.conn.create_consumer(self.topic, dispatcher, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) - self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) + self.conn.create_consumer(node_topic, dispatcher, fanout=False) - self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) + self.conn.create_consumer(self.topic, dispatcher, fanout=True) # Consume from all consumers in a thread self.conn.consume_in_thread() diff --git a/cinder/openstack/common/setup.py b/cinder/openstack/common/setup.py index 628f5e3c9..4e2a57717 100644 --- a/cinder/openstack/common/setup.py +++ b/cinder/openstack/common/setup.py @@ -31,13 +31,13 @@ from setuptools.command import sdist def parse_mailmap(mailmap='.mailmap'): mapping = {} if os.path.exists(mailmap): - fp = open(mailmap, 'r') - for l in fp: - l = l.strip() - if not l.startswith('#') and ' ' in l: - canonical_email, alias = [x for x in l.split(' ') - if x.startswith('<')] - mapping[alias] = canonical_email + with open(mailmap, 'r') as fp: + for l in fp: + l = l.strip() + if not l.startswith('#') and ' ' in l: + canonical_email, alias = [x for x in l.split(' ') + if x.startswith('<')] + mapping[alias] = canonical_email return mapping @@ -54,7 +54,8 @@ def canonicalize_emails(changelog, mapping): def get_reqs_from_files(requirements_files): for requirements_file in requirements_files: if os.path.exists(requirements_file): - return open(requirements_file, 'r').read().split('\n') + with open(requirements_file, 'r') as fil: + return fil.read().split('\n') return [] @@ -191,14 +192,14 @@ def write_git_changelog(): def generate_authors(): """Create AUTHORS file using git commits.""" - jenkins_email = 'jenkins@review.openstack.org' + jenkins_email = 'jenkins@review.(openstack|stackforge).org' old_authors = 'AUTHORS.in' new_authors = 'AUTHORS' if not os.getenv('SKIP_GENERATE_AUTHORS'): if os.path.isdir('.git'): # don't include jenkins email address in AUTHORS file git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | " - "grep -v " + jenkins_email) + "egrep -v '" + jenkins_email + "'") changelog = _run_shell_command(git_log_cmd) mailmap = parse_mailmap() with open(new_authors, 'w') as new_authors_fh: @@ -236,7 +237,8 @@ def read_versioninfo(project): def write_versioninfo(project, version): """Write a simple file containing the version of the package.""" - open(os.path.join(project, 'versioninfo'), 'w').write("%s\n" % version) + with open(os.path.join(project, 'versioninfo'), 'w') as fil: + fil.write("%s\n" % version) def get_cmdclass(): diff --git a/cinder/openstack/common/timeutils.py b/cinder/openstack/common/timeutils.py index 93b34fc5b..86004391d 100644 --- a/cinder/openstack/common/timeutils.py +++ b/cinder/openstack/common/timeutils.py @@ -74,6 +74,11 @@ def is_older_than(before, seconds): return utcnow() - before > datetime.timedelta(seconds=seconds) +def is_newer_than(after, seconds): + """Return True if after is newer than seconds.""" + return after - utcnow() > datetime.timedelta(seconds=seconds) + + def utcnow_ts(): """Timestamp version of our utcnow function.""" return calendar.timegm(utcnow().timetuple())