Python案例怎么过滤无效数据?

wen python案例 12

本文目录导读:

Python案例怎么过滤无效数据?

  1. 基础过滤:使用列表推导式
  2. 使用 filter() 函数
  3. Pandas 处理结构化数据(最常用)
  4. 处理 CSV 文件数据(实战案例)
  5. 数值范围过滤与异常值处理
  6. 文本数据过滤(字符串处理)
  7. 实时数据流过滤(进阶)

在Python中进行数据过滤是一个非常常见的任务,我总结了几个最实用的案例和方法,覆盖了从基础到进阶的场景。

基础过滤:使用列表推导式

这是最简洁、最 Pythonic 的方式:

# 原始数据:包含一些无效值(None,空字符串,非数字)
data = [23, None, "45", "", 12, "abc", 0, 15.5, -5, "   "]
# 过滤掉 None 和空字符串
cleaned = [x for x in data if x is not None and str(x).strip() != ""]
print(cleaned)  # [23, '45', 12, 'abc', 0, 15.5, -5]
# 只保留数字(整数和浮点数)
numbers = [x for x in cleaned if isinstance(x, (int, float))]
print(numbers)  # [23, 12, 0, 15.5, -5]
# 过滤掉负数
positive = [x for x in numbers if x >= 0]
print(positive)  # [23, 12, 0, 15.5]

使用 filter() 函数

适合逻辑较复杂或需要复用的过滤条件:

# 定义过滤函数
def is_valid_number(value):
    """判断是否为有效正数"""
    if value is None:
        return False
    try:
        num = float(value)
        return num > 0 and num < 100
    except (ValueError, TypeError):
        return False
data = [23, None, "45", "", "abc", 150, 15.5, "12.3", -5]
# 使用 filter 过滤
valid_data = list(filter(is_valid_number, data))
print(valid_data)  # [23, 15.5, '12.3']
# 使用 lambda 简化(适合简单条件)
positive_data = list(filter(lambda x: isinstance(x, (int, float)) and x > 0, data))
print(positive_data)  # [23, 15.5]

Pandas 处理结构化数据(最常用)

对于表格数据,Pandas 提供了最强大的过滤功能:

import pandas as pd
import numpy as np
# 创建示例数据
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve', None],
    'age': [25, None, 30, 150, 17, 28],
    'salary': [50000, 60000, None, -1000, 45000, 70000],
    'department': ['IT', 'HR', 'IT', 'HR', None, 'IT']
})
print("原始数据:")
print(df)
print("\n")
# 1️⃣ 删除包含缺失值的行
df_dropna = df.dropna()
print("删除所有缺失值:")
print(df_dropna)
print("\n")
# 2️⃣ 过滤年龄在合理范围(18-65)内
df_valid_age = df[df['age'].between(18, 65, inclusive='both')]
print("有效年龄:")
print(df_valid_age)
print("\n")
# 3️⃣ 过滤薪水大于0
df_valid_salary = df[df['salary'] > 0]
print("有效薪水:")
print(df_valid_salary)
print("\n")
# 4️⃣ 组合多个条件
df_valid = df[
    (df['age'].between(18, 65)) & 
    (df['salary'] > 0) & 
    (df['name'].notna()) &
    (df['department'].notna())
]
print("完全有效的数据:")
print(df_valid)

处理 CSV 文件数据(实战案例)

这是实际工作中最常见的场景:

import pandas as pd
import numpy as np
# 假设你有一个销售数据 CSV 文件
sales_data = """
日期,产品,数量,单价,备注
2024-01-01,A,5,100,正常
2024-01-01,B,,50,缺少数量
2024-01-02,A,-3,100,负数异常
2024-01-02,B,10,,缺少单价
2024-01-03,A,0,100,数量为0
2024-01-03,B,8,200,正常
2024-01-04,A,,,全部缺失
2024-01-05,B,1000,100,数量异常过大
"""
# 读取数据
df = pd.read_csv(pd.StringIO(sales_data))
print("原始数据:")
print(df)
print("\n")
# 数据清洗函数
def clean_sales_data(df):
    """清洗销售数据"""
    cleaned = df.copy()
    # 1. 处理缺失值
    # 删除关键字段缺失的行
    cleaned = cleaned.dropna(subset=['数量', '单价'])
    # 2. 删除负数(数量或单价为负)
    cleaned = cleaned[(cleaned['数量'] > 0) & (cleaned['单价'] > 0)]
    # 3. 识别并标记异常值(比如数量超过100)
    cleaned['数量异常'] = cleaned['数量'] > 100
    # 4. 或者直接删除异常值
    # cleaned = cleaned[cleaned['数量'] <= 100]
    return cleaned
# 执行清洗
cleaned_df = clean_sales_data(df)
print("清洗后数据:")
print(cleaned_df)
print(f"\n清洗前 {len(df)} 条,清洗后 {len(cleaned_df)} 条")

