--- /dev/null
+#!/usr/bin/env python3
+
+# Copyright 2021 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import base64
+import json
+import logging
+
+from typing import Any
+from urllib import request as rq
+
+log = logging.getLogger("rabbitmq_check")
+log.addHandler(logging.StreamHandler())
+
+parser = argparse.ArgumentParser(
+ description='RabbitMQ bindings consistency check. '
+ 'Uses RabbitMQ Management API endpoint. '
+ 'It pushes test messages to exchanges that have queue bindings'
+ ' and checks if those messages are routed.')
+parser.add_argument('-u', '--username',
+ type=str, default='admin',
+ help='Username to API')
+parser.add_argument('-p', '--password',
+ type=str, default='r00tme',
+ help='Password to API')
+parser.add_argument('-H', '--hostname',
+ type=str, default='localhost',
+ help='Hostname or IP of the API endpoint')
+parser.add_argument('-P', '--port',
+ type=str, default='15672',
+ help='API Port')
+parser.add_argument('-V', '--vhost',
+ type=str, default='openstack',
+ help='Virtual Host to operate on')
+parser.add_argument('-s', '--ssl', action='store_true',
+ default=False, help='Use SSL (WIP)')
+parser.add_argument('check', help='Runs binding checks')
+parser.add_argument('-d', '--debug', action='store_const',
+ dest='loglevel', const=logging.DEBUG,
+ help='Print debug messages', default=logging.WARNING)
+parser.add_argument('-v', '--verbose', action='store_const',
+ dest='loglevel', const=logging.INFO,
+ help='Print more info')
+
+
+class APIConnector():
+ def __init__(self, opts: argparse.Namespace) -> None:
+ self.host = opts.hostname
+ self.port = opts.port
+ self.username = opts.username
+ self.password = opts.password
+
+ def _encode_b64(self, msg: str):
+ return base64.b64encode(msg.encode('utf-8')).decode('utf-8')
+
+ def get(self, path: str) -> Any:
+ return self._request('GET', path, None)
+
+ def post(self, path: str, body: bytes) -> Any:
+ return self._request('POST', path, body)
+
+ def _request(self, method: str, path: str, body: bytes) -> Any:
+ credentials = "{}:{}".format(self.username, self.password)
+ headers = {"Authorization": "Basic " + self._encode_b64(credentials)}
+ headers["Content-Type"] = "application/json"
+ url = 'http://{}:{}/{}'.format(self.host, self.port, path)
+ request = rq.Request(url, headers=headers,
+ method=method, data=body)
+ log.debug("Request details: %s, %s, %s, %s",
+ url, method, headers, body)
+
+ with rq.urlopen(request) as data:
+ out = json.loads(data.read())
+ return out
+
+
+class Checker():
+ def __init__(self, opts: argparse.Namespace) -> None:
+ self.conn = APIConnector(opts)
+ self.opts = opts
+
+ def get_bindings(self):
+ log.info(
+ "Getting bindings from: http://%s@%s:%s/%s", self.opts.username,
+ self.opts.hostname, self.opts.port, self.opts.vhost
+ )
+ bindings = self.conn.get(
+ 'api/bindings/{}?columns=routing_key,source'.format(
+ self.opts.vhost))
+ return bindings
+
+ def check_bindings(self):
+ for binding in self.get_bindings():
+ if not binding['routing_key'] or not binding['source']:
+ log.debug("Skipping inconsistent binding: %s", binding)
+ continue
+ msg = {
+ "properties": {},
+ "routing_key": binding['routing_key'],
+ "payload": "TESTING",
+ "payload_encoding": "string",
+ "vhost": self.opts.vhost
+ }
+ body = json.dumps(msg).encode('utf-8')
+ path = 'api/exchanges/{}/{}/publish'.format(
+ self.opts.vhost, binding['source'])
+ try:
+ log.info("Cheking binding [%s] from exchange [%s]",
+ binding['routing_key'], binding['source'])
+ ret = self.conn.post(path, body)
+ if isinstance(ret, dict):
+ if not ret.get('routed'):
+ print("Possibly unreachable binding [{}] from "
+ "exchange [{}]!".format(binding['routing_key'],
+ binding['source']))
+ else:
+ log.info("Message routed succesfuly: [%s]->[%s]",
+ binding['source'], binding['routing_key'])
+ except Exception as e:
+ log.warning("API returned error: %s", e, exc_info=1)
+
+
+def main():
+ options = parser.parse_args()
+ log.setLevel(options.loglevel)
+ log.debug("Parsed options: %s", options)
+
+ Checker(options).check_bindings()
+
+
+if __name__ == "__main__":
+ main()