数据迁移脚本怎么写?

wen 实用脚本 69

数据迁移脚本怎么写?从零到生产级的最佳实践指南

📖 目录导读

  1. 数据迁移脚本的核心挑战与误区
  2. 迁移脚本设计的五大原则
  3. 主流数据迁移工具与脚本语言选型
  4. 实战:一个完整的数据迁移脚本模板(Python版)
  5. 常见问题(Q&A)
  6. 性能优化与异常处理策略
  7. 迁移后的数据校验与回滚方案

数据迁移脚本的核心挑战与误区

很多开发者在第一次写数据迁移脚本时,容易陷入“直接写SQL跑就完事”的思维。真正生产级的数据迁移脚本需要解决:

数据迁移脚本怎么写?

  • 数据一致性:迁移过程中新增/修改的数据如何处理?
  • 断点续传:如果脚本执行到一半崩溃,能否从失败点继续?
  • 类型转换:源库与目标库字段类型不匹配(如MySQL的DATETIME vs PostgreSQL的TIMESTAMPTZ)的兼容处理。
  • 性能瓶颈:全表SELECT + INSERT在千万级数据下可能导致数据库死锁或OOM。

误区一:认为迁移脚本就是一次性的,不需要考虑可维护性。
误区二:忽略数据清洗,直接搬运脏数据。


迁移脚本设计的五大原则

1 幂等性(Idempotency)

每次执行迁移脚本应产生相同的结果,使用INSERT ... ON DUPLICATE KEY UPDATEMERGE语句,而非简单INSERT。

2 可追溯性

每条迁移记录都应包含:迁移时间戳、执行批次ID、源记录唯一键,建议在目标库增加migration_log表。

3 小批量+可配置

避免一次性加载100万条记录到内存,使用分页(LIMIT/OFFSET)或游标方式,每批处理1000-5000条(根据字段宽度调整)。

4 错误隔离

单条记录转换失败不应中断整个迁移,记录错误日志到独立表,支持后续重试。

5 数据校验驱动

迁移脚本应自带“数据对比”功能,在迁移完成后自动检查源和目标库的总数、关键字段MD5值。


主流数据迁移工具与脚本语言选型

场景 推荐工具/语言 优点
同构数据库小规模迁移 SQL脚本(INSERT SELECT) 零依赖,执行快
异构数据库(如 MySQL → PostgreSQL) Python + pandas / sqlachemy 灵活的类型映射
云数据库迁移(AWS RDS → Aurora) AWS DMS(Database Migration Service) 支持CDC(持续变更捕获)
大数据量批处理(亿级别) Apache Spark / 阿里云DataWorks 分布式计算,内置断点续传

本文重点讲解Python脚本方案,因为它最灵活且易于理解。


实战:一个完整的数据迁移脚本模板(Python版)

import pandas as pd
from sqlalchemy import create_engine, text
import logging
import time
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 数据库连接(示例:MySQL → PostgreSQL)
source_engine = create_engine("mysql+pymysql://user:pass@host/source_db?charset=utf8mb4")
target_engine = create_engine("postgresql+psycopg2://user:pass@host/target_db")
# 配置参数
BATCH_SIZE = 2000          # 每批处理行数
TABLE_NAME = "users"       # 要迁移的表
STATUS_TABLE = "migration_status"  # 记录迁移进度的表
def migrate_table():
    """核心迁移函数:支持断点续传"""
    # 1. 获取上次迁移的断点
    last_id = get_last_migrated_id(TABLE_NAME)
    # 2. 分页读取源数据
    offset = last_id
    while True:
        start_time = time.time()
        query = text(f"""
            SELECT * FROM {TABLE_NAME} 
            WHERE id > :offset 
            ORDER BY id 
            LIMIT :limit
        """)
        df = pd.read_sql(query, source_engine, params={"offset": offset, "limit": BATCH_SIZE})
        if df.empty:
            logger.info(f"{TABLE_NAME} 迁移完成,总行数: {offset}")
            break
        # 3. 数据清洗与转换
        df = transform_data(df)  # 自定义转换函数
        # 4. 批量写入目标库(使用事务)
        with target_engine.begin() as conn:
            df.to_sql(TABLE_NAME, conn, if_exists='append', index=False, method='multi')
            # 5. 更新断点记录
            max_id = df['id'].max()
            conn.execute(text(f"""
                INSERT INTO {STATUS_TABLE} (table_name, last_id, updated_at)
                VALUES (:tbl, :lid, :now)
                ON CONFLICT (table_name) DO UPDATE SET last_id = :lid, updated_at = :now
            """), {"tbl": TABLE_NAME, "lid": max_id, "now": datetime.now()})
        offset = max_id
        elapsed = time.time() - start_time
        logger.info(f"批次完成: 处理 {len(df)} 行, 耗时 {elapsed:.2f}s, 当前ID: {offset}")
