The author, yunlong, is a senior operations and maintenance development engineer responsible for the design and development of the game system configuration management platform. He currently focuses on developing the new CMDB system and also follows operations automation, DevOps, Python development, and related technologies. This article was published on the WeChat account of NetEase Game Operation & Maintenance (NTES Gameops), which is affiliated with the NTES Game Operation & Infrastructure Department.

Email: [email protected]

Background

Inside the CMDB system, the data for a machine is divided into many types, such as system service data, hardware data, asset data, offline computation data, and so on. Each of these can be treated as a data stream. The complete record of a machine stored in the database is the result of merging all of the data streams for that machine.

When merging data streams, we use multiple processes and multiple threads to improve processing efficiency. This, however, introduces a data inconsistency problem when several threads read and write the record of the same machine at the same time.

Take one machine as an example, and let D denote one data stream for that machine. Suppose there are N data sources. Ideally, the final data presented for the machine should be D1 + D2 + ... + DN, as shown below:

Figure 1- Single thread merging data
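As a rough illustration of the single-threaded merge, the Python 3 sketch below folds several streams into one record. It assumes that each stream can be modeled as a dictionary and that "+" means "apply the stream's fields on top of the record"; the field names and values are made up for the example and are not taken from the CMDB.

```python
from functools import reduce

def merge(record, stream):
    """Return a new record with the fields of one data stream applied on top."""
    merged = dict(record)
    merged.update(stream)
    return merged

# Hypothetical sample streams for one machine (illustrative field names).
d1 = {"hostname": "game-01", "os": "Debian"}
d2 = {"cpu_cores": 16, "memory_gb": 64}
d3 = {"asset_id": "A-1001"}

# D1 + D2 + D3, applied one after another by a single thread.
machine = reduce(merge, [d1, d2, d3], {})
print(machine)
```

Because only one thread performs the fold, every stream sees the result of the previous merge and nothing is lost.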

If the Merge step above runs in a single thread, the result in the database is correct. However, if the Merge step becomes multi-threaded, that is, several threads read and write the Machine record in the figure above at the same time, data inconsistency can occur, as shown in the following figure:

Figure 2- Multithreading merging data

Assume that the original data of a machine (Machine in the figure) in the database is D0. The processing flow in the figure above is as follows:

  1. At T1, data D1 and D2 from two data sources arrive at the data processing layer. The main process assigns thread Merge1 to process D1 and thread Merge2 to process D2, and both threads read the original data D0 from the database at the same time (assume this is still T1).
  2. At T2, Merge1 merges D0 with D1 and saves the result. The data in the database becomes D0 + D1.
  3. At T3, Merge2 merges D0 with D2 and saves the result. The data in the database becomes D0 + D2.
  4. Between T1 and T3, the final data in the database becomes D0 + D2 and the data from source D1 is lost, which is the data inconsistency.
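The interleaving in the four steps above can be reproduced with a small Python 3 sketch. It is a simulation under stated assumptions, not the CMDB code: the database is a plain dictionary, the streams are dictionaries, and a threading.Barrier is used only to force both threads to read D0 before either of them writes.

```python
import threading

database = {"machine": {"base": "D0"}}   # the stored record starts as D0
both_have_read = threading.Barrier(2)    # forces the interleaving of Figure 2

def merge_worker(stream):
    snapshot = dict(database["machine"])  # T1: both threads read D0
    both_have_read.wait()                 # nobody writes until both have read
    snapshot.update(stream)               # merge D0 with this thread's stream
    database["machine"] = snapshot        # T2/T3: the later write wins

threads = [
    threading.Thread(target=merge_worker, args=({"d1": "D1"},)),
    threading.Thread(target=merge_worker, args=({"d2": "D2"},)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Exactly one of the two streams survives; which one depends on scheduling.
print(database["machine"])
```

Whichever thread writes last overwrites the other thread's merge, so the record always ends up with D0 plus only one of the two streams.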

Exploring solutions

The problem above is caused by multiple threads reading and writing shared data at the same time. If we can make access to the shared data synchronized, the problem is solved: while one thread or process is accessing the data (that is, it is inside the critical section), all other processes or threads must wait until it finishes before they can access the data. The simplest way to achieve this is to add a synchronization lock.

Depending on the architecture of the application, the lock can be implemented in one of the following ways:

  • If the handler is a single process with multiple threads, Python's threading module provides a Lock object that can synchronize access to shared variables and make them thread-safe.
  • For multiple processes on a single machine, Python's multiprocessing module provides a Lock object.
  • For a multi-machine, multi-process deployment, a third-party component that stores the lock object is needed to implement a distributed synchronization lock.
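For the first case in the list (one process, many threads), a minimal Python 3 sketch might look like the following. The dictionary database and the stream contents are assumptions made for the example; only threading.Lock comes from the standard library.

```python
import threading

database = {"machine": {"base": "D0"}}
record_lock = threading.Lock()   # guards every read-modify-write of the record

def merge_worker(stream):
    with record_lock:                         # enter the critical section
        snapshot = dict(database["machine"])  # read the latest stored record
        snapshot.update(stream)               # merge this thread's stream
        database["machine"] = snapshot        # write back before releasing

# Five illustrative streams: {"d1": "D1"} ... {"d5": "D5"}
streams = [{"d%d" % i: "D%d" % i} for i in range(1, 6)]
threads = [threading.Thread(target=merge_worker, args=(s,)) for s in streams]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(database["machine"]))  # → ['base', 'd1', 'd2', 'd3', 'd4', 'd5']
```

Because the read, the merge, and the write all happen while the lock is held, every thread sees the previous thread's result and no stream is lost. This lock lives in one process's memory, which is why it cannot help once the workers run on several machines.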

The CMDB system currently runs on multiple machines with multiple processes and multiple threads, so the third approach applies.

Distributed lock implementation

At present, the mainstream distributed lock implementation methods are as follows:

  • Based on a database, such as MySQL
  • Based on caching, such as Redis
  • Based on ZooKeeper

Each implementation has its own strengths and weaknesses. After weighing them, we decided to use Redis for the following reasons:

  • Redis operates in memory, so access is faster than with a database, and performance does not degrade much under high concurrency once locking is added
  • Redis supports setting a TTL on keys
  • Redis is simple to use, so the overall implementation cost is low

A distributed lock implemented with Redis must also satisfy the following conditions:

  1. Only one thread can hold the lock at a time, and other threads must wait until the lock is released
  2. Lock operations must be atomic
  3. Deadlock must not occur, for example when a thread that holds the lock exits unexpectedly before releasing it, leaving other threads looping forever while they wait for the lock
  4. A lock must be acquired and released by the same thread

Distributed locks keep data consistent

On the basis of Figure 2, we add a layer of lock between Data process and Database. We add a lock_key in Redis as the identifier of the lock. The flow chart is as follows:

Figure 3- Merging data using distributed locks

Assuming that the original data of a machine (machine in the figure) in the database is D0, the processing flow in the figure above becomes:

  1. At T1, data D1 and D2 from two data sources reach the data processing layer at the same time. The main process assigns thread Merge1 to process D1 and thread Merge2 to process D2, and both try to obtain the lock from Redis at the same time
  2. At T2, Merge1 obtains the lock and loads the machine's original data D0 from the database. Merge2 loops, waiting for Merge1 to release the lock
  3. At T3, Merge1 finishes merging, stores the merged data D0 + D1 in the database, and releases the lock
  4. At T4, Merge2 obtains the lock and loads the machine's data D0 + D1 from the database
  5. At T5, Merge2 finishes merging, stores the merged data D0 + D1 + D2 in the database, and releases the lock

As the above shows, the principle for keeping data consistent is not difficult: it is nothing more than using a key to make multiple threads read and write the data of the same machine in a synchronized way. In practice, however, it is easy to overlook the conditions that a distributed lock must satisfy, and in extreme cases data inconsistency still occurs.

The implementation process

With the lock conditions above in mind, we walk through several implementations and observe whether the final value of test_key matches our expectation when one of the conditions is not met. The same test case is used throughout:

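# test.py
import threading
import time

def increase(redis, lock, key):
    # Acquire the lock
    lock_value = lock.get_lock(key)
    value = redis.get(key)
    if not value:
        value = 0
    # Simulate a time-consuming operation
    time.sleep(0.1)
    value = int(value) + 1
    redis.set(key, value)
    thread_name = threading.current_thread().name
    # Print the thread name and the new value
    print thread_name, value
    # Release the lock
    lock.del_lock(key, lock_value)

# Connect to the Redis server
redis = RedisCli(REDIS_CACHE_HOST_LIST, REDIS_CACHE_MASTER_NAME)
lock = RedisLock(redis)
key = 'test_key'
thread_count = 10
redis.delete(key)
for i in xrange(thread_count):
    thread = threading.Thread(target=increase, args=(redis, lock, key))
    thread.start()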

We start multiple threads that each increment the value of test_key in Redis. Ideally, the final value of test_key equals the number of threads: with 10 threads, test_key should end up as 10.

Method 1: the lock operation is not atomic

In this version, thread A calls get(key) on the lock key; if the value is null, it sets the key to 1 and returns, which means thread A has acquired the lock and can carry on with its work. Otherwise, thread A keeps retrying until the value of the key is null again.

The code is as follows:

import time

class RedisLock(object):
    def __init__(self, rediscli):
        self.rediscli = rediscli

    def get_lock_key(self, key):
        lock_key = "lock_%s" % key
        return lock_key

    def get_lock(self, key):
        lock_key = self.get_lock_key(key)
        while True:
            # Check and set are two separate commands (not atomic)
            value = self.rediscli.get(lock_key)
            if not value:
                self.rediscli.set(lock_key, '1')
                return True
            time.sleep(0.01)

    def del_lock(self, key, new_expire_time):
        lock_key = self.get_lock_key(key)
        return self.rediscli.delete(lock_key)

Execute the test script and get the following results:

# python test.py
Thread-1 1
Thread-5 2
Thread-2 2
Thread-6 3
Thread-7 3
Thread-4 3
Thread-9 4
Thread-8 5
Thread-10 5 
Thread-3 5

Looking at the output, several threads print the same value. The cause is that get(key) and set(key, value) are two separate operations and are not atomic together. Thread A finds that get(key) returns null and goes on to call set(key, value). In the instant before that set completes, thread B also calls get(key), sees null, and acquires the lock as well. The data is then modified by two or more threads at the same time and ends up inconsistent, as in Figure 2.
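The race just described can be forced deterministically in a short Python 3 sketch. The assumptions are stated plainly: a plain dictionary stands in for Redis, and a threading.Barrier is inserted between the check and the set purely to hold both threads in the window where the race happens.

```python
import threading

store = {}                          # stand-in for the Redis key space
after_get = threading.Barrier(2)    # holds both threads between get and set
acquired = []                       # threads that believe they hold the lock

def naive_get_lock(name, lock_key="lock_test_key"):
    value = store.get(lock_key)     # step 1: check whether the lock exists
    after_get.wait()                # the other thread checks before either sets
    if not value:
        store[lock_key] = "1"       # step 2: set the lock
        acquired.append(name)

threads = [threading.Thread(target=naive_get_lock, args=(n,)) for n in ("A", "B")]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(acquired))  # → ['A', 'B']
```

Both threads saw an empty key before either wrote to it, so both believe they hold the lock. In a real run the window is much narrower, which is why the duplicated values in the output appear only some of the time.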

Method 2: using setnx

The previous version let two or more threads acquire the lock at the same time because checking and setting the lock were not one atomic operation. This version uses the Redis setnx command to check and set the lock in a single step. setnx stands for "set if not exists": when the key does not exist, the value is set and 1 is returned; when the key already exists, nothing is done and 0 is returned.
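To make the setnx semantics concrete without a Redis server, here is a Python 3 sketch of a hypothetical in-memory store. InMemoryStore is not a real client; its internal mutex stands in for the fact that Redis executes each command as one indivisible step.

```python
import threading

class InMemoryStore:
    """Hypothetical stand-in for a Redis server (not a real client)."""

    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()  # one command runs at a time

    def setnx(self, key, value):
        """Set key only if it does not exist; return 1 if set, else 0."""
        with self._mutex:
            if key in self._data:
                return 0
            self._data[key] = value
            return 1

store = InMemoryStore()
results = []

def try_lock():
    results.append(store.setnx("lock_test_key", "1"))

threads = [threading.Thread(target=try_lock) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results.count(1), results.count(0))  # → 1 9
```

Because the check and the set happen inside one step, exactly one of the ten callers gets 1 and the rest get 0, no matter how the threads are scheduled.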

The code improvements are as follows:

def get_lock(self, key):
    lock_key = self.get_lock_key(key)
    while True:
        value = self.rediscli.setnx(lock_key, 1)
        if value:
            return True
        time.sleep(0.01)

Test results:

Thread-1 1
Thread-4 2
Thread-2 3
Thread-3 4
Thread-7 5
Thread-6 6
Thread-5 7
Thread-8 8
Thread-9 9
Thread-10 10

The result is correct, but stopping here would still leave a problem. Suppose thread A acquires the lock and then crashes for some reason without releasing it. Let's slightly alter the increase function in our test case to simulate a thread that exits because of an exception before releasing the lock.

The code is as follows:

def increase(redis, lock, key):
    thread_name = threading.current_thread().name
    lock_value = lock.get_lock(key)
    value = redis.get(key)
    if not value:
        value = 0
    # Simulate a time-consuming operation
    time.sleep(0.1)
    value = int(value) + 1
    redis.set(key, value)
    print thread_name, value
    # Simulate thread 2 exiting abnormally before releasing the lock
    if thread_name == 'Thread-2':
        print 'Thread-2 crash..'
        import sys
        sys.exit(1)
    lock.del_lock(key, lock_value)

Test results:

Thread-2 3
Thread-2 crash..
Thread-7 waiting..
Thread-3 waiting..
Thread-5 waiting..
Thread-4 waiting..
Thread-9 waiting..
Thread-6 waiting..
Thread-10 waiting..

After thread 2 crashes, the remaining threads can never acquire the lock and wait forever. If data is processed with multiple threads, for example one thread per incoming data item, the waiting threads pile up and the system may eventually crash.

The thread that created the lock cannot release it because it exited abnormally, so we need another way to release the lock. Since the distributed lock is built on Redis, why not use the Redis TTL mechanism and give the lock an expiration time?

However, setting the expiration time with a separate expire call, as below, brings back the atomicity problem, because a thread that crashes between setnx and expire leaves a lock that never expires:

value = self.rediscli.setnx(lock_key, '1')
if value:
    self.rediscli.expire(lock_key, 5)

Since Redis 2.6.12, the set command supports the NX and EX options, so the lock and its expiration time can be set in a single atomic command.

The improved code is as follows:

def get_lock(self, key, timeout=3):
    lock_key = self.get_lock_key(key)
    while True:
        # Set the lock and its expiration time in one atomic command
        value = self.rediscli.set(lock_key, '1', nx=True, ex=timeout)
        if value:
            return True
        time.sleep(0.01)

The test results are as follows:

Thread-1 1
Thread-9 2
Thread-6 3
Thread-2 4
Thread-4 5
Thread-5 6
Thread-8 7
Thread-3 8
Thread-7 9
Thread-10 10

Simulate thread crash:

Thread-1 1
Thread-2 2
Thread-2 crash..
Thread-10 3
Thread-7 4
Thread-4 5
Thread-8 6
Thread-3 7
Thread-9 8
Thread-6 9
Thread-5 10

The result is correct: thread 2 crashes, and the other threads wait until the lock expires and then continue. (The output alone does not show the waiting very well; if you are interested, try it yourself.)

At this point the data inconsistency problem looks solved, but before celebrating, think about what else might happen. Suppose thread A has not finished its work when its lock expires and is released automatically. Thread B then acquires the lock and starts its own work. When thread A finally finishes, will it delete thread B's lock? That is the next question.
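Since the output above does not show the waiting, the effect of the expiration time can be sketched in Python 3 without a Redis server. ExpiringStore is a hypothetical in-memory model written for this example; it accepts fractional seconds to keep the demo short, whereas the real EX option takes whole seconds.

```python
import time

class ExpiringStore:
    """Hypothetical in-memory stand-in for Redis keys with a TTL."""

    def __init__(self):
        self._data = {}   # key -> (value, deadline on the monotonic clock)

    def set_nx_ex(self, key, value, ttl_seconds):
        """Emulate SET key value NX EX ttl; return True if the key was set."""
        now = time.monotonic()
        entry = self._data.get(key)
        if entry is not None and entry[1] > now:
            return False                      # still held and not expired
        self._data[key] = (value, now + ttl_seconds)
        return True

store = ExpiringStore()
first = store.set_nx_ex("lock_test_key", "1", ttl_seconds=0.2)   # holder then "crashes"
second = store.set_nx_ex("lock_test_key", "1", ttl_seconds=0.2)  # immediate retry fails
time.sleep(0.3)                                                   # wait past the TTL
third = store.set_nx_ex("lock_test_key", "1", ttl_seconds=0.2)   # lock is free again
print(first, second, third)  # → True False True
```

The first holder never releases the lock, yet the third attempt succeeds once the TTL has passed, which is the behavior that prevents the deadlock seen in the crash test.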

Method 3: the lock must be created and deleted by the same thread

Let’s change the test case to the following:

def increase(redis, lock, key):
    thread_name = threading.current_thread().name
    # Set the lock expiration time to 2s
    lock_value = lock.get_lock(key, timeout=2)
    value = redis.get(key)
    if not value:
        value = 0
    # Simulate an operation that runs longer than the lock expiration time
    time.sleep(2.5)
    value = int(value) + 1
    print thread_name, value
    redis.set(key, value)
    lock.del_lock(key, lock_value)

In the above example, we let the thread execute longer than the lock expiration time, causing the lock to be released automatically when it expires.

Test results:

Thread-1 1
Thread-3 1
Thread-2 2
Thread-9 2
Thread-5 3
Thread-7 3
Thread-6 4
Thread-4 4
Thread-8 5
Thread-10 5

As the results show, each thread runs for longer than the lock expiration time, so the lock is released automatically before the thread finishes its task and the next thread acquires it. The next thread's lock is then either deleted by the previous thread or released automatically when it expires, depending on how the threads' execution time compares with the expiration time. Either way, the same data ends up being modified by two or more threads at the same time, and the result is inconsistent.

Drawing the flow for four threads in chronological order gives the following:

It shows that locks are deleted by mistake at 2.5s and 5s.
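The first mistaken deletion, at 2.5s, can be replayed step by step in a Python 3 sketch. FakeClockStore is a hypothetical in-memory model with a manually advanced clock, used here only so that the timeline is deterministic; it is not the article's RedisLock.

```python
class FakeClockStore:
    """Hypothetical model of Redis keys with TTL, driven by a manual clock."""

    def __init__(self):
        self.now = 0.0
        self._data = {}   # key -> (value, deadline)

    def _live(self, key):
        entry = self._data.get(key)
        if entry is not None and entry[1] <= self.now:
            del self._data[key]           # expire lazily on access
            entry = None
        return entry

    def set_nx_ex(self, key, value, ttl):
        if self._live(key) is not None:
            return False
        self._data[key] = (value, self.now + ttl)
        return True

    def delete(self, key):
        return 1 if self._data.pop(key, None) is not None else 0

store = FakeClockStore()
key = "lock_test_key"
events = []

store.set_nx_ex(key, "1", ttl=2)                 # t=0.0: thread A locks, TTL 2s
store.now = 2.0                                  # t=2.0: A's lock expires mid-task
events.append(store.set_nx_ex(key, "1", ttl=2))  # thread B acquires the lock
store.now = 2.5                                  # t=2.5: A finishes its 2.5s task
events.append(store.delete(key))                 # A deletes the lock, which is B's
events.append(store.set_nx_ex(key, "1", ttl=2))  # thread C acquires while B still works
print(events)  # → [True, 1, True]
```

At 2.5s thread A removes a lock that belongs to thread B, so thread C gets in while B is still working. The lock value "1" carries no owner information, which is why A cannot tell the difference.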

Since the problem is that a thread whose lock has expired deletes a lock belonging to another thread, we follow that idea and require each thread to delete only its own lock. To do so, each thread's lock needs a unique identifier. Look at how we have been locking so far: every time, we set lock_key to 1, so neither the key nor the value is unique. If we made the key unique per thread, a distributed system would generate N keys (N being the total number of threads), which is undesirable in terms of clarity and maintainability, so the only option is the value. Each thread already has an identifier of its own, the thread ID; combined with the process PID and the machine's IP, it uniquely identifies a thread's lock. If you are still worried about uniqueness, add a timestamp as well. Our final version of the distributed lock becomes the following:

import logging
import os
import socket
import threading
import time

logger = logging.getLogger(__name__)

class RedisLock(object):
    def __init__(self, rediscli):
        self.rediscli = rediscli.master
        # The IP is resolved once at instantiation to avoid repeated DNS lookups
        self.ip = socket.gethostbyname(socket.gethostname())
        self.pid = os.getpid()

    def gen_lock_key(self, key):
        lock_key = "lock_%s" % key
        return lock_key

    def gen_unique_value(self):
        thread_name = threading.current_thread().name
        time_now = time.time()
        unique_value = "{0}-{1}-{2}-{3}".format(self.ip, self.pid, thread_name, time_now)
        return unique_value

    def get_lock(self, key, timeout=3):
        lock_key = self.gen_lock_key(key)
        unique_value = self.gen_unique_value()
        logger.info("unique value %s" % unique_value)
        while True:
            value = self.rediscli.set(lock_key, unique_value, nx=True, ex=timeout)
            if value:
                # Return the unique value so the caller can release its own lock
                return unique_value
            # Sleep between attempts to avoid burning CPU
            time.sleep(0.1)

    def del_lock(self, key, value):
        lock_key = self.gen_lock_key(key)
        old_value = self.rediscli.get(lock_key)
        # Delete the lock only if it still holds this thread's value
        if old_value == value:
            return self.rediscli.delete(lock_key)

Test results:

Thread-1 1
Thread-2 2
Thread-4 3
Thread-5 4
Thread-10 5
Thread-3 6
Thread-9 7
Thread-6 8
Thread-8 9
Thread-7 10
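The effect of comparing values before deleting can be seen in isolation with the Python 3 sketch below. It makes several simplifying assumptions: a plain dictionary stands in for Redis, expiration is simulated by deleting the key by hand, the token leaves out the IP, and make_token and release are illustrative helper names rather than part of the class above.

```python
import os
import threading
import time

def make_token():
    """Build a per-acquisition value from the PID, thread name and timestamp."""
    return "{0}-{1}-{2}".format(
        os.getpid(), threading.current_thread().name, time.time())

def release(store, lock_key, token):
    """Delete the lock only if it still holds our token (check, then delete)."""
    if store.get(lock_key) == token:
        del store[lock_key]
        return True
    return False

store = {}                               # plain dict standing in for Redis
lock_key = "lock_test_key"

token_a = make_token()                   # thread A's unique value
store[lock_key] = token_a                # A acquires the lock
del store[lock_key]                      # A's lock expires (simulated)
token_b = "another-holder-token"         # B's value, a fixed string for illustration
store[lock_key] = token_b                # B acquires the lock

released_by_a = release(store, lock_key, token_a)  # A finishes late
released_by_b = release(store, lock_key, token_b)  # B releases its own lock
print(released_by_a, released_by_b)  # → False True
```

Thread A's late release is now a no-op, so B's lock survives until B releases it. The sketch says nothing about A and B working on the data at the same time after the expiration, which the unique value cannot prevent.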

Next, test lock expiration again with the following test case:

def increase(redis, lock, key):
    thread_name = threading.current_thread().name
    lock_value = lock.get_lock(key, timeout=1)
    value = redis.get(key)
    if not value:
        value = 0
    # Simulate an operation that runs longer than the lock expiration time
    time.sleep(3)
    value = int(value) + 1
    print thread_name, value
    redis.set(key, value)
    lock.del_lock(key, lock_value)

Test results:

Thread-1 1
Thread-2 1
Thread-5 1
Thread-6 2
Thread-8 2
Thread-10 2
Thread-9 3
Thread-3 3
Thread-4 3
Thread-7 4

As the output shows, the problem is still not solved. Why? The unique value we set only guarantees that a thread will not mistakenly delete a lock created by another thread, which prevents a chain of mistaken deletions such as A deleting B's lock and B, once it finishes, deleting C's lock. With the Redis expiration mechanism, as long as the business processing time exceeds the lock expiration time, there is no good way to stop other threads from holding the lock at the same time after it expires. You therefore need to know how long the business logic takes and set the lock expiration time accordingly.

In addition, the delete operation above is not atomic either: it first checks whether the value of the lock equals its own value and deletes the lock only if they match. If the lock expires and is taken by another thread right after the check, the final delete can still remove that thread's lock by mistake. The Lua script recommended in the official Redis documentation makes the check and the delete atomic:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

However, as long as the lock expiration time is set reasonably, this problem can be ignored, since the probability of this extreme case is very small. After all, suddenly inserting a script into our elegant Python code does not look very pythonic.
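For readers who do want the atomic release, a minimal Python 3 sketch of calling the script from Python is shown below. It assumes a redis-py style client whose eval method takes the script, the number of keys, and then the keys and arguments; release_lock and RELEASE_SCRIPT are names chosen for this example and are not part of the RedisLock class in this article.

```python
# The Lua script quoted above, unchanged.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""

def release_lock(client, lock_key, unique_value):
    """Atomically delete lock_key only if it still holds unique_value.

    `client` is assumed to be a redis-py style client exposing
    eval(script, numkeys, *keys_and_args).
    """
    deleted = client.eval(RELEASE_SCRIPT, 1, lock_key, unique_value)
    return deleted == 1
```

With the RedisLock class, this would be called with the generated lock key and the unique value returned when the lock was acquired. The comparison and the deletion then run inside Redis as a single step, so the lock cannot change hands between them.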

Conclusion

Above, we used Redis to implement a distributed synchronization lock that keeps data consistent. Its characteristics are:

  • It satisfies mutual exclusion: only one thread can hold the lock at a time
  • The Redis TTL guarantees that deadlock does not occur, but it also introduces the problem of multiple threads holding the lock at the same time once the lock expires, so the expiration time must be set sensibly to avoid this
  • The unique value of each lock ensures that a lock is not deleted by mistake

In the scenario above, we assumed that the Redis server is a single, highly available cluster and ignored the following issue: if the Redis master node fails and a slave node is promoted to master, the lock on the original master may not have been replicated to the slave in time, so another thread can acquire the same lock. For this problem, see the official Redis Redlock algorithm; unfortunately, that algorithm does not solve the lock expiration problem well either.

