本文目录导读:

- 核心思想:模块化 + 可追溯
- 第一步:脚本基础框架与库导入
- 第二步:加载数据与初步探查
- 第三步:核心清洗函数(按模块编写)
- 第四步:主流程整合
- 第五步:使用示例(完整实战)
- 进阶技巧(让脚本更专业)
- 总结:一个好的清洗脚本 = 规则明确 + 模块化解耦 + 可追溯
数据清洗是数据分析中最重要的前置步骤,通常占整个项目 60%-80% 的时间,好的清洗脚本能极大提升数据质量。
下面我会系统性地讲解一个通用的数据清洗脚本框架,并提供对应的 Python 代码示例(使用 Pandas 库,这是最常用的数据清洗工具)。
核心思想:模块化 + 可追溯
一个成熟的清洗脚本不应是一大坨代码,而应该按步骤拆解。
第一步:脚本基础框架与库导入
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
# 1. 设置显示选项(方便调试)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 200)
pd.set_option('display.width', 200)
第二步:加载数据与初步探查
# 2. 加载数据(支持 csv, excel, json, sql 等)
def load_data(file_path):
if file_path.endswith('.csv'):
df = pd.read_csv(file_path, encoding='utf-8') # 注意编码问题
elif file_path.endswith('.xlsx'):
df = pd.read_excel(file_path, engine='openpyxl')
elif file_path.endswith('.json'):
df = pd.read_json(file_path)
else:
raise ValueError("不支持的文件格式")
return df
def initial_inspect(df):
"""初步探查函数"""
print("="*50)
print("【基本概况】")
print(f"行数: {df.shape[0]}, 列数: {df.shape[1]}")
print("\n【数据类型与缺失值】")
print(df.info())
print("\n【缺失值统计】")
print(df.isnull().sum()[df.isnull().sum() > 0])
print("\n【前5行数据预览】")
print(df.head())
return df
# 调用示例
# df = load_data('your_data.csv')
# df = initial_inspect(df)
第三步:核心清洗函数(按模块编写)
这是脚本的核心,建议写成独立函数方便复用。
1 处理缺失值
def handle_missing_values(df, strategy_dict=None):
"""
根据列的不同,采用不同的缺失值策略
strategy_dict: {'col_name': 'drop' / 'fill_mean' / 'fill_median' / 'fill_mode' / value}
"""
if strategy_dict is None:
strategy_dict = {}
for col, strategy in strategy_dict.items():
if col not in df.columns:
continue
null_count = df[col].isnull().sum()
if null_count == 0:
continue
if strategy == 'drop':
df = df.dropna(subset=[col])
elif strategy == 'fill_mean':
df[col].fillna(df[col].mean(), inplace=True)
elif strategy == 'fill_median':
df[col].fillna(df[col].median(), inplace=True)
elif strategy == 'fill_mode':
df[col].fillna(df[col].mode()[0], inplace=True)
elif isinstance(strategy, (int, float, str)):
df[col].fillna(strategy, inplace=True)
elif strategy == 'forward_fill':
df[col].fillna(method='ffill', inplace=True)
elif strategy == 'backward_fill':
df[col].fillna(method='bfill', inplace=True)
else:
print(f"未知策略: {strategy}")
return df
2 处理重复值
def handle_duplicates(df, subset=None, keep='first'):
"""
subset: 基于哪些列判断重复(None 表示全部列)
keep: 'first'/'last'/False
"""
before = df.shape[0]
df = df.drop_duplicates(subset=subset, keep=keep)
after = df.shape[0]
print(f"去重: 删除了 {before - after} 行重复数据")
return df
3 处理异常值与类型转换
def clean_outliers_and_types(df, col_configs):
"""
col_configs: { 'column_name': { 'type': 'int'/'float'/'datetime'/'str',
'lower_bound': 0,
'upper_bound': 100,
'strategy': 'cap'/'drop' } }
"""
for col, config in col_configs.items():
if col not in df.columns:
continue
# 1. 类型转换
if 'type' in config:
try:
if config['type'] == 'datetime':
df[col] = pd.to_datetime(df[col], errors='coerce')
elif config['type'] == 'int':
df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
elif config['type'] == 'float':
df[col] = pd.to_numeric(df[col], errors='coerce')
elif config['type'] == 'str':
df[col] = df[col].astype(str)
except:
print(f"列 {col} 类型转换失败")
# 2. 异常值处理(仅对数值列)
if 'lower_bound' in config or 'upper_bound' in config:
lower = config.get('lower_bound', df[col].min())
upper = config.get('upper_bound', df[col].max())
strategy = config.get('strategy', 'cap') # cap 或 drop
if strategy == 'cap':
df[col] = df[col].clip(lower, upper)
elif strategy == 'drop':
df = df[(df[col] >= lower) & (df[col] <= upper)]
return df
4 标准化文本数据(去除空格、大小写等)
def standardize_text(df, text_cols, strip=True, lower=True, replace_dict=None):
"""
对文本列进行标准化处理
"""
for col in text_cols:
if col not in df.columns:
continue
if strip:
df[col] = df[col].str.strip()
if lower:
df[col] = df[col].str.lower()
if replace_dict:
for old, new in replace_dict.items():
df[col] = df[col].str.replace(old, new, regex=False)
return df
第四步:主流程整合
def run_data_cleaning_pipeline(df, config):
"""
主清洗管道
config: 字典,包含所有清洗参数
"""
initial_inspect(df)
# 1. 删除全空的行或列
df = df.dropna(how='all')
# 2. 处理重复值
if config.get('remove_duplicates'):
df = handle_duplicates(df,
subset=config['duplicate_subset'],
keep=config['duplicate_keep'])
# 3. 处理缺失值
if config.get('missing_strategy'):
df = handle_missing_values(df, config['missing_strategy'])
# 4. 处理异常值和类型
if config.get('col_configs'):
df = clean_outliers_and_types(df, config['col_configs'])
# 5. 文本标准化
if config.get('text_cols'):
df = standardize_text(df, config['text_cols'])
# 6. 重置索引
df = df.reset_index(drop=True)
print("\n" + "="*50)
print("【清洗完成】")
print(f"最终数据集: {df.shape[0]} 行 x {df.shape[1]} 列")
return df
第五步:使用示例(完整实战)
假设你有一个 sales.csv 文件,可以这样配置:
if __name__ == "__main__":
# 加载数据
df = load_data('sales.csv')
# 配置清洗规则
cleaning_config = {
'remove_duplicates': True,
'duplicate_subset': ['order_id'], # 基于订单号去重
'duplicate_keep': 'first',
'missing_strategy': {
'age': 'fill_median', # 年龄用中位数填充
'income': 'fill_mean', # 收入用均值填充
'gender': 'fill_mode', # 性别用众数填充
'comment': 'forward_fill', # 评论向下填充
'city': '未知城市', # 城市用固定值
'order_date': 'drop' # 日期缺失则删除该行
},
'col_configs': {
'age': { 'type': 'int', 'lower_bound': 0, 'upper_bound': 120, 'strategy': 'cap' },
'price': { 'type': 'float', 'lower_bound': 0, 'strategy': 'drop' },
'order_date': { 'type': 'datetime' },
'phone': { 'type': 'str' }
},
'text_cols': ['name', 'city', 'product']
}
# 执行清洗
df_cleaned = run_data_cleaning_pipeline(df, cleaning_config)
# 保存清洗后的数据
df_cleaned.to_csv('sales_cleaned.csv', index=False, encoding='utf-8-sig')
# 输出清洗日志
print("\n清洗报告已生成:sales_cleaned.csv")
进阶技巧(让脚本更专业)
- 记录清洗日志:在每步操作后记录
原始行数 -> 清洗后行数,方便审计。 - 可视化检查:在清洗前后绘制箱线图、直方图,观察数据分布变化。
- 处理日期格式:使用
pd.to_datetime()并设置errors='coerce',无效日期会转为 NaT。 - 内存优化:对于大文件(>1GB),可以先读部分行,或者使用
dtype参数指定列的数据类型以节省内存。 - 脚本参数化:把配置文件写成 JSON 或 YAML 文件,脚本不变,只改配置文件(适合多个不同数据源)。
一个好的清洗脚本 = 规则明确 + 模块化解耦 + 可追溯
- 不要一次性改所有列:按列清洗,定义清晰的规则。
- 不要直接覆盖原始数据:保存清洗日志和清洗后的副本。
- 不要硬编码:把策略放在字典或配置文件中,便于修改。
希望这个框架对你有帮助,如果你有具体的数据(比如有什么样的脏数据),可以告诉我,我可以给你更针对性的代码。