Python队列使用实战:从基础应用到高并发场景的完整指南
目录导读
- 队列的本质与Python实现
- queue模块核心类解析
- 实战案例:生产者-消费者模型
- 进阶应用:多线程任务调度
- 队列在爬虫中的经典用法
- 性能优化与常见错误避坑
- 常见问答(Q&A)
队列的本质与Python实现
队列(Queue) 是一种遵循先进先出(FIFO)原则的数据结构,在Python中,标准库queue提供了线程安全的队列实现,而collections.deque则用于高性能的双端队列,两者的核心差异在于:queue.Queue内置了锁机制,适用于多线程环境;而deque速度更快,但需要自行处理线程安全。

代码示例:基础队列操作
from queue import Queue
# 创建容量为3的队列
q = Queue(maxsize=3)
# 入队
q.put("任务A")
q.put("任务B")
q.put("任务C")
# 出队
print(q.get()) # 输出:任务A
print(q.get()) # 输出:任务B
queue模块核心类解析
Python的queue模块提供了三个核心类,分别应对不同场景:
| 类名 | 特性 | 适用场景 |
|---|---|---|
Queue |
标准FIFO队列 | 生产者-消费者模型 |
LifoQueue |
后进先出(栈) | 需要回溯或撤销操作 |
PriorityQueue |
优先级队列 | 任务按紧急程度处理 |
关键方法说明:
put(item, block=True, timeout=None):如果队列满,默认阻塞等待get(block=True, timeout=None):如果队列空,默认阻塞等待task_done():标记任务处理完成,配合join()使用join():阻塞直到所有任务被标记完成
实战案例:生产者-消费者模型
这是队列最经典的用途——解耦数据生产与消费,以下是一个模拟日志处理系统的完整案例:
from queue import Queue
import threading
import time
import random
class LogProducer(threading.Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
for i in range(5):
log = f"日志#{i} 级别: {random.choice(['INFO', 'ERROR', 'WARN'])}"
self.queue.put(log)
print(f"[生产者] 生成: {log}")
time.sleep(random.uniform(0.1, 0.5))
class LogConsumer(threading.Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
while True:
try:
# 阻塞等待,超时后判断结束
log = self.queue.get(timeout=3)
print(f"[消费者] 处理: {log} -> 写入数据库")
self.queue.task_done()
except:
# 超时后退出循环
break
if __name__ == "__main__":
log_queue = Queue()
producer = LogProducer(log_queue)
consumer = LogConsumer(log_queue)
producer.start()
consumer.start()
producer.join()
log_queue.join() # 等待所有任务处理完成
# 消费者线程会因超时而退出
关键设计点:
- 使用
task_done()和join()确保消费者完成所有任务再退出 - 消费者通过
timeout参数优雅退出,避免无限阻塞
进阶应用:多线程任务调度
在需要控制并发数量的场景(如API请求限制),队列可以充当线程池管理器:
from queue import Queue
import threading
import requests
class WorkerPool:
def __init__(self, max_workers=3):
self.task_queue = Queue()
self.workers = [threading.Thread(target=self._worker) for _ in range(max_workers)]
def _worker(self):
while True:
url, callback = self.task_queue.get()
if url is None: # 哨兵值,结束线程
break
try:
resp = requests.get(url)
callback(resp.status_code)
except Exception as e:
callback(e)
finally:
self.task_queue.task_done()
def add_task(self, url, callback):
self.task_queue.put((url, callback))
def start(self):
for w in self.workers:
w.start()
def shutdown(self):
# 发送结束信号
for _ in self.workers:
self.task_queue.put((None, None))
for w in self.workers:
w.join()
# 使用示例
def handle_result(status):
print(f"状态码: {status}")
pool = WorkerPool(max_workers=2)
pool.start()
pool.add_task("https://httpbin.org/get", handle_result)
pool.add_task("https://httpbin.org/status/404", handle_result)
pool.add_task("https://httpbin.org/delay/1", handle_result) # 会被排队
pool.task_queue.join()
pool.shutdown()
处理技巧:
- 使用
None作为哨兵值安全关闭线程 - 回调函数模式保持灵活性,避免全局变量污染
队列在爬虫中的经典用法
爬虫是队列的典型应用场域,特别是需要管理URL去重与请求调度时:
from queue import PriorityQueue
import hashlib
class SmartCrawler:
def __init__(self):
self.url_queue = PriorityQueue()
self.visited = set()
def add_url(self, url, priority=0, depth=0):
"""优先级越低越先处理"""
url_hash = hashlib.md5(url.encode()).hexdigest()
if url_hash not in self.visited:
self.visited.add(url_hash)
self.url_queue.put((priority, depth, url))
def crawl(self):
while not self.url_queue.empty():
priority, depth, url = self.url_queue.get()
# 实际爬取逻辑...
print(f"深度{depth} 爬取: {url}")
# 提取新链接并递归添加
new_links = [f"{url}/page{i}" for i in range(2)]
for link in new_links:
self.add_url(link, priority=depth+1, depth=depth+1)
crawler = SmartCrawler()
crawler.add_url("https://example.com", priority=0)
crawler.crawl()
设计亮点:
- 使用
PriorityQueue实现广度优先遍历(depth低的优先) - 哈希表去重,避免重复请求
性能优化与常见错误避坑
常见错误:
- 忘记调用task_done():导致join()永久阻塞
- 生产者比消费者快太多:队列爆满导致内存溢出,应限制
maxsize - 未处理get()超时异常:在多线程结束时容易导致程序挂起
性能优化建议:
- 批量操作:一次put多个元素可减少锁争用
- 使用deque替代Queue:单线程场景下性能提升10倍以上
- 避免大对象:队列中的对象应轻量化(如使用任务ID而非完整数据)
# 单线程高效队列
from collections import deque
d = deque(maxlen=1000)
d.append("任务1")
d.popleft() # 比Queue.get()快30%
常见问答(Q&A)
Q1: queue.Queue和collections.deque什么区别?
A: Queue是线程安全的,内部有锁;deque不是线程安全但速度更快,多线程必须用Queue,单线程或协程环境优先用deque。
Q2: 如何实现一个超时自动丢弃的队列?
A: 使用queue.Full异常捕获,结合put(block=False)实现:
try:
q.put(item, block=False)
except queue.Full:
print("队列已满,丢弃该任务")
Q3: 队列可以存储不同类型的数据吗?
A: 可以,Python队列不限制元素类型,但实践中建议保持类型一致,避免消费者解析错误。
Q4: 如何清空队列中所有元素?
A: 使用循环while not q.empty(): q.get(),但注意这个操作不是原子性的,多线程环境下需加锁。
Q5: 队列大小无限时会发生什么?
A: 当内存耗尽时程序会因MemoryError崩溃,生产环境必须设置maxsize,或结合监控自动扩容。
延伸阅读: 如果您需要更深入的分布式队列方案(如RabbitMQ集成),可以研究pika库;对于异步框架(如asyncio),推荐使用asyncio.Queue实现协程级别的任务调度。