Python案例如何定时备份数据?

wen python案例 24

我来为你介绍几种Python定时备份数据的方法和案例。

Python案例如何定时备份数据?

使用 schedule 库定时备份

基础备份脚本

import schedule
import time
import shutil
import os
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
def backup_data(source_path, backup_dir):
    """
    备份数据函数
    :param source_path: 源文件/目录路径
    :param backup_dir: 备份目录
    """
    try:
        # 生成备份文件名(带时间戳)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = os.path.basename(source_path)
        backup_file = f"{timestamp}_{filename}"
        backup_path = os.path.join(backup_dir, backup_file)
        # 创建备份目录(如果不存在)
        os.makedirs(backup_dir, exist_ok=True)
        # 执行备份
        if os.path.isdir(source_path):
            # 备份整个目录
            shutil.copytree(source_path, backup_path)
        else:
            # 备份单个文件
            shutil.copy2(source_path, backup_path)
        logging.info(f"备份成功: {backup_path}")
        # 清理旧备份(保留最近N个备份)
        cleanup_old_backups(backup_dir, keep_count=7)
    except Exception as e:
        logging.error(f"备份失败: {e}")
def cleanup_old_backups(backup_dir, keep_count=7):
    """
    清理旧备份文件
    """
    files = [f for f in os.listdir(backup_dir) 
             if os.path.isfile(os.path.join(backup_dir, f))]
    # 按修改时间排序
    files.sort(key=lambda x: os.path.getmtime(os.path.join(backup_dir, x)))
    # 删除多余文件
    while len(files) > keep_count:
        old_file = os.path.join(backup_dir, files.pop(0))
        os.remove(old_file)
        logging.info(f"清理旧备份: {old_file}")
# 配置备份任务
source = "/path/to/your/data"  # 要备份的数据路径
backup_dir = "/path/to/backup"   # 备份存储路径
# 每天凌晨2点备份
schedule.every().day.at("02:00").do(backup_data, source, backup_dir)
# 或者每小时备份一次
# schedule.every(1).hours.do(backup_data, source, backup_dir)
# 或者每小时的第30分钟备份
# schedule.every().hour.at(":30").do(backup_data, source, backup_dir)
# 运行任务
while True:
    schedule.run_pending()
    time.sleep(60)  # 每分钟检查一次

备份MySQL数据库

import schedule
import time
import subprocess
import os
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
def backup_mysql_database(db_config, backup_dir):
    """
    备份MySQL数据库
    """
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = f"database_backup_{timestamp}.sql"
        backup_path = os.path.join(backup_dir, backup_file)
        # 创建备份目录
        os.makedirs(backup_dir, exist_ok=True)
        # MySQL命令
        command = f"mysqldump -h {db_config['host']} -u {db_config['user']} -p{db_config['password']} {db_config['database']} > {backup_path}"
        # 执行备份
        result = subprocess.run(command, shell=True, capture_output=True, text=True)
        if result.returncode == 0:
            logging.info(f"数据库备份成功: {backup_file}")
            # 压缩备份文件
            import gzip
            with open(backup_path, 'rb') as f_in:
                with gzip.open(f"{backup_path}.gz", 'wb') as f_out:
                    f_out.writelines(f_in)
            # 删除未压缩文件
            os.remove(backup_path)
            logging.info(f"备份文件已压缩: {backup_file}.gz")
            # 清理旧备份
            cleanup_old_backups(backup_dir, keep_count=7)
        else:
            logging.error(f"数据库备份失败: {result.stderr}")
    except Exception as e:
        logging.error(f"备份过程出错: {e}")
# 数据库配置
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'your_password',
    'database': 'your_database'
}
backup_dir = "/path/to/backup/mysql"
# 每天凌晨3点备份数据库
schedule.every().day.at("03:00").do(backup_mysql_database, db_config, backup_dir)
while True:
    schedule.run_pending()
    time.sleep(60)

备份到远程服务器(使用rsync)

import schedule
import time
import subprocess
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
def backup_to_remote(source_path, remote_config):
    """
    使用rsync备份到远程服务器
    """
    try:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # rsync命令
        command = [
            "rsync", "-avz", "--delete",
            source_path,
            f"{remote_config['user']}@{remote_config['host']}:{remote_config['path']}"
        ]
        # 执行rsync
        result = subprocess.run(command, capture_output=True, text=True)
        if result.returncode == 0:
            logging.info(f"[{timestamp}] 远程备份成功")
            logging.info(f"同步统计: {result.stdout.splitlines()[-1]}")
        else:
            logging.error(f"远程备份失败: {result.stderr}")
    except Exception as e:
        logging.error(f"备份过程出错: {e}")
