Python案例怎么定时统计数据?

wen python案例 73

本文目录导读:

Python案例怎么定时统计数据?

  1. 使用 time.sleep() 简单定时
  2. 使用 sched 模块(更灵活)
  3. 使用 threading.Timer(非阻塞方式)
  4. 使用 APScheduler(推荐用于生产环境)
  5. 使用 schedule 库(简单易用)
  6. 推荐选择

我来介绍几种在 Python 中定时统计数据的方法:

使用 time.sleep() 简单定时

import time
import random
from collections import Counter
from datetime import datetime
# 模拟数据生成和统计
def collect_and_stat():
    # 模拟收集数据
    data = [random.randint(1, 10) for _ in range(100)]
    # 统计
    stats = Counter(data)
    total = sum(data)
    avg = total / len(data) if data else 0
    # 输出结果
    print(f"\n{'='*40}")
    print(f"统计时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"数据总数: {len(data)}")
    print(f"总和: {total}")
    print(f"平均值: {avg:.2f}")
    print(f"分布情况:")
    for num, count in sorted(stats.items()):
        print(f"  {num}: {'█' * count} ({count})")
    print(f"{'='*40}\n")
# 每5秒执行一次
while True:
    collect_and_stat()
    time.sleep(5)  # 等待5秒

使用 sched 模块(更灵活)

import sched
import time
from datetime import datetime
import random
# 创建调度器
scheduler = sched.scheduler(time.time, time.sleep)
def timed_statistics():
    # 模拟数据统计
    print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 执行统计...")
    # 模拟统计数据
    data = [random.gauss(50, 15) for _ in range(50)]
    avg = sum(data) / len(data)
    max_val = max(data)
    min_val = min(data)
    print(f"  平均值: {avg:.2f}")
    print(f"  最大值: {max_val:.2f}")
    print(f"  最小值: {min_val:.2f}")
    # 重新调度(每10秒执行一次)
    scheduler.enter(10, 1, timed_statistics, ())
# 首次调度
scheduler.enter(1, 1, timed_statistics, ())
print("开始定时统计(每10秒执行一次)...")
scheduler.run()

使用 threading.Timer(非阻塞方式)

import threading
import time
from collections import defaultdict
from datetime import datetime
import random
class StatisticsTimer:
    def __init__(self, interval=5):
        self.interval = interval
        self.data_store = defaultdict(list)
        self.running = True
    def start(self):
        """开始定时统计"""
        self.schedule_next()
        print(f"定时统计已启动,每{self.interval}秒执行一次")
    def schedule_next(self):
        """安排下一次统计"""
        if self.running:
            timer = threading.Timer(self.interval, self.run_statistics)
            timer.daemon = True
            timer.start()
    def run_statistics(self):
        """执行统计任务"""
        # 模拟数据收集
        self.data_store['temperature'].append(random.uniform(20, 30))
        self.data_store['humidity'].append(random.uniform(40, 80))
        # 统计数据
        print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 统计报告:")
        for key, values in self.data_store.items():
            if values:
                recent = values[-10:]  # 最近10个数据
                print(f"  {key}:")
                print(f"    最新: {values[-1]:.2f}")
                print(f"    平均: {sum(recent)/len(recent):.2f}")
                print(f"    数据量: {len(values)}")
        self.schedule_next()
    def stop(self):
        self.running = False
# 使用示例
timer = StatisticsTimer(interval=3)
timer.start()
# 主程序可以继续做其他事情
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    timer.stop()
    print("\n定时统计已停止")

使用 APScheduler(推荐用于生产环境)

首先安装:pip install apscheduler

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import random
class DataStatistics:
    def __init__(self):
        self.data = []
    def collect_data(self):
        """模拟数据收集"""
        self.data.append({
            'time': datetime.now(),
            'value': random.randint(1, 100),
            'type': random.choice(['A', 'B', 'C'])
        })
        # 只保留最近1000条数据
        if len(self.data) > 1000:
            self.data = self.data[-1000:]
    def hourly_statistics(self):
        """每小时统计"""
        if not self.data:
            return
        print(f"\n{'='*50}")
        print(f"每小时统计报告 - {datetime.now()}")
        # 统计全量数据
        values = [d['value'] for d in self.data]
        types = [d['type'] for d in self.data]
        print(f"数据总量: {len(values)}")
        print(f"平均值: {sum(values)/len(values):.2f}")
        print(f"最大值: {max(values)}")
        print(f"最小值: {min(values)}")
        # 按类型统计
        type_stats = {}
        for t in set(types):
            type_values = [d['value'] for d in self.data if d['type'] == t]
            type_stats[t] = {
                'count': len(type_values),
                'avg': sum(type_values)/len(type_values)
            }
        print("\n按类型统计:")
        for t, stats in type_stats.items():
            print(f"  类型 {t}: 数量={stats['count']}, 平均值={stats['avg']:.2f}")
        print(f"{'='*50}\n")
    def daily_statistics(self):
        """每天统计"""
        print(f"\n[每日统计] {datetime.now().strftime('%Y-%m-%d')}")
        print(f"总数据量: {len(self.data)}")
# 使用 BackgroundScheduler(非阻塞)
def start_background_scheduler():
    stats = DataStatistics()
    scheduler = BackgroundScheduler()
    # 每5秒收集一次数据
    scheduler.add_job(stats.collect_data, 'interval', seconds=5)
    # 每分钟统计一次
    scheduler.add_job(stats.hourly_statistics, 'interval', minutes=1)
    # 每天零点统计
    scheduler.add_job(stats.daily_statistics, 'cron', hour=0, minute=0)
    scheduler.start()
    print("定时任务已启动(后台运行)...")
    return scheduler, stats
# 使用示例
scheduler, stats = start_background_scheduler()
# 主线程继续执行
try:
    import time
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()
    print("定时任务已停止")

使用 schedule 库(简单易用)

首先安装:pip install schedule

import schedule
import time
from datetime import datetime
import json
import os
class SimpleStatistics:
    def __init__(self):
        self.data_file = "statistics_data.json"
        self.load_data()
    def load_data(self):
        """加载已保存的数据"""
        if os.path.exists(self.data_file):
            with open(self.data_file, 'r') as f:
                self.data = json.load(f)
        else:
            self.data = {"visits": [], "errors": []}
    def save_data(self):
        """保存数据"""
        with open(self.data_file, 'w') as f:
            json.dump(self.data, f, indent=2)
    def record_visit(self):
        """记录访问"""
        self.data["visits"].append({
            "time": datetime.now().isoformat(),
            "ip": "192.168.1.1"
        })
        self.save_data()
    def hourly_report(self):
        """每小时报告"""
        now = datetime.now()
        hour_visits = [v for v in self.data["visits"] 
                      if datetime.fromisoformat(v["time"]).hour == now.hour]
        print(f"\n[整点报告] {now.strftime('%Y-%m-%d %H:00')}")
        print(f"本小时访问量: {len(hour_visits)}")
        print(f"总访问量: {len(self.data['visits'])}")
        print(f"总错误数: {len(self.data['errors'])}")
    def daily_cleanup(self):
        """每天清理旧数据"""
        from datetime import timedelta
        cutoff = datetime.now() - timedelta(days=7)
        self.data["visits"] = [v for v in self.data["visits"]
                              if datetime.fromisoformat(v["time"]) > cutoff]
        self.data["errors"] = [e for e in self.data["errors"]
                              if datetime.fromisoformat(e["time"]) > cutoff]
        self.save_data()
        print(f"清理完成,保留最近7天的数据")
# 使用示例
stats = SimpleStatistics()
# 每分钟记录一次访问
schedule.every(1).minutes.do(stats.record_visit)
# 每小时生成报告
schedule.every().hour.do(stats.hourly_report)
# 每天凌晨3点清理数据
schedule.every().day.at("03:00").do(stats.daily_cleanup)
print("定时统计任务已启动...")
print("按 Ctrl+C 停止")
while True:
    schedule.run_pending()
    time.sleep(1)

推荐选择

  1. 简单场景:使用 time.sleep()threading.Timer
  2. 灵活调度:使用 APScheduler
  3. 易于使用:使用 schedule
  4. 精确时间控制:使用 sched 模块

选择哪种方式取决于你的具体需求:

  • 是否需要精确到秒级
  • 是否需要持久化存储统计数据
  • 是否需要复杂的调度逻辑(如工作日、特定时间等)
  • 系统资源消耗要求

抱歉,评论功能暂时关闭!