1. Basic principles of thread pools

2. Thread pool implementation in C language

2.1 Data structures of thread pools

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <unistd.h>

typedef struct {
    void *(*function)(void *);  /* Function pointer, callback function */
    void *arg;                  /* The argument to the above function */
} threadpool_task_t;            /* Subthread task structure */

typedef struct threadpool_s {
    pthread_mutex_t lock;            /* Used to lock the structure */
    pthread_mutex_t thread_counter;  /* A lock that records the number of busy threads
    pthread_cond_t queue_not_full;   /* When the queue is full, the thread that added the task blocks, waiting for the condition variable */
    pthread_cond_t queue_not_empty;  /* Notifies threads waiting for a task */ when the task queue is not empty

    pthread_t *workers_tid;          /* Store the tid of each thread in the thread pool
    pthread_t manager_tid;           /* Storage management thread tid*/
    threadpool_task_t *task_queue;   /* Task queue */

    int min_thread_num;                /* Minimum number of threads in the thread pool */
    int max_thread_num;                /* Maximum number of threads in the thread pool */
    int live_thread_num;               /* Number of threads currently alive */
    int busy_thread_num;               /* Number of busy threads */
    int wait_exit_thr_num;           /* Number of threads to destroy */

    int queue_front;                 /*task_queue */
    int queue_rear;                  /*task_queue */
    int queue_size;                  /*task_queue Number of actual tasks */
    int queue_max_size;              /*task_queue Maximum number of tasks */

    int shutdown;                    /* Flag bit, thread pool use state, true or false*/
} threadpool_t;
Copy the code

2.2 Creating a thread Pool

/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * create a thread pool the function name: * threadpool_create () * and number: * min_thread_num: specifies the minimum number of threads in a thread pool * max_thread_num: specifies the maximum number of threads in a thread pool * queue_max_size: specifies the maximum length of a task queue * * 1) Thread pool basic parameters * 2) worker thread * 3) Management thread * 4) Task queue * 5) mutex, condition variables * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /

threadpool_t *threadpool_create(int min_thread_num, int max_thread_num, int queue_max_size)
{
    int i;
    threadpool_t *pool = NULL;
    
    do {
        pool = (threadpool_t *)malloc(sizeof(threadpool_t));
        if (pool == NULL) {
            printf("malloc threadpool fail\n");
            goto err_1;
        }
        
        pool->min_thread_num  = min_thread_num;
        pool->max_thread_num  = max_thread_num;
        pool->busy_thread_num = 0;
        pool->live_thread_num = min_thread_num;
        pool->queue_size 	  = 0;
        pool->queue_max_size  = queue_max_size;
        pool->queue_front 	  = 0;
        pool->queue_rear 	  = 0;
        pool->shutdown        = 0;
        
        /* Allocate space for worker thread data based on the maximum number of threads, and clear it to zero */
        pool->workers_tid = (pthread_t *)malloc(sizeof(pthread_t) * max_thread_num);
        if (pool->workers_tid == NULL) {
            printf("malloc workers_tid fail\n");
            goto err_2;
        }
        memset(pool->workers_tid, 0.sizeof(pthread_t) * max_thread_num);
        
        /* Queue space */
        pool->task_queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_max_size);
        if (pool->task_queue == NULL) {
            printf("malloc task_queue fail\n");
            goto err_3;
        }
        
        /* Initialize the mutex, condition variable */
        if (pthread_mutex_init(&(pool->lock), NULL) != 0			|| 
            pthread_mutex_init(&(pool->thread_counter), NULL) != 0	|| 
            pthread_cond_init(&(pool->queue_not_empty), NULL) != 0	|| 
            pthread_cond_init(&(pool->queue_not_full), NULL)  != 0) {
                printf("init the lock or cond fail\n");
                goto err_4;
        }
        
        Min_thread_num Number of work threads */
        for (i = 0; i < min_thread_num; i++) {
            pthread_create(&(pool->workers_tid[i]), NULL, workers_thread,(void *)pool); /*pool indicates the current thread pool */
            printf("start thread  0x%x...\n", (unsigned int)pool->workers_tid[i]);
        }
		/* Create manager thread */
        pthread_create(&(pool->manager_tid), NULL, manager_thread, (void *)pool);
        
    } while(0);
     
	return pool;
    //threadpool_free(pool); /* Free poll storage */
