Python案例怎么实现并发锁控制?

wen python案例 75

本文目录导读:

Python案例怎么实现并发锁控制?

  1. threading.Lock - 线程锁
  2. RLock - 可重入锁
  3. 信号量 Semaphore
  4. Condition - 条件变量
  5. 进程锁 - multiprocessing.Lock
  6. 对比总结
  7. 实践建议

我来详细介绍Python中实现并发锁控制的几种常见方式:

threading.Lock - 线程锁

基础用法

import threading
import time
class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.Lock()
    def deposit(self, amount):
        # 获取锁
        self.lock.acquire()
        try:
            # 模拟耗时操作
            current = self.balance
            time.sleep(0.1)  # 模拟IO操作
            self.balance = current + amount
            print(f"存款{amount}元,余额:{self.balance}")
        finally:
            # 释放锁
            self.lock.release()
    def withdraw(self, amount):
        # 更推荐的写法:使用with语句自动管理锁
        with self.lock:
            if self.balance >= amount:
                current = self.balance
                time.sleep(0.1)
                self.balance = current - amount
                print(f"取款{amount}元,余额:{self.balance}")
            else:
                print(f"余额不足!当前余额:{self.balance}")
# 测试
account = BankAccount(1000)
# 创建多个线程同时操作
threads = []
for i in range(5):
    t1 = threading.Thread(target=account.deposit, args=(100,))
    t2 = threading.Thread(target=account.withdraw, args=(50,))
    threads.extend([t1, t2])
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"最终余额:{account.balance}")

RLock - 可重入锁

import threading
class Counter:
    def __init__(self):
        self.count = 0
        self.lock = threading.RLock()  # 可重入锁
    def increment(self):
        with self.lock:
            self.count += 1
            # 可以在同一线程中再次获取锁
            self._do_something()
    def _do_something(self):
        with self.lock:  # 同一线程可以重复获得RLock
            print(f"当前计数:{self.count}")
    def get_count(self):
        with self.lock:
            return self.count
# 测试
counter = Counter()
threads = []
for i in range(10):
    t = threading.Thread(target=counter.increment)
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"最终计数:{counter.get_count()}")

信号量 Semaphore

import threading
import time
import random
class ConnectionPool:
    def __init__(self, max_connections=3):
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = []
        self.lock = threading.Lock()
    def get_connection(self, thread_name):
        print(f"{thread_name} 等待获取连接...")
        with self.semaphore:
            print(f"{thread_name} 获取到连接")
            time.sleep(random.uniform(1, 3))  # 模拟使用连接
            print(f"{thread_name} 释放连接")
def worker(pool, name):
    pool.get_connection(name)
# 测试
pool = ConnectionPool(3)
threads = []
for i in range(6):
    t = threading.Thread(target=worker, args=(pool, f"线程-{i}"))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

Condition - 条件变量

import threading
import time
import random
class ProducerConsumer:
    def __init__(self, max_size=5):
        self.buffer = []
        self.max_size = max_size
        self.lock = threading.Lock()
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)
    def produce(self, item, producer_id):
        with self.not_full:
            while len(self.buffer) >= self.max_size:
                print(f"生产者{producer_id}:缓冲区已满,等待...")
                self.not_full.wait()
            self.buffer.append(item)
            print(f"生产者{producer_id}:生产了{item},缓冲区大小:{len(self.buffer)}")
            self.not_empty.notify()
    def consume(self, consumer_id):
        with self.not_empty:
            while len(self.buffer) == 0:
                print(f"消费者{consumer_id}:缓冲区为空,等待...")
                self.not_empty.wait()
            item = self.buffer.pop(0)
            print(f"消费者{consumer_id}:消费了{item},缓冲区大小:{len(self.buffer)}")
            self.not_full.notify()
            return item
def producer_task(pc, producer_id):
    for i in range(5):
        time.sleep(random.uniform(0.5, 2))
        pc.produce(f"产品-{producer_id}-{i}", producer_id)
def consumer_task(pc, consumer_id):
    for i in range(5):
        time.sleep(random.uniform(1, 3))
        pc.consume(consumer_id)
# 测试
pc = ProducerConsumer()
threads = []
# 创建2个生产者
for i in range(2):
    t = threading.Thread(target=producer_task, args=(pc, i))
    threads.append(t)
# 创建2个消费者
for i in range(2):
    t = threading.Thread(target=consumer_task, args=(pc, i))
    threads.append(t)
for t in threads:
    t.start()
for t in threads:
    t.join()

进程锁 - multiprocessing.Lock

from multiprocessing import Process, Lock, Value
import time
class SharedCounter:
    def __init__(self):
        self.count = Value('i', 0)
        self.lock = Lock()
    def increment(self, n):
        for _ in range(n):
            with self.lock:
                self.count.value += 1
                time.sleep(0.01)  # 模拟工作
def worker(counter, n, worker_id):
    print(f"进程{worker_id}开始工作")
    counter.increment(n)
    print(f"进程{worker_id}完成工作")
# 测试
if __name__ == '__main__':
    counter = SharedCounter()
    processes = []
    for i in range(4):
        p = Process(target=worker, args=(counter, 100, i))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"最终计数:{counter.count.value}")

对比总结

锁类型 适用场景 特点
Lock 基本的线程同步 简单,不可重入
RLock 同一线程多次获取锁 可重入,避免死锁
Semaphore 控制并发数量 限制资源访问数量
Condition 生产者-消费者模式 条件等待和通知
multiprocessing.Lock 进程间同步 跨进程同步

实践建议

import threading
from contextlib import contextmanager
class SafeDataProcessor:
    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()
    def update(self, key, value):
        # 使用with语句,确保锁会释放
        with self.lock:
            old_value = self.data.get(key)
            self.data[key] = value
            return old_value
    @contextmanager
    def batch_update(self):
        """批量更新,一次获取锁完成多个操作"""
        self.lock.acquire()
        try:
            yield self.data
        finally:
            self.lock.release()
    def get(self, key):
        with self.lock:
            return self.data.get(key)
# 使用示例
processor = SafeDataProcessor()
def worker_update(processor, key, value):
    processor.update(key, value)
# 批量更新示例
def batch_worker(processor):
    with processor.batch_update():
        for i in range(100):
            processor.data[f"key_{i}"] = i

关键要点:

  • 尽量使用with语句自动管理锁
  • 锁的粒度要适中,过大影响性能,过小可能不安全
  • 注意死锁问题,特别是使用多个锁时
  • 选择合适的锁类型适应不同的并发场景

抱歉,评论功能暂时关闭!