Python案例怎么监控程序运行状态?

wen python案例 75

本文目录导读:

Python案例怎么监控程序运行状态?

  1. 基础日志监控 📝
  2. 使用Python内置模块监控 🔄
  3. 健康检查API监控 🏥
  4. 自定义装饰器监控 🎯
  5. 完整的监控系统集成 🔧
  6. 实际应用建议

我来介绍几种监控Python程序运行状态的方法,从简单到复杂:

基础日志监控 📝

import logging
import time
from datetime import datetime
# Configure logging: INFO level and above, with timestamp, logger name and
# level in every record. Records go both to the file app_monitor.log and to
# a StreamHandler (console output).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app_monitor.log'),
        logging.StreamHandler()
    ]
)
# Module-level logger; AppMonitor below logs through it.
logger = logging.getLogger(__name__)
class AppMonitor:
    """Count requests, their outcomes and a running mean response time.

    All counters live in ``self.stats``; ``start_time`` stays ``None`` until
    ``start_monitoring()`` is called.
    """

    def __init__(self):
        self.start_time = None
        self.stats = dict(
            total_requests=0,
            success_count=0,
            error_count=0,
            avg_response_time=0,
        )

    def start_monitoring(self):
        """Remember the current time as the start and log that monitoring began."""
        self.start_time = time.time()
        logger.info(f"应用开始监控 - {datetime.now()}")

    def record_request(self, status='success', response_time=0):
        """Register one request.

        Args:
            status: ``'success'`` counts as a success; any other value counts
                as an error.
            response_time: Duration of the request in seconds.
        """
        counters = self.stats
        counters['total_requests'] += 1
        outcome_key = 'success_count' if status == 'success' else 'error_count'
        counters[outcome_key] += 1

        # Incremental mean: rebuild the previous sum from the old average,
        # add the new sample, divide by the new count.
        seen = counters['total_requests']
        previous_mean = counters['avg_response_time']
        counters['avg_response_time'] = (previous_mean * (seen - 1) + response_time) / seen

        logger.info(f"请求 # {seen} - "
                    f"状态: {status}, 响应时间: {response_time:.2f}s")

    def get_status_report(self):
        """Return a dict of human-readable statistics.

        Runtime is reported as 0 when monitoring was never started, and the
        success rate is computed against at least one request so that an
        empty monitor does not divide by zero.
        """
        if self.start_time:
            elapsed = time.time() - self.start_time
        else:
            elapsed = 0
        counters = self.stats
        total = counters['total_requests']
        success_ratio = counters['success_count'] / max(total, 1)
        return {
            '运行时间': f"{elapsed:.2f}秒",
            '总请求数': total,
            '成功数': counters['success_count'],
            '失败数': counters['error_count'],
            '成功率': f"{success_ratio * 100:.2f}%",
            '平均响应时间': f"{counters['avg_response_time']:.2f}秒"
        }
# Usage example
monitor = AppMonitor()
monitor.start_monitoring()
# Simulate the program running: five requests with growing response times;
# the fourth one (i == 3) is reported as an error.
for i in range(5):
    response_time = 0.1 * (i + 1)
    status = 'success' if i != 3 else 'error'
    monitor.record_request(status, response_time)
    time.sleep(0.5)
# Print the collected statistics, one "key: value" line per entry.
print("\n📊 运行状态报告:")
for key, value in monitor.get_status_report().items():
    print(f"{key}: {value}")

使用Python内置模块监控 🔄(注:本节示例依赖第三方库 psutil,它并非 Python 内置模块,需先执行 pip install psutil)