err_4:
	/* Need to destroy mutex and condition variables */
	free(pool->task_queue);	
err_3:
	free(pool->workers_tid);
err_2:
	free(pool);
err_1:	
    return NULL;
}
Copy the code

2.3 Managing thread handlers

#define DEFAULT_TIME 			60
#define MIN_WAIT_TASK_NUM  		10
#define DEFAULT_THREAD_VERY  	5

/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * manager thread * function name: * manager_thread () * and number: * dynamically resize the threadpool based on the number of tasks * capacity: * 1) Obtain existing threads in the current thread pool and accumulated tasks in the task queue * 2) Dynamically adjust the number of threads in the thread pool according to demand * Missing points: * use too many mutexes and condition variables, and the efficiency on questionable * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
void *manager_thread(void *threadpool)
{
    threadpool_t *pool = (threadpool_t *)threadpool;
    int i;
    
    while(! pool->shutdown) { sleep(DEFAULT_TIME);/* Timed thread pool management */
        
        pthread_mutex_lock(&(pool->lock));
        int queue_size = pool->queue_size;
        int live_thread_num = pool->live_thread_num;/* Number of threads in the thread pool */
        pthread_mutex_unlock(&(pool->lock));
        
        pthread_mutex_lock(&(pool->thread_counter));
        int busy_thread_num = pool->busy_thread_num;
        pthread_mutex_unlock(&(pool->thread_counter));
        
        /* Create a new thread algorithm where the number of tasks is greater than the minimum number of thread pools, * and the number of surviving threads is less than the maximum number of threads */
        if (queue_size >= MIN_WAIT_TASK_NUM && live_thread_num < pool->max_thread_num){
            pthread_mutex_lock(&(pool->lock));
            int add = 0;
            
            Add DEFAULT_THREAD_VERY threads */ at a time
            for (i = 0; i < pool->max_thread_num && add < DEFAULT_THREAD_VERY
                    && pool->live_thread_num < pool->max_thread_num; i++) {
                if (pool->workers_tid[i] == 0| |! is_thread_alive(pool->workers_tid[i])) { pthread_create(&(pool->workers_tid[i]),NULL, workers_thread,(void *)pool);
                    add++;
                    pool->live_thread_num++;
                }
            }
            
            pthread_mutex_unlock(&(pool->lock));
        }
        
        /* Destroy the redundant idle thread algorithm, busy thread X2 is less than the number of alive threads and * the number of alive threads is greater than the minimum number of threads */
        if (busy_thread_num * 2 < live_thread_num && live_thread_num > pool->min_thread_num) {
            /* Destroy DEFAULT_THREAD_VERY threads at a time */
            pthread_mutex_lock(&(pool->lock));
            pool->wait_exit_thr_num = DEFAULT_THREAD_VERY;
            pthread_mutex_unlock(&(pool->lock));
            
            for (i = 0; i < DEFAULT_THREAD_VERY; i++) {
                /* Notifies idle threads that they will terminate themselves */pthread_cond_signal(&(pool->queue_not_empty)); }}}return NULL;
}
Copy the code

2.4 Worker thread handling functions

/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * work thread processing function function name: * workers_thread () * and number: * threadpool: contains all parameters in the threadpool. * 1) Sleep waiting for assigned task * 2) Whether to terminate the thread * 3) Take the task from the task queue and change the busy state of the thread * 4) Execute the task * 5) restore to idle state * missing point: * use too many mutexes and condition variables, and the efficiency on questionable * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /

