Tech blog: github.com/yongxinz/te…

Meanwhile, welcome to follow my wechat official account AlwaysBeta, more exciting content waiting for you.

The last two articles introduced:

  • Install RabbitMQ on a Mac
  • The six modes of RabbitMQ

Now let’s talk about surveillance.

Monitoring is important, especially in a production environment. The disk was full, the queue was backlogged, and if we didn’t know about it, the boss would wonder, is this guy on the run?

And I’m in this situation right now, mainly because of the backlog problem. Disk space is not a big concern because it is not a large amount, but sometimes the program execution will report an error and the queue will not consume any more, which can be embarrassing.

Did some research, made a summary. There are three main ways to learn about the state of RabbitMQ: the Management UI, the Rabbitmqctl command, and the REST API.

Management UI

RabbitMQ provides a wealth of Web management features, including access to RabbitMQ’s overall health, switch and queue status, personnel management and permission configuration.

But if you want to monitor it through the page, you can only go wrong by chance. See a problem, is good luck, do not see a problem, that is inevitable.

This is my current situation, so to avoid a major problem, I need to change it quickly.

Note: Use http://127.0.0.1:15672 to access Web pages. By default, the user name and password are guest, but should be changed in the production environment.

Rabbitmqctl command

Corresponding to the front-end page is the back-end command line command, also very rich. It can also be used when you test yourself or check some status temporarily. But as far as I’m concerned, I don’t use much.

I have summarized some common ones and listed them below for everyone to use:

# start service
rabbitmq-server

# stop service
rabbitmqctl stop

# vhost add delete check
rabbitmqctl add_vhost
rabbitmqctl delete_vhost
rabbitmqctl list_vhosts

# query switch
rabbitmqctl list_exchanges

# query queue
rabbitmqctl list_queues

# Check consumer information
rabbitmqctl list_consumers

# user add delete check
rabbitmqctl add_user
rabbitmqctl delete_user
rabbitmqctl list_users
Copy the code

REST API

Finally getting to the point, it’s nice for programmers to see a ready-made API to call.

Automated monitoring and batch operations are best achieved through API calls. For example, some users and permissions that need to be initialized can be completed by one click of script rather than page by page, simple and fast.

Here are some common apis:

# Summarize information
curl -i -u guest:guest http://localhost:15672/api/overview

# vhost list
curl -i -u guest:guest http://localhost:15672/api/vhosts

# the channel list
curl -i -u guest:guest http://localhost:15672/api/channels
      
# Node information
curl -i -u guest:guest http://localhost:15672/api/nodes
      
Switch information
curl -i -u guest:guest http://localhost:15672/api/exchanges
      
# queue information
curl -i -u guest:guest http://localhost:15672/api/queues
Copy the code

As far as I’ve been able to find, the Overview and Queues apis have worked for me, and you can make your choice based on your project’s reality.

API return content is JSON, and there are a lot of fields, at first look will feel confused, the specific meaning of the website and the actual situation to slowly consider, it is not very difficult to understand.

The following code contains the API request and the parsing of the returned results, which can be executed in a test environment and applied to a production environment with minor changes.

import json
import logging
import optparse

import requests

logging.basicConfig(
    format='%(asctime)s - %(pathname)s[%(lineno)d] - %(levelname)s: %(message)s',
    level=logging.INFO)
logger = logging.getLogger(__name__)


