数据清洗脚本怎么做?

wen 实用脚本 44

本文目录导读:

数据清洗脚本怎么做?

  1. 核心思想:模块化 + 可追溯
  2. 第一步:脚本基础框架与库导入
  3. 第二步:加载数据与初步探查
  4. 第三步:核心清洗函数(按模块编写)
  5. 第四步:主流程整合
  6. 第五步:使用示例(完整实战)
  7. 进阶技巧(让脚本更专业)
  8. 总结:一个好的清洗脚本 = 规则明确 + 模块化解耦 + 可追溯

数据清洗是数据分析中最重要的前置步骤,通常占整个项目 60%-80% 的时间,好的清洗脚本能极大提升数据质量。

下面我会系统性地讲解一个通用的数据清洗脚本框架,并提供对应的 Python 代码示例(使用 Pandas 库,这是最常用的数据清洗工具)。

核心思想:模块化 + 可追溯

一个成熟的清洗脚本不应是一大坨代码,而应该按步骤拆解。


第一步:脚本基础框架与库导入

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
# 1. 设置显示选项(方便调试)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 200)
pd.set_option('display.width', 200)

第二步:加载数据与初步探查

# 2. 加载数据(支持 csv, excel, json, sql 等)
def load_data(file_path):
    if file_path.endswith('.csv'):
        df = pd.read_csv(file_path, encoding='utf-8')  # 注意编码问题
    elif file_path.endswith('.xlsx'):
        df = pd.read_excel(file_path, engine='openpyxl')
    elif file_path.endswith('.json'):
        df = pd.read_json(file_path)
    else:
        raise ValueError("不支持的文件格式")
    return df
def initial_inspect(df):
    """初步探查函数"""
    print("="*50)
    print("【基本概况】")
    print(f"行数: {df.shape[0]}, 列数: {df.shape[1]}")
    print("\n【数据类型与缺失值】")
    print(df.info())
    print("\n【缺失值统计】")
    print(df.isnull().sum()[df.isnull().sum() > 0])
    print("\n【前5行数据预览】")
    print(df.head())
    return df
# 调用示例
# df = load_data('your_data.csv')
# df = initial_inspect(df)

第三步:核心清洗函数(按模块编写)

这是脚本的核心,建议写成独立函数方便复用。

1 处理缺失值

def handle_missing_values(df, strategy_dict=None):
    """
    根据列的不同,采用不同的缺失值策略
    strategy_dict: {'col_name': 'drop' / 'fill_mean' / 'fill_median' / 'fill_mode' / value}
    """
    if strategy_dict is None:
        strategy_dict = {}
    for col, strategy in strategy_dict.items():
        if col not in df.columns:
            continue
        null_count = df[col].isnull().sum()
        if null_count == 0:
            continue
        if strategy == 'drop':
            df = df.dropna(subset=[col])
        elif strategy == 'fill_mean':
            df[col].fillna(df[col].mean(), inplace=True)
        elif strategy == 'fill_median':
            df[col].fillna(df[col].median(), inplace=True)
        elif strategy == 'fill_mode':
            df[col].fillna(df[col].mode()[0], inplace=True)
        elif isinstance(strategy, (int, float, str)):
            df[col].fillna(strategy, inplace=True)
        elif strategy == 'forward_fill':
            df[col].fillna(method='ffill', inplace=True)
        elif strategy == 'backward_fill':
            df[col].fillna(method='bfill', inplace=True)
        else:
            print(f"未知策略: {strategy}")
    return df

2 处理重复值

def handle_duplicates(df, subset=None, keep='first'):
    """
    subset: 基于哪些列判断重复(None 表示全部列)
    keep: 'first'/'last'/False
    """
    before = df.shape[0]
    df = df.drop_duplicates(subset=subset, keep=keep)
    after = df.shape[0]
    print(f"去重: 删除了 {before - after} 行重复数据")
    return df

3 处理异常值与类型转换

def clean_outliers_and_types(df, col_configs):
    """
    col_configs: { 'column_name': { 'type': 'int'/'float'/'datetime'/'str',
                                     'lower_bound': 0,
                                     'upper_bound': 100,
                                     'strategy': 'cap'/'drop' } }
    """
    for col, config in col_configs.items():
        if col not in df.columns:
            continue
        # 1. 类型转换
        if 'type' in config:
            try:
                if config['type'] == 'datetime':
                    df[col] = pd.to_datetime(df[col], errors='coerce')
                elif config['type'] == 'int':
                    df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
                elif config['type'] == 'float':
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                elif config['type'] == 'str':
                    df[col] = df[col].astype(str)
            except:
                print(f"列 {col} 类型转换失败")
        # 2. 异常值处理(仅对数值列)
        if 'lower_bound' in config or 'upper_bound' in config:
            lower = config.get('lower_bound', df[col].min())
            upper = config.get('upper_bound', df[col].max())
            strategy = config.get('strategy', 'cap')  # cap 或 drop
            if strategy == 'cap':
                df[col] = df[col].clip(lower, upper)
            elif strategy == 'drop':
                df = df[(df[col] >= lower) & (df[col] <= upper)]
    return df

