服务依赖检查脚本?

wen 实用脚本 41

本文目录导读:

服务依赖检查脚本?

  1. Linux系统服务依赖检查 (Bash)
  2. Docker服务依赖检查
  3. Python通用依赖检查脚本
  4. 配置文件示例 (config.json)
  5. 使用示例
  6. 安装依赖 (Python版本)

我来为你提供几个常用的服务依赖检查脚本,适用于不同场景。

Linux系统服务依赖检查 (Bash)

基础依赖检查脚本

#!/bin/bash
# 服务依赖检查脚本
# 使用方法: ./check_deps.sh <服务名称>
SERVICE=$1
TIMEOUT=30
INTERVAL=3
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # No Color
# 检查服务状态
check_service() {
    local service=$1
    if systemctl is-active --quiet $service; then
        return 0
    fi
    return 1
}
# 等待服务启动
wait_for_service() {
    local service=$1
    local timeout=$2
    local interval=$3
    local elapsed=0
    while [ $elapsed -lt $timeout ]; do
        if check_service $service; then
            echo -e "${GREEN}[✓] $service 已启动${NC}"
            return 0
        fi
        sleep $interval
        elapsed=$((elapsed + interval))
    done
    echo -e "${RED}[✗] $service 启动超时${NC}"
    return 1
}
# 获取服务依赖
get_dependencies() {
    local service=$1
    # 获取 Requires 和 After 中的依赖
    local deps=$(systemctl show $service | grep -E "Requires|After" | \
                 sed 's/.*=//' | tr ' ' '\n' | grep -v "^$" | grep -v ".target" | sort -u)
    echo "$deps"
}
# 主逻辑
main() {
    if [ -z "$SERVICE" ]; then
        echo -e "${YELLOW}用法: $0 <服务名称>${NC}"
        echo "示例: $0 nginx"
        exit 1
    fi
    if ! systemctl list-units --type=service | grep -q "$SERVICE"; then
        echo -e "${RED}[✗] 服务 $SERVICE 不存在${NC}"
        exit 1
    fi
    echo -e "${YELLOW}===== 服务依赖检查: $SERVICE =====${NC}"
    echo ""
    # 获取依赖
    local deps=$(get_dependencies $SERVICE)
    if [ -z "$deps" ]; then
        echo -e "${GREEN}[✓] 没有发现依赖服务${NC}"
        exit 0
    fi
    echo -e "${YELLOW}发现依赖服务:${NC}"
    echo "$deps"
    echo ""
    # 检查每个依赖
    local failed=0
    for dep in $deps; do
        if check_service $dep; then
            echo -e "${GREEN}[✓] $dep 运行中${NC}"
        else
            echo -e "${RED}[✗] $dep 未运行${NC}"
            echo -n "是否启动 $dep? (y/n): "
            read answer
            if [ "$answer" = "y" ] || [ "$answer" = "Y" ]; then
                systemctl start $dep
                if wait_for_service $dep $TIMEOUT $INTERVAL; then
                    echo -e "${GREEN}[✓] $dep 启动成功${NC}"
                else
                    echo -e "${RED}[✗] $dep 启动失败${NC}"
                    failed=1
                fi
            else
                failed=1
            fi
        fi
    done
    echo ""
    if [ $failed -eq 0 ]; then
        echo -e "${GREEN}所有依赖服务检查通过${NC}"
        exit 0
    else
        echo -e "${RED}存在未满足的依赖${NC}"
        exit 1
    fi
}
main

Docker服务依赖检查

#!/bin/bash
# Docker服务依赖检查
DOCKER_COMPOSE_FILE="docker-compose.yml"
TIMEOUT=60
INTERVAL=5
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m'
# 检查服务是否运行
check_docker_service() {
    local service=$1
    if docker ps --format '{{.Names}}' | grep -q "^${service}$"; then
        return 0
    fi
    return 1
}
# 等待服务就绪
wait_for_docker_service() {
    local service=$1
    local timeout=$2
    local interval=$3
    local elapsed=0
    echo -n "等待 $service 就绪"
    while [ $elapsed -lt $timeout ]; do
        if check_docker_service $service; then
            echo -e "${GREEN} [✓]${NC}"
            return 0
        fi
        echo -n "."
        sleep $interval
        elapsed=$((elapsed + interval))
    done
    echo -e "${RED} [✗] 超时${NC}"
    return 1
}
# 获取服务依赖关系
get_docker_dependencies() {
    local service=$1
    # 如果存在 docker-compose.yml,解析依赖
    if [ -f "$DOCKER_COMPOSE_FILE" ]; then
        depends_on=$(grep -A5 "^\s*${service}:" "$DOCKER_COMPOSE_FILE" | \
                    grep "depends_on" -A1 | tail -1 | tr -d ' ' | tr -d '-')
        echo "$depends_on"
    fi
}
# 主函数
main() {
    local target_service=${1:-""}
    if [ -z "$target_service" ]; then
        echo -e "${YELLOW}用法: $0 <服务名称>${NC}"
        exit 1
    fi
    echo -e "${YELLOW}===== Docker服务依赖检查: $target_service =====${NC}"
    # 获取依赖
    local deps=$(get_docker_dependencies $target_service)
    if [ -z "$deps" ]; then
        echo -e "${GREEN}[✓] 没有依赖服务${NC}"
        exit 0
    fi
    echo -e "${YELLOW}依赖服务: $deps${NC}"
    # 检查依赖
    local all_ready=true
    for dep in $deps; do
        if ! wait_for_docker_service $dep $TIMEOUT $INTERVAL; then
            all_ready=false
        fi
    done
    if [ "$all_ready" = true ]; then
        echo -e "${GREEN}[✓] 所有依赖检查通过${NC}"
    else
        echo -e "${RED}[✗] 存在未满足的依赖${NC}"
        exit 1
    fi
}
main $@

