Python案例中的队列如何使用?

wen python案例 2

Python队列使用实战:从基础应用到高并发场景的完整指南

目录导读

  1. 队列的本质与Python实现
  2. queue模块核心类解析
  3. 实战案例:生产者-消费者模型
  4. 进阶应用:多线程任务调度
  5. 队列在爬虫中的经典用法
  6. 性能优化与常见错误避坑
  7. 常见问答(Q&A)

队列的本质与Python实现

队列(Queue) 是一种遵循先进先出(FIFO)原则的数据结构,在Python中,标准库queue提供了线程安全的队列实现,而collections.deque则用于高性能的双端队列,两者的核心差异在于:queue.Queue内置了锁机制,适用于多线程环境;而deque速度更快,但需要自行处理线程安全。

Python案例中的队列如何使用?

代码示例:基础队列操作

from queue import Queue
# 创建容量为3的队列
q = Queue(maxsize=3)
# 入队
q.put("任务A")
q.put("任务B")
q.put("任务C")
# 出队
print(q.get())  # 输出:任务A
print(q.get())  # 输出:任务B

queue模块核心类解析

Python的queue模块提供了三个核心类,分别应对不同场景:

类名 特性 适用场景
Queue 标准FIFO队列 生产者-消费者模型
LifoQueue 后进先出(栈) 需要回溯或撤销操作
PriorityQueue 优先级队列 任务按紧急程度处理

关键方法说明:

  • put(item, block=True, timeout=None):如果队列满,默认阻塞等待
  • get(block=True, timeout=None):如果队列空,默认阻塞等待
  • task_done():标记任务处理完成,配合join()使用
  • join():阻塞直到所有任务被标记完成

实战案例:生产者-消费者模型

这是队列最经典的用途——解耦数据生产与消费,以下是一个模拟日志处理系统的完整案例:

from queue import Queue
import threading
import time
import random
class LogProducer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        for i in range(5):
            log = f"日志#{i} 级别: {random.choice(['INFO', 'ERROR', 'WARN'])}"
            self.queue.put(log)
            print(f"[生产者] 生成: {log}")
            time.sleep(random.uniform(0.1, 0.5))
class LogConsumer(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
    def run(self):
        while True:
            try:
                # 阻塞等待,超时后判断结束
                log = self.queue.get(timeout=3)
                print(f"[消费者] 处理: {log} -> 写入数据库")
                self.queue.task_done()
            except:
                # 超时后退出循环
                break
if __name__ == "__main__":
    log_queue = Queue()
    producer = LogProducer(log_queue)
    consumer = LogConsumer(log_queue)
    producer.start()
    consumer.start()
    producer.join()
    log_queue.join()  # 等待所有任务处理完成
    # 消费者线程会因超时而退出

关键设计点:

  • 使用task_done()join()确保消费者完成所有任务再退出
  • 消费者通过timeout参数优雅退出,避免无限阻塞

进阶应用:多线程任务调度

在需要控制并发数量的场景(如API请求限制),队列可以充当线程池管理器:

from queue import Queue
import threading
import requests
class WorkerPool:
    def __init__(self, max_workers=3):
        self.task_queue = Queue()
        self.workers = [threading.Thread(target=self._worker) for _ in range(max_workers)]
    def _worker(self):
        while True:
            url, callback = self.task_queue.get()
            if url is None:  # 哨兵值,结束线程
                break
            try:
                resp = requests.get(url)
                callback(resp.status_code)
            except Exception as e:
                callback(e)
            finally:
                self.task_queue.task_done()
    def add_task(self, url, callback):
        self.task_queue.put((url, callback))
    def start(self):
        for w in self.workers:
            w.start()
    def shutdown(self):
        # 发送结束信号
        for _ in self.workers:
            self.task_queue.put((None, None))
        for w in self.workers:
            w.join()
# 使用示例
def handle_result(status):
    print(f"状态码: {status}")
pool = WorkerPool(max_workers=2)
pool.start()
pool.add_task("https://httpbin.org/get", handle_result)
pool.add_task("https://httpbin.org/status/404", handle_result)
pool.add_task("https://httpbin.org/delay/1", handle_result)  # 会被排队
pool.task_queue.join()
pool.shutdown()

处理技巧:

  • 使用None作为哨兵值安全关闭线程
  • 回调函数模式保持灵活性,避免全局变量污染

队列在爬虫中的经典用法

爬虫是队列的典型应用场域,特别是需要管理URL去重与请求调度时:

from queue import PriorityQueue
import hashlib
class SmartCrawler:
    def __init__(self):
        self.url_queue = PriorityQueue()
        self.visited = set()
    def add_url(self, url, priority=0, depth=0):
        """优先级越低越先处理"""
        url_hash = hashlib.md5(url.encode()).hexdigest()
        if url_hash not in self.visited:
            self.visited.add(url_hash)
            self.url_queue.put((priority, depth, url))
    def crawl(self):
        while not self.url_queue.empty():
            priority, depth, url = self.url_queue.get()
            # 实际爬取逻辑...
            print(f"深度{depth} 爬取: {url}")
            # 提取新链接并递归添加
            new_links = [f"{url}/page{i}" for i in range(2)]
            for link in new_links:
                self.add_url(link, priority=depth+1, depth=depth+1)
crawler = SmartCrawler()
crawler.add_url("https://example.com", priority=0)
crawler.crawl()

设计亮点:

  • 使用PriorityQueue实现广度优先遍历(depth低的优先)
  • 哈希表去重,避免重复请求

性能优化与常见错误避坑

常见错误:
  1. 忘记调用task_done():导致join()永久阻塞
  2. 生产者比消费者快太多:队列爆满导致内存溢出,应限制maxsize
  3. 未处理get()超时异常:在多线程结束时容易导致程序挂起
性能优化建议:
  • 批量操作:一次put多个元素可减少锁争用
  • 使用deque替代Queue:单线程场景下性能提升10倍以上
  • 避免大对象:队列中的对象应轻量化(如使用任务ID而非完整数据)
# 单线程高效队列
from collections import deque
d = deque(maxlen=1000)
d.append("任务1")
d.popleft()  # 比Queue.get()快30%

常见问答(Q&A)

Q1: queue.Queue和collections.deque什么区别?
A: Queue是线程安全的,内部有锁;deque不是线程安全但速度更快,多线程必须用Queue,单线程或协程环境优先用deque。

Q2: 如何实现一个超时自动丢弃的队列?
A: 使用queue.Full异常捕获,结合put(block=False)实现:

try:
    q.put(item, block=False)
except queue.Full:
    print("队列已满,丢弃该任务")

Q3: 队列可以存储不同类型的数据吗?
A: 可以,Python队列不限制元素类型,但实践中建议保持类型一致,避免消费者解析错误。

Q4: 如何清空队列中所有元素?
A: 使用循环while not q.empty(): q.get(),但注意这个操作不是原子性的,多线程环境下需加锁。

Q5: 队列大小无限时会发生什么?
A: 当内存耗尽时程序会因MemoryError崩溃,生产环境必须设置maxsize,或结合监控自动扩容。


延伸阅读: 如果您需要更深入的分布式队列方案(如RabbitMQ集成),可以研究pika库;对于异步框架(如asyncio),推荐使用asyncio.Queue实现协程级别的任务调度。

抱歉,评论功能暂时关闭!