数值范围过滤与异常值处理

import numpy as np
import pandas as pd
# 模拟传感器数据
np.random.seed(42)
sensor_data = np.random.randn(100) * 10 + 50  # 均值50,标准差10
# 加入一些异常值
sensor_data[10] = 1000  # 传感器故障
sensor_data[20] = -500   # 传感器故障
sensor_data[30] = 55    # 正常值
# 方法1: 基于标准差(假设正态分布)
mean = np.mean(sensor_data)
std = np.std(sensor_data)
threshold = 3  # 3倍标准差
valid_data1 = sensor_data[
    (sensor_data > mean - threshold * std) & 
    (sensor_data < mean + threshold * std)
]
print(f"原始数据: {len(sensor_data)} 条")
print(f"3σ原则过滤后: {len(valid_data1)} 条")
# 方法2: 基于四分位距(IQR,更适合非正态分布)
Q1 = np.percentile(sensor_data, 25)
Q3 = np.percentile(sensor_data, 75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
valid_data2 = sensor_data[
    (sensor_data >= lower_bound) & 
    (sensor_data <= upper_bound)
]
print(f"IQR方法过滤后: {len(valid_data2)} 条")

文本数据过滤(字符串处理)

import re
# 手机号码验证
phone_numbers = [
    "13812345678",
    "12345678901",    # 无效前缀
    "139abcd5678",    # 包含字母
    "18600001111",
    "1381234567",     # 少一位
    "138123456789",   # 多一位
]
def validate_phone(phone):
    """验证中国手机号码"""
    pattern = r'^1[3-9]\d{9}$'
    return bool(re.match(pattern, str(phone)))
valid_phones = list(filter(validate_phone, phone_numbers))
print(f"有效手机号: {valid_phones}")
# 邮箱过滤
emails = [
    "user@example.com",
    "invalid.email",       # 缺少@
    "user@.com",          # 无效域名
    "test@test.com",
    "@gmail.com",         # 缺少用户名
]
def validate_email(email):
    pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    return bool(re.match(pattern, str(email)))
valid_emails = list(filter(validate_email, emails))
print(f"有效邮箱: {valid_emails}")

实时数据流过滤(进阶)

from collections import deque
import time
import random
class DataFilter:
    """实时数据过滤器"""
    def __init__(self, window_size=10):
        self.window = deque(maxlen=window_size)
        self.window_size = window_size
    def add_data(self, value):
        """添加新数据并过滤"""
        # 基本验证
        if value is None or not isinstance(value, (int, float)):
            return None
        # 范围检查
        if value < 0 or value > 100:
            return None
        # 添加到窗口
        self.window.append(value)
        # 基于滑动窗口的均值过滤(去除突变)
        if len(self.window) >= self.window_size:
            mean = sum(self.window) / len(self.window)
            # 如果偏离均值太多,认为是异常
            if abs(value - mean) > 3 * (sum((x - mean)**2 for x in self.window) / len(self.window))**0.5:
                return None
        return value
# 模拟实时数据
filter = DataFilter(window_size=5)
for _ in range(20):
    # 模拟传感器读数(偶尔有异常值)
    if random.random() < 0.1:
        data = random.choice([None, 999, -5])  # 异常数据
    else:
        data = random.gauss(50, 10)  # 正常数据
    result = filter.add_data(data)
    if result is not None:
        print(f"有效数据: {result:.2f}")
    else:
        print(f"过滤掉: {data}")
    time.sleep(0.1)
  1. 先理解数据:不要盲目过滤,先了解数据分布和特征
  2. 保留原始数据:在副本上操作,或记录过滤原因
  3. 逐步过滤:每一步过滤后检查结果,避免过度清洗
  4. 文档化:记录过滤规则和依据
  5. 验证结果:确保过滤后的数据仍然合理
# 推荐的通用过滤流程
def smart_filter(data, rules=None):
    """
    智能数据过滤
    rules: 字典格式 {列名: (验证函数, 错误信息)}
    """
    if rules is None:
        # 默认规则
        rules = {
            '年龄': (lambda x: 0 < x < 150, '年龄超出范围'),
            '薪水': (lambda x: x > 0, '薪水应为正数'),
            '邮箱': (lambda x: '@' in str(x), '邮箱格式错误')
        }
    filtered = data.copy()
    errors = []
    for column, (validator, error_msg) in rules.items():
        if column in filtered.columns:
            mask = filtered[column].apply(validator)
            bad_data = filtered[~mask]
            if not bad_data.empty:
                errors.append(f"{column}: {error_msg} (共{len(bad_data)}条)")
            filtered = filtered[mask]
    return filtered, errors

这些方法覆盖了大多数数据过滤场景,选择哪种方法取决于你的数据类型、数据量大小和具体需求,对于小型数据集,列表推导式就足够了;对于大型结构化数据,Pandas 是更好的选择。

抱歉,评论功能暂时关闭!