实用脚本能批量迁移吗?

wen 实用脚本 10

本文目录导读:

实用脚本能批量迁移吗?

  1. 场景一:批量迁移文件(本地到远程,或远程到远程)
  2. 场景二:批量迁移 MySQL 数据库
  3. 场景三:批量迁移 S3/对象存储(AWS S3 到另一个桶或本地)
  4. 场景四:通用批量迁移框架(支持重试、日志、进度条)
  5. 批量迁移的通用技巧
  6. 建议

是的,实用脚本完全可以实现批量迁移,批量迁移的核心思路是自动化重复性操作,常见的迁移场景包括:

  1. 文件/目录迁移(从一个服务器到另一个服务器,或本地到云端)
  2. 数据库迁移(将数据从一个数据库批量导入另一个库)
  3. 云资源迁移(例如在AWS/Azure/GCP之间迁移对象存储文件)
  4. 容器/镜像迁移(批量推送镜像到新仓库)

下面我给你几个可直接使用的实用脚本示例,涵盖最经典的场景。


批量迁移文件(本地到远程,或远程到远程)

使用 rsync + shell 循环,这是最稳定、增量传输的首选。

脚本:batch_migrate_files.sh

#!/bin/bash
# 配置
SOURCE_DIR="/data/projects"
TARGET_USER="root"
TARGET_IP="192.168.1.100"
TARGET_DIR="/backup/projects"
SSH_PORT=22
LOG_FILE="./migrate_$(date +%Y%m%d_%H%M%S).log"
# 需要迁移的项目列表(每行一个文件夹名)
PROJECT_LIST=("project_a" "project_b" "project_c" "project_d")
echo "开始批量迁移文件..." | tee -a $LOG_FILE
for project in "${PROJECT_LIST[@]}"; do
    echo "正在迁移: $project" | tee -a $LOG_FILE
    # 使用 rsync 进行增量同步,断点续传
    rsync -avz --progress --partial \
          -e "ssh -p $SSH_PORT" \
          "${SOURCE_DIR}/${project}/" \
          "${TARGET_USER}@${TARGET_IP}:${TARGET_DIR}/${project}/"
    if [ $? -eq 0 ]; then
        echo "✅ $project 迁移成功" | tee -a $LOG_FILE
    else
        echo "❌ $project 迁移失败,请检查日志" | tee -a $LOG_FILE
    fi
    echo "------------------------" | tee -a $LOG_FILE
done
echo "全部迁移完成!日志文件: $LOG_FILE"

