本文目录导读:

- 场景一:批量迁移文件(本地到远程,或远程到远程)
- 场景二:批量迁移 MySQL 数据库
- 场景三:批量迁移 S3/对象存储(AWS S3 到另一个桶或本地)
- 场景四:通用批量迁移框架(支持重试、日志、进度条)
- 批量迁移的通用技巧
- 建议
是的,实用脚本完全可以实现批量迁移,批量迁移的核心思路是自动化重复性操作,常见的迁移场景包括:
- 文件/目录迁移(从一个服务器到另一个服务器,或本地到云端)
- 数据库迁移(将数据从一个数据库批量导入另一个库)
- 云资源迁移(例如在AWS/Azure/GCP之间迁移对象存储文件)
- 容器/镜像迁移(批量推送镜像到新仓库)
下面我给你几个可直接使用的实用脚本示例,涵盖最经典的场景。
批量迁移文件(本地到远程,或远程到远程)
使用 rsync + shell 循环,这是最稳定、增量传输的首选。
脚本:batch_migrate_files.sh
#!/bin/bash
# 配置
SOURCE_DIR="/data/projects"
TARGET_USER="root"
TARGET_IP="192.168.1.100"
TARGET_DIR="/backup/projects"
SSH_PORT=22
LOG_FILE="./migrate_$(date +%Y%m%d_%H%M%S).log"
# 需要迁移的项目列表(每行一个文件夹名)
PROJECT_LIST=("project_a" "project_b" "project_c" "project_d")
echo "开始批量迁移文件..." | tee -a $LOG_FILE
for project in "${PROJECT_LIST[@]}"; do
echo "正在迁移: $project" | tee -a $LOG_FILE
# 使用 rsync 进行增量同步,断点续传
rsync -avz --progress --partial \
-e "ssh -p $SSH_PORT" \
"${SOURCE_DIR}/${project}/" \
"${TARGET_USER}@${TARGET_IP}:${TARGET_DIR}/${project}/"
if [ $? -eq 0 ]; then
echo "✅ $project 迁移成功" | tee -a $LOG_FILE
else
echo "❌ $project 迁移失败,请检查日志" | tee -a $LOG_FILE
fi
echo "------------------------" | tee -a $LOG_FILE
done
echo "全部迁移完成!日志文件: $LOG_FILE"
优点:
- 支持断点续传(
--partial) - 自动跳过已存在的、未修改的文件(
-u或默认行为) - 保留权限、时间戳(
-a)
批量迁移 MySQL 数据库
当你有几十个数据库需要从旧服务器迁移到新服务器时。
脚本:batch_migrate_mysql.sh
#!/bin/bash
# 旧库配置
OLD_HOST="old-db.example.com"
OLD_USER="root"
OLD_PASS="OldPass123"
# 新库配置
NEW_HOST="new-db.example.com"
NEW_USER="root"
NEW_PASS="NewPass456"
# 要迁移的数据库列表(排除系统库)
DB_LIST=($(mysql -h $OLD_HOST -u $OLD_USER -p$OLD_PASS -e "SHOW DATABASES;" | grep -v "Database\|information_schema\|performance_schema\|mysql\|sys"))
echo "找到 ${#DB_LIST[@]} 个数据库要迁移"
for DB in "${DB_LIST[@]}"; do
echo "正在导出 $DB..."
# 导出(排除系统自带的 event 和触发器可能带来的问题)
mysqldump -h $OLD_HOST -u $OLD_USER -p$OLD_PASS \
--single-transaction --routines --triggers --events \
$DB > ./${DB}.sql
if [ $? -ne 0 ]; then
echo "❌ 导出 $DB 失败"
continue
fi
# 创建目标库(如果不存在)
mysql -h $NEW_HOST -u $NEW_USER -p$NEW_PASS -e "CREATE DATABASE IF NOT EXISTS \`$DB\` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"
echo "正在导入 $DB 到目标库..."
mysql -h $NEW_HOST -u $NEW_USER -p$NEW_PASS $DB < ./${DB}.sql
if [ $? -eq 0 ]; then
echo "✅ $DB 迁移成功"
rm -f ./${DB}.sql # 删除临时文件,节省磁盘
else
echo "❌ $DB 导入失败,保留 sql 文件以供排查"
fi
echo "--- 休眠 2 秒,避免目标库压力过大 ---"
sleep 2
done
echo "所有数据库迁移完成"
重要提示:
- 对大库建议使用
pv工具查看进度:mysqldump ... | pv | mysql ... - 添加
--compress选项可减少网络传输量 - 如果数据量大,考虑使用
mydumper/myloader更高效
批量迁移 S3/对象存储(AWS S3 到另一个桶或本地)
使用 aws s3 sync 命令配合循环。
脚本:batch_migrate_s3.sh
#!/bin/bash
# 配置
SOURCE_BUCKET="my-old-bucket"
DEST_BUCKET="my-new-bucket"
REGION="us-east-1"
PROFILE="migration-profile" # 如果有跨账号权限,用不同 profile
# 要迁移的文件夹前缀(可以只迁移部分数据)
PREFIXES=("images/" "videos/" "documents/2023/" "documents/2024/")
echo "开始批量迁移 S3 对象..."
for prefix in "${PREFIXES[@]}"; do
echo "处理前缀: s3://${SOURCE_BUCKET}/${prefix}"
aws s3 sync "s3://${SOURCE_BUCKET}/${prefix}" \
"s3://${DEST_BUCKET}/${prefix}" \
--region $REGION \
--profile $PROFILE \
--acl bucket-owner-full-control # 如果跨账号,确保目标有权限
if [ $? -eq 0 ]; then
echo "✅ 前缀 $prefix 迁移成功"
else
echo "❌ 前缀 $prefix 迁移失败"
fi
done
# 验证完整性(可选:检查总对象数)
echo "源桶对象数:"
aws s3 ls s3://${SOURCE_BUCKET}/ --recursive --summarize --profile $PROFILE | tail -5
echo "目标桶对象数:"
aws s3 ls s3://${DEST_BUCKET}/ --recursive --summarize --profile $PROFILE | tail -5
注意:
- 如果跨云(阿里云OSS → AWS S3),需要使用
rclone工具,配置两个 remote - 也可以用
s5cmd工具,速度比 aws cli 快 10 倍以上
通用批量迁移框架(支持重试、日志、进度条)
这是一个更通用的模板,你可以用它来包装任何迁移逻辑。
脚本:generic_batch_migrate.py
#!/usr/bin/env python3
import subprocess
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("migration.log"),
logging.StreamHandler()
]
)
# 要迁移的 "任务" 列表
TASKS = [
{"id": "001", "source": "db1", "dest": "db1_new", "type": "mysql"},
{"id": "002", "source": "/data/app1", "dest": "user@192.168.1.10:/backup/app1", "type": "rsync"},
{"id": "003", "source": "s3://bucket/images", "dest": "s3://new-bucket/images", "type": "s3"}
]
def migrate_single_task(task):
"""执行单个迁移任务,包含重试逻辑"""
max_retries = 3
for attempt in range(1, max_retries + 1):
logging.info(f"开始任务 {task['id']}: {task['source']} -> {task['dest']} (第 {attempt} 次尝试)")
try:
if task['type'] == 'mysql':
# 你的 MySQL 迁移命令
cmd = f"mysqldump -h old_host {task['source']} | mysql -h new_host {task['dest']}"
elif task['type'] == 'rsync':
cmd = f"rsync -avz {task['source']} {task['dest']}"
elif task['type'] == 's3':
cmd = f"aws s3 sync {task['source']} {task['dest']}"
else:
raise ValueError(f"未知类型: {task['type']}")
result = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=3600)
if result.returncode == 0:
logging.info(f"✅ 任务 {task['id']} 成功完成")
return True, task['id']
else:
logging.error(f"❌ 任务 {task['id']} 失败: {result.stderr[:200]}")
except subprocess.TimeoutExpired:
logging.warning(f"任务 {task['id']} 超时")
except Exception as e:
logging.error(f"任务 {task['id']} 异常: {str(e)}")
if attempt < max_retries:
wait_time = 2 ** attempt # 指数退避
logging.info(f"等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
return False, task['id']
# 使用线程池并行执行(根据网络带宽和磁盘IO调整)
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(migrate_single_task, task): task for task in TASKS}
for future in as_completed(futures):
success, task_id = future.result()
if success:
print(f"任务 {task_id} 迁移成功")
else:
print(f"任务 {task_id} 迁移失败,请查看日志")
批量迁移的通用技巧
| 技巧 | 说明 |
|---|---|
| 断点续传 | rsync 的 --partial,aws s3 sync 的默认行为 |
| 限速 | rsync 加 --bwlimit=10000(KB/s),避免占满带宽 |
| 并行度 | 控制同时运行的任务数(上述Python脚本的max_workers) |
| 校验 | 迁移后做一次 diff -r 或 md5sum 校验 |
| 日志 | 始终记录时间戳、成功/失败、错误信息 |
| 幂等性 | 脚本设计成可多次运行而不破坏已有数据 |
建议
- 先小规模测试:选 1-2 个最小项迁移,验证路径、权限、连接都正确
- 关注目标存储容量:迁移前确保目标有足够的空间
- 迁移顺序:如果有关联性(如先迁数据库再迁文件),注意调整任务顺序
- 监控磁盘和网络:使用
iotop、nload查看资源消耗
你具体想迁移什么类型的数据?服务器、数据库、对象存储还是容器?告诉我详细情况,我可以给你写出一步到位的专用脚本。