本文目录导读:

我来为你提供几个常用的服务依赖检查脚本,适用于不同场景。
Linux系统服务依赖检查 (Bash)
基础依赖检查脚本
#!/bin/bash
# 服务依赖检查脚本
# 使用方法: ./check_deps.sh <服务名称>
SERVICE=$1
TIMEOUT=30
INTERVAL=3
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m' # No Color
# 检查服务状态
check_service() {
local service=$1
if systemctl is-active --quiet $service; then
return 0
fi
return 1
}
# 等待服务启动
wait_for_service() {
local service=$1
local timeout=$2
local interval=$3
local elapsed=0
while [ $elapsed -lt $timeout ]; do
if check_service $service; then
echo -e "${GREEN}[✓] $service 已启动${NC}"
return 0
fi
sleep $interval
elapsed=$((elapsed + interval))
done
echo -e "${RED}[✗] $service 启动超时${NC}"
return 1
}
# 获取服务依赖
get_dependencies() {
local service=$1
# 获取 Requires 和 After 中的依赖
local deps=$(systemctl show $service | grep -E "Requires|After" | \
sed 's/.*=//' | tr ' ' '\n' | grep -v "^$" | grep -v ".target" | sort -u)
echo "$deps"
}
# 主逻辑
main() {
if [ -z "$SERVICE" ]; then
echo -e "${YELLOW}用法: $0 <服务名称>${NC}"
echo "示例: $0 nginx"
exit 1
fi
if ! systemctl list-units --type=service | grep -q "$SERVICE"; then
echo -e "${RED}[✗] 服务 $SERVICE 不存在${NC}"
exit 1
fi
echo -e "${YELLOW}===== 服务依赖检查: $SERVICE =====${NC}"
echo ""
# 获取依赖
local deps=$(get_dependencies $SERVICE)
if [ -z "$deps" ]; then
echo -e "${GREEN}[✓] 没有发现依赖服务${NC}"
exit 0
fi
echo -e "${YELLOW}发现依赖服务:${NC}"
echo "$deps"
echo ""
# 检查每个依赖
local failed=0
for dep in $deps; do
if check_service $dep; then
echo -e "${GREEN}[✓] $dep 运行中${NC}"
else
echo -e "${RED}[✗] $dep 未运行${NC}"
echo -n "是否启动 $dep? (y/n): "
read answer
if [ "$answer" = "y" ] || [ "$answer" = "Y" ]; then
systemctl start $dep
if wait_for_service $dep $TIMEOUT $INTERVAL; then
echo -e "${GREEN}[✓] $dep 启动成功${NC}"
else
echo -e "${RED}[✗] $dep 启动失败${NC}"
failed=1
fi
else
failed=1
fi
fi
done
echo ""
if [ $failed -eq 0 ]; then
echo -e "${GREEN}所有依赖服务检查通过${NC}"
exit 0
else
echo -e "${RED}存在未满足的依赖${NC}"
exit 1
fi
}
main
Docker服务依赖检查
#!/bin/bash
# Docker服务依赖检查
DOCKER_COMPOSE_FILE="docker-compose.yml"
TIMEOUT=60
INTERVAL=5
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[0;33m'
NC='\033[0m'
# 检查服务是否运行
check_docker_service() {
local service=$1
if docker ps --format '{{.Names}}' | grep -q "^${service}$"; then
return 0
fi
return 1
}
# 等待服务就绪
wait_for_docker_service() {
local service=$1
local timeout=$2
local interval=$3
local elapsed=0
echo -n "等待 $service 就绪"
while [ $elapsed -lt $timeout ]; do
if check_docker_service $service; then
echo -e "${GREEN} [✓]${NC}"
return 0
fi
echo -n "."
sleep $interval
elapsed=$((elapsed + interval))
done
echo -e "${RED} [✗] 超时${NC}"
return 1
}
# 获取服务依赖关系
get_docker_dependencies() {
local service=$1
# 如果存在 docker-compose.yml,解析依赖
if [ -f "$DOCKER_COMPOSE_FILE" ]; then
depends_on=$(grep -A5 "^\s*${service}:" "$DOCKER_COMPOSE_FILE" | \
grep "depends_on" -A1 | tail -1 | tr -d ' ' | tr -d '-')
echo "$depends_on"
fi
}
# 主函数
main() {
local target_service=${1:-""}
if [ -z "$target_service" ]; then
echo -e "${YELLOW}用法: $0 <服务名称>${NC}"
exit 1
fi
echo -e "${YELLOW}===== Docker服务依赖检查: $target_service =====${NC}"
# 获取依赖
local deps=$(get_docker_dependencies $target_service)
if [ -z "$deps" ]; then
echo -e "${GREEN}[✓] 没有依赖服务${NC}"
exit 0
fi
echo -e "${YELLOW}依赖服务: $deps${NC}"
# 检查依赖
local all_ready=true
for dep in $deps; do
if ! wait_for_docker_service $dep $TIMEOUT $INTERVAL; then
all_ready=false
fi
done
if [ "$all_ready" = true ]; then
echo -e "${GREEN}[✓] 所有依赖检查通过${NC}"
else
echo -e "${RED}[✗] 存在未满足的依赖${NC}"
exit 1
fi
}
main $@
Python通用依赖检查脚本
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import time
import sys
import os
import subprocess
import json
import argparse
class ServiceDependencyChecker:
def __init__(self, config_file=None):
self.services = {}
self.config = {}
if config_file:
self.load_config(config_file)
def load_config(self, config_file):
"""加载配置文件"""
if os.path.exists(config_file):
with open(config_file, 'r') as f:
self.config = json.load(f)
else:
print(f"配置文件 {config_file} 不存在")
sys.exit(1)
def check_tcp_port(self, host, port, timeout=5):
"""检查TCP端口是否可达"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
sock.close()
return result == 0
except:
return False
def check_http_endpoint(self, url, timeout=5):
"""检查HTTP端点"""
try:
import requests
response = requests.get(url, timeout=timeout)
return response.status_code < 500
except:
return False
def check_process(self, process_name):
"""检查进程是否存在"""
try:
result = subprocess.run(
['pgrep', '-f', process_name],
capture_output=True,
text=True
)
return result.returncode == 0
except:
return False
def check_service(self, service_name, service_config):
"""检查单个服务"""
service_type = service_config.get('type', 'tcp')
timeout = service_config.get('timeout', 5)
if service_type == 'tcp':
host = service_config.get('host', 'localhost')
port = service_config.get('port', 80)
return self.check_tcp_port(host, port, timeout)
elif service_type == 'http':
url = service_config.get('url')
return self.check_http_endpoint(url, timeout)
elif service_type == 'process':
process = service_config.get('process')
return self.check_process(process)
return False
def wait_for_services(self, services, timeout=30, interval=3):
"""等待服务就绪"""
start_time = time.time()
while time.time() - start_time < timeout:
all_ready = True
for service_name, service_config in services.items():
if not self.check_service(service_name, service_config):
all_ready = False
print(f" [✗] {service_name} 未就绪")
else:
print(f" [✓] {service_name} 已就绪")
if all_ready:
return True
print(f"\n等待 {interval} 秒后重试...")
time.sleep(interval)
return False
def check_dependencies(self, service_name):
"""检查指定服务的依赖"""
if service_name not in self.config.get('services', {}):
print(f"服务 {service_name} 未定义")
return False
service = self.config['services'][service_name]
dependencies = service.get('depends_on', [])
if not dependencies:
print(f"[✓] {service_name} 没有依赖")
return True
print(f"\n检查 {service_name} 的依赖:")
for dep in dependencies:
if dep in self.config.get('services', {}):
dep_config = self.config['services'][dep]
if self.check_service(dep, dep_config):
print(f" [✓] {dep} 可用")
else:
print(f" [✗] {dep} 不可用")
return False
return True
def main():
parser = argparse.ArgumentParser(description='服务依赖检查工具')
parser.add_argument('-c', '--config', help='配置文件路径')
parser.add_argument('-s', '--service', help='要检查的服务名称')
parser.add_argument('-w', '--wait', action='store_true', help='等待所有依赖就绪')
args = parser.parse_args()
if not args.config:
# 使用简单的内联检查
checker = ServiceDependencyChecker()
if args.service:
# 简单检查单个服务
if checker.check_process(args.service):
print(f"[✓] {args.service} 运行中")
else:
print(f"[✗] {args.service} 未运行")
else:
print("请提供配置文件或服务名称")
else:
checker = ServiceDependencyChecker(args.config)
if args.wait:
# 等待所有服务就绪
services = checker.config.get('services', {})
if checker.wait_for_services(services):
print("\n[✓] 所有服务就绪")
else:
print("\n[✗] 服务就绪超时")
sys.exit(1)
else:
# 检查依赖
checker.check_dependencies(args.service)
if __name__ == '__main__':
main()
配置文件示例 (config.json)
{
"services": {
"database": {
"type": "tcp",
"host": "localhost",
"port": 5432,
"timeout": 5
},
"redis": {
"type": "tcp",
"host": "localhost",
"port": 6379,
"timeout": 3
},
"api_server": {
"type": "http",
"url": "http://localhost:8080/health",
"timeout": 10
},
"worker": {
"type": "process",
"process": "celery"
}
},
"applications": {
"web_app": {
"depends_on": ["database", "redis"],
"type": "http",
"url": "http://localhost:3000"
},
"batch_job": {
"depends_on": ["database", "worker"],
"type": "process",
"process": "batch_processor"
}
}
}
使用示例
# 1. Bash脚本 ./check_deps.sh nginx # 2. Docker检查 ./docker_check.sh web_app # 3. Python脚本 (使用配置文件) python3 dep_checker.py -c config.json -s web_app python3 dep_checker.py -c config.json -s web_app -w # 4. 简单检查 python3 dep_checker.py -s nginx
安装依赖 (Python版本)
pip install requests # HTTP检查需要
这些脚本可以根据你的具体需求进行修改和扩展,选择合适的版本,根据实际场景调整超时时间、检查间隔等参数。