手把手教你用Python Socket实现TCP长连接:从心跳保活到自动重连的完整代码示例

张开发
2026/4/6 5:19:40 15 分钟阅读

分享文章

手把手教你用Python Socket实现TCP长连接:从心跳保活到自动重连的完整代码示例
Python Socket长连接实战:从心跳机制到自动重连的工业级解决方案

在物联网设备监控、实时游戏服务器或金融交易系统中,TCP长连接的稳定性直接决定业务连续性。我曾为一个智能家居项目调试连接模块时发现设备频繁离线——不是WiFi信号问题,而是服务端主动掐断了沉默的连接。这促使我深入研究TCP长连接的保活机制,最终形成这套覆盖心跳设计、状态监控、异常恢复的完整方案。

## 1. 长连接基础与心跳机制原理

TCP协议本身提供可靠传输保障,但在默认设置下,网络设备(如路由器、防火墙)会主动清理长时间无数据交互的连接。这就是为什么即使服务端未主动关闭连接,客户端仍可能收到 `ConnectionResetError: [WinError 10054]` 错误。

### 1.1 操作系统层保活参数

不同操作系统对TCP保活的实现存在差异:

```python
import socket
import sys


def set_keepalive(sock, after_idle_sec=60, interval_sec=30, max_fails=5):
    """跨平台设置TCP保活参数"""
    if sys.platform == "win32":
        # Windows专属设置(单位:毫秒);探测次数无法通过该接口配置,max_fails 在此被忽略
        sock.ioctl(socket.SIO_KEEPALIVE_VALS,
                   (1, after_idle_sec * 1000, interval_sec * 1000))
        return

    # Linux/macOS 设置(单位:秒)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Linux 使用 TCP_KEEPIDLE;macOS 上对应的常量是 TCP_KEEPALIVE,
    # 直接引用 socket.TCP_KEEPIDLE 在 macOS 上可能抛出 AttributeError
    idle_opt = getattr(socket, "TCP_KEEPIDLE", getattr(socket, "TCP_KEEPALIVE", None))
    if idle_opt is not None:
        sock.setsockopt(socket.IPPROTO_TCP, idle_opt, after_idle_sec)
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)
```

关键参数对比:

| 参数 | Windows | Linux | 作用 |
|---|---|---|---|
| 首次探测等待 | SIO_KEEPALIVE_VALS[1] | TCP_KEEPIDLE | 连接空闲多久开始探测 |
| 探测间隔 | SIO_KEEPALIVE_VALS[2] | TCP_KEEPINTVL | 每次探测的间隔时间 |
| 最大失败次数 | 不可配置(Windows Vista 及以后固定为10次) | TCP_KEEPCNT | 连续失败多少次判定连接死亡 |

> 提示:生产环境中建议首次探测等待时间设为300秒(5分钟),避免过于频繁的保活探测消耗资源。

### 1.2 应用层心跳协议设计

操作系统层面的保活只能检测连接是否存活,无法确保业务可用性。我们需要在应用层实现双向心跳:

```python
class HeartbeatProtocol:
    PING = b"\x01"  # 心跳请求标识
    PONG = b"\x02"  # 心跳响应标识

    @staticmethod
    def is_heartbeat(data):
        return data in (HeartbeatProtocol.PING, HeartbeatProtocol.PONG)
```

注意:TCP是字节流,一次 `recv()` 可能返回半条消息,也可能把心跳与业务数据粘在一起返回。`is_heartbeat()` 只能识别恰好单独到达的一个心跳字节,因此生产环境应使用带长度前缀的帧格式,先拆帧再判断消息类型。

典型心跳交互流程:

1. 客户端每60秒发送 PING;
2. 服务端收到后立即回复 PONG;
3. 客户端连续3次未收到响应,视为连接故障。

## 2. 连接状态机与自动重连

### 2.1 连接状态建模

实现一个状态机管理连接生命周期:

```python
from enum import Enum, auto


class ConnectionState(Enum):
    DISCONNECTED = auto()  # 初始状态
    CONNECTING = auto()    # 连接中
    CONNECTED = auto()     # 已连接
    RECONNECTING = auto()  # 重连中
```

状态转换规则:

