数据转储归档脚本?

wen 实用脚本 47

本文目录导读:

数据转储归档脚本?

  1. 场景一:MySQL + Shell脚本(最常用)
  2. 场景二:Python 脚本(支持多种数据库,更灵活)
  3. 场景三:Oracle + 分区表归档
  4. 通用最佳实践(无论用什么脚本)

“数据转储归档脚本”通常是指定期将生产库中的历史数据(旧数据),从热表(Online)导出(Dump)并存储到归档文件(如SQL、CSV、Parquet)或冷存储中,然后清空或删除原表数据,以释放数据库空间、提升查询性能。

下面提供几种常见场景下的脚本示例,你可以根据使用的数据库类型(MySQL、PostgreSQL、Oracle)或技术栈(Shell、Python)进行选择。

MySQL + Shell脚本(最常用)

需求:orders 表中超过 90 天的订单数据导出为 SQL 文件,压缩存储,然后删除原表数据。

脚本:archive_mysql.sh

#!/bin/bash
# ===== 配置参数 =====
DB_USER="root"
DB_PASS="your_password"
DB_HOST="localhost"
DB_NAME="your_database"
TABLE_NAME="orders"
ARCHIVE_DIR="/data/archive/mysql"
RETENTION_DAYS=90
DATE_TAG=$(date +%Y%m%d_%H%M%S)
ARCHIVE_FILE="${ARCHIVE_DIR}/${TABLE_NAME}_${DATE_TAG}.sql.gz"
# ===== 0. 创建归档目录 =====
mkdir -p ${ARCHIVE_DIR}
# ===== 1. 导出历史数据 =====
echo "[INFO] 开始导出 ${TABLE_NAME} 表中超过 ${RETENTION_DAYS} 天的数据..."
# 使用 mysqldump 带 WHERE 条件导出
mysqldump -u${DB_USER} -p${DB_PASS} -h${DB_HOST} \
    --no-create-info \
    --skip-add-drop-table \
    ${DB_NAME} ${TABLE_NAME} \
    --where="create_date < DATE_SUB(NOW(), INTERVAL ${RETENTION_DAYS} DAY)" \
    | gzip > ${ARCHIVE_FILE}
# 检查导出是否成功
if [ $? -eq 0 ] && [ -s ${ARCHIVE_FILE} ]; then
    echo "[SUCCESS] 归档文件已生成: ${ARCHIVE_FILE}"
    FILE_SIZE=$(du -h ${ARCHIVE_FILE} | cut -f1)
    echo "[INFO] 文件大小: ${FILE_SIZE}"
else
    echo "[ERROR] 导出失败,请检查错误日志"
    exit 1
fi
# ===== 2. 删除已归档的原表数据 =====
echo "[INFO] 开始从原表删除已归档数据..."
mysql -u${DB_USER} -p${DB_PASS} -h${DB_HOST} -e "
    DELETE FROM ${DB_NAME}.${TABLE_NAME}
    WHERE create_date < DATE_SUB(NOW(), INTERVAL ${RETENTION_DAYS} DAY)
    LIMIT 10000;  -- 采用分批删除,减少锁表时间
"
# 优化表空间(可选,大表慎用)
# mysql -u${DB_USER} -p${DB_PASS} -h${DB_HOST} -e "OPTIMIZE TABLE ${DB_NAME}.${TABLE_NAME};"
echo "[INFO] 归档完成于: $(date)"

说明:

  • --no-create-info:不导出建表语句,只导数据,以便直接回滚。
  • 分批删除DELETE ... LIMIT 10000 可以防止一次删太多导致主从延迟或锁等待,建议用循环脚本控制。
  • 生产建议:对于大表,更应该采用 分区表 + TRUNCATE PARTITION,而不是逐条 DELETE。

Python 脚本(支持多种数据库,更灵活)

需求: 从 PostgreSQL 读取旧数据,写入到本地的 CSV 文件(方便后续导入数仓或数据湖),然后删除原表数据。

脚本:archive_to_csv.py

