Preface

Tail is a common Linux command that can print the last n lines of a file, or follow a file and print data appended to it in real time. A basic tail is simple to implement, but a complete one involves many details, and if performance matters, other mechanisms have to be introduced. This article starts with a simple implementation of tail's basic functionality on Linux, then refines it with Linux's inotify mechanism for high-performance log file reading. Related source: github.com/so1n/exampl…


so1n.me/2019/03/07/…

1. First version — Read live data from the end of the file

The main idea is: open the file, move the file pointer to the end, then loop; if there is new data, output it, otherwise sleep for a while.

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            f.seek(0, 2)  # jump to the end of the file
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # print each new line
                else:
                    time.sleep(self.interval)


if __name__ == '__main__':
    filename: str = sys.argv[1]
    Tail(filename)()

Then just do the following call:

python xxx.py filename 

2. Version 2 — Implementing tail -f

By default, tail -f first prints the last 10 lines of the file and then reads new data from the end in real time. For small files, you can simply read the whole file and output the last 10 lines. However, reading the full file just to take the last 10 lines performs poorly, and scanning backwards from the end involves tricky boundary conditions, so let's start with the read-everything implementation:

import time
import sys

from typing import Callable, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval

    def __call__(self):
        with open(self.file_name) as f:
            self.read_last_line(f)
            while True:
                line: str = f.readline()
                if line:
                    self.output(line)  # print each new line
                else:
                    time.sleep(self.interval)

    def read_last_line(self, f):
        last_lines = f.readlines()[-10:]
        for line in last_lines:
            self.output(line)


if __name__ == '__main__':
    filename: str = sys.argv[1]
    Tail(filename)()

As you can see, the implementation is very simple; compared to the first version, only the read_last_line function is new. The next problem to solve is performance. When the file is very large this logic does not work: log files are often several gigabytes in size, and reading the whole file would load it all into memory. Linux has no interface that can position a file pointer directly at the 10th line from the end, so the only way is to simulate it, as follows:

  • First, jump the cursor to the end of the file and save its position, then estimate the length of one line; it is best to allow more than enough, and 1024 characters is the per-line estimate used here
  • Then jump back with seek(-1024 * 10, 2) (in Python 3, this end-relative seek requires the file to be opened in binary mode); that region should contain the expected last 10 lines
  • Then check the content: if fewer than 10 * 1024 characters could be skipped back, the whole file does not even contain that much data, so fall back to the original read_last_line method
  • If exactly 1024 * 10 characters were skipped, count the newline characters in what was read. If there are 10 or more lines, print only the last 10. If only, say, 4 lines were read, re-estimate the line length from those 4 lines and read further back until 10 lines are found

With the above steps, the last 10 lines can be computed and printed, and appended data can then follow. By that point, however, the file content may already have changed and our cursor with it, so at the end we jump back to the saved cursor position to avoid missing or duplicating data.
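The file-pointer operations behind these steps can be tried in isolation. Below is a minimal sketch, not the final implementation: the throwaway file, the 20 numbered lines, and the 50-byte jump are arbitrary illustration values.

```python
import os
import tempfile

# Build a small throwaway file with 20 numbered lines
fd, tmp_path = tempfile.mkstemp()
with os.fdopen(fd, 'w') as f:
    f.write(''.join('line %d\n' % i for i in range(20)))

# Binary mode matters: Python 3 text-mode files only allow seek(0, 2) from the end
with open(tmp_path, 'rb') as f:
    f.seek(0, 2)                     # jump to the end of the file
    end_tell = f.tell()              # ... and save the cursor position
    f.seek(-50, 2)                   # jump back a fixed distance from the end
    chunk = f.read(50).decode()
    newline_cnt = chunk.count('\n')  # count how many lines the chunk covers

os.remove(tmp_path)
print(end_tell, newline_cnt)  # prints: 150 7
```

Here the 50-byte chunk covers 7 newlines, fewer than a full 10 lines would need, which is exactly the case where the real implementation re-estimates the line length and seeks further back.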

Once the analysis is complete, you can start refactoring the read_last_line function.

import time
import sys

from typing import BinaryIO, Callable, List, NoReturn


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

    def __call__(self, n: int = 10):
        # Open in binary mode: Python 3 text-mode files only support seek(0, 2)
        # relative to the end, but read_last_line needs seek(-read_len, 2)
        with open(self.file_name, 'rb') as f:
            self.read_last_line(f, n)
            while True:
                line: bytes = f.readline()
                if line:
                    self.output(line.decode())  # print each new line
                else:
                    time.sleep(self.interval)

    def read_last_line(self, file: BinaryIO, n: int):
        read_len: int = self.len_line * n

        # Jump the cursor to the end of the file
        file.seek(0, 2)
        # Save the cursor position of the current end
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # The jump would be longer than the whole file, so print the entire file
                file.seek(0)  # read() starts at the cursor, so reset it first
                last_line_list: List[str] = file.read().decode().split('\n')[-n:]
                # Save the cursor position again
                now_tell = file.tell()
                break
            # Jump to the expected character position
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len).decode()
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # Enough lines were read; keep only the last n of them
                last_line_list = read_str.split('\n')[-n:]
                break
            else:
                # Not enough lines yet; re-estimate the line length and read again
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # Reset the cursor so that the data printed next is not duplicated
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

3. Version 3 — Elegantly read and output log files

The real-time reading logic above still performs poorly. Polling the file once per second means poor latency, while shrinking the interval wastes CPU. Ideally we would print only when the file is actually updated, which guarantees both latency and efficiency. Thankfully, Linux's inotify provides exactly this. In addition, log files are usually rotated by logrotate; when that happens we need to reopen the file and continue reading from it, and inotify covers this case too: when inotify delivers the corresponding event, we reopen the file and keep reading.
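Why must the file be reopened after logrotate? An open file handle stays attached to the old inode, so after the file is renamed, data written to the new file under the same name never reaches the old handle. A small stdlib-only demonstration, independent of pyinotify (the temp directory and 'app.log' name are illustrative):

```python
import os
import tempfile

tmp_dir = tempfile.mkdtemp()
log_path = os.path.join(tmp_dir, 'app.log')

with open(log_path, 'w') as w:
    w.write('old line\n')

reader = open(log_path)
reader.read()  # the reader is now at the end of the old file

# Simulate logrotate: rename the old file and create a fresh one in its place
os.rename(log_path, log_path + '.1')
with open(log_path, 'w') as w:
    w.write('new line\n')

after_rotate = reader.read()  # still attached to the rotated file's inode
reader.close()

reader = open(log_path)  # reopening by name picks up the new file
after_reopen = reader.read()
reader.close()

print(repr(after_rotate))  # ''
print(repr(after_reopen))  # 'new line\n'
```

This is exactly the situation the IN_MOVE_SELF handler below deals with: on the move event, close the stale handle and reopen the path.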

import os
import sys

from typing import BinaryIO, Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF  # monitor multiple events


class InotifyEventHandler(pyinotify.ProcessEvent):  # custom event handler class; note the inheritance
    """Wraps the inotify events we care about"""
    f: BinaryIO
    filename: str
    path: str
    wm: 'pyinotify.WatchManager'
    output: Callable

    def my_init(self, **kargs):
        """pyinotify.ProcessEvent asks you not to override __init__ directly; override my_init instead"""
        # Fetch the file name and normalize it to an absolute path
        filename: str = kargs.pop('filename')
        if not os.path.exists(filename):
            raise RuntimeError('Not Found filename')
        if '/' not in filename:
            filename = os.getcwd() + '/' + filename
        index = filename.rfind('/')
        if index == len(filename) - 1 or index == -1:
            raise RuntimeError('Not a legal path')

        self.f = None
        self.filename = filename
        self.output: Callable = kargs.pop('output')
        self.wm = kargs.pop('wm')
        # Watch the directory (not the file) so we can tell when the file is moved
        self.path = filename[:index]
        self.wm.add_watch(self.path, multi_event)

    def read_line(self):
        """Unified output method"""
        for line in self.f.readlines():
            self.output(line.decode())

    def process_IN_MODIFY(self, event):
        """Handler methods must be named process_<EVENT>; `event` is the event object.
        The file was modified, so read the new data."""
        if event.pathname == self.filename:
            self.read_line()

    def process_IN_MOVE_SELF(self, event):
        """The file was moved (e.g. by logrotate), so reopen it and keep reading."""
        if event.pathname == self.filename:
            # The file was moved away; reopen it by name
            self.f.close()
            self.f = open(self.filename, 'rb')
            self.read_line()

    def __enter__(self) -> 'InotifyEventHandler':
        # Binary mode: read_last_line needs seek(-read_len, 2), which Python 3
        # text-mode files do not support
        self.f = open(self.filename, 'rb')
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.f.close()


class Tail(object):
    def __init__(
            self,
            file_name: str,
            output: Callable[[str], NoReturn] = sys.stdout.write,
            interval: int = 1,
            len_line: int = 1024
    ):
        self.file_name: str = file_name
        self.output: Callable[[str], NoReturn] = output
        self.interval: int = interval
        self.len_line: int = len_line

        wm = pyinotify.WatchManager()  # create the WatchManager object
        inotify_event_handler = InotifyEventHandler(
            **dict(filename=file_name, wm=wm, output=output)
        )  # instantiate our custom event handler class; my_init receives the dict parameters
        # The handler's my_init has already registered a watch on the file's directory
        self.notifier = pyinotify.Notifier(wm, inotify_event_handler)  # the notifier dispatches events to the handler
        self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

    def __call__(self, n: int = 10):
        # Open the file through the handler's context manager
        with self.inotify_event_handle as i:
            # First print the requested number of trailing lines
            self.read_last_line(i.f, n)
            # Then hand control to the inotify loop
            self.notifier.loop()

    def read_last_line(self, file: BinaryIO, n: int):
        read_len: int = self.len_line * n

        # Jump the cursor to the end and save its position
        file.seek(0, 2)
        now_tell: int = file.tell()
        while True:
            if read_len > file.tell():
                # The jump would be longer than the whole file, so print the entire file
                file.seek(0)
                last_line_list: List[str] = file.read().decode().split('\n')[-n:]
                # Save the cursor position again
                now_tell = file.tell()
                break
            file.seek(-read_len, 2)
            read_str: str = file.read(read_len).decode()
            cnt: int = read_str.count('\n')
            if cnt >= n:
                # Enough lines were read; keep only the last n of them
                last_line_list = read_str.split('\n')[-n:]
                break
            else:
                # Not enough lines yet; re-estimate the line length and read again
                if cnt == 0:
                    line_per: int = read_len
                else:
                    line_per = int(read_len / cnt)
                read_len = line_per * n

        for line in last_line_list:
            self.output(line + '\n')
        # Reset the cursor so that the data printed next is not duplicated
        file.seek(now_tell)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-f", "--filename")
    parser.add_argument("-n", "--num", default=10)
    args, unknown = parser.parse_known_args()
    if not args.filename:
        raise RuntimeError('filename args error')
    Tail(args.filename)(int(args.num))

As you can see, instead of opening the file with open directly, we open it through the inotify handler's context manager (pyinotify calls the my_init method for initialization), run the same read-last-n-lines code as before, and then let inotify take over. Before the loop starts, the reopen-file and read-file methods are bound to the corresponding inotify events, so once inotify is running, the matching method is executed whenever an event arrives.
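One last note on the design: because output is just a Callable[[str], NoReturn], the Tail class is not tied to stdout; any callable that accepts a line will do. A hypothetical sketch (collected and forward_line are illustrative names, not part of the original code):

```python
from typing import List

collected: List[str] = []

def forward_line(line: str) -> None:
    # A stand-in consumer: collect lines instead of writing them to stdout
    collected.append(line.rstrip('\n'))

# Tail('app.log', output=forward_line)(10) would feed lines into `collected`;
# here we just exercise the callable directly
for raw in ('first\n', 'second\n'):
    forward_line(raw)

print(collected)  # ['first', 'second']
```

The same hook could forward lines to a logging handler, a socket, or a message queue without touching the Tail class itself.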