import psutil
import os
import time
from threading import Thread
class SystemMonitor:
    """Sample the resource usage of one process on a background thread.

    Args:
        pid: Process id to watch; defaults to the current process.
        interval: Seconds to sleep between two samples.

    Every sample is appended to ``stats_history`` as a dict with the keys
    ``cpu_percent``, ``memory_percent``, ``memory_rss`` (MB), ``num_threads``,
    ``open_fds`` and ``connections``.
    """

    def __init__(self, pid=None, interval=2):
        self.pid = pid or os.getpid()
        self.process = psutil.Process(self.pid)
        self.interval = interval
        self.running = False
        self.stats_history = []
        # Set by start(); kept as None so stop() is safe before start().
        self.monitor_thread = None

    def start(self):
        """Start sampling on a daemon thread."""
        self.running = True
        self.monitor_thread = Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()

    def stop(self):
        """Stop sampling, wait for the thread to finish and print the report.

        Calling stop() without a prior start() is a no-op apart from the
        (empty) report, instead of failing on a missing thread attribute.
        """
        self.running = False
        if self.monitor_thread is not None:
            self.monitor_thread.join()
        self._generate_report()

    def _count_open_handles(self):
        """Return the number of open file descriptors (or handles on Windows)."""
        # psutil only defines num_fds() on POSIX; Windows offers num_handles().
        if hasattr(self.process, 'num_fds'):
            return self.process.num_fds()
        return self.process.num_handles()

    def _count_connections(self):
        """Return the number of socket connections opened by the process."""
        # psutil 6.0 deprecated connections() in favour of net_connections();
        # fall back to the old name on older psutil versions.
        list_connections = getattr(self.process, 'net_connections', None)
        if list_connections is None:
            list_connections = self.process.connections
        return len(list_connections())

    def _sample(self):
        """Collect one snapshot of the watched process."""
        return {
            'cpu_percent': self.process.cpu_percent(),
            'memory_percent': self.process.memory_percent(),
            'memory_rss': self.process.memory_info().rss / 1024 / 1024,  # MB
            'num_threads': self.process.num_threads(),
            'open_fds': self._count_open_handles(),
            'connections': self._count_connections()
        }

    def _monitor_loop(self):
        """Sample until ``running`` is cleared or the process becomes unreadable."""
        while self.running:
            try:
                stats = self._sample()
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # The process is gone or we lost permission: nothing more to sample.
                break
            self.stats_history.append(stats)
            # Live display; '\r' rewrites the same console line on each sample.
            print(f"\rCPU: {stats['cpu_percent']:.1f}% | "
                  f"内存: {stats['memory_rss']:.1f}MB | "
                  f"线程: {stats['num_threads']} | "
                  f"连接: {stats['connections']}", end='')
            time.sleep(self.interval)

    def _generate_report(self):
        """Print average CPU, peak RSS and sample count; silent when no samples exist."""
        if not self.stats_history:
            return
        avg_cpu = sum(s['cpu_percent'] for s in self.stats_history) / len(self.stats_history)
        max_memory = max(s['memory_rss'] for s in self.stats_history)
        print("\n\n📊 监控报告:")
        print(f"平均CPU使用率: {avg_cpu:.1f}%")
        print(f"最大内存使用: {max_memory:.1f}MB")
        print(f"总采样点数: {len(self.stats_history)}")
# Usage example
monitor = SystemMonitor()
monitor.start()
# Simulate the program running
for i in range(10):
    # CPU-bound busy work so the sampler has something to measure
    sum([j**2 for j in range(1000000)])
    time.sleep(0.5)
# Stops the sampling thread and prints the summary report
monitor.stop()

健康检查API监控 🏥

