资产盘点脚本如何写?

wen 实用脚本 40

本文目录导读:

资产盘点脚本如何写?

  1. 脚本核心思路
  2. 通用资产盘点脚本框架 (Python)
  3. 如何使用这个脚本
  4. 进阶建议 & 定制化
  5. 风险提示

编写资产盘点脚本的核心目标是自动化收集设备信息、对比历史数据,并输出差异报告,脚本的复杂程度取决于你的网络规模、资产类型和盘点要求。

以下我将提供一个通用、模块化的Python脚本框架,涵盖常见的三种盘点模式(主机信息、网络设备、软件授权),你可以根据自己的环境(Windows/Linux/Mixed)进行修改和扩展。

脚本核心思路

  1. 采集层:通过系统命令或API获取当前资产快照。
  2. 存储层:将数据写入文件(CSV/JSON/SQLite)或数据库。
  3. 对比层:将当前数据与上次的基线数据对比。
  4. 输出层:生成变更报告(新增、移除、配置漂移)。

通用资产盘点脚本框架 (Python)

这个脚本设计了三种工作模式:collect(采集)、report(报告)、compare(对比)。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
通用资产盘点脚本
支持:主机信息、网络设备存活检测、软件列表
作者:[你的名字]
版本: 1.0
"""
import os
import sys
import json
import csv
import platform
import subprocess
import socket
import datetime
import hashlib
import argparse
from pathlib import Path
# 可选:如果你需要ping扫描,安装 ping3: pip install ping3
# import ping3
# ============== 配置区 ==============
# 工作目录
WORK_DIR = Path("./asset_inventory")
# 基线文件存储路径
BASELINE_DIR = WORK_DIR / "baseline"
# 当前快照文件存储路径
SNAPSHOT_DIR = WORK_DIR / "snapshot"
# 报告输出路径
REPORT_DIR = WORK_DIR / "report"
# 要扫描的网段 (用于网络设备发现,格式如 "192.168.1.0/24")
NETWORK_SEGMENTS = ["192.168.1.0/24"]
# 要监控的关键进程或服务 (仅示例)
KEY_PROCESSES = ["nginx", "mysql", "docker"]
# ============== 工具函数 ==============
def ensure_dirs():
    """确保目录结构存在"""
    for dir_path in [WORK_DIR, BASELINE_DIR, SNAPSHOT_DIR, REPORT_DIR]:
        dir_path.mkdir(parents=True, exist_ok=True)
def get_machine_id():
    """生成主机唯一标识(基于MAC地址和主机名)"""
    # 实际使用中建议使用dmidecode或wmi获取更稳定的ID
    try:
        hostname = socket.gethostname()
        # 获取第一个非回环MAC
        mac = ':'.join(hex(uuid.getnode())[2:].zfill(12)[i:i+2] for i in range(0,12,2))
        raw_id = f"{hostname}-{mac}"
        return hashlib.md5(raw_id.encode()).hexdigest()[:8]
    except Exception:
        return hostname[:8]
def run_cmd(command, timeout=10):
    """安全的执行系统命令,返回标准输出"""
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout
        )
        return result.stdout.strip()
    except subprocess.TimeoutExpired:
        return "[TIMEOUT]"
    except Exception as e:
        return f"[ERROR: {str(e)}]"
# ============== 采集模块 ==============
def collect_system_info():
    """采集操作系统和硬件信息"""
    info = {
        "timestamp": datetime.datetime.now().isoformat(),
        "hostname": socket.gethostname(),
        "machine_id": get_machine_id(),
        "os": platform.system(),
        "os_version": platform.release(),
        "architecture": platform.machine(),
        "cpu_count": os.cpu_count(),
        "python_version": sys.version,
    }
    # 操作系统特定采集
    if platform.system() == "Linux":
        info.update({
            "kernel": run_cmd("uname -r"),
            "uptime": run_cmd("uptime -p"),
            "memory": run_cmd("free -h"),
            "disk": run_cmd("df -h /"),
            "ip_address": run_cmd("hostname -I"),
        })
    elif platform.system() == "Windows":
        # Windows下使用wmic或powershell
        info.update({
            "memory": run_cmd("wmic MemoryChip get Capacity /format:csv"),
            "disk": run_cmd("wmic logicaldisk get size,freespace,caption"),
            "mac_address": run_cmd("getmac /FO CSV /NH"),
        })
    elif platform.system() == "Darwin": # macOS
        info.update({
            "memory": run_cmd("sysctl hw.memsize"),
        })
    return info
def collect_software_list():
    """采集已安装软件列表"""
    software = []
    if platform.system() == "Linux":
        # Debian/Ubuntu
        output = run_cmd("dpkg-query -W -f='${Package} | ${Version} | ${Status}\n'")
        if output and "[ERROR]" not in output:
            for line in output.split('\n'):
                if 'install ok installed' in line:
                    parts = line.split(' | ')
                    software.append({"name": parts[0], "version": parts[1], "status": "installed"})
        else:
            # CentOS/RHEL
            output = run_cmd("rpm -qa --queryformat '%{NAME} | %{VERSION}\n'")
            if output and "[ERROR]" not in output:
                for line in output.split('\n'):
                    parts = line.split(' | ')
                    if len(parts) == 2:
                        software.append({"name": parts[0], "version": parts[1], "status": "installed"})
    elif platform.system() == "Windows":
        # 使用wmic获取
        output = run_cmd("wmic product get name,version /format:csv")
        if output:
            for line in output.split('\n')[1:]:  # 跳过标题
                parts = line.split(',')
                if len(parts) >= 3:
                    software.append({"name": parts[1], "version": parts[2], "status": "installed"})
    return software
def ping_host(ip):
    """检测主机是否在线 (使用ping)"""
    param = "-n 1" if platform.system().lower() == "windows" else "-c 1"
    result = subprocess.run(
        f"ping {param} -W 1 {ip}",
        shell=True,
        capture_output=True,
        text=True,
        timeout=5
    )
    return result.returncode == 0
def scan_network_segment():
    """扫描配置的网段,获取在线设备列表"""
    online_devices = []
    # 注意:这里简化了网段解析,实际生产环境建议使用python-nmap或arp-scan
    for segment in NETWORK_SEGMENTS:
        base_ip = segment.split('/')[0]
        # 假设是24位掩码,从1扫描到254
        ip_prefix = '.'.join(base_ip.split('.')[:3])
        print(f" 扫描网段: {segment} ...")
        for last_octet in range(1, 255):  # 慎用,255个IP可能会慢
            ip = f"{ip_prefix}.{last_octet}"
            if ping_host(ip):
                try:
                    hostname = socket.gethostbyaddr(ip)[0]
                except:
                    hostname = "Unknown"
                online_devices.append({
                    "ip": ip,
                    "hostname": hostname,
                    "discovered_at": datetime.datetime.now().isoformat()
                })
    return online_devices
# ============== 存储与对比模块 ==============
def save_snapshot(data, filename):
    """保存当前采集的快照到JSON文件"""
    filepath = SNAPSHOT_DIR / filename
    with open(filepath, 'w') as f:
        json.dump(data, f, indent=2, default=str)
    return filepath
def load_baseline(filename):
    """加载上次的基线数据"""
    filepath = BASELINE_DIR / filename
    if filepath.exists():
        with open(filepath, 'r') as f:
            return json.load(f)
    return {}
def save_baseline(data, filename):
    """将当前数据设为新基线"""
    filepath = BASELINE_DIR / filename
    with open(filepath, 'w') as f:
        json.dump(data, f, indent=2, default=str)
    return filepath
def compare_snapshots(current, baseline):
    """对比当前快照与基线,返回差异"""
    added = []
    removed = []
    changed = []
    # 获取键集合
    current_keys = set(current.keys())
    baseline_keys = set(baseline.keys())
    added_keys = current_keys - baseline_keys
    removed_keys = baseline_keys - current_keys
    common_keys = current_keys & baseline_keys
    for key in added_keys:
        added.append({"key": key, "value": current[key]})
    for key in removed_keys:
        removed.append({"key": key, "value": baseline[key]})
    for key in common_keys:
        if current[key] != baseline[key]:
            changed.append({
                "key": key,
                "old_value": baseline[key],
                "new_value": current[key]
            })
    return {"added": added, "removed": removed, "changed": changed}
# ============== 主逻辑 ==============
def main():
    parser = argparse.ArgumentParser(description='资产盘点脚本')
    parser.add_argument('action', choices=['collect', 'report', 'compare', 'all'],
                       help='执行操作: collect(采集) / report(生成报告) / compare(对比基线)')
    parser.add_argument('--baseline', help='基线文件名称(用于compare)', default=None)
    args = parser.parse_args()
    ensure_dirs()
    machine_id = get_machine_id()
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    if args.action in ['collect', 'all']:
        print("="*50)
        print("开始采集资产信息...")
        print("="*50)
        # 1. 采集主机信息
        print("[1/3] 采集系统信息...")
        sys_info = collect_system_info()
        print(f"  -> 主机名: {sys_info.get('hostname')}")
        # 2. 采集软件列表
        print("[2/3] 采集软件列表...")
        sw_list = collect_software_list()
        print(f"  -> 发现 {len(sw_list)} 个软件包")
        # 3. 扫描网络设备(可选)
        print("[3/3] 扫描网络设备...")
        online_devices = scan_network_segment()
        print(f"  -> 发现 {len(online_devices)} 台在线设备")
        # 组装快照
        snapshot = {
            "machine_id": machine_id,
            "timestamp": timestamp,
            "system_info": sys_info,
            "software_list": sw_list,
            "network_devices": online_devices
        }
        # 保存快照
        snapshot_file = save_snapshot(snapshot, f"snapshot_{machine_id}_{timestamp}.json")
        print(f"\n✅ 快照已保存至: {snapshot_file}")
    if args.action in ['report', 'all']:
        print("\n" + "="*50)
        print("生成盘点报告...")
        print("="*50)
        # 找到最新的快照文件
        snapshot_files = sorted(SNAPSHOT_DIR.glob(f"snapshot_{machine_id}_*.json"), reverse=True)
        if not snapshot_files:
            print("❌ 未找到快照文件,请先执行 collect")
            return
        latest_snapshot = snapshot_files[0]
        with open(latest_snapshot) as f:
            data = json.load(f)
        # 生成CSV报告
        report_file = REPORT_DIR / f"asset_report_{machine_id}_{timestamp}.csv"
        with open(report_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['类别', '字段', '值'])
            # 系统信息
            for key, value in data.get("system_info", {}).items():
                writer.writerow(['系统信息', key, value])
            # 软件列表
            for app in data.get("software_list", []):
                writer.writerow(['软件', app.get('name'), app.get('version')])
            # 网络设备
            for device in data.get("network_devices", []):
                writer.writerow(['网络设备', device.get('ip'), device.get('hostname')])
        print(f"✅ CSV报告已生成: {report_file}")
    if args.action in ['compare', 'all']:
        print("\n" + "="*50)
        print("对比基线...")
        print("="*50)
        # 如果没有指定基线,使用最新的快照作为基线的前一个版本
        snapshot_files = sorted(SNAPSHOT_DIR.glob(f"snapshot_{machine_id}_*.json"))
        if len(snapshot_files) < 2:
            print("⚠️ 需要至少两次采集才能进行对比,将当前快照设为基线。")
            if snapshot_files:
                save_baseline(data, f"baseline_{machine_id}_{timestamp}.json")
            return
        # 加载最新快照
        current_file = snapshot_files[-1]
        with open(current_file) as f:
            current = json.load(f)
        # 加载基线(上一个快照)
        baseline_file = snapshot_files[-2] if not args.baseline else BASELINE_DIR / args.baseline
        with open(baseline_file) as f:
            baseline = json.load(f)
        diff = compare_snapshots(current.get("software_list", []), baseline.get("software_list", []))
        # 输出差异报告
        diff_file = REPORT_DIR / f"diff_report_{machine_id}_{timestamp}.txt"
        with open(diff_file, 'w') as f:
            f.write(f"资产变更报告\n")
            f.write(f"时间: {datetime.datetime.now()}\n")
            f.write(f"基线: {baseline_file.name}\n")
            f.write(f"当前: {current_file.name}\n")
            f.write("="*50 + "\n")
            if diff['added']:
                f.write("\n新增资产:\n")
                for item in diff['added']:
                    f.write(f"  + {item}\n")
            if diff['removed']:
                f.write("\n移除资产:\n")
                for item in diff['removed']:
                    f.write(f"  - {item}\n")
            if diff['changed']:
                f.write("\n变更资产:\n")
                for item in diff['changed']:
                    f.write(f"  ~ {item['key']}: {item['old_value']} -> {item['new_value']}\n")
        print(f"✅ 差异报告已生成: {diff_file}")
    # 最后更新基线
    if args.action in ['collect', 'all']:
        save_baseline(snapshot, f"baseline_{machine_id}_{timestamp}.json")
        print("基线已更新。")
if __name__ == "__main__":
    # 如果你的环境没有安装额外的库,上面的ping_host函数使用了系统ping命令,是跨平台的。
    # 注意:扫描整个网段(254个IP)可能需要几分钟时间,生产环境中可以优化为并发扫描。
    main()

如何使用这个脚本

  1. 保存脚本:将上述代码保存为 asset_inventory.py

  2. 首次运行(采集 + 生成初始基线)

    python3 asset_inventory.py all

    这会在当前目录下创建 asset_inventory 文件夹,包含 snapshotbaselinereport 子目录。

  3. 第二次运行(采集最新数据)

    python3 asset_inventory.py collect
  4. 对比两次采集的差异

    python3 asset_inventory.py compare

    这会自动找到最新的两次快照进行对比。

  5. 生成可读报告(CSV格式)

    python3 asset_inventory.py report

进阶建议 & 定制化

  1. 生产级扫描

    • Nmap集成:将 ping_host 替换为 python-nmap 库,可以获取开放端口、操作系统指纹等更详细的资产信息。
    • 并发扫描:使用 concurrent.futures.ThreadPoolExecutor 来加速网络扫描。
  2. 数据持久化

    • JSON 文件改为 SQLiteMySQL 数据库,使用 sqlite3 模块存储历史数据,便于SQL查询和分析。
  3. 跨平台优化

    • Windows:使用 wmi 库(pip install wmi)获取更精准的硬件信息(如序列号、BIOS版本、MAC地址)。
    • Linux:读取 /proc 文件系统(如 /proc/cpuinfo, /proc/meminfo)获取详细信息。
  4. CMDB对接

    • 如果是公司级资产盘点,脚本采集到的数据可以通过REST API自动推送到 CMDB(配置管理数据库)中。
  5. 自动化调度

    • 使用 crontab (Linux) 或 Task Scheduler (Windows) 每天凌晨自动运行 collectcompare 操作,然后通过邮件发送差异报告。

风险提示

  • 权限:采集某些信息(如物理内存、磁盘序列号)需要管理员/root权限,建议用非root用户运行普通采集,并用特定工具(如sudoers配置)赋予必要权限。
  • 网络扫描:在企业网络中进行大规模ping扫描可能会被入侵检测系统(IDS)视为攻击行为,务必获得网络管理员的授权
  • 隐私:保存的数据可能包含个人电脑上的软件列表,确保遵守公司的隐私政策。

你可以直接复制使用上面的脚本,根据你的实际需求(例如只采集Linux服务器,或只扫描特定IP列表)修改其中的函数和配置即可。

抱歉,评论功能暂时关闭!