- 连接成功:DISCONNECTED → CONNECTING → CONNECTED
- 连接丢失:CONNECTED → RECONNECTING
- 重连成功:RECONNECTING → CONNECTED
- 重连失败:RECONNECTING → DISCONNECTED(下文 4.1 的示例为简化起见会无限重试,未实现这一转换;如需放弃重连,可在 `_connect_loop` 中加入最大尝试次数)

### 2.2 带指数退避的重连算法

为避免网络恢复时大量客户端同时重连造成“重连风暴”,重连间隔采用指数退避并加入随机抖动:

```python
import random
import time


class ReconnectionManager:
    def __init__(self, base_delay=1, max_delay=60):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.attempts = 0

    def next_delay(self):
        delay = min(self.base_delay * (2 ** self.attempts), self.max_delay)
        # 达到上限后不再递增:长时间断线时 attempts 无限增长,
        # 若 base_delay 为浮点数,2 ** attempts 最终会触发 OverflowError
        if delay < self.max_delay:
            self.attempts += 1
        return delay + random.uniform(0, 0.1 * delay)  # 添加随机抖动

    def reset(self):
        self.attempts = 0
```

使用示例:

```python
reconnector = ReconnectionManager()
while not connect_to_server():
    delay = reconnector.next_delay()
    time.sleep(delay)
reconnector.reset()  # 连接成功后重置退避计数
```

## 3. 业务数据缓存与重发

### 3.1 线程安全的消息队列

```python
from collections import deque
from threading import Lock


class MessageBuffer:
    def __init__(self, max_size=1000):
        # 不要用 queue.Queue(maxsize) 再套一把外部锁:队列满时 put() 会在持锁状态下
        # 永久阻塞,get_all() 也拿不到锁,形成死锁。
        # deque(maxlen) 满时会自动丢弃另一端的一条消息,操作均为 O(1)。
        self.queue = deque(maxlen=max_size)
        self.lock = Lock()

    def put(self, message, priority=False):
        with self.lock:
            if priority:
                # 将高优先级消息插入队列前端;队列满时丢弃队尾最新的一条
                self.queue.appendleft(message)
            else:
                # 普通消息追加到队尾;队列满时丢弃队首最早的一条
                self.queue.append(message)

    def get_all(self):
        with self.lock:
            messages = list(self.queue)
            self.queue.clear()
            return messages
```

### 3.2 消息确认与重发机制

设计带唯一ID的消息格式:

```
{
    "msg_id": "uuid4",
    "timestamp": 1625097600,
    "payload": {...},
    "retries": 0
}
```

处理流程:

1. 发送消息时存入待确认队列;
2. 收到服务端 ACK 后移除对应消息;
3. 定时检查超时未确认的消息(30秒)并重发;
4. 重试次数超过阈值(如3次)时触发连接重置。

下文 4.1 的示例只实现了第1步(写入 `unconfirmed`),ACK 处理与超时重发需要在接收线程和定时任务中自行补充。

## 4. 完整实现示例

### 4.1 客户端核心类

```python
import socket
import threading
import time
import uuid
from collections import deque


class TCPClient:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.sock = None
        self.state = ConnectionState.DISCONNECTED
        self.heartbeat_interval = 60
        self.last_heartbeat = 0
        self.message_buffer = MessageBuffer()
        # 注意:maxlen 满后会静默丢弃最早的待确认消息
        self.unconfirmed = deque(maxlen=1000)
        self.lock = threading.RLock()

    def connect(self):
        with self.lock:
            if self.state != ConnectionState.DISCONNECTED:
                return False
            self.state = ConnectionState.CONNECTING
        threading.Thread(target=self._connect_loop, daemon=True).start()
        return True

    def _connect_loop(self):
        """唯一的连接/重连线程:建立连接 → 运行心跳 → 断开后退避重连"""
        reconnector = ReconnectionManager()
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                set_keepalive(sock)
                sock.connect((self.host, self.port))
            except OSError as e:
                print(f"Connection failed: {e}")
                sock.close()  # 失败的 socket 必须关闭,否则每次重试都泄漏一个文件描述符
                with self.lock:
                    self.state = ConnectionState.RECONNECTING
                time.sleep(reconnector.next_delay())
                continue

            with self.lock:
                self.sock = sock
                self.state = ConnectionState.CONNECTED
                self.last_heartbeat = time.time()
            reconnector.reset()
            # 启动接收线程
            threading.Thread(target=self._recv_thread, daemon=True).start()
            # 处理积压消息
            self._flush_buffer()
            # 运行心跳循环;返回即表示连接已断开(状态不再是 CONNECTED)
            self._heartbeat_loop()
            self._close_socket(sock)
            time.sleep(reconnector.next_delay())

    @staticmethod
    def _close_socket(sock):
        try:
            sock.shutdown(socket.SHUT_RDWR)  # 唤醒仍阻塞在 recv() 上的接收线程
        except OSError:
            pass  # 对端已断开时 shutdown 可能报错,此处只做尽力关闭
        sock.close()

    def _heartbeat_loop(self):
        while True:
            with self.lock:
                if self.state != ConnectionState.CONNECTED:
                    break
                now = time.time()
                if now - self.last_heartbeat >= self.heartbeat_interval:
                    try:
                        self.sock.sendall(HeartbeatProtocol.PING)
                        self.last_heartbeat = now
                    except OSError:
                        self._handle_disconnect()
                        break
            time.sleep(1)

    def _handle_disconnect(self):
        with self.lock:
            if self.state == ConnectionState.CONNECTED:
                # 只切换状态,不要在这里再启动新的 _connect_loop 线程:
                # 原线程的 while 循环会在心跳循环退出后继续重连,两者并存会产生重复连接
                self.state = ConnectionState.RECONNECTING

    def send_message(self, data, priority=False):
        with self.lock:
            if self.state == ConnectionState.CONNECTED:
                try:
                    msg_id = str(uuid.uuid4())
                    packet = self._encode_message(msg_id, data)
                    self.sock.sendall(packet)
                    self.unconfirmed.append({
                        "id": msg_id,
                        "data": packet,
                        "timestamp": time.time(),
                        "retries": 0,
                    })
                    return True
                except OSError:
                    self._handle_disconnect()
            self.message_buffer.put(data, priority)
            return False

    def _flush_buffer(self):
        for msg in self.message_buffer.get_all():
            self.send_message(msg)
```

说明:

- `_recv_thread` 与 `_encode_message` 需根据具体协议实现,此处省略。接收线程在 `recv()` 返回空数据或抛出 `OSError` 时应调用 `_handle_disconnect()`。
- 上面的心跳循环只负责发送 PING。要实现 1.2 节中“连续3次未收到 PONG 即判定故障”,需要在 `_recv_thread` 中记录最近一次收到 PONG 的时间,并在心跳循环中检查它是否已超过 `3 * heartbeat_interval`。

### 4.2 服务端心跳处理

```python
def handle_client_connection(client_socket):
    set_keepalive(client_socket)
    # 必须设置超时:否则 recv() 会无限阻塞,下面的 socket.timeout 分支永远不会触发
    client_socket.settimeout(30)
    last_active = time.time()
    try:
        while True:
            try:
                data = client_socket.recv(1024)
            except socket.timeout:
                if time.time() - last_active > 300:  # 5分钟无活动
                    break
                continue
            if not data:
                break
            if HeartbeatProtocol.is_heartbeat(data):
                if data == HeartbeatProtocol.PING:
                    client_socket.sendall(HeartbeatProtocol.PONG)
            else:
                # 处理业务数据
                response = process_business_data(data)
                client_socket.sendall(response)
            last_active = time.time()
    except OSError:
        pass  # 包括 ConnectionResetError:连接被重置或其他套接字错误时结束会话
    finally:
        client_socket.close()  # 任何退出路径都关闭套接字,避免泄漏
```

## 5. 性能优化与生产建议

在实际部署中还需要考虑以下优化点。

**连接池管理**:对于需要维护多个长连接的场景,

- 实现连接复用
- 设置最大连接数阈值
- 空闲连接回收机制

**流量控制**:

```python
# 设置发送缓冲区大小
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 8192)
# TCP_NODELAY=0 表示保持 Nagle 算法开启(即系统默认行为),小数据包会被合并发送;
# 心跳、交易指令等对延迟敏感的小包场景通常应设为 1,禁用 Nagle
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 0)
```

**监控指标**:

- 连接成功率
- 平均重连时间
- 心跳丢失率
- 消息往返延迟

**异常场景测试**:

- 网络闪断(1-5秒)
- 长时间断开(5分钟)
- 服务端重启
- 高负载下的连接稳定性

更多文章