from flask import Flask, jsonify
import threading
import time
from datetime import datetime
# Flask application object; the /health route below is registered on it.
app = Flask(__name__)
class HealthChecker:
    """Run database, memory and disk checks periodically on a daemon thread.

    The latest result of each check is kept in ``self.checks``. Until the
    first round has completed every entry has status ``'unknown'``.
    Constructing an instance starts the background thread immediately.
    """

    def __init__(self):
        self.checks = {
            'database': {'status': 'unknown', 'last_check': None},
            'memory': {'status': 'unknown', 'last_check': None},
            'disk': {'status': 'unknown', 'last_check': None}
        }
        # Start the background checking thread
        self.check_thread = threading.Thread(target=self._periodic_check)
        self.check_thread.daemon = True
        self.check_thread.start()

    def _check_database(self):
        """Simulated database check: open and close an in-memory SQLite database.

        Returns:
            ``(ok, message)`` tuple.
        """
        try:
            # A real application would probe its actual database connection here
            import sqlite3
            conn = sqlite3.connect(':memory:')
            conn.close()
            return True, "数据库连接正常"
        except Exception as e:
            return False, f"数据库连接异常: {str(e)}"

    def _check_memory(self):
        """Check system memory usage.

        Above 90% is unhealthy; between 80% and 90% is still healthy but the
        message carries a warning.
        """
        import psutil
        memory = psutil.virtual_memory()
        if memory.percent > 90:
            return False, f"内存使用率过高: {memory.percent}%"
        elif memory.percent > 80:
            return True, f"内存使用率警告: {memory.percent}%"
        else:
            return True, f"内存使用正常: {memory.percent}%"

    def _check_disk(self):
        """Check usage of the filesystem mounted at '/'; above 90% is unhealthy."""
        import psutil
        disk = psutil.disk_usage('/')
        if disk.percent > 90:
            return False, f"磁盘使用率过高: {disk.percent}%"
        else:
            return True, f"磁盘使用正常: {disk.percent}%"

    def _periodic_check(self):
        """Refresh every check result, then sleep 30 seconds, forever."""
        while True:
            self.checks['database'] = self._perform_check('database')
            self.checks['memory'] = self._perform_check('memory')
            self.checks['disk'] = self._perform_check('disk')
            time.sleep(30)  # one round every 30 seconds

    def _perform_check(self, check_name):
        """Run one named check and wrap its outcome in a result dict.

        Args:
            check_name: ``'database'``, ``'memory'`` or ``'disk'``.

        Returns:
            Dict with ``status`` (``'healthy'``/``'unhealthy'``), ``message``
            and an ISO-format ``timestamp``.

        Raises:
            KeyError: If ``check_name`` is not a known check.
        """
        check_methods = {
            'database': self._check_database,
            'memory': self._check_memory,
            'disk': self._check_disk
        }
        check = check_methods[check_name]
        try:
            status, message = check()
        except Exception as exc:
            # A failing probe (e.g. psutil not installed) must not escape:
            # it would terminate the background thread and freeze all
            # results at their last value. Report it as unhealthy instead.
            status, message = False, f"检查执行失败: {exc}"
        return {
            'status': 'healthy' if status else 'unhealthy',
            'message': message,
            'timestamp': datetime.now().isoformat()
        }

    def get_all_checks(self):
        """Return the dict holding the latest result of every check."""
        return self.checks
# Create the health checker; its constructor starts the background check thread.
health_checker = HealthChecker()
@app.route('/health')
def health_endpoint():
    """Health-check API endpoint.

    Reports ``'healthy'`` only when every individual check is healthy, and
    includes the per-check results plus the current timestamp.
    """
    results = health_checker.get_all_checks()
    overall = 'unhealthy'
    if all(entry['status'] == 'healthy' for entry in results.values()):
        overall = 'healthy'
    payload = {
        'status': overall,
        'timestamp': datetime.now().isoformat(),
        'checks': results
    }
    return jsonify(payload)
# Usage example (requires Flask to be installed)
if __name__ == '__main__':
    print("健康检查服务启动在 http://localhost:5000/health")
    print("访问 http://localhost:5000/health 查看状态")
    # app.run(debug=True)  # uncomment to actually start the server

自定义装饰器监控 🎯

import functools
import time
import logging
from typing import Any, Callable
# Basic logging setup at INFO level.
logging.basicConfig(level=logging.INFO)
# Module-level logger; FunctionMonitor below emits its warnings and errors through it.
logger = logging.getLogger(__name__)
class FunctionMonitor:
    """Decorator that collects call statistics for the wrapped function.

    Args:
        name: Label used in log messages and statistics; defaults to the
            wrapped function's ``__name__``.
        alert_threshold: A warning is logged when a call takes longer than
            this many seconds.

    The decorated function gains a ``get_stats()`` attribute returning the
    statistics as a dict. Statistics are stored on the decorator instance.
    """

    def __init__(self, name=None, alert_threshold=1.0):
        self.name = name
        self.alert_threshold = alert_threshold
        self.stats = {
            'total_calls': 0,
            'total_time': 0,
            'max_time': 0,
            'min_time': float('inf'),
            'errors': 0
        }

    def __call__(self, func: Callable) -> Callable:
        func_name = self.name or func.__name__

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            start_time = time.time()
            self.stats['total_calls'] += 1
            try:
                return func(*args, **kwargs)
            except Exception as e:
                self.stats['errors'] += 1
                logger.error(f"❌ {func_name} 执行失败: {str(e)}")
                raise
            finally:
                # Record the duration of failed calls as well. total_calls
                # counts them, so leaving their time out would bias avg_time
                # low and hide slow failures from min/max.
                execution_time = time.time() - start_time
                self.stats['total_time'] += execution_time
                self.stats['max_time'] = max(self.stats['max_time'], execution_time)
                self.stats['min_time'] = min(self.stats['min_time'], execution_time)
                # Warn when the call exceeded the configured threshold
                if execution_time > self.alert_threshold:
                    logger.warning(f"⚠️ {func_name} 执行时间过长: {execution_time:.2f}s")

        def get_stats():
            """Return a snapshot of the statistics collected so far."""
            calls = max(self.stats['total_calls'], 1)  # avoid division by zero
            fastest = self.stats['min_time']
            return {
                'function': func_name,
                'total_calls': self.stats['total_calls'],
                'avg_time': self.stats['total_time'] / calls,
                'max_time': self.stats['max_time'],
                # min_time is still +inf before the first call; report 0 then
                'min_time': fastest if fastest != float('inf') else 0,
                'errors': self.stats['errors'],
                'error_rate': f"{(self.stats['errors'] / calls) * 100:.2f}%"
            }

        # Expose the statistics accessor on the decorated function
        wrapper.get_stats = get_stats
        return wrapper