优点:

  • 支持断点续传(--partial
  • 自动跳过已存在的、未修改的文件(-u 或默认行为)
  • 保留权限、时间戳(-a

批量迁移 MySQL 数据库

当你有几十个数据库需要从旧服务器迁移到新服务器时。

脚本:batch_migrate_mysql.sh

#!/bin/bash
# 旧库配置
OLD_HOST="old-db.example.com"
OLD_USER="root"
OLD_PASS="OldPass123"
# 新库配置
NEW_HOST="new-db.example.com"
NEW_USER="root"
NEW_PASS="NewPass456"
# 要迁移的数据库列表(排除系统库)
DB_LIST=($(mysql -h $OLD_HOST -u $OLD_USER -p$OLD_PASS -e "SHOW DATABASES;" | grep -v "Database\|information_schema\|performance_schema\|mysql\|sys"))
echo "找到 ${#DB_LIST[@]} 个数据库要迁移"
for DB in "${DB_LIST[@]}"; do
    echo "正在导出 $DB..."
    # 导出(排除系统自带的 event 和触发器可能带来的问题)
    mysqldump -h $OLD_HOST -u $OLD_USER -p$OLD_PASS \
              --single-transaction --routines --triggers --events \
              $DB > ./${DB}.sql
    if [ $? -ne 0 ]; then
        echo "❌ 导出 $DB 失败"
        continue
    fi
    # 创建目标库(如果不存在)
    mysql -h $NEW_HOST -u $NEW_USER -p$NEW_PASS -e "CREATE DATABASE IF NOT EXISTS \`$DB\` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
    echo "正在导入 $DB 到目标库..."
    mysql -h $NEW_HOST -u $NEW_USER -p$NEW_PASS $DB < ./${DB}.sql
    if [ $? -eq 0 ]; then
        echo "✅ $DB 迁移成功"
        rm -f ./${DB}.sql  # 删除临时文件,节省磁盘
    else
        echo "❌ $DB 导入失败,保留 sql 文件以供排查"
    fi
    echo "--- 休眠 2 秒,避免目标库压力过大 ---"
    sleep 2
done
echo "所有数据库迁移完成"

重要提示:

  • 对大库建议使用 pv 工具查看进度:mysqldump ... | pv | mysql ...
  • 添加 --compress 选项可减少网络传输量
  • 如果数据量大,考虑使用 mydumper/myloader 更高效

批量迁移 S3/对象存储(AWS S3 到另一个桶或本地)

使用 aws s3 sync 命令配合循环。

脚本:batch_migrate_s3.sh

#!/bin/bash
# 配置
SOURCE_BUCKET="my-old-bucket"
DEST_BUCKET="my-new-bucket"
REGION="us-east-1"
PROFILE="migration-profile"  # 如果有跨账号权限,用不同 profile
# 要迁移的文件夹前缀(可以只迁移部分数据)
PREFIXES=("images/" "videos/" "documents/2023/" "documents/2024/")
echo "开始批量迁移 S3 对象..."
for prefix in "${PREFIXES[@]}"; do
    echo "处理前缀: s3://${SOURCE_BUCKET}/${prefix}"
    aws s3 sync "s3://${SOURCE_BUCKET}/${prefix}" \
                "s3://${DEST_BUCKET}/${prefix}" \
                --region $REGION \
                --profile $PROFILE \
                --acl bucket-owner-full-control  # 如果跨账号,确保目标有权限
    if [ $? -eq 0 ]; then
        echo "✅ 前缀 $prefix 迁移成功"
    else
        echo "❌ 前缀 $prefix 迁移失败"
    fi
done
# 验证完整性(可选:检查总对象数)
echo "源桶对象数:"
aws s3 ls s3://${SOURCE_BUCKET}/ --recursive --summarize --profile $PROFILE | tail -5
echo "目标桶对象数:"
aws s3 ls s3://${DEST_BUCKET}/ --recursive --summarize --profile $PROFILE | tail -5

注意:

  • 如果跨云(阿里云OSS → AWS S3),需要使用 rclone 工具,配置两个 remote
  • 也可以用 s5cmd 工具,速度比 aws cli 快 10 倍以上

通用批量迁移框架(支持重试、日志、进度条)

这是一个更通用的模板,你可以用它来包装任何迁移逻辑。

脚本:generic_batch_migrate.py

#!/usr/bin/env python3
import subprocess
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("migration.log"),
        logging.StreamHandler()
    ]
)
# 要迁移的 "任务" 列表
TASKS = [
    {"id": "001", "source": "db1", "dest": "db1_new", "type": "mysql"},
    {"id": "002", "source": "/data/app1", "dest": "user@192.168.1.10:/backup/app1", "type": "rsync"},
    {"id": "003", "source": "s3://bucket/images", "dest": "s3://new-bucket/images", "type": "s3"}
]
def migrate_single_task(task):
    """执行单个迁移任务,包含重试逻辑"""
    max_retries = 3
    for attempt in range(1, max_retries + 1):
        logging.info(f"开始任务 {task['id']}: {task['source']} -> {task['dest']} (第 {attempt} 次尝试)")
        try:
            if task['type'] == 'mysql':
                # 你的 MySQL 迁移命令
                cmd = f"mysqldump -h old_host {task['source']} | mysql -h new_host {task['dest']}"
            elif task['type'] == 'rsync':
                cmd = f"rsync -avz {task['source']} {task['dest']}"
            elif task['type'] == 's3':
                cmd = f"aws s3 sync {task['source']} {task['dest']}"
            else:
                raise ValueError(f"未知类型: {task['type']}")
            result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=3600)
            if result.returncode == 0:
                logging.info(f"✅ 任务 {task['id']} 成功完成")
                return True, task['id']
            else:
                logging.error(f"❌ 任务 {task['id']} 失败: {result.stderr[:200]}")
        except subprocess.TimeoutExpired:
            logging.warning(f"任务 {task['id']} 超时")
        except Exception as e:
            logging.error(f"任务 {task['id']} 异常: {str(e)}")
        if attempt < max_retries:
            wait_time = 2 ** attempt  # 指数退避
            logging.info(f"等待 {wait_time} 秒后重试...")
            time.sleep(wait_time)
    return False, task['id']
# 使用线程池并行执行(根据网络带宽和磁盘IO调整)
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(migrate_single_task, task): task for task in TASKS}
    for future in as_completed(futures):
        success, task_id = future.result()
        if success:
            print(f"任务 {task_id} 迁移成功")
        else:
            print(f"任务 {task_id} 迁移失败,请查看日志")

批量迁移的通用技巧

技巧 说明
断点续传 rsync 的 --partial,aws s3 sync 的默认行为
限速 rsync 加 --bwlimit=10000(KB/s),避免占满带宽
并行度 控制同时运行的任务数(上述Python脚本的max_workers
校验 迁移后做一次 diff -rmd5sum 校验
日志 始终记录时间戳、成功/失败、错误信息
幂等性 脚本设计成可多次运行而不破坏已有数据

建议

  1. 先小规模测试:选 1-2 个最小项迁移,验证路径、权限、连接都正确
  2. 关注目标存储容量:迁移前确保目标有足够的空间
  3. 迁移顺序:如果有关联性(如先迁数据库再迁文件),注意调整任务顺序
  4. 监控磁盘和网络:使用 iotopnload 查看资源消耗

你具体想迁移什么类型的数据?服务器、数据库、对象存储还是容器?告诉我详细情况,我可以给你写出一步到位的专用脚本

抱歉,评论功能暂时关闭!