mqtt消费堆积

张开发
2026/4/9 17:23:15 15 分钟阅读

分享文章

mqtt消费堆积
mqtt发送主题成功但消费主题只有一个代码实例ServiceActivator(inputChannel caGetConfig) Override public void handleMessage(Message? message) throws MessagingException { try { String payload (String) message.getPayload(); String topic (String) message.getHeaders().get(MQTTRECEIVEDTOPIC); log.info(接收到MQTT消息主题: {}, 消息内容: {}, topic, payload); try { if (StringUtils.isEmpty(payload)) { log.info(接收到空的MQTT消息主题: {}, topic); CompletableFuture.runAsync(() - generateVideoService.getNeMessage()); return; } if (StringUtils.isBlank(topic)) { log.info(接收到空的MQTT消息主题: {}, topic); CompletableFuture.runAsync(() - generateVideoService.getNeMessage()); } // 处理MQTT消息 ListString onlineStatusNewLis JSON.parseArray(payload, String.class); if (CollectionUtils.isNotEmpty(onlineStatusNewLis)){ generateVideoService.getNeMessageByUrl(onlineStatusNewLis) } } catch (Exception e) { log.error(分发MQTT消息失败主题: {}, 消息: {}, topic, payload, e); } } catch (Exception e) { log.error(处理MQTT消息失败消息: {}, message, e); } }问题分析1. MQTT 消费者是单线程的ServiceActivator(inputChannel caGetConfig)基于 Spring Integration MQTT其inputChannel默认是PublishSubscribeChannel同步调用即消息的接收和处理在同一线程中串行执行。只有当handleMessage()方法返回后才能处理下一条消息。2. 使用CompletableFuture.runAsync 异步调用generateVideoService.getNeMessageByUrl(onlineStatusNewLis)方法执行完才能返回。如果onlineStatusNewLis列表较大或者数据库查询较慢这段时间内 MQTT 消费线程就会被阻塞。如果该方法里面是一个while(!Thread.currentThread().isInterrupted())的无限循环这个循环会在 MQTT 消费线程上执行永远无法返回后续所有消息都将被彻底阻塞。添加异步编排执行解决阻塞// 处理MQTT消息 ListString onlineStatusNewLis JSON.parseArray(payload, String.class); if (CollectionUtils.isNotEmpty(onlineStatusNewLis)){ CompletableFuture.runAsync(() - generateVideoService.getNeMessageByUrl(onlineStatusNewLis)); }

更多文章