C++ 无锁编程:单停多发送场景高性能方案

张开发
2026/4/2 18:44:41 15 分钟阅读
C++ 无锁编程:单停多发送场景高性能方案
C 无锁编程单停多发送场景高性能方案引言在多线程编程中一个常见的场景是多个线程并发地向某个资源或服务发送数据而在某个时刻需要优雅地停止所有发送操作并等待已经开始的发送完成后再进行资源清理。如果使用互斥锁mutex来同步虽然能保证正确性但会引入锁竞争降低并发性能尤其在发送操作非常频繁时锁的开销会成为瓶颈。本文将介绍一种基于原子变量std::atomic和位域的无锁实现方案它能够在不使用互斥锁的前提下安全地处理“单停、多发送”的场景既保证了高性能又确保了线程安全。问题背景假设我们有一个网络发送器Sender它允许多个线程同时调用send()方法发送数据。当需要停止发送器例如关闭连接或销毁资源时我们必须阻止新的发送操作开始。等待所有已经开始的发送操作完成。最后安全地释放资源。如果直接在发送过程中强行停止可能会导致访问已释放的内存或其它资源引发崩溃。传统做法是引入一个全局的互斥锁每个发送线程先加锁检查停止标志然后发送最后解锁停止线程也需要加锁来修改停止标志并等待条件变量。这种方案虽然正确但锁的争用会严重影响吞吐量。那么有没有一种无锁的方法呢答案是肯定的。思路我们的核心想法是利用一个std::atomicuint32_t变量将其划分为两个区域最高位第31位作为“停止标志”STOP_BIT当该位为1时表示已经请求停止。低31位作为“正在发送的任务计数”count每个发送线程在开始前原子递增计数结束后原子递减计数。关键操作如下发送线程原子递增计数fetch_add。检查递增前的旧值如果停止标志已经设置则原子递减计数回退并放弃发送。执行实际发送逻辑。发送完成后原子递减计数。停止线程原子设置停止标志fetch_or。自旋等待计数变为0期间让出CPUyield。此时所有发送都已结束可以安全清理资源。整个过程仅使用了原子操作和自旋等待完全避免互斥锁。核心代码常量定义#includeatomic#includethread#includecstdintclassSender{std::atomicuint32_tstate_{0};staticconstexpruint32_tSTOP_BIT1u31;// 最高位staticconstexpruint32_tCOUNT_MASKSTOP_BIT-1;// 低31位掩码public:// ... 方法实现};发送尝试booltry_send(){// 1. 增加计数并获取旧值uint32_toldstate_.fetch_add(1,std::memory_order_acq_rel);if(oldSTOP_BIT){// 检查停止标志// 已停止回退计数。使用 relaxed 即可因为无需同步其他数据state_.fetch_sub(1,std::memory_order_relaxed);returnfalse;// 发送被拒绝}// 2. 执行实际发送操作模拟耗时// ... 这里可以安全地访问共享资源std::this_thread::sleep_for(std::chrono::milliseconds(10));// 3. 发送完成减少计数。使用 release 确保之前的写入对停止线程可见state_.fetch_sub(1,std::memory_order_release);returntrue;}停止请求voidstop(){// 1. 设置停止标志若已设置则忽略重复设置uint32_toldstate_.fetch_or(STOP_BIT,std::memory_order_acq_rel);// 无论是否已经设置过停止标志都需要等待计数归零因为可能仍有发送未完成// 若计数已归零循环条件为假直接返回while((state_.load(std::memory_order_acquire)COUNT_MASK)!0){std::this_thread::yield();// 让出CPU避免空转}// 2. 此时所有发送都已结束可以安全清理资源}内存序Memory OrderC 原子操作的内存序是多线程正确性的关键。让我们逐一分析每个原子操作的选择理由。发送线程的fetch_adduint32_toldstate_.fetch_add(1,std::memory_order_acq_rel);为什么用acq_relfetch_add是一个“读-改-写”操作它返回修改前的值。我们既需要读取旧值检查停止标志又需要确保后续的写操作比如实际发送中的写入不会重排到这次加法之前否则可能导致停止线程在计数减之前看不到这些写入。memory_order_acq_rel提供了“获取-释放”语义获取acquire部分保证之后的所有读写操作不会重排到此操作之前。释放release部分保证之前的所有读写操作不会重排到此操作之后。这样当发送线程看到停止标志未设置时可以安全地开始发送并且发送中的写入在fetch_sub之前对所有线程可见。发送线程的fetch_sub回退时state_.fetch_sub(1,std::memory_order_relaxed);为什么用relaxed此时已经确定停止标志已设置我们只是简单地减少计数并且不会有任何后续需要同步的共享数据访问。relaxed保证了原子性而停止线程通过load(acquire)最终会看到这个修改因为原子变量的修改对所有线程是可见的尽管没有同步关系但最终会传播。使用relaxed避免了不必要的内存屏障性能最优。发送线程的fetch_sub正常完成时state_.fetch_sub(1,std::memory_order_release);为什么用release我们需要确保本次发送中对共享资源的所有写入在计数减少之前对其他线程可见。release语义保证了在它之前的所有写入不会被重排到它之后因此停止线程如果通过acquire看到计数减少也就一定能看到这些写入。停止线程的fetch_oruint32_toldstate_.fetch_or(STOP_BIT,std::memory_order_acq_rel);为什么用acq_rel设置停止标志之前需要看到之前发送线程的fetch_sub的release效果尽管此时可能计数还未归零使用acquire部分可以保证。设置停止标志后需要确保后续的load能看到这个标志使用release部分可以保证。停止线程的load自旋等待while((state_.load(std::memory_order_acquire)COUNT_MASK)!0){...}为什么用acquire每次读取计数时我们希望看到发送线程在完成时对计数的fetch_sub的release效果。acquire与release配对能够正确同步。此外acquire还保证了在循环中不会将后续操作重排到加载之前。自旋等待的优化自旋等待spin-wait如果设计不当会导致 CPU 空转浪费资源。我们的代码使用了std::this_thread::yield()它将当前线程的时间片让给其他可运行的线程适合等待时间稍长的场景。如果预期等待时间极短比如几个 CPU 周期可以使用_mm_pause()或std::this_thread::sleep_for的短时间休眠来降低功耗。通常yield已经足够平衡响应性与资源占用。异常安全与 RAII 封装如果在发送过程中抛出异常必须确保计数被正确递减否则计数会永远无法归零导致停止线程死锁。可以使用 RAII 技术封装计数管理classSendGuard{std::atomicuint32_tstate_;boolactive_;public:SendGuard(std::atomicuint32_ts):state_(s),active_(false){uint32_toldstate_.fetch_add(1,std::memory_order_acq_rel);if(!(oldSTOP_BIT)){active_true;// 成功需要负责析构时递减}else{state_.fetch_sub(1,std::memory_order_relaxed);}}~SendGuard(){if(active_){state_.fetch_sub(1,std::memory_order_release);}}explicitoperatorbool()const{returnactive_;}};使用方式booltry_send(){SendGuardguard(state_);if(!guard)returnfalse;// 已停止// 执行发送returntrue;}这样即使发送过程中抛出异常SendGuard的析构函数也会自动减少计数确保状态一致。示例下面是一个完整的可运行示例演示了多个发送线程和一个停止线程的协作#includeatomic#includethread#includeiostream#includevector#includechronoclassSender{std::atomicuint32_tstate_{0};staticconstexpruint32_tSTOP_BIT1u31;staticconstexpruint32_tCOUNT_MASKSTOP_BIT-1;public:booltry_send(){uint32_toldstate_.fetch_add(1,std::memory_order_acq_rel);if(oldSTOP_BIT){state_.fetch_sub(1,std::memory_order_relaxed);returnfalse;}// 模拟发送操作耗时std::this_thread::sleep_for(std::chrono::milliseconds(10));state_.fetch_sub(1,std::memory_order_release);returntrue;}voidstop(){uint32_toldstate_.fetch_or(STOP_BIT,std::memory_order_acq_rel);// 等待所有发送完成while((state_.load(std::memory_order_acquire)COUNT_MASK)!0){std::this_thread::yield();}std::coutAll sending completed, stopped.\n;}};intmain(){Sender sender;std::vectorstd::threadthreads;// 启动 10 个发送线程for(inti0;i10;i){threads.emplace_back([sender]{if(sender.try_send()){std::coutSend succeeded\n;}else{std::coutSend rejected (stopped)\n;}});}// 等待一小段时间让部分发送线程开始std::this_thread::sleep_for(std::chrono::milliseconds(5));// 停止发送器sender.stop();// 等待所有线程结束for(autot:threads){t.join();}return0;}竞态测试为了验证实现的正确性我们需要进行更严格的并发测试。以下测试将启动多个发送线程每个线程尝试发送多次。在某个时间点启动停止线程。使用原子计数器记录成功发送的次数并验证停止后的状态。测试代码#includeatomic#includethread#includeiostream#includevector#includechrono#includecassert#includerandomclassSender{std::atomicuint32_tstate_{0};staticconstexpruint32_tSTOP_BIT1u31;staticconstexpruint32_tCOUNT_MASKSTOP_BIT-1;public:booltry_send(){uint32_toldstate_.fetch_add(1,std::memory_order_acq_rel);if(oldSTOP_BIT){state_.fetch_sub(1,std::memory_order_relaxed);returnfalse;}// 模拟发送操作随机耗时std::this_thread::sleep_for(std::chrono::microseconds(rand()%100));state_.fetch_sub(1,std::memory_order_release);returntrue;}voidstop(){uint32_toldstate_.fetch_or(STOP_BIT,std::memory_order_acq_rel);while((state_.load(std::memory_order_acquire)COUNT_MASK)!0){std::this_thread::yield();}}// 用于测试检查停止标志和计数boolis_stopped()const{return(state_.load(std::memory_order_relaxed)STOP_BIT)!0;}uint32_tactive_count()const{returnstate_.load(std::memory_order_relaxed)COUNT_MASK;}};intmain(){constexprintNUM_SENDERS10;constexprintSENDS_PER_THREAD1000;Sender sender;std::vectorstd::threadthreads;std::atomicintsuccess_count{0};std::atomicintreject_count{0};// 启动发送线程for(inti0;iNUM_SENDERS;i){threads.emplace_back([sender,success_count,reject_count]{for(intj0;jSENDS_PER_THREAD;j){if(sender.try_send()){success_count.fetch_add(1,std::memory_order_relaxed);}else{reject_count.fetch_add(1,std::memory_order_relaxed);}}});}// 让发送线程运行一段时间std::this_thread::sleep_for(std::chrono::milliseconds(100));// 停止sender.stop();// 等待所有发送线程结束for(autot:threads){t.join();}// 验证最终状态std::coutSuccess count: success_count.load()std::endl;std::coutReject count: reject_count.load()std::endl;std::coutActive count after stop: sender.active_count()std::endl;std::coutStop flag: sender.is_stopped()std::endl;// 断言停止后没有活跃计数assert(sender.active_count()0);assert(sender.is_stopped()true);// 可选再尝试发送应该被拒绝assert(sender.try_send()false);return0;}说明10个发送线程每个发送1000次总发送尝试次数为10000。在发送过程中随机延迟模拟真实负载。停止线程在100ms后调用stop()等待所有活跃发送完成。使用success_count和reject_count分别记录成功和失败的发送次数。最终检查计数应为0停止标志为true且再尝试发送会失败。运行多次观察输出应该始终满足断言条件证明实现正确。性能无锁所有操作均为原子指令fetch_add、fetch_sub、fetch_or在多核 CPU 上通常比互斥锁快很多尤其是在高并发时。内存开销仅使用一个 32 位原子变量非常轻量。扩展性发送线程之间完全并行没有共享锁竞争。停止线程只在设置停止标志和自旋等待时短暂活跃。自旋等待使用yield()避免了忙等但停止线程仍会消耗少量 CPU 时间。如果停止操作非常罕见这种开销可以忽略。注意计数溢出低31位的最大值为(131)-1约21亿。如果并发发送数超过这个值计数会溢出到停止位导致逻辑错误。实际应用中并发发送数通常远小于此所以安全。若需要更大计数可改用uint64_t。内存序选择错误的内存序可能导致可见性问题。例如若发送线程的fetch_add使用relaxed停止线程可能看不到发送线程对共享资源的写入造成数据竞争。务必理解每种内存序的含义。自旋等待的公平性如果停止线程长时间等待例如发送操作耗时很长yield()会不断让出 CPU但该线程仍会频繁被调度增加系统开销。此时可考虑使用条件变量与原子变量结合但会引入锁降低无锁性。异常安全务必确保计数在任何退出路径下都被正确递减推荐使用 RAII。多次停止stop()方法的设计允许多次调用每次调用都会等待计数归零确保资源安全。如果希望只有第一次调用有效可以增加额外的状态但通常不需要。标注本文介绍了一种在 C 中实现“单停、多发送”场景的无锁编程技术。通过将原子变量拆分为停止标志和发送计数并合理使用内存序我们成功地在不使用互斥锁的情况下保证了线程安全。该方案性能优异实现简洁适用于网络框架、线程池、消息队列等需要高效停止机制的场景。无锁编程虽然强大但也需要开发者对原子操作和内存模型有深入理解。希望本文能为您的并发编程实践提供有价值的参考。参考C 标准库文档std::atomic《C Concurrency in Action》by Anthony WilliamsIntel 开发者手册内存屏障与原子操作

更多文章