Python案例如何避免并发冲突?

wen python案例 77

本文目录导读:

Python案例如何避免并发冲突?

  1. 多线程场景(threading)
  2. 多进程场景(multiprocessing)
  3. 异步编程(asyncio)
  4. 无锁并发技巧
  5. 数据库/文件并发
  6. 常见陷阱与最佳实践
  7. 实际案例对比

在Python中避免并发冲突,主要需要理解竞争条件(Race Condition)临界区(Critical Section)这两个概念,下面我会从不同层次给出常用的解决方案。

多线程场景(threading)

Python的GIL虽然保证了单个字节码指令的原子性,但复合操作(如count += 1)仍可能冲突。

使用Lock(互斥锁)

import threading
counter = 0
lock = threading.Lock()
def increment():
    global counter
    for _ in range(100000):
        with lock:  # 自动获取和释放锁
            counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter)  # 正确输出 1,000,000

使用RLock(可重入锁)

适合同一个线程需要多次获取锁的情况(如递归函数)。

使用Semaphore(信号量)

控制同时访问某资源的线程数量:

semaphore = threading.Semaphore(3)  # 最多3个线程同时访问

多进程场景(multiprocessing)

由于进程间不共享内存(默认),需要使用跨进程同步机制

Manager + Lock

from multiprocessing import Process, Manager, Lock
def worker(d, lock):
    with lock:
        d['count'] = d.get('count', 0) + 1
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        lock = Lock()
        processes = [Process(target=worker, args=(d, lock)) for _ in range(10)]
        for p in processes: p.start()
        for p in processes: p.join()
        print(d['count'])  # 10

共享内存(Value/Array)+ Lock

from multiprocessing import Process, Value, Lock
def increment(val, lock):
    for _ in range(100):
        with lock:
            val.value += 1

异步编程(asyncio)

异步任务在单线程内协作执行,不存在真正的并行,但仍有共享状态问题。

使用asyncio.Lock

import asyncio
async def update(counter, lock):
    async with lock:
        # 执行非原子操作
        temp = counter.value + 1
        await asyncio.sleep(0.01)  # 模拟耗时
        counter.value = temp

无锁并发技巧

不可变数据结构

避免修改共享状态,而是生成新的数据:

# 线程安全版本
class SafeCounter:
    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0
    def increment(self):
        with self._lock:
            # 返回新的不可变值
            return self._count + 1

使用队列进行生产者-消费者

from queue import Queue
q = Queue()  # 线程安全
# 生产者
def producer():
    for i in range(10):
        q.put(i)
# 消费者
def consumer():
    while True:
        item = q.get()
        # 处理item
        q.task_done()

原子操作(特定场景)

某些场景可以用threading.local()concurrent.futures.ThreadPoolExecutormap(对每个任务独立)。

数据库/文件并发

文件锁

import fcntl
with open('data.txt', 'a') as f:
    fcntl.flock(f, fcntl.LOCK_EX)  # 排他锁
    f.write('data\n')
    # 自动释放锁

数据库事务

使用ORM的乐观锁或悲观锁:

# 悲观锁示例 (SQL)
SELECT * FROM accounts WHERE id=1 FOR UPDATE;
# 乐观锁 (版本号)
UPDATE accounts SET balance=balance-100, version=version+1 
WHERE id=1 AND version=old_version;

常见陷阱与最佳实践

陷阱1: 忘记释放锁

# 错误写法
lock.acquire()
if condition:
    return  # 忘记释放锁!会导致死锁
lock.release()
# 正确写法
with lock:  # 自动处理
    if condition:
        return

陷阱2: 锁粒度太大

过多的锁会降低并发性能,考虑:

  • 使用读写锁(threading.RLock或第三方ReaderWriterLock
  • 分片锁(Striped Lock)

陷阱3: 死锁

避免嵌套锁或使用锁排序策略:

# 保证所有线程按相同顺序获取锁
lock1 = threading.Lock()
lock2 = threading.Lock()
def safe_operation():
    with lock1:
        with lock2:  # 始终先lock1再lock2
            pass

实际案例对比

# 冲突版
counter = 0
def bad_increment():
    global counter
    for _ in range(1000000):
        counter += 1  # 非原子操作
# 安全版
safe_counter = 0
lock = threading.Lock()
def safe_increment():
    global safe_counter
    for _ in range(1000000):
        with lock:
            safe_counter += 1

选择哪种方法取决于你的场景:

  • 多线程Lock/RLock/Semaphore
  • 多进程Manager/Value+Lock
  • 异步asyncio.Lock
  • 高吞吐无阻塞 → 队列/不可变对象/原子操作

最重要的是:明确临界区最小化锁范围避免死锁

抱歉,评论功能暂时关闭!