本文目录导读:

我来介绍几种监控Python程序运行状态的方法,从简单到复杂:
基础日志监控 📝
import logging
import time
from datetime import datetime
# Configure root logging: every record is written both to a log file and to
# the console, using one shared line format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: the messages logged in this file contain Chinese
        # text, and without an encoding the file would be written with the
        # platform's locale encoding, which differs between machines.
        logging.FileHandler('app_monitor.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
# Module-level logger used by the monitoring code below.
logger = logging.getLogger(__name__)
class AppMonitor:
    """Track request counts, outcomes and a running mean response time."""

    def __init__(self):
        # ``start_time`` stays None until start_monitoring() is called.
        self.start_time = None
        self.stats = dict(
            total_requests=0,
            success_count=0,
            error_count=0,
            avg_response_time=0,
        )

    def start_monitoring(self):
        """Remember the current time as the start of monitoring and log it."""
        self.start_time = time.time()
        logger.info(f"应用开始监控 - {datetime.now()}")

    def record_request(self, status='success', response_time=0):
        """Count one request and fold its response time into the average.

        Any ``status`` other than ``'success'`` is counted as an error.
        """
        counters = self.stats
        counters['total_requests'] += 1
        bucket = 'success_count' if status == 'success' else 'error_count'
        counters[bucket] += 1

        # Running mean: rebuild the previous sum from the previous mean,
        # add the new sample, divide by the new sample count.
        seen = counters['total_requests']
        previous_mean = counters['avg_response_time']
        counters['avg_response_time'] = (previous_mean * (seen - 1) + response_time) / seen

        logger.info(f"请求 # {seen} - 状态: {status}, 响应时间: {response_time:.2f}s")

    def get_status_report(self):
        """Return a dict of formatted, human-readable status values."""
        if self.start_time:
            runtime = time.time() - self.start_time
        else:
            runtime = 0

        counters = self.stats
        # max(..., 1) avoids dividing by zero before any request is recorded.
        success_ratio = counters['success_count'] / max(counters['total_requests'], 1)

        report = {}
        report['运行时间'] = f"{runtime:.2f}秒"
        report['总请求数'] = counters['total_requests']
        report['成功数'] = counters['success_count']
        report['失败数'] = counters['error_count']
        report['成功率'] = f"{success_ratio * 100:.2f}%"
        report['平均响应时间'] = f"{counters['avg_response_time']:.2f}秒"
        return report
# Usage example
monitor = AppMonitor()
monitor.start_monitoring()

# Simulate a workload: five requests, of which the one at index 3 fails.
for step in range(5):
    outcome = 'error' if step == 3 else 'success'
    monitor.record_request(outcome, 0.1 * (step + 1))
    time.sleep(0.5)

print("\n📊 运行状态报告:")
report = monitor.get_status_report()
for label in report:
    print(f"{label}: {report[label]}")
使用 psutil 监控系统资源 🔄(psutil 是第三方库,不是 Python 内置模块,需先执行 pip install psutil)
import psutil
import os
import time
from threading import Thread
class SystemMonitor:
    """Sample resource usage of one process on a background thread.

    Each sample is a dict with the keys ``cpu_percent``, ``memory_percent``,
    ``memory_rss`` (MB), ``num_threads``, ``open_fds`` and ``connections``.
    Samples are appended to ``stats_history``; ``stop()`` prints a summary.

    Args:
        pid: Process id to watch; a falsy value means the current process.
        interval: Seconds to wait between two samples.
    """

    def __init__(self, pid=None, interval=2):
        # Local import: the surrounding file only imports ``Thread``.
        from threading import Event

        self.pid = pid or os.getpid()
        self.process = psutil.Process(self.pid)
        self.interval = interval
        self.running = False
        self.stats_history = []
        # Defined up front so stop() is safe even if start() was never called.
        self.monitor_thread = None
        # Lets stop() interrupt the wait between samples immediately.
        self._stop_event = Event()

    def start(self):
        """Start the sampling thread; a no-op if it is already running."""
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            return
        self._stop_event.clear()
        self.running = True
        self.monitor_thread = Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop(self):
        """Stop sampling, wait for the thread to finish and print the report."""
        self.running = False
        self._stop_event.set()
        if self.monitor_thread is not None:
            self.monitor_thread.join()
        self._generate_report()

    def _collect_sample(self):
        """Read one set of metrics from ``self.process`` and return it as a dict."""
        proc = self.process
        # psutil documents num_fds() as UNIX-only and num_handles() as its
        # Windows counterpart; use whichever this platform provides.
        count_fds = getattr(proc, 'num_fds', None) or getattr(proc, 'num_handles', None)
        # psutil 6.0 renamed Process.connections() to net_connections();
        # prefer the new name and fall back for older releases.
        list_connections = getattr(proc, 'net_connections', None) or proc.connections
        return {
            # NOTE: per the psutil docs the very first cpu_percent() call
            # returns 0.0, so the first sample's CPU value is not meaningful.
            'cpu_percent': proc.cpu_percent(),
            'memory_percent': proc.memory_percent(),
            'memory_rss': proc.memory_info().rss / 1024 / 1024,  # MB
            'num_threads': proc.num_threads(),
            'open_fds': count_fds() if count_fds is not None else None,
            'connections': len(list_connections()),
        }

    def _monitor_loop(self):
        """Thread body: sample, print a one-line status, wait, repeat."""
        try:
            while self.running:
                stats = self._collect_sample()
                self.stats_history.append(stats)
                # Live display; '\r' rewrites the same console line each time.
                print(f"\rCPU: {stats['cpu_percent']:.1f}% | "
                      f"内存: {stats['memory_rss']:.1f}MB | "
                      f"线程: {stats['num_threads']} | "
                      f"连接: {stats['connections']}", end='')
                # Returns True as soon as stop() sets the event, so stopping
                # does not have to wait for a full interval.
                if self._stop_event.wait(self.interval):
                    break
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process is gone or unreadable: end sampling and make the
            # flag reflect that the monitor is no longer active.
            self.running = False

    def _generate_report(self):
        """Print average CPU, peak RSS and sample count; silent if no samples."""
        if not self.stats_history:
            return
        avg_cpu = sum(s['cpu_percent'] for s in self.stats_history) / len(self.stats_history)
        max_memory = max(s['memory_rss'] for s in self.stats_history)
        print("\n\n📊 监控报告:")
        print(f"平均CPU使用率: {avg_cpu:.1f}%")
        print(f"最大内存使用: {max_memory:.1f}MB")
        print(f"总采样点数: {len(self.stats_history)}")
# Usage example
monitor = SystemMonitor()
monitor.start()

# Simulate a running program: ten rounds of CPU-heavy work, each followed by
# a short pause, while the monitor samples in the background.
for _ in range(10):
    sum([n ** 2 for n in range(1_000_000)])
    time.sleep(0.5)

monitor.stop()
健康检查API监控 🏥
from flask import Flask, jsonify
import threading
import time
from datetime import datetime
# Flask application object that the /health route below is registered on.
app = Flask(import_name=__name__)
class HealthChecker:
    """Run database, memory and disk checks periodically on a daemon thread.

    Results are kept in ``self.checks``, keyed by check name. Until the first
    round completes, each entry is a placeholder with status ``'unknown'``.

    Args:
        check_interval: Seconds to wait between two rounds of checks.
    """

    def __init__(self, check_interval=30):
        # Must be set before the thread starts, because the loop reads it.
        self.check_interval = check_interval
        self.checks = {
            'database': {'status': 'unknown', 'last_check': None},
            'memory': {'status': 'unknown', 'last_check': None},
            'disk': {'status': 'unknown', 'last_check': None}
        }
        # Start the background check thread.
        self.check_thread = threading.Thread(target=self._periodic_check)
        self.check_thread.daemon = True
        self.check_thread.start()

    def _check_database(self):
        """Simulated database check: open and close an in-memory SQLite DB.

        Returns:
            ``(ok, message)``.
        """
        try:
            # A real application would test its actual database connection.
            import sqlite3
            conn = sqlite3.connect(':memory:')
            conn.close()
            return True, "数据库连接正常"
        except Exception as e:
            return False, f"数据库连接异常: {str(e)}"

    def _check_memory(self):
        """Check system memory usage; unhealthy above 90 %, warning above 80 %.

        Returns:
            ``(ok, message)``.
        """
        import psutil
        memory = psutil.virtual_memory()
        if memory.percent > 90:
            return False, f"内存使用率过高: {memory.percent}%"
        elif memory.percent > 80:
            return True, f"内存使用率警告: {memory.percent}%"
        else:
            return True, f"内存使用正常: {memory.percent}%"

    def _check_disk(self):
        """Check usage of the ``/`` filesystem; unhealthy above 90 %.

        Returns:
            ``(ok, message)``.
        """
        import psutil
        disk = psutil.disk_usage('/')
        if disk.percent > 90:
            return False, f"磁盘使用率过高: {disk.percent}%"
        else:
            return True, f"磁盘使用正常: {disk.percent}%"

    def _periodic_check(self):
        """Thread body: run every check, store the results, sleep, repeat."""
        while True:
            for check_name in ('database', 'memory', 'disk'):
                self.checks[check_name] = self._perform_check(check_name)
            time.sleep(self.check_interval)

    def _perform_check(self, check_name):
        """Run one named check and wrap its outcome in a result dict.

        Args:
            check_name: ``'database'``, ``'memory'`` or ``'disk'``.

        Returns:
            A dict with the keys ``status`` (``'healthy'``/``'unhealthy'``),
            ``message`` and ``timestamp`` (ISO-8601 string).

        Raises:
            KeyError: If ``check_name`` is not a known check.
        """
        check_methods = {
            'database': self._check_database,
            'memory': self._check_memory,
            'disk': self._check_disk
        }
        # Looked up outside the try so an unknown name still raises KeyError.
        run_check = check_methods[check_name]
        try:
            status, message = run_check()
        except Exception as exc:
            # A crashing check must be reported as unhealthy. Letting the
            # exception escape would end the background thread and freeze
            # every status at its last value.
            status, message = False, f"检查执行异常: {exc}"
        return {
            'status': 'healthy' if status else 'unhealthy',
            'message': message,
            'timestamp': datetime.now().isoformat()
        }

    def get_all_checks(self):
        """Return a shallow copy of the latest results, keyed by check name."""
        # A copy keeps callers from mutating the monitor's own mapping.
        return dict(self.checks)
# Create the shared health checker; its constructor starts the background
# check thread.
health_checker = HealthChecker()


@app.route('/health')
def health_endpoint():
    """Health-check API endpoint.

    Returns:
        A JSON body with the overall ``status``, a ``timestamp`` and the
        per-check results, with HTTP 200 when every check is ``'healthy'``
        and HTTP 503 otherwise.
    """
    checks = health_checker.get_all_checks()
    all_healthy = all(
        check['status'] == 'healthy'
        for check in checks.values()
    )
    body = jsonify({
        'status': 'healthy' if all_healthy else 'unhealthy',
        'timestamp': datetime.now().isoformat(),
        'checks': checks
    })
    # Probes and load balancers usually judge health by the status code, not
    # the body, so an unhealthy state must not be answered with 200.
    return body, (200 if all_healthy else 503)
# Usage example (requires Flask to be installed)
if __name__ == '__main__':
    startup_lines = (
        "健康检查服务启动在 http://localhost:5000/health",
        "访问 http://localhost:5000/health 查看状态",
    )
    for line in startup_lines:
        print(line)
    # app.run(debug=True)  # uncomment to actually start the server
自定义装饰器监控 🎯
import functools
import time
import logging
from typing import Any, Callable
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FunctionMonitor:
    """Decorator that records call count, timing and error statistics.

    The decorated function gains a ``get_stats()`` attribute that returns a
    summary dict. Timing covers failed calls as well as successful ones, so
    ``avg_time`` (total time divided by total calls) is not diluted by
    failures.

    Args:
        name: Label used in logs and stats; defaults to the function's name.
        alert_threshold: Duration in seconds above which a warning is logged.
    """

    def __init__(self, name=None, alert_threshold=1.0):
        self.name = name
        self.alert_threshold = alert_threshold
        self.stats = {
            'total_calls': 0,
            'total_time': 0,
            'max_time': 0,
            # Sentinel so the first real duration always becomes the minimum.
            'min_time': float('inf'),
            'errors': 0
        }

    def _record_duration(self, func_name: str, execution_time: float) -> None:
        """Fold one call's duration into the stats; warn if it was too slow."""
        self.stats['total_time'] += execution_time
        self.stats['max_time'] = max(self.stats['max_time'], execution_time)
        self.stats['min_time'] = min(self.stats['min_time'], execution_time)
        if execution_time > self.alert_threshold:
            logger.warning(f"⚠️ {func_name} 执行时间过长: {execution_time:.2f}s")

    def __call__(self, func: Callable) -> Callable:
        """Wrap ``func`` so that every call updates ``self.stats``."""
        func_name = self.name or func.__name__

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            self.stats['total_calls'] += 1
            # perf_counter() is monotonic, so a wall-clock adjustment during
            # the call cannot produce a negative or inflated duration.
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                # Measure before logging so log I/O is not counted as part
                # of the function's own run time.
                self._record_duration(func_name, time.perf_counter() - start_time)
                self.stats['errors'] += 1
                logger.error(f"❌ {func_name} 执行失败: {str(e)}")
                raise
            self._record_duration(func_name, time.perf_counter() - start_time)
            return result

        def get_stats():
            """Return a summary of the statistics collected so far."""
            calls = max(self.stats['total_calls'], 1)  # avoid division by zero
            min_time = self.stats['min_time']
            return {
                'function': func_name,
                'total_calls': self.stats['total_calls'],
                'avg_time': self.stats['total_time'] / calls,
                'max_time': self.stats['max_time'],
                # Report 0 rather than the inf sentinel before any call.
                'min_time': min_time if min_time != float('inf') else 0,
                'errors': self.stats['errors'],
                'error_rate': f"{(self.stats['errors'] / calls) * 100:.2f}%"
            }

        # Expose the statistics on the decorated function itself.
        wrapper.get_stats = get_stats
        return wrapper
# Usage example
@FunctionMonitor(alert_threshold=0.5)
def slow_function():
    """Simulate a slow call: pause for 0.3 s, then return a fixed string."""
    pause_seconds = 0.3
    time.sleep(pause_seconds)
    outcome = "完成"
    return outcome
@FunctionMonitor(name="数据处理", alert_threshold=1.0)
def data_processing():
    """Simulate data processing that fails depending on the current time.

    Raises:
        ValueError: When the wall-clock time modulo 2 is below 0.5.
    """
    time.sleep(0.1)
    phase = time.time() % 2
    if phase >= 0.5:
        return "数据就绪"
    raise ValueError("模拟错误")
# Exercise both monitored functions a few times.
for _ in range(5):
    try:
        slow_function()
        data_processing()
    except ValueError:
        # data_processing raises ValueError on purpose. Only that error is
        # ignored here, so Ctrl-C and real bugs are no longer swallowed as
        # they were by a bare ``except``.
        pass

print("\n📊 函数监控统计:")
for func in [slow_function, data_processing]:
    stats = func.get_stats()
    print(f"\n--- {stats['function']} ---")
    for key, value in stats.items():
        print(f"{key}: {value}")
完整的监控系统集成 🔧
import json
import time
from datetime import datetime
from collections import deque
import threading
class CompleteMonitor:
"""完整的监控系统"""
def __init__(self, max_history=100):
self.metrics = {
'cpu': deque(maxlen=max_history),
'memory': deque(maxlen=max_history),
'requests': deque(maxlen=max_history),
'errors': deque(maxlen=max_history)
}
self.alerts = []
self.running = False
def start(self):
"""启动监控"""
self.running = True
self.start_time = datetime.now()
# 启动指标收集线程
self.collector_thread = threading.Thread(target=self._collect_metrics)
self.collector_thread.daemon = True
self.collector_thread.start()
# 启动警报检查线程
self.alert_thread = threading.Thread(target=self._check_alerts)
self.alert_thread.daemon = True
self.alert_thread.start()
logger.info("监控系统启动")
def stop(self):
"""停止监控"""
self.running = False
logger.info("监控系统停止")
def _collect_metrics(self):
"""收集系统指标"""
import psutil
while self.running:
self.metrics['cpu'].append({
'timestamp': datetime.now().isoformat(),
'value': psutil.cpu_percent()
})
self.metrics['memory'].append({
'timestamp': datetime.now().isoformat(),
'value': psutil.virtual_memory().percent
})
time.sleep(5) # 每5秒收集一次
def _check_alerts(self):
"""检查警报条件"""
while self.running:
# 检查CPU
if self.metrics['cpu'] and self.metrics['cpu'][-1]['value'] > 90:
self._add_alert('critical', f"CPU使用率过高: {self.metrics['cpu'][-1]['value']}%")
# 检查内存
if self.metrics['memory'] and self.metrics['memory'][-1]['value'] > 85:
self._add_alert('warning', f"内存使用率过高: {self.metrics['memory'][-1]['value']}%")
time.sleep(10)
def _add_alert(self, level, message):
"""添加警报"""
alert = {
'timestamp': datetime.now().isoformat(),
'level': level,
'message': message
}
self.alerts.append(alert)
logger.warning(f"[{level.upper()}] {message}")
def record_request(self, response_time, status_code):
"""记录请求"""
self.metrics['requests'].append({
'timestamp': datetime.now().isoformat(),
'response_time': response_time,
'status_code': status_code
})
if status_code >= 400:
self.metrics['errors'].append({
'timestamp': datetime.now().isoformat(),
'status_code': status_code
})
def get_snapshot(self):
"""获取当前快照"""
uptime = (datetime.now() - self.start_time).seconds if hasattr(self, 'start_time') else 0
return {
'status': 'running' if self.running else 'stopped',
'uptime': f"{uptime}秒",
'metrics': {
'latest_cpu': self.metrics['cpu'][-1]['value'] if self.metrics['cpu'] else None,
'latest_memory': self.metrics['memory'][-1]['value'] if self.metrics['memory'] else None,
'total_requests': len(self.metrics['requests']),
'total_errors': len(self.metrics['errors']),
'error_rate': f"{(len(self.metrics['errors']) / max(len(self.metrics['requests']), 1)) * 100:.2f}%"
},
'recent_alerts': list(self.alerts)[-5:] if self.alerts else []
}
# Usage example
if __name__ == '__main__':
    monitor = CompleteMonitor()
    monitor.start()

    # Simulate a running program: ten requests one second apart, where every
    # fifth request (indices 0 and 5) is reported as a server error.
    for step in range(10):
        status_code = 500 if step % 5 == 0 else 200
        monitor.record_request(0.1 + (step * 0.05), status_code)
        time.sleep(1)

    print("\n📊 监控系统快照:")
    snapshot = monitor.get_snapshot()
    print(json.dumps(snapshot, indent=2, ensure_ascii=False))
    monitor.stop()
实际应用建议
- 选择合适的监控粒度:
  - 开发环境:详细日志
  - 生产环境:关键指标 + 警报
- 重要的监控指标:
  - CPU、内存使用率
  - 请求响应时间
  - 错误率
  - 吞吐量
- 存储与可视化:
  - 使用时序数据库(如 InfluxDB)
  - 配合 Grafana 进行可视化
- 警报设置:
  - 设置合理的阈值
  - 分级警报(info / warning / critical)
选择哪种方法取决于你的具体需求:
- 简单应用:基础日志监控
- 系统级监控:使用psutil
- 微服务架构:健康检查API
- 性能优化:装饰器监控
- 完整方案:集成监控系统