本文目录导读:

在Python中避免并发冲突,主要需要理解竞争条件(Race Condition)和临界区(Critical Section)这两个概念,下面我会从不同层次给出常用的解决方案。
多线程场景(threading)
Python的GIL虽然保证了单个字节码指令的原子性,但复合操作(如count += 1)仍可能冲突。
使用Lock(互斥锁)
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # 自动获取和释放锁
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 正确输出 1,000,000
使用RLock(可重入锁)
适合同一个线程需要多次获取锁的情况(如递归函数)。
使用Semaphore(信号量)
控制同时访问某资源的线程数量:
semaphore = threading.Semaphore(3) # 最多3个线程同时访问
多进程场景(multiprocessing)
由于进程间不共享内存(默认),需要使用跨进程同步机制。
Manager + Lock
from multiprocessing import Process, Manager, Lock
def worker(d, lock):
with lock:
d['count'] = d.get('count', 0) + 1
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
lock = Lock()
processes = [Process(target=worker, args=(d, lock)) for _ in range(10)]
for p in processes: p.start()
for p in processes: p.join()
print(d['count']) # 10
共享内存(Value/Array)+ Lock
from multiprocessing import Process, Value, Lock
def increment(val, lock):
for _ in range(100):
with lock:
val.value += 1
异步编程(asyncio)
异步任务在单线程内协作执行,不存在真正的并行,但仍有共享状态问题。
使用asyncio.Lock
import asyncio
async def update(counter, lock):
async with lock:
# 执行非原子操作
temp = counter.value + 1
await asyncio.sleep(0.01) # 模拟耗时
counter.value = temp
无锁并发技巧
不可变数据结构
避免修改共享状态,而是生成新的数据:
# 线程安全版本
class SafeCounter:
def __init__(self):
self._lock = threading.Lock()
self._count = 0
def increment(self):
with self._lock:
# 返回新的不可变值
return self._count + 1
使用队列进行生产者-消费者
from queue import Queue
q = Queue() # 线程安全
# 生产者
def producer():
for i in range(10):
q.put(i)
# 消费者
def consumer():
while True:
item = q.get()
# 处理item
q.task_done()
原子操作(特定场景)
某些场景可以用threading.local()或concurrent.futures.ThreadPoolExecutor的map(对每个任务独立)。
数据库/文件并发
文件锁
import fcntl
with open('data.txt', 'a') as f:
fcntl.flock(f, fcntl.LOCK_EX) # 排他锁
f.write('data\n')
# 自动释放锁
数据库事务
使用ORM的乐观锁或悲观锁:
# 悲观锁示例 (SQL) SELECT * FROM accounts WHERE id=1 FOR UPDATE; # 乐观锁 (版本号) UPDATE accounts SET balance=balance-100, version=version+1 WHERE id=1 AND version=old_version;
常见陷阱与最佳实践
陷阱1: 忘记释放锁
# 错误写法
lock.acquire()
if condition:
return # 忘记释放锁!会导致死锁
lock.release()
# 正确写法
with lock: # 自动处理
if condition:
return
陷阱2: 锁粒度太大
过多的锁会降低并发性能,考虑:
- 使用读写锁(
threading.RLock或第三方ReaderWriterLock) - 分片锁(Striped Lock)
陷阱3: 死锁
避免嵌套锁或使用锁排序策略:
# 保证所有线程按相同顺序获取锁
lock1 = threading.Lock()
lock2 = threading.Lock()
def safe_operation():
with lock1:
with lock2: # 始终先lock1再lock2
pass
实际案例对比
# 冲突版
counter = 0
def bad_increment():
global counter
for _ in range(1000000):
counter += 1 # 非原子操作
# 安全版
safe_counter = 0
lock = threading.Lock()
def safe_increment():
global safe_counter
for _ in range(1000000):
with lock:
safe_counter += 1
选择哪种方法取决于你的场景:
- 多线程 →
Lock/RLock/Semaphore - 多进程 →
Manager/Value+Lock - 异步 →
asyncio.Lock - 高吞吐无阻塞 → 队列/不可变对象/原子操作
最重要的是:明确临界区、最小化锁范围、避免死锁。