本文目录导读:

- 案例1:列表去重 —— 从O(n²)到O(n)
- 案例2:字符串拼接 —— 从O(n²)到O(n)
- 案例3:大数据处理 —— 从内存全量到流式/向量化
- 案例4:数值计算 —— 从Python循环到NumPy向量化
- 案例5:数据分组统计 —— 从手动字典到pandas
- 案例6:并行计算 —— 利用多核加速IO密集型/CPU密集型任务
- 综合优化检查清单
- 一个完整的综合案例:大文件词频统计
提升Python代码效率,可以从算法优化、语言特性利用、工具选型、并行计算这几个维度入手,下面通过具体案例(从慢到快)展示优化思路。
案例1:列表去重 —— 从O(n²)到O(n)
低效写法(双重循环)
def dedup_slow(arr):
result = []
for x in arr:
if x not in result: # 每次查找O(n),整体O(n²)
result.append(x)
return result
高效写法(利用set)
def dedup_fast(arr):
return list(set(arr)) # set查找O(1),整体O(n)
原理:in操作在list中是线性扫描,在set/hash table中是O(1)。
案例2:字符串拼接 —— 从O(n²)到O(n)
低效写法(拼接)
def concat_slow(n):
s = ""
for i in range(n):
s += str(i) # 每次创建新字符串,复制旧内容
return s
高效写法(join)
def concat_fast(n):
return "".join(str(i) for i in range(n)) # 一次分配内存
原理:字符串不可变,每次创建新对象;join先计算总长度再一次性分配,避免频繁内存拷贝。
案例3:大数据处理 —— 从内存全量到流式/向量化
场景:对10亿行文本统计单词数
低效写法(读入内存)
import sys
def count_words_slow(filename):
with open(filename) as f:
data = f.read() # 可能内存溢出
return len(data.split())
高效写法(迭代器+生成器)
def count_words_fast(filename):
count = 0
with open(filename) as f:
for line in f: # 逐行读取,不占内存
count += len(line.split())
return count
更高效写法(正则+生成器表达式)
import re
def count_words_faster(filename):
with open(filename) as f:
return sum(len(re.findall(r'\w+', line)) for line in f)
案例4:数值计算 —— 从Python循环到NumPy向量化
场景:数组每个元素平方后求和
低效写法(纯Python循环)
def sum_squares_slow(arr):
total = 0
for x in arr:
total += x * x
return total
高效写法(NumPy向量化)
import numpy as np
def sum_squares_fast(arr):
return np.sum(arr ** 2) # C语言级别循环
性能差异:10万元素,纯Python约0.2秒,NumPy约0.003秒(快60倍)。
案例5:数据分组统计 —— 从手动字典到pandas
场景:按类别计算销售额总和
低效写法
def group_sum_slow(data):
result = {}
for cat, val in data:
result[cat] = result.get(cat, 0) + val
return result
高效写法(pandas)
import pandas as pd
def group_sum_fast(data):
df = pd.DataFrame(data, columns=['cat', 'val'])
return df.groupby('cat')['val'].sum().to_dict()
对百万行数据,pandas利用底层C+优化算法快5-10倍。
案例6:并行计算 —— 利用多核加速IO密集型/CPU密集型任务
CPU密集型:质数判断
串行写法
def is_prime(n):
if n < 2: return False
for i in range(2, int(n**0.5)+1):
if n % i == 0: return False
return True
def find_primes_slow(start, end):
return [n for n in range(start, end) if is_prime(n)]
并行写法(concurrent.futures)
from concurrent.futures import ProcessPoolExecutor
def find_primes_fast(start, end, workers=4):
chunk_size = (end - start) // workers
ranges = [(i, i+chunk_size) for i in range(start, end, chunk_size)]
if ranges[-1][1] < end:
ranges[-1] = (ranges[-1][0], end)
with ProcessPoolExecutor(max_workers=workers) as executor:
results = executor.map(lambda r: find_primes_slow(r[0], r[1]), ranges)
return [p for res in results for p in res]
加速比:4核处理器可获得接近4倍加速(需注意进程启动开销,适合大规模任务)。
综合优化检查清单
| 优化类别 | 具体做法 | 典型案例 |
|---|---|---|
| 算法选择 | 用hash表代替list搜索 | 去重、查找 |
| 数据结构 | 用set/dict替代list的in操作 |
成员检查 |
| 字符串 | 用join替代 |
拼接数千字符串 |
| I/O | 使用迭代器逐行处理 | 大文件读取 |
| 数值计算 | 用NumPy/SciPy替代手动循环 | 矩阵运算 |
| 数据统计 | 用pandas替代手写groupby | 分组聚合 |
| 并行 | 多进程/多线程拆分任务 | 独立子任务 |
| 编译加速 | 用Cython/Numba/@jit |
数值循环 |
| 缓存结果 | 用functools.lru_cache |
递归、重复计算 |
一个完整的综合案例:大文件词频统计
需求:统计1GB文本文件中出现频率最高的100个单词。
优化演进
- 初级:
f.read().split()→ 内存溢出 - 改进:逐行读取 +
collections.Counter - 进阶:多进程分块统计,再归并
- 极致:使用
mmap内存映射文件 + 手动解析
# 方法2(实用且高效)
from collections import Counter
import re
def word_freq(filename):
counter = Counter()
with open(filename, 'r') as f:
for line in f:
counter.update(re.findall(r'\w+', line.lower()))
return counter.most_common(100)
优化原则:先衡量,再优化,用timeit或cProfile定位瓶颈,避免过早优化。