pthread

View the current system thread library

$ getconf GNU_LIBPTHREAD_VERSION
NPTL 2.17
Copy the code

The Native POSIX Thread Library (NPTL) enables the Linux kernel to run programs written against the POSIX Threads standard very efficiently.

Prior to kernel 2.6, the scheduling entities were processes, and the kernel did not really support threads. Threads were instead emulated with the clone() system call, which creates a copy of the calling process that, unlike fork(), shares the address space of the calling process. It is through this system call that LinuxThreads obtained thread support at the kernel level. (Many earlier threading implementations lived entirely in user space, and the kernel was unaware of threads.) Unfortunately, quite a bit of this approach does not follow the POSIX standard, especially in signal handling, scheduling, inter-process synchronization primitives, and so on.

Obviously, to improve on LinuxThreads, the kernel had to support threads properly and the thread library had to be rewritten. Two competing projects set out to meet this requirement: NGPT (Next Generation POSIX Threads), started by IBM, and NPTL, by Red Hat. IBM abandoned NGPT in mid-2003, around the time Red Hat released the original NPTL.

NPTL was first released in Red Hat Linux 9. It is supported by the 2.6 kernel, has shipped since RHEL 3, and is now fully part of the GNU C Library.

Prepare a file a.txt to be written.

$ echo -e "This is a.txt\n" >> a.txt
$ cat a.txt 
This is a.txt

$ 
Copy the code

A semaphore

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/prctl.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
// Semaphore that protects access to the file
sem_t mysem;

/**
 * Thread entry point: appends one line to the file named by arg.
 *
 * The file is protected by the global semaphore mysem, which is held from
 * before the file is opened until after it is closed.
 *
 * @param arg  NUL-terminated path of the file to append to (const char *).
 * @return     Never returns normally; the thread exits through pthread_exit()
 *             with a string literal ("run" on success, "open failed" if the
 *             file could not be opened) that pthread_join() hands back.
 */
void *writeFile(void *arg)
{
    printf("another thread... \n");
    const char *filename = static_cast<const char *>(arg);
    // Set the thread name
    prctl(PR_SET_NAME,"ANOTHER_THREAD");
    // Wait for mysem to become non-zero, then atomically decrement it.
    // sem_wait() can be interrupted by a signal handler (EINTR); retry so the
    // file is never touched without holding the semaphore.
    while (sem_wait(&mysem) == -1 && errno == EINTR)
    {
    }
    int fd = open(filename,O_RDWR|O_APPEND);
    if (fd == -1)
    {
        perror("open");
        // Release the semaphore so the waiter in main is not blocked forever.
        sem_post(&mysem);
        pthread_exit(const_cast<char *>("open failed"));
    }
    // Sleep for 3s to simulate other, unrelated work.
    // This illustrates lock granularity: fd is not used during these three
    // seconds, so the semaphore is being held earlier than necessary and
    // keeps the loop in main from getting at the file in the meantime.
    sleep(3);
    // Write a sentence
    const char *buf="thread1 add something... \n";
    if (write(fd,buf,strlen(buf)) == -1)
    {
        perror("write");
    }
    printf("fd %d is going to close.\n",fd);
    close(fd);
    // Increment the semaphore: a.txt is available to other writers again
    sem_post(&mysem);
    // Exit the thread; the argument is what pthread_join receives
    pthread_exit(const_cast<char *>("run"));
}

