Article 2 in actual

Methods of communication between threads

queue:

  • Queue is thread-safe: its implementation already uses locks internally
  • A Queue is still a kind of shared variable, but a thread-safe one. Other shared variables can get out of sync unless you guard them with a lock yourself.
  • Common methods:
    • from queue import Queue
    • put(self, item, block=True, timeout=None): adds an item to the queue; blocks if the queue is full. An optional timeout can be set
    • get(self, block=True, timeout=None): removes and returns an item from the queue; blocks if the queue is empty. An optional timeout can be set
    • qsize(): returns the approximate size of the queue
    • empty(): checks whether the queue is empty
    • full(): checks whether the queue is full
    • join(): blocks until every item in the queue has been retrieved and processed. It relies on task_done() being called once for each retrieved item; otherwise join() stays blocked
    • task_done(): signals that a previously retrieved item has been fully processed
    • put_nowait(self, item): non-blocking put; raises queue.Full if the queue is full
    • get_nowait(self): non-blocking get; raises queue.Empty if the queue is empty
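
The methods listed above can be combined into a small producer/consumer sketch. This is only an illustration under stated assumptions: the names run_pipeline and nowait_demo, the None sentinel, and the queue size of 4 are hypothetical choices, not part of the original article.

```python
import threading
from queue import Queue, Empty, Full


def run_pipeline(items, workers=2):
    """Square every item using worker threads fed through a Queue."""
    tasks = Queue(maxsize=4)         # bounded: put() blocks while the queue is full
    results = []
    results_lock = threading.Lock()  # results is a plain list, so we guard it ourselves

    def worker():
        while True:
            item = tasks.get()       # blocks while the queue is empty
            try:
                if item is None:     # hypothetical sentinel meaning "no more work"
                    return
                with results_lock:
                    results.append(item * item)
            finally:
                tasks.task_done()    # one task_done() per get(), or join() never returns

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for item in items:
        tasks.put(item)
    for _ in threads:
        tasks.put(None)              # one sentinel per worker
    tasks.join()                     # returns once every put() is matched by task_done()
    for t in threads:
        t.join()
    return sorted(results)


def nowait_demo():
    """Show that the *_nowait variants raise instead of blocking."""
    q = Queue(maxsize=1)
    q.put_nowait("a")
    try:
        q.put_nowait("b")            # queue is already full
        full_raised = False
    except Full:
        full_raised = True
    q.get_nowait()
    try:
        q.get_nowait()               # queue is now empty
        empty_raised = False
    except Empty:
        empty_raised = True
    return full_raised, empty_raised
```

Calling run_pipeline(range(5)) returns the squared values in sorted order, and nowait_demo() reports that both exceptions were raised. The bounded queue is what makes put() block; with the default maxsize of 0 the queue is unbounded and put() never blocks.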

condition

from threading import Condition
  • Condition variables are used for more complex synchronization between threads.

  • Both queue.Queue and threading.Semaphore are implemented on top of Condition

  • The __enter__ and __exit__ methods are defined in Condition, so context management can be implemented using the with statement

  • Common methods:

    • Condition actually has two levels of locks:

      • By default, a Condition is created with a recursive lock (RLock). This lock is released inside the Condition's wait() method and is called the underlying lock
      • Each call to wait() also allocates a second lock for the current thread and appends it to a double-ended queue (deque). That lock is released when notify() is called
      # Abridged excerpt from threading.Condition in CPython
      def __init__(self, lock=None):
          if lock is None:
              lock = RLock()
          self._lock = lock
          # Export the lock's acquire() and release() methods
          self.acquire = lock.acquire
          self.release = lock.release


      def wait(self, timeout=None):
          if not self._is_owned():
              raise RuntimeError("cannot wait on un-acquired lock")

          waiter = _allocate_lock()
          waiter.acquire()  # allocate a lock for the current thread and take it
          self._waiters.append(waiter)  # add that lock to the deque of waiters
          saved_state = self._release_save()  # release the underlying lock set up when the Condition was initialized
          # (rest omitted: the thread blocks on waiter.acquire() until notify()
          # releases the waiter lock, then re-acquires the underlying lock)


      def notify(self, n=1):
          if not self._is_owned():
              raise RuntimeError("cannot notify on un-acquired lock")
          all_waiters = self._waiters
          waiters_to_notify = _deque(_islice(all_waiters, n))
          if not waiters_to_notify:
              return
          for waiter in waiters_to_notify:
              waiter.release()  # release the waiter lock held in the deque
              try:
                  all_waiters.remove(waiter)
              except ValueError:
                  pass
    • wait(self, timeout=None):

      • Wait until notified or until a timeout occurs.

      This method releases the underlying lock, and then blocks until it is awakened by a notify() or notify_all() for the same condition variable in another thread, or until the optional timeout occurs.

      • Releases the underlying lock held by the current thread so that other threads can acquire it, then blocks the current thread on a newly allocated waiter lock until notify() is called.

      • After calling wait(), the thread resumes only when another thread calls notify() (or the timeout expires), and it re-acquires the underlying lock before wait() returns.

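    • notify(self, n=1):

      • Wake up one or more threads waiting on this condition.

      • Releases the waiter lock that a waiting thread is blocked on, so that thread's wait() can continue. notify() does not release the underlying lock itself; the woken thread resumes only after the notifying thread releases it, for example by leaving its with block or calling wait().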

  • The sample code

    import threading


    class PersonA(threading.Thread):
        def __init__(self, cond):
            super().__init__(name="A")
            self.cond = cond

        def run(self):
            with self.cond:  # acquire the underlying lock for A
                print('{}: Hello B, are you there?'.format(self.name))
                self.cond.notify()  # wake B, which is blocked in wait()
                self.cond.wait()  # release the underlying lock and block A until B notifies


    class PersonB(threading.Thread):
        def __init__(self, cond):
            super().__init__(name="B")
            self.cond = cond

        def run(self):
            with self.cond:  # acquire the underlying lock for B
                self.cond.wait()  # release the underlying lock and block B until A notifies
                print("{}: Hello A, I'm in".format(self.name))
                self.cond.notify()  # wake A, which is blocked in wait()


    if __name__ == '__main__':
        cond = threading.Condition()
        A = PersonA(cond)
        B = PersonB(cond)
        B.start()
        A.start()
        # If A.start() is called first, the program can hang: A's notify() is
        # issued before B has reached wait(), so B never receives the
        # notification and neither thread can continue.

        # Output:
        # A: Hello B, are you there?
        # B: Hello A, I'm in
    • Note that the order of the start() calls matters. With the wrong order, the program blocks and cannot proceed.

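    The execution process is as follows:

      • B starts first, acquires the underlying lock, and calls wait(), which releases the underlying lock and blocks B.
      • A starts, acquires the underlying lock, prints its greeting, and calls notify() to wake B.
      • A then calls wait(), which releases the underlying lock and blocks A; B can now re-acquire the lock and return from wait().
      • B prints its reply, calls notify() to wake A, and releases the underlying lock on leaving the with block.
      • A re-acquires the underlying lock, returns from wait(), and both threads finish.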
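
Returning to the start-order caveat: one common way to make such an exchange independent of start order is to pair the Condition with shared flags and wait on them with wait_for(), which re-checks a predicate rather than relying on a notification arriving at the right moment. The sketch below is a hypothetical variant of the PersonA/PersonB sample, not the article's own code; the greet function, the state dictionary, and the use of plain functions instead of Thread subclasses are assumptions made for brevity.

```python
import threading


def greet(start_a_first):
    """Run the A/B exchange and return the lines in the order they were spoken."""
    cond = threading.Condition()
    state = {"asked": False, "answered": False}  # shared flags, guarded by cond
    lines = []

    def person_a():
        with cond:
            lines.append("A: Hello B, are you there?")
            state["asked"] = True
            cond.notify_all()
            # wait_for() re-checks the flag, so a notification sent early is not lost
            cond.wait_for(lambda: state["answered"])

    def person_b():
        with cond:
            # returns immediately if A has already asked
            cond.wait_for(lambda: state["asked"])
            lines.append("B: Hello A, I'm in")
            state["answered"] = True
            cond.notify_all()

    a = threading.Thread(target=person_a, name="A")
    b = threading.Thread(target=person_b, name="B")
    first, second = (a, b) if start_a_first else (b, a)
    first.start()
    second.start()
    a.join(timeout=5)
    b.join(timeout=5)
    return lines
```

Both greet(True) and greet(False) produce the same two lines in the same order, because each thread checks the shared flag before deciding whether it needs to block at all.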

Semaphore

  • A lock that limits how many threads may enter a section of code at the same time
  • Typical example: file access, where a file is usually read by multiple threads but written by only one thread
  • threading.Semaphore(n): allows up to n threads to execute simultaneously; any threads beyond n must wait until an earlier thread releases the semaphore before they can continue.
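
The limiting behaviour can be observed with a short sketch that counts how many threads are inside the guarded section at once. The function name max_concurrency, the sleep used as stand-in work, and the counter bookkeeping are hypothetical and only serve the illustration.

```python
import threading
import time


def max_concurrency(n_threads=6, limit=2):
    """Return the highest number of threads seen inside the section at once."""
    sem = threading.Semaphore(limit)
    counter_lock = threading.Lock()  # protects the two counters below
    active = 0
    peak = 0

    def worker():
        nonlocal active, peak
        with sem:  # acquire(); blocks while `limit` threads are already inside
            with counter_lock:
                active += 1
                peak = max(peak, active)
            time.sleep(0.05)  # stand-in for real work
            with counter_lock:
                active -= 1
        # leaving the with block calls release(), letting a waiting thread in

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak
```

With six threads and a limit of two, the returned peak never exceeds two, even though all six threads are started at once.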