本文目录导读:

在 Python 中,“守护进程”(Daemon Process)通常有两种含义:一种是创建系统级守护进程(在后台运行的服务),另一种是监控目标进程并保持其运行(进程守护者)。下面分别介绍这两种实现方式。
创建守护进程(后台运行服务)
使用标准库手动实现(基于 os.fork 的双重 fork)
#!/usr/bin/env python3
import os
import sys
import time
import atexit
import signal
def daemonize(pidfile=None):
    """Detach the current process and turn it into a Unix daemon.

    Performs the classic double-fork sequence: fork and exit the parent,
    start a new session, fork again and exit the session leader, change the
    working directory to ``/``, reset the umask, close every inherited file
    descriptor, and point stdin/stdout/stderr at the null device.

    Args:
        pidfile: Optional path of a file that receives the daemon's PID.
            A relative path is resolved against the working directory at
            call time (before the ``chdir('/')``). The file is removed by an
            ``atexit`` handler when the daemon exits normally.

    Returns:
        None. Only the final daemon process returns from this call; the
        original process and the intermediate child exit with status 0.

    Raises:
        OSError: If forking, opening the null device, or writing the PID
            file fails.
    """
    # Resolve the PID file path now: after chdir('/') a relative path would
    # silently point into the root directory instead.
    if pidfile:
        pidfile = os.path.abspath(pidfile)

    # Flush buffered output so it is not written twice (once by the exiting
    # parent, once by the child that inherited a copy of the buffer).
    for stream in (sys.stdout, sys.stderr):
        if stream is not None:
            stream.flush()

    # First fork: detach from the launching process. os._exit() is used
    # instead of sys.exit() so the exiting process neither raises SystemExit
    # into the caller's try/finally blocks nor runs atexit handlers that the
    # daemon inherited and will run itself.
    if os.fork() > 0:
        os._exit(0)

    # The child becomes the leader of a new session (no controlling terminal).
    os.setsid()

    # Second fork: the daemon is no longer a session leader, so it cannot
    # re-acquire a controlling terminal.
    if os.fork() > 0:
        os._exit(0)

    # Do not keep any directory busy, and do not inherit the caller's umask.
    os.chdir('/')
    os.umask(0)

    # Close every file descriptor inherited from the parent.
    try:
        maxfd = os.sysconf('SC_OPEN_MAX')
    except (ValueError, OSError):
        maxfd = -1
    if maxfd is None or maxfd < 0:
        maxfd = 1024
    # os.closerange ignores errors for descriptors that are not open.
    os.closerange(0, maxfd)

    # Redirect the standard descriptors to the null device. The descriptor
    # returned by os.open is used explicitly instead of assuming it is 0.
    devnull = os.open(os.devnull, os.O_RDWR)
    for std_fd in (0, 1, 2):
        if devnull != std_fd:
            os.dup2(devnull, std_fd)
    if devnull > 2:
        os.close(devnull)

    # Write the PID file.
    if pidfile:
        # The umask is 0 at this point, so request an explicit mode; a plain
        # open() would create the file world-writable.
        pid_fd = os.open(pidfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644)
        with os.fdopen(pid_fd, 'w') as f:
            f.write(str(os.getpid()))

        def _remove_pidfile():
            # Best-effort cleanup: the file may already have been removed.
            try:
                os.remove(pidfile)
            except FileNotFoundError:
                pass

        # NOTE: atexit handlers only run on a normal interpreter exit, not
        # when the process is killed by a signal Python does not handle.
        atexit.register(_remove_pidfile)

    # Ignore child-exit and terminal-hangup notifications.
    # NOTE(review): with SIGCHLD ignored, exit statuses of child processes
    # started later by the daemon may not be collectable - confirm this is
    # acceptable for the daemon's workload.
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    signal.signal(signal.SIGHUP, signal.SIG_IGN)
# Example usage
def main():
    """Daemonize the process, then append a heartbeat line to the log forever."""
    daemonize('/var/run/mydaemon.pid')

    # Daemon business logic: one timestamped line per minute.
    log_path = '/var/log/mydaemon.log'
    pause_seconds = 60
    while True:
        with open(log_path, 'a') as log_file:
            log_file.write(f'Daemon running at {time.ctime()}\n')
        time.sleep(pause_seconds)


if __name__ == '__main__':
    main()
使用 python-daemon 库(推荐)
首先安装:
pip install python-daemon
使用示例:
#!/usr/bin/env python3
import daemon
import time
import logging
def run_daemon():
    """Business logic of the daemon: log a heartbeat message once a minute."""
    # Send log records to the daemon's log file.
    log_settings = {
        'filename': '/var/log/mydaemon.log',
        'level': logging.INFO,
        'format': '%(asctime)s - %(message)s',
    }
    logging.basicConfig(**log_settings)

    heartbeat_seconds = 60
    while True:
        logging.info('Daemon is running...')
        time.sleep(heartbeat_seconds)