def transform_data(df):
    """字段类型转换与数据清洗示例"""
    # 示例1: PostgreSQL不支持MySQL的tinyint,转换为boolean
    if 'is_active' in df.columns:
        df['is_active'] = df['is_active'].astype(bool)
    # 示例2: 处理JSON字段的读取兼容性
    if 'metadata' in df.columns:
        df['metadata'] = df['metadata'].apply(lambda x: json.dumps(x) if isinstance(x, dict) else x)
    return df
def get_last_migrated_id(table_name):
    """从状态表读取上次迁移的断点ID"""
    query = text(f"SELECT COALESCE(MAX(last_id), 0) FROM {STATUS_TABLE} WHERE table_name = :tbl")
    with target_engine.connect() as conn:
        result = conn.execute(query, {"tbl": table_name}).scalar()
    return result
if __name__ == "__main__":
    migrate_table()

关键点说明

  • 使用ORDER BY idWHERE id > :offset代替LIMIT/OFFSET,避免因删除数据导致的偏移偏移(Skew)。
  • if_exists='append'确保目标表已存在时不覆盖。
  • method='multi'一次性插入多行,性能比逐行INSERT提高10倍以上。

常见问题(Q&A)

Q1: 迁移过程中源库数据还在变化,怎么保证一致性?

:对于关键业务,建议在低峰期执行迁移,并配合以下策略:

  1. 快照读:如果是MySQL,可设置session.transaction_isolation = REPEATABLE-READ,读取一致性快照。
  2. 增量迁移:第一次全量迁移后,使用CDC工具(如Debezium)捕获后续变更,最终切换到目标库。

Q2: 目标表缺少源表的某些字段怎么办?

:在transform_data阶段进行处理:

  • 忽略不需要的字段:df = df.drop(columns=['unused_col'])
  • 设置默认值:df['new_col'] = None 或使用fillna()
  • 写迁移前,一定要检查目标表结构:SELECT column_name, data_type FROM information_schema.columns WHERE table_name = ...

Q3: 迁移脚本跑崩了,需要手动回滚怎么办?

:脚本设计时就要包含回滚逻辑:

  • 在迁移之前,先对目标表做完整备份:CREATE TABLE users_bak AS SELECT * FROM users
  • 或者记录每个批次的最小、最大ID,便于定向删除:DELETE FROM target.users WHERE id BETWEEN :start AND :end

性能优化与异常处理策略

1 分批大小调优

  • MySQL内存限制:BATCH_SIZE 建议设置为 innodb_buffer_pool_size 的0.1%(例如2GB内存池对应0.2万行)。
  • 观察磁盘IO:如果iowait高,降低BATCH_SIZE;如果网络延迟高,适当增大(如5000)。

2 并发处理

使用Python的concurrent.futuresasyncio对不同表并行迁移,但要避免超过数据库连接池上限:

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(migrate_table, ['users', 'orders', 'products'])

3 异常处理增强

try:
    df.to_sql(...)
except Exception as e:
    logger.error(f"批次写入失败,ID范围: {start_id}-{offset}, 错误: {e}")
    # 将失败批次写入 error_log 表,包含完整数据行
    df['error_msg'] = str(e)
    df.to_sql('error_log', target_engine, if_exists='append')

迁移后的数据校验与回滚方案

1 自动化校验脚本

-- 对比记录数
SELECT 'source' as db, COUNT(*) FROM source_db.users
UNION ALL
SELECT 'target' as db, COUNT(*) FROM target_db.users;
-- 对比关键字段MD5(适用于小表)
SELECT MD5(CONCAT_WS(',', id, name, email)) as checksum
FROM source_db.users
EXCEPT
SELECT MD5(CONCAT_WS(',', id, name, email)) as checksum
FROM target_db.users;

2 回滚方案

  • 全量回滚:执行备份表的INSERT INTO users SELECT * FROM users_bak
  • 增量回滚:根据migration_status表记录的批次ID,执行DELETE FROM users WHERE id > last_successful_id

通过以上实践,你写的将不再是一个“跑完就扔”的脚本,而是可维护、可监控、可回滚的数据迁移系统。好的迁移脚本,不怕失败,因为它知道如何从失败中优雅地恢复。

抱歉,评论功能暂时关闭!