From: Denis Meltsaykin Date: Mon, 24 May 2021 21:52:59 +0000 (+0200) Subject: [RabbitMQ] add RabbitMQ bindings checker X-Git-Url: https://review.fuel-infra.org/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F35%2F41735%2F3;p=tools%2Fsustaining.git [RabbitMQ] add RabbitMQ bindings checker This is still mostly Work-In-Progress since there is no clear reproductions for the "bindings issue" and hence it's hard to test if this approach is 100% correct. Change-Id: Ic5cc4ef91afcecd2a00cee751b9cd6f8de342998 --- diff --git a/scripts/rabbitmq_check.py b/scripts/rabbitmq_check.py new file mode 100755 index 0000000..fa21515 --- /dev/null +++ b/scripts/rabbitmq_check.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 + +# Copyright 2021 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import base64 +import json +import logging + +from typing import Any +from urllib import request as rq + +log = logging.getLogger("rabbitmq_check") +log.addHandler(logging.StreamHandler()) + +parser = argparse.ArgumentParser( + description='RabbitMQ bindings consistency check. ' + 'Uses RabbitMQ Management API endpoint. ' + 'It pushes test messages to exchanges that have queue bindings' + ' and checks if those messages are routed.') +parser.add_argument('-u', '--username', + type=str, default='admin', + help='Username to API') +parser.add_argument('-p', '--password', + type=str, default='r00tme', + help='Password to API') +parser.add_argument('-H', '--hostname', + type=str, default='localhost', + help='Hostname or IP of the API endpoint') +parser.add_argument('-P', '--port', + type=str, default='15672', + help='API Port') +parser.add_argument('-V', '--vhost', + type=str, default='openstack', + help='Virtual Host to operate on') +parser.add_argument('-s', '--ssl', action='store_true', + default=False, help='Use SSL (WIP)') +parser.add_argument('check', help='Runs binding checks') +parser.add_argument('-d', '--debug', action='store_const', + dest='loglevel', const=logging.DEBUG, + help='Print debug messages', default=logging.WARNING) +parser.add_argument('-v', '--verbose', action='store_const', + dest='loglevel', const=logging.INFO, + help='Print more info') + + +class APIConnector(): + def __init__(self, opts: argparse.Namespace) -> None: + self.host = opts.hostname + self.port = opts.port + self.username = opts.username + self.password = opts.password + + def _encode_b64(self, msg: str): + return base64.b64encode(msg.encode('utf-8')).decode('utf-8') + + def get(self, path: str) -> Any: + return self._request('GET', path, None) + + def post(self, path: str, body: bytes) -> Any: + return self._request('POST', path, body) + + def _request(self, method: str, path: str, body: bytes) -> Any: + credentials = "{}:{}".format(self.username, self.password) + headers = {"Authorization": "Basic " + self._encode_b64(credentials)} + headers["Content-Type"] = "application/json" + url = 'http://{}:{}/{}'.format(self.host, self.port, path) + request = rq.Request(url, headers=headers, + method=method, data=body) + log.debug("Request details: %s, %s, %s, %s", + url, method, headers, body) + + with rq.urlopen(request) as data: + out = json.loads(data.read()) + return out + + +class Checker(): + def __init__(self, opts: argparse.Namespace) -> None: + self.conn = APIConnector(opts) + self.opts = opts + + def get_bindings(self): + log.info( + "Getting bindings from: http://%s@%s:%s/%s", self.opts.username, + self.opts.hostname, self.opts.port, self.opts.vhost + ) + bindings = self.conn.get( + 'api/bindings/{}?columns=routing_key,source'.format( + self.opts.vhost)) + return bindings + + def check_bindings(self): + for binding in self.get_bindings(): + if not binding['routing_key'] or not binding['source']: + log.debug("Skipping inconsistent binding: %s", binding) + continue + msg = { + "properties": {}, + "routing_key": binding['routing_key'], + "payload": "TESTING", + "payload_encoding": "string", + "vhost": self.opts.vhost + } + body = json.dumps(msg).encode('utf-8') + path = 'api/exchanges/{}/{}/publish'.format( + self.opts.vhost, binding['source']) + try: + log.info("Cheking binding [%s] from exchange [%s]", + binding['routing_key'], binding['source']) + ret = self.conn.post(path, body) + if isinstance(ret, dict): + if not ret.get('routed'): + print("Possibly unreachable binding [{}] from " + "exchange [{}]!".format(binding['routing_key'], + binding['source'])) + else: + log.info("Message routed succesfuly: [%s]->[%s]", + binding['source'], binding['routing_key']) + except Exception as e: + log.warning("API returned error: %s", e, exc_info=1) + + +def main(): + options = parser.parse_args() + log.setLevel(options.loglevel) + log.debug("Parsed options: %s", options) + + Checker(options).check_bindings() + + +if __name__ == "__main__": + main()