本文目录导读:

- 使用内置queue模块实现简单任务队列
- 使用concurrent.futures实现线程池任务队列
- 使用Redis实现分布式任务队列
- 使用Celery实现专业任务队列
- 使用asyncio实现异步任务队列
- 使用建议
我来介绍几种Python实现任务队列的方法,从简单到复杂:
使用内置queue模块实现简单任务队列
import queue
import threading
import time
import random
# 创建任务队列
task_queue = queue.Queue()
# 生产者:生成任务
def producer():
task_id = 0
while True:
task_id += 1
task_data = {
'id': task_id,
'type': random.choice(['计算', '下载', '处理']),
'data': random.randint(1, 100)
}
task_queue.put(task_data)
print(f"生产任务: {task_data}")
time.sleep(random.uniform(0.5, 2))
# 消费者:执行任务
def consumer(worker_id):
while True:
task = task_queue.get()
print(f"工作者{worker_id} 开始处理任务: {task}")
# 模拟任务处理
processing_time = random.uniform(1, 3)
time.sleep(processing_time)
# 任务处理结果
result = {
'task_id': task['id'],
'result': f"{task['type']}完成",
'worker': worker_id,
'duration': processing_time
}
print(f"任务完成: {result}")
task_queue.task_done()
# 启动系统
print("启动任务队列系统...")
# 启动生产者线程
producer_thread = threading.Thread(target=producer, daemon=True)
producer_thread.start()
# 启动多个消费者线程
workers = []
for i in range(3):
worker_thread = threading.Thread(target=consumer, args=(i+1,), daemon=True)
worker_thread.start()
workers.append(worker_thread)
# 主线程等待(示例运行10秒)
try:
time.sleep(10)
except KeyboardInterrupt:
print("系统停止")
使用concurrent.futures实现线程池任务队列
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def process_task(task_id, task_type, data):
"""处理单个任务"""
print(f"开始处理任务 {task_id}: {task_type}")
time.sleep(random.uniform(1, 3))
result = f"任务{task_id}({task_type})处理完成,数据: {data}"
return task_id, result
class TaskQueue:
def __init__(self, max_workers=4):
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.tasks = []
def add_task(self, task_type, data):
"""添加任务到队列"""
task_id = len(self.tasks) + 1
future = self.executor.submit(process_task, task_id, task_type, data)
self.tasks.append(future)
print(f"添加任务: #{task_id} - {task_type}")
return future
def wait_for_completion(self):
"""等待所有任务完成"""
print("等待所有任务完成...")
completed = 0
for future in as_completed(self.tasks):
task_id, result = future.result()
completed += 1
print(f"任务 #{task_id} 完成: {result}")
print(f"所有{completed}个任务处理完成")
# 使用示例
queue = TaskQueue(max_workers=3)
# 添加不同类型任务
queue.add_task("数据处理", "用户数据分析")
queue.add_task("文件下载", "https://example.com/file.zip")
queue.add_task("图片处理", "image.jpg")
queue.add_task("邮件发送", "user@example.com")
queue.add_task("日志分析", "server.log")
# 等待所有任务完成
queue.wait_for_completion()
使用Redis实现分布式任务队列
import redis
import json
import time
import uuid
class RedisTaskQueue:
def __init__(self, queue_name='task_queue'):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
self.queue_name = queue_name
def add_task(self, task_type, data, priority=5):
"""添加任务到Redis队列"""
task = {
'id': str(uuid.uuid4()),
'type': task_type,
'data': data,
'priority': priority,
'created_at': time.time()
}
# 使用有序集合实现优先级队列
self.redis_client.zadd(self.queue_name, {json.dumps(task): -priority})
print(f"添加任务: {task['id']} ({task_type})")
return task['id']
def get_task(self, timeout=0):
"""获取并执行下一个任务"""
while True:
# 获取优先级最高的任务
tasks = self.redis_client.zrange(self.queue_name, 0, 0, withscores=True)
if tasks:
task_json, score = tasks[0]
task = json.loads(task_json)
# 从队列中移除
self.redis_client.zrem(self.queue_name, task_json)
print(f"执行任务: {task['id']}")
# 处理任务
if task['type'] == 'email':
self._send_email(task['data'])
elif task['type'] == 'report':
self._generate_report(task['data'])
elif task['type'] == 'backup':
self._backup_data(task['data'])
return task
if timeout:
time.sleep(0.1)
else:
return None
def _send_email(self, data):
"""模拟发送邮件"""
print(f"📧 发送邮件到: {data}")
time.sleep(2)
def _generate_report(self, data):
"""模拟生成报告"""
print(f"📊 生成报告: {data}")
time.sleep(3)
def _backup_data(self, data):
"""模拟备份数据"""
print(f"💾 备份数据: {data}")
time.sleep(4)
def get_queue_length(self):
"""获取队列长度"""
return self.redis_client.zcard(self.queue_name)
# 使用示例
def main():
# 创建任务队列
task_queue = RedisTaskQueue()
# 添加各种任务
task_queue.add_task('email', {'to': 'user@example.com', 'subject': 'Hello'})
task_queue.add_task('report', 'monthly_sales_report')
task_queue.add_task('backup', '/var/www/html')
# 处理任务
print(f"队列中有 {task_queue.get_queue_length()} 个任务")
task_queue.get_task() # 处理一个任务
task_queue.get_task() # 处理下一个任务
print(f"剩余 {task_queue.get_queue_length()} 个任务")
if __name__ == "__main__":
main()
使用Celery实现专业任务队列
# 首先安装: pip install celery redis
# tasks.py
from celery import Celery
import time
# 创建Celery应用
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def send_email(to, subject, body):
"""发送邮件任务"""
print(f"发送邮件到 {to}")
print(f"主题: {subject}")
time.sleep(3) # 模拟发送过程
print("邮件发送完成")
return f"邮件已发送到 {to}"
@app.task
def process_image(image_path, filters=None):
"""图片处理任务"""
print(f"处理图片: {image_path}")
if filters:
print(f"应用滤镜: {filters}")
time.sleep(5)
print("图片处理完成")
return f"图片已处理: {image_path}"
@app.task
def generate_report(report_type, data):
"""生成报告任务"""
print(f"生成{report_type}报告")
time.sleep(4)
print("报告生成完成")
return f"{report_type}报告已生成"
# 调用端代码
def submit_tasks():
# 异步提交任务
email_task = send_email.delay(
'user@example.com',
'欢迎注册',
'感谢您注册我们的服务'
)
image_task = process_image.delay(
'/path/to/image.jpg',
filters=['grayscale', 'blur']
)
report_task = generate_report.delay(
'月度销售',
{'month': '2024-01', 'sales': 100000}
)
print(f"邮件任务ID: {email_task.id}")
print(f"图片任务ID: {image_task.id}")
print(f"报告任务ID: {report_task.id}")
# 获取任务结果
print(f"邮件状态: {email_task.status}")
print(f"邮件结果: {email_task.get(timeout=10)}")
if __name__ == "__main__":
submit_tasks()
使用asyncio实现异步任务队列
import asyncio
import random
import time
class AsyncTaskQueue:
def __init__(self, max_concurrent=5):
self.queue = asyncio.Queue()
self.semaphore = asyncio.Semaphore(max_concurrent)
self.results = []
async def process_task(self, task):
"""异步处理单个任务"""
async with self.semaphore:
print(f"开始处理任务: {task}")
# 模拟异步操作
await asyncio.sleep(random.uniform(0.5, 2))
result = f"任务'{task}'处理完成"
print(f"任务完成: {result}")
return result
async def worker(self, worker_id):
"""工作协程"""
while True:
try:
task = await self.queue.get()
print(f"工作者{worker_id} 拿到任务: {task}")
result = await self.process_task(task)
self.results.append(result)
self.queue.task_done()
except asyncio.CancelledError:
break
async def add_task(self, task):
"""添加任务"""
await self.queue.put(task)
async def run(self, tasks, num_workers=3):
"""运行任务队列"""
# 创建工作者
workers = [
asyncio.create_task(self.worker(i))
for i in range(num_workers)
]
# 添加所有任务
for task in tasks:
await self.add_task(task)
# 等待所有任务完成
await self.queue.join()
# 取消工作者
for worker in workers:
worker.cancel()
return self.results
# 使用示例
async def main():
queue = AsyncTaskQueue(max_concurrent=3)
# 创建任务列表
tasks = [
f"任务-{i}"
for i in range(10)
]
print("开始处理任务队列...")
start_time = time.time()
# 执行所有任务
results = await queue.run(tasks)
elapsed = time.time() - start_time
print(f"\n所有任务完成,耗时: {elapsed:.2f}秒")
print(f"完成的任务数: {len(results)}")
if __name__ == "__main__":
asyncio.run(main())
使用建议
- 小型项目:使用
queue模块或concurrent.futures - 需要持久化:使用Redis实现
- 企业级应用:使用Celery
- 异步场景:使用asyncio
选择哪种方案取决于你的具体需求:任务复杂度、并发量、是否需要持久化等。