SpringBoot集成SpringBatch与XXL-Job实现高效数据批处理(实战指南)

张开发
2026/4/16 12:56:36 15 分钟阅读

分享文章

SpringBoot集成SpringBatch与XXL-Job实现高效数据批处理(实战指南)
1. 为什么需要SpringBatchXXL-Job组合方案最近接手了一个老项目的重构任务原来的定时数据同步功能用的是Spring自带的Scheduled随着数据量增长逐渐暴露出两个致命问题一是任务执行耗时越来越长经常出现超时失败二是缺乏可视化监控出了问题只能翻日志。经过技术调研最终选择了SpringBatchXXL-Job的组合方案这里分享下我的实战经验。SpringBatch作为轻量级批处理框架在处理大数据量时有三把利器分片处理Partitioning可以把50万数据拆成多个分片并行处理事务块Chunk机制每处理1000条提交一次事务避免内存溢出完善的元数据管理能记录每次处理的读写数量。而XXL-Job作为分布式任务调度平台则解决了任务可视化管控、失败告警、调度日志等运维痛点。实测下来这个组合在处理百万级数据同步时相比原生方案性能提升8倍以上。比如原来需要2小时的月结报表生成现在15分钟就能跑完。下面通过两个典型场景带你完整实现这套方案。2. 环境准备与基础配置2.1 项目依赖配置首先在pom.xml引入关键依赖注意要排除SpringBoot自带的HikariCP以避免连接池冲突dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-batch/artifactId /dependency dependency groupIdcom.xuxueli/groupId artifactIdxxl-job-core/artifactId version2.3.1/version /dependency !-- 使用Druid连接池 -- dependency groupIdcom.alibaba/groupId artifactIddruid-spring-boot-starter/artifactId version1.2.16/version /dependency2.2 数据库表初始化SpringBatch需要9张元数据表来记录任务执行状态建表脚本在org.springframework.batch.core目录下。这里有个避坑点BATCH_JOB_INSTANCE表的JOB_KEY字段长度在MySQL5.7下需要改为100字符否则运行时会报错。建议直接使用我调整过的脚本CREATE TABLE BATCH_JOB_INSTANCE ( JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY, VERSION BIGINT, JOB_NAME VARCHAR(100) NOT NULL, JOB_KEY VARCHAR(100) NOT NULL, -- 修改为100长度 constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY) ) ENGINEInnoDB;2.3 基础配置项在application.yml中需要关闭SpringBatch的自动启动否则项目启动时会立即执行所有批处理任务spring: batch: job: enabled: false xxl: job: admin: addresses: http://xxl-job-admin:8080/xxl-job-admin executor: appname:>Bean public FlatFileItemReaderUser csvReader() { return new FlatFileItemReaderBuilderUser() .name(userItemReader) .resource(new ClassPathResource(user.csv)) .delimited() .names(id,name,age,email) // 对应CSV列名 .fieldSetMapper(new BeanWrapperFieldSetMapperUser() {{ setTargetType(User.class); }}) .linesToSkip(1) // 跳过标题行 .build(); }3.2 数据库写入配置采用MyBatis批量写入注意要设置sqlSessionTemplate的ExecutorType为BATCH模式Bean public MyBatisBatchItemWriterUser dbWriter() { SqlSessionTemplate sqlSessionTemplate new SqlSessionTemplate( sqlSessionFactory, ExecutorType.BATCH); return new MyBatisBatchItemWriterBuilderUser() .sqlSessionTemplate(sqlSessionTemplate) .statementId(com.example.mapper.UserMapper.batchInsert) .assertUpdates(false) // 允许零更新 .build(); }3.3 任务步骤与并发控制配置Chunk大小为5000并启用异步线程池提升性能Bean public Step csvImportStep() { return stepBuilderFactory.get(csvImportStep) .User, Userchunk(5000) .reader(csvReader()) .processor(userValidator()) .writer(dbWriter()) .taskExecutor(taskExecutor()) // 自定义线程池 .throttleLimit(5) // 最大并发线程数 .build(); } Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(20); return executor; }4. 实现数据库间分片同步4.1 分片策略设计当需要从表A同步500万数据到表B时采用分片处理方案。通过Partitioner将数据划分为多个区间public class RangePartitioner implements Partitioner { Override public MapString, ExecutionContext partition(int gridSize) { MapString, ExecutionContext result new HashMap(); int totalRecords 5000000; int range totalRecords / gridSize; for (int i 0; i gridSize; i) { ExecutionContext context new ExecutionContext(); context.putInt(minValue, i * range); context.putInt(maxValue, (i 1) * range); result.put(partition i, context); } return result; } }4.2 从库读取配置使用MyBatisPagingItemReader实现分页查询注意要设置pageSize与chunkSize一致Bean StepScope public MyBatisPagingItemReaderUser pagingReader( Value(#{stepExecutionContext[minValue]}) int min, Value(#{stepExecutionContext[maxValue]}) int max) { MapString, Object params new HashMap(); params.put(minId, min); params.put(maxId, max); MyBatisPagingItemReaderUser reader new MyBatisPagingItemReader(); reader.setSqlSessionFactory(sqlSessionFactory); reader.setQueryId(com.example.mapper.UserMapper.selectByRange); reader.setParameterValues(params); reader.setPageSize(1000); // 关键参数 return reader; }4.3 主从步骤配置主步骤负责分片调度从步骤处理具体数据Bean public Step masterStep() { return stepBuilderFactory.get(masterStep) .partitioner(slaveStep, partitioner()) .partitionHandler(partitionHandler()) .build(); } Bean public PartitionHandler partitionHandler() { TaskExecutorPartitionHandler handler new TaskExecutorPartitionHandler(); handler.setStep(slaveStep()); handler.setTaskExecutor(taskExecutor()); handler.setGridSize(10); // 分片数量 return handler; }5. 与XXL-Job集成实战5.1 任务Handler实现在XXL-Job中创建执行器通过JobLauncher触发批处理任务XxlJob(dataSyncHandler) public void execute() throws Exception { JobParameters params new JobParametersBuilder() .addLong(startTime, System.currentTimeMillis()) .toJobParameters(); JobExecution execution jobLauncher.run(dataSyncJob(), params); if (execution.getStatus() BatchStatus.FAILED) { throw new RuntimeException(批处理执行失败); } }5.2 失败重试策略在XXL-Job控制台配置重试策略配合SpringBatch的Skip机制实现容错Bean public Step faultTolerantStep() { return stepBuilderFactory.get(faultTolerantStep) .User, Userchunk(1000) .reader(reader()) .writer(writer()) .faultTolerant() .skipLimit(100) // 最大跳过次数 .skip(DataIntegrityViolationException.class) .noRetry(OptimisticLockingFailureException.class) .retryLimit(3) .retry(DeadlockLoserDataAccessException.class) .build(); }5.3 监控指标对接将SpringBatch的元数据与XXL-Job的日志系统对接XxlJob(monitorHandler) public void monitor() { ListJobInstance instances jobExplorer.getJobInstances(dataSyncJob, 0, 10); instances.forEach(instance - { JobExecution lastExecution jobExplorer.getLastJobExecution(instance); log.info(任务[{}] 状态: {}, 读取: {}, 写入: {}, instance.getJobName(), lastExecution.getStatus(), lastExecution.getStepExecutions() .stream().findFirst() .map(StepExecution::getReadCount) .orElse(0), lastExecution.getStepExecutions() .stream().findFirst() .map(StepExecution::getWriteCount) .orElse(0)); }); }6. 性能优化技巧6.1 内存控制方案处理千万级数据时需要特别注意内存使用在JVM参数中添加-XX:UseG1GC启用G1垃圾回收器配置ItemReader时设置saveStatefalse避免状态缓存使用JdbcCursorItemReader替代JdbcPagingItemReaderBean public JdbcCursorItemReaderUser cursorReader() { return new JdbcCursorItemReaderBuilderUser() .dataSource(dataSource) .name(cursorReader) .saveState(false) // 关键参数 .sql(SELECT * FROM large_table) .rowMapper(new BeanPropertyRowMapper(User.class)) .build(); }6.2 批处理参数调优根据服务器配置调整关键参数chunkSize建议1000-5000之间throttleLimit不超过线程池maxPoolSizefetchSize对Oracle设置为1000MySQL设置为Integer.MIN_VALUEBean public Step optimizedStep() { return stepBuilderFactory.get(optimizedStep) .User, Userchunk(3000) // 适中值 .reader(jdbcReader()) .writer(writer()) .fetchSize(1000) // Oracle建议值 .throttleLimit(8) // 根据CPU核数调整 .build(); }6.3 分布式锁方案当多个执行器同时运行时需要加分布式锁避免重复处理BeforeStep public void beforeStep(StepExecution stepExecution) { String lockKey syncLock: stepExecution.getJobParameters().getString(run.id); try { boolean locked redisTemplate.opsForValue() .setIfAbsent(lockKey, 1, 30, TimeUnit.MINUTES); if (!locked) { throw new JobExecutionException(任务正在其他节点执行); } } finally { stepExecution.getExecutionContext().put(lockKey, lockKey); } } AfterStep public void afterStep(StepExecution stepExecution) { String lockKey (String) stepExecution.getExecutionContext().get(lockKey); redisTemplate.delete(lockKey); }

更多文章