www.cnblogs.com/lzpong/p/63…

Github.com/lzpong/thre…

Ahem. C++11 added the threading library, ending the standard library's long history of not supporting concurrency. However, C++'s multithreading support is still fairly low-level; anything slightly more advanced, such as a thread pool or a semaphore, you have to implement yourself. When asked about thread pools in interviews, the common answer is: "Manage a task queue and a thread queue, then take one task at a time, assign it to a thread to execute, and repeat." That sounds fine, but when it comes time to actually write the program, things go wrong.

Without further ado: implementation first, commentary after. (Don't talk, show me your code!)

Code implementation

```cpp
#pragma once
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

namespace std
{
#define MAX_THREAD_NUM 256

class threadpool
{
    using Task = std::function<void()>;
    std::vector<std::thread> pool;    // thread pool
    std::queue<Task> tasks;           // task queue
    std::mutex m_lock;                // synchronization
    std::condition_variable cv_task;  // condition for blocking idle threads
    std::atomic<bool> stoped;         // has the pool been stopped
    std::atomic<int> idlThrNum;       // number of idle threads

public:
    inline threadpool(unsigned short size = 4) : stoped{ false }
    {
        idlThrNum = size < 1 ? 1 : size;
        for (size = 0; size < idlThrNum; ++size)
        {
            pool.emplace_back(
                [this]
                { // worker thread function
                    while (!this->stoped)
                    {
                        std::function<void()> task;
                        { // acquire a task
                            // unique_lock's advantage over lock_guard:
                            // it can be unlock()ed and lock()ed at any time
                            std::unique_lock<std::mutex> lock{ this->m_lock };
                            this->cv_task.wait(lock,
                                [this] {
                                    return this->stoped.load() || !this->tasks.empty();
                                }
                            ); // wait until there is a task
                            if (this->stoped && this->tasks.empty())
                                return;
                            task = std::move(this->tasks.front()); // take a task
                            this->tasks.pop();
                        }
                        idlThrNum--;
                        task();
                        idlThrNum++;
                    }
                }
            );
        }
    }
    inline ~threadpool()
    {
        stoped.store(true);
        cv_task.notify_all(); // wake up all threads
        for (std::thread& thread : pool) {
            //thread.detach(); // let the thread fend for itself
            if (thread.joinable())
                thread.join(); // wait for the task to finish
        }
    }

public:
    // Commit a task.
    // There are two ways to call a class member:
    // one is to use bind:    .commit(std::bind(&Dog::sayHello, &dog));
    // one is to use mem_fn:  .commit(std::mem_fn(&Dog::sayHello), &dog);
    template<class F, class... Args>
    auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    {
        if (stoped.load()) // stop == true ??
            throw std::runtime_error("commit on ThreadPool is stopped.");

        using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type
        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        ); // wtf !
        std::future<RetType> future = task->get_future();
        { // add the task to the queue
            // lock_guard is a stack wrapper for mutex:
            // lock() on construction, unlock() on destruction
            std::lock_guard<std::mutex> lock{ m_lock };
            tasks.emplace(
                [task]()
                { // push(Task{...})
                    (*task)();
                }
            );
        }
        cv_task.notify_one(); // wake one thread to execute

        return future;
    }

    // number of idle threads
    int idlCount() { return idlThrNum; }
};

}

#endif
```

Not much code — a couple hundred lines implements the whole thread pool. And look at commit: neither the number nor the types of its parameters are fixed — it takes any arguments! This is thanks to variadic templates.

How to use it?

Take a look at the following code:

```cpp
#include "threadpool.h"
#include <iostream>

void fun1(int slp)
{
    printf("  hello, fun1 !  %d\n", std::this_thread::get_id());
    if (slp > 0) {
        printf(" ======= fun1 sleep %d  =========  %d\n", slp, std::this_thread::get_id());
        std::this_thread::sleep_for(std::chrono::milliseconds(slp));
    }
}

struct gfun {
    int operator()(int n) {
        printf("%d hello, gfun !  %d\n", n, std::this_thread::get_id());
        return 42;
    }
};

class A {
public:
    static int Afun(int n = 0) { // the function must be static to use the thread pool directly
        std::cout << n << "  hello, Afun !  " << std::this_thread::get_id() << std::endl;
        return n;
    }

    static std::string Bfun(int n, std::string str, char c) {
        std::cout << n << "  hello, Bfun !  " << str.c_str() << "  " << (int)c << "  " << std::this_thread::get_id() << std::endl;
        return str;
    }
};

int main()
try {
    std::threadpool executor{ 50 };
    A a;
    std::future<void> ff = executor.commit(fun1, 0);
    std::future<int> fg = executor.commit(gfun{}, 0);
    std::future<int> gg = executor.commit(a.Afun, 9999);
    std::future<std::string> gh = executor.commit(A::Bfun, 9998, "mult args", 123);
    std::future<std::string> fh = executor.commit([]()->std::string {
        std::cout << "hello, fh !  " << std::this_thread::get_id() << std::endl;
        return "hello,fh ret !";
    });

    std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::microseconds(900));

    for (int i = 0; i < 50; i++) {
        executor.commit(fun1, i * 100);
    }
    std::cout << " ======= commit all ========= " << std::this_thread::get_id()
              << " idlsize=" << executor.idlCount() << std::endl;

    std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));

    ff.get(); // calling get() waits for the task to finish and fetches its return value
    std::cout << fg.get() << "  " << fh.get().c_str() << "  " << std::this_thread::get_id() << std::endl;

    std::cout << " ======= sleep ========= " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(3));

    std::cout << " ======= fun1,55 ========= " << std::this_thread::get_id() << std::endl;
    executor.commit(fun1, 55).get(); // .get() waits for the task to complete

    std::cout << "end... " << std::this_thread::get_id() << std::endl;


    std::threadpool pool(4);
    std::vector<std::future<int>> results;

    for (int i = 0; i < 8; ++i) {
        results.emplace_back(
            pool.commit([i] {
                std::cout << "hello " << i << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
                std::cout << "world " << i << std::endl;
                return i * i;
            })
        );
    }
    std::cout << " ======= commit all2 ========= " << std::this_thread::get_id() << std::endl;

    for (auto&& result : results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;
    return 0;
}
catch (std::exception& e) {
    std::cout << "some unhappy happened...  " << std::this_thread::get_id() << e.what() << std::endl;
}
```

 

To head off any suspicion, a copyright note first: the code is "mine", but the idea comes from the internet, in particular this thread pool implementation (I basically copied that implementation along with its author's explanation — good things are worth copying! — and then simplified it).

How it works

Picking up where we left off: "Manage a task queue and a thread queue, then take one task at a time, assign it to a thread to execute, and repeat." What's wrong with this idea? A thread pool keeps multiple threads alive across tasks, but taking a task, assigning it to a thread, and later reassigning that same thread to a different task is not supported at the language level: in common languages (certainly in C++), a thread executes one fixed function, and the thread ends when that function returns. So how do you match tasks to threads?

Have each thread run a scheduling function: loop forever, fetching a task and executing it.

Isn't that a neat idea? Each thread's function stays fixed and unique, yet many tasks get executed across multiple threads.

Even once you understand the idea, the code still needs some detailed explanation.

  1. One thread pool plus one task queue; no surprises there;
  2. The task queue is a classic producer-consumer model, which needs at least two synchronization tools: a mutex plus a condition variable, or a mutex plus a semaphore. The mutex is essentially a lock that makes adding (producing) and removing (consuming) tasks mutually exclusive; the condition variable synchronizes task retrieval: when the queue is empty, a thread should wait (block);
  3. `atomic<bool>` and `atomic<int>` are atomic types: their load()/store() operations are inherently atomic, so no extra mutex is needed around them.

C++ language details

Understanding the principle doesn't mean you can write the program. The code above uses quite a few C++11 features, briefly described below.

  1. `using Task = function<void()>` is a type alias that simplifies the use of typedef. `function<void()>` can be thought of as a function type that accepts any function, function object, or lambda whose signature is `void()`, meaning no arguments and no return value.
  2. `pool.emplace_back([this]{...})` and `pool.push_back([this]{...})` behave the same; only the former performs better;
  3. `pool.emplace_back([this]{...})` constructs a thread object whose execution function is the lambda (anonymous function);
  4. All objects are initialized with {} rather than (), for a consistent style that is less error-prone;
  5. Lambdas appear throughout: `[this]{...}` captures `this` so the body can use the enclosing object's members; `cv_task.wait(lock, [this]{...})` passes a lambda as the predicate that decides whether the thread should stay blocked;
  6. `decltype(expr)` infers the type of expr; like auto, it is a type placeholder, occupying the position where a type would go. The trailing-return form `auto f(A a, B b) -> decltype(a + b)` is required; you cannot write `decltype(a + b) f(A a, B b)`, because at that point a and b have not been declared yet. That's how C++ rules it!
  7. The commit method looks a bit magical! It can take any number of arguments: the first is the callable f, and the rest are the arguments to f. Variadic templates are one of the highlights of C++11. As for why it is `Args&&... args` here and `args...` there: because that's how the rules work!
  8. commit calls a plain function directly, but there are two ways to commit a class member function. One is to use bind: `.commit(std::bind(&Dog::sayHello, &dog));` the other is to use mem_fn: `.commit(std::mem_fn(&Dog::sayHello), &dog);`
  9. make_shared constructs a shared_ptr smart pointer. After `shared_ptr<int> p = make_shared<int>(4)`, `*p == 4`. The nice thing about smart pointers is that they delete automatically!

  10. The bind function takes f and some arguments and returns an anonymous function with those arguments bound, which smacks of currying. For example, binding 4 as one argument of a two-argument add (`std::bind(add, 4, std::placeholders::_1)`) yields an add4-like function.
  11. forward() is similar in purpose to move(), but performs perfect forwarding: it preserves the value category of the argument it forwards (an lvalue stays an lvalue, an rvalue stays an rvalue);
  12. packaged_task wraps the task function; its get_future() returns a future, through which the function's return value can later be retrieved (future.get()). A packaged_task itself can be invoked like a function with ();
  13. queue is the queue class: front() gets the head element, pop() removes it; back() gets the tail element, push() appends to the tail;
  14. mutex is the mutual-exclusion lock: lock() acquires it, unlock() releases it;
  15. `condition_variable cv;` is a condition variable, used together with unique_lock. The advantage of unique_lock over lock_guard is that you can unlock() and lock() at any time. Before cv.wait() you must hold the mutex; wait itself unlocks the mutex, and re-acquires it once the condition is satisfied and the wait returns;
  16. In the thread pool's destructor, join() waits for all queued tasks to finish before returning — safe!

Git

The code is stored in git, and the latest code is available here: github.com/lzpong/thre…
