本文目录导读:

在Python中,多进程主要通过multiprocessing模块实现,下面我会用多个案例来展示多进程的常用方法。
基础用法:创建和启动进程
import multiprocessing
import time
import os
def worker(name):
    """Simulate a unit of work inside a child process.

    Prints a start message containing ``name`` and the current process ID,
    sleeps for two seconds, then prints a completion message.

    Args:
        name: Label used in the printed messages.
    """
    print(f"进程 {name} 开始工作,PID: {os.getpid()}")
    time.sleep(2)  # stand-in for real work
    print(f"进程 {name} 工作完成")
if __name__ == '__main__':
    # Start three child processes, each running `worker` with its own label.
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(f'Worker-{i}',))
        processes.append(p)
        p.start()

    # Block until every child has exited.
    for p in processes:
        p.join()

    print("所有进程执行完毕")
进程间通信:队列(Queue)
import multiprocessing
import time
import random
def producer(queue, items):
    """Put ``items`` random integers on ``queue``, followed by a ``None`` sentinel.

    Each integer is drawn from 1..100, announced with a print, and followed
    by a 0.5 second pause.

    Args:
        queue: Object with a ``put`` method that receives the values.
        items: Number of random integers to produce.
    """
    for _ in range(items):
        item = random.randint(1, 100)
        queue.put(item)
        print(f"生产者放入: {item}")
        time.sleep(0.5)
    # Sentinel: signals the reader that no more items will arrive.
    queue.put(None)
def consumer(queue):
    """Read items from ``queue`` until a ``None`` sentinel is received.

    For every non-sentinel item, prints the item together with its doubled
    value, then pauses 0.8 seconds before reading the next one.

    Args:
        queue: Object with a blocking ``get`` method that yields the items.
    """
    while True:
        item = queue.get()
        if item is None:
            # Sentinel received: stop consuming.
            break
        print(f"消费者取出: {item}, 处理结果: {item * 2}")
        time.sleep(0.8)
if __name__ == '__main__':
    # Queue shared by the producer and consumer processes.
    queue = multiprocessing.Queue()

    # One producer that emits 5 items and one consumer that reads them.
    p1 = multiprocessing.Process(target=producer, args=(queue, 5))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("生产消费完成")
进程池(Pool)的使用
import multiprocessing
import time
def calculate_square(n, delay=1):
    """Return ``n`` squared after a simulated delay.

    Args:
        n: Number to square.
        delay: Seconds to sleep before computing. The default of 1 keeps the
            original behavior for single-argument callers such as ``Pool.map``.

    Returns:
        ``n * n``.
    """
    time.sleep(delay)  # simulate a time-consuming operation
    result = n * n
    print(f"计算 {n} 的平方 = {result}")
    return result
def calculate_cube(n, delay=1):
    """Return ``n`` cubed after a simulated delay.

    Args:
        n: Number to cube.
        delay: Seconds to sleep before computing. The default of 1 keeps the
            original behavior for single-argument callers.

    Returns:
        ``n ** 3``.
    """
    time.sleep(delay)  # simulate a time-consuming operation
    result = n ** 3
    print(f"计算 {n} 的立方 = {result}")
    return result
def add(a, b, delay=0.5):
    """Return ``a + b`` after a simulated delay.

    This function lives at module level rather than inside the ``__main__``
    guard: pool tasks are pickled by reference, so worker processes must be
    able to look the function up by name in the module they import.

    Args:
        a: First operand.
        b: Second operand.
        delay: Seconds to sleep before adding (default 0.5).
    """
    time.sleep(delay)
    return a + b


if __name__ == '__main__':
    numbers = list(range(1, 11))

    # Pool with 4 workers; omit `processes` to default to the CPU count.
    with multiprocessing.Pool(processes=4) as pool:
        # Option 1: map - process a whole batch, results in input order.
        results = pool.map(calculate_square, numbers)
        print(f"map结果: {results}")
        print("\n" + "="*30 + "\n")

        # Option 2: apply_async - submit tasks individually without blocking.
        async_results = [
            pool.apply_async(calculate_cube, (num,)) for num in numbers[:5]
        ]
        # Collect the results; each get() blocks until that task finishes.
        for result in async_results:
            print(f"异步结果: {result.get()}")
        print("\n" + "="*30 + "\n")

        # Option 3: starmap - each tuple is unpacked into positional arguments.
        params = [(1, 2), (3, 4), (5, 6)]
        results = pool.starmap(add, params)
        print(f"starmap结果: {results}")
共享内存(Value和Array)
import multiprocessing
import time
def increment(counter, lock):
    """Add 1 to ``counter.value`` one hundred times, guarding each update with ``lock``.

    Args:
        counter: Object exposing a mutable integer ``value`` attribute.
        lock: Context-manager lock held around each read-modify-write.
    """
    for _ in range(100):
        with lock:  # `+=` is a read-modify-write; the lock prevents lost updates
            counter.value += 1
        # Sleep outside the lock so other processes can take their turn.
        time.sleep(0.001)