# 使用示例
@FunctionMonitor(alert_threshold=0.5)
def slow_function():
    """Simulate a slow function: block for 0.3 seconds, then return a fixed marker."""
    delay_seconds = 0.3
    time.sleep(delay_seconds)
    outcome = "完成"
    return outcome
@FunctionMonitor(name="数据处理", alert_threshold=1.0)
def data_processing():
    """Simulate data processing that fails depending on the wall clock.

    Sleeps 0.1 seconds, then raises ``ValueError`` when the current time
    modulo 2 falls below 0.5; otherwise returns a fixed marker string.
    """
    time.sleep(0.1)
    clock_phase = time.time() % 2
    should_fail = clock_phase < 0.5
    if should_fail:
        raise ValueError("模拟错误")
    return "数据就绪"
# Test: call both decorated functions a few times
for _ in range(5):
    try:
        slow_function()
        data_processing()
    except Exception:
        # The simulated failures are already logged and counted by
        # FunctionMonitor, so they are ignored here on purpose. Catching
        # Exception (not a bare except) keeps Ctrl-C / SystemExit working.
        pass
# Print the statistics gathered for each decorated function
print("\n📊 函数监控统计:")
for func in [slow_function, data_processing]:
    stats = func.get_stats()
    print(f"\n--- {stats['function']} ---")
    for key, value in stats.items():
        print(f"{key}: {value}")

完整的监控系统集成 🔧

