Hyperf方案 协程池管理

张开发
2026/4/4 17:07:34 15 分钟阅读
Hyperf方案 协程池管理
?php/** * 案例标题协程池管理 * 说明使用Hyperf协程池控制并发资源防止协程无限制创建把内存撑爆 * 需要安装的包 * composer require hyperf/coroutine * composer require hyperf/guzzle */declare(strict_types1);// app/Pool/HttpClientPool.php namespaceApp\Pool;useGuzzleHttp\Client;useHyperf\Pool\Pool;useHyperf\Pool\PoolOption;usePsr\Container\ContainerInterface;/** * HTTP客户端连接池复用GuzzleHttp客户端不用每次都创建新的 */classHttpClientPoolextendsPool{publicfunction__construct(ContainerInterface$container,protectedarray$config[]){$this-configarray_merge([min_connections2,// 最少保持2个客户端max_connections20,// 最多20个超过就等connect_timeout10.0,wait_timeout3.0,// 等池子里有空闲的最多等3秒max_idle_time60,],$config);parent::__construct($container);}protectedfunctioncreateConnection():HttpClientConnection{// 创建一个新的GuzzleHttp连接放入池子returnnewHttpClientConnection($this-container,$this-config);}protectedfunctiongetOption():PoolOption{$optionnewPoolOption();$option-setMinConnections($this-config[min_connections]);$option-setMaxConnections($this-config[max_connections]);$option-setConnectTimeout($this-config[connect_timeout]);$option-setWaitTimeout($this-config[wait_timeout]);$option-setMaxIdleTime($this-config[max_idle_time]);return$option;}}// app/Pool/HttpClientConnection.php namespaceApp\Pool;useGuzzleHttp\Client;useHyperf\Pool\Connection;useHyperf\Pool\Pool;usePsr\Container\ContainerInterface;classHttpClientConnectionextendsConnection{privateClient$client;// 实际的HTTP客户端publicfunction__construct(ContainerInterface$container,array$config){parent::__construct($container,$this-createPool($container,$config));// 创建HTTP客户端配置连接超时等参数$this-clientnewClient([timeout$config[request_timeout]??30,connect_timeout$config[connect_timeout]??10,]);$this-lastUseTimemicrotime(true);}publicfunctiongetClient():Client{return$this-client;}publicfunctionreconnect():bool{// HTTP客户端无状态重建就行$this-clientnewClient([timeout30]);returntrue;}publicfunctioncheck():bool{returntrue;// HTTP客户端不需要心跳检查}publicfunctionclose():bool{returntrue;}privatefunctioncreatePool(ContainerInterface$container,array$config):Pool{returnnewHttpClientPool($container,$config);}}// app/Service/ConcurrentHttpService.php namespaceApp\Service;useHyperf\Coroutine\Coroutine;useHyperf\Coroutine\WaitGroup;useHyperf\Engine\Channel;/** * 并发HTTP请求服务用协程池控制并发数量 */classConcurrentHttpService{/** * 并发请求多个URL但限制最大并发数为$concurrency * 比如有100个URLconcurrency10就分批次每次10个并发 */publicfunctionfetchAll(array$urls,int$concurrency10):array{$results[];$channelnewChannel($concurrency);// Channel当信号量用控制并发数$wgnewWaitGroup();// 等所有协程完成foreach($urlsas$index$url){$channel-push(1);// 占一个坑满了就会阻塞等其他协程释放$wg-add();Coroutine::create(function()use($url,$index,$channel,$wg,$results){try{$result$this-fetchOne($url);// 实际发HTTP请求$results[$index]$result;}catch(\Throwable$e){$results[$index][error$e-getMessage()];// 失败也记录不影响其他}finally{$channel-pop();// 释放坑位让下一个协程进来$wg-done();// 通知WaitGroup这个协程完了}});}$wg-wait(30.0);// 最多等30秒全部完成或超时后返回return$results;}privatefunctionfetchOne(string$url):array{// 实际用GuzzleHttp请求这里简化return[url$url,status200,timemicrotime(true)];}/** * 协程池执行任务限制同时运行的任务数 * 适合批量处理数据库操作、文件读写等 */publicfunctionrunWithPool(array$tasks,int$poolSize5):array{$results[];$semnewChannel($poolSize);// 用Channel做信号量$wgnewWaitGroup();foreach($tasksas$key$task){$sem-push(true);// 获取信号量满了等$wg-add();Coroutine::create(function()use($task,$key,$sem,$wg,$results){try{$results[$key]$task();// 执行任务闭包}finally{$sem-pop();// 释放信号量$wg-done();}});}$wg-wait(60.0);return$results;}}// app/Controller/ConcurrentController.php namespaceApp\Controller;useApp\Service\ConcurrentHttpService;useHyperf\HttpServer\Annotation\Controller;useHyperf\HttpServer\Annotation\PostMapping;useHyperf\HttpServer\Contract\RequestInterface;#[Controller(prefix:/concurrent)]classConcurrentController{publicfunction__construct(privateConcurrentHttpService$service){}/** * POST /concurrent/fetch - 并发请求多个URL最多10个同时跑 */#[PostMapping(path:/fetch)]publicfunctionfetch(RequestInterface$request):array{$urls$request-input(urls,[]);if(empty($urls)){return[code400,msg请传入urls数组];}$startmicrotime(true);$results$this-service-fetchAll($urls,concurrency:10);$elapsedround(microtime(true)-$start,3);return[code0,elapsed$elapsed.s,// 总耗时体现并发优势countcount($results),data$results,];}}

更多文章