Java案例怎么跨库迁移数据?

wen java案例 70

本文目录导读:

Java案例怎么跨库迁移数据?

  1. 方案选择:基于什么技术实现?
  2. 核心挑战与解决方案
  3. 方案一:Spring Batch(企业级首选)
  4. 方案二:JDBC流式查询 + 批量写入(轻量级)
  5. 特殊情况处理
  6. 总结建议

跨库数据迁移是Java开发中常见的需求,比如从MySQL迁移到PostgreSQL,或者从旧库迁移到新库,核心思路是从源库读取数据,进行必要的清洗和转换,再写入目标库

这里提供一个通用、高效、健壮的Java跨库迁移方案,包含核心代码示例和最佳实践。

方案选择:基于什么技术实现?

  1. Spring Batch(推荐,适合大数据量):官方推荐的批处理框架,自带分页、读写器、处理器、事务管理、失败重试等机制。
  2. JDBC 原生(适合少量数据或简单场景):直接获取Connection,读一行写一行。
  3. MyBatis-Plus + 流式查询(适合中等数据量):利用游标避免OOM。
  4. ETL工具(如Kettle, DataX):不写代码,但灵活性不如Java。

本回答以最主流、最健壮的 Spring Batch 为例,并结合另外几种方法的代码片段。


核心挑战与解决方案

  • 数据量大导致OOM:不能一次性加载到内存。 → 使用分页查询游标(流式)查询
  • 字段类型不一致:例如源库是 DATE,目标库是 DATETIME。 → 中间加一个处理器(Processor)进行转换。
  • 主键冲突或重复数据:迁移前清理目标库,或使用 REPLACE INTOON DUPLICATE KEY UPDATE
  • 事务与性能:按批次提交,避免长事务锁表。
  • 数据一致性:记录迁移日志,支持断点续传或回滚。

Spring Batch(企业级首选)

假设我们要从 MySQL的 user_old 迁移到 PostgreSQL的 user_new

添加依赖(Maven)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- 两个数据库的连接驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
</dependency>

配置双数据源(application.yml

spring:
  datasource:
    source: # 源数据库 (MySQL)
      url: jdbc:mysql://source-host:3306/old_db
      username: root
      password: pass
      driver-class-name: com.mysql.cj.jdbc.Driver
    target: # 目标数据库 (PostgreSQL)
      url: jdbc:postgresql://target-host:5432/new_db
      username: admin
      password: pass
      driver-class-name: org.postgresql.Driver
  batch:
    jdbc:
      initialize-schema: always # Spring Batch 需要自己的元数据表,用于记录Job状态

Java代码实现(核心逻辑)

import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.*;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import javax.sql.DataSource;
@Configuration
@EnableBatchProcessing
public class DataMigrationJobConfig {
    // 1. 注入两个数据源 (由Spring配置自动创建)
    @Qualifier("sourceDataSource")
    private final DataSource sourceDataSource;
    @Qualifier("targetDataSource")
    private final DataSource targetDataSource;
    public DataMigrationJobConfig(DataSource sourceDataSource, DataSource targetDataSource) {
        this.sourceDataSource = sourceDataSource;
        this.targetDataSource = targetDataSource;
    }
    // 2. 定义 Reader:从源库分页读取(使用游标,避免OOM)
    @Bean
    public ItemReader<UserOld> reader() {
        JdbcCursorItemReader<UserOld> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(sourceDataSource);
        reader.setSql("SELECT id, name, email, created_at FROM user_old"); // 源库表
        reader.setRowMapper(new BeanPropertyRowMapper<>(UserOld.class));
        // 设置 fetchSize 为合理值,逐行读取
        reader.setFetchSize(1000);
        return reader;
    }
    // 3. 定义 Processor:数据转换
    @Bean
    public ItemProcessor<UserOld, UserNew> processor() {
        return userOld -> {
            UserNew userNew = new UserNew();
            userNew.setUserId(userOld.getId()); // 字段名不同,映射
            userNew.setFullName(userOld.getName()); // 字符串处理
            userNew.setEmail(userOld.getEmail());
            // 类型转换:MySQL的TIMESTAMP转PostgreSQL的TIMESTAMP
            userNew.setCreatedTime(userOld.getCreatedAt().toLocalDateTime());
            // 可以在这里添加数据清洗逻辑,如脱敏、去重
            return userNew;
        };
    }
    // 4. 定义 Writer:批量写入目标库
    @Bean
    public ItemWriter<UserNew> writer() {
        return new JdbcBatchItemWriterBuilder<UserNew>()
                .dataSource(targetDataSource)
                .sql("INSERT INTO user_new (user_id, full_name, email, created_time) VALUES (:userId, :fullName, :email, :createdTime)")
                .beanMapped() // 自动匹配字段
                .build();
    }
    // 5. 组装 Step:设置 chunk 大小(每1000条提交一次)
    @Bean
    public Step migrationStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("migrationStep")
                .<UserOld, UserNew>chunk(1000) // 关键:批次大小
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .faultTolerant()
                .retryLimit(3) // 失败重试
                .retry(Exception.class)
                .skip(Exception.class).skipLimit(10) // 允许跳过10条错误数据
                .build();
    }
    // 6. 组装 Job
    @Bean
    public Job migrationJob(JobBuilderFactory jobBuilderFactory, Step migrationStep) {
        return jobBuilderFactory.get("dataMigrationJob")
                .start(migrationStep)
                .build();
    }
    // 7. 实体类(简单示例)
    public static class UserOld {
        private Long id;
        private String name;
        private String email;
        private java.util.Date createdAt; // 从MySQL来
        // getter/setter 略
    }
    public static class UserNew {
        private Long userId;
        private String fullName;
        private String email;
        private java.time.LocalDateTime createdTime; // 写入PostgreSQL
        // getter/setter 略
    }
}