void *workers_thread(void *threadpool)
{
    threadpool_t *pool = (threadpool_t *)thr           eadpool;
    threadpool_task_t task;

    while(1) {
        /* Lock must be taken to wait on condition variable */
        /* If a thread has been created, wait for a task to appear in the task queue. Otherwise, block and wait for a task to appear in the task queue
        pthread_mutex_lock(&(pool->lock));
        
        /* queue_size == 0 indicates that there is no task. Wait to block the condition variable. If there is a task, skip the while */
        while((pool->queue_size == 0) && (! pool->shutdown)) {printf("Workers'thread ID 0x%x is waiting\n", (unsigned int)pthread_self());
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));
            
            /* Clear the specified number of idle threads. If the number of threads to be terminated is greater than 0, terminate the thread */
            if (pool->wait_exit_thr_num > 0) { 
                /* If the number of threads in the pool is greater than the minimum, the current thread can be terminated */
                if (pool->live_thread_num > pool->min_thread_num) {
                    printf("Workers'thread ID 0x%x is exiting\n", (unsigned int)pthread_self());
                    pool->live_thread_num--;
					pool->wait_exit_thr_num--;
                    pthread_mutex_unlock(&(pool->lock));
                    pthread_exit(NULL); }}}/* If the thread pool is closed, exit processing */
        if (pool->shutdown == 1) {
			printf("Workers'thread ID 0x%x is exiting\n", (unsigned int)pthread_self());
            pthread_mutex_unlock(&(pool->lock));
            pthread_exit(NULL);
        }
        
        /* Fetching a task from a task queue is a dequeuing operation */
        task.function = pool->task_queue[pool->queue_front].function;
        task.arg = pool->task_queue[pool->queue_front].arg;

        /* Get out of line, simulate ring queue */
        pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
        pool->queue_size--;
        
        /* Notification that new tasks can be added */
        pthread_cond_broadcast(&(pool->queue_not_full));
        
        /* Release the thread pool lock immediately after the task is retrieved */
        pthread_mutex_unlock(&(pool->lock));
        
		/* Set thread busy */
        pthread_mutex_lock(&(pool->thread_counter));       /* Number of busy threads variable lock */
        pool->busy_thread_num++;                           /* Number of busy threads +1*/
        pthread_mutex_unlock(&(pool->thread_counter));
		
		/* Execute the task */
        (*(task.function))(task.arg);                  
        
        /* Switch from busy to idle */
        pthread_mutex_lock(&(pool->thread_counter));
        pool->busy_thread_num--;
        pthread_mutex_unlock(&(pool->thread_counter));
    }
    
    return NULL;
}
Copy the code

2.5 Adding a Task

/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * to add a task queue of the thread pool task * function name: Threadpool_add () * Number of arguments: * pool: pool to be used * function: task execution function * arg: task execution parameters * Functions: * Add a task to the pool task queue * Contents: * 1) Whether the task queue is full * 2) Add a task * 3) Wake up threads sleeping on the task queue * Lack of points: * use too many mutexes and condition variables, and the efficiency on questionable * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
int threadpool_add(threadpool_t *pool, void *function(void *arg), void *arg)
{
    pthread_mutex_lock(&(pool->lock));
    
    /* If true, the queue is full, and wait */
    while((pool->queue_size == pool->queue_max_size) && (! pool->shutdown)) { pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); }if (pool->shutdown) {
        pthread_mutex_unlock(&(pool->lock));
		return 0;
    }
    
    /* Clear the argument arg*/ to the callback function called by the worker thread
    if(pool->task_queue[pool->queue_rear].arg ! =NULL) {
        free(pool->task_queue[pool->queue_rear].arg);
        pool->task_queue[pool->queue_rear].arg = NULL;
    }
    /* Add a task to the task queue */
    pool->task_queue[pool->queue_rear].function = function;
    pool->task_queue[pool->queue_rear].arg = arg;
    pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;
    pool->queue_size++;
    
    /* After the task is added, the queue is not empty, and the thread pool is awakened to wait for the task */
    pthread_cond_signal(&(pool->queue_not_empty));
    pthread_mutex_unlock(&(pool->lock));
    
    return 0;
}
Copy the code