# 配置
source = "/path/to/your/data"
remote_config = {
    'host': '192.168.1.100',
    'user': 'backup_user',
    'path': '/backup/location'
}
# 每天凌晨4点备份到远程服务器
schedule.every().day.at("04:00").do(backup_to_remote, source, remote_config)
while True:
    schedule.run_pending()
    time.sleep(60)

使用配置文件管理多个备份任务

# config.yaml 配置文件
"""
tasks:
  - name: "备份项目文件"
    type: "local"
    source: "/path/to/project"
    destination: "/backup/project"
    schedule: "02:00"
    keep_count: 7
  - name: "备份数据库"
    type: "database"
    db_type: "mysql"
    host: "localhost"
    database: "myapp"
    schedule: "03:00"
    keep_count: 14
  - name: "备份到远程"
    type: "remote"
    source: "/path/to/data"
    remote_host: "192.168.1.100"
    remote_user: "backup"
    remote_path: "/backup/data"
    schedule: "04:00"
"""
# backup_manager.py
import yaml
import schedule
import time
import logging
logging.basicConfig(level=logging.INFO)
def load_config(config_file="config.yaml"):
    """加载配置文件"""
    with open(config_file, 'r') as f:
        return yaml.safe_load(f)
def setup_tasks(config):
    """根据配置设置定时任务"""
    for task in config['tasks']:
        if task['type'] == 'local':
            from functools import partial
            func = partial(backup_data, task['source'], task['destination'])
            schedule.every().day.at(task['schedule']).do(func)
        elif task['type'] == 'database':
            db_config = {
                'host': task['host'],
                'database': task['database']
            }
            schedule.every().day.at(task['schedule']).do(
                backup_mysql_database, db_config, task.get('destination', '/backup/db')
            )
        elif task['type'] == 'remote':
            remote_config = {
                'host': task['remote_host'],
                'user': task['remote_user'],
                'path': task['remote_path']
            }
            schedule.every().day.at(task['schedule']).do(
                backup_to_remote, task['source'], remote_config
            )
        logging.info(f"已设置任务: {task['name']} @ {task['schedule']}")
# 主程序
if __name__ == "__main__":
    config = load_config()
    setup_tasks(config)
    while True:
        schedule.run_pending()
        time.sleep(60)

使用Windows任务计划程序(Windows系统)

# windows_scheduler_backup.py
import os
import sys
import shutil
from datetime import datetime
import logging
logging.basicConfig(
    level=logging.INFO,
    filename='backup.log',
    format='%(asctime)s - %(levelname)s - %(message)s'
)
def scheduled_backup():
    """Windows任务计划程序调用的备份函数"""
    try:
        # 备份配置
        source = r"C:\MyData"
        backup_dir = r"D:\Backups"
        # 创建备份
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_name = f"backup_{timestamp}"
        backup_path = os.path.join(backup_dir, backup_name)
        shutil.copytree(source, backup_path)
        logging.info(f"备份成功: {backup_path}")
        # 清理超过30天的备份
        cleanup_days = 30
        for folder in os.listdir(backup_dir):
            folder_path = os.path.join(backup_dir, folder)
            if os.path.isdir(folder_path):
                folder_time = os.path.getctime(folder_path)
                if (time.time() - folder_time) > (cleanup_days * 86400):
                    shutil.rmtree(folder_path)
                    logging.info(f"清理过期备份: {folder}")
    except Exception as e:
        logging.error(f"备份失败: {e}")
if __name__ == "__main__":
    scheduled_backup()

安装依赖

# 安装schedule库
pip install schedule
# 如果是YAML配置方式
pip install pyyaml
# 安装其他可能需要依赖
pip install schedule pyyaml

运行建议

  1. 在生产环境使用

    • 使用 nohupsystemd 服务保持脚本运行
    • 添加错误处理和日志记录
    • 设置合理的备份保留策略
  2. 作为systemd服务运行

    # /etc/systemd/system/backup.service
    """
    [Unit]
    Description=Data Backup Service
    After=network.target

[Service] Type=simple ExecStart=/usr/bin/python3 /path/to/backup_script.py Restart=always User=backup_user

[Install] WantedBy=multi-user.target """

启用服务

sudo systemctl enable backup.service sudo systemctl start backup.service


这些方案覆盖了不同的备份场景,你可以根据实际需求选择合适的方案或组合使用,记得在实际部署前进行充分测试。

抱歉,评论功能暂时关闭!