启动迁移

Spring Boot 启动后,Job 会自动运行(或通过 API 触发)。

@RestController
public class MigrateController {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job migrationJob;
    @GetMapping("/migrate")
    public String migrate() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis()) // 每次参数不同,避免重复执行
                .toJobParameters();
        jobLauncher.run(migrationJob, params);
        return "Migration started!";
    }
}

JDBC流式查询 + 批量写入(轻量级)

适合不想引入Spring Batch的简单项目,或者数据量几百万以下。

import java.sql.*;
public class SimpleJdbcMigration {
    private static final int BATCH_SIZE = 500;
    public void migrate() throws Exception {
        // 1. 获取两个连接
        try (Connection sourceConn = DriverManager.getConnection("jdbc:mysql://source", "user", "pass");
             Connection targetConn = DriverManager.getConnection("jdbc:postgresql://target", "user", "pass")) {
            // 2. 源库:流式读取 (fetchSize = Integer.MIN_VALUE 是MySQL的关键)
            sourceConn.setAutoCommit(false);
            String selectSql = "SELECT id, name, email FROM user_old";
            PreparedStatement selectStmt = sourceConn.prepareStatement(selectSql,
                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            selectStmt.setFetchSize(Integer.MIN_VALUE); // MySQL驱动流式读取的关键
            ResultSet rs = selectStmt.executeQuery();
            // 3. 目标库:批量写入
            targetConn.setAutoCommit(false);
            String insertSql = "INSERT INTO user_new (user_id, full_name, email) VALUES (?, ?, ?)";
            PreparedStatement insertStmt = targetConn.prepareStatement(insertSql);
            int count = 0;
            while (rs.next()) {
                insertStmt.setLong(1, rs.getLong("id"));
                insertStmt.setString(2, rs.getString("name"));
                insertStmt.setString(3, rs.getString("email"));
                insertStmt.addBatch();
                count++;
                if (count % BATCH_SIZE == 0) {
                    insertStmt.executeBatch();
                    targetConn.commit();
                    // 可以在这里记录进度
                    System.out.println("Migrated " + count + " rows...");
                }
            }
            // 处理最后一批
            insertStmt.executeBatch();
            targetConn.commit();
            rs.close();
            selectStmt.close();
            insertStmt.close();
        }
    }
}

注意:MySQL使用 setFetchSize(Integer.MIN_VALUE) 开启流式读取;PostgreSQL不需要,正常设置 fetchSize 即可。


特殊情况处理

  1. 异构数据库字段映射:如果字段名完全不同,Processor(或转换层)是必需的,可以用Map或反射处理。

  2. 数据量大 + 断点续传

    • Spring Batch:自带,重启Job会自动从中断的Step继续。
    • JDBC:需要在目标库或Redis中记录一个last_migrated_id,每次启动从该ID开始查询源库。
  3. 跨库查询(无法直接JOIN)

    • 先读源库一批数据,根据外键ID去目标库查询关联数据。
    • 或者:将源库需要关联的表先迁移到目标库临时表,再在目标库用SQL做JOIN。
  4. 写入性能优化

    • 关闭目标库的索引(迁移后再重建)。
    • 使用批量写入(addBatch + executeBatch)。
    • 调整数据库的 max_allowed_packet(MySQL)或 wal_buffers(PostgreSQL)。

总结建议

  • 首选Spring Batch:如果你有Spring Boot环境,它提供了事务、重试、跳过、监听器、分步、并行执行等完整功能,几乎是为数据迁移量身定制。
  • 次选JDBC流式:简单、无依赖,但需要自己处理错误和断点续传。
  • 绝对不要:用 SELECT * FROM huge_table 不加分页和流式,会直接OOM。

迁移前请务必在测试库验证,并做好 全量备份

抱歉,评论功能暂时关闭!