本文目录导读:

“数据转储归档脚本”通常是指定期将生产库中的历史数据(旧数据),从热表(Online)导出(Dump)并存储到归档文件(如SQL、CSV、Parquet)或冷存储中,然后清空或删除原表数据,以释放数据库空间、提升查询性能。
下面提供几种常见场景下的脚本示例,你可以根据使用的数据库类型(MySQL、PostgreSQL、Oracle)或技术栈(Shell、Python)进行选择。
MySQL + Shell脚本(最常用)
需求: 将 orders 表中超过 90 天的订单数据导出为 SQL 文件,压缩存储,然后删除原表数据。
脚本:archive_mysql.sh
#!/bin/bash
# ===== 配置参数 =====
DB_USER="root"
DB_PASS="your_password"
DB_HOST="localhost"
DB_NAME="your_database"
TABLE_NAME="orders"
ARCHIVE_DIR="/data/archive/mysql"
RETENTION_DAYS=90
DATE_TAG=$(date +%Y%m%d_%H%M%S)
ARCHIVE_FILE="${ARCHIVE_DIR}/${TABLE_NAME}_${DATE_TAG}.sql.gz"
# ===== 0. 创建归档目录 =====
mkdir -p ${ARCHIVE_DIR}
# ===== 1. 导出历史数据 =====
echo "[INFO] 开始导出 ${TABLE_NAME} 表中超过 ${RETENTION_DAYS} 天的数据..."
# 使用 mysqldump 带 WHERE 条件导出
mysqldump -u${DB_USER} -p${DB_PASS} -h${DB_HOST} \
--no-create-info \
--skip-add-drop-table \
${DB_NAME} ${TABLE_NAME} \
--where="create_date < DATE_SUB(NOW(), INTERVAL ${RETENTION_DAYS} DAY)" \
| gzip > ${ARCHIVE_FILE}
# 检查导出是否成功
if [ $? -eq 0 ] && [ -s ${ARCHIVE_FILE} ]; then
echo "[SUCCESS] 归档文件已生成: ${ARCHIVE_FILE}"
FILE_SIZE=$(du -h ${ARCHIVE_FILE} | cut -f1)
echo "[INFO] 文件大小: ${FILE_SIZE}"
else
echo "[ERROR] 导出失败,请检查错误日志"
exit 1
fi
# ===== 2. 删除已归档的原表数据 =====
echo "[INFO] 开始从原表删除已归档数据..."
mysql -u${DB_USER} -p${DB_PASS} -h${DB_HOST} -e "
DELETE FROM ${DB_NAME}.${TABLE_NAME}
WHERE create_date < DATE_SUB(NOW(), INTERVAL ${RETENTION_DAYS} DAY)
LIMIT 10000; -- 采用分批删除,减少锁表时间
"
# 优化表空间(可选,大表慎用)
# mysql -u${DB_USER} -p${DB_PASS} -h${DB_HOST} -e "OPTIMIZE TABLE ${DB_NAME}.${TABLE_NAME};"
echo "[INFO] 归档完成于: $(date)"
说明:
--no-create-info:不导出建表语句,只导数据,以便直接回滚。- 分批删除:
DELETE ... LIMIT 10000可以防止一次删太多导致主从延迟或锁等待,建议用循环脚本控制。- 生产建议:对于大表,更应该采用 分区表 +
TRUNCATE PARTITION,而不是逐条 DELETE。
Python 脚本(支持多种数据库,更灵活)
需求: 从 PostgreSQL 读取旧数据,写入到本地的 CSV 文件(方便后续导入数仓或数据湖),然后删除原表数据。
脚本:archive_to_csv.py
import psycopg2
import pandas as pd
import os
from datetime import datetime, timedelta
import logging
# ===== 配置 =====
DB_CONFIG = {
"host": "localhost",
"port": 5432,
"dbname": "your_db",
"user": "your_user",
"password": "your_pass"
}
TABLE = "logs"
DATE_COLUMN = "event_time" # 用于判断时间列
ARCHIVE_DAYS = 30
ARCHIVE_DIR = "/data/archive/pg_csv"
BATCH_SIZE = 50000 # 每批处理行数
# ===== 日志 =====
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def archive_data():
cutoff_date = datetime.now() - timedelta(days=ARCHIVE_DAYS)
date_str = cutoff_date.strftime("%Y%m%d")
output_file = os.path.join(ARCHIVE_DIR, f"{TABLE}_{date_str}.csv")
conn = psycopg2.connect(**DB_CONFIG)
cursor = conn.cursor()
try:
# 1. 获取总行数(预估)
cursor.execute(f"""
SELECT COUNT(*) FROM {TABLE}
WHERE {DATE_COLUMN} < %s
""", (cutoff_date,))
total_rows = cursor.fetchone()[0]
logging.info(f"待归档数据共 {total_rows} 行")
if total_rows == 0:
logging.info("没有需要归档的数据")
return
# 2. 分批读取并写入 CSV
offset = 0
file_handle = open(output_file, 'w', newline='')
first_batch = True
while offset < total_rows:
query = f"""
SELECT * FROM {TABLE}
WHERE {DATE_COLUMN} < %s
ORDER BY {DATE_COLUMN}
LIMIT %s OFFSET %s
"""
df = pd.read_sql_query(query, conn, params=(cutoff_date, BATCH_SIZE, offset))
# 写入 CSV
header = first_batch
df.to_csv(file_handle, index=False, header=header, mode='a')
first_batch = False
offset += BATCH_SIZE
logging.info(f"已处理 {min(offset, total_rows)} / {total_rows} 行")
file_handle.close()
logging.info(f"CSV 归档文件已生成: {output_file}")
# 3. 删除已导出数据(使用 IN 或 游标 方式,防止大事务)
# 注意:这里用分批 DELETE + 小事务
conn.commit() # 先提交前面的读事务
offset = 0
while True:
with conn.cursor() as del_cursor:
del_cursor.execute(f"""
DELETE FROM {TABLE}
WHERE ctid IN (
SELECT ctid FROM {TABLE}
WHERE {DATE_COLUMN} < %s
LIMIT %s
)
""", (cutoff_date, BATCH_SIZE))
deleted = del_cursor.rowcount
conn.commit()
if deleted == 0:
break
logging.info(f"已删除 {deleted} 行")
logging.info("归档与清理全部完成")
except Exception as e:
conn.rollback()
logging.error(f"归档失败: {e}")
raise
finally:
cursor.close()
conn.close()
if __name__ == "__main__":
os.makedirs(ARCHIVE_DIR, exist_ok=True)
archive_data()
优势: 支持复杂转换(如JSON扁平化、字段脱敏),异常处理完善,适合集成到调度系统(Airflow、Cron)。
Oracle + 分区表归档
核心思路: 如果建表时按时间做了分区,归档就变得非常高效——直接移动分区,而不是逐条 DELETE。
-- 1. 交换分区到独立表(秒级)
ALTER TABLE orders
EXCHANGE PARTITION p_2024_01
WITH TABLE orders_archive_202401
INCLUDING INDEXES;
-- 2. 将独立表导出为 DMP 文件(作为备份)
expdp user/pass directory=ARCHIVE_DIR dumpfile=orders_202401.dmp tables=orders_archive_202401
-- 3. 确认备份无误后,删除(或 TRUNCATE)辅助表
DROP TABLE orders_archive_202401;
优点: 不影响在线业务,几乎不产生 Redo 日志。 前提: 表需要按时间范围分区(RANGE PARTITION)。
通用最佳实践(无论用什么脚本)
-
不要在原库 DELETE 大量行:除非表很小,否则应该:
- 使用分区交换(Oracle/MySQL 8.0)
- 使用
pt-archiver(Percona Toolkit) - 使用分批方式 + 小事务
-
脚本加入幂等性检查:
- 检查归档文件是否已存在。
- 检查
max(create_date)是否已小于归档点,避免重复操作导致数据空洞。
-
加入监控与告警:
- 记录归档行数、耗时、文件大小。
- 如果连续 5 次归档文件大小为 0,触发告警(可能表明数据未正常写入)。
-
保留源数据完整性:
- 在删除原表数据前,务必验证归档文件的有效性(
gunzip -t检查压缩包)。 - 对于关键业务,先完全导出,存储到冷备,再执行删除或清理。
- 在删除原表数据前,务必验证归档文件的有效性(
-
Crontab 调度示例(每天凌晨 3 点执行):
0 3 * * * /usr/local/bin/archive_mysql.sh >> /var/log/archive.log 2>&1