This is the fourth day of my participation in the August Gwen Challenge.

Foreword

Service calls go over the network, so caller and callee are not in the same process: the program may crash, a node may go down, the network may be interrupted. All of these failures are expected. A core idea of microservices is to assume that anything that can go wrong eventually will, and that failures cannot be avoided completely; fault tolerance, stability, and availability can only be improved through a set of features collectively known as service governance. Most of these features use data generated in the past to decide how to handle subsequent requests, and this data is the cornerstone of service governance.

The RPC Framework Writing Practice series of articles contains my thoughts on and summary of writing the RAP framework.

1. Metric data

Most common service governance functions, such as rate limiting, load balancing, circuit breaking, and monitoring statistics, need metric data in order to be implemented.

Metric data occupies an important position in service governance. It is data collected and aggregated over a period of time, not real-time data. There are good reasons not to use real-time data directly. Take QPS as an example: recomputing the statistic on every use is not only cumbersome but also wastes performance, and the key point of QPS is the "per second": its interval must be accurate to the second, while the interval of real-time data is uncontrollable and may be larger or smaller than a second. Monitoring data is another example: it is a summary of data within a unit of time rather than real-time data, because real-time collection brings large computing overhead. For a monitoring system, collecting data in real time would leave the machine's CPU no idle time and fill the disk with data that is mostly never used, so the best method is to write summary data at fixed intervals; the size of the interval is a trade-off between monitoring accuracy and computational performance.

From the way metric data changes, we can see that every piece of data goes through an aggregation phase and a fixed phase: in the aggregation phase the data can only be written or updated, not read, while in the fixed phase it can only be read, not written. At the same time, machine resources are limited and we will not store summary data forever, so metric data also has a destruction phase. Each of the three phases has its own characteristics:

  • Aggregation phase (write-only): the data is not yet readable and changes over time, e.g. it is reset to a new value, incremented, or decremented.
  • Fixed phase (read-only): the data is readable but no longer writable.
  • Destruction phase (unreadable and unwritable): the data can be neither read nor written and is waiting to be cleaned up.

In addition, the state of metric data is irreversible, and transitions happen only when a trigger condition fires: data in the fixed phase moves to the destruction phase, data in the aggregation phase moves to the fixed phase, and a new aggregation phase is born to collect data.
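A minimal sketch of this irreversible rotation (the `Stage` enum and `trigger` function are illustrative names of mine, not part of the framework):

import enum
from typing import List


class Stage(enum.Enum):
    AGGREGATING = "aggregating"  # writable, not readable
    FIXED = "fixed"              # readable, not writable
    DESTROYED = "destroyed"      # neither readable nor writable


def trigger(slots: List[dict]) -> List[dict]:
    """When the trigger fires, every slot moves one stage forward, a fresh
    aggregating slot is born, and already-destroyed slots are cleaned up."""
    next_stage = {Stage.AGGREGATING: Stage.FIXED, Stage.FIXED: Stage.DESTROYED}
    kept = [
        {**slot, "stage": next_stage[slot["stage"]]}
        for slot in slots
        if slot["stage"] is not Stage.DESTROYED
    ]
    kept.append({"stage": Stage.AGGREGATING, "data": {}})
    return kept

States only ever move forward: a fixed slot can become destroyed, but never aggregating again.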

2. Collecting and aggregating data over a period of time with a sliding window

Assume that each metric datum's state is a slot, which can store many Metric values. As shown in the figure below, the slot the pointer points to is in the aggregation phase, the slot at pointer - 1 is in the fixed phase, and the slot at pointer - 2 is in the destruction phase. As you can see from the diagram, at the beginning the pointer points to the window 1600000020; at the next window time, the pointer points to 1600000030 and the window 1600000000 has disappeared.

The figure also shows that there are only ever three time windows, and the last one just waits to be reclaimed; no other operation is needed. In a language with GC, an object is reclaimed once its references are removed, which is very convenient: switching a slot from the fixed phase to the destruction phase simply means dropping the pointer to it, after which the slot is collected by GC. At any moment only two slots need to be operated on, and handling those two slots is very simple, as their pointers can be swapped directly. The following is a simple implementation:

from typing import Any


class Demo:
    def __init__(self) -> None:
        # temporary (aggregating) bucket
        self.temp_dict: dict = {}
        # fixed (readable) bucket
        self.fixed_dict: dict = {}

    def set(self, key: Any, value: Any) -> None:
        self.temp_dict[key] = value

    def get(self, key: Any) -> Any:
        return self.fixed_dict.get(key)

    def replace_dict(self) -> None:
        # switch logic: the temporary bucket becomes the fixed bucket
        self.fixed_dict = self.temp_dict
        self.temp_dict = {}
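A quick usage sketch of the two-bucket switch (the class is repeated here so the snippet is self-contained; the "qps" key is just an example): writes land in the temporary bucket and only become visible after replace_dict runs.

from typing import Any


class Demo:
    def __init__(self) -> None:
        self.temp_dict: dict = {}   # temporary (aggregating) bucket
        self.fixed_dict: dict = {}  # fixed (readable) bucket

    def set(self, key: Any, value: Any) -> None:
        self.temp_dict[key] = value

    def get(self, key: Any) -> Any:
        return self.fixed_dict.get(key)

    def replace_dict(self) -> None:
        self.fixed_dict = self.temp_dict
        self.temp_dict = {}


demo = Demo()
demo.set("qps", 100)
assert demo.get("qps") is None   # writes are invisible until the switch
demo.replace_dict()
assert demo.get("qps") == 100    # the old temporary bucket is now readable
demo.set("qps", 200)             # a new window starts aggregating
assert demo.get("qps") == 100    # reads still see the fixed window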

There are three methods: set, which stores data into the temporary bucket; get, which reads data from the fixed bucket; and replace_dict, which swaps the two buckets. So, how do we make sure the window is switched exactly when the time is up? Since my RPC framework is based on asyncio, I considered using asyncio for scheduling. asyncio's loop.call_at method specifies when a function should be executed; assuming an interval of 10 seconds, the class can be changed to:

import asyncio
from typing import Any


class Demo:
    def __init__(self) -> None:
        # temporary (aggregating) bucket
        self.temp_dict: dict = {}
        # fixed (readable) bucket
        self.fixed_dict: dict = {}
        self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()

    def set(self, key: Any, value: Any) -> None:
        self.temp_dict[key] = value

    def get(self, key: Any) -> Any:
        return self.fixed_dict.get(key)

    def replace_dict(self) -> None:
        # switch logic
        self.fixed_dict = self.temp_dict
        self.temp_dict = {}
        # schedule replace_dict to be called again after 10 seconds
        self.loop.call_at(self.loop.time() + 10, self.replace_dict)

After the first call, replace_dict keeps rescheduling itself through the event loop for as long as the loop is running, without hitting the maximum recursion depth.
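To see why this does not hit the recursion limit, note that call_at only registers a callback with the loop and returns immediately; each invocation runs in a fresh stack frame. A compressed, self-contained sketch of the same self-rescheduling pattern (0.05 s interval instead of 10 s, so it finishes quickly):

import asyncio


async def main() -> None:
    loop = asyncio.get_running_loop()
    calls: list = []

    def tick() -> None:
        calls.append(loop.time())
        # reschedule ourselves relative to the loop clock, like replace_dict does
        loop.call_at(loop.time() + 0.05, tick)

    tick()  # the first call kicks off the chain
    await asyncio.sleep(0.3)
    assert len(calls) >= 3  # tick ran repeatedly, with no recursion involved


asyncio.run(main())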

However, two problems are exposed here. One is that when the window time is very small, such as 1 second, the scheduling error of the event loop is magnified: scheduling is complex, the loop can only try to invoke the callback around the target time point, and the size of the error depends on many factors (the same applies to thread scheduling and the like). The other problem is that the window size is fixed: with 1-second windows, collecting 2 seconds of data means opening a separate 2-second statistic, collecting 3 seconds of data means opening a separate 3-second statistic, and so on, which is very troublesome and calls for a good unified approach.

Having a thread loop in the background and switch automatically every n seconds may seem like a good idea, but threads are also scheduled, which exposes the same problem when the time window gets small. On top of that, I call some hook functions when switching data, which further increases the interval error, so this scheme is also unacceptable.

3. Collecting and aggregating data over a period of time with a time wheel

As mentioned above, the sliding-window scheme exposes two problems that make for a poor user experience and need to be fixed.

The first problem is that automatic switching carries scheduling error. The core of the issue is that the data is strongly correlated with time, so reads and writes must be tied to time; the window scheme above switches passively on a timer, which reduces time accuracy. The fix is to switch actively instead.

The logic of active switching is to decide which slot to operate on based on the time of the operation, and then read or write the data (performance drops slightly). In this scheme the read/write logic needs to change:

  • 1. First, the array should get back the slot that was in the destruction phase; otherwise, in extreme cases, the write index and the read index could end up the same. The array length is therefore 3. Also record a start time.
  • 2. Assume the interval is 1 second. On every read and write, take the difference between the current time and the start time, divide it by the interval, and take the remainder against the array length. The remainder is always 0, 1, or 2, which is exactly the range of array indices, so as time passes the index keeps cycling 0, 1, 2, 0, 1, 2, round and round.
  • 3. The basic logic is done, but the time of each read and write is not fixed. For example, the first write may land on index 0 and the second write on index 2; if a read then lands on index 1, the data there is invalid. Moreover, since 0, 1, 2 keep cycling, index 1 may already be in the second round while index 2 still holds first-round data. So a variable is needed to mark the round number, and it is easy to compute: divide the elapsed time by the interval, then take the quotient against the array length.
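The arithmetic in the steps above, worked through for a 1-second interval and an array of length 3 (a standalone sketch; `locate` is an illustrative name):

ARRAY_LEN = 3
INTERVAL = 1  # seconds


def locate(elapsed_seconds: int):
    """Map elapsed time to (slot index, round number)."""
    diff = elapsed_seconds // INTERVAL
    return diff % ARRAY_LEN, diff // ARRAY_LEN


# the index cycles 0, 1, 2, 0, 1, 2, ... while the round number only grows,
# so (index, round) uniquely identifies a window even after many wraps
assert [locate(s) for s in range(7)] == [
    (0, 0), (1, 0), (2, 0),  # round 0
    (0, 1), (1, 1), (2, 1),  # round 1: same indices, new round number
    (0, 2),
]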

With the logic understood, the code above can be rewritten as follows:

import time
from typing import Any, List, Tuple


class Demo:
    def __init__(self) -> None:
        # First, the destroyed bucket is restored; otherwise, in extreme cases,
        # the write index and the read index would be the same. The structure
        # becomes an array, just like a real sliding window.
        # The 'cnt' element represents the round number.
        self.bucket_list: List[dict] = [{'cnt': 0}, {'cnt': 0}, {'cnt': 0}]
        # total length
        self.bucket_len: int = len(self.bucket_list)
        # record the start time
        self.start_timestamp: float = time.time()

    def get_index(self) -> Tuple[int, int]:
        # assume a 1-second interval; the first value is the array index
        # (the pointer), the second value is the round number
        diff: int = int(time.time() - self.start_timestamp)
        return diff % self.bucket_len, diff // self.bucket_len

    def set(self, key: Any, value: Any) -> None:
        index, cnt = self.get_index()
        bucket: dict = self.bucket_list[index]
        if bucket['cnt'] != cnt:
            # a different round means the slot must be reinitialized
            self.bucket_list[index] = {key: value, 'cnt': cnt}
        else:
            # same round: just update the data
            bucket[key] = value

    def get(self, key: Any) -> Any:
        index, cnt = self.get_index()
        # read the slot behind the current pointer
        index = index - 1
        if index == -1:
            index = self.bucket_len - 1
            # the slot behind index 0 was written in the previous round
            cnt -= 1
        bucket: dict = self.bucket_list[index]
        if bucket['cnt'] != cnt:
            # a different round number means the data does not exist
            raise KeyError(key)
        return bucket.get(key)
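To exercise the round check without waiting on the wall clock, here is the same active-switching logic with an injectable clock (the `now` attribute replaces `time.time()`; it is my addition so the sketch is testable, not part of the original design):

from typing import Any, List, Tuple


class Demo:
    def __init__(self) -> None:
        self.bucket_list: List[dict] = [{'cnt': 0}, {'cnt': 0}, {'cnt': 0}]
        self.bucket_len: int = len(self.bucket_list)
        self.now: float = 0.0  # injectable clock instead of time.time()

    def get_index(self) -> Tuple[int, int]:
        diff: int = int(self.now)  # 1-second interval
        return diff % self.bucket_len, diff // self.bucket_len

    def set(self, key: Any, value: Any) -> None:
        index, cnt = self.get_index()
        bucket: dict = self.bucket_list[index]
        if bucket['cnt'] != cnt:
            self.bucket_list[index] = {key: value, 'cnt': cnt}
        else:
            bucket[key] = value

    def get(self, key: Any) -> Any:
        index, cnt = self.get_index()
        index -= 1
        if index == -1:
            index = self.bucket_len - 1
            cnt -= 1  # the slot behind index 0 belongs to the previous round
        bucket: dict = self.bucket_list[index]
        if bucket['cnt'] != cnt:
            raise KeyError(key)
        return bucket.get(key)


demo = Demo()
demo.now = 0.5           # falls in slot 0, round 0
demo.set("qps", 10)
demo.now = 1.5           # pointer moves to slot 1; slot 0 is now fixed
assert demo.get("qps") == 10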

The code is very simple: the passive replace_dict switching logic is removed, and each set and get instead calculates which slot it should fall into. That solves the first problem. As for the second problem, you can see from the code above that bucket_list is an array, but the remainder of the elapsed time against the array length keeps cycling 0, 1, 2, 0, 1, 2, so it actually runs like a time wheel: data from a new round overwrites the old, while the total number of slots stays the same, as shown in the following figure (drawn with draw.io, a little ugly):

The time wheel happens to solve the second problem as well. Suppose our original interval is 60 seconds. To also support statistics over 1-second, 2-second, 3-second, 4-second, 5-second, and other intervals, we can take their greatest common divisor, 1 second, as the slot unit, which requires 60 slots. Treat the time wheel as a whole and cut it into 60 equal slots, then use the time to calculate which slot the pointer points to (the pointer is the remainder of the operation time against the total number of slots). Writes go to the slot the pointer currently designates, the aggregation slot. The slots behind the pointer are either fixed slots or empty slots depending on whether they hold data (at initialization all slots are empty; a slot can only become fixed or aggregating after data is written for the first time). Reads may only take data that is behind the current pointer, belongs to the current round, and is not empty.

Then a 1-second interval takes only the slot one cell behind the pointer, a 2-second interval takes the sum of the two slots behind the pointer, a 3-second interval takes the sum of the three slots behind it, and so on. In this way a single class can support statistics over many time intervals. The scheme has one drawback: the wheel above has 60 slots, one of which is the aggregation slot, leaving at most 59 fixed slots to read from, one short. Slot padding is needed, for example expanding the 60 slots to 65 and restricting reads to the last 60; the remaining 5 slots serve as a buffer. The code, changed once more, is as follows:

This scheme covers a small range of intervals, using a 1-minute wheel to accommodate second-level intervals. To cover a larger range, such as 5, 10, or 15 minutes, you can use the same scheme as the hands of an analog clock. If you know the design of Kafka's time wheels, you know Kafka has multiple wheels: a full turn of the small wheel advances the large wheel by one slot, and when an entry's remaining time fits within the small wheel's range, the data flows down to the small wheel. However, service governance data needs to be as fresh as possible, so its time span is generally under a minute.

import time
from typing import Any, List, Tuple


class Demo:
    def __init__(self, max_interval: int = 60, min_interval: int = 1) -> None:
        self.max_interval = max_interval
        self.min_interval = min_interval
        # work out how many slots there are (5 extra slots as a buffer)
        self.bucket_len: int = (max_interval // min_interval) + 5
        # the 'cnt' element represents the round number, making overwrites easy
        self.bucket_list: List[dict] = [{'cnt': 0} for _ in range(self.bucket_len)]
        # record the start time
        self.start_timestamp: float = time.time()

    def get_index(self) -> Tuple[int, int]:
        # the first value is the array index (the pointer),
        # the second value is the round number
        diff: int = int((time.time() - self.start_timestamp) / self.min_interval)
        return diff % self.bucket_len, diff // self.bucket_len

    def set(self, key: Any, value: Any) -> None:
        index, cnt = self.get_index()
        bucket: dict = self.bucket_list[index]
        if bucket['cnt'] != cnt:
            # a different round means the slot must be reinitialized
            self.bucket_list[index] = {'cnt': cnt, key: value}
        else:
            # same round: just update the data
            bucket[key] = value

    def get(self, key: Any, diff: int = 0) -> int:
        # the diff parameter specifies how many slots behind the pointer to aggregate
        if diff > self.max_interval:
            raise ValueError(f"diff:{diff} > {self.max_interval}")
        index, cnt = self.get_index()
        value: int = 0
        for i in range(1, diff + 1):
            real_index: int = index - i
            # slots before index 0 were written in the previous round
            expect_cnt: int = cnt if real_index >= 0 else cnt - 1
            bucket: dict = self.bucket_list[real_index]
            if bucket['cnt'] != expect_cnt:
                break
            value += bucket.get(key, 0)
        return value
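The multi-interval read boils down to one idea: sum the diff slots behind the pointer. A standalone sketch with illustrative slot values:

# slots behind the pointer, most recent first; each holds one second of counts
recent_slots = [{"req": 5}, {"req": 3}, {"req": 7}, {"req": 2}]


def get(key: str, diff: int) -> int:
    """Aggregate `key` over the last `diff` seconds."""
    return sum(slot.get(key, 0) for slot in recent_slots[:diff])


assert get("req", 1) == 5    # 1-second interval: last slot only
assert get("req", 2) == 8    # 2-second interval: last two slots
assert get("req", 4) == 17   # and so on, one class for every interval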

The code above is a simple implementation. To make it production-ready, more logic and read/write performance optimizations are needed, and features can be added as required:

  • Metric data distinguishes between Gauge and Count. One type is a point-in-time value that changes in real time and only needs the latest fixed slot; the other is cumulative and sums the data of all slots in the period. To tell them apart, each datum needs a type identifier.
  • get is fairly expensive; caching its result would reduce the performance cost of reads. If some data needs to be pushed out passively, the callback pattern above can be used, for example every 5 seconds: although the push only runs once per 5 seconds, the latest round of data can still be retrieved thanks to the time wheel.
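A sketch of the Gauge/Count distinction from the first point (the `MetricType` enum and `aggregate` helper are my illustrative names, not the framework's API):

from enum import Enum
from typing import List


class MetricType(Enum):
    GAUGE = "gauge"  # point-in-time value: only the newest fixed slot matters
    COUNT = "count"  # cumulative value: sum over every slot in the window


def aggregate(metric_type: MetricType, slot_values: List[int]) -> int:
    """slot_values is ordered oldest -> newest."""
    if metric_type is MetricType.GAUGE:
        return slot_values[-1]
    return sum(slot_values)


assert aggregate(MetricType.GAUGE, [3, 5, 9]) == 9    # e.g. connection count
assert aggregate(MetricType.COUNT, [3, 5, 9]) == 17   # e.g. total requests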

The resulting code is longer; check out my implementation in the RAP framework: collect_statistics.py