Python通用依赖检查脚本

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import time
import sys
import os
import subprocess
import json
import argparse
class ServiceDependencyChecker:
    def __init__(self, config_file=None):
        self.services = {}
        self.config = {}
        if config_file:
            self.load_config(config_file)
    def load_config(self, config_file):
        """加载配置文件"""
        if os.path.exists(config_file):
            with open(config_file, 'r') as f:
                self.config = json.load(f)
        else:
            print(f"配置文件 {config_file} 不存在")
            sys.exit(1)
    def check_tcp_port(self, host, port, timeout=5):
        """检查TCP端口是否可达"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            result = sock.connect_ex((host, port))
            sock.close()
            return result == 0
        except:
            return False
    def check_http_endpoint(self, url, timeout=5):
        """检查HTTP端点"""
        try:
            import requests
            response = requests.get(url, timeout=timeout)
            return response.status_code < 500
        except:
            return False
    def check_process(self, process_name):
        """检查进程是否存在"""
        try:
            result = subprocess.run(
                ['pgrep', '-f', process_name],
                capture_output=True,
                text=True
            )
            return result.returncode == 0
        except:
            return False
    def check_service(self, service_name, service_config):
        """检查单个服务"""
        service_type = service_config.get('type', 'tcp')
        timeout = service_config.get('timeout', 5)
        if service_type == 'tcp':
            host = service_config.get('host', 'localhost')
            port = service_config.get('port', 80)
            return self.check_tcp_port(host, port, timeout)
        elif service_type == 'http':
            url = service_config.get('url')
            return self.check_http_endpoint(url, timeout)
        elif service_type == 'process':
            process = service_config.get('process')
            return self.check_process(process)
        return False
    def wait_for_services(self, services, timeout=30, interval=3):
        """等待服务就绪"""
        start_time = time.time()
        while time.time() - start_time < timeout:
            all_ready = True
            for service_name, service_config in services.items():
                if not self.check_service(service_name, service_config):
                    all_ready = False
                    print(f"  [✗] {service_name} 未就绪")
                else:
                    print(f"  [✓] {service_name} 已就绪")
            if all_ready:
                return True
            print(f"\n等待 {interval} 秒后重试...")
            time.sleep(interval)
        return False
    def check_dependencies(self, service_name):
        """检查指定服务的依赖"""
        if service_name not in self.config.get('services', {}):
            print(f"服务 {service_name} 未定义")
            return False
        service = self.config['services'][service_name]
        dependencies = service.get('depends_on', [])
        if not dependencies:
            print(f"[✓] {service_name} 没有依赖")
            return True
        print(f"\n检查 {service_name} 的依赖:")
        for dep in dependencies:
            if dep in self.config.get('services', {}):
                dep_config = self.config['services'][dep]
                if self.check_service(dep, dep_config):
                    print(f"  [✓] {dep} 可用")
                else:
                    print(f"  [✗] {dep} 不可用")
                    return False
        return True
def main():
    parser = argparse.ArgumentParser(description='服务依赖检查工具')
    parser.add_argument('-c', '--config', help='配置文件路径')
    parser.add_argument('-s', '--service', help='要检查的服务名称')
    parser.add_argument('-w', '--wait', action='store_true', help='等待所有依赖就绪')
    args = parser.parse_args()
    if not args.config:
        # 使用简单的内联检查
        checker = ServiceDependencyChecker()
        if args.service:
            # 简单检查单个服务
            if checker.check_process(args.service):
                print(f"[✓] {args.service} 运行中")
            else:
                print(f"[✗] {args.service} 未运行")
        else:
            print("请提供配置文件或服务名称")
    else:
        checker = ServiceDependencyChecker(args.config)
        if args.wait:
            # 等待所有服务就绪
            services = checker.config.get('services', {})
            if checker.wait_for_services(services):
                print("\n[✓] 所有服务就绪")
            else:
                print("\n[✗] 服务就绪超时")
                sys.exit(1)
        else:
            # 检查依赖
            checker.check_dependencies(args.service)
if __name__ == '__main__':
    main()

配置文件示例 (config.json)

{
  "services": {
    "database": {
      "type": "tcp",
      "host": "localhost",
      "port": 5432,
      "timeout": 5
    },
    "redis": {
      "type": "tcp",
      "host": "localhost",
      "port": 6379,
      "timeout": 3
    },
    "api_server": {
      "type": "http",
      "url": "http://localhost:8080/health",
      "timeout": 10
    },
    "worker": {
      "type": "process",
      "process": "celery"
    }
  },
  "applications": {
    "web_app": {
      "depends_on": ["database", "redis"],
      "type": "http",
      "url": "http://localhost:3000"
    },
    "batch_job": {
      "depends_on": ["database", "worker"],
      "type": "process",
      "process": "batch_processor"
    }
  }
}

使用示例

# 1. Bash脚本
./check_deps.sh nginx
# 2. Docker检查
./docker_check.sh web_app
# 3. Python脚本 (使用配置文件)
python3 dep_checker.py -c config.json -s web_app
python3 dep_checker.py -c config.json -s web_app -w
# 4. 简单检查
python3 dep_checker.py -s nginx

安装依赖 (Python版本)

pip install requests  # HTTP检查需要

这些脚本可以根据你的具体需求进行修改和扩展,选择合适的版本,根据实际场景调整超时时间、检查间隔等参数。

抱歉,评论功能暂时关闭!