Recently, I have been studying message queues, file systems, databases, and so on, and gradually found that they all share one core component: the log. It is sometimes called a write-ahead log, commit log, or transaction log, and it usually refers to records that are written before any change is applied, typically including redo logs and undo logs. We hear a lot about NoSQL databases, KV stores, Hadoop, Raft, Paxos, and version control; all of these middleware systems and protocols rely on logs to some extent, and the log has always played a very important role in distributed systems.

What is a log?

A log is an append-only, totally ordered sequence of records, ordered by time. It is a special kind of file: a file is an array of bytes, whereas a log is an array of records, and each record's position reflects the relative order in which it was appended. The log is one of the simplest storage models: it is read from left to right. Message queues, for example, typically append messages linearly to log files, and consumers read them sequentially starting from an offset.
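To make the "array of records read by offset" idea concrete, here is a minimal in-memory sketch. The class `Log` and its `append`/`read` methods are hypothetical names for illustration; a real message queue persists records to segment files on disk rather than to a Python list.

```python
class Log:
    """A minimal in-memory append-only log; a record's offset is its position."""

    def __init__(self):
        self._records = []

    def append(self, record):
        # Records are only ever added at the end, so existing offsets never change.
        self._records.append(record)
        return len(self._records) - 1  # offset of the new record

    def read(self, offset, max_records=10):
        # Sequential read starting at `offset`, left to right.
        return self._records[offset:offset + max_records]

    def end_offset(self):
        return len(self._records)


log = Log()
for msg in ["a", "b", "c"]:
    log.append(msg)

# A consumer remembers only its offset and advances it after each read.
consumer_offset = 1
batch = log.read(consumer_offset)
consumer_offset += len(batch)
print(batch, consumer_offset)  # ['b', 'c'] 3
```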

Because of the log's inherent nature, records are appended sequentially from left to right, so a record on the left is always “older” than a record on the right. This means we do not have to rely on the system clock to order records, which is important for distributed systems.



Application of Logs

The application of logs in databases

It is hard to say when logs first appeared, perhaps because the concept is so simple. The redo log in MySQL, for example, is a disk-based data structure used to guarantee the correctness and integrity of data when the system crashes; it is also called a write-ahead log. During the execution of a transaction, the redo log is written before the actual changes are applied, so when the system restarts after a crash it can replay the redo log to recover the data (no client connections are accepted during this recovery). Logs can also be used to synchronize master and slave databases: since essentially every database operation has been written to the log, we can ship the log to the slave and replay it there. Many other components can be built the same way. By subscribing to the database's change log (in MySQL this is usually the binlog rather than the redo log), we can obtain every change to the database and implement custom business logic such as auditing and cache synchronization.
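The write-ahead rule can be illustrated with a toy key-value store, a minimal sketch assuming one JSON line per logical operation. `KVStore`, `set`, `delete`, and the file name `redo.log` are hypothetical, and InnoDB's real redo log uses a very different, page-oriented format. The only point shown is the ordering: append and fsync the record first, apply the change second, and rebuild state by replaying the log after a crash.

```python
import json
import os
import tempfile


class KVStore:
    """Toy key-value store that writes a redo record before changing state."""

    def __init__(self, wal_path):
        self.wal_path = wal_path
        self.data = {}
        self._recover()

    def _recover(self):
        # Replay every redo record in order to rebuild the in-memory state.
        if not os.path.exists(self.wal_path):
            return
        with open(self.wal_path, encoding="utf-8") as f:
            for line in f:
                self._apply(json.loads(line))

    def _apply(self, rec):
        if rec["op"] == "set":
            self.data[rec["key"]] = rec["value"]
        elif rec["op"] == "delete":
            self.data.pop(rec["key"], None)

    def _write_ahead(self, rec):
        # 1) Append the record to the log and force it to disk...
        with open(self.wal_path, "a", encoding="utf-8") as f:
            f.write(json.dumps(rec) + "\n")
            f.flush()
            os.fsync(f.fileno())
        # 2) ...and only then apply the change to the in-memory state.
        self._apply(rec)

    def set(self, key, value):
        self._write_ahead({"op": "set", "key": key, "value": value})

    def delete(self, key):
        self._write_ahead({"op": "delete", "key": key})


path = os.path.join(tempfile.mkdtemp(), "redo.log")
db = KVStore(path)
db.set("x", 1)
db.set("y", 2)
db.delete("x")

# Simulate a crash: discard the in-memory object and recover from the log.
recovered = KVStore(path)
print(recovered.data)  # {'y': 2}
```

The same replay loop is what lets a slave rebuild the master's state from a shipped copy of the log.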

Application of logs in distributed systems



A distributed system service is essentially about state changes, so it can be modeled as a state machine. If two independent processes are deterministic (they do not depend on the external environment, such as the system clock or external interfaces), then given the same input they will produce the same output and end up in the same state. Because the order of a log is inherent and does not depend on the system clock, the log can be used to solve the problem of ordering changes.
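A tiny sketch of this state-machine view, assuming a hypothetical `apply` transition that is deterministic (no clock, no external calls): two replicas that replay the same log end in the same state, while replaying the same commands in a different order does not.

```python
from functools import reduce


def apply(state, command):
    """Deterministic transition: depends only on the current state and the command."""
    op, key, amount = command
    new_state = dict(state)
    if op == "add":
        new_state[key] = new_state.get(key, 0) + amount
    elif op == "mul":
        new_state[key] = new_state.get(key, 0) * amount
    return new_state


log = [("add", "a", 5), ("mul", "a", 3), ("add", "b", 1)]

# Two independent replicas replay the same log from the same initial state.
replica_1 = reduce(apply, log, {})
replica_2 = reduce(apply, log, {})
print(replica_1 == replica_2, replica_1)  # True {'a': 15, 'b': 1}

# Order matters: the same commands in a different order give a different state.
reordered = reduce(apply, [log[1], log[0], log[2]], {})
print(reordered)  # {'a': 5, 'b': 1}
```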

We use this property to solve many problems in distributed systems. Take the slave node in RocketMQ: the master broker receives requests from clients, writes them to its log, and synchronizes the log to the slave in real time. The slave replays it locally, so when the master goes down the slave can keep handling requests, for example rejecting write requests while continuing to serve read requests. A log can record not only data but also the operations themselves, such as SQL statements.
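The master/slave flow above can be sketched roughly as follows. This is a simplified illustration with hypothetical `Master` and `Slave` classes and synchronous replication, not RocketMQ's actual implementation or API.

```python
class Master:
    def __init__(self):
        self.log = []
        self.state = {}
        self.slaves = []

    def write(self, key, value):
        record = (len(self.log), key, value)  # (offset, key, value)
        self.log.append(record)
        self.state[key] = value
        for slave in self.slaves:  # synchronous replication, for simplicity
            slave.replicate(record)


class Slave:
    def __init__(self):
        self.state = {}
        self.next_offset = 0

    def replicate(self, record):
        offset, key, value = record
        # Replay records strictly in log order.
        if offset != self.next_offset:
            raise ValueError(f"expected offset {self.next_offset}, got {offset}")
        self.state[key] = value
        self.next_offset += 1

    def read(self, key):
        return self.state.get(key)

    def write(self, key, value):
        # After the master fails, the slave keeps serving reads but rejects writes.
        raise PermissionError("slave is read-only")


master, slave = Master(), Slave()
master.slaves.append(slave)
master.write("order:1", "paid")
master.write("order:2", "created")

master = None  # the master "goes down"
print(slave.read("order:1"))  # paid
try:
    slave.write("order:3", "created")
except PermissionError as e:
    print("rejected:", e)  # rejected: slave is read-only
```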



The log is a key data structure for solving consistency problems. A log is like a sequence of operations in which each record represents an instruction; Paxos and Raft, for example, are widely used consensus protocols built on top of a log.
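As one concrete example of a protocol built on a log, the sketch below follows the follower-side log consistency check from Raft's AppendEntries RPC: the follower accepts new entries only if it already holds an entry at `prev_index` with term `prev_term`, and it truncates any conflicting suffix before appending. It is heavily simplified (no leader election, no commit index, no persistence), and the function name and signature are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    term: int
    command: str


def append_entries(log, prev_index, prev_term, entries):
    """Simplified follower-side check in the spirit of Raft's AppendEntries.

    `log` is a Python list; Raft indexes are 1-based, so entry i is log[i - 1].
    Returns True if the entries were accepted.
    """
    # Reject if the follower has no entry at prev_index with a matching term.
    if prev_index > 0:
        if len(log) < prev_index or log[prev_index - 1].term != prev_term:
            return False
    # On the first conflict (same index, different term), delete that entry and
    # everything after it; then append any entries the follower is missing.
    for i, entry in enumerate(entries):
        index = prev_index + 1 + i
        if index <= len(log):
            if log[index - 1].term != entry.term:
                del log[index - 1:]
                log.append(entry)
        else:
            log.append(entry)
    return True


follower = [Entry(1, "x=1"), Entry(1, "x=2"), Entry(2, "x=9")]  # last entry is stale
ok = append_entries(follower, prev_index=2, prev_term=1, entries=[Entry(3, "x=3")])
print(ok, [e.command for e in follower])  # True ['x=1', 'x=2', 'x=3']

# A leader whose previous entry does not match is rejected and must back up.
print(append_entries(follower, prev_index=5, prev_term=3, entries=[]))  # False
```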

Application of logs in message queues

Each data source can produce its own log. Data sources can be of many kinds, for example event streams (page clicks, cache refresh notifications, database binlog changes). We can store these logs centrally in a cluster; subscribers read each record of the log by offset and apply their own changes based on the data and operation in each record.
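A minimal sketch of a centralized log with independent subscribers, assuming hypothetical `CentralLog` and `Subscriber` classes: each subscriber keeps its own offset and applies only the records it cares about, and the producer never needs to know who is reading.

```python
class CentralLog:
    def __init__(self):
        self.records = []

    def append(self, source, payload):
        self.records.append({"source": source, "payload": payload})


class Subscriber:
    """Each subscriber keeps its own offset and only reacts to what it cares about."""

    def __init__(self, name, interested_sources):
        self.name = name
        self.interested = set(interested_sources)
        self.offset = 0
        self.applied = []

    def poll(self, log):
        # Read forward from this subscriber's own offset.
        while self.offset < len(log.records):
            record = log.records[self.offset]
            if record["source"] in self.interested:
                self.applied.append(record["payload"])  # apply its own change
            self.offset += 1


log = CentralLog()
log.append("page_click", "/home")
log.append("binlog", "users.id=1 updated")
log.append("cache_refresh", "user:1")

auditor = Subscriber("audit", ["binlog"])
cache = Subscriber("cache-sync", ["binlog", "cache_refresh"])
for sub in (auditor, cache):
    sub.poll(log)

print(auditor.applied)  # ['users.id=1 updated']
print(cache.applied)    # ['users.id=1 updated', 'user:1']
```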

In this case, the log can be understood as a message queue, which provides asynchronous decoupling and peak shaving. Why decoupling? Because the responsibilities of producers and consumers are clearly separated: one produces and the other consumes, and neither cares who is upstream or downstream. Whether a record comes from a database change log, an event, or some other party does not matter; a consumer only pays attention to the logs it is interested in and to each record in them.

We know that a database can only handle a limited number of queries per second (QPS), while upper-layer applications can generally scale horizontally. If requests suddenly surge, as in a Double 11 (Singles' Day) shopping scenario, the database cannot withstand the load. In that case we can introduce a message queue: every database operation is written to the log, and another application consumes the log and applies the operations to the database. Even if the database goes down, processing can resume from where it left off once the database recovers (RocketMQ and Kafka both support Exactly Once semantics), and it does not matter if producers and consumers run at different speeds, because the log acts as a buffer that stores all records.

Writes to the log are handled by the master node. Read requests fall into two types. The first is the tail read, where the consumer keeps up with the write speed; such reads can be served directly from cache. The second comes from consumers that lag behind the writes; these reads can be served by slave nodes. Through this IO isolation, together with operating-system file features such as the page cache and read-ahead (prefetching), performance can be greatly improved.
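The buffering and resume-from-offset behavior can be sketched as below, assuming a hypothetical `Database` stand-in that absorbs only `capacity_per_tick` writes per step and a consumer that commits its offset after each successful write. In this toy, a crash cannot land between the write and the offset update; real systems need idempotent writes or transactions to keep exactly-once behavior across such crashes.

```python
class Database:
    """Stand-in for a database that can only absorb a fixed number of writes per tick."""

    def __init__(self, capacity_per_tick):
        self.capacity = capacity_per_tick
        self.rows = []
        self.up = True

    def write(self, row):
        if not self.up:
            raise ConnectionError("database is down")
        self.rows.append(row)


class Consumer:
    def __init__(self, log, db):
        self.log = log
        self.db = db
        self.committed = 0  # offset of the next record to apply

    def tick(self):
        # Apply at most `capacity` records; advance the offset only after a successful write.
        for _ in range(self.db.capacity):
            if self.committed >= len(self.log):
                return
            try:
                self.db.write(self.log[self.committed])
            except ConnectionError:
                return  # retry from the same committed offset on a later tick
            self.committed += 1


log = [f"order-{i}" for i in range(10)]  # a burst of 10 writes arrives at once
db = Database(capacity_per_tick=3)
consumer = Consumer(log, db)

consumer.tick()  # applies order-0..order-2
db.up = False
consumer.tick()  # database down: nothing applied, offset stays at 3
db.up = True
while consumer.committed < len(log):
    consumer.tick()

print(len(db.rows), db.rows[3])  # 10 order-3
```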

Scalability is a very important property of distributed systems: any problem that can be solved by adding machines is not really a problem. So how do we build a message queue that scales horizontally? If a message queue runs on a single machine, then as the number of topics grows, IO, CPU, and bandwidth gradually become bottlenecks and performance slowly declines. How can we optimize it?

  1. Topic/log sharding. In essence, the messages written to a topic are the records of a log. As the write volume grows, a single machine gradually becomes the bottleneck. We can then split a single topic into multiple sub-topics (partitions) and assign them to different machines. Topics with a large message volume can be handled by adding machines, while topics with a small volume can share the same machine or not be partitioned at all.
  2. Group commit. Kafka's producer client, for example, writes messages into a local in-memory queue, aggregates them per partition and per node, and sends them in batches. On the server (broker) side, messages are likewise written to the page cache first, and when to flush them to disk can be decided by the business: a financial service, for example, may choose synchronous flushing.
  3. Avoid unnecessary data copying (for example, zero-copy techniques such as sendfile or mmap).
  4. IO isolation
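Items 1 and 2 can be combined in one small sketch: a producer that routes each message to a partition by a stable hash of its key and buffers messages per partition until a batch is full. `BatchingProducer` and its parameters are hypothetical; Kafka's real producer exposes comparable knobs such as `batch.size` and `linger.ms`, and also flushes on a timer, which this sketch omits.

```python
import zlib
from collections import defaultdict


class BatchingProducer:
    """Routes messages to partitions by key hash and sends them in batches."""

    def __init__(self, num_partitions, batch_size, send):
        self.num_partitions = num_partitions
        self.batch_size = batch_size
        self.send = send  # callable(partition, list_of_messages), e.g. one network request
        self.buffers = defaultdict(list)

    def partition_for(self, key):
        # A stable hash (not Python's salted hash()) so the same key always
        # lands on the same partition, preserving per-key order.
        return zlib.crc32(key.encode("utf-8")) % self.num_partitions

    def produce(self, key, value):
        p = self.partition_for(key)
        self.buffers[p].append((key, value))
        if len(self.buffers[p]) >= self.batch_size:
            self.flush(p)

    def flush(self, partition=None):
        # Send one request per non-empty partition buffer.
        targets = [partition] if partition is not None else list(self.buffers)
        for p in targets:
            if self.buffers[p]:
                self.send(p, self.buffers.pop(p))


requests = []
producer = BatchingProducer(num_partitions=4, batch_size=3,
                            send=lambda p, batch: requests.append((p, batch)))
for i in range(10):
    producer.produce(f"user-{i % 2}", f"event-{i}")
producer.flush()

print(len(requests), sum(len(b) for _, b in requests))  # 4 10
```

Hashing by key keeps all messages with the same key on one partition, so their relative order is preserved even though different partitions are written independently.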

Conclusion

Logs play an important role in distributed systems and are key to understanding their components. As I dig deeper, I find that many distributed middleware systems are built on logs, such as ZooKeeper, HDFS, Kafka, RocketMQ, Google Spanner, and so on. Even databases such as Redis and MySQL implement master-slave replication through log synchronization. Relying on a shared log system, we can implement many things: data synchronization between nodes, ordering of concurrent updates (the consistency problem), durability (if a node crashes, other nodes can continue to provide service), distributed lock services, and so on. I believe that through practice and reading many papers, I will gradually gain a deeper understanding.