如何用实用脚本自动分割大文件?

wen 实用脚本 3

本文目录导读:

如何用实用脚本自动分割大文件?

  1. 基于文件行数分割(通用型)
  2. 按文件大小分割
  3. 智能分割(保持数据完整性)
  4. Shell脚本版本(Linux/Unix)
  5. 增强版:支持多种格式
  6. 使用示例
  7. 性能优化建议

基于文件行数分割(通用型)

Python脚本

#!/usr/bin/env python3
# split_large_file.py
import os
import sys
from pathlib import Path
def split_by_lines(input_file, lines_per_file=100000):
    """Split a UTF-8 text file into consecutive parts of a fixed line count.

    Parts are written to a ``<stem>_split`` directory created next to the
    input file and are named ``<stem>_part_NNNN.txt``, numbered from 0001.
    The last part holds whatever lines remain and may be shorter.

    Args:
        input_file: Path of the file to split.
        lines_per_file: Maximum number of lines per part; must be >= 1.

    Returns:
        None. Progress is reported on stdout. If ``input_file`` does not
        exist, a message is printed and nothing is written.

    Raises:
        ValueError: If ``lines_per_file`` is less than 1.
    """
    # A non-positive limit would otherwise degrade silently into one output
    # file per input line, so reject it before touching the filesystem.
    if lines_per_file < 1:
        raise ValueError(f"lines_per_file 必须为正整数, 当前值: {lines_per_file}")

    input_path = Path(input_file)
    if not input_path.exists():
        print(f"文件 {input_file} 不存在")
        return

    # Output names are derived from the input file's stem.
    base_name = input_path.stem
    output_dir = input_path.parent / f"{base_name}_split"
    output_dir.mkdir(exist_ok=True)

    def write_part(lines, index):
        # Write one part file and report it.
        output_file = output_dir / f"{base_name}_part_{index:04d}.txt"
        with open(output_file, 'w', encoding='utf-8') as out:
            out.writelines(lines)
        print(f"已创建: {output_file}")

    file_count = 0
    current_lines = []
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            current_lines.append(line)
            if len(current_lines) >= lines_per_file:
                file_count += 1
                write_part(current_lines, file_count)
                current_lines = []

    # Flush the final, possibly shorter, part.
    if current_lines:
        file_count += 1
        write_part(current_lines, file_count)
    print(f"完成! 共分割为 {file_count} 个文件")
if __name__ == "__main__":
    # Command-line entry point: the file name is mandatory, the line count
    # is optional.
    if not sys.argv[1:]:
        print("用法: python split_large_file.py <文件名> [行数]")
        sys.exit(1)
    input_file = sys.argv[1]
    # Use the explicit line count when given, otherwise 100000 lines per part.
    if len(sys.argv) >= 3:
        lines_per_file = int(sys.argv[2])
    else:
        lines_per_file = 100000
    split_by_lines(input_file, lines_per_file)

按文件大小分割

#!/usr/bin/env python3
# split_by_size.py
import os
import sys
from pathlib import Path
def split_by_file_size(input_file, max_size_mb=100):
    """Split a UTF-8 text file into parts of roughly ``max_size_mb`` each.

    Lines are never cut: a part is closed once the UTF-8 size of its lines
    reaches the limit, so a part can exceed the limit by up to one line.
    Parts are written to a ``<stem>_split_by_size`` directory next to the
    input file and are named ``<stem>_part_NNNN.txt``, numbered from 0001.

    Args:
        input_file: Path of the file to split.
        max_size_mb: Approximate size limit per part in MiB; must be > 0.

    Returns:
        None. Progress is reported on stdout.

    Raises:
        ValueError: If ``max_size_mb`` is not positive.
        FileNotFoundError: If ``input_file`` does not exist.
    """
    # A non-positive limit would close a part after every single line.
    if max_size_mb <= 0:
        raise ValueError(f"max_size_mb 必须大于 0, 当前值: {max_size_mb}")

    input_path = Path(input_file)
    # Check before creating the output directory so that a mistyped path
    # does not leave an empty directory behind.
    if not input_path.exists():
        raise FileNotFoundError(f"文件 {input_file} 不存在")

    max_bytes = max_size_mb * 1024 * 1024
    base_name = input_path.stem
    output_dir = input_path.parent / f"{base_name}_split_by_size"
    output_dir.mkdir(exist_ok=True)

    file_count = 0
    current_size = 0
    current_lines = []
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            current_lines.append(line)
            # Measure the encoded size; character count would undercount
            # multi-byte text.
            current_size += len(line.encode('utf-8'))
            if current_size >= max_bytes:
                file_count += 1
                output_file = output_dir / f"{base_name}_part_{file_count:04d}.txt"
                with open(output_file, 'w', encoding='utf-8') as out:
                    out.writelines(current_lines)
                print(f"已创建: {output_file} ({current_size/1024/1024:.2f}MB)")
                current_lines = []
                current_size = 0

    # Flush the final part, which is below the size limit.
    if current_lines:
        file_count += 1
        output_file = output_dir / f"{base_name}_part_{file_count:04d}.txt"
        with open(output_file, 'w', encoding='utf-8') as out:
            out.writelines(current_lines)
        print(f"已创建: {output_file}")
    print(f"完成! 共分割为 {file_count} 个文件")