int main(a)
{
    pthread_t another_thread;
    char *fname = "./a.txt";
	// Initialize the semaphore, set to 1 to indicate that the file is available and sem_wait does not block
    sem_init(&mysem,0.1);
    // Create thread, parameter none, thread entry function is writeFile, parameter is filename
    pthread_create(&another_thread,NULL,writeFile,(void *)fname);

    int fd = - 1;
    int i = 0;
    int ret;
    while(true)
    {   
        sleep(1);
        //sem_wait(&mysem); 
        // non-blocking sem_wait, so I can print a sentence to prompt the user
        // Instead of just unresponsive blocking
        ret = sem_trywait(&mysem);
        // Wait is waiting for the semaphore not to be 0 if it is
        // Returns -1 with the error variable errno set to EAGAIN
        if(ret==- 1 && errno==EAGAIN)
        {
            printf("main func wait %d sec... \n",++i);
            continue;
        }
        // The file is available
        fd = open(fname,O_RDWR|O_APPEND);
        char *buf="man func add something... \n";
        write(fd,buf,strlen(buf));
        close(fd);
        break;
    }
    puts("main func while loop exit... \n");
    char *msg;
    // Wait for the thread to return the thread argument
    pthread_join(another_thread,(void **)&msg);
    printf("got thread1: %s\n",msg);
}
Copy the code

The print result is as follows:

$ g++ sem_file.cc -pthread
$ ./a.out 
another thread...
main func wait 1 sec...
main func wait 2 sec...
main func wait 3 sec...
fd 3 is going to close.
main func while loop exit...

got thread1: run
$ cat a.txt
This is a.txt

thread1 add something...
man func add something...
Copy the code

Check the thread name (the last line), and sure enough, it is already the thread name we set.

[hqinglau@centos ~]$ ps -Taux | grep a.out
hqinglau 18220  0.0  0.0  23036   864 pts/0    Sl+  22:24   0:00 ./a.out
hqinglau 18233  0.0  0.0 112816   992 pts/1    S+   22:24   0:00 grep --color=auto a.out
[hqinglau@centos ~]$ cat /proc/18220/task/18220/stat
18220 (a.out) S ...
[hqinglau@centos ~]$ cat /proc/18220/task/18221/stat
18221 (ANOTHER_THREAD) S ...
Copy the code

Mutexes and condition variables are used in much the same way, so let's move on and focus on thread pools.

The thread pool

A basic idea is to maintain a task queue and assign tasks to a thread, either through polling or lock contention.

Here is an example of lock contention.

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/prctl.h>
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <string>
#include <iostream>
#include <queue>
using namespace std;


/**
 * Fixed-size pool of POSIX threads that consume tasks from a shared FIFO queue.
 *
 * Workers compete for a single mutex; idle workers sleep on a condition
 * variable instead of spinning. "Processing" a task means converting it to
 * std::string and appending the text to a hard-coded demo file.
 *
 * @tparam T  Task type; must be copyable and explicitly convertible to
 *            std::string.
 */
template <typename T>
class ThreadPool
{
public:
    /**
     * Starts n_threads worker threads.
     *
     * @param n_threads  Number of workers; must be > 0 (asserted).
     * @throws std::exception if the mutex, the condition variable or any
     *         thread cannot be created. Workers already started are stopped
     *         and joined before the exception propagates.
     */
    ThreadPool(int n_threads):threadnum(n_threads),threads(NULL),stop(false)
    {
        assert(n_threads>0);
        threads = new pthread_t[n_threads];
        if (pthread_mutex_init(&mutex,NULL) != 0)
        {
            delete[] threads;
            throw std::exception();
        }
        if (pthread_cond_init(&taskReady,NULL) != 0)
        {
            pthread_mutex_destroy(&mutex);
            delete[] threads;
            throw std::exception();
        }
        for (int i=0; i<n_threads; i++)
        {
            if (pthread_create(threads+i,NULL,worker,this) != 0)
            {
                // Only the first i threads exist; stop and join exactly those.
                shutdown(i);
                throw std::exception();
            }
        }
    }

    /**
     * Stops the workers and waits for them to finish.
     *
     * Each worker finishes the task it is currently processing; tasks still
     * waiting in the queue are discarded.
     */
    ~ThreadPool()
    {
        shutdown(threadnum);
    }

