本文目录导读:

编写资产盘点脚本的核心目标是自动化收集设备信息、对比历史数据,并输出差异报告,脚本的复杂程度取决于你的网络规模、资产类型和盘点要求。
以下我将提供一个通用、模块化的Python脚本框架,涵盖常见的三种盘点模式(主机信息、网络设备、软件授权),你可以根据自己的环境(Windows/Linux/Mixed)进行修改和扩展。
脚本核心思路
- 采集层:通过系统命令或API获取当前资产快照。
- 存储层:将数据写入文件(CSV/JSON/SQLite)或数据库。
- 对比层:将当前数据与上次的基线数据对比。
- 输出层:生成变更报告(新增、移除、配置漂移)。
通用资产盘点脚本框架 (Python)
这个脚本设计了三种工作模式:collect(采集)、report(报告)、compare(对比)。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
通用资产盘点脚本
支持:主机信息、网络设备存活检测、软件列表
作者:[你的名字]
版本: 1.0
"""
import os
import sys
import json
import csv
import platform
import subprocess
import socket
import datetime
import hashlib
import argparse
from pathlib import Path
# 可选:如果你需要ping扫描,安装 ping3: pip install ping3
# import ping3
# ============== 配置区 ==============
# 工作目录
WORK_DIR = Path("./asset_inventory")
# 基线文件存储路径
BASELINE_DIR = WORK_DIR / "baseline"
# 当前快照文件存储路径
SNAPSHOT_DIR = WORK_DIR / "snapshot"
# 报告输出路径
REPORT_DIR = WORK_DIR / "report"
# 要扫描的网段 (用于网络设备发现,格式如 "192.168.1.0/24")
NETWORK_SEGMENTS = ["192.168.1.0/24"]
# 要监控的关键进程或服务 (仅示例)
KEY_PROCESSES = ["nginx", "mysql", "docker"]
# ============== 工具函数 ==============
def ensure_dirs():
"""确保目录结构存在"""
for dir_path in [WORK_DIR, BASELINE_DIR, SNAPSHOT_DIR, REPORT_DIR]:
dir_path.mkdir(parents=True, exist_ok=True)
def get_machine_id():
"""生成主机唯一标识(基于MAC地址和主机名)"""
# 实际使用中建议使用dmidecode或wmi获取更稳定的ID
try:
hostname = socket.gethostname()
# 获取第一个非回环MAC
mac = ':'.join(hex(uuid.getnode())[2:].zfill(12)[i:i+2] for i in range(0,12,2))
raw_id = f"{hostname}-{mac}"
return hashlib.md5(raw_id.encode()).hexdigest()[:8]
except Exception:
return hostname[:8]
def run_cmd(command, timeout=10):
"""安全的执行系统命令,返回标准输出"""
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=timeout
)
return result.stdout.strip()
except subprocess.TimeoutExpired:
return "[TIMEOUT]"
except Exception as e:
return f"[ERROR: {str(e)}]"
# ============== 采集模块 ==============
def collect_system_info():
"""采集操作系统和硬件信息"""
info = {
"timestamp": datetime.datetime.now().isoformat(),
"hostname": socket.gethostname(),
"machine_id": get_machine_id(),
"os": platform.system(),
"os_version": platform.release(),
"architecture": platform.machine(),
"cpu_count": os.cpu_count(),
"python_version": sys.version,
}
# 操作系统特定采集
if platform.system() == "Linux":
info.update({
"kernel": run_cmd("uname -r"),
"uptime": run_cmd("uptime -p"),
"memory": run_cmd("free -h"),
"disk": run_cmd("df -h /"),
"ip_address": run_cmd("hostname -I"),
})
elif platform.system() == "Windows":
# Windows下使用wmic或powershell
info.update({
"memory": run_cmd("wmic MemoryChip get Capacity /format:csv"),
"disk": run_cmd("wmic logicaldisk get size,freespace,caption"),
"mac_address": run_cmd("getmac /FO CSV /NH"),
})
elif platform.system() == "Darwin": # macOS
info.update({
"memory": run_cmd("sysctl hw.memsize"),
})
return info
def collect_software_list():
"""采集已安装软件列表"""
software = []
if platform.system() == "Linux":
# Debian/Ubuntu
output = run_cmd("dpkg-query -W -f='${Package} | ${Version} | ${Status}\n'")
if output and "[ERROR]" not in output:
for line in output.split('\n'):
if 'install ok installed' in line:
parts = line.split(' | ')
software.append({"name": parts[0], "version": parts[1], "status": "installed"})
else:
# CentOS/RHEL
output = run_cmd("rpm -qa --queryformat '%{NAME} | %{VERSION}\n'")
if output and "[ERROR]" not in output:
for line in output.split('\n'):
parts = line.split(' | ')
if len(parts) == 2:
software.append({"name": parts[0], "version": parts[1], "status": "installed"})
elif platform.system() == "Windows":
# 使用wmic获取
output = run_cmd("wmic product get name,version /format:csv")
if output:
for line in output.split('\n')[1:]: # 跳过标题
parts = line.split(',')
if len(parts) >= 3:
software.append({"name": parts[1], "version": parts[2], "status": "installed"})
return software
def ping_host(ip):
"""检测主机是否在线 (使用ping)"""
param = "-n 1" if platform.system().lower() == "windows" else "-c 1"
result = subprocess.run(
f"ping {param} -W 1 {ip}",
shell=True,
capture_output=True,
text=True,
timeout=5
)
return result.returncode == 0
def scan_network_segment():
"""扫描配置的网段,获取在线设备列表"""
online_devices = []
# 注意:这里简化了网段解析,实际生产环境建议使用python-nmap或arp-scan
for segment in NETWORK_SEGMENTS:
base_ip = segment.split('/')[0]
# 假设是24位掩码,从1扫描到254
ip_prefix = '.'.join(base_ip.split('.')[:3])
print(f" 扫描网段: {segment} ...")
for last_octet in range(1, 255): # 慎用,255个IP可能会慢
ip = f"{ip_prefix}.{last_octet}"
if ping_host(ip):
try:
hostname = socket.gethostbyaddr(ip)[0]
except:
hostname = "Unknown"
online_devices.append({
"ip": ip,
"hostname": hostname,
"discovered_at": datetime.datetime.now().isoformat()
})
return online_devices
# ============== 存储与对比模块 ==============
def save_snapshot(data, filename):
"""保存当前采集的快照到JSON文件"""
filepath = SNAPSHOT_DIR / filename
with open(filepath, 'w') as f:
json.dump(data, f, indent=2, default=str)
return filepath
def load_baseline(filename):
"""加载上次的基线数据"""
filepath = BASELINE_DIR / filename
if filepath.exists():
with open(filepath, 'r') as f:
return json.load(f)
return {}
def save_baseline(data, filename):
"""将当前数据设为新基线"""
filepath = BASELINE_DIR / filename
with open(filepath, 'w') as f:
json.dump(data, f, indent=2, default=str)
return filepath
def compare_snapshots(current, baseline):
"""对比当前快照与基线,返回差异"""
added = []
removed = []
changed = []
# 获取键集合
current_keys = set(current.keys())
baseline_keys = set(baseline.keys())
added_keys = current_keys - baseline_keys
removed_keys = baseline_keys - current_keys
common_keys = current_keys & baseline_keys
for key in added_keys:
added.append({"key": key, "value": current[key]})
for key in removed_keys:
removed.append({"key": key, "value": baseline[key]})
for key in common_keys:
if current[key] != baseline[key]:
changed.append({
"key": key,
"old_value": baseline[key],
"new_value": current[key]
})
return {"added": added, "removed": removed, "changed": changed}
# ============== 主逻辑 ==============
def main():
parser = argparse.ArgumentParser(description='资产盘点脚本')
parser.add_argument('action', choices=['collect', 'report', 'compare', 'all'],
help='执行操作: collect(采集) / report(生成报告) / compare(对比基线)')
parser.add_argument('--baseline', help='基线文件名称(用于compare)', default=None)
args = parser.parse_args()
ensure_dirs()
machine_id = get_machine_id()
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
if args.action in ['collect', 'all']:
print("="*50)
print("开始采集资产信息...")
print("="*50)
# 1. 采集主机信息
print("[1/3] 采集系统信息...")
sys_info = collect_system_info()
print(f" -> 主机名: {sys_info.get('hostname')}")
# 2. 采集软件列表
print("[2/3] 采集软件列表...")
sw_list = collect_software_list()
print(f" -> 发现 {len(sw_list)} 个软件包")
# 3. 扫描网络设备(可选)
print("[3/3] 扫描网络设备...")
online_devices = scan_network_segment()
print(f" -> 发现 {len(online_devices)} 台在线设备")
# 组装快照
snapshot = {
"machine_id": machine_id,
"timestamp": timestamp,
"system_info": sys_info,
"software_list": sw_list,
"network_devices": online_devices
}
# 保存快照
snapshot_file = save_snapshot(snapshot, f"snapshot_{machine_id}_{timestamp}.json")
print(f"\n✅ 快照已保存至: {snapshot_file}")
if args.action in ['report', 'all']:
print("\n" + "="*50)
print("生成盘点报告...")
print("="*50)
# 找到最新的快照文件
snapshot_files = sorted(SNAPSHOT_DIR.glob(f"snapshot_{machine_id}_*.json"), reverse=True)
if not snapshot_files:
print("❌ 未找到快照文件,请先执行 collect")
return
latest_snapshot = snapshot_files[0]
with open(latest_snapshot) as f:
data = json.load(f)
# 生成CSV报告
report_file = REPORT_DIR / f"asset_report_{machine_id}_{timestamp}.csv"
with open(report_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['类别', '字段', '值'])
# 系统信息
for key, value in data.get("system_info", {}).items():
writer.writerow(['系统信息', key, value])
# 软件列表
for app in data.get("software_list", []):
writer.writerow(['软件', app.get('name'), app.get('version')])
# 网络设备
for device in data.get("network_devices", []):
writer.writerow(['网络设备', device.get('ip'), device.get('hostname')])
print(f"✅ CSV报告已生成: {report_file}")
if args.action in ['compare', 'all']:
print("\n" + "="*50)
print("对比基线...")
print("="*50)
# 如果没有指定基线,使用最新的快照作为基线的前一个版本
snapshot_files = sorted(SNAPSHOT_DIR.glob(f"snapshot_{machine_id}_*.json"))
if len(snapshot_files) < 2:
print("⚠️ 需要至少两次采集才能进行对比,将当前快照设为基线。")
if snapshot_files:
save_baseline(data, f"baseline_{machine_id}_{timestamp}.json")
return
# 加载最新快照
current_file = snapshot_files[-1]
with open(current_file) as f:
current = json.load(f)
# 加载基线(上一个快照)
baseline_file = snapshot_files[-2] if not args.baseline else BASELINE_DIR / args.baseline
with open(baseline_file) as f:
baseline = json.load(f)
diff = compare_snapshots(current.get("software_list", []), baseline.get("software_list", []))
# 输出差异报告
diff_file = REPORT_DIR / f"diff_report_{machine_id}_{timestamp}.txt"
with open(diff_file, 'w') as f:
f.write(f"资产变更报告\n")
f.write(f"时间: {datetime.datetime.now()}\n")
f.write(f"基线: {baseline_file.name}\n")
f.write(f"当前: {current_file.name}\n")
f.write("="*50 + "\n")
if diff['added']:
f.write("\n新增资产:\n")
for item in diff['added']:
f.write(f" + {item}\n")
if diff['removed']:
f.write("\n移除资产:\n")
for item in diff['removed']:
f.write(f" - {item}\n")
if diff['changed']:
f.write("\n变更资产:\n")
for item in diff['changed']:
f.write(f" ~ {item['key']}: {item['old_value']} -> {item['new_value']}\n")
print(f"✅ 差异报告已生成: {diff_file}")
# 最后更新基线
if args.action in ['collect', 'all']:
save_baseline(snapshot, f"baseline_{machine_id}_{timestamp}.json")
print("基线已更新。")
if __name__ == "__main__":
# 如果你的环境没有安装额外的库,上面的ping_host函数使用了系统ping命令,是跨平台的。
# 注意:扫描整个网段(254个IP)可能需要几分钟时间,生产环境中可以优化为并发扫描。
main()
如何使用这个脚本
-
保存脚本:将上述代码保存为
asset_inventory.py。 -
首次运行(采集 + 生成初始基线):
python3 asset_inventory.py all
这会在当前目录下创建
asset_inventory文件夹,包含snapshot、baseline、report子目录。 -
第二次运行(采集最新数据):
python3 asset_inventory.py collect
-
对比两次采集的差异:
python3 asset_inventory.py compare
这会自动找到最新的两次快照进行对比。
-
生成可读报告(CSV格式):
python3 asset_inventory.py report
进阶建议 & 定制化
-
生产级扫描:
- Nmap集成:将
ping_host替换为python-nmap库,可以获取开放端口、操作系统指纹等更详细的资产信息。 - 并发扫描:使用
concurrent.futures.ThreadPoolExecutor来加速网络扫描。
- Nmap集成:将
-
数据持久化:
- 将
JSON文件改为 SQLite 或 MySQL 数据库,使用sqlite3模块存储历史数据,便于SQL查询和分析。
- 将
-
跨平台优化:
- Windows:使用
wmi库(pip install wmi)获取更精准的硬件信息(如序列号、BIOS版本、MAC地址)。 - Linux:读取
/proc文件系统(如/proc/cpuinfo,/proc/meminfo)获取详细信息。
- Windows:使用
-
CMDB对接:
- 如果是公司级资产盘点,脚本采集到的数据可以通过REST API自动推送到 CMDB(配置管理数据库)中。
-
自动化调度:
- 使用 crontab (Linux) 或 Task Scheduler (Windows) 每天凌晨自动运行
collect和compare操作,然后通过邮件发送差异报告。
- 使用 crontab (Linux) 或 Task Scheduler (Windows) 每天凌晨自动运行
风险提示
- 权限:采集某些信息(如物理内存、磁盘序列号)需要管理员/root权限,建议用非root用户运行普通采集,并用特定工具(如sudoers配置)赋予必要权限。
- 网络扫描:在企业网络中进行大规模ping扫描可能会被入侵检测系统(IDS)视为攻击行为,务必获得网络管理员的授权。
- 隐私:保存的数据可能包含个人电脑上的软件列表,确保遵守公司的隐私政策。
你可以直接复制使用上面的脚本,根据你的实际需求(例如只采集Linux服务器,或只扫描特定IP列表)修改其中的函数和配置即可。