Python案例如何实现定时清理任务?

wen python案例 73

本文目录导读:

Python案例如何实现定时清理任务?

  1. 方法一:使用 schedule 库(适合轻量级定时任务)
  2. 方法二:使用 APScheduler(功能更强大的定时任务库)
  3. 方法三:使用 crontab(Linux系统自带,最稳定)
  4. 安装依赖
  5. 使用建议

我来为你介绍几种Python实现定时清理任务的方法:

方法一:使用 schedule 库(适合轻量级定时任务)

import schedule
import time
import os
import shutil
from datetime import datetime, timedelta
def cleanup_temp_files(temp_dir='/tmp/old_files', max_age_hours=24):
    """Delete stale regular files that sit directly inside ``temp_dir``.

    Sub-directories are not entered and are never removed. When the
    directory does not exist yet it is created and the run ends there.

    Args:
        temp_dir: Directory to clean. Defaults to ``/tmp/old_files``.
        max_age_hours: A file is removed when its modification time lies
            more than this many hours in the past. Defaults to 24.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    if not os.path.exists(temp_dir):
        # exist_ok: another process may create the directory in between.
        os.makedirs(temp_dir, exist_ok=True)
        print(f"创建目录: {temp_dir}")
        return
    cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
    for filename in os.listdir(temp_dir):
        filepath = os.path.join(temp_dir, filename)
        if not os.path.isfile(filepath):
            continue
        # The mtime lookup is inside the try on purpose: a file that vanishes
        # between listdir() and the stat call must not abort the whole job
        # (an exception escaping a job stops the schedule loop).
        try:
            mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
            if mtime >= cutoff_time:
                continue
            os.remove(filepath)
            print(f"删除文件: {filepath}")
        except Exception as e:  # best effort: report and move on
            print(f"删除失败: {filepath}, 错误: {e}")
def cleanup_old_logs(log_dir='./logs', max_days=7):
    """Delete ``*.log`` files in ``log_dir`` that are older than ``max_days``.

    Only regular files directly inside the directory whose name ends with
    ``.log`` are considered. A missing directory is silently ignored.

    Args:
        log_dir: Directory holding the log files. Defaults to ``./logs``.
        max_days: Number of days of logs to keep. Defaults to 7.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    if not os.path.exists(log_dir):
        return
    cutoff_time = datetime.now() - timedelta(days=max_days)
    for filename in os.listdir(log_dir):
        if not filename.endswith('.log'):
            continue
        filepath = os.path.join(log_dir, filename)
        if not os.path.isfile(filepath):
            continue
        # Stat and delete share one try block so that a log file rotated away
        # while we iterate is reported instead of crashing the scheduler loop.
        try:
            mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
            if mtime >= cutoff_time:
                continue
            os.remove(filepath)
            print(f"删除旧日志: {filepath}")
        except Exception as e:  # best effort: report and move on
            print(f"删除失败: {filepath}, 错误: {e}")