    /**
     * Appends a task to the queue and wakes one idle worker.
     *
     * @param d  The task to enqueue (copied into the queue).
     * @return   true if the task was queued, false if the pool is already
     *           shutting down.
     */
    bool addTask(T d)
    {
        {
            Guard lock(mutex);
            if (stop)
            {
                return false;
            }
            taskQueue.push(d);
        }
        pthread_cond_signal(&taskReady);
        return true;
    }

private:
    /** Scoped lock: holds the mutex for the lifetime of the object. */
    class Guard
    {
    public:
        explicit Guard(pthread_mutex_t &m):m_(m) { pthread_mutex_lock(&m_); }
        ~Guard() { pthread_mutex_unlock(&m_); }
    private:
        Guard(const Guard &);
        Guard &operator=(const Guard &);
        pthread_mutex_t &m_;
    };

    // The pool owns threads and synchronisation objects: not copyable.
    ThreadPool(const ThreadPool &);
    ThreadPool &operator=(const ThreadPool &);

    /** Thread entry point; arg is the owning ThreadPool. Defined out of class. */
    static void *worker(void *arg);

    /** Worker body: take tasks from the queue until the pool is stopped. */
    void loop()
    {
        for (;;)
        {
            Guard lock(mutex);
            // Sleep until there is work or the pool is shutting down. The
            // predicate is re-checked in a loop because condition variables
            // may wake spuriously.
            while (!stop && taskQueue.empty())
            {
                pthread_cond_wait(&taskReady,&mutex);
            }
            if (stop)
            {
                return;
            }
            // This is usually an interface, i.e. the task would expose a
            // process() function that is called right after the pop.
            // Here the task is simply appended to a file.
            T data = taskQueue.front();
            taskQueue.pop();
            const std::string text = static_cast<std::string>(data);
            // The write happens while the mutex is held so that tasks reach
            // the file in queue order.
            int fd = open("/home/hqinglau/vscodeFile/server/pthread/a.txt",O_RDWR|O_APPEND);
            if (fd == -1)
            {
                perror("open");
                continue;
            }
            if (write(fd,text.c_str(),text.size()) == -1)
            {
                perror("write");
            }
            close(fd);
        }
    }

    /**
     * Signals all workers to stop, joins the first `started` threads and
     * releases every resource owned by the pool.
     */
    void shutdown(int started)
    {
        {
            Guard lock(mutex);
            stop = true;
        }
        pthread_cond_broadcast(&taskReady);
        for (int i=0; i<started; i++)
        {
            pthread_join(threads[i],NULL);
        }
        pthread_cond_destroy(&taskReady);
        pthread_mutex_destroy(&mutex);
        delete[] threads;
        threads = NULL;
    }

private:
    int threadnum;              // number of worker threads
    pthread_t *threads;         // array of threadnum thread handles
    std::queue<T> taskQueue;    // pending tasks, guarded by mutex
    pthread_mutex_t mutex;      // guards taskQueue and stop
    pthread_cond_t taskReady;   // signalled when a task is queued or on stop
    bool stop;                  // set once, on shutdown; guarded by mutex
};

template <typename T>
void * ThreadPool<T>::worker(void *arg)
{
    ThreadPool *p = (ThreadPool *)arg;
    p->loop(a); }int main(a)
{
    file_mutex = PTHREAD_MUTEX_INITIALIZER;
    ThreadPool<string> *pool = new ThreadPool<string>(3);
    char buf[10];
    string data;
    for(int i=0; i<10; i++) {sprintf(buf,"data %d",i);
        data = buf;
        string d=buf;
        pool->addTask(d);
    }
    while(1);
    //delete pool;
    return 0;
}
Copy the code

Running it: each task is written to the file (the file write is not essential; it is only there as an example).

[hqinglau@centos pthread]$ ./a.out 
^C
[hqinglau@centos pthread]$ cat a.txt
data 0data 1data 2data 3data 4data 5data 6data 7data 8data 9
Copy the code