本文目录导读:

跨库数据迁移是Java开发中常见的需求,比如从MySQL迁移到PostgreSQL,或者从旧库迁移到新库,核心思路是从源库读取数据,进行必要的清洗和转换,再写入目标库。
这里提供一个通用、高效、健壮的Java跨库迁移方案,包含核心代码示例和最佳实践。
方案选择:基于什么技术实现?
- Spring Batch(推荐,适合大数据量):官方推荐的批处理框架,自带分页、读写器、处理器、事务管理、失败重试等机制。
- JDBC 原生(适合少量数据或简单场景):直接获取Connection,读一行写一行。
- MyBatis-Plus + 流式查询(适合中等数据量):利用游标避免OOM。
- ETL工具(如Kettle, DataX):不写代码,但灵活性不如Java。
本回答以最主流、最健壮的 Spring Batch 为例,并结合另外几种方法的代码片段。
核心挑战与解决方案
- 数据量大导致OOM:不能一次性加载到内存。 → 使用分页查询或游标(流式)查询。
- 字段类型不一致:例如源库是
DATE,目标库是DATETIME。 → 中间加一个处理器(Processor)进行转换。 - 主键冲突或重复数据:迁移前清理目标库,或使用
REPLACE INTO、ON DUPLICATE KEY UPDATE。 - 事务与性能:按批次提交,避免长事务锁表。
- 数据一致性:记录迁移日志,支持断点续传或回滚。
Spring Batch(企业级首选)
假设我们要从 MySQL的 user_old 表 迁移到 PostgreSQL的 user_new 表。
添加依赖(Maven)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- 两个数据库的连接驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
配置双数据源(application.yml)
spring:
datasource:
source: # 源数据库 (MySQL)
url: jdbc:mysql://source-host:3306/old_db
username: root
password: pass
driver-class-name: com.mysql.cj.jdbc.Driver
target: # 目标数据库 (PostgreSQL)
url: jdbc:postgresql://target-host:5432/new_db
username: admin
password: pass
driver-class-name: org.postgresql.Driver
batch:
jdbc:
initialize-schema: always # Spring Batch 需要自己的元数据表,用于记录Job状态
Java代码实现(核心逻辑)
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.*;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import javax.sql.DataSource;
@Configuration
@EnableBatchProcessing
public class DataMigrationJobConfig {
// 1. 注入两个数据源 (由Spring配置自动创建)
@Qualifier("sourceDataSource")
private final DataSource sourceDataSource;
@Qualifier("targetDataSource")
private final DataSource targetDataSource;
public DataMigrationJobConfig(DataSource sourceDataSource, DataSource targetDataSource) {
this.sourceDataSource = sourceDataSource;
this.targetDataSource = targetDataSource;
}
// 2. 定义 Reader:从源库分页读取(使用游标,避免OOM)
@Bean
public ItemReader<UserOld> reader() {
JdbcCursorItemReader<UserOld> reader = new JdbcCursorItemReader<>();
reader.setDataSource(sourceDataSource);
reader.setSql("SELECT id, name, email, created_at FROM user_old"); // 源库表
reader.setRowMapper(new BeanPropertyRowMapper<>(UserOld.class));
// 设置 fetchSize 为合理值,逐行读取
reader.setFetchSize(1000);
return reader;
}
// 3. 定义 Processor:数据转换
@Bean
public ItemProcessor<UserOld, UserNew> processor() {
return userOld -> {
UserNew userNew = new UserNew();
userNew.setUserId(userOld.getId()); // 字段名不同,映射
userNew.setFullName(userOld.getName()); // 字符串处理
userNew.setEmail(userOld.getEmail());
// 类型转换:MySQL的TIMESTAMP转PostgreSQL的TIMESTAMP
userNew.setCreatedTime(userOld.getCreatedAt().toLocalDateTime());
// 可以在这里添加数据清洗逻辑,如脱敏、去重
return userNew;
};
}
// 4. 定义 Writer:批量写入目标库
@Bean
public ItemWriter<UserNew> writer() {
return new JdbcBatchItemWriterBuilder<UserNew>()
.dataSource(targetDataSource)
.sql("INSERT INTO user_new (user_id, full_name, email, created_time) VALUES (:userId, :fullName, :email, :createdTime)")
.beanMapped() // 自动匹配字段
.build();
}
// 5. 组装 Step:设置 chunk 大小(每1000条提交一次)
@Bean
public Step migrationStep(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("migrationStep")
.<UserOld, UserNew>chunk(1000) // 关键:批次大小
.reader(reader())
.processor(processor())
.writer(writer())
.faultTolerant()
.retryLimit(3) // 失败重试
.retry(Exception.class)
.skip(Exception.class).skipLimit(10) // 允许跳过10条错误数据
.build();
}
// 6. 组装 Job
@Bean
public Job migrationJob(JobBuilderFactory jobBuilderFactory, Step migrationStep) {
return jobBuilderFactory.get("dataMigrationJob")
.start(migrationStep)
.build();
}
// 7. 实体类(简单示例)
public static class UserOld {
private Long id;
private String name;
private String email;
private java.util.Date createdAt; // 从MySQL来
// getter/setter 略
}
public static class UserNew {
private Long userId;
private String fullName;
private String email;
private java.time.LocalDateTime createdTime; // 写入PostgreSQL
// getter/setter 略
}
}
启动迁移
Spring Boot 启动后,Job 会自动运行(或通过 API 触发)。
@RestController
public class MigrateController {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job migrationJob;
@GetMapping("/migrate")
public String migrate() throws Exception {
JobParameters params = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis()) // 每次参数不同,避免重复执行
.toJobParameters();
jobLauncher.run(migrationJob, params);
return "Migration started!";
}
}
JDBC流式查询 + 批量写入(轻量级)
适合不想引入Spring Batch的简单项目,或者数据量几百万以下。
import java.sql.*;
public class SimpleJdbcMigration {
private static final int BATCH_SIZE = 500;
public void migrate() throws Exception {
// 1. 获取两个连接
try (Connection sourceConn = DriverManager.getConnection("jdbc:mysql://source", "user", "pass");
Connection targetConn = DriverManager.getConnection("jdbc:postgresql://target", "user", "pass")) {
// 2. 源库:流式读取 (fetchSize = Integer.MIN_VALUE 是MySQL的关键)
sourceConn.setAutoCommit(false);
String selectSql = "SELECT id, name, email FROM user_old";
PreparedStatement selectStmt = sourceConn.prepareStatement(selectSql,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
selectStmt.setFetchSize(Integer.MIN_VALUE); // MySQL驱动流式读取的关键
ResultSet rs = selectStmt.executeQuery();
// 3. 目标库:批量写入
targetConn.setAutoCommit(false);
String insertSql = "INSERT INTO user_new (user_id, full_name, email) VALUES (?, ?, ?)";
PreparedStatement insertStmt = targetConn.prepareStatement(insertSql);
int count = 0;
while (rs.next()) {
insertStmt.setLong(1, rs.getLong("id"));
insertStmt.setString(2, rs.getString("name"));
insertStmt.setString(3, rs.getString("email"));
insertStmt.addBatch();
count++;
if (count % BATCH_SIZE == 0) {
insertStmt.executeBatch();
targetConn.commit();
// 可以在这里记录进度
System.out.println("Migrated " + count + " rows...");
}
}
// 处理最后一批
insertStmt.executeBatch();
targetConn.commit();
rs.close();
selectStmt.close();
insertStmt.close();
}
}
}
注意:MySQL使用 setFetchSize(Integer.MIN_VALUE) 开启流式读取;PostgreSQL不需要,正常设置 fetchSize 即可。
特殊情况处理
-
异构数据库字段映射:如果字段名完全不同,Processor(或转换层)是必需的,可以用Map或反射处理。
-
数据量大 + 断点续传:
- Spring Batch:自带,重启Job会自动从中断的Step继续。
- JDBC:需要在目标库或Redis中记录一个
last_migrated_id,每次启动从该ID开始查询源库。
-
跨库查询(无法直接JOIN):
- 先读源库一批数据,根据外键ID去目标库查询关联数据。
- 或者:将源库需要关联的表先迁移到目标库临时表,再在目标库用SQL做JOIN。
-
写入性能优化:
- 关闭目标库的索引(迁移后再重建)。
- 使用批量写入(
addBatch+executeBatch)。 - 调整数据库的
max_allowed_packet(MySQL)或wal_buffers(PostgreSQL)。
总结建议
- 首选Spring Batch:如果你有Spring Boot环境,它提供了事务、重试、跳过、监听器、分步、并行执行等完整功能,几乎是为数据迁移量身定制。
- 次选JDBC流式:简单、无依赖,但需要自己处理错误和断点续传。
- 绝对不要:用
SELECT * FROM huge_table不加分页和流式,会直接OOM。
迁移前请务必在测试库验证,并做好 全量备份。