C++开发学习——线程池
线程池是指一种预先创建一组线程的机制,这些线程在程序启动时就已经创建好,等待执行任务。当有新的任务需要执行时,线程池会从集合中分配一个空闲线程来执行该任务。
使用线程池可以提高性能,控制并发量,简化线程管理,常用于服务器应用,高性能计算与异步任务处理。
参考视频https://www.bilibili.com/video/BV1wmpoemEG5/?spm_id_from=333.337.search-card.all.click
线程池类定义
线程池类的定义:
1class ThreadPool {2public:3 //构造函数4 ThreadPool(int threadnums);5
6 //任务7 template <typename F, typename ...Arg>8 auto enques(F&& f, Arg&&... arg) -> std::future<typename std::result_of<F(Arg...)>::type>;9
10 //析构函数11 ~ThreadPool();12
13private:14 void worker(); //线程的执行内容15 bool isstop; //线程池本身的标识5 collapsed lines
16 std::condition_variable cv; //条件17 std::mutex mtx; //互斥锁18 std::vector<std::thread> workers; //线程集合19 std::queue<std::function<void()>> myque; //任务队列20};当创建线程池的时候会调用构造函数,当删除线程池的时候会调用析构函数,我们先来讲一下线程池创建、线程创建与执行任务的逻辑关系。
主程序开始,在主程序中调用构造函数初始化线程池,在初始化的过程中创建若干线程,线程存储在workers数组中。随后将任务存储在myque任务队列中,当线程检测到任务队列不为空时,会随机挑选一个线程执行任务。主程序结束,调用析构函数删除线程池。
接下来给出每个函数的具体内容
ThreadPool构造函数
1ThreadPool::ThreadPool(int threadnums) :isstop(false) {2 for (size_t i = 0; i < threadnums; i++) {3 workers.emplace_back([this]() {4 this->worker();5 });6 }7}这是类的构造函数,当在主函数创建线程池的实例时,就会调用这个构造函数,可以看到由于workers变量是个由线程组成的可变数组,因此调用workers.emplace_back()时会在workers的末尾构造一个元素。由于元素的类型是std::thread,因此等同于调用了std::thread的构造函数。因此最终就会创建threadnums个线程,每个线程都在执行[this]() {this->worker();}这个Lambda表达式,即每个线程都在执行类中的worker函数。
worker函数
1void ThreadPool::worker() {2 while (true) {3 std::function<void()> task;4
5 {6 std::unique_lock<std::mutex> lock(mtx);7 cv.wait(lock, [this] {8 return this->isstop || !this->myque.empty();9 });10 if (isstop && myque.empty()) return;11 task = std::move(this->myque.front());12 this->myque.pop();13 }14
15 task();2 collapsed lines
16 }17}这是每个线程执行的worker函数,也就是线程执行的逻辑。首先给多线程加上锁,防止多个线程同时访问共享资源,随后线程执行到cv.wait()进行等待,当被唤醒时从任务队列中获取任务,随后调用task()函数。
std::unique_lock<std::mutex> lock(mtx)是指给mtx加锁,但是只有一个线程能成功加锁,这个线程会往下运行程序,其他线程无法成功加锁,会卡在这个位置,直到锁被释放才会重新“抢”锁。成功加锁的进程执行到cv.wait()休眠时会自动释放锁,当线程被唤醒时自动加锁。
enques模板
1template <typename F, typename ...Arg>2auto ThreadPool::enques(F&& f, Arg&&... arg) -> std::future<typename std::result_of<F(Arg...)>::type> {3 using functype = typename std::result_of<F(Arg...)>::type;4
5 auto task = std::make_shared<std::packaged_task<functype()>>(6 std::bind(std::forward<F>(f), std::forward<Arg>(arg)...)7 );8
9 std::future<functype> rsfuture = task->get_future();10
11 {12 std::lock_guard<std::mutex> lockguard(mtx);13 if (isstop)14 throw std::runtime_error("Thread Pool Stop !!!");15 myque.emplace([task]() {8 collapsed lines
16 (*task)();17 });18 }19
20 cv.notify_one();21
22 return rsfuture;23}这是设置任务的模板,通过这个模板可以将任务放入任务队列中等待线程执行。首先我们看定义中对于任务队列的定义std::queue<std::function<void()>> myque,可以看到队列中的元素是std::function<void()>即不返回值不接收参数的函数,因此我们需要将我们的任务函数先封装,我们以int add(int a, int b)这个经典的函数为例。
1using functype = typename std::result_of<F(Arg...)>::type;2auto task = std::make_shared<std::packaged_task<functype()>>(3 std::bind(std::forward<F>(f), std::forward<Arg>(arg)...)4);std::result_of<F(Arg...)>::type可以推导出任务的返回类型,例如传入add,1,2程序会自动判断出需要返回一个int。随后打包任务,将传入的函数与它的参数绑定在一起,比如传入的是add函数的地址,其参数是1和2,这样就将add,1,2打包成一个不需要额外参数且返回为推导类型的函数对象。随后将其通过一个智能指针创建,这样就可以通过这个指针访问到封装的函数对象。
随后加上锁,将任务放入任务队列中,并随机唤醒一个进程执行这个任务,随后将先前创建的rsfuture返回,通过rsfuture.get()就可以拿到任务的返回值。
为什么这里需要加锁?如果不加锁就可能出现主线程与创建的子进程同时访问任务队列的情况,这里加锁就是为了保证此时只有主进程才能操作任务队列。
~ThreadPool析构函数
1ThreadPool::~ThreadPool() {2 {3 std::unique_lock<std::mutex> lock(mtx);4 isstop = true;5 }6
7 cv.notify_all();8
9 for (std::thread& onethread : workers) {10 onethread.join();11 }12}这是类的析构函数,当线程池删除时会调用这个函数。当线程池被删除时,程序会先给多线程加锁,随后将isstop变量赋值为true,通过cv.notify_all()通知全体线程。此时若线程未结束,则会在onethread.join()处阻塞,等待所有线程退出后关闭线程池。
测试案例
1#include "test.h"2#include "ThreadPool.hpp"3
4int main() {5 ThreadPool mypool(4);6 for (int i = 0; i < 20; i++) {7 auto rsfuture0 = mypool.enques([](int a, int b)->int {8 std::cout << "Thread: " << std::this_thread::get_id() << std::endl;9 return a + b;10 }, 10 * i, 10 * i);11 std::cout << "Thread rs: " << rsfuture0.get() << std::endl;12 }13 return 0;14}需要用到线程池的时候,将线程池类所在的hpp文件导入即可。
1Thread rs: Thread: 140737322710592203Thread rs: Thread: 1407373394960004205Thread rs: Thread: 1407373311032966407Thread rs: Thread: 1407373478887048609Thread rs: Thread: 140737322710592108011Thread rs: Thread: 1407373394960001210013Thread rs: Thread: 1407373311032961412015Thread rs: Thread: 14073734788870425 collapsed lines
1614017Thread rs: Thread: 1407373227105921816019Thread rs: Thread: 1407373394960002018021Thread rs: Thread: 1407373311032962220023Thread rs: Thread: 1407373478887042422025Thread rs: Thread: 1407373227105922624027Thread rs: Thread: 1407373394960002826029Thread rs: Thread: 1407373311032963028031Thread rs: Thread: 1407373478887043230033Thread rs: Thread: 1407373227105923432035Thread rs: Thread: 1407373394960003634037Thread rs: Thread: 1407373311032963836039Thread rs: Thread: 14073734788870440380可以看到只有四个线程号反复出现140737322710592 140737339496000 140737331103296 140737347888704,这也与创建进程池的时候创建的4个线程所对应。