]> review.fuel-infra Code Review - tools/sustaining.git/commitdiff
[RabbitMQ] add RabbitMQ bindings checker 35/41735/3
authorDenis Meltsaykin <dmeltsaykin@mirantis.com>
Mon, 24 May 2021 21:52:59 +0000 (23:52 +0200)
committerDenis Meltsaykin <dmeltsaykin@mirantis.com>
Tue, 25 May 2021 09:45:53 +0000 (11:45 +0200)
This is still mostly Work-In-Progress since there is no clear
reproductions for the "bindings issue" and hence it's hard to
test if this approach is 100% correct.

Change-Id: Ic5cc4ef91afcecd2a00cee751b9cd6f8de342998

scripts/rabbitmq_check.py [new file with mode: 0755]

diff --git a/scripts/rabbitmq_check.py b/scripts/rabbitmq_check.py
new file mode 100755 (executable)
index 0000000..fa21515
--- /dev/null
@@ -0,0 +1,145 @@
+#!/usr/bin/env python3
+
+#    Copyright 2021 Mirantis, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import argparse
+import base64
+import json
+import logging
+
+from typing import Any
+from urllib import request as rq
+
+log = logging.getLogger("rabbitmq_check")
+log.addHandler(logging.StreamHandler())
+
+parser = argparse.ArgumentParser(
+    description='RabbitMQ bindings consistency check. '
+                'Uses RabbitMQ Management API endpoint. '
+                'It pushes test messages to exchanges that have queue bindings'
+                ' and checks if those messages are routed.')
+parser.add_argument('-u', '--username',
+                    type=str, default='admin',
+                    help='Username to API')
+parser.add_argument('-p', '--password',
+                    type=str, default='r00tme',
+                    help='Password to API')
+parser.add_argument('-H', '--hostname',
+                    type=str, default='localhost',
+                    help='Hostname or IP of the API endpoint')
+parser.add_argument('-P', '--port',
+                    type=str, default='15672',
+                    help='API Port')
+parser.add_argument('-V', '--vhost',
+                    type=str, default='openstack',
+                    help='Virtual Host to operate on')
+parser.add_argument('-s', '--ssl', action='store_true',
+                    default=False, help='Use SSL (WIP)')
+parser.add_argument('check', help='Runs binding checks')
+parser.add_argument('-d', '--debug', action='store_const',
+                    dest='loglevel', const=logging.DEBUG,
+                    help='Print debug messages', default=logging.WARNING)
+parser.add_argument('-v', '--verbose', action='store_const',
+                    dest='loglevel', const=logging.INFO,
+                    help='Print more info')
+
+
+class APIConnector():
+    def __init__(self, opts: argparse.Namespace) -> None:
+        self.host = opts.hostname
+        self.port = opts.port
+        self.username = opts.username
+        self.password = opts.password
+
+    def _encode_b64(self, msg: str):
+        return base64.b64encode(msg.encode('utf-8')).decode('utf-8')
+
+    def get(self, path: str) -> Any:
+        return self._request('GET', path, None)
+
+    def post(self, path: str, body: bytes) -> Any:
+        return self._request('POST', path, body)
+
+    def _request(self, method: str, path: str, body: bytes) -> Any:
+        credentials = "{}:{}".format(self.username, self.password)
+        headers = {"Authorization": "Basic " + self._encode_b64(credentials)}
+        headers["Content-Type"] = "application/json"
+        url = 'http://{}:{}/{}'.format(self.host, self.port, path)
+        request = rq.Request(url, headers=headers,
+                             method=method, data=body)
+        log.debug("Request details: %s, %s, %s, %s",
+                  url, method, headers, body)
+
+        with rq.urlopen(request) as data:
+            out = json.loads(data.read())
+        return out
+
+
+class Checker():
+    def __init__(self, opts: argparse.Namespace) -> None:
+        self.conn = APIConnector(opts)
+        self.opts = opts
+
+    def get_bindings(self):
+        log.info(
+            "Getting bindings from: http://%s@%s:%s/%s", self.opts.username,
+            self.opts.hostname, self.opts.port, self.opts.vhost
+        )
+        bindings = self.conn.get(
+            'api/bindings/{}?columns=routing_key,source'.format(
+                self.opts.vhost))
+        return bindings
+
+    def check_bindings(self):
+        for binding in self.get_bindings():
+            if not binding['routing_key'] or not binding['source']:
+                log.debug("Skipping inconsistent binding: %s", binding)
+                continue
+            msg = {
+                "properties": {},
+                "routing_key": binding['routing_key'],
+                "payload": "TESTING",
+                "payload_encoding": "string",
+                "vhost": self.opts.vhost
+            }
+            body = json.dumps(msg).encode('utf-8')
+            path = 'api/exchanges/{}/{}/publish'.format(
+                self.opts.vhost, binding['source'])
+            try:
+                log.info("Cheking binding [%s] from exchange [%s]",
+                         binding['routing_key'], binding['source'])
+                ret = self.conn.post(path, body)
+                if isinstance(ret, dict):
+                    if not ret.get('routed'):
+                        print("Possibly unreachable binding [{}] from "
+                              "exchange [{}]!".format(binding['routing_key'],
+                                                      binding['source']))
+                    else:
+                        log.info("Message routed succesfuly: [%s]->[%s]",
+                                 binding['source'], binding['routing_key'])
+            except Exception as e:
+                log.warning("API returned error: %s", e, exc_info=1)
+
+
+def main():
+    options = parser.parse_args()
+    log.setLevel(options.loglevel)
+    log.debug("Parsed options: %s", options)
+
+    Checker(options).check_bindings()
+
+
+if __name__ == "__main__":
+    main()