“This is the 18th day of my participation in the First Challenge 2022. For details: First Challenge 2022.”

Official Python column, article 67. Stop here! Don't miss this article written for readers starting from zero!

Earlier articles in this column already demonstrated the queue library and parsed parts of its code.

Today we're going to look at the core Queue class, the most important class in the queue module, so be sure to read on.

queue.Queue (the queue class without the Simple prefix)

Below is the source code of the queue library's core class. I (Xuewei) removed some comments.

At a glance, the code looks far from simple. (That is why this class is discussed last in this part of the column.)

import threading
from collections import deque
from time import monotonic as time


# In the standard library, Empty and Full are exceptions provided by the
# queue module; they are redefined here only so this listing runs on its own.
class Empty(Exception):
    pass


class Full(Exception):
    pass


class Queue:

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        # One mutex, shared by the three condition variables below.
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        with self.mutex:
            return self._qsize()

    def empty(self):
        with self.mutex:
            return not self._qsize()

    def full(self):
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put(self, item, block=True, timeout=None):
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def put_nowait(self, item):
        return self.put(item, block=False)

    def get_nowait(self):
        return self.get(block=False)

    # Hooks below are meant to be overridden by subclasses.

    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.popleft()

Reading source code takes patience: even this simplified version runs to about a hundred lines.

After the earlier SimpleQueue walkthrough, this article should be much easier to follow.

At first glance, the methods that start with an underscore are clearly hooks meant for subclasses to override. The priority queue and the last-in, first-out queue shared earlier do exactly that: each supplies its own _init/_qsize/_put/_get methods.
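To make the hook idea concrete, here is a minimal sketch of a subclass that changes the ordering only by overriding the four underscore methods. The class name StackQueue is hypothetical and used purely for illustration; the locking and blocking logic is inherited unchanged from queue.Queue.

```python
import queue


class StackQueue(queue.Queue):
    """Hypothetical subclass: last in, first out, built only from the hooks."""

    def _init(self, maxsize):
        self.queue = []              # a plain list replaces the deque

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)      # push onto the end

    def _get(self):
        return self.queue.pop()      # pop from the same end


q = StackQueue()
for n in (1, 2, 3):
    q.put(n)

result = [q.get() for _ in range(3)]
print(result)  # [3, 2, 1]
```

Because put and get only ever call the hooks, the subclass does not need to touch any lock or condition variable.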

The internal implementation uses a deque

The constructor is shown below. It uses a double-ended queue (deque) as the container that stores the elements.

A mutex is also used, and three condition variables share that same lock: self.mutex.

Because all three conditions are built on one lock, a thread holding any of them excludes threads that need the others.

Below is the constructor source. I merged the __init__ and _init methods so it is easier to read:

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self.queue = deque()  # inlined from self._init(maxsize)
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
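The shared lock can be observed directly. The following is a small sketch, assuming the mutex and not_empty attributes shown in the constructor above; the variable names acquired_again and free_afterwards are only illustrative.

```python
import queue

q = queue.Queue()

# Enter one of the conditions; this acquires the underlying shared mutex.
with q.not_empty:
    # The mutex is a plain (non-reentrant) Lock, so a second,
    # non-blocking attempt to take it must fail while we are in here.
    acquired_again = q.mutex.acquire(blocking=False)

# Outside the with block the lock has been released again.
free_afterwards = q.mutex.acquire(blocking=False)
if free_afterwards:
    q.mutex.release()

print(acquired_again, free_afterwards)  # False True
```

This is why a thread inside get keeps every put, qsize, or join call waiting until it leaves the with block or calls wait().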

Ok, let’s move on to the get method:

    def get(self, block=True, timeout=None):
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self.queue.popleft()
            self.not_full.notify()
            return item

Readers may want to read through the get source above once more.

Parsing the get method

First, the method tries to acquire the lock behind the not_empty condition. If another thread is currently inside get, put, or any other method that holds the lock, the current thread waits, because every condition variable is backed by the same mutex.

Second, once the lock is held, a chain of checks with four branches is evaluated in order.

Finally, the element is fetched and the not_full condition is notified. (The notification wakes a thread waiting in put. If maxsize is set, not_full is what holds back excess put operations; more on that later.)

The three steps above are the general logic of the get method. Let's look at the four branches:

  • The first branch handles block=False. If the queue is empty, an Empty exception is raised immediately; if it is not empty, execution goes straight to the last step mentioned above.

  • The second branch checks whether timeout was given. Reaching it implies block=True, and the branch applies when timeout is None. If the queue's internal container is empty, the current thread waits indefinitely. Once the container is non-empty, the loop exits and the last step mentioned above is performed.

  • The third branch checks whether timeout is valid. If timeout is less than 0, a ValueError is raised directly and nothing else happens.

  • The fourth branch is block=True with a valid timeout. A timed loop is entered. If the queue already holds data, the loop is skipped and the last step is performed. Otherwise the thread waits inside the loop, and if the queue is still empty when the timeout expires, an Empty exception is raised. If one or more elements are put into the queue within the time limit, the loop exits and the last step mentioned above is performed.
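The four branches can be exercised one by one with the standard queue module. This is only a sketch: the variable names (outcomes, waited, item) and the 0.2 and 0.1 second delays are arbitrary choices for illustration.

```python
import queue
import threading
import time

q = queue.Queue()
outcomes = []

# Branch 1: block=False on an empty queue raises Empty at once.
try:
    q.get(block=False)
except queue.Empty:
    outcomes.append("nonblocking: Empty")

# Branch 3: a negative timeout is rejected with ValueError.
try:
    q.get(timeout=-1)
except ValueError:
    outcomes.append("negative timeout: ValueError")

# Branch 4: a valid timeout waits, then raises Empty if nothing arrives.
start = time.monotonic()
try:
    q.get(timeout=0.2)
except queue.Empty:
    outcomes.append("timed out: Empty")
waited = time.monotonic() - start

# Branch 2: no timeout, so get blocks until another thread puts an item.
threading.Timer(0.1, q.put, args=("hello",)).start()
item = q.get()

print(outcomes)
print(item)
```

The last call returns only after the timer thread has called put, which notifies not_empty and wakes the waiting get.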

Next up is the put method

    def put(self, item, block=True, timeout=None):
        with self.not_full:
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full
                        self.not_full.wait(remaining)
            self.queue.append(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

This code is also logically complex.

First, the lock behind the not_full condition is acquired. If the queue is full at this point, put starts waiting until not_full is notified, which happens at the end of get, right after it fetches an element. (Note that get calls notify, not notify_all. Each get frees only one slot, so waking a single waiting put is enough. Waking every waiter would be wasteful, and if the waiters did not re-check the size in a while loop, it could let more items in than maxsize allows, which would be a serious bug.)

Then comes the maxsize check. If you did not set a value greater than 0, which is the default when creating the queue, things are simple: execution goes straight to the last step. Otherwise there are four more checks.

Finally, the element is appended to the queue and unfinished_tasks is incremented by 1, so unfinished_tasks grows by 1 each time a new element is added. The method also notifies one thread waiting in get that the queue is not empty and an element can be fetched.
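The counter can be watched from outside. A small sketch, assuming the unfinished_tasks attribute shown in the source above; the variable names are illustrative only.

```python
import queue

q = queue.Queue()
q.put("job-1")
q.put("job-2")
after_put = q.unfinished_tasks    # each put added 1

q.get()
after_get = q.unfinished_tasks    # get does not change the counter

q.task_done()
after_done = q.unfinished_tasks   # only task_done lowers it

print(after_put, after_get, after_done)  # 2 2 1
```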

The three steps above are the rough logic of the put method. Let's look at the four branches:

They mirror the branches of the get method.

  • The first branch handles block=False. If the queue is full, a Full exception is raised immediately; otherwise execution goes straight to the last step mentioned above.

  • The second branch checks whether timeout was given. Reaching it implies block=True, and the branch applies when timeout is None. If the queue is full, the current thread waits indefinitely. Once it is no longer full, the loop exits and the last step mentioned above is performed.

  • The third branch checks whether timeout is valid. If timeout is less than 0, a ValueError is raised directly and nothing else happens.

  • The fourth branch is block=True with a valid timeout. A timed loop is entered. If the queue is not full, the loop is skipped and the last step is performed. Otherwise the thread waits inside the loop, and if the queue is still full when the timeout expires, a Full exception is raised. If a slot becomes free within the time limit, the loop exits and the last step mentioned above is performed.
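The same experiment works for put, using a bounded queue so the maxsize checks are actually reached. As before, this is a sketch with arbitrary names and delays, not part of the library source.

```python
import queue
import threading

q = queue.Queue(maxsize=1)
q.put("a")                      # fits; the queue is now full
outcomes = []

# Branch 1: block=False on a full queue raises Full at once.
try:
    q.put("b", block=False)
except queue.Full:
    outcomes.append("nonblocking: Full")

# Branch 3: a negative timeout is rejected with ValueError.
try:
    q.put("b", timeout=-1)
except ValueError:
    outcomes.append("negative timeout: ValueError")

# Branch 4: a valid timeout waits, then raises Full if no slot frees up.
try:
    q.put("b", timeout=0.1)
except queue.Full:
    outcomes.append("timed out: Full")

# Branch 2: no timeout, so put blocks until another thread calls get.
threading.Timer(0.1, q.get).start()
q.put("b")                      # returns once "a" has been taken
remaining = q.get()

print(outcomes)
print(remaining)
```

The blocking put is released by the notify on not_full at the end of the timer thread's get.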

So once you understand one of get and put, the other is quite easy to understand.

This article has run a little long, so I'll stop here.

Conclusion

queue.Queue is more complex than SimpleQueue by design. This article only explored its get and put methods and its condition variables; several other methods, along with unfinished_tasks, have not been covered yet.

It is also the parent class of the priority queue and the last-in, first-out queue, which makes it the backbone of the whole built-in queue module, so I hope you will spend some time reading it.

If you like Python, please check out the Python Basics column or the Python Getting Started to Mastery column.

Keep learning and keep developing. I am Lei Xuewei! Programming is fun; the key is to understand the technology thoroughly. You are welcome to follow me on WeChat and to like, support, and bookmark this article!