指标采集脚本怎么写?

wen 实用脚本 51

手把手教你编写高效指标采集脚本(附实战代码)

📚 目录导读

  1. 为什么需要指标采集脚本?
  2. 前期准备:你需要了解的核心概念
  3. 明确采集目标与指标定义
  4. 选择合适的技术栈
  5. 编写基础采集脚本(Python实战)
    • 1 单机资源指标采集(CPU/内存/磁盘)
    • 2 网络服务指标采集(HTTP响应/延迟)
    • 3 业务逻辑指标采集(API调用次数/错误率)
  6. 让脚本更健壮 - 异常处理与重试机制
  7. 输出与存储 - 从控制台到数据库
  8. 自动化与调度 - 告别手动运行
  9. 常见问题问答(FAQ)
  10. 总结与最佳实践

为什么需要指标采集脚本?

在现代软件运维、DevOps 和可观测性体系中,指标采集脚本是连接系统状态与监控平台的桥梁,无论是追踪服务器CPU使用率、监控API响应时间,还是统计用户行为数据,脚本化的指标采集能实现自动化、高频次、低误差的数据获取,相比手动登录服务器逐一检查,写一个采集脚本能节省90%以上的时间,并避免人为遗漏。

指标采集脚本怎么写?


前期准备:你需要了解的核心概念

  • 指标(Metric):可量化的数据点,如 cpu_usage_percenthttp_status_200_count
  • 标签(Label):附加在指标上的键值对,用于区分维度,如 host=web-01region=us-east
  • 采集频率:多久采集一次(例如15秒、1分钟、5分钟)。
  • 数据格式:常见的有Prometheus Exposition格式、JSON、OpenTelemetry协议。
  • 推(Push)与拉(Pull)模式
    • Push:脚本主动发送数据到中心(如InfluxDB、Datadog)。
    • Pull:监控系统主动向脚本暴露的端口获取数据(如Prometheus Exporter)。

步骤一:明确采集目标与指标定义

在写代码之前,先回答三个问题:

  1. 采集什么? 服务器CPU使用率、内存占用、磁盘I/O、网络流量、数据库连接数、API调用量。
  2. 谁来消费数据? Prometheus?Grafana?自建控制台?决定输出格式。
  3. 需要多细的时间粒度? 高频(秒级)影响系统开销,低频(分钟级)可能丢失细节。

示例目标

  • 每60秒采集一次Web服务器CPU、内存、进程数。
  • 输出为Prometheus Exporter格式,便于Grafana可视化。

步骤二:选择合适的技术栈

场景 推荐语言/工具 理由
轻量级、快速开发 Python(psutil、requests) 生态丰富、代码简洁
高性能、低资源消耗 Go(expvar、prometheus/client_golang) 编译后小、并发强
已有Java系统集成 Java(Micrometer、Dropwizard Metrics) 原生JVM支持
不需要编写代码 Telegraf、Collectd、Node Exporter(已有插件) 开箱即用

本文以Python为例,因为它最容易上手且库支持全面。


步骤三:编写基础采集脚本(Python实战)

1 单机资源指标采集

import psutil
import time
import json
def collect_system_metrics():
    """采集CPU、内存、磁盘使用率"""
    metrics = {
        "cpu_usage_percent": psutil.cpu_percent(interval=1),
        "memory_usage_percent": psutil.virtual_memory().percent,
        "disk_usage_percent": psutil.disk_usage('/').percent,
    }
    return metrics
if __name__ == "__main__":
    # 模拟一次采集
    print(json.dumps(collect_system_metrics(), indent=2))

解释

  • psutil.cpu_percent(interval=1) 采样1秒内的平均CPU使用率。
  • 返回字典可灵活扩展,例如加上磁盘IO、网络收发等。

2 网络服务指标采集

import requests
def http_endpoint_check(url="https://example.com/api/health"):
    """检查HTTP端点的响应时间和状态码"""
    start = time.time()
    try:
        resp = requests.get(url, timeout=5)
        response_time = time.time() - start
        status_code = resp.status_code
    except requests.exceptions.RequestException as e:
        response_time = -1
        status_code = 599  # 自定义超时错误码
    return {
        "http_response_time_seconds": round(response_time, 4),
        "http_status_code": status_code,
    }
# 输出示例:{'http_response_time_seconds': 0.234, 'http_status_code': 200}

3 业务逻辑指标采集

假设你需要统计某接口过去5分钟的调用次数和错误数,可借助缓存或简单文件持久化。

from collections import deque
from datetime import datetime
import json
# 滑动窗口存储最近5分钟数据(生产环境建议用Redis或数据库)
recent_calls = deque(maxlen=300)  # 每秒最多记录一次,5分钟=300秒
def record_api_call(success=True):
    now = datetime.now().isoformat()
    recent_calls.append({"time": now, "success": success})
def collect_business_metrics():
    """分析最近5分钟的API调用情况"""
    total = len(recent_calls)
    errors = sum(1 for c in recent_calls if not c['success'])
    return {
        "api_total_calls_5m": total,
        "api_error_count_5m": errors,
        "api_error_rate": round(errors / total, 4) if total > 0 else 0,
    }

注意:实际系统应使用分布式计数器(如Redis INCR、Prometheus Histogram),此处仅演示概念。


步骤四:让脚本更健壮 - 异常处理与重试机制

指标采集脚本常运行在无人值守环境,必须做好异常处理。