# Create the daemon context and run the daemon inside it.
#
# `import daemon` on its own does not load the `daemon.pidfile` submodule, so
# it is imported explicitly here; without this import the
# `daemon.pidfile.TimeoutPIDLockFile` lookup below fails with AttributeError.
import daemon.pidfile

with daemon.DaemonContext(
    working_directory='/var/run',
    umask=0o002,
    pidfile=daemon.pidfile.TimeoutPIDLockFile('/var/run/mydaemon.pid'),
    # Mode 'w' truncates these two files every time the daemon starts.
    stdout=open('/var/log/mydaemon_stdout.log', 'w'),
    stderr=open('/var/log/mydaemon_stderr.log', 'w'),
):
    run_daemon()
监控并守护目标进程
简单的进程监控器
#!/usr/bin/env python3
import os
import sys
import time
import subprocess
import signal
class ProcessGuard:
    """Keep a target command running: poll it and restart it when it exits.

    Args:
        command: The command to supervise. A string (or bytes) is run through
            the shell; a list/tuple of arguments is executed directly without
            a shell.
        check_interval: Seconds to sleep between two liveness checks.
    """

    def __init__(self, command, check_interval=5):
        self.command = command
        self.check_interval = check_interval
        # subprocess.Popen handle of the current child, or None before start.
        self.process = None

    def start_process(self):
        """Launch the target command and store its Popen handle."""
        print(f'Starting process: {self.command}')
        self.process = subprocess.Popen(
            self.command,
            # Only string commands need the shell; argument lists run directly.
            shell=isinstance(self.command, (str, bytes)),
            # Nothing in this class reads the child's output. Unread PIPEs
            # would block the child once the OS pipe buffer fills up, so the
            # output is discarded instead.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def check_process(self):
        """Return True if a child has been started and is still running."""
        if self.process is None:
            return False
        # poll() returns None while the child is alive.
        return self.process.poll() is None

    def _stop_process(self, timeout=5):
        """Terminate the child, escalating to kill if it does not exit in time."""
        if self.process is None or self.process.poll() is not None:
            return
        self.process.terminate()
        try:
            self.process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            self.process.kill()
            # Reap the killed child so it does not linger as a zombie.
            self.process.wait()

    def restart_process(self):
        """Stop the current child (if it is still running) and start a new one."""
        print(f'Restarting process: {self.command}')
        self._stop_process()
        self.start_process()

    def run(self):
        """Start the child and supervise it until SIGINT/SIGTERM is received."""
        print('Process guard started')

        # On SIGINT/SIGTERM stop the child first, then exit the guard.
        def signal_handler(signum, frame):
            print(f'Received signal {signum}, stopping...')
            self._stop_process()
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Initial start.
        self.start_process()
        try:
            while True:
                if not self.check_process():
                    print('Process died, restarting...')
                    self.restart_process()
                time.sleep(self.check_interval)
        except KeyboardInterrupt:
            print('Stopping guard...')
            self._stop_process()
# Example usage
if __name__ == '__main__':
    # Supervise a Python worker script, checking on it every 5 seconds.
    guard = ProcessGuard(command='python3 my_worker.py', check_interval=5)
    guard.run()
使用 supervisor(生产环境推荐)
Supervisor 是一个流行的进程管理工具,配置文件示例:
; /etc/supervisor/conf.d/myapp.conf
[program:myapp]
command=python3 /path/to/your/app.py
directory=/path/to/your/app
user=www-data
autostart=true
autorestart=true
startsecs=5
startretries=3
; 日志配置
stdout_logfile=/var/log/myapp_stdout.log
stderr_logfile=/var/log/myapp_stderr.log
stdout_logfile_maxbytes=10MB
stderr_logfile_maxbytes=10MB
; 环境变量
environment=PYTHONUNBUFFERED=1
使用命令:
# 安装 supervisor
pip install supervisor
# 启动 supervisor
supervisord -c /etc/supervisor/supervisord.conf
# 管理进程
supervisorctl status
supervisorctl restart myapp
supervisorctl start myapp
supervisorctl stop myapp
实用工具:使用 systemd(Linux推荐)
创建 systemd 服务文件:
; /etc/systemd/system/myapp.service
[Unit]
Description=My Python Application
After=network.target

[Service]
Type=simple
User=www-data
WorkingDirectory=/path/to/your/app
ExecStart=/usr/bin/python3 /path/to/your/app.py
Restart=always
RestartSec=5
; 日志配置
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=myapp

[Install]
WantedBy=multi-user.target
使用命令:
# 重新加载配置
sudo systemctl daemon-reload
# 启动服务
sudo systemctl start myapp
# 设置开机自启
sudo systemctl enable myapp
# 查看状态
sudo systemctl status myapp
# 查看日志
sudo journalctl -u myapp
# 停止服务
sudo systemctl stop myapp
完整的守护进程示例:带健康检查
#!/usr/bin/env python3
"""
完整的守护进程示例:包含健康检查和自动重启
"""
import os
import sys
import time
import signal
import logging
import subprocess
from datetime import datetime
class HealthCheckDaemon:
    """Supervise a Python script: start it, health-check it, restart it.

    The target is considered healthy while its process is running and, when
    ``check_port`` is set, while a TCP connection to that port on localhost
    succeeds. Restarts are rate-limited to ``max_restarts`` within
    ``restart_window`` seconds.

    Args:
        target_script: Path of the Python script to run with ``python3``.
        check_port: Optional TCP port on localhost to probe on every health
            check.
    """

    # Seconds to wait for the health-check TCP connection before giving up.
    connect_timeout = 5
    # Seconds to wait after terminate() before escalating to kill().
    stop_timeout = 5

    def __init__(self, target_script, check_port=None):
        self.target_script = target_script
        self.check_port = check_port
        self.process = None
        self.restart_count = 0    # number of successful restarts so far
        self.max_restarts = 5
        self.restart_window = 60  # at most max_restarts restarts per 60 seconds
        self.restart_times = []   # timestamps of restarts inside the window
        # Configure logging.
        self.setup_logging()

    def setup_logging(self):
        """Configure root logging to a file and the console; set self.logger."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=[
                logging.FileHandler('/var/log/health_daemon.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def start_target(self):
        """Start the target script.

        Returns:
            True if the process was launched, False if launching failed
            (the failure is logged).
        """
        try:
            self.process = subprocess.Popen(
                ['python3', self.target_script],
                # Nothing here reads the child's output. Unread PIPEs would
                # block the child once the OS pipe buffer fills up, so the
                # output is discarded instead.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except Exception as e:
            self.logger.error(f'Failed to start target: {e}')
            return False
        self.logger.info(f'Started target process (PID: {self.process.pid})')
        return True

    def check_health(self):
        """Return True if the target process is alive and, if configured, its port answers."""
        if self.process is None:
            return False
        # Liveness: poll() returns None while the process is running.
        if self.process.poll() is not None:
            self.logger.warning('Process has exited')
            return False
        # Optional: check that the port accepts connections.
        if self.check_port:
            import socket
            try:
                # The context manager closes the socket on success and on
                # failure; the timeout keeps a hung target from blocking the
                # supervisor loop indefinitely.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(self.connect_timeout)
                    sock.connect(('localhost', self.check_port))
            except OSError:
                self.logger.warning(f'Port {self.check_port} is not responding')
                return False
        return True

    def _stop_target(self):
        """Terminate the target if it is running, escalating to kill on timeout."""
        if self.process is None or self.process.poll() is not None:
            return
        self.process.terminate()
        try:
            self.process.wait(timeout=self.stop_timeout)
        except subprocess.TimeoutExpired:
            self.process.kill()
            # Reap the killed child so it does not linger as a zombie.
            self.process.wait()

    def restart_target(self):
        """Restart the target unless the restart rate limit has been reached.

        Returns:
            True if a new process was started; False if the rate limit was
            hit or the new process could not be launched.
        """
        current_time = time.time()
        # Drop restart records that fell out of the sliding window.
        self.restart_times = [t for t in self.restart_times
                              if current_time - t < self.restart_window]
        # Enforce the restart rate limit.
        if len(self.restart_times) >= self.max_restarts:
            self.logger.error(f'Too many restarts in {self.restart_window}s')
            return False
        # Stop the old process, then start a new one.
        self._stop_target()
        self.restart_times.append(current_time)
        started = self.start_target()
        if started:
            self.restart_count += 1
        return started

    def cleanup(self):
        """Stop the target process if it is still running."""
        if self.process and self.process.poll() is None:
            self.logger.info('Cleaning up...')
            self._stop_target()

    def run(self):
        """Start the target and supervise it until a signal or a failed restart."""
        # On SIGINT/SIGTERM stop the target, then exit.
        def signal_handler(signum, frame):
            self.logger.info(f'Received signal {signum}')
            self.cleanup()
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Initial start.
        if not self.start_target():
            sys.exit(1)
        try:
            while True:
                if not self.check_health():
                    if not self.restart_target():
                        self.logger.critical('Failed to restart, exiting')
                        break
                time.sleep(10)  # check every 10 seconds
        except KeyboardInterrupt:
            self.logger.info('Received keyboard interrupt')
        finally:
            self.cleanup()
# Example usage
if __name__ == '__main__':
    # check_port is optional; when given, the port is probed during health checks.
    daemon = HealthCheckDaemon('/path/to/your/worker.py', check_port=8080)
    daemon.run()
最佳实践建议
- 生产环境:推荐使用 supervisor 或 systemd 管理进程
- 日志管理:使用日志轮转避免磁盘占满
- 错误处理:实现优雅的重启策略(指数退避)
- 监控告警:添加监控指标(CPU、内存、连接数)
- 资源限制:使用 ulimit 限制资源使用
- 配置管理:配置文件独立于代码
选择合适的方案取决于你的具体需求:如果只需要让程序在后台稳定运行,使用 supervisor 或 systemd;如果需要深度定制,可以自己实现守护进程逻辑。