本文目录导读:

我来详细介绍Python中实现并发锁控制的几种常见方式:
threading.Lock - 线程锁
基础用法
import threading
import time
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.lock = threading.Lock()
def deposit(self, amount):
# 获取锁
self.lock.acquire()
try:
# 模拟耗时操作
current = self.balance
time.sleep(0.1) # 模拟IO操作
self.balance = current + amount
print(f"存款{amount}元,余额:{self.balance}")
finally:
# 释放锁
self.lock.release()
def withdraw(self, amount):
# 更推荐的写法:使用with语句自动管理锁
with self.lock:
if self.balance >= amount:
current = self.balance
time.sleep(0.1)
self.balance = current - amount
print(f"取款{amount}元,余额:{self.balance}")
else:
print(f"余额不足!当前余额:{self.balance}")
# 测试
account = BankAccount(1000)
# 创建多个线程同时操作
threads = []
for i in range(5):
t1 = threading.Thread(target=account.deposit, args=(100,))
t2 = threading.Thread(target=account.withdraw, args=(50,))
threads.extend([t1, t2])
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终余额:{account.balance}")
RLock - 可重入锁
import threading
class Counter:
def __init__(self):
self.count = 0
self.lock = threading.RLock() # 可重入锁
def increment(self):
with self.lock:
self.count += 1
# 可以在同一线程中再次获取锁
self._do_something()
def _do_something(self):
with self.lock: # 同一线程可以重复获得RLock
print(f"当前计数:{self.count}")
def get_count(self):
with self.lock:
return self.count
# 测试
counter = Counter()
threads = []
for i in range(10):
t = threading.Thread(target=counter.increment)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"最终计数:{counter.get_count()}")
信号量 Semaphore
import threading
import time
import random
class ConnectionPool:
def __init__(self, max_connections=3):
self.semaphore = threading.Semaphore(max_connections)
self.connections = []
self.lock = threading.Lock()
def get_connection(self, thread_name):
print(f"{thread_name} 等待获取连接...")
with self.semaphore:
print(f"{thread_name} 获取到连接")
time.sleep(random.uniform(1, 3)) # 模拟使用连接
print(f"{thread_name} 释放连接")
def worker(pool, name):
pool.get_connection(name)
# 测试
pool = ConnectionPool(3)
threads = []
for i in range(6):
t = threading.Thread(target=worker, args=(pool, f"线程-{i}"))
threads.append(t)
t.start()
for t in threads:
t.join()
Condition - 条件变量
import threading
import time
import random
class ProducerConsumer:
def __init__(self, max_size=5):
self.buffer = []
self.max_size = max_size
self.lock = threading.Lock()
self.not_full = threading.Condition(self.lock)
self.not_empty = threading.Condition(self.lock)
def produce(self, item, producer_id):
with self.not_full:
while len(self.buffer) >= self.max_size:
print(f"生产者{producer_id}:缓冲区已满,等待...")
self.not_full.wait()
self.buffer.append(item)
print(f"生产者{producer_id}:生产了{item},缓冲区大小:{len(self.buffer)}")
self.not_empty.notify()
def consume(self, consumer_id):
with self.not_empty:
while len(self.buffer) == 0:
print(f"消费者{consumer_id}:缓冲区为空,等待...")
self.not_empty.wait()
item = self.buffer.pop(0)
print(f"消费者{consumer_id}:消费了{item},缓冲区大小:{len(self.buffer)}")
self.not_full.notify()
return item
def producer_task(pc, producer_id):
for i in range(5):
time.sleep(random.uniform(0.5, 2))
pc.produce(f"产品-{producer_id}-{i}", producer_id)
def consumer_task(pc, consumer_id):
for i in range(5):
time.sleep(random.uniform(1, 3))
pc.consume(consumer_id)
# 测试
pc = ProducerConsumer()
threads = []
# 创建2个生产者
for i in range(2):
t = threading.Thread(target=producer_task, args=(pc, i))
threads.append(t)
# 创建2个消费者
for i in range(2):
t = threading.Thread(target=consumer_task, args=(pc, i))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
进程锁 - multiprocessing.Lock
from multiprocessing import Process, Lock, Value
import time
class SharedCounter:
def __init__(self):
self.count = Value('i', 0)
self.lock = Lock()
def increment(self, n):
for _ in range(n):
with self.lock:
self.count.value += 1
time.sleep(0.01) # 模拟工作
def worker(counter, n, worker_id):
print(f"进程{worker_id}开始工作")
counter.increment(n)
print(f"进程{worker_id}完成工作")
# 测试
if __name__ == '__main__':
counter = SharedCounter()
processes = []
for i in range(4):
p = Process(target=worker, args=(counter, 100, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"最终计数:{counter.count.value}")
对比总结
| 锁类型 | 适用场景 | 特点 |
|---|---|---|
| Lock | 基本的线程同步 | 简单,不可重入 |
| RLock | 同一线程多次获取锁 | 可重入,避免死锁 |
| Semaphore | 控制并发数量 | 限制资源访问数量 |
| Condition | 生产者-消费者模式 | 条件等待和通知 |
| multiprocessing.Lock | 进程间同步 | 跨进程同步 |
实践建议
import threading
from contextlib import contextmanager
class SafeDataProcessor:
def __init__(self):
self.data = {}
self.lock = threading.Lock()
def update(self, key, value):
# 使用with语句,确保锁会释放
with self.lock:
old_value = self.data.get(key)
self.data[key] = value
return old_value
@contextmanager
def batch_update(self):
"""批量更新,一次获取锁完成多个操作"""
self.lock.acquire()
try:
yield self.data
finally:
self.lock.release()
def get(self, key):
with self.lock:
return self.data.get(key)
# 使用示例
processor = SafeDataProcessor()
def worker_update(processor, key, value):
processor.update(key, value)
# 批量更新示例
def batch_worker(processor):
with processor.batch_update():
for i in range(100):
processor.data[f"key_{i}"] = i
关键要点:
- 尽量使用
with语句自动管理锁 - 锁的粒度要适中,过大影响性能,过小可能不安全
- 注意死锁问题,特别是使用多个锁时
- 选择合适的锁类型适应不同的并发场景