本文目录导读:

- 案例场景:银行账户转账
- 解决方案一:使用锁(Lock)
- 解决方案二:使用可重入锁(RLock)
- 解决方案三:使用条件变量(Condition)
- 解决方案四:使用信号量(Semaphore)
- 解决方案五:使用事件(Event)
- 解决方案六:使用队列(Queue)
线程冲突(也称为竞态条件)是多线程编程中最常见的问题之一,在Python中,由于全局解释器锁(GIL)的存在,某些场景下的线程冲突有所缓解,但当多个线程同时访问共享资源(如全局变量、文件、数据库连接等)时,冲突仍然会发生。
以下是Python中解决线程冲突的几种典型方法,通过具体案例来讲解。
案例场景:银行账户转账
假设我们有一个银行账户类,多个线程同时进行存款和取款操作,如果不加控制,会导致账户余额不正确。
问题代码(存在线程冲突)
import threading
import time
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
def deposit(self, amount):
"""存款"""
# 模拟读取余额、计算、写入的过程
current_balance = self.balance
time.sleep(0.1) # 模拟耗时操作,增加冲突概率
new_balance = current_balance + amount
self.balance = new_balance
print(f"存款 {amount},余额:{self.balance}")
def withdraw(self, amount):
"""取款"""
if self.balance >= amount:
current_balance = self.balance
time.sleep(0.1)
new_balance = current_balance - amount
self.balance = new_balance
print(f"取款 {amount},余额:{self.balance}")
else:
print(f"余额不足,无法取款 {amount},当前余额:{self.balance}")
# 创建账户
account = BankAccount(1000)
# 创建多个线程同时操作
threads = []
for _ in range(5):
t = threading.Thread(target=account.deposit, args=(200,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终余额:{account.balance}") # 预期:2000,实际往往小于2000
问题分析:多个线程几乎同时读取self.balance,都读取到相同的值(如1000),然后各自计算新余额(1200),最终写入的值可能被覆盖,导致金额丢失。
解决方案一:使用锁(Lock)
锁是最基本的同步机制,确保同一时间只有一个线程能访问共享资源。
import threading
import time
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.lock = threading.Lock() # 创建锁对象
def deposit(self, amount):
with self.lock: # 获取锁,离开with块时自动释放
current_balance = self.balance
time.sleep(0.1)
new_balance = current_balance + amount
self.balance = new_balance
print(f"存款 {amount},余额:{self.balance}")
def withdraw(self, amount):
with self.lock:
if self.balance >= amount:
current_balance = self.balance
time.sleep(0.1)
new_balance = current_balance - amount
self.balance = new_balance
print(f"取款 {amount},余额:{self.balance}")
else:
print(f"余额不足,无法取款 {amount},当前余额:{self.balance}")
# 测试
account = BankAccount(1000)
threads = []
# 5个存款线程
for _ in range(5):
t = threading.Thread(target=account.deposit, args=(200,))
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终余额:{account.balance}") # 正确输出:2000
优点:简单有效,最常用的同步方式。 缺点:可能导致死锁(如果锁使用不当),降低并发性能。
解决方案二:使用可重入锁(RLock)
如果同一个线程需要多次获取锁(例如在递归调用中),普通锁会导致死锁,这时需要使用RLock。
import threading
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.lock = threading.RLock() # 可重入锁
def deposit(self, amount):
with self.lock:
print(f"开始存款 {amount}")
self._update_balance(amount) # 内部方法也会尝试获取锁
def _update_balance(self, amount):
with self.lock: # 同一个线程可以多次获取RLock
self.balance += amount
print(f"余额更新为:{self.balance}")
def transfer(self, amount, to_account):
"""转账方法,涉及两个账户,需要注意避免死锁"""
# 按固定顺序获取锁,避免死锁
first = self if id(self) < id(to_account) else to_account
second = to_account if first == self else self
with first.lock:
with second.lock:
if self.balance >= amount:
self.balance -= amount
to_account.balance += amount
print(f"转账 {amount} 成功")
else:
print("余额不足")
# 测试
account1 = BankAccount(1000)
account2 = BankAccount(500)
# 转账
t1 = threading.Thread(target=account1.transfer, args=(200, account2))
t2 = threading.Thread(target=account2.transfer, args=(100, account1))
t1.start()
t2.start()
t1.join()
t2.join()
print(f"账户1余额:{account1.balance}") # 正确:1100
print(f"账户2余额:{account2.balance}") # 正确:400
解决方案三:使用条件变量(Condition)
当线程需要等待某个条件满足时(如生产者-消费者模式),使用Condition可以更高效。
import threading
import time
import random
class SharedQueue:
def __init__(self, max_size=5):
self.queue = []
self.max_size = max_size
self.condition = threading.Condition()
def produce(self, item):
with self.condition:
# 当队列满时等待
while len(self.queue) >= self.max_size:
print(f"队列已满,生产者等待...")
self.condition.wait()
self.queue.append(item)
print(f"生产 {item},队列大小:{len(self.queue)}")
# 通知消费者可以消费了
self.condition.notify()
def consume(self):
with self.condition:
# 当队列为空时等待
while len(self.queue) == 0:
print(f"队列为空,消费者等待...")
self.condition.wait()
item = self.queue.pop(0)
print(f"消费 {item},队列大小:{len(self.queue)}")
# 通知生产者可以生产了
self.condition.notify()
return item
# 测试生产者-消费者
queue = SharedQueue(3)
def producer():
for i in range(10):
queue.produce(i)
time.sleep(random.uniform(0.1, 0.5))
def consumer():
for _ in range(10):
item = queue.consume()
time.sleep(random.uniform(0.2, 0.6))
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
解决方案四:使用信号量(Semaphore)
信号量用于控制同时访问资源的线程数量,例如限制同时连接数据库的线程数。
import threading
import time
import random
# 数据库连接池模拟
class DatabaseConnectionPool:
def __init__(self, max_connections=3):
self.max_connections = max_connections
self.semaphore = threading.Semaphore(max_connections)
self.current_connections = 0
self.lock = threading.Lock()
def use_connection(self, thread_id):
with self.semaphore: # 获取信号量,如果已满则阻塞
with self.lock:
self.current_connections += 1
print(f"线程 {thread_id} 获取连接,当前连接数:{self.current_connections}")
# 模拟使用数据库
time.sleep(random.uniform(0.5, 1.5))
with self.lock:
self.current_connections -= 1
print(f"线程 {thread_id} 释放连接,当前连接数:{self.current_connections}")
# 测试
pool = DatabaseConnectionPool(3)
threads = []
for i in range(10):
t = threading.Thread(target=pool.use_connection, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("所有线程执行完毕")
解决方案五:使用事件(Event)
事件用于线程之间的简单通信,一个线程等待另一个线程的信号。
import threading
import time
def worker(event, thread_id):
print(f"线程 {thread_id} 等待事件...")
event.wait() # 等待事件被设置
print(f"线程 {thread_id} 收到事件,开始执行")
time.sleep(0.5)
print(f"线程 {thread_id} 执行完毕")
# 主线程
event = threading.Event()
threads = []
# 启动多个工作线程
for i in range(3):
t = threading.Thread(target=worker, args=(event, i))
threads.append(t)
t.start()
print("主线程准备就绪,2秒后触发事件...")
time.sleep(2)
print("主线程触发事件!")
event.set() # 设置事件,所有等待的线程将被唤醒
for t in threads:
t.join()
print("所有线程完成")
解决方案六:使用队列(Queue)
queue.Queue是线程安全的,内部已经实现了锁机制,是解决生产者-消费者问题的最佳选择。
import threading
import time
import random
from queue import Queue
def producer(queue):
for i in range(5):
item = f"产品-{i}"
queue.put(item)
print(f"生产者放入 {item}")
time.sleep(random.uniform(0.1, 0.3))
def consumer(queue, name):
while True:
item = queue.get()
if item is None: # 结束信号
break
print(f"消费者 {name} 取出 {item}")
time.sleep(random.uniform(0.2, 0.5))
queue.task_done()
# 测试
q = Queue(maxsize=3)
# 启动生产者
producer_thread = threading.Thread(target=producer, args=(q,))
# 启动多个消费者
consumers = []
for i in range(2):
t = threading.Thread(target=consumer, args=(q, f"消费者-{i}"))
consumers.append(t)
t.start()
producer_thread.start()
producer_thread.join()
# 等待所有任务完成
q.join()
# 发送结束信号给消费者
for _ in consumers:
q.put(None)
for t in consumers:
t.join()
| 同步机制 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Lock | 保护共享资源,简单互斥 | 简单易用 | 可能死锁 |
| RLock | 同一线程需要多次获取锁 | 避免自死锁 | 性能略低 |
| Condition | 生产者-消费者模式 | 精确控制线程等待条件 | 使用复杂 |
| Semaphore | 控制并发访问数量 | 限制资源使用 | 不适用所有场景 |
| Event | 线程间简单信号通信 | 实现简单 | 功能有限 |
| Queue | 生产者-消费者、任务分发 | 线程安全,功能完善 | 需要额外设计 |
通用建议:
- 优先使用
queue.Queue:对于生产者-消费者模式,它最安全、最简单。 - 使用
with语句管理锁:避免忘记释放锁。 - 最小化临界区:只在必要的地方加锁,减少并发性能开销。
- 考虑使用
threading.local():如果数据是线程私有的,避免使用共享资源。
选择哪种方案取决于你的具体场景,但记住:避免共享状态往往比共享状态加锁更简单、更安全。