Title: C Thread Pool

Categories:

  • [C++]

Tags:

  • [Programming Language]

Date: 2021/06/28


C Thread Pool

1. Preparation

Thread-related interface functions:

Thread creation

int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);

Parameter description:

1. thread points to the location where the ID of the newly created thread is stored.

2. attr is used to customize thread attributes. For now it can be set to NULL to create a thread with the default attributes.

3. start_routine is a pointer to a function that takes a void * parameter and returns void *. The newly created thread starts running at start_routine. The function receives a single untyped pointer argument, arg. To pass more than one argument to start_routine, put the arguments in a structure and pass the address of the structure as arg.

Return value:

pthread_create returns 0 on success and an error number on failure.
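To illustrate the point about passing several arguments through a structure, here is a minimal sketch. The names add_args, add_routine, and add_in_thread are hypothetical and exist only for this example. It also uses pthread_join, described further down, so that the structure on the caller's stack stays valid while the thread runs.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical argument bundle: two inputs and one output slot. */
struct add_args {
    int a;
    int b;
    int sum;
};

/* Start routine: unpacks the structure and stores the result in it. */
static void *add_routine(void *arg)
{
    struct add_args *args = arg;

    assert(args != NULL);
    args->sum = args->a + args->b;
    return NULL;
}

/* Computes a + b in a new thread; returns -1 if a pthread call fails. */
int add_in_thread(int a, int b)
{
    struct add_args args = { a, b, 0 };
    pthread_t tid;

    if (pthread_create(&tid, NULL, add_routine, &args) != 0)
        return -1;
    /* args lives on this stack frame, so wait before returning. */
    if (pthread_join(tid, NULL) != 0)
        return -1;
    return args.sum;
}
```

Calling add_in_thread(2, 3) from main should yield 5. Build with the -pthread compiler flag.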

Thread exit

void pthread_exit(void *retval);

Parameter description:

retval is an untyped pointer that other threads in the process can retrieve by calling pthread_join.

Thread waiting

int pthread_join(pthread_t thread, void **retval);

Parameter description:

The thread that calls this function blocks until the specified thread terminates, whether by calling pthread_exit, returning from its start routine, or being canceled. If you are not interested in the thread's return value, set retval to NULL. In that case pthread_join still waits for the specified thread to terminate but does not obtain its termination status.
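The following sketch shows how a value handed to pthread_exit can be picked up through pthread_join. The names square_routine and square_in_thread are made up for this example, and the result is heap-allocated on the assumption that it must outlive the exiting thread.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Start routine: allocates its result and hands it back via pthread_exit. */
static void *square_routine(void *arg)
{
    int n;
    int *result;

    assert(arg != NULL);
    n = *(int *)arg;
    result = malloc(sizeof *result);
    if (result != NULL)
        *result = n * n;
    pthread_exit(result);   /* same effect as `return result;` here */
}

/* Computes n * n in another thread; returns -1 if anything fails. */
int square_in_thread(int n)
{
    pthread_t tid;
    void *retval = NULL;
    int value;

    if (pthread_create(&tid, NULL, square_routine, &n) != 0)
        return -1;
    /* pthread_join stores the pointer passed to pthread_exit in retval. */
    if (pthread_join(tid, &retval) != 0 || retval == NULL)
        return -1;
    value = *(int *)retval;
    free(retval);
    return value;
}
```

The joining thread becomes the owner of the returned memory, which is why square_in_thread frees it.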

Thread cancellation

int pthread_cancel(pthread_t thread);

Parameter description:

thread is the ID of the thread to cancel.

Set the cancelability state of the thread

int pthread_setcancelstate(int state, int *oldstate);

PTHREAD_CANCEL_ENABLE: The thread can be canceled. This is the default cancelability state for all new threads, including the initial thread. The thread's cancelability type determines when a cancelable thread responds to a cancellation request.

PTHREAD_CANCEL_DISABLE: The thread cannot be canceled. If a cancellation request arrives, it stays pending until cancelability is enabled again.
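A small sketch of a pending cancellation request may make the two states more concrete. All names here (shielded_worker, cancel_state_demo, progress, and so on) are hypothetical. The worker disables cancellation, waits until the request has been sent, keeps running, and is only canceled once it re-enables cancellation and reaches a cancellation point.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t gate_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t gate_cond = PTHREAD_COND_INITIALIZER;
static int cancel_sent;   /* set once pthread_cancel has been called */
static int progress;      /* how far the worker got */

static void *shielded_worker(void *arg)
{
    int oldstate;

    (void)arg;
    pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &oldstate);
    assert(oldstate == PTHREAD_CANCEL_ENABLE);  /* default for new threads */

    /* Wait until the request has been sent; it stays pending meanwhile. */
    pthread_mutex_lock(&gate_lock);
    while (!cancel_sent)
        pthread_cond_wait(&gate_cond, &gate_lock);
    pthread_mutex_unlock(&gate_lock);
    progress = 1;             /* still running despite the request */

    pthread_setcancelstate(oldstate, NULL);
    pthread_testcancel();     /* the pending request takes effect here */
    progress = 2;             /* not reached */
    return NULL;
}

/* Returns the worker's progress (expected 1), or -1 on failure. */
int cancel_state_demo(void)
{
    pthread_t tid;
    void *retval = NULL;

    cancel_sent = 0;
    progress = 0;
    if (pthread_create(&tid, NULL, shielded_worker, NULL) != 0)
        return -1;
    if (pthread_cancel(tid) != 0)
        return -1;

    pthread_mutex_lock(&gate_lock);
    cancel_sent = 1;
    pthread_cond_signal(&gate_cond);
    pthread_mutex_unlock(&gate_lock);

    if (pthread_join(tid, &retval) != 0 || retval != PTHREAD_CANCELED)
        return -1;
    return progress;
}
```

A thread pool can use the same idea to keep a worker from being canceled in the middle of a task.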

Thread cleanup handlers

void pthread_cleanup_push(void (*rtn)(void *), void *arg);

Parameter description:

rtn: the thread cleanup function.

arg: the argument passed to rtn.
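The typical use of a cleanup handler is to release a mutex when a thread is canceled while holding it. The sketch below assumes hypothetical names (unlock_handler, waiter, cancel_demo). It relies on pthread_cond_wait being a cancellation point and on pthread_cleanup_push being paired with pthread_cleanup_pop in the same block.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t demo_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t demo_cond = PTHREAD_COND_INITIALIZER;

/* Cleanup handler: releases the mutex the canceled thread was holding. */
static void unlock_handler(void *arg)
{
    assert(arg != NULL);
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/* Waits forever on a condition that is never signaled. */
static void *waiter(void *arg)
{
    (void)arg;
    pthread_cleanup_push(unlock_handler, &demo_lock);
    pthread_mutex_lock(&demo_lock);
    for (;;)
        pthread_cond_wait(&demo_cond, &demo_lock);  /* cancellation point */
    pthread_cleanup_pop(0);   /* never reached; pairs with the push above */
    return NULL;
}

/* Returns 1 if the thread was canceled and the mutex was released. */
int cancel_demo(void)
{
    pthread_t tid;
    void *retval = NULL;

    if (pthread_create(&tid, NULL, waiter, NULL) != 0)
        return 0;
    if (pthread_cancel(tid) != 0)
        return 0;
    if (pthread_join(tid, &retval) != 0 || retval != PTHREAD_CANCELED)
        return 0;
    /* If the handler ran, the mutex is free and trylock succeeds. */
    if (pthread_mutex_trylock(&demo_lock) != 0)
        return 0;
    pthread_mutex_unlock(&demo_lock);
    return 1;
}
```

Without the handler, the canceled thread would terminate while still owning demo_lock, and every later attempt to lock it would block.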

Wake all waiting threads

int pthread_cond_broadcast(pthread_cond_t *cond);

Mutex-related interface functions:

Create a mutex

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

Parameter description:

1. Before using a mutex, define it (typically as a global variable), for example: pthread_mutex_t lock;

2. mutex is a pointer to the mutex to initialize.

3. attr specifies the attributes of the new mutex. If attr is NULL, the default attributes are used, which on Linux gives a fast mutex.

Destroy the mutex

int pthread_mutex_destroy(pthread_mutex_t *mutex);

Parameter description:

mutex is the mutex to destroy.

Lock the mutex

int pthread_mutex_lock(pthread_mutex_t *mutex);

Parameter description:

mutex is the mutex to lock.

Unlock the mutex

int pthread_mutex_unlock(pthread_mutex_t *mutex);

Parameter description:

mutex is the mutex to unlock.
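The four mutex calls above can be put together roughly as follows. This is only a sketch; NUM_THREADS, INCREMENTS, and the function names are assumptions chosen for the example. Several threads increment one shared counter, and the mutex makes each increment atomic with respect to the others.

```c
#include <assert.h>
#include <pthread.h>

#define NUM_THREADS 4      /* hypothetical values for the demo */
#define INCREMENTS  10000

static pthread_mutex_t counter_lock;
static long counter;

/* Each thread adds INCREMENTS to the shared counter, one step at a time. */
static void *increment_routine(void *arg)
{
    (void)arg;
    for (int i = 0; i < INCREMENTS; i++) {
        pthread_mutex_lock(&counter_lock);
        counter++;                         /* protected by counter_lock */
        pthread_mutex_unlock(&counter_lock);
    }
    return NULL;
}

/* Returns the final counter value, or -1 on failure. */
long run_counter_demo(void)
{
    pthread_t tids[NUM_THREADS];
    int created = 0;

    counter = 0;
    if (pthread_mutex_init(&counter_lock, NULL) != 0)
        return -1;

    while (created < NUM_THREADS &&
           pthread_create(&tids[created], NULL, increment_routine, NULL) == 0)
        created++;

    for (int i = 0; i < created; i++) {
        int rc = pthread_join(tids[i], NULL);
        assert(rc == 0);   /* joining a valid, joinable thread */
        (void)rc;
    }
    pthread_mutex_destroy(&counter_lock);
    return created == NUM_THREADS ? counter : -1;
}
```

With the lock in place the result is NUM_THREADS * INCREMENTS. Without it, increments from different threads could overwrite each other.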

Condition variable interface functions:

A condition variable is a mechanism for synchronizing threads through shared global variables. It involves two actions: one thread suspends itself while waiting for a condition to become true, and another thread makes the condition true and signals that it holds. To prevent race conditions, a condition variable is always used together with a mutex.

Initialize the condition variable

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr);

Parameter description:

1. cond is a pointer to the pthread_cond_t condition variable to initialize.

2. cond_attr is a pointer to a pthread_condattr_t structure. Pass NULL to use the default attributes.

Destroy condition variable

int pthread_cond_destroy(pthread_cond_t *cond);

Parameter description:

cond is the condition variable to destroy.

Wait on the condition variable

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);

Wake a thread waiting on the condition variable

int pthread_cond_signal(pthread_cond_t *cond);

If several threads are waiting, one of them is woken; the scheduling policy determines which one.
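As a rough illustration of the wait and signal pattern, the sketch below has one thread publish a value and another wait for it. The names (ready, payload, producer, wait_for_payload) are hypothetical. The waiter holds the mutex when it calls pthread_cond_wait, which releases the mutex while sleeping and reacquires it before returning, and the predicate is checked in a loop because wakeups can be spurious.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t ready_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ready_cond = PTHREAD_COND_INITIALIZER;
static int ready;      /* the condition being waited for */
static int payload;    /* data published together with the condition */

/* Makes the condition true and signals the waiting thread. */
static void *producer(void *arg)
{
    assert(arg != NULL);
    pthread_mutex_lock(&ready_lock);
    payload = *(int *)arg;
    ready = 1;
    pthread_mutex_unlock(&ready_lock);
    pthread_cond_signal(&ready_cond);
    return NULL;
}

/* Starts a producer, waits for its value, and returns it (-1 on failure). */
int wait_for_payload(int value)
{
    pthread_t tid;
    int received;

    ready = 0;
    if (pthread_create(&tid, NULL, producer, &value) != 0)
        return -1;

    pthread_mutex_lock(&ready_lock);
    while (!ready)                      /* re-check after every wakeup */
        pthread_cond_wait(&ready_cond, &ready_lock);
    received = payload;
    pthread_mutex_unlock(&ready_lock);

    if (pthread_join(tid, NULL) != 0)
        return -1;
    return received;
}
```

Because the flag is tested under the mutex, the waiter cannot miss the signal even if the producer finishes before the waiter starts waiting.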

2. Create the data structure

Task structure

struct task {
    void *(*task)(void *arg);   /* function that performs the task */
    void *arg;                  /* argument passed to the task function */
    struct task *next;          /* address of the next task */
};

Thread pool structure

typedef struct thread_pool {
    pthread_mutex_t lock;       /* protects the task list */
    pthread_cond_t cond;        /* signaled when a task is added */
    struct task *task_list;     /* head node of the task list */
    pthread_t *tids;            /* pointer to the thread IDs */
    unsigned waiting_tasks;     /* number of tasks waiting in the list */
    unsigned active_threads;    /* number of threads in the pool */
    bool shutdown;              /* true once the pool is being destroyed */
} thread_pool;
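The task list is a singly linked list whose first node is a dummy head that holds no task, which is how the pool functions in the next section treat it. The single-threaded sketch below shows the resulting first-in, first-out behavior. push_task, pop_task, record_digit, and fifo_demo are hypothetical helpers written for this illustration, and locking is left out on purpose.

```c
#include <assert.h>
#include <stdlib.h>

struct task {
    void *(*task)(void *arg);
    void *arg;
    struct task *next;
};

static int trace;   /* records the order in which tasks ran */

/* Example task: appends its digit to trace. */
static void *record_digit(void *arg)
{
    trace = trace * 10 + *(int *)arg;
    return NULL;
}

/* Appends a task at the tail; head is a dummy node that holds no task. */
static int push_task(struct task *head, void *(*fn)(void *), void *arg)
{
    struct task *node = malloc(sizeof *node);
    struct task *tmp = head;

    if (node == NULL)
        return -1;
    node->task = fn;
    node->arg = arg;
    node->next = NULL;
    while (tmp->next != NULL)
        tmp = tmp->next;
    tmp->next = node;
    return 0;
}

/* Detaches and returns the oldest task, or NULL when the list is empty. */
static struct task *pop_task(struct task *head)
{
    struct task *p;

    assert(head != NULL);
    p = head->next;
    if (p != NULL)
        head->next = p->next;
    return p;
}

/* Queues three tasks, runs them in order, and returns the trace. */
int fifo_demo(void)
{
    struct task head = { NULL, NULL, NULL };
    int digits[] = { 1, 2, 3 };
    struct task *p;

    trace = 0;
    for (int i = 0; i < 3; i++)
        if (push_task(&head, record_digit, &digits[i]) != 0)
            break;
    while ((p = pop_task(&head)) != NULL) {
        p->task(p->arg);
        free(p);
    }
    return trace;
}
```

Appending walks the whole list, so adding a task costs time proportional to the number of waiting tasks. A tail pointer would avoid that, at the price of one more field to keep consistent.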

3. Thread pool functions

Initialize the thread pool

/*
 * @description: initialize the thread pool
 * @param {thread_pool *} pool: thread pool to initialize
 * @param {unsigned int} threads_number: number of threads to create
 * @return: true on success, false on failure
 */
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->cond, NULL);

    pool->shutdown = false;
    pool->task_list = malloc(sizeof(struct task));   /* dummy head node */
    pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
    if (pool->task_list == NULL || pool->tids == NULL) {
        perror("allocate memory error");
        return false;
    }
    pool->task_list->next = NULL;
    pool->waiting_tasks = 0;

    /* tids has room for MAX_ACTIVE_THREADS entries only */
    if (threads_number > MAX_ACTIVE_THREADS)
        threads_number = MAX_ACTIVE_THREADS;
    pool->active_threads = threads_number;

    unsigned int i;
    for (i = 0; i < pool->active_threads; i++) {
        if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0) {
            perror("create threads error");
            return false;
        }
    }
    return true;
}

Add a task to the thread pool

/*
 * @description: add a task to the thread pool
 * @param {thread_pool *} pool: thread pool
 * @param {void *(*)(void *)} task: function that performs the task
 * @param {void *} arg: argument passed to the task function
 * @return: true on success, false on failure
 */
bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg)
{
    struct task *new_task = malloc(sizeof(struct task));
    if (new_task == NULL) {
        perror("allocate memory error");
        return false;
    }
    new_task->task = task;
    new_task->arg = arg;
    new_task->next = NULL;

    pthread_mutex_lock(&pool->lock);
    if (pool->waiting_tasks >= MAX_WAITING_TASKS) {
        pthread_mutex_unlock(&pool->lock);
        fprintf(stderr, "too many tasks.\n");
        free(new_task);
        return false;
    }

    /* append the new task to the end of the list */
    struct task *tmp = pool->task_list;
    while (tmp->next != NULL)
        tmp = tmp->next;
    tmp->next = new_task;
    pool->waiting_tasks++;
    pthread_mutex_unlock(&pool->lock);

    /* wake one waiting thread */
    pthread_cond_signal(&pool->cond);
    return true;
}

Add a thread to the thread pool

/*
 * @description: add threads to the thread pool
 * @param {thread_pool *} pool: thread pool
 * @param {unsigned int} additional_threads: number of threads to add
 * @return: number of threads actually added, or -1 if none could be created
 */
int add_thread(thread_pool *pool, unsigned int additional_threads)
{
    if (additional_threads == 0)
        return 0;

    unsigned int total_threads = pool->active_threads + additional_threads;
    unsigned int i;
    int actual_increment = 0;

    for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++) {
        if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0) {
            perror("add threads error");
            if (actual_increment == 0)
                return -1;
            break;
        }
        actual_increment++;
    }
    pool->active_threads += actual_increment;
    return actual_increment;
}

The callback handler for the thread

/*
 * @description: cleanup handler that releases the lock if the thread is canceled
 * @param {void *} arg: pointer to the mutex to unlock
 * @return: none
 */
void handler(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/*
 * @description: routine executed by every thread in the pool
 * @param {void *} arg: pointer to the thread pool
 * @return: none
 */
void *routine(void *arg)
{
    thread_pool *pool = (thread_pool *)arg;
    struct task *p;

    while (1) {
        /* release the lock if the thread is canceled while waiting */
        pthread_cleanup_push(handler, (void *)&pool->lock);
        pthread_mutex_lock(&pool->lock);

        /* sleep while there is no task and the pool is not shutting down */
        while (pool->waiting_tasks == 0 && !pool->shutdown) {
            pthread_cond_wait(&pool->cond, &pool->lock);
        }

        /* no task left and the pool is shutting down: exit */
        if (pool->waiting_tasks == 0 && pool->shutdown == true) {
            /* pthread_exit runs the pushed handler, which unlocks the mutex,
               so the mutex must not be unlocked a second time here */
            pthread_exit(NULL);
        }

        /* take the first task off the list */
        p = pool->task_list->next;
        pool->task_list->next = p->next;
        pool->waiting_tasks--;

        pthread_mutex_unlock(&pool->lock);
        pthread_cleanup_pop(0);

        /* the thread must not be canceled while it runs a task */
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
        (p->task)(p->arg);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

        free(p);
    }

    pthread_exit(NULL);
}

Remove threads from the thread pool

/*
 * @description: remove threads from the thread pool
 * @param {thread_pool *} pool: thread pool
 * @param {unsigned int} removing_threads: number of threads to remove
 * @return: number of threads remaining, or -1 on failure
 */
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
    if (removing_threads == 0)
        return pool->active_threads;

    /* at least one thread is always kept in the pool */
    int remain_threads = (int)pool->active_threads - (int)removing_threads;
    remain_threads = remain_threads > 0 ? remain_threads : 1;

    int i;
    for (i = pool->active_threads - 1; i > remain_threads - 1; i--) {
        errno = pthread_cancel(pool->tids[i]);
        if (errno != 0)
            break;
    }

    if (i == (int)pool->active_threads - 1)
        return -1;
    else {
        pool->active_threads = i + 1;
        return i + 1;
    }
}

Destroy thread pool

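/*
 * @description: destroy the thread pool
 * @param {thread_pool *} pool: thread pool
 * @return: true
 */
bool destroy_pool(thread_pool *pool)
{
    /* set the flag under the lock so that no waiting thread misses the wakeup */
    pthread_mutex_lock(&pool->lock);
    pool->shutdown = true;
    pthread_mutex_unlock(&pool->lock);
    pthread_cond_broadcast(&pool->cond);

    /* wait for every thread to finish */
    unsigned int i;
    for (i = 0; i < pool->active_threads; i++) {
        errno = pthread_join(pool->tids[i], NULL);
        if (errno != 0) {
            printf("join tids[%u] error: %s\n", i, strerror(errno));
        } else
            printf("[%u] is joined\n", (unsigned)pool->tids[i]);
    }

    free(pool->task_list);
    free(pool->tids);
    free(pool);   /* assumes the pool itself was allocated with malloc */
    return true;
}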
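To see the whole lifecycle in one place (start threads, queue work, shut down, join), here is a condensed sketch. It is not the article's pool: it leaves out cancellation, resizing, and the task limit, and every name in it (job, worker, mini_pool_demo, WORKERS, JOBS) is an assumption made for the example.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

#define WORKERS 3   /* hypothetical sizes for the demo */
#define JOBS    20

struct job {
    int value;
    struct job *next;
};

static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t pool_cond = PTHREAD_COND_INITIALIZER;
static struct job *jobs;       /* pending jobs (LIFO; order is irrelevant here) */
static bool shutting_down;
static long total;             /* sum of all processed values */

static void *worker(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&pool_lock);
        while (jobs == NULL && !shutting_down)
            pthread_cond_wait(&pool_cond, &pool_lock);
        if (jobs == NULL) {            /* shutting down and nothing left */
            pthread_mutex_unlock(&pool_lock);
            return NULL;
        }
        struct job *j = jobs;          /* take one job off the list */
        jobs = j->next;
        total += j->value;             /* the "work", kept under the lock for brevity */
        pthread_mutex_unlock(&pool_lock);
        free(j);
    }
}

/* Runs JOBS jobs on WORKERS threads; returns their sum, or -1 on failure. */
long mini_pool_demo(void)
{
    pthread_t tids[WORKERS];
    int created = 0;
    bool ok;

    jobs = NULL;
    shutting_down = false;
    total = 0;

    while (created < WORKERS &&
           pthread_create(&tids[created], NULL, worker, NULL) == 0)
        created++;
    ok = (created == WORKERS);

    for (int v = 1; ok && v <= JOBS; v++) {
        struct job *j = malloc(sizeof *j);
        if (j == NULL) {
            ok = false;
            break;
        }
        j->value = v;
        pthread_mutex_lock(&pool_lock);
        j->next = jobs;
        jobs = j;
        pthread_mutex_unlock(&pool_lock);
        pthread_cond_signal(&pool_cond);   /* wake one worker */
    }

    /* Request shutdown under the lock, then wake every worker. */
    pthread_mutex_lock(&pool_lock);
    shutting_down = true;
    pthread_mutex_unlock(&pool_lock);
    pthread_cond_broadcast(&pool_cond);

    for (int i = 0; i < created; i++) {
        int rc = pthread_join(tids[i], NULL);
        assert(rc == 0);
        (void)rc;
    }
    return ok ? total : -1;
}
```

The workers only exit once the shutdown flag is set and the list is empty, so jobs queued before shutdown are still processed. With JOBS set to 20 the values 1 through 20 are summed.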

Complete code

The complete code is too long to post here, so it is hosted on Baidu Cloud. Reply [thread] to the WeChat official account to get the download link.

Usage: on Linux, enter the folder and run make to build the executable test, then run it.

If you found this article useful, please like, favorite, and share it.

I’m working overtime. See you next time