def decrement(counter, lock):
    """Subtract 1 from ``counter.value`` one hundred times, guarding each update with ``lock``.

    Args:
        counter: Object exposing a mutable integer ``value`` attribute.
        lock: Context-manager lock held around each read-modify-write.
    """
    for _ in range(100):
        with lock:  # `-=` is a read-modify-write; the lock prevents lost updates
            counter.value -= 1
        # Sleep outside the lock so other processes can take their turn.
        time.sleep(0.001)
if __name__ == '__main__':
    # Shared integer ('i' = C int) starting at 0, plus the lock guarding it.
    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    # 5 incrementing + 5 decrementing processes (10 total) touch the counter.
    processes = []
    for i in range(5):
        p1 = multiprocessing.Process(target=increment, args=(counter, lock))
        p2 = multiprocessing.Process(target=decrement, args=(counter, lock))
        processes.extend([p1, p2])
        p1.start()
        p2.start()

    for p in processes:
        p.join()

    # Equal numbers of +1 and -1 updates, so the expected final value is 0.
    print(f"最终计数器值: {counter.value}")
实战案例:批量下载文件
import multiprocessing
import time
import requests
def download_file(url, save_path):
    """Download ``url`` and write the response body to ``save_path``.

    Args:
        url: Address to fetch with an HTTP GET (10 second timeout).
        save_path: Destination file path; opened in binary write mode.

    Returns:
        True if the file was fetched and written, False if the request or
        the file write failed (the error is printed, not raised).
    """
    try:
        print(f"开始下载: {url}")
        response = requests.get(url, timeout=10)
        # Treat HTTP error statuses (4xx/5xx) as failures; without this an
        # error page would be saved to disk and reported as a success.
        response.raise_for_status()
        with open(save_path, 'wb') as f:
            f.write(response.content)
        print(f"下载完成: {save_path}")
        return True
    except (requests.RequestException, OSError) as e:
        # Network/HTTP problems and file-system errors are reported, not raised,
        # so one bad URL does not abort a whole batch.
        print(f"下载失败 {url}: {e}")
        return False
def batch_download():
    """Download a fixed list of example files in parallel and print a summary.

    Runs ``download_file`` over five (url, save_path) pairs using a pool of
    three worker processes, then prints the elapsed time and how many
    downloads reported success.
    """
    # Example tasks: (url, local file name) pairs.
    download_tasks = [
        ("http://example.com/file1.jpg", "file1.jpg"),
        ("http://example.com/file2.jpg", "file2.jpg"),
        ("http://example.com/file3.jpg", "file3.jpg"),
        ("http://example.com/file4.jpg", "file4.jpg"),
        ("http://example.com/file5.jpg", "file5.jpg"),
    ]

    start_time = time.time()

    # starmap unpacks each pair into download_file(url, save_path).
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(download_file, download_tasks)

    end_time = time.time()

    print(f"总耗时: {end_time - start_time:.2f}秒")
    # download_file returns True/False, so summing counts the successes.
    print(f"成功下载: {sum(results)}/{len(results)}")
if __name__ == '__main__':
    # Run the example only when executed as a script, not when imported.
    batch_download()
进程同步:Event
import multiprocessing
import time
import random
def worker_initializer(event):
    """Wait for ``event`` to be set, then perform a short simulated task.

    Args:
        event: Object with a blocking ``wait`` method used as the start signal.
    """
    print("进程准备就绪,等待开始信号...")
    event.wait()  # block here until the event is set
    print("收到开始信号,开始工作")
    time.sleep(random.uniform(0.5, 1.5))  # simulated work of random length
    print("工作完成")
if __name__ == '__main__':
    # Event shared by all workers; it starts out unset.
    start_event = multiprocessing.Event()

    # Start five workers; each blocks in event.wait() until the signal is sent.
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker_initializer, args=(start_event,))
        processes.append(p)
        p.start()

    print("主进程准备中...")
    time.sleep(2)
    print("主进程发出开始信号!")
    start_event.set()  # releases every process waiting on the event

    for p in processes:
        p.join()

    print("所有进程完成")
- 使用 `if __name__ == '__main__':`:在以 spawn/forkserver 方式启动子进程的平台(如 Windows、macOS)上必须,避免无限递归创建进程
- 进程间通信:使用 `Queue`、`Pipe`、`Manager` 等
- 进程池:`Pool` 可以自动管理进程,常用方法有 `map`、`apply_async`、`starmap`
- 共享数据:使用 `Value`、`Array`、`Manager.dict()` 等
- 同步机制:`Lock`、`Event`、`Semaphore` 等控制进程同步
- 避免全局共享:多进程不共享全局变量,需要使用特殊机制
这些例子覆盖了多进程编程的主要场景,可以根据具体需求选择合适的实现方式。