class RabbitMQMoniter(object):
    """ RabbitMQ Management API """
    def __init__(self, host=' ', port=15672, username='guest', password='guest'):
        self.host = host
        self.port = port
        self.username = username
        self.password = password

    def call_api(self, path):
        logger.info('call rabbit api to get data on ' + path)

        headers = {'content-type': 'application/json'}
        url = '{0}://{1}:{2}/api/{3}'.format('http', self.host, self.port, path)
        res = requests.get(url, headers=headers, auth=(self.username, self.password))

        return res.json()

    def list_queues(self):
        """ curl -i -u guest:guest http://localhost:15672/api/queues return: list """
        queues = []
        for queue in self.call_api('queues'):
            element = {
                'vhost': queue['vhost'].'queue': queue['name']
            }
            queues.append(element)
            logger.info('get queue ' + queue['vhost'] + '/' + queue['name'])
        return queues

    def list_nodes(self):
        """ curl -i -u guest:guest http://localhost:15672/api/nodes return: list """
        nodes = []
        for node in self.call_api('nodes'):
            name = node['name'].split(The '@') [1]
            element = {
                'node': name,
                'node_type': node['type']
            }
            nodes.append(element)
            logger.info('get nodes ' + name + '/' + node['type'])
        return nodes

    def check_queue(self):
        """ check queue """
        for queue in self.call_api('queues'):
            self._get_queue_data(queue)
        return True

    def _get_queue_data(self, queue):
        """ get queue data """
        for item in ['memory'.'messages'.'messages_ready'.'messages_unacknowledged'.'consumers']:
            key = 'rabbitmq.queues[{0},queue_{1},{2}]'.format(queue['vhost'], item, queue['name'])
            value = queue.get(item, 0)
            logger.info('queue data: - %s %s' % (key, value))

        for item in ['deliver_get'.'publish']:
            key = 'rabbitmq.queues[{0},queue_message_stats_{1},{2}]'.format(queue['vhost'], item, queue['name'])
            value = queue.get('message_stats', {}).get(item, 0)
            logger.info('queue data: - %s %s' % (key, value))

    def check_aliveness(self):
        """ check alive """
        return self.call_api('aliveness-test/%2f') ['status']

    def check_overview(self, item):
        """ check overview """
        if item in ['channels'.'connections'.'consumers'.'exchanges'.'queues'] :return self.call_api('overview').get('object_totals').get(item, 0)
        elif item in ['messages'.'messages_ready'.'messages_unacknowledged'] :return self.call_api('overview').get('queue_totals').get(item, 0)
        elif item == 'message_stats_deliver_get':
            return self.call_api('overview').get('message_stats', {}).get('deliver_get'.0)
        elif item == 'message_stats_publish':
            return self.call_api('overview').get('message_stats', {}).get('publish'.0)
        elif item == 'message_stats_ack':
            return self.call_api('overview').get('message_stats', {}).get('ack'.0)
        elif item == 'message_stats_redeliver':
            return self.call_api('overview').get('message_stats', {}).get('redeliver'.0)
        elif item == 'rabbitmq_version':
            return self.call_api('overview').get('rabbitmq_version'.'None')

    def check_server(self, item, node_name):
        """ check server """
        node_name = node_name.split('. ') [0]
        for nodeData in self.call_api('nodes') :if node_name in nodeData['name'] :return nodeData.get(item, 0)
        return 'Not Found'


def main(a):
    """ Command-line """
    choices = ['list_queues'.'list_nodes'.'queues'.'check_aliveness'.'overview'.'server']

    parser = optparse.OptionParser()
    parser.add_option('--username', help='RabbitMQ API username', default='guest')
    parser.add_option('--password', help='RabbitMQ API password', default='guest')
    parser.add_option('--host', help='RabbitMQ API host', default='127.0.0.1')
    parser.add_option('--port', help='RabbitMQ API port', type='int', default=15672)
    parser.add_option('--check', type='choice', choices=choices, help='Type of check')
    parser.add_option('--metric', help='Which metric to evaluate', default=' ')
    parser.add_option('--node', help='Which node to check (valid for --check=server)')
    (options, args) = parser.parse_args()

    if not options.check:
        parser.error('At least one check should be specified')

    logger.info('start running ... ')

    api = RabbitMQMoniter(username=options.username, password=options.password, host=options.host, port=options.port)

    if options.check == 'list_queues':
        logger.info(json.dumps({'data': api.list_queues()}, indent=4, separators=(', '.':')))
    elif options.check == 'list_nodes':
        logger.info(json.dumps({'data': api.list_nodes()}, indent=4, separators=(', '.':')))
    elif options.check == 'queues':
        logger.info(api.check_queue())
    elif options.check == 'check_aliveness':
        logger.info(api.check_aliveness())
    elif options.check == 'overview':
        if not options.metric:
            parser.error('Missing required parameter: "metric"')
        else:
            if options.node:
                logger.info(api.check_overview(options.metric))
            else:
                logger.info(api.check_overview(options.metric))
    elif options.check == 'server':
        if not options.metric:
            parser.error('Missing required parameter: "metric"')
        else:
            if options.node:
                logger.info(api.check_server(options.metric, options.node))
            else:
                logger.info(api.check_server(options.metric, api.host))


if __name__ == '__main__':
    main()
Copy the code

Call and return:

python3 rabbitmq_status.py --check list_queues

# 2020-04-12 14:33:15, 298-rabbitmq_status. py[142] -info: start running...
# 2020-04-12 14:33:15, kbytes - Rabbitmq_status. py[26] - INFO: Call Rabbit API to Get data on Queues
# 2020-04-12 14:33:15,312 - rabbitmq_status.py[46] - INFO: get queue //task_queue
# 2020-04-12 14:33:15,312 - rabbitmq_status.py[147] - INFO: {
# "data":[
# {
# "vhost":"/",
# "queue":"task_queue"
#}
#]
#}
Copy the code

By parsing the returned results, you can determine the RabbitMQ running status. If the RabbitMQ threshold is exceeded, you can send an alarm or email to monitor the RabbitMQ status.

You can monitor the backlog of a queue in either of the following ways: Set a threshold for the backlog length of a queue. If the threshold is exceeded, an alarm is generated. Second, the length of the last five backlogs is saved. If the backlogs grow and exceed the threshold, an alarm is generated.

The second method is better, more accurate and less likely to give false warnings, but also more complex to implement.

This is just an idea, and I will share the results and code later. Or do you have a better way? Welcome to leave a message.

Source code address:

Github.com/yongxinz/te…

Reference Documents:

www.rabbitmq.com/monitoring….

Blog.51cto.com/john88wang/…