手把手教你编写高效指标采集脚本(附实战代码)
📚 目录导读
- 为什么需要指标采集脚本?
- 前期准备:你需要了解的核心概念
- 明确采集目标与指标定义
- 选择合适的技术栈
- 编写基础采集脚本(Python实战)
- 1 单机资源指标采集(CPU/内存/磁盘)
- 2 网络服务指标采集(HTTP响应/延迟)
- 3 业务逻辑指标采集(API调用次数/错误率)
- 让脚本更健壮 - 异常处理与重试机制
- 输出与存储 - 从控制台到数据库
- 自动化与调度 - 告别手动运行
- 常见问题问答(FAQ)
- 总结与最佳实践
为什么需要指标采集脚本?
在现代软件运维、DevOps 和可观测性体系中,指标采集脚本是连接系统状态与监控平台的桥梁,无论是追踪服务器CPU使用率、监控API响应时间,还是统计用户行为数据,脚本化的指标采集能实现自动化、高频次、低误差的数据获取,相比手动登录服务器逐一检查,写一个采集脚本能节省90%以上的时间,并避免人为遗漏。

前期准备:你需要了解的核心概念
- 指标(Metric):可量化的数据点,如
cpu_usage_percent、http_status_200_count。 - 标签(Label):附加在指标上的键值对,用于区分维度,如
host=web-01、region=us-east。 - 采集频率:多久采集一次(例如15秒、1分钟、5分钟)。
- 数据格式:常见的有Prometheus Exposition格式、JSON、OpenTelemetry协议。
- 推(Push)与拉(Pull)模式:
- Push:脚本主动发送数据到中心(如InfluxDB、Datadog)。
- Pull:监控系统主动向脚本暴露的端口获取数据(如Prometheus Exporter)。
步骤一:明确采集目标与指标定义
在写代码之前,先回答三个问题:
- 采集什么? 服务器CPU使用率、内存占用、磁盘I/O、网络流量、数据库连接数、API调用量。
- 谁来消费数据? Prometheus?Grafana?自建控制台?决定输出格式。
- 需要多细的时间粒度? 高频(秒级)影响系统开销,低频(分钟级)可能丢失细节。
示例目标:
- 每60秒采集一次Web服务器CPU、内存、进程数。
- 输出为Prometheus Exporter格式,便于Grafana可视化。
步骤二:选择合适的技术栈
| 场景 | 推荐语言/工具 | 理由 |
|---|---|---|
| 轻量级、快速开发 | Python(psutil、requests) | 生态丰富、代码简洁 |
| 高性能、低资源消耗 | Go(expvar、prometheus/client_golang) | 编译后小、并发强 |
| 已有Java系统集成 | Java(Micrometer、Dropwizard Metrics) | 原生JVM支持 |
| 不需要编写代码 | Telegraf、Collectd、Node Exporter(已有插件) | 开箱即用 |
本文以Python为例,因为它最容易上手且库支持全面。
步骤三:编写基础采集脚本(Python实战)
1 单机资源指标采集
import psutil
import time
import json
def collect_system_metrics():
"""采集CPU、内存、磁盘使用率"""
metrics = {
"cpu_usage_percent": psutil.cpu_percent(interval=1),
"memory_usage_percent": psutil.virtual_memory().percent,
"disk_usage_percent": psutil.disk_usage('/').percent,
}
return metrics
if __name__ == "__main__":
# 模拟一次采集
print(json.dumps(collect_system_metrics(), indent=2))
解释:
psutil.cpu_percent(interval=1)采样1秒内的平均CPU使用率。- 返回字典可灵活扩展,例如加上磁盘IO、网络收发等。
2 网络服务指标采集
import requests
def http_endpoint_check(url="https://example.com/api/health"):
"""检查HTTP端点的响应时间和状态码"""
start = time.time()
try:
resp = requests.get(url, timeout=5)
response_time = time.time() - start
status_code = resp.status_code
except requests.exceptions.RequestException as e:
response_time = -1
status_code = 599 # 自定义超时错误码
return {
"http_response_time_seconds": round(response_time, 4),
"http_status_code": status_code,
}
# 输出示例:{'http_response_time_seconds': 0.234, 'http_status_code': 200}
3 业务逻辑指标采集
假设你需要统计某接口过去5分钟的调用次数和错误数,可借助缓存或简单文件持久化。
from collections import deque
from datetime import datetime
import json
# 滑动窗口存储最近5分钟数据(生产环境建议用Redis或数据库)
recent_calls = deque(maxlen=300) # 每秒最多记录一次,5分钟=300秒
def record_api_call(success=True):
now = datetime.now().isoformat()
recent_calls.append({"time": now, "success": success})
def collect_business_metrics():
"""分析最近5分钟的API调用情况"""
total = len(recent_calls)
errors = sum(1 for c in recent_calls if not c['success'])
return {
"api_total_calls_5m": total,
"api_error_count_5m": errors,
"api_error_rate": round(errors / total, 4) if total > 0 else 0,
}
注意:实际系统应使用分布式计数器(如Redis INCR、Prometheus Histogram),此处仅演示概念。
步骤四:让脚本更健壮 - 异常处理与重试机制
指标采集脚本常运行在无人值守环境,必须做好异常处理。
def safe_collect():
try:
result = collect_system_metrics()
except PermissionError as e:
print(f"权限不足: {e},请以管理员/root用户运行或配置sudoers")
return {} # 返回空字典,不要中断整个流程
except Exception as e:
# 记录日志但继续运行
print(f"采集失败: {e}")
return {"error": str(e)}
return result
重试逻辑:对于网络相关的指标,推荐使用tenacity库或自定义重试。
import time
from functools import wraps
def retry(max_attempts=3, delay=1):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts:
raise
print(f"第{attempt}次尝试失败,{delay}秒后重试")
time.sleep(delay)
return None
return wrapper
return decorator
@retry()
def unstable_api_check():
# 模拟可能失败的网络请求
pass
步骤五:输出与存储 - 从控制台到数据库
输出到Prometheus Exporter格式(最推荐的方式之一):
def format_for_prometheus(metrics):
lines = []
for key, value in metrics.items():
# 添加标签,hostname
lines.append(f'# HELP {key} Description of {key}')
lines.append(f'# TYPE {key} gauge')
lines.append(f'{key}{{host="web01"}} {value}')
return '\n'.join(lines)
# 启动HTTP服务暴露给Prometheus抓取(需安装flask或http.server)
from http.server import HTTPServer, BaseHTTPRequestHandler
class MetricsHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/metrics':
metrics = collect_system_metrics()
self.send_response(200)
self.send_header('Content-Type', 'text/plain; version=0.0.4')
self.end_headers()
self.wfile.write(format_for_prometheus(metrics).encode())
else:
self.send_response(404)
# 运行 python metrics_server.py 即可在 http://0.0.0.0:8000/metrics 查看
if __name__ == '__main__':
server = HTTPServer(('0.0.0.0', 8000), MetricsHandler)
server.serve_forever()
写入时序数据库(以InfluxDB为例):
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="your_token", org="your_org")
write_api = client.write_api(write_options=SYNCHRONOUS)
def send_to_influxdb(metrics):
point = Point("system_metrics") \
.tag("host", "web-01") \
.field("cpu_percent", metrics["cpu_usage_percent"]) \
.field("mem_percent", metrics["memory_usage_percent"])
write_api.write(bucket="monitoring", record=point)
步骤六:自动化与调度 - 告别手动运行
crontab(Linux)
# 每5分钟执行一次采集并写入文件 */5 * * * * /usr/bin/python3 /opt/metrics/collector.py >> /var/log/metrics_collect.log 2>&1
systemd timer(更可靠)
- 创建 .service 和 .timer 单元文件,支持依赖和日志管理。
使用Celery或APSchedule(Python内部)
from apscheduler.schedulers.background import BackgroundScheduler
def job():
metrics = collect_system_metrics()
send_to_influxdb(metrics)
scheduler = BackgroundScheduler()
scheduler.add_job(job, 'interval', minutes=1)
scheduler.start()
# 程序保持运行
while True:
time.sleep(10)
常见问题问答(FAQ)
Q1:指标采集脚本通常跑在哪里?会影响业务吗?
A: 一般跑在业务服务器上作为独立进程,或使用专门的采集节点。要控制资源占用:
- 避免高频采集(如cron调度1秒一次);
- 使用非阻塞IO;
- 对于已有Node Exporter/Telegraf的环境,最好复用现有工具。
Q2:采集脚本写不好会不会造成数据泄露?
A: 建议:
- 不采集敏感字段(如密码、Token);
- 暴露的 /metrics 端点配置IP白名单或基本认证;
- 使用TLS加密传输;
- 存储到数据库时限制权限。
Q3:如何调试采集脚本?
A:
- 先用
print或logging输出原始值; - 对比
top、htop、iftop等系统工具手动验证; - 使用Prometheus UI的“Graph”页面查询指标是否有数据;
- 单元测试:模拟异常网络、无权限环境。
Q4:指标采集脚本与APM(应用性能监控)有什么区别?
A: 指标采集脚本通常获取系统级或独立服务的聚合数据(如CPU、请求速率);APM则深入到代码内部(如方法调用耗时、SQL执行计划),通常需要语言Agent或字节码注入,两者是互补关系。
Q5:如何保证采集脚本高可用?
A:
- 容器化部署,配合K8s/自动重启;
- 多副本采集,数据去重(通过时间戳+唯一ID);
- 考虑使用消息队列缓冲(如Kafka),避免数据库写入失败丢数据。
总结与最佳实践
编写优秀的指标采集脚本,核心在于正确性、稳定性和可维护性,最后给出六条原则:
- 唯一性:每条指标有明确语义和单位(如
bytes、seconds、percent)。 - 标签适度:不过度添加高基数标签(如用户ID、订单ID),避免存储爆炸。
- 资源友好:设置合理的时间间隔,使用流式处理而非一次加载海量数据。
- 可观测自身:脚本自身也暴露存活检查端点,出现异常能告警。
- 版本化:指标格式变更时标注版本或兼容旧格式,避免监控仪表盘失效。
- 从已有工具开始:如果需求不复杂,直接用Node Exporter、Telegraf配上简单配置文件即可,不必自己写代码。
当你掌握了从定义→编写→调试→部署的全流程,你会发现指标采集脚本不仅是一个技术工程,更是建立系统可观测性的基石,就从上面给出的示例代码开始,为自己环境写第一个采集脚本吧!