Express.js + SSE实战:从零搭建一个类ChatGPT的流式对话后端

张开发
2026/4/8 22:16:19 15 分钟阅读

分享文章

Express.js + SSE实战:从零搭建一个类ChatGPT的流式对话后端
Express.js SSE实战构建类ChatGPT的流式对话后端1. 为什么选择SSE技术实现流式对话在构建实时文本交互应用时开发者常面临通信协议的选择困境。SSEServer-Sent Events技术凭借其轻量级、易实现和与HTTP协议天然兼容的特点成为实现打字机效果的理想选择。与WebSocket相比SSE具有三个显著优势单向通信更简单服务端到客户端的单向数据流完美匹配内容逐字推送场景HTTP协议原生支持无需额外端口或复杂握手协议自动重连机制内置连接中断恢复功能提升用户体验// 典型SSE响应头设置 res.writeHead(200, { Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive });提示当需要处理用户双向交互时如即时聊天WebSocket仍是更好选择。但对于知识问答、内容生成等场景SSE能提供更简洁的实现方案。2. Express.js中的SSE基础实现2.1 初始化Express应用首先创建基本的Express应用结构npm init -y npm install express cors然后建立核心服务文件// server.js const express require(express); const app express(); const PORT 3000; app.use(require(cors)()); app.get(/stream, (req, res) { // SSE实现将放在这里 }); app.listen(PORT, () { console.log(Server running on port ${PORT}); });2.2 实现基础流式响应模拟ChatGPT的逐字输出效果app.get(/stream, (req, res) { res.set({ Content-Type: text/event-stream, Cache-Control: no-cache, Connection: keep-alive }); const sampleText SSE技术让实时文本流变得简单高效。; let index 0; const sendData setInterval(() { if (index sampleText.length) { res.write(data: ${sampleText.charAt(index)}\n\n); index; } else { clearInterval(sendData); res.write(event: end\ndata: \n\n); } }, 100); });前端对接代码示例script const eventSource new EventSource(http://localhost:3000/stream); eventSource.onmessage (e) { if (e.data) { document.getElementById(output).textContent e.data; } }; eventSource.addEventListener(end, () { eventSource.close(); console.log(Stream ended); }); /script3. 生产环境关键优化策略3.1 连接稳定性保障问题类型解决方案实现代码示例连接超时心跳机制保持连接活跃res.write(: heartbeat\n\n)网络中断自动重连断点续传客户端记录最后接收位置服务重启客户端指数退避重试策略retry: 5000字段实现心跳检测的改进版本// 服务端 const heartbeat setInterval(() { res.write(: heartbeat\n\n); }, 30000); req.on(close, () { clearInterval(heartbeat); }); // 客户端 eventSource.onerror () { setTimeout(() { new EventSource(streamUrl); // 自动重连 }, 5000); };3.2 多客户端连接管理const clients new Set(); app.get(/stream, (req, res) { // ...头部设置... clients.add(res); req.on(close, () { clients.delete(res); }); }); // 广播消息函数 function broadcast(message) { clients.forEach(client { if (!client.finished) { client.write(data: ${message}\n\n); } }); }注意生产环境应考虑使用Redis等工具实现跨进程/跨服务器的连接管理。4. 高级应用场景实现4.1 结合AI文本生成整合大语言模型API的完整流程接收用户提问调用AI生成API流式返回生成结果处理生成中断app.post(/ask, async (req, res) { const { question } req.body; // 设置SSE响应头 res.setSSEHeaders(); // 调用AI API模拟示例 const aiResponse await callAIService(question); // 流式返回 const chunks splitIntoCharacters(aiResponse); for (const chunk of chunks) { await new Promise(resolve setTimeout(resolve, 50)); res.write(data: ${chunk}\n\n); } res.write(event: end\ndata: \n\n); });4.2 性能优化技巧数据压缩对长文本启用gzip压缩批处理适当合并字符包减少请求次数缓存策略对常见问题答案进行缓存// 批处理示例 function sendInBatches(text, batchSize 5) { for (let i 0; i text.length; i batchSize) { const batch text.slice(i, i batchSize); res.write(data: ${batch}\n\n); } }5. 安全与异常处理5.1 安全防护措施CORS配置精确设置允许的源速率限制防止滥用认证鉴权JWT验证// 安全中间件示例 app.use(/stream, (req, res, next) { // JWT验证 try { verifyToken(req.headers.authorization); next(); } catch (err) { res.status(401).end(); } });5.2 异常处理方案// 错误处理中间件 app.use((err, req, res, next) { if (req.headers.accept text/event-stream) { res.write(event: error\ndata: Service unavailable\n\n); res.write(event: end\ndata: \n\n); res.end(); } else { next(err); } }); // 客户端处理 eventSource.addEventListener(error, (e) { showErrorMessage(e.data); eventSource.close(); });在实际项目中SSE连接的生命周期管理往往比想象中复杂。一个常见陷阱是忘记清理中断的连接导致内存泄漏。通过定期ping-pong检测可以及时发现僵尸连接// 连接健康检查 setInterval(() { clients.forEach(client { if (!client.lastPong || Date.now() - client.lastPong 60000) { client.res.end(); // 终止不健康的连接 clients.delete(client); } }); }, 30000);

更多文章