import psycopg2
import pandas as pd
import os
from datetime import datetime, timedelta
import logging
# ===== 配置 =====
DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "your_db",
    "user": "your_user",
    "password": "your_pass"
}
TABLE = "logs"
DATE_COLUMN = "event_time"  # 用于判断时间列
ARCHIVE_DAYS = 30
ARCHIVE_DIR = "/data/archive/pg_csv"
BATCH_SIZE = 50000     # 每批处理行数
# ===== 日志 =====
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def archive_data():
    cutoff_date = datetime.now() - timedelta(days=ARCHIVE_DAYS)
    date_str = cutoff_date.strftime("%Y%m%d")
    output_file = os.path.join(ARCHIVE_DIR, f"{TABLE}_{date_str}.csv")
    conn = psycopg2.connect(**DB_CONFIG)
    cursor = conn.cursor()
    try:
        # 1. 获取总行数(预估)
        cursor.execute(f"""
            SELECT COUNT(*) FROM {TABLE} 
            WHERE {DATE_COLUMN} < %s
        """, (cutoff_date,))
        total_rows = cursor.fetchone()[0]
        logging.info(f"待归档数据共 {total_rows} 行")
        if total_rows == 0:
            logging.info("没有需要归档的数据")
            return
        # 2. 分批读取并写入 CSV
        offset = 0
        file_handle = open(output_file, 'w', newline='')
        first_batch = True
        while offset < total_rows:
            query = f"""
                SELECT * FROM {TABLE} 
                WHERE {DATE_COLUMN} < %s 
                ORDER BY {DATE_COLUMN} 
                LIMIT %s OFFSET %s
            """
            df = pd.read_sql_query(query, conn, params=(cutoff_date, BATCH_SIZE, offset))
            # 写入 CSV
            header = first_batch
            df.to_csv(file_handle, index=False, header=header, mode='a')
            first_batch = False
            offset += BATCH_SIZE
            logging.info(f"已处理 {min(offset, total_rows)} / {total_rows} 行")
        file_handle.close()
        logging.info(f"CSV 归档文件已生成: {output_file}")
        # 3. 删除已导出数据(使用 IN 或 游标 方式,防止大事务)
        # 注意:这里用分批 DELETE + 小事务
        conn.commit()  # 先提交前面的读事务
        offset = 0
        while True:
            with conn.cursor() as del_cursor:
                del_cursor.execute(f"""
                    DELETE FROM {TABLE} 
                    WHERE ctid IN (
                        SELECT ctid FROM {TABLE} 
                        WHERE {DATE_COLUMN} < %s 
                        LIMIT %s
                    )
                """, (cutoff_date, BATCH_SIZE))
                deleted = del_cursor.rowcount
                conn.commit()
                if deleted == 0:
                    break
            logging.info(f"已删除 {deleted} 行")
        logging.info("归档与清理全部完成")
    except Exception as e:
        conn.rollback()
        logging.error(f"归档失败: {e}")
        raise
    finally:
        cursor.close()
        conn.close()
if __name__ == "__main__":
    os.makedirs(ARCHIVE_DIR, exist_ok=True)
    archive_data()

优势: 支持复杂转换(如JSON扁平化、字段脱敏),异常处理完善,适合集成到调度系统(Airflow、Cron)。


Oracle + 分区表归档

核心思路: 如果建表时按时间做了分区,归档就变得非常高效——直接移动分区,而不是逐条 DELETE。

-- 1. 交换分区到独立表(秒级)
ALTER TABLE orders 
    EXCHANGE PARTITION p_2024_01 
    WITH TABLE orders_archive_202401 
    INCLUDING INDEXES;
-- 2. 将独立表导出为 DMP 文件(作为备份)
expdp user/pass directory=ARCHIVE_DIR dumpfile=orders_202401.dmp tables=orders_archive_202401
-- 3. 确认备份无误后,删除(或 TRUNCATE)辅助表
DROP TABLE orders_archive_202401;

优点: 不影响在线业务,几乎不产生 Redo 日志。 前提: 表需要按时间范围分区(RANGE PARTITION)。


通用最佳实践(无论用什么脚本)

  1. 不要在原库 DELETE 大量行:除非表很小,否则应该:

    • 使用分区交换(Oracle/MySQL 8.0)
    • 使用 pt-archiver (Percona Toolkit)
    • 使用分批方式 + 小事务
  2. 脚本加入幂等性检查

    • 检查归档文件是否已存在。
    • 检查 max(create_date) 是否已小于归档点,避免重复操作导致数据空洞。
  3. 加入监控与告警

    • 记录归档行数、耗时、文件大小。
    • 如果连续 5 次归档文件大小为 0,触发告警(可能表明数据未正常写入)。
  4. 保留源数据完整性

    • 在删除原表数据前,务必验证归档文件的有效性gunzip -t 检查压缩包)。
    • 对于关键业务,先完全导出,存储到冷备,再执行删除或清理。
  5. Crontab 调度示例(每天凌晨 3 点执行):

    0 3 * * * /usr/local/bin/archive_mysql.sh >> /var/log/archive.log 2>&1

抱歉,评论功能暂时关闭!