本文目录导读:

我来分享几种常见的数据去重方法,从简单到复杂逐步实现。
基础列表去重
简单列表去重
# Method 1: round-trip through a set (unordered, so the original order may be lost)
data = [1, 2, 2, 3, 4, 4, 5]
unique_data = [*set(data)]
print(f"set去重(顺序可能改变): {unique_data}")
# Method 2: preserve the original order
def remove_duplicates_ordered(lst):
    """Return a new list with duplicates removed, keeping first-seen order.

    Args:
        lst: Any iterable of hashable items.

    Returns:
        A list holding the first occurrence of each distinct item, in the
        order those occurrences appear in ``lst``.

    Raises:
        TypeError: If an item is unhashable.
    """
    # dict keys are unique and keep insertion order (guaranteed since 3.7),
    # and re-inserting an equal key keeps the key that was stored first.
    return list(dict.fromkeys(lst))
# Demo: the first occurrence of each value keeps its position
data = [3, 1, 2, 1, 3, 4, 2, 5]
ordered_unique = remove_duplicates_ordered(data)
print(f"有序去重: {ordered_unique}")
字典列表去重(根据特定字段)
# Sample user records; the goal is to de-duplicate them by email
users = [
    {'id': 1, 'name': '张三', 'email': 'zhang@example.com'},
    {'id': 2, 'name': '李四', 'email': 'li@example.com'},
    {'id': 3, 'name': '张三', 'email': 'zhang@example.com'},  # duplicate email
    {'id': 4, 'name': '王五', 'email': 'wang@example.com'},
]
# De-duplicate by one field, keeping the first record seen for each value
def deduplicate_dicts(data, key_field):
    """Drop dicts whose ``key_field`` value has already been seen.

    Args:
        data: Iterable of dicts; each must contain ``key_field``.
        key_field: Key whose value decides whether two records are duplicates.

    Returns:
        A list with the first record for every distinct ``key_field`` value,
        in input order. The entries are the same dict objects as in ``data``,
        not copies.

    Raises:
        KeyError: If a record has no ``key_field`` entry.
        TypeError: If a ``key_field`` value is unhashable.
    """
    seen = set()
    result = []
    for item in data:
        value = item[key_field]
        if value in seen:
            continue
        seen.add(value)
        result.append(item)
    return result
deduped_users = deduplicate_dicts(users, 'email')
print("字典列表去重结果:")
for user in deduped_users:
    print(f" ID: {user['id']}, 姓名: {user['name']}, 邮箱: {user['email']}")

# Same de-duplication with pandas, when it is installed
try:
    import pandas as pd
except ImportError:
    print("\npandas未安装,跳过")
else:
    # Only the import is guarded, so a genuine error in the pandas calls
    # below is not mistaken for "pandas is missing".
    df = pd.DataFrame(users)
    df_deduped = df.drop_duplicates(subset=['email'], keep='first')
    print(f"\nPandas去重:\n{df_deduped}")
复杂对象去重
from dataclasses import dataclass
@dataclass
class Student:
    """Student record whose identity is the ``(id, name)`` pair.

    ``grade`` is ignored by ``__eq__`` and ``__hash__``, so two records with
    the same id and name but different grades count as duplicates in sets
    and as dict keys.

    The hand-written ``__eq__`` and ``__hash__`` below are kept as-is by
    ``@dataclass``; it only generates the ones a class does not define.
    """

    id: int
    name: str
    grade: int

    def __hash__(self):
        # Hash exactly the fields __eq__ compares, so equal objects always
        # produce equal hashes.
        return hash((self.id, self.name))

    def __eq__(self, other):
        if not isinstance(other, Student):
            # Let Python try the reflected comparison rather than forcing
            # False; `student == 5` still evaluates to False in the end.
            return NotImplemented
        return self.id == other.id and self.name == other.name
