本文目录导读:
1. 基础用法:限制并发线程数
2. 使用上下文管理器(推荐方式)
3. 生产者-消费者模式
4. 数据库连接池示例
5. BoundedSemaphore(有界信号量)

在Python中,信号量(Semaphore)是用于控制多线程/多进程并发访问共享资源数量的同步原语,下面通过几个案例来说明其用法。
基础用法:限制并发线程数
import threading
import time
import random

# Create a semaphore that lets at most 3 threads hold it at the same time.
semaphore = threading.Semaphore(3)


def worker(worker_id):
    """Wait for a semaphore slot, simulate 1-3 s of work, then free the slot.

    Args:
        worker_id: Label used only in the progress messages.
    """
    print(f"工作线程 {worker_id} 正在等待信号量...")
    # Acquire *before* entering ``try``: if acquire() itself fails there is
    # nothing to release, so it must not be covered by ``finally``.
    semaphore.acquire()
    try:
        print(f"工作线程 {worker_id} 获得信号量,开始工作")
        time.sleep(random.uniform(1, 3))  # simulate work
        print(f"工作线程 {worker_id} 完成工作")
    finally:
        # Always release so that other waiting threads can proceed.
        semaphore.release()


# Start 10 worker threads; only 3 of them can hold the semaphore at a time.
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

# Wait for every thread to finish.
for t in threads:
    t.join()
print("所有工作完成")
使用上下文管理器(推荐方式)
import threading
import time

# At most 2 scraping tasks may run concurrently.
semaphore = threading.Semaphore(2)


def web_scraper(task_id):
    """Simulate a 1-second scrape while holding one semaphore slot.

    The ``with`` statement calls acquire() on entry and release() on exit,
    including when the body raises, so no explicit try/finally is needed.
    """
    with semaphore:
        print(f"任务 {task_id} 开始爬取...")
        time.sleep(1)
        print(f"任务 {task_id} 爬取完成")


# Launch 6 scraper threads; the semaphore lets them through two at a time.
tasks = []
for i in range(6):
    t = threading.Thread(target=web_scraper, args=(i,))
    tasks.append(t)
    t.start()
for t in tasks:
    t.join()
生产者-消费者模式
import threading
import time
import random

# Bounded buffer shared by the producer and consumer threads.
buffer = []
# Counts FREE slots (at most 5 products in the buffer): the producer acquires
# one before adding a product, the consumer releases one after removing it.
buffer_semaphore = threading.Semaphore(5)
# Counts FILLED slots: the producer releases one after adding a product, the
# consumer acquires one before removing it, so the consumer blocks while the
# buffer is empty instead of skipping iterations.
items_semaphore = threading.Semaphore(0)
mutex = threading.Lock()  # protects the buffer list itself


def producer():
    """Put 10 products into the buffer, blocking while it is full."""
    for i in range(10):
        product = f"产品-{i}"
        buffer_semaphore.acquire()  # wait for a free slot
        with mutex:
            buffer.append(product)
            print(f"生产者: 生产 {product}, 缓冲区大小: {len(buffer)}")
        items_semaphore.release()  # announce one more available product
        time.sleep(random.random())


def consumer():
    """Take 10 products from the buffer, blocking while it is empty."""
    for _ in range(10):
        items_semaphore.acquire()  # wait until a product is available
        with mutex:
            # items_semaphore guarantees the buffer is non-empty here.
            product = buffer.pop(0)
            print(f"消费者: 消费 {product}, 缓冲区大小: {len(buffer)}")
        buffer_semaphore.release()  # the slot is free again
        time.sleep(random.random())


# Create and run one producer thread and one consumer thread.
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
数据库连接池示例
import threading
import time
from queue import Queue
class DatabaseConnectionPool:
    """Fixed-size pool of (simulated) connections guarded by a semaphore.

    The semaphore caps how many connections are checked out at once; the
    queue holds the connections that are currently free.
    """

    def __init__(self, max_connections=5):
        self.max_connections = max_connections
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = Queue(maxsize=max_connections)
        # Pre-fill the pool with placeholder connection names.
        for i in range(max_connections):
            self.connections.put(f"连接-{i}")

    def get_connection(self, timeout=None):
        """Check out a connection, blocking until one is free.

        Args:
            timeout: Maximum number of seconds to wait; ``None`` (the
                default) waits indefinitely.

        Returns:
            A connection taken from the pool.

        Raises:
            TimeoutError: If no connection became free within ``timeout``.
        """
        if not self.semaphore.acquire(timeout=timeout):
            raise TimeoutError(f"{timeout} 秒内未能获取数据库连接")
        # return_connection() puts the connection back *before* releasing
        # the semaphore, so once acquire() succeeds the queue is non-empty
        # and get() does not block.
        return self.connections.get()

    def return_connection(self, conn):
        """Put ``conn`` back into the pool and free one semaphore slot."""
        self.connections.put(conn)
        self.semaphore.release()
# Use the pool: at most 3 of the 8 queries below hold a connection at a time.
pool = DatabaseConnectionPool(max_connections=3)


def query_database(query_id):
    """Run a simulated 1-second query on a pooled connection."""
    conn = pool.get_connection()
    try:
        print(f"查询 {query_id}: 使用 {conn}")
        time.sleep(1)
        print(f"查询 {query_id}: 完成")
    finally:
        # Return the connection even if the query raises.
        pool.return_connection(conn)


# Simulate 8 concurrent database queries.
threads = []
for i in range(8):
    t = threading.Thread(target=query_database, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
BoundedSemaphore(有界信号量)
import threading
import time

# A BoundedSemaphore raises ValueError if release() would push its counter
# above the initial value (2 here).
bounded_sem = threading.BoundedSemaphore(2)


def task(name):
    """Hold one of the two slots for 0.5 s, then give it back."""
    bounded_sem.acquire()
    try:
        print(f"{name} 获得资源")
        time.sleep(0.5)
    finally:
        # Release even if the work raises, so the slot is never leaked.
        bounded_sem.release()
    print(f"{name} 释放资源")


# Test: three threads compete for the two slots.
t1 = threading.Thread(target=task, args=("Task-1",))
t2 = threading.Thread(target=task, args=("Task-2",))
t3 = threading.Thread(target=task, args=("Task-3",))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print("所有任务完成")

# Every task released what it acquired, so the counter is back at its initial
# value of 2. A single extra release() would exceed that value, so this very
# first unmatched call already raises ValueError.
try:
    bounded_sem.release()
except ValueError as e:
    print(f"错误: {e}")
- 信号量创建:threading.Semaphore(n) 创建初始值为 n 的信号量
- 获取信号量:acquire() 或 with semaphore:
- 释放信号量:release(),或由 with 语句自动释放
- BoundedSemaphore:计数不会超过初始值,release() 次数多于 acquire() 时抛出 ValueError,防止编程错误
- 适用场景:限制并发访问数、连接池、资源池等
信号量适合用于限流场景,确保同时访问资源的线程/进程数不超过预设值。