Preface

In an automated operations and maintenance system, it is often necessary to monitor logs that are constantly being updated. This article walks through a Python implementation of real-time log monitoring. The main features are:

  • Capture terminal output from a remote machine on the server.
  • Stream the server-side logs to the client web page in real time.

The examples in this article are based on Python and Flask.

The main dependencies are:

  • Flask
  • Redis and its Python client
  • paramiko

Analysis

The server's main job is to connect to remote machines over SSH and run tasks; each job in the project is managed by a Worker. A Worker runs in its own thread, saves its result to the database, and removes itself from the worker manager when done.

Logs are generated as the worker runs. Because logs update quickly, I chose Redis as the intermediate storage, using a Redis list as the data structure, so that new entries can be pushed to clients as soon as they arrive. In addition, to be able to view the log after the worker has finished, the log also has to be saved to a static file.
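The Redis-list idea above can be sketched as a small wrapper. This is a minimal sketch, not the project's actual class: the client is injected, so anything exposing `rpush`/`lrange` works (with the real library it would be `redis.Redis()`), and the key pattern `job:%d:log` is a hypothetical naming convention.

```python
class LogQueue:
    """A log queue backed by a Redis list (sketch; key name is illustrative)."""

    def __init__(self, client, job_id):
        self.client = client
        self.key = 'job:%d:log' % job_id

    def put(self, line):
        # RPUSH appends to the tail of the list, preserving log order.
        self.client.rpush(self.key, line)

    def all(self):
        # LRANGE 0 -1 returns every element, e.g. for the first page render.
        return self.client.lrange(self.key, 0, -1)
```

Because a list keeps insertion order and supports cheap appends, it fits an append-only log far better than, say, a Redis hash.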

The general logic goes like this:

Request web page -> Get logs -> Is the worker still running?
  -> Yes: read from the Redis log queue and update in real time
  -> No: read from the log file

Get remote output

So the next problem to solve is how to get the terminal output from the remote machine and append it to the log queue. In Python, the go-to library for SSH connections is Paramiko, so my first attempt looked like this:

import paramiko

client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect(host)
stdin, stdout, stderr = client.exec_command(command)
# Loop until the remote process has finished
while not stdout.channel.exit_status_ready():
    logger.log(stdout.readline())

However, both readline and read(size) block until the command completes, which does not meet my requirement. Fortunately, the underlying channel object is exposed, so the code became:

import select

stdin, stdout, stderr = client.exec_command(command)
channel = stdout.channel
pending = err_pending = None
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
    # Wait up to 1 second for the channel to become readable
    readq, _, _ = select.select([channel], [], [], 1)
    for c in readq:
        if c.recv_ready():
            chunk = c.recv(len(c.in_buffer))
            if pending is not None:
                chunk = pending + chunk
            lines = chunk.splitlines()
            # If the chunk did not end with a newline, the last line is
            # incomplete; carry it over to the next chunk.
            if lines and lines[-1] and lines[-1][-1] == chunk[-1]:
                pending = lines.pop()
            else:
                pending = None
            for line in lines:
                logger.log(line)
        if c.recv_stderr_ready():
            chunk = c.recv_stderr(len(c.in_stderr_buffer))
            if err_pending is not None:
                chunk = err_pending + chunk
            lines = chunk.splitlines()
            if lines and lines[-1] and lines[-1][-1] == chunk[-1]:
                err_pending = lines.pop()
            else:
                err_pending = None
            for line in lines:
                logger.log(line)

channel.closed becomes True only once all output has been read, whereas exit_status_ready() becomes True as soon as the process has finished running, possibly before all output has been consumed. The pending and err_pending variables ensure only whole lines are logged. Logger's log method is defined as follows:
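The whole-line trick is worth isolating. Here it is as a standalone helper (a hypothetical name, extracted for illustration): if a chunk ends mid-line, the last element of splitlines() still ends with the chunk's final byte, so it is held back and prepended to the next chunk.

```python
def split_complete_lines(chunk, pending=None):
    """Split a byte chunk into complete lines, carrying any partial tail.

    Returns (lines, pending): `lines` are whole lines; `pending` is the
    trailing incomplete fragment (or None if the chunk ended on a newline).
    """
    if pending is not None:
        chunk = pending + chunk
    lines = chunk.splitlines()
    # A chunk that ends with a newline leaves no trace of that newline in the
    # last split line; if the last line still ends with the chunk's final
    # byte, the line is incomplete and must wait for the next chunk.
    if lines and lines[-1] and lines[-1][-1] == chunk[-1]:
        pending = lines.pop()
    else:
        pending = None
    return lines, pending
```

Feeding `b'foo\nba'` yields one complete line and a pending `b'ba'`; feeding `b'r\n'` next completes it as `b'bar'`.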

def log(self, message, *args):
    self.log_queue.put(message % args)

log_queue.put pushes the message onto the Redis list.

Real-time Log Update

Now we need a web page that shows the current log when the user visits it and, as long as the page stays open, appends new log lines in real time without a refresh. If multiple clients are connected, they should all receive the updates.

With a normal HTTP connection, the client receives a response once its request completes; unless it requests again, it gets nothing new. The server is passive. There are roughly three ways to let the server push: AJAX polling, SSE, and WebSocket.

AJAX here means the client automatically polls at a fixed interval, so it is not truly real time. SSE is essentially a long-lived connection that only supports pushing messages from the server to the client. WebSocket is a full-duplex channel between server and client, but requires extra support on the backend.
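To make the SSE option concrete before introducing flask-sse, here is a minimal hand-rolled endpoint: the response body is a generator and the mimetype is text/event-stream. The route name /events, the format_sse helper, and the one-tick-per-second payload are all illustrative, not part of the project.

```python
import time
from flask import Flask, Response

app = Flask(__name__)

def format_sse(data, event=None):
    # SSE wire format: an optional "event:" line, a "data:" line,
    # then a blank line to terminate the message.
    msg = 'data: %s\n\n' % data
    if event is not None:
        msg = 'event: %s\n%s' % (event, msg)
    return msg

@app.route('/events')
def events():
    def generate():
        n = 0
        while True:
            n += 1
            yield format_sse('tick %d' % n, event='log')
            time.sleep(1)
    # The streaming mimetype is what tells the browser's EventSource
    # to keep the connection open and parse incoming messages.
    return Response(generate(), mimetype='text/event-stream')
```

flask-sse packages exactly this pattern, plus fan-out to multiple clients via Redis Pub/Sub.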

Weighing the three options, SSE is the lowest-cost one that meets my requirements. It works by setting up an event listener on the client that listens for messages at a given URL; on the server, the response at that URL must be a stream: set the response body to a generator and the response header to mimetype='text/event-stream'. flask-sse is an extension that packages this up for Flask; under the hood it implements the message queue via Redis Pub/Sub. However, published data can only be received after the connection is established, so when the page is rendered, the messages already saved in the log queue are written into it, and the event listener then picks up new logs from there. The code is as follows:

from flask_sse import sse
app.register_blueprint(sse, url_prefix='/stream')

@app.route('/job_status/<int:job_id>/log')
def job_log(job_id):
    worker = wk.get_worker(job_id)
    body = []
    if not worker:
        # Worker has finished: read the log from the static file
        report_folder = utils.get_directory_path(
                            current_app.root_path,
                            current_app.config['REPORT_FOLDER'],
                            str(job_id),
                        )
        log_file = os.path.join(report_folder, 'worker.log')
        if os.path.exists(log_file):
            with open(log_file, 'r') as f:
                body = f.readlines()
    else:
        # Worker still running: read what has accumulated in the log queue
        body = list(worker.logger.log_queue)
    html_body = '\n'.join('<p>%s</p>' % line for line in body)
    return render_template('job_log.html', body=html_body, job_id=job_id)

Accordingly, whenever a log line is added, a message is also published to Pub/Sub:

def log(self, message, *args):
    message = message % args
    self.log_queue.put(message)
    sse.publish({'message': message}, type='log')

The content of job_log.html is as follows:

{% extends 'layout.html' %}

{% block content %}
<div class="row">
  <div class="col-md-10 col-md-offset-1">
    <div class="log-container">
      {{ body|safe }}
    </div>
  </div>
</div>
{% endblock %}

{% block extra_script %}
<script type="text/javascript">
  var source = new EventSource("{{ url_for('sse.stream') }}");
  var div = document.getElementsByClassName('log-container')[0];
  source.addEventListener('log', function(event) {
    var data = JSON.parse(event.data);
    var newLine = document.createElement('p');
    newLine.innerHTML = data.message;
    $(newLine).appendTo($('.log-container'));
    div.scrollTop = div.scrollHeight;
  }, false);
  $(function() { div.scrollTop = div.scrollHeight; });
</script>
{% endblock %}

Reference links:

  • flask-sse.readthedocs.io/en/latest/q…
  • stackoverflow.com/a/32758464