import json
import time
from datetime import datetime
from collections import deque
import threading
class CompleteMonitor:
    """Combined monitor: system metrics, request bookkeeping and alerts.

    Args:
        max_history: Maximum number of entries kept per metric series; older
            entries are dropped.

    Request and error totals are tracked with separate running counters, so
    the totals and the error rate in ``get_snapshot()`` stay correct after
    the bounded history has started discarding old entries.
    """

    def __init__(self, max_history=100):
        self.metrics = {
            'cpu': deque(maxlen=max_history),
            'memory': deque(maxlen=max_history),
            'requests': deque(maxlen=max_history),
            'errors': deque(maxlen=max_history)
        }
        self.alerts = []
        self.running = False
        # Set by start(); None means the monitor was never started.
        self.start_time = None
        # Lifetime counters, independent of the bounded history above.
        self._total_requests = 0
        self._total_errors = 0

    def start(self):
        """Start the metric-collection and alert-checking daemon threads."""
        self.running = True
        self.start_time = datetime.now()
        # Metric collection thread
        self.collector_thread = threading.Thread(target=self._collect_metrics)
        self.collector_thread.daemon = True
        self.collector_thread.start()
        # Alert checking thread
        self.alert_thread = threading.Thread(target=self._check_alerts)
        self.alert_thread.daemon = True
        self.alert_thread.start()
        logger.info("监控系统启动")

    def stop(self):
        """Ask both background loops to finish; does not wait for them."""
        self.running = False
        logger.info("监控系统停止")

    def _collect_metrics(self):
        """Append system CPU and memory usage to the history every 5 seconds."""
        import psutil
        while self.running:
            self.metrics['cpu'].append({
                'timestamp': datetime.now().isoformat(),
                'value': psutil.cpu_percent()
            })
            self.metrics['memory'].append({
                'timestamp': datetime.now().isoformat(),
                'value': psutil.virtual_memory().percent
            })
            time.sleep(5)  # collect every 5 seconds

    def _check_alerts(self):
        """Every 10 seconds raise an alert if the latest CPU/memory sample is too high."""
        while self.running:
            # CPU above 90% is critical
            if self.metrics['cpu'] and self.metrics['cpu'][-1]['value'] > 90:
                self._add_alert('critical', f"CPU使用率过高: {self.metrics['cpu'][-1]['value']}%")
            # Memory above 85% is a warning
            if self.metrics['memory'] and self.metrics['memory'][-1]['value'] > 85:
                self._add_alert('warning', f"内存使用率过高: {self.metrics['memory'][-1]['value']}%")
            time.sleep(10)

    def _add_alert(self, level, message):
        """Store an alert with a timestamp and log it as a warning."""
        alert = {
            'timestamp': datetime.now().isoformat(),
            'level': level,
            'message': message
        }
        self.alerts.append(alert)
        logger.warning(f"[{level.upper()}] {message}")

    def record_request(self, response_time, status_code):
        """Record one request; status codes >= 400 are also recorded as errors.

        Args:
            response_time: Duration of the request in seconds.
            status_code: HTTP-style status code of the response.
        """
        self._total_requests += 1
        self.metrics['requests'].append({
            'timestamp': datetime.now().isoformat(),
            'response_time': response_time,
            'status_code': status_code
        })
        if status_code >= 400:
            self._total_errors += 1
            self.metrics['errors'].append({
                'timestamp': datetime.now().isoformat(),
                'status_code': status_code
            })

    def get_snapshot(self):
        """Return the current state as a JSON-serialisable dict.

        Contains run status, uptime in whole seconds, latest CPU/memory
        samples (``None`` if none yet), lifetime request/error totals with
        the error rate, and the five most recent alerts.
        """
        started = getattr(self, 'start_time', None)
        if started is None:
            uptime = 0
        else:
            # total_seconds() covers the whole interval; timedelta.seconds
            # alone drops the days component and wraps every 24 hours.
            uptime = int((datetime.now() - started).total_seconds())
        cpu_samples = self.metrics['cpu']
        memory_samples = self.metrics['memory']
        error_ratio = self._total_errors / max(self._total_requests, 1)
        return {
            'status': 'running' if self.running else 'stopped',
            'uptime': f"{uptime}秒",
            'metrics': {
                'latest_cpu': cpu_samples[-1]['value'] if cpu_samples else None,
                'latest_memory': memory_samples[-1]['value'] if memory_samples else None,
                'total_requests': self._total_requests,
                'total_errors': self._total_errors,
                'error_rate': f"{error_ratio * 100:.2f}%"
            },
            'recent_alerts': self.alerts[-5:]
        }
# Usage example
if __name__ == '__main__':
    monitor = CompleteMonitor()
    monitor.start()
    # Simulate the program running: ten requests, one per second; every
    # fifth request (i == 0 and i == 5) is reported with status 500.
    for i in range(10):
        response_time = 0.1 + (i * 0.05)
        status_code = 200 if i % 5 != 0 else 500
        monitor.record_request(response_time, status_code)
        time.sleep(1)
    # Dump the snapshot as indented JSON, keeping non-ASCII text readable
    print("\n📊 监控系统快照:")
    print(json.dumps(monitor.get_snapshot(), indent=2, ensure_ascii=False))
    monitor.stop()

实际应用建议

  1. 选择合适的监控粒度

    • 开发环境:详细日志
    • 生产环境:关键指标+警报
  2. 重要的监控指标

    • CPU、内存使用率
    • 请求响应时间
    • 错误率
    • 吞吐量
  3. 存储与可视化

    • 使用时序数据库(InfluxDB)
    • 配合Grafana可视化
  4. 警报设置

    • 设置合理的阈值
    • 分级警报(info/warning/critical)

选择哪种方法取决于你的具体需求:

  • 简单应用:基础日志监控
  • 系统级监控:使用psutil
  • 微服务架构:健康检查API
  • 性能优化:装饰器监控
  • 完整方案:集成监控系统

抱歉,评论功能暂时关闭!