# Build the sample student list
students = [
    Student(1, '小明', 90),
    Student(2, '小红', 85),
    Student(1, '小明', 95),  # duplicate: same id and name
    Student(3, '小华', 88),
]
# De-duplicate via Student.__hash__/__eq__. dict.fromkeys keeps first-seen
# order; list(set(...)) would print in an arbitrary order that can change
# from run to run because str hashes are randomized per process.
unique_students = list(dict.fromkeys(students))
print("对象去重结果:")
for s in unique_students:
    print(f" ID: {s.id}, 姓名: {s.name}, 成绩: {s.grade}")
文件数据去重(实战案例)
import csv
from io import StringIO
# Simulated CSV data: a header row followed by records, two of which repeat verbatim
csv_data = """姓名,电话,城市
张三,13800138000,北京
李四,13900139000,上海
张三,13800138000,北京
王五,13700137000,广州
李四,13900139000,上海"""
def deduplicate_csv(csv_content, key_columns=None):
    """Remove duplicate rows from CSV text.

    Args:
        csv_content: CSV data as a string; the first line is the header.
        key_columns: Column names that decide whether two rows are
            duplicates. ``None`` or an empty sequence compares every column.

    Returns:
        A list of row dicts (as produced by ``csv.DictReader``) containing
        the first occurrence of each distinct key, in input order.

    Raises:
        KeyError: If a name in ``key_columns`` is not a column of the data.
    """

    def freeze(cell):
        # A row with more cells than headers gets its surplus cells as a
        # list (DictReader's restkey entry); lists are unhashable, so turn
        # them into tuples before they go into the set key.
        return tuple(cell) if isinstance(cell, list) else cell

    reader = csv.DictReader(StringIO(csv_content))
    seen = set()
    unique_rows = []
    for row in reader:
        # Compare only the requested columns, or every column when none given
        columns = key_columns if key_columns else row.keys()
        key = tuple(freeze(row[col]) for col in columns)
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows
# De-duplicate on the name and phone columns only
result = deduplicate_csv(csv_data, key_columns=['姓名', '电话'])
print("CSV去重结果:")
for row in result:
    print(f" 姓名: {row['姓名']}, 电话: {row['电话']}, 城市: {row['城市']}")
模糊去重(字符串相似度)
from difflib import SequenceMatcher
def similar(a, b, threshold=0.8):
    """Tell whether two strings are similar enough to count as duplicates.

    Args:
        a: First string.
        b: Second string.
        threshold: Cut-off compared against
            ``difflib.SequenceMatcher(None, a, b).ratio()``.

    Returns:
        True when the ratio is strictly greater than ``threshold``. Because
        the comparison is strict, a ratio exactly equal to the threshold does
        not match, and ``threshold=1.0`` never matches, even for identical
        strings.
    """
    return SequenceMatcher(None, a, b).ratio() > threshold
def fuzzy_deduplicate(data, threshold=0.8):
    """Remove near-duplicate strings, keeping the first of each similar group.

    Each item is compared, via ``similar``, with the items already kept; it
    is kept only if none of them is similar to it. The outcome therefore
    depends on input order, and every item may be compared against every
    kept item.

    Args:
        data: Iterable of strings.
        threshold: Similarity cut-off forwarded to ``similar``.

    Returns:
        A list of the kept items in input order.
    """
    result = []
    for item in data:
        # any() stops at the first similar kept item, like the explicit break
        if not any(similar(item, kept, threshold) for kept in result):
            result.append(item)
    return result
# Example: fuzzy de-duplication of employee names
names = [
    '张三', '张 三', '张三丰', '李四',
    '李 四', '王五', '张三(销售部)',
]
cleaned_names = fuzzy_deduplicate(names, threshold=0.6)
print("模糊去重结果:")
for name in cleaned_names:
    print(f" - {name}")