def safe_collect():
    try:
        result = collect_system_metrics()
    except PermissionError as e:
        print(f"权限不足: {e},请以管理员/root用户运行或配置sudoers")
        return {}  # 返回空字典,不要中断整个流程
    except Exception as e:
        # 记录日志但继续运行
        print(f"采集失败: {e}")
        return {"error": str(e)}
    return result

重试逻辑:对于网络相关的指标,推荐使用tenacity库或自定义重试。

import time
from functools import wraps
def retry(max_attempts=3, delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        raise
                    print(f"第{attempt}次尝试失败,{delay}秒后重试")
                    time.sleep(delay)
            return None
        return wrapper
    return decorator
@retry()
def unstable_api_check():
    # 模拟可能失败的网络请求
    pass

步骤五:输出与存储 - 从控制台到数据库

输出到Prometheus Exporter格式(最推荐的方式之一):

def format_for_prometheus(metrics):
    lines = []
    for key, value in metrics.items():
        # 添加标签,hostname
        lines.append(f'# HELP {key} Description of {key}')
        lines.append(f'# TYPE {key} gauge')
        lines.append(f'{key}{{host="web01"}} {value}')
    return '\n'.join(lines)
# 启动HTTP服务暴露给Prometheus抓取(需安装flask或http.server)
from http.server import HTTPServer, BaseHTTPRequestHandler
class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/metrics':
            metrics = collect_system_metrics()
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain; version=0.0.4')
            self.end_headers()
            self.wfile.write(format_for_prometheus(metrics).encode())
        else:
            self.send_response(404)
# 运行 python metrics_server.py 即可在 http://0.0.0.0:8000/metrics 查看
if __name__ == '__main__':
    server = HTTPServer(('0.0.0.0', 8000), MetricsHandler)
    server.serve_forever()

写入时序数据库(以InfluxDB为例):

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="your_token", org="your_org")
write_api = client.write_api(write_options=SYNCHRONOUS)
def send_to_influxdb(metrics):
    point = Point("system_metrics") \
        .tag("host", "web-01") \
        .field("cpu_percent", metrics["cpu_usage_percent"]) \
        .field("mem_percent", metrics["memory_usage_percent"])
    write_api.write(bucket="monitoring", record=point)

步骤六:自动化与调度 - 告别手动运行

crontab(Linux)

# 每5分钟执行一次采集并写入文件
*/5 * * * * /usr/bin/python3 /opt/metrics/collector.py >> /var/log/metrics_collect.log 2>&1

systemd timer(更可靠)

  • 创建 .service 和 .timer 单元文件,支持依赖和日志管理。

使用Celery或APSchedule(Python内部)

from apscheduler.schedulers.background import BackgroundScheduler
def job():
    metrics = collect_system_metrics()
    send_to_influxdb(metrics)
scheduler = BackgroundScheduler()
scheduler.add_job(job, 'interval', minutes=1)
scheduler.start()
# 程序保持运行
while True:
    time.sleep(10)

常见问题问答(FAQ)

Q1:指标采集脚本通常跑在哪里?会影响业务吗?

A: 一般跑在业务服务器上作为独立进程,或使用专门的采集节点。要控制资源占用

  • 避免高频采集(如cron调度1秒一次);
  • 使用非阻塞IO;
  • 对于已有Node Exporter/Telegraf的环境,最好复用现有工具。

Q2:采集脚本写不好会不会造成数据泄露?

A: 建议:

  • 不采集敏感字段(如密码、Token);
  • 暴露的 /metrics 端点配置IP白名单或基本认证;
  • 使用TLS加密传输;
  • 存储到数据库时限制权限。

Q3:如何调试采集脚本?

A:

  1. 先用print或logging输出原始值;
  2. 对比tophtopiftop等系统工具手动验证;
  3. 使用Prometheus UI的“Graph”页面查询指标是否有数据;
  4. 单元测试:模拟异常网络、无权限环境。

Q4:指标采集脚本与APM(应用性能监控)有什么区别?

A: 指标采集脚本通常获取系统级或独立服务的聚合数据(如CPU、请求速率);APM则深入到代码内部(如方法调用耗时、SQL执行计划),通常需要语言Agent或字节码注入,两者是互补关系。

Q5:如何保证采集脚本高可用?

A:

  • 容器化部署,配合K8s/自动重启;
  • 多副本采集,数据去重(通过时间戳+唯一ID);
  • 考虑使用消息队列缓冲(如Kafka),避免数据库写入失败丢数据。

总结与最佳实践

编写优秀的指标采集脚本,核心在于正确性、稳定性和可维护性,最后给出六条原则:

  1. 唯一性:每条指标有明确语义和单位(如 bytessecondspercent)。
  2. 标签适度:不过度添加高基数标签(如用户ID、订单ID),避免存储爆炸。
  3. 资源友好:设置合理的时间间隔,使用流式处理而非一次加载海量数据。
  4. 可观测自身:脚本自身也暴露存活检查端点,出现异常能告警。
  5. 版本化:指标格式变更时标注版本或兼容旧格式,避免监控仪表盘失效。
  6. 从已有工具开始:如果需求不复杂,直接用Node Exporter、Telegraf配上简单配置文件即可,不必自己写代码。

当你掌握了从定义→编写→调试→部署的全流程,你会发现指标采集脚本不仅是一个技术工程,更是建立系统可观测性的基石,就从上面给出的示例代码开始,为自己环境写第一个采集脚本吧!

抱歉,评论功能暂时关闭!