本文目录导读:

- 使用
time.sleep()简单定时 - 使用
sched模块(更灵活) - 使用
threading.Timer(非阻塞方式) - 使用
APScheduler(推荐用于生产环境) - 使用
schedule库(简单易用) - 推荐选择
我来介绍几种在 Python 中定时统计数据的方法:
使用 time.sleep() 简单定时
import time
import random
from collections import Counter
from datetime import datetime
# 模拟数据生成和统计
def collect_and_stat():
# 模拟收集数据
data = [random.randint(1, 10) for _ in range(100)]
# 统计
stats = Counter(data)
total = sum(data)
avg = total / len(data) if data else 0
# 输出结果
print(f"\n{'='*40}")
print(f"统计时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"数据总数: {len(data)}")
print(f"总和: {total}")
print(f"平均值: {avg:.2f}")
print(f"分布情况:")
for num, count in sorted(stats.items()):
print(f" {num}: {'█' * count} ({count})")
print(f"{'='*40}\n")
# 每5秒执行一次
while True:
collect_and_stat()
time.sleep(5) # 等待5秒
使用 sched 模块(更灵活)
import sched
import time
from datetime import datetime
import random
# 创建调度器
scheduler = sched.scheduler(time.time, time.sleep)
def timed_statistics():
# 模拟数据统计
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 执行统计...")
# 模拟统计数据
data = [random.gauss(50, 15) for _ in range(50)]
avg = sum(data) / len(data)
max_val = max(data)
min_val = min(data)
print(f" 平均值: {avg:.2f}")
print(f" 最大值: {max_val:.2f}")
print(f" 最小值: {min_val:.2f}")
# 重新调度(每10秒执行一次)
scheduler.enter(10, 1, timed_statistics, ())
# 首次调度
scheduler.enter(1, 1, timed_statistics, ())
print("开始定时统计(每10秒执行一次)...")
scheduler.run()
使用 threading.Timer(非阻塞方式)
import threading
import time
from collections import defaultdict
from datetime import datetime
import random
class StatisticsTimer:
def __init__(self, interval=5):
self.interval = interval
self.data_store = defaultdict(list)
self.running = True
def start(self):
"""开始定时统计"""
self.schedule_next()
print(f"定时统计已启动,每{self.interval}秒执行一次")
def schedule_next(self):
"""安排下一次统计"""
if self.running:
timer = threading.Timer(self.interval, self.run_statistics)
timer.daemon = True
timer.start()
def run_statistics(self):
"""执行统计任务"""
# 模拟数据收集
self.data_store['temperature'].append(random.uniform(20, 30))
self.data_store['humidity'].append(random.uniform(40, 80))
# 统计数据
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] 统计报告:")
for key, values in self.data_store.items():
if values:
recent = values[-10:] # 最近10个数据
print(f" {key}:")
print(f" 最新: {values[-1]:.2f}")
print(f" 平均: {sum(recent)/len(recent):.2f}")
print(f" 数据量: {len(values)}")
self.schedule_next()
def stop(self):
self.running = False
# 使用示例
timer = StatisticsTimer(interval=3)
timer.start()
# 主程序可以继续做其他事情
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
timer.stop()
print("\n定时统计已停止")
使用 APScheduler(推荐用于生产环境)
首先安装:pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import random
class DataStatistics:
def __init__(self):
self.data = []
def collect_data(self):
"""模拟数据收集"""
self.data.append({
'time': datetime.now(),
'value': random.randint(1, 100),
'type': random.choice(['A', 'B', 'C'])
})
# 只保留最近1000条数据
if len(self.data) > 1000:
self.data = self.data[-1000:]
def hourly_statistics(self):
"""每小时统计"""
if not self.data:
return
print(f"\n{'='*50}")
print(f"每小时统计报告 - {datetime.now()}")
# 统计全量数据
values = [d['value'] for d in self.data]
types = [d['type'] for d in self.data]
print(f"数据总量: {len(values)}")
print(f"平均值: {sum(values)/len(values):.2f}")
print(f"最大值: {max(values)}")
print(f"最小值: {min(values)}")
# 按类型统计
type_stats = {}
for t in set(types):
type_values = [d['value'] for d in self.data if d['type'] == t]
type_stats[t] = {
'count': len(type_values),
'avg': sum(type_values)/len(type_values)
}
print("\n按类型统计:")
for t, stats in type_stats.items():
print(f" 类型 {t}: 数量={stats['count']}, 平均值={stats['avg']:.2f}")
print(f"{'='*50}\n")
def daily_statistics(self):
"""每天统计"""
print(f"\n[每日统计] {datetime.now().strftime('%Y-%m-%d')}")
print(f"总数据量: {len(self.data)}")
# 使用 BackgroundScheduler(非阻塞)
def start_background_scheduler():
stats = DataStatistics()
scheduler = BackgroundScheduler()
# 每5秒收集一次数据
scheduler.add_job(stats.collect_data, 'interval', seconds=5)
# 每分钟统计一次
scheduler.add_job(stats.hourly_statistics, 'interval', minutes=1)
# 每天零点统计
scheduler.add_job(stats.daily_statistics, 'cron', hour=0, minute=0)
scheduler.start()
print("定时任务已启动(后台运行)...")
return scheduler, stats
# 使用示例
scheduler, stats = start_background_scheduler()
# 主线程继续执行
try:
import time
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
print("定时任务已停止")
使用 schedule 库(简单易用)
首先安装:pip install schedule
import schedule
import time
from datetime import datetime
import json
import os
class SimpleStatistics:
def __init__(self):
self.data_file = "statistics_data.json"
self.load_data()
def load_data(self):
"""加载已保存的数据"""
if os.path.exists(self.data_file):
with open(self.data_file, 'r') as f:
self.data = json.load(f)
else:
self.data = {"visits": [], "errors": []}
def save_data(self):
"""保存数据"""
with open(self.data_file, 'w') as f:
json.dump(self.data, f, indent=2)
def record_visit(self):
"""记录访问"""
self.data["visits"].append({
"time": datetime.now().isoformat(),
"ip": "192.168.1.1"
})
self.save_data()
def hourly_report(self):
"""每小时报告"""
now = datetime.now()
hour_visits = [v for v in self.data["visits"]
if datetime.fromisoformat(v["time"]).hour == now.hour]
print(f"\n[整点报告] {now.strftime('%Y-%m-%d %H:00')}")
print(f"本小时访问量: {len(hour_visits)}")
print(f"总访问量: {len(self.data['visits'])}")
print(f"总错误数: {len(self.data['errors'])}")
def daily_cleanup(self):
"""每天清理旧数据"""
from datetime import timedelta
cutoff = datetime.now() - timedelta(days=7)
self.data["visits"] = [v for v in self.data["visits"]
if datetime.fromisoformat(v["time"]) > cutoff]
self.data["errors"] = [e for e in self.data["errors"]
if datetime.fromisoformat(e["time"]) > cutoff]
self.save_data()
print(f"清理完成,保留最近7天的数据")
# 使用示例
stats = SimpleStatistics()
# 每分钟记录一次访问
schedule.every(1).minutes.do(stats.record_visit)
# 每小时生成报告
schedule.every().hour.do(stats.hourly_report)
# 每天凌晨3点清理数据
schedule.every().day.at("03:00").do(stats.daily_cleanup)
print("定时统计任务已启动...")
print("按 Ctrl+C 停止")
while True:
schedule.run_pending()
time.sleep(1)
推荐选择
- 简单场景:使用
time.sleep()或threading.Timer - 灵活调度:使用
APScheduler - 易于使用:使用
schedule库 - 精确时间控制:使用
sched模块
选择哪种方式取决于你的具体需求:
- 是否需要精确到秒级
- 是否需要持久化存储统计数据
- 是否需要复杂的调度逻辑(如工作日、特定时间等)
- 系统资源消耗要求