if __name__ == "__main__":
    # Command-line entry point: the file name is mandatory, the size limit
    # in MB is optional.
    if not sys.argv[1:]:
        print("用法: python split_by_size.py <文件名> [最大MB]")
        sys.exit(1)
    input_file = sys.argv[1]
    # Use the explicit limit when given, otherwise 100 MB per part.
    if len(sys.argv) >= 3:
        max_size = int(sys.argv[2])
    else:
        max_size = 100
    split_by_file_size(input_file, max_size)

智能分割(保持数据完整性)

#!/usr/bin/env python3
# smart_split.py
import os
import re
import sys
from pathlib import Path
def smart_split(input_file, max_lines=50000):
    """Split a text file by line count without cutting a record in half.

    For ``.csv``, ``.json`` and ``.log`` inputs a regular expression
    identifies lines that start a new record. When the buffer reaches
    ``max_lines``, the back half of the buffer is searched from the end for
    such a line; the part is cut just before it and that line plus anything
    after it is carried over into the next part. If no such line is found,
    or the extension has no pattern, the whole buffer is written as one part.

    Parts are written to a ``<stem>_smart_split`` directory next to the
    input file, named ``<stem>_part_NNNN<ext>`` and numbered from 0001.

    Args:
        input_file: Path of the UTF-8 text file to split.
        max_lines: Buffer size in lines that triggers a split; must be >= 1.

    Returns:
        The number of part files written.

    Raises:
        ValueError: If ``max_lines`` is less than 1.
        FileNotFoundError: If ``input_file`` does not exist.
    """
    # A non-positive limit would emit one part per input line.
    if max_lines < 1:
        raise ValueError(f"max_lines 必须为正整数, 当前值: {max_lines}")

    input_path = Path(input_file)
    # Check before creating the output directory so that a mistyped path
    # does not leave an empty directory behind.
    if not input_path.exists():
        raise FileNotFoundError(f"文件 {input_file} 不存在")

    base_name = input_path.stem
    output_dir = input_path.parent / f"{base_name}_smart_split"
    output_dir.mkdir(exist_ok=True)

    # The lower-cased extension selects the pattern and is reused as the
    # extension of every part file.
    ext = input_path.suffix.lower()
    # Patterns that recognise the first line of a record, per format.
    boundary_patterns = {
        '.csv': r'^[\d\w]',  # a CSV row usually starts with data
        '.json': r'^[{[]?[\s]*["\d{]',  # start of a JSON object/value
        '.log': r'^\[\d{4}-\d{2}-\d{2}',  # log line starting with a timestamp
    }
    pattern = boundary_patterns.get(ext)
    # Compile once instead of resolving the pattern for every candidate line.
    boundary = re.compile(pattern) if pattern else None

    def write_part(lines, index):
        # Write one part file and report its line count.
        output_file = output_dir / f"{base_name}_part_{index:04d}{ext}"
        with open(output_file, 'w', encoding='utf-8') as out:
            out.writelines(lines)
        print(f"已创建: {output_file} ({len(lines)} 行)")

    file_count = 0
    current_lines = []
    with open(input_file, 'r', encoding='utf-8') as f:
        for line in f:
            current_lines.append(line)
            if len(current_lines) < max_lines:
                continue

            # Default: cut at the end of the buffer (plain line-based split).
            split_point = len(current_lines)
            if boundary is not None:
                # Search only the back half so a part never shrinks below
                # about half of max_lines.
                for i in range(len(current_lines) - 1, len(current_lines) // 2, -1):
                    if boundary.match(current_lines[i]):
                        split_point = i
                        break

            file_count += 1
            write_part(current_lines[:split_point], file_count)
            # Lines from the boundary onwards open the next part.
            current_lines = current_lines[split_point:]

    # Flush whatever is left in the buffer.
    if current_lines:
        file_count += 1
        write_part(current_lines, file_count)
    print(f"完成! 共分割为 {file_count} 个文件")
    return file_count
if __name__ == "__main__":
    # Command-line entry point: the file name is mandatory, the line count
    # is optional.
    if not sys.argv[1:]:
        print("用法: python smart_split.py <文件名> [行数]")
        sys.exit(1)
    input_file = sys.argv[1]
    # Use the explicit line count when given, otherwise 50000 lines per part.
    if len(sys.argv) >= 3:
        lines_per_file = int(sys.argv[2])
    else:
        lines_per_file = 50000
    smart_split(input_file, lines_per_file)

Shell脚本版本(Linux/Unix)

#!/bin/bash
# split_large_file.sh
# Split a text file into numbered parts with a fixed number of lines each.
# Uses the -d and --additional-suffix options of split, which are GNU
# coreutils extensions.

# Require at least the input file argument.
if [ $# -lt 1 ]; then
    echo "用法: $0 <文件名> [行数]"
    exit 1
fi
INPUT_FILE="$1"
# Not named LINES: that is a special bash variable which the shell may
# overwrite with the terminal height (checkwinsize) after an external
# command runs, which would silently change the split size.
LINES_PER_FILE=${2:-100000}
# The line count must be a positive integer.
case "$LINES_PER_FILE" in
    ''|*[!0-9]*)
        echo "错误: 行数必须为正整数"
        exit 1
        ;;
esac
if [ "$LINES_PER_FILE" -le 0 ]; then
    echo "错误: 行数必须为正整数"
    exit 1
fi
# Make sure the input file exists.
if [ ! -f "$INPUT_FILE" ]; then
    echo "错误: 文件 $INPUT_FILE 不存在"
    exit 1
fi
# Create the output directory next to the input file.
# "--" keeps a path that starts with "-" from being parsed as an option.
BASE_NAME=$(basename -- "$INPUT_FILE" .txt)
OUTPUT_DIR="$(dirname -- "$INPUT_FILE")/${BASE_NAME}_split"
if ! mkdir -p -- "$OUTPUT_DIR"; then
    echo "错误: 无法创建目录 $OUTPUT_DIR"
    exit 1
fi
# Split with numeric 4-digit suffixes; only report success if split succeeded.
if ! split -l "$LINES_PER_FILE" \
    -d \
    -a 4 \
    --additional-suffix=.txt \
    -- \
    "$INPUT_FILE" \
    "${OUTPUT_DIR}/${BASE_NAME}_part_"; then
    echo "错误: 分割 $INPUT_FILE 失败"
    exit 1
fi
echo "完成! 文件分割到 $OUTPUT_DIR 目录"

增强版:支持多种格式

#!/usr/bin/env python3
# universal_splitter.py
import os
import sys
import json
import csv
from pathlib import Path
from typing import List, Optional
class FileSplitter:
    """Split one input file into numbered part files.

    Parts are written to a ``<stem>_parts`` directory next to the input
    file; the directory is created when the splitter is constructed.
    """

    def __init__(self, input_file: str):
        """Validate the input path and prepare the output directory.

        Args:
            input_file: Path of the file to split.

        Raises:
            FileNotFoundError: If ``input_file`` does not exist.
        """
        self.input_file = Path(input_file)
        if not self.input_file.exists():
            raise FileNotFoundError(f"文件 {input_file} 不存在")
        self.base_name = self.input_file.stem
        self.output_dir = self.input_file.parent / f"{self.base_name}_parts"
        self.output_dir.mkdir(exist_ok=True)

    def split_by_lines(self, lines_per_file: int = 100000) -> int:
        """Split the input as plain UTF-8 text, ``lines_per_file`` lines per part.

        Returns:
            The number of part files written.
        """
        file_count = 0
        current_batch = []
        with open(self.input_file, 'r', encoding='utf-8') as f:
            for line in f:
                current_batch.append(line)
                if len(current_batch) >= lines_per_file:
                    file_count += 1
                    self._write_batch(current_batch, file_count)
                    current_batch = []
        # Flush the final, possibly shorter, batch.
        if current_batch:
            file_count += 1
            self._write_batch(current_batch, file_count)
        return file_count

    def split_csv_with_header(self, lines_per_file: int = 100000) -> int:
        """Split the input as CSV, repeating the header row in every part.

        The first row is treated as the header; ``lines_per_file`` counts
        data rows only. A file containing nothing but a header produces no
        parts.

        Returns:
            The number of part files written.
        """
        file_count = 0
        current_batch = []
        # newline='' lets the csv module handle line endings itself, so that
        # line breaks embedded in quoted fields are preserved unchanged.
        with open(self.input_file, 'r', newline='', encoding='utf-8') as f:
            reader = csv.reader(f)
            header = next(reader, None)
            for row in reader:
                current_batch.append(row)
                if len(current_batch) >= lines_per_file:
                    file_count += 1
                    self._write_csv_batch(header, current_batch, file_count)
                    current_batch = []
        # Flush the final, possibly shorter, batch.
        if current_batch:
            file_count += 1
            self._write_csv_batch(header, current_batch, file_count)
        return file_count

    def _write_batch(self, lines: List[str], batch_num: int):
        """Write one text part as ``<stem>_part_NNNN.txt`` and report its size."""
        output_file = self.output_dir / f"{self.base_name}_part_{batch_num:04d}.txt"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.writelines(lines)
        size_mb = output_file.stat().st_size / 1024 / 1024
        print(f"创建文件: {output_file} ({len(lines)} 行, {size_mb:.2f} MB)")

    def _write_csv_batch(self, header: Optional[List], rows: List[List], batch_num: int):
        """Write one CSV part as ``<stem>_part_NNNN.csv`` and report its size.

        The header row is written first when it is present and non-empty.
        """
        output_file = self.output_dir / f"{self.base_name}_part_{batch_num:04d}.csv"
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            if header:
                writer.writerow(header)
            writer.writerows(rows)
        size_mb = output_file.stat().st_size / 1024 / 1024
        print(f"创建CSV: {output_file} ({len(rows)} 行, {size_mb:.2f} MB)")
def main():
    """Command-line entry point of the universal splitter.

    Reads ``<file> [lines] [format]`` from ``sys.argv``. ``format`` is one
    of ``auto``, ``csv`` or ``text``; ``auto`` selects ``csv`` for a
    ``.csv`` extension and ``text`` otherwise. Any value other than ``csv``
    is handled as plain text.

    Exits with status 1 when the file argument is missing, when the line
    count is not an integer, or when splitting raises an error.
    """
    if len(sys.argv) < 2:
        print("用法: python universal_splitter.py <文件名> [行数] [格式]")
        print("格式: auto, csv, text (默认 auto)")
        sys.exit(1)
    input_file = sys.argv[1]
    file_format = sys.argv[3] if len(sys.argv) > 3 else "auto"

    # Report a malformed line count the same way as every other error
    # instead of letting the ValueError escape as a traceback.
    try:
        lines_per_file = int(sys.argv[2]) if len(sys.argv) > 2 else 100000
    except ValueError:
        print(f"错误: 行数必须是整数: {sys.argv[2]}")
        sys.exit(1)

    try:
        splitter = FileSplitter(input_file)
        # Resolve "auto" from the file extension.
        if file_format == "auto":
            ext = Path(input_file).suffix.lower()
            file_format = "csv" if ext == '.csv' else "text"
        # Run the matching splitter.
        if file_format == "csv":
            file_count = splitter.split_csv_with_header(lines_per_file)
        else:
            file_count = splitter.split_by_lines(lines_per_file)
        print(f"\n完成! 文件已分割为 {file_count} 个部分")
        print(f"输出目录: {splitter.output_dir}")
    except Exception as e:
        # Top-level boundary: turn any failure into a message and exit code 1.
        print(f"错误: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

使用示例

# 基本用法
python split_large_file.py large_log.txt 50000
# 按大小分割
python split_by_size.py data.csv 200
# 智能分割(保持数据完整)
python smart_split.py logfile.json 30000
# Shell版本
./split_large_file.sh data.txt 100000
# 通用分割器
python universal_splitter.py large_dataset.csv 50000 csv

性能优化建议

  1. 大文件处理:使用 with 语句管理文件句柄,确保即使处理中途出错,文件也能被及时关闭
  2. 内存控制:分批读取,避免一次性加载整个文件
  3. 编码处理:脚本统一按 UTF-8 编码读写;如果源文件是 GBK 等其他编码,需要相应修改 open() 的 encoding 参数,否则读取时会抛出 UnicodeDecodeError
  4. 错误处理:完善的异常捕获和用户提示
  5. 进度显示:添加进度条或日志输出

选择适合你需求的脚本,或者根据需要组合使用。

抱歉,评论功能暂时关闭!