完整的去重工具函数
from typing import List, Dict, Any, Callable
import pandas as pd
class DataDeduplicator:
    """Collection of order-preserving de-duplication helpers.

    Every method is static, returns a new list, keeps the first occurrence
    of each group of duplicates, and does not modify its input.
    """

    @staticmethod
    def simple_list(lst: List) -> List:
        """Remove duplicates from a list of hashable items, keeping order."""
        # dict keys are unique and preserve insertion order
        return list(dict.fromkeys(lst))

    @staticmethod
    def dict_list(data: List[Dict], keys: List[str] = None) -> List[Dict]:
        """Remove duplicate dicts, keeping the first of each group.

        Args:
            data: Dicts to de-duplicate.
            keys: Fields that decide whether two dicts are duplicates.
                ``None`` or an empty list compares the whole dict.

        Returns:
            The kept dicts (same objects, not copies) in input order.

        Raises:
            KeyError: If a dict lacks one of ``keys``.
            TypeError: If a compared value is unhashable.
        """
        if keys:
            def record_key(item):
                return tuple(item[k] for k in keys)
        else:
            def record_key(item):
                # Compare (key, value) pairs rather than bare values, so
                # {'a': 1} and {'b': 1} stay distinct while equal dicts
                # match regardless of their key order.
                return frozenset(item.items())
        return DataDeduplicator.by_custom_key(data, record_key)

    @staticmethod
    def by_custom_key(data: List, key_func: Callable) -> List:
        """Remove items whose ``key_func(item)`` has already been seen.

        Args:
            data: Items to de-duplicate.
            key_func: Maps an item to a hashable key; items with equal keys
                are duplicates.

        Returns:
            The first item for each distinct key, in input order.
        """
        seen = set()
        result = []
        for item in data:
            key = key_func(item)
            if key not in seen:
                seen.add(key)
                result.append(item)
        return result
# Usage examples
dedup = DataDeduplicator()

# Plain list
numbers = [1, 2, 2, 3, 3, 4, 5, 5]
unique_numbers = dedup.simple_list(numbers)
print(f"数字去重: {unique_numbers}")

# List of dicts, with name + email as the identity
users = [
    {'name': '张三', 'email': 'zhang@test.com'},
    {'name': '李四', 'email': 'li@test.com'},
    {'name': '张三', 'email': 'zhang@test.com'},
]
unique_users = dedup.dict_list(users, ['name', 'email'])
print(f"用户去重: {unique_users}")

# Custom key: compare strings case-insensitively
strings = ['hello', 'Hello', 'HELLO', 'world']
case_insensitive = dedup.by_custom_key(strings, str.lower)
print(f"不区分大小写去重: {case_insensitive}")
性能对比
import time
import random
# Generate test data: 100,000 ints drawn from 1..1000, so duplicates are heavy
test_data = [random.randint(1, 1000) for _ in range(100000)]


# Time each de-duplication approach on the same data
def test_performance():
    """Print how long three de-duplication approaches take on ``test_data``.

    Times, in order: ``list(set(...))``, the order-preserving seen-set
    comprehension, and ``pandas.Series.drop_duplicates`` (skipped silently
    when pandas is not installed). Returns nothing; results are printed.
    """
    # perf_counter is the clock meant for measuring short durations;
    # time.time follows the wall clock and can jump if it is adjusted.
    # set-based (unordered)
    start = time.perf_counter()
    result_set = list(set(test_data))
    print(f"set去重: {time.perf_counter() - start:.4f}秒")

    # order-preserving
    start = time.perf_counter()
    seen = set()
    result_ordered = [x for x in test_data if not (x in seen or seen.add(x))]
    print(f"有序去重: {time.perf_counter() - start:.4f}秒")

    # pandas, if available
    try:
        import pandas as pd
    except ImportError:
        # pandas is optional for this comparison; skip its timing
        return
    start = time.perf_counter()
    s = pd.Series(test_data).drop_duplicates().tolist()
    print(f"Pandas去重: {time.perf_counter() - start:.4f}秒")


test_performance()
选择建议
- 简单数据:直接用 `set()` 或列表推导式
- 字典列表:根据业务需求指定去重字段
- 大数据量:使用 pandas 或数据库去重
- 需要保持顺序:使用 `seen` 集合 + 列表
- 模糊匹配:考虑使用字符串相似度算法
根据你的实际需求选择合适的方法,如果有具体场景,欢迎继续交流!