def cleanup_cache_directory(cache_dir='./cache', max_size_mb=100):
    """Trim ``cache_dir`` to at most ``max_size_mb`` by deleting oldest files.

    Only regular files directly inside the directory are counted. When their
    combined size exceeds the budget, files are removed in order of
    modification time (oldest first) until the total fits. A missing
    directory is silently ignored.

    Args:
        cache_dir: Cache directory to trim. Defaults to ``./cache``.
        max_size_mb: Size budget in MiB (1024 * 1024 bytes). Defaults to 100.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    if not os.path.exists(cache_dir):
        return
    max_size_bytes = max_size_mb * 1024 * 1024
    total_size = 0
    files_info = []  # (path, size in bytes, mtime) per cached file
    for filename in os.listdir(cache_dir):
        filepath = os.path.join(cache_dir, filename)
        if not os.path.isfile(filepath):
            continue
        # A cache entry can disappear while we scan; skip it rather than
        # letting the error abort the job.
        try:
            size = os.path.getsize(filepath)
            mtime = os.path.getmtime(filepath)
        except OSError as e:
            print(f"读取文件信息失败: {filepath}, 错误: {e}")
            continue
        files_info.append((filepath, size, mtime))
        total_size += size
    if total_size <= max_size_bytes:
        return
    # Oldest first, so the most recently written entries survive.
    files_info.sort(key=lambda x: x[2])
    for filepath, size, _ in files_info:
        if total_size <= max_size_bytes:
            break
        try:
            os.remove(filepath)
        except Exception as e:  # best effort: report and try the next file
            print(f"删除失败: {filepath}, 错误: {e}")
        else:
            total_size -= size
            print(f"清理缓存: {filepath}")
# Register the clean-up jobs with the schedule library
schedule.every().day.at("03:00").do(cleanup_temp_files)  # every day at 03:00
schedule.every().monday.at("04:00").do(cleanup_old_logs)  # every Monday at 04:00
schedule.every(6).hours.do(cleanup_cache_directory)       # every 6 hours
# Run the scheduler: this loop never returns, so the script blocks here
print("定时清理任务已启动...")
while True:
    schedule.run_pending()
    time.sleep(60)  # poll for due jobs once a minute

方法二:使用 APScheduler(功能更强大的定时任务库)

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
import os
import shutil
from datetime import datetime, timedelta
import logging
# Configure logging: INFO and above are emitted, so the logger.debug() calls
# in the clean-up code below stay silent at this level
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FileCleaner:
    """Bundle of directory clean-up jobs configured through one mapping.

    Recognised config keys (defaults live in ``DEFAULT_CONFIG``):
        temp_dir / temp_max_days: temp tree and maximum file age in days.
        log_dir / log_max_days: log directory and maximum file age in days.
        cache_dir / cache_max_mb: cache tree and size budget in MiB.
        downloads_dir: directory scanned for unfinished downloads.
    """

    # Fallback value for every key the caller does not supply.
    DEFAULT_CONFIG = {
        'temp_dir': '/tmp/old_files',
        'log_dir': './logs',
        'cache_dir': './cache',
        'temp_max_days': 7,
        'log_max_days': 30,
        'cache_max_mb': 500,
        'downloads_dir': './downloads',
    }

    # Lower-case extensions (with dot) that mark an unfinished download.
    PARTIAL_DOWNLOAD_EXTENSIONS = frozenset(
        ('.tmp', '.part', '.crdownload', '.download')
    )

    def __init__(self, config=None):
        """Build the effective configuration.

        Args:
            config: Optional mapping overriding any subset of
                ``DEFAULT_CONFIG``. Missing keys fall back to the defaults,
                so a partial config no longer causes ``KeyError`` when a job
                runs. The mapping is copied, not stored by reference.
        """
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}

    def cleanup_temp_files(self):
        """Delete old files under ``temp_dir`` and prune empty directories.

        Files anywhere below ``temp_dir`` whose modification time is older
        than ``temp_max_days`` days are removed; afterwards every empty
        sub-directory is removed as well (``temp_dir`` itself is kept).
        """
        temp_dir = self.config['temp_dir']
        max_days = self.config['temp_max_days']
        if not os.path.exists(temp_dir):
            logger.info(f"临时目录不存在: {temp_dir}")
            return
        cutoff_time = datetime.now() - timedelta(days=max_days)
        cleaned_count = 0
        for root, _dirs, files in os.walk(temp_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                try:
                    mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
                    if mtime < cutoff_time:
                        os.remove(filepath)
                        cleaned_count += 1
                        logger.debug(f"删除文件: {filepath}")
                except Exception as e:  # best effort: log and continue
                    logger.error(f"删除失败: {filepath}, 错误: {e}")
        # Walk bottom-up so a directory that became empty because its own
        # children were just removed is removed in the same pass.
        for root, dirs, _files in os.walk(temp_dir, topdown=False):
            for dir_name in dirs:
                dir_path = os.path.join(root, dir_name)
                try:
                    if not os.listdir(dir_path):
                        os.rmdir(dir_path)
                        logger.info(f"删除空目录: {dir_path}")
                except Exception as e:
                    logger.error(f"删除目录失败: {dir_path}, 错误: {e}")
        logger.info(f"临时文件清理完成,共清理 {cleaned_count} 个文件")

    def cleanup_old_logs(self):
        """Delete files directly inside ``log_dir`` older than ``log_max_days``.

        Every regular file counts, whatever its extension. A missing
        directory is silently ignored.
        """
        log_dir = self.config['log_dir']
        max_days = self.config['log_max_days']
        if not os.path.exists(log_dir):
            return
        cutoff_time = datetime.now() - timedelta(days=max_days)
        cleaned_count = 0
        for filename in os.listdir(log_dir):
            filepath = os.path.join(log_dir, filename)
            if not os.path.isfile(filepath):
                continue
            try:
                mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
                if mtime < cutoff_time:
                    os.remove(filepath)
                    cleaned_count += 1
                    logger.debug(f"删除日志: {filepath}")
            except Exception as e:  # best effort: log and continue
                logger.error(f"删除日志失败: {filepath}, 错误: {e}")
        logger.info(f"日志清理完成,共清理 {cleaned_count} 个文件")

    def cleanup_cache(self):
        """Trim the ``cache_dir`` tree to at most ``cache_max_mb`` MiB.

        All files below ``cache_dir`` are measured. When the total exceeds
        the budget, files are deleted oldest-first (by modification time)
        until the total fits. A missing directory is silently ignored.
        """
        cache_dir = self.config['cache_dir']
        max_size = self.config['cache_max_mb'] * 1024 * 1024  # MiB -> bytes
        if not os.path.exists(cache_dir):
            return
        total_size = 0
        files_info = []  # (path, size in bytes, mtime) per cached file
        for root, _dirs, files in os.walk(cache_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                try:
                    size = os.path.getsize(filepath)
                    mtime = os.path.getmtime(filepath)
                    files_info.append((filepath, size, mtime))
                    total_size += size
                except Exception as e:
                    logger.error(f"读取文件信息失败: {filepath}, 错误: {e}")
        if total_size <= max_size:
            return
        # Oldest first, so the most recently written entries survive.
        files_info.sort(key=lambda x: x[2])
        deleted_size = 0
        for filepath, size, _ in files_info:
            try:
                os.remove(filepath)
                deleted_size += size
                total_size -= size
                logger.debug(f"清理缓存: {filepath}")
                if total_size <= max_size:
                    break
            except Exception as e:  # best effort: log and try the next file
                logger.error(f"删除缓存文件失败: {filepath}, 错误: {e}")
        logger.info(f"缓存清理完成,释放 {deleted_size / 1024 / 1024:.2f}MB")

    def cleanup_downloads(self):
        """Delete unfinished downloads directly inside ``downloads_dir``.

        A file counts as unfinished when its extension, compared
        case-insensitively, is one of ``PARTIAL_DOWNLOAD_EXTENSIONS``.
        A missing directory is silently ignored.
        """
        downloads_dir = self.config['downloads_dir']
        if not os.path.exists(downloads_dir):
            return
        cleaned_count = 0
        for filename in os.listdir(downloads_dir):
            filepath = os.path.join(downloads_dir, filename)
            if not os.path.isfile(filepath):
                continue
            ext = os.path.splitext(filename)[1].lower()
            if ext not in self.PARTIAL_DOWNLOAD_EXTENSIONS:
                continue
            try:
                os.remove(filepath)
                cleaned_count += 1
                logger.info(f"删除未完成下载: {filepath}")
            except Exception as e:  # best effort: log and continue
                logger.error(f"删除失败: {filepath}, 错误: {e}")
        logger.info(f"下载目录清理完成,共清理 {cleaned_count} 个文件")
def check_disk_space():
    """Log usage of the root filesystem; run an emergency clean-up above 90%.

    Reads the figures for ``/`` through ``psutil.disk_usage``, logs total
    space, used percentage and free space, and when the used percentage is
    above 90 calls the module-level ``cleaner`` to purge temp files and cache.
    """
    import psutil

    bytes_per_gb = 1024 ** 3
    root_usage = psutil.disk_usage('/')
    usage_percent = root_usage.percent
    total_space_gb = root_usage.total / bytes_per_gb
    free_space_gb = root_usage.free / bytes_per_gb
    logger.info(f"磁盘空间: 总空间 {total_space_gb:.2f}GB, "
                f"已用 {usage_percent}%, "
                f"剩余 {free_space_gb:.2f}GB")
    # Nothing more to do unless the disk is more than 90% full.
    if not usage_percent > 90:
        return
    logger.warning("磁盘使用率超过90%,触发紧急清理")
    cleaner.cleanup_temp_files()
    cleaner.cleanup_cache()
# Create the cleaner instance (no config passed, so FileCleaner's built-in defaults apply)
cleaner = FileCleaner()
# Create the scheduler
scheduler = BackgroundScheduler()
# Register the jobs; each one gets a unique id and a display name
scheduler.add_job(
    cleaner.cleanup_temp_files,
    CronTrigger(hour=3, minute=0),  # every day at 03:00
    id='temp_cleanup',
    name='临时文件清理'
)
scheduler.add_job(
    cleaner.cleanup_old_logs,
    CronTrigger(day_of_week='mon', hour=4, minute=0),  # every Monday at 04:00
    id='log_cleanup',
    name='日志清理'
)
scheduler.add_job(
    cleaner.cleanup_cache,
    IntervalTrigger(hours=6),  # every 6 hours
    id='cache_cleanup',
    name='缓存清理'
)
scheduler.add_job(
    cleaner.cleanup_downloads,
    IntervalTrigger(hours=12),  # every 12 hours
    id='download_cleanup',
    name='下载清理'
)
scheduler.add_job(
    check_disk_space,
    IntervalTrigger(hours=24),  # once every 24 hours
    id='disk_check',
    name='磁盘检查'
)
# `time` is used by the keep-alive loop below but is missing from this
# script's import block, so it is imported here to avoid a NameError.
import time

# Start the scheduler
scheduler.start()
logger.info("定时清理任务已启动")
# Keep the main thread alive until Ctrl+C or interpreter exit, then shut the
# scheduler down
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
    logger.info("定时清理任务已停止")

方法三:使用 crontab(Linux系统自带,最稳定)

# crontab_example.py
import os
import sys
import argparse
from datetime import datetime, timedelta
import logging
# Configure logging: timestamped records go both to a log file and to the console.
# NOTE(review): FileHandler opens /var/log/cleanup.log as soon as it is created, so the
# account that runs the cron job presumably needs write access there — confirm on deployment.
# At INFO level the logger.debug() lines in cleanup_old_files are not emitted.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/cleanup.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
def cleanup_old_files(directory, days, pattern='*'):
    """Delete files in ``directory`` that match ``pattern`` and are too old.

    Only regular files directly inside ``directory`` are considered. As with
    any ``glob`` pattern, ``*`` does not match names starting with a dot.

    Args:
        directory: Directory to clean. Taken literally: glob metacharacters
            in the path (``[``, ``*``, ``?``) are escaped.
        days: Files modified more than this many days ago are removed.
        pattern: Glob pattern applied to file names. Defaults to ``'*'``.

    Returns:
        int: Number of files deleted; 0 when ``directory`` does not exist.
    """
    import glob
    if not os.path.exists(directory):
        logger.warning(f"目录不存在: {directory}")
        # Return a count here too, so callers can always format the result.
        return 0
    cutoff_time = datetime.now() - timedelta(days=days)
    cleaned_count = 0
    search_pattern = os.path.join(glob.escape(directory), pattern)
    for filepath in glob.glob(search_pattern):
        if not os.path.isfile(filepath):
            continue
        try:
            mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
            if mtime < cutoff_time:
                os.remove(filepath)
                cleaned_count += 1
                logger.debug(f"删除: {filepath}")
        except Exception as e:  # best effort: log and continue
            logger.error(f"删除失败: {filepath}, 错误: {e}")
    return cleaned_count
def _non_negative_int(value):
    """argparse ``type=`` converter that accepts only integers >= 0."""
    number = int(value)
    if number < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
    return number


def main():
    """Command-line entry point: run one clean-up action chosen by ``--action``.

    Options:
        --action {temp,logs,cache}: which clean-up to run (required).
        --days N: retention in days, default 7. Negative values are rejected,
            because they would put the cutoff in the future and delete every
            matching file.
        --directory PATH: overrides the action's default directory.
    """
    # action -> (default directory, glob pattern, label used in the log line)
    actions = {
        'temp': ('/tmp/cleanup', '*', '清理临时文件'),
        'logs': ('/var/log/myapp', '*.log', '清理日志'),
        'cache': ('/var/cache/myapp', '*', '清理缓存'),
    }
    parser = argparse.ArgumentParser(description='定时清理任务')
    parser.add_argument('--action', choices=list(actions), required=True)
    parser.add_argument('--days', type=_non_negative_int, default=7, help='保留天数')
    parser.add_argument('--directory', help='目标目录')
    args = parser.parse_args()

    default_dir, pattern, label = actions[args.action]
    dir_path = args.directory or default_dir
    count = cleanup_old_files(dir_path, args.days, pattern)
    logger.info(f"{label}: {dir_path}, 共清理 {count} 个文件")


if __name__ == '__main__':
    main()

然后在Linux中配置crontab:

# Open the current user's crontab for editing
crontab -e
# Add the entries below (fields: minute hour day-of-month month day-of-week command)
# Clean temp files every day at 03:00
0 3 * * * /usr/bin/python3 /path/to/crontab_example.py --action temp --days 7
# Clean logs every Monday at 04:00
0 4 * * 1 /usr/bin/python3 /path/to/crontab_example.py --action logs --days 30
# Clean the cache every 6 hours (at minute 0)
0 */6 * * * /usr/bin/python3 /path/to/crontab_example.py --action cache --days 1

安装依赖

# Method 1: install schedule
pip install schedule
# Method 2: install APScheduler
pip install apscheduler
# The disk-space check (check_disk_space) needs psutil
pip install psutil

使用建议

  1. 选择合适的方法

    • 简单场景:使用 schedule
    • 复杂场景:使用 APScheduler
    • 生产环境:优先使用系统 crontab
  2. 注意事项

    • 设置合理的清理周期,避免频繁I/O操作
    • 记录清理日志,方便问题排查
    • 添加异常处理,避免清理任务中断
    • 先测试再部署,避免误删重要文件
  3. 最佳实践

    • 明确指定要清理的文件类型和目录
    • 设置合理的保留时间
    • 保留最近几天的文件用于故障排查
    • 考虑添加文件大小限制
    • 定期检查清理效果

抱歉,评论功能暂时关闭!