03-高并发写架构详解

张开发
2026/4/13 23:32:08 15 分钟阅读

分享文章

03-高并发写架构详解
高并发写架构详解一、知识概述高并发写场景常见于日志采集、订单创建、消息发送、数据上报等业务,核心挑战是如何高效处理海量写入请求,同时保证数据不丢失、系统不崩溃。核心指标:写入TPS:1万 - 100万+数据可靠性:99.999%写入延迟:P99 100ms典型特征:写多读少(或读写分离)数据持续涌入可容忍短延迟(异步处理)需要持久化保证二、知识点详细讲解2.1 异步写入架构2.1.1 消息队列缓冲┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Producer │ ───→ │ MQ │ ───→ │ Consumer │ │ (应用) │ │ (缓冲池) │ │ (写入DB) │ └──────────────┘ └──────────────┘ └──────────────┘ 同步写入 削峰填谷 异步落库 10ms 百万级 可控速率2.1.2 批量写入策略/** * 批量写入缓冲区 */@ComponentpublicclassBatchWriteBufferT{privatefinalBlockingQueueTbuffer;privatefinalintbatchSize;privatefinalintflushInterval;// msprivatefinalConsumerListTwriter;publicBatchWriteBuffer(intbufferSize,intbatchSize,intflushInterval,ConsumerListTwriter){this.buffer=newLinkedBlockingQueue(bufferSize);this.batchSize=batchSize;this.flushInterval=flushInterval;this.writer=writer;// 启动后台刷新线程startFlushThread();}/** * 添加数据(非阻塞) */publicbooleanadd(Titem){returnbuffer.offer(item);// 队列满则快速失败}/** * 后台定时刷新 */privatevoidstartFlushThread(){ScheduledExecutorServiceexecutor=Executors.newSingleThreadScheduledExecutor();executor.scheduleWithFixedDelay(()-{ListTbatch=newArrayList(batchSize);// 取出批量数据buffer.drainTo(batch,batchSize);if(!batch.isEmpty()){try{// 批量写入writer.accept(batch);}catch(Exceptione){log.error("批量写入失败",e);// 失败重试或持久化到本地handleFailure(batch);}}},flushInterval,flushInterval,TimeUnit.MILLISECONDS);}}// 使用示例@BeanpublicBatchWriteBufferOrderorderWriteBuffer(OrderServiceorderService){returnnewBatchWriteBuffer(100000,// 缓冲区大小1000,// 批次大小1000,// 刷新间隔 1秒orderService::batchInsert);}2.2 分库分表写入2.2.1 写入路由策略/** * 分库分表写入服务 */@ServicepublicclassShardingWriteService{@AutowiredprivateListDataSourcedataSources;// 多数据源/** * 路由算法 */privateintrouteToDb(LongshardingKey,intdbCount){returnMath.abs(shardingKey.hashCode())%dbCount;}/** * 插入订单 */publicvoidinsertOrder(Orderorder){intdbIndex=routeToDb(order.getUserId(),dataSources.size());DataSourceds=dataSources.get(dbIndex);try(Connectionconn=ds.getConnection()){Stringsql="INSERT INTO t_order VALUES(?,?,?,?,?)";try(PreparedStatementps=conn.prepareStatement(sql)){ps.setLong(1,order.getId());ps.setLong(2,order.getUserId());ps.setBigDecimal(3,order.getAmount());ps.setInt(4,order.getStatus());ps.setTimestamp(5,newTimestamp(order.getCreateTime().getTime()));ps.executeUpdate();}}catch(SQLExceptione){log.error("插入订单失败",e);thrownewRuntimeException("数据库写入失败",e);}}/** * 批量插入 */publicvoidbatchInsert(ListOrderorders){// 按库分组MapInteger,ListOrdergroupedOrders=orders.stream().collect(Collectors.groupingBy(o-routeToDb(o.getUserId(),dataSources.size())));// 并行写入各库groupedOrders.parallelStream().forEach(entry-{intdbIndex=entry.getKey();List

更多文章