cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
- cfg.IntOpt('rpc_zmq_port_pub', default=9502,
- help='ZeroMQ fanout publisher port'),
-
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
- self.outq.send([str(topic), str(msg_id), str('cast'),
+ self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
def close(self):
else:
return [result]
- def consume(self, sock):
- raise NotImplementedError()
-
def process(self, style, target, proxy, ctx, data):
# Method starting with - are
# processed internally. (non-valid method name)
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
- self.topic_proxy['fanout~'] = \
- ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
- CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
- self.sockets.append(self.topic_proxy['fanout~'])
-
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- topic, msg_id, style, in_msg = data
+ msg_id, topic, style, in_msg = data
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
-
- # This doesn't change what is in the message,
- # it only specifies that these messages go to
- # the generic fanout topic.
- topic = 'fanout~'
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
else:
sock_type = zmq.PUSH
- if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
+ if not topic in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
-class CallbackReactor(ZmqBaseReactor):
- """
- A consumer class passing messages to a callback
- """
-
- def __init__(self, conf, callback):
- self._cb = callback
- super(CallbackReactor, self).__init__(conf)
-
- def consume(self, sock):
- data = sock.recv()
- self._cb(data[3])
-
-
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
self.mapping[sock].send(data)
return
- topic, msg_id, style, in_msg = data
+ msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx = RpcContext.unmarshal(ctx)
def __init__(self, conf):
self.reactor = ZmqReactor(conf)
- def _consume_fanout(self, reactor, topic, proxy, bind=False):
- for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
- inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
- reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
-
- def declare_topic_consumer(self, topic, callback=None,
- queue_name=None):
- """declare_topic_consumer is a private method, but
- it is being used by Quantum (Folsom).
- This has been added compatibility.
- """
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
- return
-
- reactor = CallbackReactor(CONF, callback)
- self._consume_fanout(reactor, topic, None, bind=False)
-
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
- # Consume direct-push fanout messages (relay to local consumers)
+ # Subscription scenarios
if fanout:
- # If we're not in here, we can't receive direct fanout messages
- if CONF.rpc_zmq_host in matchmaker.queues(topic):
- # Consume from all remote publishers.
- self._consume_fanout(self.reactor, topic, proxy)
- else:
- LOG.warn("This service cannot receive direct PUSH fanout "
- "messages without being known by the matchmaker.")
- return
-
- # Configure consumer for direct pushes.
- subscribe = (topic, fanout)[type(fanout) == str]
+ subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
-
- inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
else:
sock_type = zmq.PULL
subscribe = None
- # Receive messages from (local) proxy
- inaddr = "ipc://%s/zmq_topic_%s" % \
- (CONF.rpc_zmq_ipc_dir, topic)
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
- # Consume messages from local rpc-zmq-receiver daemon.
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
return False
-class PublisherBinding(Binding):
- """Match on publishers keys, where key starts with 'publishers.' string."""
- def test(self, key):
- if key.startswith('publishers~'):
- return True
- return False
-
-
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
return [(key + '.' + host, host)]
-class PublisherRingExchange(RingExchange):
- """Fanout Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(PublisherRingExchange, self).__init__(ring)
-
- def run(self, key):
- # Assume starts with "publishers~", strip it for lookup.
- nkey = key.split('publishers~')[1:][0]
- if not self._ring_has(nkey):
- LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (nkey, )
- )
- return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
-
-
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
"see ringfile") % (nkey, )
)
return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
- ['localhost'])
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class LocalhostExchange(Exchange):
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
- self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
- self.add_binding(PublisherBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
- self.add_binding(PublisherBinding(), StubExchange())
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())
def parse_mailmap(mailmap='.mailmap'):
mapping = {}
if os.path.exists(mailmap):
- fp = open(mailmap, 'r')
- for l in fp:
- l = l.strip()
- if not l.startswith('#') and ' ' in l:
- canonical_email, alias = [x for x in l.split(' ')
- if x.startswith('<')]
- mapping[alias] = canonical_email
+ with open(mailmap, 'r') as fp:
+ for l in fp:
+ l = l.strip()
+ if not l.startswith('#') and ' ' in l:
+ canonical_email, alias = [x for x in l.split(' ')
+ if x.startswith('<')]
+ mapping[alias] = canonical_email
return mapping
def get_reqs_from_files(requirements_files):
for requirements_file in requirements_files:
if os.path.exists(requirements_file):
- return open(requirements_file, 'r').read().split('\n')
+ with open(requirements_file, 'r') as fil:
+ return fil.read().split('\n')
return []
def generate_authors():
"""Create AUTHORS file using git commits."""
- jenkins_email = 'jenkins@review.openstack.org'
+ jenkins_email = 'jenkins@review.(openstack|stackforge).org'
old_authors = 'AUTHORS.in'
new_authors = 'AUTHORS'
if not os.getenv('SKIP_GENERATE_AUTHORS'):
if os.path.isdir('.git'):
# don't include jenkins email address in AUTHORS file
git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
- "grep -v " + jenkins_email)
+ "egrep -v '" + jenkins_email + "'")
changelog = _run_shell_command(git_log_cmd)
mailmap = parse_mailmap()
with open(new_authors, 'w') as new_authors_fh:
def write_versioninfo(project, version):
"""Write a simple file containing the version of the package."""
- open(os.path.join(project, 'versioninfo'), 'w').write("%s\n" % version)
+ with open(os.path.join(project, 'versioninfo'), 'w') as fil:
+ fil.write("%s\n" % version)
def get_cmdclass():