
Official Python column, article 68. Stop by! Don't miss this article written for complete beginners.

queue.Queue is a classic producer-consumer model.

In its source code, we can see that a Queue is created with three Condition objects (not_empty, not_full and all_tasks_done), and that all three share the same mutex. There are also two methods, task_done and join, and the member variable unfinished_tasks, which the previous article did not cover; this article picks them up.
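The shared mutex can be observed from the outside. The following is a minimal sketch, assuming the CPython attribute names `mutex`, `not_empty`, `not_full` and `all_tasks_done` (they appear in the standard library source, but are implementation details rather than a documented API):

```python
import queue
import threading

q = queue.Queue()
conditions = [q.not_empty, q.not_full, q.all_tasks_done]

# All three attributes are threading.Condition objects.
all_conditions = all(isinstance(c, threading.Condition) for c in conditions)

# While this thread holds q.mutex, a non-blocking acquire on each condition
# fails, which is what we expect if all of them wrap that same lock.
with q.mutex:
    acquired = [c.acquire(blocking=False) for c in conditions]

print(all_conditions, acquired)
```

If the conditions had their own locks, each non-blocking acquire would succeed instead of returning False.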

Take a look at a demo where the queue is empty but the number of unfinished tasks is not 0!

unfinished_tasks never returns to 0?

Xuewei has prepared the following program:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @time: 2022/2/19 12:40 am
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: Lei Xuewei
# @XueWeiTag: CodingDemo
# @File : originalq_qdemo_monitor.py
# @Project : hello
import datetime
import queue
import threading
import time

q = queue.Queue()


def operate_q():
    # Producer: put this thread's name into the queue.
    time.sleep(0.5)
    tname = threading.current_thread().name
    print("%s - tname %s - before put" % (datetime.datetime.now(), tname))
    q.put(tname)
    print("%s - tname %s - q: %s" % (datetime.datetime.now(), tname, q.queue))


def get_q():
    # Consumer: take one element out of the queue (task_done is never called).
    time.sleep(0.5)
    tname = threading.current_thread().name
    print("%s - tname %s - before get" % (datetime.datetime.now(), tname))
    ele = q.get()
    print("%s - tname %s - get q: %s" % (datetime.datetime.now(), tname, ele))


def monitor():
    # Loops forever, printing the queue state once per second;
    # stop the program manually.
    while True:
        time.sleep(1)
        tname = threading.current_thread().name
        print("monitor-q: %s - q unfinished_tasks %s - queue %s " % (tname, q.unfinished_tasks, q.queue))


threads = []
for i in range(5):
    t = threading.Thread(target=operate_q, name='XueWei-put-' + str(i + 1))
    threads.append(t)
    t2 = threading.Thread(target=get_q, name='XueWei-get-' + str(i + 1))
    threads.append(t2)

threading.Thread(target=monitor, name="XueWei-q-monitor").start()
for t in threads:
    t.start()

Apart from the main thread, there are essentially three groups of threads: one group puts elements into the queue, one group takes elements out of it, and the last group is a single q-monitor thread, which periodically prints the state of the queue.

We can see that the queue has been emptied by the end, which is expected and shows no logic problem. But unfinished_tasks ends at 5. As mentioned in the previous article, this value increases by one on every successful call to put, and get never decreases it, so this is not a problem either.

Here is the result of a run:

unfinished_tasks never returns to 0? Why?

First of all, it’s not a bug!

The previous article covered most of the methods, but there are still two left. If we look at the code, we’ll see that unfinished_tasks will decrease by one only if the task_done method is called.
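A single-threaded check makes the same point without any timing involved. This is a small sketch, not the program above; it only uses `put`, `get`, `task_done`, `empty` and the `unfinished_tasks` attribute discussed in this article:

```python
import queue

q = queue.Queue()
for n in range(5):
    q.put(n)                          # each successful put adds 1 to unfinished_tasks
items = [q.get() for _ in range(5)]   # get removes items but leaves the counter alone

after_get = (q.empty(), q.unfinished_tasks)
q.task_done()                         # only task_done lowers the counter
after_one_done = q.unfinished_tasks

print(after_get, after_one_done)      # (True, 5) 4
```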

# Excerpt from queue.Queue in the standard library (docstrings omitted)
def task_done(self):
    with self.all_tasks_done:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished

def join(self):
    with self.all_tasks_done:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()

Let’s start with the join method

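The join method first acquires the all_tasks_done condition, which means acquiring the mutex that this condition shares with the rest of the queue. This makes the check of unfinished_tasks atomic: only one thread at a time can be running inside the with block.

The loop then checks unfinished_tasks (the number of unfinished tasks). As long as any task is still unfinished (that is, task_done has not yet been called for it), the current thread waits. While it sleeps, wait() releases the mutex, and it reacquires the mutex when the thread is woken.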
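The waiting behaviour can be seen with one extra thread. The sketch below is an illustration only; the `released` event and the `waiter` function are hypothetical names introduced here, and the timeouts are arbitrary:

```python
import queue
import threading

q = queue.Queue()
q.put("job")                       # unfinished_tasks is now 1
released = threading.Event()       # hypothetical flag, set once join() returns


def waiter():
    q.join()                       # blocks while unfinished_tasks is non-zero
    released.set()


threading.Thread(target=waiter, daemon=True).start()

# join() has not returned yet, so the event is still unset after the timeout.
still_waiting = not released.wait(timeout=0.2)

q.get()
q.task_done()                      # counter reaches 0, waiting threads are notified
woke_up = released.wait(timeout=2)

print(still_waiting, woke_up)
```

Note that taking the item with get is not what wakes the waiter; only the task_done call does.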

Next, task_done

task_done also acquires the all_tasks_done condition.

Once the condition is held, it computes unfinished_tasks minus 1. If the result is 0, it notifies all threads that called join, so that they wake up and resume execution. If the result would be negative, it raises ValueError, because task_done has been called more times than put. In every other case it simply stores the decremented value.
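The error branch is easy to trigger on purpose. A minimal sketch, with the error message taken from the source excerpt above:

```python
import queue

q = queue.Queue()
q.put("a")
q.get()
q.task_done()                  # 1 -> 0; join() callers would be notified here
count_after = q.unfinished_tasks

try:
    q.task_done()              # no matching put: the counter would drop below 0
    error = None
except ValueError as exc:
    error = str(exc)

print(count_after, error)      # 0 task_done() called too many times
```

Because the exception is raised before the assignment, the counter stays at 0 after the failed call.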

Clearly, join and task_done need to be used together, because they coordinate threads through the same Condition, all_tasks_done.
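For comparison, here is a sketch of how the two calls are commonly paired: each worker calls task_done once per get, and the main thread calls join to wait for all of them. This is not the author's program; the `worker` function, the `results` list and the doubling step are placeholders invented for the illustration:

```python
import queue
import threading

q = queue.Queue()
results = []                       # hypothetical sink for processed items


def worker():
    while True:
        item = q.get()
        try:
            results.append(item * 2)   # stand-in for real work
        finally:
            q.task_done()              # exactly one task_done per get


for _ in range(3):
    threading.Thread(target=worker, daemon=True).start()

for n in range(10):
    q.put(n)

q.join()                           # returns once every item has been marked done
print(sorted(results), q.unfinished_tasks)
```

The try/finally keeps the counter correct even if processing an item raises, which is one way to avoid a join that never returns.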

Here is some code Xuewei has prepared:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @time: 2022/2/19 12:40 am
# @Author : LeiXueWei
# @CSDN/Juejin/Wechat: Lei Xuewei
# @XueWeiTag: CodingDemo
# @File : qdemo_producer_consumer.py
# @Project : hello
import datetime
import queue
import threading
import time

q = queue.Queue()


def operate_q():
    # Producer: put an element, then immediately mark that task as done.
    time.sleep(0.1)
    tname = threading.current_thread().name
    print("%s - tname %s - before put" % (datetime.datetime.now(), tname))
    q.put(tname)
    print("%s - tname %s - before task done" % (datetime.datetime.now(), tname))
    q.task_done()


def get_q():
    # Consumer: wait in join until unfinished_tasks is 0, then take an element.
    time.sleep(0.1)
    tname = threading.current_thread().name
    print("%s - tname %s - before join" % (datetime.datetime.now(), tname))
    q.join()
    # print("%s after join" % tname)
    ele = q.get()
    print("%s - tname %s - get q: %s" % (datetime.datetime.now(), tname, ele))


def monitor():
    # Print the queue state every 0.1 s, 50 times in total.
    i = 0
    while i < 50:
        i += 1
        time.sleep(0.1)
        tname = threading.current_thread().name
        print("monitor-q: %s - q unfinished_tasks %s - queue %s " % (tname, q.unfinished_tasks, q.queue))


threads = []
for i in range(5):
    t = threading.Thread(target=operate_q, name='dim sum-' + str(i + 1))
    threads.append(t)
    t2 = threading.Thread(target=get_q, name='run-' + str(i + 1))
    threads.append(t2)

threading.Thread(target=monitor, name="XueWei-q-monitor").start()
for t in threads:
    t.start()

Brief explanation:

There are still three groups of threads. The difference from the earlier program in this article is that the put threads call task_done right after put, and the get threads call join before calling get.
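What join does for the get threads here can be checked in a single thread. The sketch below is a simplified illustration, not a replay of the threaded program: because each producer calls task_done right after put, the counter returns to 0 while the element is still in the queue.

```python
import queue

q = queue.Queue()
q.join()                       # counter is 0 on a fresh queue, so this returns at once

q.put("x")
q.task_done()                  # the producer marks the task done right after put
q.join()                       # counter is 0 again, so still no blocking

state = (q.unfinished_tasks, q.qsize())
item = q.get()                 # the element is still there to be taken

print(state, item)             # (0, 1) x
```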

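So in this program, each get thread first waits in join until unfinished_tasks is 0, and then calls get, which blocks until the queue has an element.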

The running effect is as follows:

Conclusion

queue.Queue is more complex by design than SimpleQueue, and with that we have finished analyzing this class.

It is fair to say that reading the source of this class tests a developer's combined understanding of Lock and Condition in threading. Without much grounding in multithreading, it is hard to follow. In that case, go back to Xuewei's earlier multithreading articles in this column, which is still being updated.

The design is clever, but be careful: if you use it incorrectly, for example by calling join without the matching task_done calls, threads can block forever.
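One way to guard against such a hang while debugging is to wait with a deadline. Queue.join itself takes no timeout, so the sketch below defines a hypothetical helper, `join_with_timeout`, which is not part of the standard library. It repeats the loop from join, using the timeout parameter of Condition.wait, and assumes the CPython attributes `all_tasks_done` and `unfinished_tasks` shown earlier:

```python
import queue
import time


def join_with_timeout(q, timeout):
    """Hypothetical helper: like q.join(), but give up after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    with q.all_tasks_done:
        while q.unfinished_tasks:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False           # tasks are still outstanding
            q.all_tasks_done.wait(remaining)
    return True


q = queue.Queue()
q.put("job")
q.get()                                # task_done() deliberately forgotten
timed_out = not join_with_timeout(q, 0.2)

q.task_done()                          # supply the missing call
finished = join_with_timeout(q, 0.2)

print(timed_out, finished)
```

A plain q.join() in place of the first helper call would never return, because nothing would ever bring the counter back to 0.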

If you like Python, please check out the Python Basics column or the Python: From Getting Started to Mastery column.

Keep learning and keep developing. I am Lei Xuewei! Programming is fun; the key is to understand the technology thoroughly. Follow me on WeChat, and likes, support and bookmarks are all welcome!