2.6 Thread Pool destruction

/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * destruction of the thread pool * function name: * threadpool_distory () * and number: * threadpool: threadpool to be destroyed * * 1) send tasks, destroying threads recycling thread resources * * 2) deficiency: * no * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
int threadpool_distory(threadpool_t *pool)
{
    int i;
    if (pool == NULL) {
        return - 1;
    }

    pool->shutdown = 1;
    
    /* First destroy the admin thread */
    pthread_join(pool->manager_tid, NULL);
    
    for (i = 0; i < pool->live_thread_num; i++) {/* Notify all idle threads */
        pthread_cond_broadcast(&(pool->queue_not_empty));
    }
    for (i = 0; i < pool->live_thread_num; i++) {/* Reclaim all administrator thread resources */
        pthread_join(pool->workers_tid[i], NULL);
    }
    threadpool_free(pool);
    
    return 0;
}
/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * release thread pool resources function name: * threadpool_free () * and number: * ThreadPool: The pool of threads to be released * * 1) Release the task queue * 2) Destroy the mutex and condition variables * 3) Release the thread pool * missing points: * no * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
int threadpool_free(threadpool_t *pool)
{
    if (pool == NULL) {
        printf("thread pool is already free\n");
        return - 1;
    }
    
    if (pool->task_queue) {
        free(pool->task_queue);
    }
    
    if (pool->workers_tid) {
        free(pool->workers_tid);
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_mutex_lock(&(pool->thread_counter));
        pthread_mutex_destroy(&(pool->thread_counter));
        pthread_cond_destroy(&(pool->queue_not_empty));
        pthread_cond_destroy(&(pool->queue_not_full));
    }
    free(pool);
    pool = NULL;
    
    return 0;
}
Copy the code

2.7 Other Interfaces

/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * judge whether the current thread is * function name: * is_thread_alive() * Number of parameters: * tid: PID of the thread to be queried * To judge * * 1) send zero signal missing points: * no * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /
int is_thread_alive(pthread_t tid)
{
    int kill_rc = pthread_kill(tid, 0); /* Signal 0 to test whether the thread is alive */
    if (kill_rc == ESRCH) {
        return 0;
    }   
    return 1;
}
Copy the code

2.9 test the demo

/ * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * the following for testing the demo * function name: * the process () * and number: * arg: Task parameters * uses: * Task processing function * contents: * lack of points: 1) to perform a task * * no * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * /

/* Thread pool thread, simulate processing business */
void process(void *arg)
{
    printf("thread 0x%x working on task %d\n", (unsigned int)pthread_self(),
        *(int *)arg);
    sleep(1);
    printf("task %d is end\n", * (int *)arg);
}

int main(int argc, char **argv)
{
    int num[20], i;
    /*threadpool_t *threadpool_create(int min_thread_num, int max_thread_num, int queue_max_size)*/
    
    /* Create a thread pool with a minimum of 3 threads, a maximum of 100 threads, and a maximum of 100 queues
    threadpool_t *thp = threadpool_create(3.100.100);
    if (thp == NULL) {
        printf("threadpool_create fail\n");
        return 0;
    }
    printf("pool init\n");
    
    for (i = 0; i < 20; i++) {
        num[i] = i;
        printf("add task %d\n", i);
        /* Add tasks */ to the thread pool
        threadpool_add(thp, (void *)&process, (void *)&num[i]);
    }
    /* Wait for the child thread to complete */
	
	sleep(30);
	
    threadpool_distory(thp);
    
    return 0;
}
Copy the code

Code implementation reference article: