Redis源码探究系列—Redis 时间事件(Time Event)源码详解

张开发
2026/4/11 21:11:35 15 分钟阅读

分享文章

Redis源码探究系列—Redis 时间事件(Time Event)源码详解
欢迎各位同学关注我哦~在这个 AI 喧嚣的时代不忘初心戒骄戒躁认真沉淀在前面几篇介绍到Redis的事件驱动机制中有两类时间一直贯穿始终时间事件Time Event是与上一篇文章介绍的文件事件。这些事件在前文中也多多少少的介绍到一点。时间事件用于处理定时任务如服务器定时维护、客户端超时检测、AOF持久化、主从复制心跳等。Redis通过时间事件实现了一个轻量级的定时器机制在单线程中高效地管理各类周期性任务。在下面的内容中我们将由浅入深的去了解Redis时间事件的实现原理从数据结构到核心API再到实际应用场景。准备好了的话我们就开始咯~二、核心数据结构2.1 时间事件结构体aeTimeEvent先来简单过一过时间事件的结构体// src/ae.h:79-88typedefstructaeTimeEvent{longlongid;// 时间事件唯一标识符全局递增longwhen_sec;// 触发时间的秒数部分绝对时间longwhen_ms;// 触发时间的毫秒数部分绝对时间aeTimeProc*timeProc;// 时间事件回调函数aeEventFinalizerProc*finalizerProc;// 事件销毁时的清理函数可选void*clientData;// 用户自定义数据传递给回调函数structaeTimeEvent*prev;// 指向前一个时间事件structaeTimeEvent*next;// 指向下一个时间事件形成双向链表}aeTimeEvent;2.2 事件循环中的时间事件在事件循环中也有几个字段适合时间事件是紧密相关的// src/ae.h:97-109typedefstructaeEventLoop{intmaxfd;// 当前注册的最大文件描述符intsetsize;// 事件表容量上限longlongtimeEventNextId;// 下一个时间事件的 ID全局递增计数器time_tlastTime;// 上次处理时间事件的时间用于检测时钟偏移aeFileEvent*events;// 注册的文件事件表aeFiredEvent*fired;// 已触发的文件事件表aeTimeEvent*timeEventHead;// 时间事件链表头节点intstop;// 停止标志void*apidata;// 多路复用 API 的私有数据aeBeforeSleepProc*beforesleep;// 事件循环前回调aeBeforeSleepProc*aftersleep;// 事件循环后回调}aeEventLoop;其中有一下几个字段和时间事件的关系紧密timeEventNextId全局递增的事件ID计数器timeEventHead时间事件链表的头节点所有时间事件通过链表串联lastTime记录上次处理时间事件的时间用于检测系统时钟偏移那么为什么事件循环中要包含时间事件Redis 采用统一的事件驱动模型将文件事件网络 I/O和时间事件定时任务放在同一个事件循环中处理。这种设计有以下原因统一的处理入口aeProcessEvents函数同时处理文件事件和时间事件简化了主循环逻辑精确的超时控制时间事件决定了aeApiPoll的阻塞时长避免阻塞过久而延迟定时任务资源共享时间事件和文件事件共享同一个事件循环上下文便于在定时任务中操作网络连接单线程架构Redis主线程只有一个事件循环时间事件必须融入其中否则需要额外的线程或信号机制简单来说aeEventLoop是 Redis 事件驱动模型的核心容器它需要同时管理文件事件和时间事件才能实现高效的 I/O 多路复用与定时任务调度。三、时间事件的类型Redis 的时间事件主要分为两大类一次性时间事件周期性时间事件。3.1 一次性时间事件只执行一次事件触发执行后自动删除不再重复执行。通过回调函数返回AE_NOMORE实现// src/ae.h:51#defineAE_NOMORE-13.2 周期性时间事件顾名思义事件触发执行后根据回调函数的返回值重新计算下次触发时间实现周期执行。返回值为距离下次触发的毫秒数。Redis 中最典型的周期性时间事件是serverCron默认每100ms执行一次。四、核心 API 实现4.1 创建时间事件aeCreateTimeEvent// src/ae.c:208-228longlongaeCreateTimeEvent(aeEventLoop*eventLoop,longlongmilliseconds,aeTimeProc*proc,void*clientData,aeEventFinalizerProc*finalizerProc){longlongideventLoop-timeEventNextId;// 分配全局唯一IDaeTimeEvent*te;tezmalloc(sizeof(*te));// 分配时间事件结构体内存if(teNULL)returnAE_ERR;te-idid;// 计算触发的绝对时间当前时间延迟毫秒数aeAddMillisecondsToNow(milliseconds,te-when_sec,te-when_ms);te-timeProcproc;// 设置时间事件回调函数te-finalizerProcfinalizerProc;// 设置销毁时的清理函数te-clientDataclientData;// 设置用户自定义数据// 头插法插入双向链表新事件插入到链表头部te-prevNULL;te-nexteventLoop-timeEventHead;if(te-next)te-next-prevte;eventLoop-timeEventHeadte;returnid;// 返回事件ID用于后续删除操作}有的同学可能会有这样的疑问为什么使用头插法而不是按触发时间有序插入如果按触发时间有序插入aeSearchNearestTimer查找最近事件就只需要取链表头部复杂度从O(n)降为O(1)。但Redis选择了无序链表 头插法原因是插入效率优先有序插入需要O(n)找到插入位置而头插法只需O(1)时间事件极少Redis 通常只有1-2个时间事件主要是serverCron遍历开销几乎为零在实际场景中时间事件数量远少于文件事件优化时间事件的查找收益甚微简单来说当n很小时O(n)和O(1)的差距可以忽略简单的实现更优。创建时间事件的步骤可以简单概述如下分配全局唯一的事件ID分配并初始化时间事件结构体计算事件触发的绝对时间将事件插入双向链表头部返回事件ID4.2 删除时间事件aeDeleteTimeEvent// src/ae.c:230-241intaeDeleteTimeEvent(aeEventLoop*eventLoop,longlongid){aeTimeEvent*teeventLoop-timeEventHead;while(te){if(te-idid){te-idAE_DELETED_EVENT_ID;// 标记为已删除returnAE_OK;}tete-next;}returnAE_ERR;}删除的时间事件的时候并不是直接删除的而是采用了延迟删除先通过修改时间的id为AE_DELETED_EVENT_ID,将该事件标记为已删除的状态然后实际释放内存的操作是在processTimeEvents中进行尽量避免了在遍历链表的时候修改链表的结构而导致的一些不可预料的问题。4.3 查找最近的时间事件aeSearchNearestTimer// src/ae.c:254-267staticaeTimeEvent*aeSearchNearestTimer(aeEventLoop*eventLoop){aeTimeEvent*teeventLoop-timeEventHead;// 从链表头开始遍历aeTimeEvent*nearestNULL;// 记录最近的时间事件// 遍历整个链表找到触发时间最早的事件while(te){// 比较when_sec和when_ms找到最小的时间if(!nearest||te-when_secnearest-when_sec||(te-when_secnearest-when_secte-when_msnearest-when_ms))nearestte;// 更新最近事件tete-next;}returnnearest;// 返回最近的时间事件可能为NULL}时间事件链表是无序的需要遍历查找最早触发的事件该函数用于计算aeApiPoll的超时时间确保不会错过时间事件时间复杂度O(n)但Redis通常只有1个时间事件开销可忽略4.4 处理时间事件processTimeEvents// src/ae.c:270-342staticintprocessTimeEvents(aeEventLoop*eventLoop){intprocessed0;// 统计已处理的事件数量aeTimeEvent*te;longlongmaxId;time_tnowtime(NULL);// 获取当前时间秒// 检测系统时钟是否被向后调整时钟回退if(noweventLoop-lastTime){// 时钟被向后调整立即触发所有时间事件// 将所有事件的触发时间设为0使其立即执行teeventLoop-timeEventHead;while(te){// 直接从链表头遍历到链表尾te-when_sec0;// 设置为立即触发tete-next;}}eventLoop-lastTimenow;// 更新上次处理时间// 遍历处理所有到期的时间事件teeventLoop-timeEventHead;maxIdeventLoop-timeEventNextId-1;// 记录当前最大IDwhile(te){longnow_sec,now_ms;longlongid;// 处理被标记删除的事件从链表中移除并释放if(te-idAE_DELETED_EVENT_ID){aeTimeEvent*nextte-next;if(te-prev)te-prev-nextte-next;elseeventLoop-timeEventHeadte-next;if(te-next)te-next-prevte-prev;if(te-finalizerProc)te-finalizerProc(eventLoop,te-clientData);zfree(te);tenext;continue;}// 跳过在本次处理过程中新创建的事件ID maxId// 防止新事件在本次循环中被立即处理if(te-idmaxId){tete-next;continue;}// 获取当前精确时间秒 毫秒aeGetTime(now_sec,now_ms);// 检查事件是否已到期触发时间 当前时间if(now_secte-when_sec||(now_secte-when_secnow_mste-when_ms)){intretval;idte-id;// 调用时间事件回调函数retvalte-timeProc(eventLoop,id,te-clientData);processed;// 统计已处理的事件数// 根据返回值决定事件是否继续if(retval!AE_NOMORE){// 周期事件重新计算下次触发时间// retval 为距离下次触发的毫秒数aeAddMillisecondsToNow(retval,te-when_sec,te-when_ms);}else{// 一次性事件返回AE_NOMORE标记为删除te-idAE_DELETED_EVENT_ID;}}tete-next;// 继续处理下一个事件}returnprocessed;// 返回已处理的事件数量}在上面的方法中我们也终于看到了前文中出现的被标记为删除节点的删除过程。简单过一遍上面的这个方法后不知道大家有没有想过下面的两个问题1. 为什么用maxId跳过新创建的事件在遍历处理过程中回调函数timeProc可能会创建新的时间事件如serverCron中可能触发新的定时任务。新事件会被头插到链表头部如果继续处理可能导致同一轮循环中重复处理新事件这样就进入了无限循环新事件又创建新事件。实用maxId记录遍历开始前的最大事件 ID跳过ID更大的新事件确保每轮循环只处理已存在的事件。2. 时钟回退检测的意义系统时钟被向后调整如NTP同步、手动修改会导致定时任务延迟执行。当检测到时钟回退后强制所有时间事件立即触发确保关键任务如过期键清理、主从心跳不被无限推迟。梳理一下processTimeEvents的主要执行流程检测系统时钟偏移处理时钟回退情况遍历时间事件链表清理被标记删除的事件检查每个事件是否已到期到期事件调用回调函数根据返回值决定是否继续返回处理的事件数量4.5 计算超时时间在aeProcessEvents中会根据最近的时间事件计算aeApiPoll的超时时间// src/ae.c:369-407// ......// ......// 判断是否需要计算超时时间if(eventLoop-maxfd!-1||((flagsAE_TIME_EVENTS)!(flagsAE_DONT_WAIT))){intj;aeTimeEvent*shortestNULL;// 最近的时间事件structtimevaltv,*tvp;// 查找最近的时间事件if(flagsAE_TIME_EVENTS!(flagsAE_DONT_WAIT))shortestaeSearchNearestTimer(eventLoop);if(shortest){// 存在时间事件计算超时时间longnow_sec,now_ms;aeGetTime(now_sec,now_ms);// 获取当前时间tvptv;// 计算距离最近时间事件还有多少毫秒longlongms(shortest-when_sec-now_sec)*1000shortest-when_ms-now_ms;if(ms0){// 还有剩余时间设置为超时值tvp-tv_secms/1000;// 秒部分tvp-tv_usec(ms%1000)*1000;// 微秒部分}else{// 时间已到立即返回不阻塞tvp-tv_sec0;tvp-tv_usec0;}}else{// 没有时间事件if(flagsAE_DONT_WAIT){// 设置了不等待标志立即返回tv.tv_sectv.tv_usec0;tvptv;}else{// 没有时间事件且允许等待无限阻塞直到有文件事件tvpNULL;/* wait forever */}}// 调用 I/O 多路复用等待文件事件或超时numeventsaeApiPoll(eventLoop,tvp);// ......// ......说明如果有时间事件计算最近事件的剩余时间作为超时值如果没有时间事件可能无限等待直到有文件事件如果设置了AE_DONT_WAIT超时为0立即返回五、时间事件在 Redis 中的应用5.1 serverCron 的注册Redis启动时会在initServer中注册serverCron时间事件// src/server.c:2122-2124// 创建定时器回调这是处理后台任务的核心机制// 包括客户端超时、过期键清理等if(aeCreateTimeEvent(server.el,1,serverCron,NULL,NULL)AE_ERR){serverPanic(Cant create event loop timers.);exit(1);}serverCron是Redis的核心定时任务负责更新服务器时间缓存处理客户端超时执行过期键清理触发 AOF 重写/RDB 保存主从复制心跳集群心跳等等…5.2 serverCron 的实现// src/server.c:1090-1352intserverCron(structaeEventLoop*eventLoop,longlongid,void*clientData){intj;UNUSED(eventLoop);UNUSED(id);UNUSED(clientData);// 软件看门狗如果serverCron执行超时发送SIGALRM信号if(server.watchdog_period0)watchdogScheduleSignal(server.watchdog_period);updateCachedTime(1);// 更新服务器时间缓存避免频繁调用time()server.lruclockgetLRUClock();// 更新LRU时钟用于键淘汰// 记录峰值内存使用量if(zmalloc_used_memory()server.stat_peak_memory)server.stat_peak_memoryzmalloc_used_memory();server.resident_set_sizezmalloc_get_rss();// 获取进程实际物理内存// 处理 SIGTERM 信号安全关闭服务器if(server.shutdown_asap){if(prepareForShutdown(SHUTDOWN_NOFLAGS)C_OK)exit(0);serverLog(LL_WARNING,SIGTERM received but errors trying to shut down the server, check the logs for more information);server.shutdown_asap0;}// 每5秒打印一次数据库统计信息run_with_period(5000){for(j0;jserver.dbnum;j){longlongsize,used,vkeys;sizedictSlots(server.db[j].dict);// 哈希表槽位数useddictSize(server.db[j].dict);// 已使用键数量vkeysdictSize(server.db[j].expires);// 设置过期时间的键数量if(used||vkeys){serverLog(LL_VERBOSE,DB %d: %lld keys (%lld volatile) in %lld slots HT.,j,used,vkeys,size);}}}// 每 5 秒打印一次客户端连接信息if(!server.sentinel_mode){run_with_period(5000){serverLog(LL_VERBOSE,%lu clients connected (%lu replicas), %zu bytes in use,listLength(server.clients)-listLength(server.slaves),listLength(server.slaves),zmalloc_used_memory());}}clientsCron();// 异步处理客户端相关任务超时检测、输入输出缓冲区检查databasesCron();// 数据库后台任务过期键清理、字典rehash// 如果AOF重写被延迟因为当时有BGSAVE现在尝试执行if(server.rdb_child_pid-1server.aof_child_pid-1server.aof_rewrite_scheduled){rewriteAppendOnlyFileBackground();}// 检查后台子进程是否结束RDB或AOFif(server.rdb_child_pid!-1||server.aof_child_pid!-1||ldbPendingChildren()){intstatloc;pid_tpid;// 非阻塞等待子进程结束if((pidwait3(statloc,WNOHANG,NULL))!0){intexitcodeWEXITSTATUS(statloc);// 退出码intbysignal0;if(WIFSIGNALED(statloc))bysignalWTERMSIG(statloc);// 终止信号if(pid-1){// wait3 出错serverLog(LL_WARNING,wait3() returned an error: %s. rdb_child_pid %d, aof_child_pid %d,strerror(errno),(int)server.rdb_child_pid,(int)server.aof_child_pid);}elseif(pidserver.rdb_child_pid){// RDB 后台保存完成backgroundSaveDoneHandler(exitcode,bysignal);}elseif(pidserver.aof_child_pid){// AOF 后台重写完成backgroundRewriteDoneHandler(exitcode,bysignal);}else{if(!ldbPendingChildren()){serverLog(LL_WARNING,Warning, detected child with unmatched pid: %ld,(long)pid);}}updateDictResizePolicy();// 根据是否有子进程调整字典rehash策略}}else{// 没有后台任务检查是否需要执行RDB保存for(j0;jserver.saveparamslen;j){structsaveparam*spserver.saveparamsj;// 检查 save 配置条件变更数 sp-changes 且时间 sp-secondsif(server.dirtysp-changesserver.unixtime-server.lastsavesp-seconds){serverLog(LL_NOTICE,%d changes in %d seconds. Saving...,sp-changes,(int)sp-seconds);rdbSaveInfo rsi,*rsiptr;rsiptrrdbPopulateSaveInfo(rsi);rdbSaveBackground(server.rdb_filename,rsiptr);break;}}// 检查是否需要触发AOF重写if(server.rdb_child_pid-1server.aof_child_pid-1server.aof_rewrite_percserver.aof_current_sizeserver.aof_rewrite_min_size){// AOF文件增长率超过阈值时触发重写longlongbaselineserver.aof_rewrite_base_size?server.aof_rewrite_base_size:1;longlonggrowth(server.aof_current_size*100/baseline)-100;if(growthserver.aof_rewrite_perc){serverLog(LL_NOTICE,Starting automatic rewriting of AOF on %lld%% growth,growth);rewriteAppendOnlyFileBackground();}}}// ... 更多任务}5.3 serverCron的返回值serverCron返回下次执行的间隔时间毫秒// src/server.c:1351-1352server.cronloops;// 增加循环计数器用于run_with_period宏return1000/server.hz;// 返回下次执行的毫秒数server.hz默认值为10表示每秒执行10次返回1000/10 100即每100ms执行一次可通过配置文件调整hz参数影响定时任务的执行频率5.4 run_with_period宏serverCron中大量使用run_with_period宏来控制任务执行频率// src/server.h:452#definerun_with_period(_ms_)if((_ms_1000/server.hz)||!(server.cronloops%((_ms_)/(1000/server.hz))))宏解析如果_ms_ 1000/server.hz则每次serverCron都执行否则每隔_ms_/(1000/server.hz)次循环执行一次用法示例// 每5秒打印一次数据库信息run_with_period(5000){// 打印数据库统计信息}// 每100ms执行一次集群心跳run_with_period(100){if(server.cluster_enabled)clusterCron();}六、时间事件处理流程图时间事件是 Redis 实现定时任务的核心机制serverCron作为唯一的时间事件承载了服务器的大部分后台维护工作。欢迎各位同学关注我哦~在这个 AI 喧嚣的时代不忘初心戒骄戒躁认真沉淀

更多文章