4 标准化文本数据(去除空格、大小写等)

def standardize_text(df, text_cols, strip=True, lower=True, replace_dict=None):
    """
    对文本列进行标准化处理
    """
    for col in text_cols:
        if col not in df.columns:
            continue
        if strip:
            df[col] = df[col].str.strip()
        if lower:
            df[col] = df[col].str.lower()
        if replace_dict:
            for old, new in replace_dict.items():
                df[col] = df[col].str.replace(old, new, regex=False)
    return df

第四步:主流程整合

def run_data_cleaning_pipeline(df, config):
    """
    主清洗管道
    config: 字典,包含所有清洗参数
    """
    initial_inspect(df)
    # 1. 删除全空的行或列
    df = df.dropna(how='all')
    # 2. 处理重复值
    if config.get('remove_duplicates'):
        df = handle_duplicates(df, 
                               subset=config['duplicate_subset'], 
                               keep=config['duplicate_keep'])
    # 3. 处理缺失值
    if config.get('missing_strategy'):
        df = handle_missing_values(df, config['missing_strategy'])
    # 4. 处理异常值和类型
    if config.get('col_configs'):
        df = clean_outliers_and_types(df, config['col_configs'])
    # 5. 文本标准化
    if config.get('text_cols'):
        df = standardize_text(df, config['text_cols'])
    # 6. 重置索引
    df = df.reset_index(drop=True)
    print("\n" + "="*50)
    print("【清洗完成】")
    print(f"最终数据集: {df.shape[0]} 行 x {df.shape[1]} 列")
    return df

第五步:使用示例(完整实战)

假设你有一个 sales.csv 文件,可以这样配置:

if __name__ == "__main__":
    # 加载数据
    df = load_data('sales.csv')
    # 配置清洗规则
    cleaning_config = {
        'remove_duplicates': True,
        'duplicate_subset': ['order_id'],  # 基于订单号去重
        'duplicate_keep': 'first',
        'missing_strategy': {
            'age': 'fill_median',          # 年龄用中位数填充
            'income': 'fill_mean',         # 收入用均值填充
            'gender': 'fill_mode',         # 性别用众数填充
            'comment': 'forward_fill',     # 评论向下填充
            'city': '未知城市',             # 城市用固定值
            'order_date': 'drop'           # 日期缺失则删除该行
        },
        'col_configs': {
            'age': { 'type': 'int', 'lower_bound': 0, 'upper_bound': 120, 'strategy': 'cap' },
            'price': { 'type': 'float', 'lower_bound': 0, 'strategy': 'drop' },
            'order_date': { 'type': 'datetime' },
            'phone': { 'type': 'str' }
        },
        'text_cols': ['name', 'city', 'product']
    }
    # 执行清洗
    df_cleaned = run_data_cleaning_pipeline(df, cleaning_config)
    # 保存清洗后的数据
    df_cleaned.to_csv('sales_cleaned.csv', index=False, encoding='utf-8-sig')
    # 输出清洗日志
    print("\n清洗报告已生成:sales_cleaned.csv")

进阶技巧(让脚本更专业)

  1. 记录清洗日志:在每步操作后记录 原始行数 -> 清洗后行数,方便审计。
  2. 可视化检查:在清洗前后绘制箱线图、直方图,观察数据分布变化。
  3. 处理日期格式:使用 pd.to_datetime() 并设置 errors='coerce',无效日期会转为 NaT。
  4. 内存优化:对于大文件(>1GB),可以先读部分行,或者使用 dtype 参数指定列的数据类型以节省内存。
  5. 脚本参数化:把配置文件写成 JSON 或 YAML 文件,脚本不变,只改配置文件(适合多个不同数据源)。

一个好的清洗脚本 = 规则明确 + 模块化解耦 + 可追溯

  • 不要一次性改所有列:按列清洗,定义清晰的规则。
  • 不要直接覆盖原始数据:保存清洗日志和清洗后的副本。
  • 不要硬编码:把策略放在字典或配置文件中,便于修改。

希望这个框架对你有帮助,如果你有具体的数据(比如有什么样的脏数据),可以告诉我,我可以给你更针对性的代码。

抱歉,评论功能暂时关闭!