Python案例怎么实现任务队列?

wen python案例 56

本文目录导读:

Python案例怎么实现任务队列?

  1. 使用内置queue模块实现简单任务队列
  2. 使用concurrent.futures实现线程池任务队列
  3. 使用Redis实现分布式任务队列
  4. 使用Celery实现专业任务队列
  5. 使用asyncio实现异步任务队列
  6. 使用建议

我来介绍几种Python实现任务队列的方法,从简单到复杂:

使用内置queue模块实现简单任务队列

import queue
import threading
import time
import random
# 创建任务队列
task_queue = queue.Queue()
# 生产者:生成任务
def producer():
    task_id = 0
    while True:
        task_id += 1
        task_data = {
            'id': task_id,
            'type': random.choice(['计算', '下载', '处理']),
            'data': random.randint(1, 100)
        }
        task_queue.put(task_data)
        print(f"生产任务: {task_data}")
        time.sleep(random.uniform(0.5, 2))
# 消费者:执行任务
def consumer(worker_id):
    while True:
        task = task_queue.get()
        print(f"工作者{worker_id} 开始处理任务: {task}")
        # 模拟任务处理
        processing_time = random.uniform(1, 3)
        time.sleep(processing_time)
        # 任务处理结果
        result = {
            'task_id': task['id'],
            'result': f"{task['type']}完成",
            'worker': worker_id,
            'duration': processing_time
        }
        print(f"任务完成: {result}")
        task_queue.task_done()
# 启动系统
print("启动任务队列系统...")
# 启动生产者线程
producer_thread = threading.Thread(target=producer, daemon=True)
producer_thread.start()
# 启动多个消费者线程
workers = []
for i in range(3):
    worker_thread = threading.Thread(target=consumer, args=(i+1,), daemon=True)
    worker_thread.start()
    workers.append(worker_thread)
# 主线程等待(示例运行10秒)
try:
    time.sleep(10)
except KeyboardInterrupt:
    print("系统停止")

使用concurrent.futures实现线程池任务队列

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
def process_task(task_id, task_type, data):
    """处理单个任务"""
    print(f"开始处理任务 {task_id}: {task_type}")
    time.sleep(random.uniform(1, 3))
    result = f"任务{task_id}({task_type})处理完成,数据: {data}"
    return task_id, result
class TaskQueue:
    def __init__(self, max_workers=4):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.tasks = []
    def add_task(self, task_type, data):
        """添加任务到队列"""
        task_id = len(self.tasks) + 1
        future = self.executor.submit(process_task, task_id, task_type, data)
        self.tasks.append(future)
        print(f"添加任务: #{task_id} - {task_type}")
        return future
    def wait_for_completion(self):
        """等待所有任务完成"""
        print("等待所有任务完成...")
        completed = 0
        for future in as_completed(self.tasks):
            task_id, result = future.result()
            completed += 1
            print(f"任务 #{task_id} 完成: {result}")
        print(f"所有{completed}个任务处理完成")
# 使用示例
queue = TaskQueue(max_workers=3)
# 添加不同类型任务
queue.add_task("数据处理", "用户数据分析")
queue.add_task("文件下载", "https://example.com/file.zip")
queue.add_task("图片处理", "image.jpg")
queue.add_task("邮件发送", "user@example.com")
queue.add_task("日志分析", "server.log")
# 等待所有任务完成
queue.wait_for_completion()

使用Redis实现分布式任务队列

import redis
import json
import time
import uuid
class RedisTaskQueue:
    def __init__(self, queue_name='task_queue'):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.queue_name = queue_name
    def add_task(self, task_type, data, priority=5):
        """添加任务到Redis队列"""
        task = {
            'id': str(uuid.uuid4()),
            'type': task_type,
            'data': data,
            'priority': priority,
            'created_at': time.time()
        }
        # 使用有序集合实现优先级队列
        self.redis_client.zadd(self.queue_name, {json.dumps(task): -priority})
        print(f"添加任务: {task['id']} ({task_type})")
        return task['id']
    def get_task(self, timeout=0):
        """获取并执行下一个任务"""
        while True:
            # 获取优先级最高的任务
            tasks = self.redis_client.zrange(self.queue_name, 0, 0, withscores=True)
            if tasks:
                task_json, score = tasks[0]
                task = json.loads(task_json)
                # 从队列中移除
                self.redis_client.zrem(self.queue_name, task_json)
                print(f"执行任务: {task['id']}")
                # 处理任务
                if task['type'] == 'email':
                    self._send_email(task['data'])
                elif task['type'] == 'report':
                    self._generate_report(task['data'])
                elif task['type'] == 'backup':
                    self._backup_data(task['data'])
                return task
            if timeout:
                time.sleep(0.1)
            else:
                return None
    def _send_email(self, data):
        """模拟发送邮件"""
        print(f"📧 发送邮件到: {data}")
        time.sleep(2)
    def _generate_report(self, data):
        """模拟生成报告"""
        print(f"📊 生成报告: {data}")
        time.sleep(3)
    def _backup_data(self, data):
        """模拟备份数据"""
        print(f"💾 备份数据: {data}")
        time.sleep(4)
    def get_queue_length(self):
        """获取队列长度"""
        return self.redis_client.zcard(self.queue_name)
