This is the 24th day of my participation in Gwen Challenge

Background

The Thread-per-Message Pattern is a pattern in which a dedicated thread is assigned to perform the work for each command or request. It separates the delegating side of a message from the executing side into two different threads. The pattern consists of three participants:

  • The Request participant (client), the sender of the message or the issuer of the command
  • The Host participant, which accepts message requests and is responsible for assigning a worker thread to each message
  • The Worker participant, the thread started by the Host participant that performs the Request participant’s task

With an ordinary method call, the caller must wait for the method to complete before moving on to the next operation. With threads, the caller does not have to wait for the task to complete and can return immediately to continue with the next operation.

In the Thread-per-Message Pattern, a new thread is created and started for each request, and starting a thread is expensive. Worker threads were therefore introduced to reuse threads that have already been started.
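As a minimal sketch of Thread-per-Message (all names here are illustrative, not from the article's code), the host spawns a fresh worker thread for each request and returns to the caller immediately:

```java
import java.util.concurrent.CountDownLatch;

public class ThreadPerMessageHost {
    // The Host: starts a new Worker thread for each request and returns at once
    public static Thread request(Runnable task) {
        Thread worker = new Thread(task); // one new Worker per message
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            final int id = i;
            // The Client: delegates the work and continues without waiting
            request(() -> {
                System.out.println("handled request " + id);
                done.countDown();
            });
        }
        done.await(); // demo only: keep the JVM alive until the workers finish
    }
}
```

Each call to `request` pays the full cost of thread creation, which is exactly the overhead the thread pool pattern below avoids.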

Thread pool pattern definition

A set of reusable worker threads is commonly referred to as a thread pool. In this pattern, a certain number of worker threads are started in advance. When there is no work, all worker threads wait for new requests to arrive; as soon as a job arrives, a thread from the thread pool is woken up to execute the task, and after finishing it returns to the pool to wait for the next request.

  • Task pool: stores the collection of accepted requests and buffers them. Its size can be set to limit the maximum number of requests that can be accepted at the same time. The task pool is accessed primarily by the thread pool.
  • Thread pool: the collection of worker threads, whose size determines the degree of concurrent processing. A certain number of threads can be created in advance, and the number can be dynamically increased or decreased as needed. Bigger is not always better: thread switching itself costs time.
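These two pools map directly onto `java.util.concurrent.ThreadPoolExecutor`, where the work queue plays the role of the task pool and the worker set plays the role of the thread pool. A small sketch with a bounded task pool:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // Thread pool of 2 workers; task pool (work queue) bounded at 8 requests
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 2,                        // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,   // keep-alive for idle threads above core
                new ArrayBlockingQueue<>(8)  // bounded task pool
        );

        for (int i = 0; i < 5; i++) {
            final int id = i;
            executor.execute(() -> System.out.println("task " + id
                    + " on " + Thread.currentThread().getName()));
        }

        executor.shutdown(); // stop accepting new tasks, let queued ones finish
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With only two workers, the five tasks share the same two thread names, which shows the reuse the pattern is built around.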

Roles in thread pool mode

The pool is held in a data structure, which can be an array or a collection. A Vector is commonly used, because it is a thread-safe collection class.

Roles in the Worker Thread pattern:

  • The Client participant, which sends a Request
  • The Channel participant, which buffers requests, initializes and starts the worker threads, and allocates work to them
  • The Worker participant, the worker thread that executes the Request
  • The Request participant, the task to be executed

When a Worker thread itself waits for the task pool to become non-empty, this is called forward waiting. When the Channel thread notifies the Worker thread that the task pool is non-empty, this is called reverse waiting.

Application instance

Synchronized blocks are used for mutual exclusion, and a Vector stores the client requests: requests are appended at the end of the collection and removed from the beginning. The Channel has a method for caching requests and a method for processing them; together they follow the producer-consumer pattern, and forward waiting is used to check that the task pool is non-empty.

This example follows the same pattern a network server uses with ServerSocket to process user requests, and it has good scalability and practicality.
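The ServerSocket use mentioned above can be sketched as follows (port number and handler are illustrative assumptions): the accept loop plays the Channel role, and each accepted socket is handed to the pool as a task.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PooledServer {
    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // worker threads
        try (ServerSocket server = new ServerSocket(8080)) {
            while (true) {
                Socket client = server.accept();    // Channel: receive a request
                pool.execute(() -> handle(client)); // delegate to a Worker
            }
        }
    }

    private static void handle(Socket client) {
        try (Socket s = client) {
            // read the request from s and write the response here
        } catch (IOException ignored) {
        }
    }
}
```

Because the workers are reused, a burst of connections does not pay a thread-creation cost per connection, which is the scalability benefit the article refers to.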

The Channel participant

import java.util.Vector;

public class Channel {
    public static final int THREAD_COUNT = 4;

    public static void main(String[] args) {
        // Vector is a thread-safe collection; it serves as the shared task pool
        Vector<Object> pool = new Vector<>();
        // Allocate a fixed number of worker threads up front
        WorkerThread[] workers = new WorkerThread[THREAD_COUNT];

        // Initialize and start the worker threads
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new WorkerThread(pool);
            workers[i].start();
        }

        // Accept a new task and store it in the Vector
        Object task = new Object(); // a placeholder for the real task entity
        // In a network server, ServerSocket.accept() would supply the task (a Socket)

        // When a request arrives, append it to the pool and notify the workers
        synchronized (pool) {
            pool.add(pool.size(), task);
            pool.notifyAll(); // wake up the threads waiting in pool's wait set
        }
        // Note: adding a task and notifying the threads can also be implemented
        // inside the worker-thread class, as a static synchronized method that
        // shares a static task pool

        // The following step is optional, depending on the situation:
        // interrupt the waiting threads to shut them down
        for (int i = 0; i < workers.length; i++) {
            workers[i].interrupt();
        }
    }
}

Its main responsibilities are as follows:

  • Buffering the client requests (using the producer-consumer pattern)
  • Storing the threads that serve client requests
  • Initializing and starting a number of threads
  • Actively waking up threads in the task pool’s wait set to execute tasks

Two collections are defined: one holds the client requests, using a Vector; the other stores the threads, i.e. the threads of the thread pool. Vector is thread-safe. It implements the Collection and List interfaces as a growable array of objects. Like an array, it contains components that can be accessed using an integer index, but the size of a Vector can grow or shrink as needed to accommodate adding or removing items after the Vector is created.
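The append-at-end, remove-from-front usage described above gives FIFO order; a tiny sketch:

```java
import java.util.Vector;

public class VectorQueueDemo {
    public static void main(String[] args) {
        Vector<String> pool = new Vector<>();
        // add requests at the end...
        pool.add("request-1");
        pool.add("request-2");
        // ...and remove them from the front, so the oldest request runs first
        String first = pool.remove(0);
        System.out.println(first);       // request-1
        System.out.println(pool.size()); // 1
    }
}
```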

Collection mainly includes List-related collections, Set-related collections, and Queue-related collections; note that Map is not a subtype of Collection but a sibling hierarchy in java.util.

The worker thread

import java.util.List;

public class WorkerThread extends Thread {
    private final List<Object> pool;         // the shared task pool
    private static int filesCompressed = 0;  // shared by all instances

    public WorkerThread(List<Object> pool) {
        this.pool = pool;
    }

    // static synchronized locks the class object, so only one thread at a time
    // can execute this method
    private static synchronized void incrementFilesCompressed() {
        filesCompressed++;
    }

    public void run() {
        while (true) {
            Object task;
            synchronized (pool) {
                // Guarded Suspension Pattern: keep waiting while the pool is empty
                while (pool.isEmpty()) {
                    try {
                        pool.wait(); // enter pool's wait set and release the lock
                    } catch (InterruptedException e) {
                        return; // an interrupt shuts the worker down
                    }
                }
                task = pool.remove(0); // take a task from the head of the pool
            }
            // the thread's actual work on the task goes here
        }
    }
}

Access to the pool variable is protected by a shared mutex, and the wait is placed inside a loop. When the thread is awakened, it must re-acquire the pool lock and re-check the condition inside the synchronized block: if the pool is still empty it waits again, and if it is not empty it exits the loop and removes a task from the task pool to execute. Tasks are added at the end and removed from the beginning.
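The wait/notify loop above is exactly what `java.util.concurrent.BlockingQueue` implements internally. As a sketch (class name is illustrative), a worker built on `LinkedBlockingQueue` needs no explicit synchronization at all:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueWorker extends Thread {
    private final BlockingQueue<Runnable> pool;

    public BlockingQueueWorker(BlockingQueue<Runnable> pool) {
        this.pool = pool;
    }

    public void run() {
        try {
            while (true) {
                // take() blocks while the queue is empty: the guarded
                // suspension (wait/notify) happens inside the queue
                Runnable task = pool.take();
                task.run();
            }
        } catch (InterruptedException e) {
            // interrupt() ends the worker
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> pool = new LinkedBlockingQueue<>();
        BlockingQueueWorker worker = new BlockingQueueWorker(pool);
        worker.start();
        pool.put(() -> System.out.println("task executed"));
        Thread.sleep(100); // demo only: give the worker time to run
        worker.interrupt();
    }
}
```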

Conclusion

The thread pool pattern is widely used in practical programming, for example in database connection pools and long-lived socket connections. A fixed number of worker threads execute continuously submitted tasks, conserving threads, which are a limited and expensive resource. This pattern has the following benefits:

  • Amortizes the overhead of thread creation and improves responsiveness
  • Encapsulates worker-thread lifecycle management
  • Reduces the overhead of thread destruction

But to make good use of the thread pool pattern, consider the following:

  • Choice of work queue: there are usually three kinds. An unbounded queue places no limit on the number of tasks waiting in the thread pool, so the number of tasks the work queue can actually hold is bounded only by the resources the tasks themselves consume. A bounded queue limits the number of tasks waiting in the thread pool and thus limits resource consumption to a certain extent. A direct handoff queue (SynchronousQueue) has no internal buffer space, so there is no waiting queue: a submitted task is offered to a worker in a non-blocking way, and if the handoff fails a new thread is created to process the task.
  • Thread pool sizing: too large wastes resources, while too small fails to make full use of them, so the size of the thread pool depends on the nature of the tasks it processes, on system resources, and on the scarce resources the tasks use.
  • Thread pool monitoring: the size of the thread pool, the capacity of the work queue, and the idle-time limit of threads all need to be observable by the program to facilitate tuning and debugging. The ThreadPoolExecutor class provides methods for monitoring these.
  • Thread leakage: worker threads in a thread pool can terminate unexpectedly, reducing the number of worker threads actually available. This happens when the exception handling in a thread’s run method fails to catch RuntimeException and Error, causing run to return unexpectedly and the corresponding thread to terminate; remember to catch the corresponding exceptions. There is another case to be aware of: if a thread requests an external resource with no time limit on the request, the thread may effectively have leaked while blocked forever.
  • Reliability and thread pool saturation policy: the choice of work queue alone cannot handle spikes in demand, so a saturation policy is required for when the pool and queue are full.
  • Deadlock: threads that wait on resources held by one another can form a deadlock.
  • Idle-thread cleanup: threads that have not executed a task for too long waste system resources and should be reclaimed by appropriate handling code.
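The sizing, saturation, idle-cleanup, and monitoring concerns above all correspond to `ThreadPoolExecutor` constructor parameters and getter methods; a hedged sketch (the specific numbers are illustrative, not recommendations):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TunedPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                           // core pool size
                4,                           // maximum pool size
                30, TimeUnit.SECONDS,        // idle threads above core are reclaimed
                new ArrayBlockingQueue<>(2), // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy()); // saturation policy

        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            });
        }

        // Monitoring hooks provided by ThreadPoolExecutor
        System.out.println("pool size: " + executor.getPoolSize());
        System.out.println("queued:    " + executor.getQueue().size());
        System.out.println("completed: " + executor.getCompletedTaskCount());

        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

CallerRunsPolicy handles saturation by running overflow tasks on the submitting thread, which throttles the producer instead of dropping work; other built-in policies (AbortPolicy, DiscardPolicy, DiscardOldestPolicy) make different trade-offs.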