线程池:固定式线程池FixedThreadPool

张开发
2026/4/10 2:17:23 15 分钟阅读

分享文章

线程池:固定式线程池FixedThreadPool
一、固定式线程池的概念固定式线程池是指在创建时就确定好线程数量的线程池实现。池内维护一组预先创建好的工作线程所有提交的任务不会立刻执行而是放入一个任务队列中由这些固定数量的线程依次取出并执行。特点线程数量固定任务队列通常是有界的防止内存无线膨胀线程复用任务执行完后线程不会销毁而是继续等待下一个任务生产者-消费者模式调用方生产者提交任务工作线程消费者执行任务与“动态线程池”CachedThreadPool不同固定式线程池不会根据任务量动态增减线程更适合 CPU 密集型场景或需要严格控制并发度的系统。二、为什么会设计出固定式线程池直接在每次任务到来时 new std::thread 的做法有严重缺陷创建/销毁线程开销大每次创建线程都需要内核分配栈空间、上下文切换销毁同样耗时。在高并发场景下频繁创建线程会导致“线程爆炸”系统瞬间生成成百上千个线程池CPU、内存被耗尽。资源无法控制操作系统对线程总数有限制Linux默认几千个超出后程序直接崩溃。性能不可预测线程越多上下文切换越频繁整体吞吐量反而下降。固定式线程池的解决方案是提前创建固定数量的线程通常设为CPU核心数彻底消除创建/销毁开销。通过有界任务队列缓冲多余的任务。实现“生产者阻塞”或“队列满等待”从而天然限流。资源可预测、可控程序启动时就能知道最多占用多少线程和内存适合服务器、游戏服务器、后台任务处理等生产环境。三、代码设计同步队列SyncQueue#includelist#includemutex#includecondition_variable#includeiostreamusingnamespacestd;constintMaxTaskCount200;templateclassTclassSyncQueue{private:std::listTm_queue;mutablestd::mutex m_mutex;std::condition_variable m_notEmpty;std::condition_variable m_notFull;intm_maxSize;boolm_needStop;boolIsFull()const{boolfullm_queue.size()m_maxSize;if(full){coutm_queue 已经满了需要等待...endl;}returnfull;}boolIsEmpty()const{boolemptym_queue.empty();if(empty){coutm_queue 已经空了需要等待...endl;}returnempty;}templateclassFvoidAdd(Ftask){std::unique_lockstd::mutexlocker(m_mutex);m_notFull.wait(locker,[this]{returnm_needStop||!IsFull();});if(m_needStop){return;}m_queue.push_back(std::forwardF(task));m_notEmpty.notify_one();}public:SyncQueue(intmaxsize):m_maxSize(maxsize),m_needStop(false){}~SyncQueue(){}voidPut(constTtask){Add(task);}voidPut(Ttask){Add(std::forwardT(task));}voidTake(std::listTlist){std::unique_lockstd::mutexlocker(m_mutex);m_notEmpty.wait(locker,[this]{returnm_needStop||!IsEmpty();});if(m_needStop){return;}liststd::move(m_queue);m_notFull.notify_one();}voidTake(Ttask){std::unique_lockstd::mutexlocker(m_mutex);m_notEmpty.wait(locker,[this]{returnm_needStop||!IsEmpty();});if(m_needStop){return;}taskm_queue.front();m_queue.pop_front();m_notFull.notify_one();}voidStop(){{std::lock_guardstd::mutexlocker(m_mutex);m_needStoptrue;}m_notFull.notify_all();m_notEmpty.notify_all();}boolEmpty()const{std::lock_guardstd::mutexlocker(m_mutex);returnm_queue.empty();}boolFull()const{std::lock_guardstd::mutexlocker(m_mutex);returnm_queue.size()m_maxSize;}size_tSize()const{std::lock_guardstd::mutexlocker(m_mutex);returnm_queue.size();}size_tCount()const{returnm_queue.size();}};同步队列框架讲解SyncQueue 是整个线程池的“心脏”它实现了线程安全的生产者-消费者有界队列。关键成员std::listTm_queue;// 实际存储任务list 便于整体移动mutablestd::mutex m_mutex;// 保护队列和所有状态std::condition_variable m_notEmpty;// 消费者等待“非空”std::condition_variable m_notFull;// 生产者等待“非满”intm_maxSize;// 队列上限boolm_needStop;// 优雅停止标志核心方法1.IsFull() / IsEmpty()在 condition_variable::wait 的 lambda 中被调用。这些函数在持有 mutex 的情况下被调用wait 的 predicate 语义保证所以里面直接访问 m_queue.size() 是安全的。打印调试信息方便读者观察“队列满/空时阻塞”的行为。2,Add模板Put (生产者接口)templateclassFvoidAdd(Ftask){std::unique_lockstd::mutexlocker(m_mutex);m_notFull.wait(locker,[this]{returnm_needStop||!IsFull();});if(m_needStop)return;m_queue.push_back(std::forwardF(task));m_notEmpty.notify_one();// 只唤醒一个消费者效率最高}完美转发支持左值/右值零拷贝。wait predicate 防止虚假唤醒和队列已满。m_needStop 判断让 Stop 后立即返回避免析构时死锁。3.Take消费者接口Take(T task)单任务取出最常用。Take(std::list list)一次性取出所有任务批量消费可扩展。同样使用 m_notEmpty.wait predicate if(m_needStop) return;。取出后 m_notFull.notify_one() 唤醒生产者。4.Stop() —— 优雅停止voidStop(){{std::lock_guardstd::mutexlocker(m_mutex);m_needStoptrue;}m_notFull.notify_all();m_notEmpty.notify_all();}先在锁内设置标志再在锁外 notify_all避免通知丢失。让正在 wait 的生产者和消费者立刻醒来并退出。为什么这样设计双条件变量notEmpty notFull是经典的有界缓冲区实现能同时支持生产者阻塞和消费者阻塞。m_needStop predicate 是线程池优雅关闭的标准技巧避免析构时线程还在 wait 导致程序 hang。使用 std::list 而非 std::queue 是因为 Take(list) 可以 std::move 整个队列性能极高批量消费时优势明显。所有查询方法Empty/Full/Size都加锁。固定式线程池FixedThreadPool#includeSyncQueue.hpp#includefunctionalclassFixedThreadPool{public:usingTaskstd::functionvoid(void);private:std::liststd::shared_ptrstd::threadm_threadgroup;SyncQueueTaskm_queue;std::atomic_bool m_running;std::once_flag m_flag;voidStart(intnumthreads){m_runningtrue;for(inti0;inumthreads;i){m_threadgroup.push_back(std::make_sharedstd::thread(FixedThreadPool::RunInThread,this));//m_threadgroup.push_back(std::shared_ptrstd::thread(new thread(FixedThreadPool::RunInThread,this)))}}voidRunInThread(){while(m_running){Task task;m_queue.Take(task);if(m_runningtask){task();}}}voidStopThreadGroup(){m_queue.Stop();m_runningfalse;for(autothread:m_threadgroup){if(thread){thread-join();}}m_threadgroup.clear();}public:FixedThreadPool(intnumThreadsstd::thread::hardware_concurrency()):m_queue(MaxTaskCount),m_running(false){Start(numThreads);}~FixedThreadPool(){Stop();}voidStop(){std::call_once(m_flag,[this]{StopThreadGroup();});}voidAddTask(Tasktask){m_queue.Put(std::forwardTask(task));}voidAddTask(constTasktask){m_queue.Put(task);}};固定式线程池框架讲解FixedThreadPool 完全建立在 SyncQueue 之上职责清晰分离std::liststd::shared_ptrstd::threadm_threadgroup;// 线程组必须用 shared_ptr因为 thread 不可复制SyncQueueTaskm_queue;// 任务队列MaxTaskCount200std::atomic_bool m_running;// 原子运行标志std::once_flag m_flag;// 防止重复 Stop关键方法Start(int numThreads)创建固定数量线程每个线程跑 RunInThread。RunInThread()工作线程主循环while(m_running){Task task;m_queue.Take(task);// 阻塞等待任务if(m_runningtask)// 双重检查task();}if (m_running task) 是防止 Stop 后还执行任务的防护。StopThreadGroup()m_queue.Stop() → 唤醒所有 Take/Put。m_running false。join() 所有线程。清空线程组。Stop() 使用 std::call_once 保证只停止一次析构和手动 Stop 都安全。AddTask 直接调用 m_queue.Put支持完美转发。为什么要这样设计职责分离SyncQueue 只管“任务存储同步”FixedThreadPool 只管“线程生命周期”代码清晰、可维护。默认线程数 hardware_concurrency()最合理的 CPU 密集型默认值。shared_ptr list 是 C 中存储动态线程组的标准做法thread 不可复制、不可移动到 vector 后又 join。std::once_flag 解决“析构时重复 Stop”导致的二次 join 崩溃问题。生产者AddTask可能阻塞队列满这正是固定线程池的“背压”机制防止任务无限堆积。测试代码#includeFixedThreadPool.hpp#includefutureFixedThreadPoolpool(4);std::mutex g_cout_mutex;voidAdd(inta,intb,std::promiseintc_promise){coutadd begin ...endl;std::this_thread::sleep_for(std::chrono::milliseconds(2000));intcab;c_promise.set_value(c);std::this_thread::sleep_for(std::chrono::milliseconds(1000));coutadd end ...endl;}voidadd_a(){std::promiseintc_promise;std::futureinta_futurec_promise.get_future();std::functionvoid(void)fstd::bind(Add,10,20,std::ref(c_promise));pool.AddTask(f);{std::unique_lockstd::mutexlocker(g_cout_mutex);coutadd_a:a_future.get()endl;}}voidadd_b(){std::promiseintc_promise;std::futureinta_futurec_promise.get_future();std::functionvoid(void)fstd::bind(Add,20,30,std::ref(c_promise));pool.AddTask(f);{std::unique_lockstd::mutexlocker(g_cout_mutex);coutadd_b:a_future.get()endl;}}voidadd_c(){std::promiseintc_promise;std::futureinta_futurec_promise.get_future();std::functionvoid(void)fstd::bind(Add,30,40,std::ref(c_promise));pool.AddTask(f);{std::unique_lockstd::mutexlocker(g_cout_mutex);coutadd_c:a_future.get()endl;}}intmain(){std::threadtha(add_a);std::threadthb(add_b);std::threadthc(add_c);tha.join();thb.join();thc.join();return0;}测试代码的目的验证多生产者 多消费者并发3 个线程tha/thb/thc同时调用 AddTask池内 4 个工作线程同时执行。验证异步执行 结果返回每个任务用 std::promise std::future 把计算结果ab传回主线程。future.get() 会阻塞调用方直到工作线程 set_value。验证任务执行的耗时与交错Add 函数里 sleep(2000ms) sleep(1000ms)让 cout 输出 begin/end 明显交错肉眼可见“任务在后台并行执行”。验证线程池在析构时的正确停止main 结束时 ~FixedThreadPool 调用 Stop()此时所有任务已完成因为 get() 已返回不会丢失任务。验证互斥输出g_cout_mutex 保护打印 add_a:30 等结果避免多线程 cout 乱序。

更多文章