# 使用示例
def main():
    # 创建任务队列
    task_queue = RedisTaskQueue()
    # 添加各种任务
    task_queue.add_task('email', {'to': 'user@example.com', 'subject': 'Hello'})
    task_queue.add_task('report', 'monthly_sales_report')
    task_queue.add_task('backup', '/var/www/html')
    # 处理任务
    print(f"队列中有 {task_queue.get_queue_length()} 个任务")
    task_queue.get_task()  # 处理一个任务
    task_queue.get_task()  # 处理下一个任务
    print(f"剩余 {task_queue.get_queue_length()} 个任务")
if __name__ == "__main__":
    main()

使用Celery实现专业任务队列

# 首先安装: pip install celery redis
# tasks.py
from celery import Celery
import time
# 创建Celery应用
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def send_email(to, subject, body):
    """发送邮件任务"""
    print(f"发送邮件到 {to}")
    print(f"主题: {subject}")
    time.sleep(3)  # 模拟发送过程
    print("邮件发送完成")
    return f"邮件已发送到 {to}"
@app.task
def process_image(image_path, filters=None):
    """图片处理任务"""
    print(f"处理图片: {image_path}")
    if filters:
        print(f"应用滤镜: {filters}")
    time.sleep(5)
    print("图片处理完成")
    return f"图片已处理: {image_path}"
@app.task
def generate_report(report_type, data):
    """生成报告任务"""
    print(f"生成{report_type}报告")
    time.sleep(4)
    print("报告生成完成")
    return f"{report_type}报告已生成"
# 调用端代码
def submit_tasks():
    # 异步提交任务
    email_task = send_email.delay(
        'user@example.com',
        '欢迎注册',
        '感谢您注册我们的服务'
    )
    image_task = process_image.delay(
        '/path/to/image.jpg',
        filters=['grayscale', 'blur']
    )
    report_task = generate_report.delay(
        '月度销售',
        {'month': '2024-01', 'sales': 100000}
    )
    print(f"邮件任务ID: {email_task.id}")
    print(f"图片任务ID: {image_task.id}")
    print(f"报告任务ID: {report_task.id}")
    # 获取任务结果
    print(f"邮件状态: {email_task.status}")
    print(f"邮件结果: {email_task.get(timeout=10)}")
if __name__ == "__main__":
    submit_tasks()

使用asyncio实现异步任务队列

import asyncio
import random
import time
class AsyncTaskQueue:
    def __init__(self, max_concurrent=5):
        self.queue = asyncio.Queue()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []
    async def process_task(self, task):
        """异步处理单个任务"""
        async with self.semaphore:
            print(f"开始处理任务: {task}")
            # 模拟异步操作
            await asyncio.sleep(random.uniform(0.5, 2))
            result = f"任务'{task}'处理完成"
            print(f"任务完成: {result}")
            return result
    async def worker(self, worker_id):
        """工作协程"""
        while True:
            try:
                task = await self.queue.get()
                print(f"工作者{worker_id} 拿到任务: {task}")
                result = await self.process_task(task)
                self.results.append(result)
                self.queue.task_done()
            except asyncio.CancelledError:
                break
    async def add_task(self, task):
        """添加任务"""
        await self.queue.put(task)
    async def run(self, tasks, num_workers=3):
        """运行任务队列"""
        # 创建工作者
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(num_workers)
        ]
        # 添加所有任务
        for task in tasks:
            await self.add_task(task)
        # 等待所有任务完成
        await self.queue.join()
        # 取消工作者
        for worker in workers:
            worker.cancel()
        return self.results
# 使用示例
async def main():
    queue = AsyncTaskQueue(max_concurrent=3)
    # 创建任务列表
    tasks = [
        f"任务-{i}" 
        for i in range(10)
    ]
    print("开始处理任务队列...")
    start_time = time.time()
    # 执行所有任务
    results = await queue.run(tasks)
    elapsed = time.time() - start_time
    print(f"\n所有任务完成,耗时: {elapsed:.2f}秒")
    print(f"完成的任务数: {len(results)}")
if __name__ == "__main__":
    asyncio.run(main())

使用建议

  1. 小型项目:使用queue模块或concurrent.futures
  2. 需要持久化:使用Redis实现
  3. 企业级应用:使用Celery
  4. 异步场景:使用asyncio

选择哪种方案取决于你的具体需求:任务复杂度、并发量、是否需要持久化等。

抱歉,评论功能暂时关闭!