本文目录导读:

我来为你详细介绍Python合并分片文件的几种常见方法。
基础文件合并
按顺序合并二进制文件
def merge_files_chunked(input_files, output_file, chunk_size=8192):
    """
    Concatenate binary part files into a single output file.

    :param input_files: part file paths, already in the order they must be written
    :param output_file: path of the merged file (created or overwritten)
    :param chunk_size: number of bytes requested per read; must not be 0
    :raises ValueError: if chunk_size is 0
    :raises FileNotFoundError: if any part is missing (checked before the
        output file is created or truncated)
    """
    # Imported locally so the function does not rely on module-level import order.
    import os

    # read(0) returns b'' immediately, which would end every copy loop at once
    # and leave an empty output file without any error.
    if chunk_size == 0:
        raise ValueError("chunk_size 不能为 0")

    # Materialize once so the sequence can be validated and then copied.
    parts = list(input_files)
    missing = [str(path) for path in parts if not os.path.isfile(path)]
    if missing:
        raise FileNotFoundError(f"分片文件不存在: {', '.join(missing)}")

    with open(output_file, 'wb') as outfile:
        for file_path in parts:
            with open(file_path, 'rb') as infile:
                while True:
                    chunk = infile.read(chunk_size)
                    if not chunk:
                        break
                    outfile.write(chunk)
    print(f"文件合并完成: {output_file}")
# Usage example: merge three parts, listed in write order, into merged_file.mp4
file_parts = ['file.part1', 'file.part2', 'file.part3']
merge_files_chunked(file_parts, 'merged_file.mp4')
处理带有编号的文件
import os
import re
def merge_numbered_files(directory, output_file, pattern=r'part_(\d+)\.*'):
    """
    Merge numbered part files found in a directory, in numeric order.

    :param directory: directory that contains the part files
    :param output_file: path of the merged file (created or overwritten)
    :param pattern: regular expression searched in each file name; its first
        capture group must be the part number
    """
    # Local import: streaming copy helper not otherwise used by this file.
    import shutil

    matcher = re.compile(pattern)
    output_abs = os.path.abspath(output_file)

    # Collect (number, name, path) for every matching regular file.
    files = []
    for filename in os.listdir(directory):
        match = matcher.search(filename)
        if not match:
            continue
        file_path = os.path.join(directory, filename)
        # Sub-directories cannot be opened as parts, and the output file must
        # never be used as one of its own inputs.
        if not os.path.isfile(file_path):
            continue
        if os.path.abspath(file_path) == output_abs:
            continue
        files.append((int(match.group(1)), filename, file_path))

    # Numeric order first; the file name breaks ties so the result does not
    # depend on the arbitrary order returned by os.listdir.
    files.sort()

    with open(output_file, 'wb') as outfile:
        for _, filename, file_path in files:
            with open(file_path, 'rb') as infile:
                # Copy in buffered chunks instead of loading the whole part.
                shutil.copyfileobj(infile, outfile)
            print(f"已合并: {filename}")
    print(f"合并完成!输出文件: {output_file}")
# Usage example: merge the numbered parts found in ./chunks into merged_result.zip
merge_numbered_files('./chunks', 'merged_result.zip')
带进度显示和完整性校验的合并
import hashlib
def merge_with_progress(input_files, output_file):
    """
    Concatenate part files while printing a percentage progress indicator.

    :param input_files: part file paths, in the order they must be written
    :param output_file: path of the merged file (created or overwritten)
    """
    # Materialize once: the paths are needed for sizing, for len() and for
    # the copy itself, so a one-shot iterable would otherwise be exhausted.
    input_files = list(input_files)
    total_size = sum(os.path.getsize(file_path) for file_path in input_files)

    merged_size = 0
    with open(output_file, 'wb') as outfile:
        for index, file_path in enumerate(input_files, start=1):
            print(f"合并分片 {index}/{len(input_files)}: {os.path.basename(file_path)}")
            last_shown = None  # reset so each part redraws its own progress line
            with open(file_path, 'rb') as infile:
                while True:
                    chunk = infile.read(8192)
                    if not chunk:
                        break
                    outfile.write(chunk)
                    merged_size += len(chunk)
                    # total_size can be 0 if the parts grew after being sized.
                    if total_size:
                        progress = (merged_size / total_size) * 100
                    else:
                        progress = 100.0
                    shown = f"{progress:.1f}"
                    # Redraw only when the displayed figure actually changes.
                    if shown != last_shown:
                        print(f"\r进度: {shown}%", end='')
                        last_shown = shown
            print()  # finish the progress line of this part
    print(f"\n合并完成!文件大小: {merged_size / 1024 / 1024:.2f} MB")
def merge_with_checksum(input_files, output_file, checksum_file=None):
    """
    Concatenate part files and compute the SHA-256 of the merged data.

    :param input_files: part file paths, in the order they must be written
    :param output_file: path of the merged file (created or overwritten)
    :param checksum_file: optional text file holding the expected SHA-256;
        either a bare hex digest or a line that contains one
        (e.g. ``<digest>  <file name>``), in any letter case
    :return: lower-case hex SHA-256 digest of the merged data
    """
    sha256_hash = hashlib.sha256()
    with open(output_file, 'wb') as outfile:
        for file_path in input_files:
            with open(file_path, 'rb') as infile:
                # iter(callable, sentinel) keeps reading until read() returns b''.
                for chunk in iter(lambda: infile.read(4096), b''):
                    outfile.write(chunk)
                    sha256_hash.update(chunk)
    final_hash = sha256_hash.hexdigest()
    print(f"文件合并完成,SHA256: {final_hash}")

    # Verify against the expected digest when one was supplied.
    if checksum_file:
        with open(checksum_file, 'r') as f:
            checksum_text = f.read()
        # Pick the 64-hex-digit token out of the file so a trailing file name
        # or upper-case digits do not cause a false mismatch.
        found = re.search(r'\b[0-9a-fA-F]{64}\b', checksum_text)
        if found:
            expected_hash = found.group(0).lower()
        else:
            expected_hash = checksum_text.strip().lower()
        if final_hash == expected_hash:
            print("✓ 校验通过,文件完整")
        else:
            print("✗ 校验失败,文件可能损坏")
    return final_hash
# Usage example: merge the same three parts twice, first with progress output,
# then with a SHA-256 computed and compared against the contents of checksum.txt
files = ['video.part1', 'video.part2', 'video.part3']
merge_with_progress(files, 'complete_video.mp4')
merge_with_checksum(files, 'video.mp4', 'checksum.txt')
智能分片合并器
import json
from typing import List, Optional
class SmartFileMerger:
    """Merge part files described by a manifest (file list, sizes, optional checksum)."""

    def __init__(self, manifest_file: Optional[str] = None):
        """
        :param manifest_file: optional path of a JSON manifest to load
        """
        # Keys used by this class: 'directory', 'files' (list of dicts with
        # 'name', 'path', 'size'), 'total_size' and, optionally, 'checksum'.
        self.manifest = {}
        if manifest_file:
            self.load_manifest(manifest_file)

    def load_manifest(self, manifest_file: str):
        """Replace the current manifest with the JSON content of manifest_file."""
        with open(manifest_file, 'r', encoding='utf-8') as f:
            self.manifest = json.load(f)

    @staticmethod
    def _natural_key(text: str) -> list:
        """Return a sort key in which digit runs compare as integers."""
        # re.split with a capture group alternates text / digits / text ...,
        # so odd positions are always digit runs and types line up per index.
        tokens = re.split(r'(\d+)', text)
        return [int(tok) if i % 2 else tok for i, tok in enumerate(tokens)]

    def analyze_files(self, directory: str, pattern: str = "*.part*"):
        """
        Build the manifest from the files in directory that match pattern.

        The manifest is also written to ``manifest.json`` inside directory.

        :param directory: directory that contains the part files
        :param pattern: glob pattern, relative to directory
        :return: the manifest dictionary
        """
        import glob

        files = glob.glob(os.path.join(directory, pattern))

        self.manifest['directory'] = directory
        self.manifest['files'] = []
        self.manifest['total_size'] = 0
        # Natural order keeps "part2" before "part10"; a plain string sort
        # would merge the parts in the wrong sequence.
        for file_path in sorted(files, key=self._natural_key):
            size = os.path.getsize(file_path)
            self.manifest['files'].append({
                'name': os.path.basename(file_path),
                'path': file_path,
                'size': size
            })
            self.manifest['total_size'] += size

        self.save_manifest(os.path.join(directory, 'manifest.json'))
        return self.manifest

    def save_manifest(self, file_path: str):
        """Write the current manifest to file_path as indented JSON."""
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(self.manifest, f, indent=2)

    def merge(self, output_file: str, verify: bool = True):
        """
        Concatenate every file listed in the manifest into output_file.

        :param output_file: path of the merged file (created or overwritten)
        :param verify: when true, print the SHA-256 and compare it with the
            manifest's 'checksum' entry if there is one
        :return: hex SHA-256 digest of the merged data
        :raises ValueError: if the manifest lists no files
        """
        file_entries = self.manifest.get('files')
        if not file_entries:
            raise ValueError("没有要合并的文件")

        # A hand-written manifest may omit the total; derive it from the entries.
        total_size = self.manifest.get('total_size')
        if total_size is None:
            total_size = sum(entry['size'] for entry in file_entries)
        merged_size = 0
        print(f"开始合并 {len(file_entries)} 个分片文件")
        print(f"总大小: {total_size / 1024 / 1024:.2f} MB")

        # Rotating spinner characters shown in front of the progress figure.
        from itertools import cycle
        spinner = cycle(['-', '/', '|', '\\'])
        sha256_hash = hashlib.sha256()

        with open(output_file, 'wb') as outfile:
            for i, file_info in enumerate(file_entries):
                file_path = file_info['path']
                file_size = file_info['size']
                print(f"\n合并分片 {i+1}/{len(file_entries)}: "
                      f"{file_info['name']} ({file_size/1024/1024:.2f} MB)")
                with open(file_path, 'rb') as infile:
                    while True:
                        chunk = infile.read(8192)
                        if not chunk:
                            break
                        outfile.write(chunk)
                        merged_size += len(chunk)
                        sha256_hash.update(chunk)
                        # Manifest sizes may be stale or zero; never divide by 0.
                        if total_size:
                            progress = (merged_size / total_size) * 100
                        else:
                            progress = 100.0
                        print(f"\r{next(spinner)} 进度: {progress:.2f}% "
                              f"({merged_size/1024/1024:.2f} MB)", end='')

        print(f"\n\n合并完成!输出文件: {output_file}")
        print(f"文件大小: {os.path.getsize(output_file) / 1024 / 1024:.2f} MB")

        digest = sha256_hash.hexdigest()
        if verify:
            self._verify_checksum(output_file, digest)
        return digest

    def _verify_checksum(self, file_path: str, hash_value: str):
        """
        Print hash_value and compare it with the manifest's 'checksum' entry.

        :param file_path: path of the merged file (not read here)
        :param hash_value: hex digest computed during the merge
        """
        print(f"SHA256: {hash_value}")
        if 'checksum' in self.manifest:
            if hash_value == self.manifest['checksum']:
                print("✓ 文件完整性验证通过")
            else:
                print("✗ 文件完整性验证失败!")
# Usage example
merger = SmartFileMerger()
# Build the manifest from the files in ./downloads that match the glob pattern
merger.analyze_files('./downloads', 'video_*.part*')
# Concatenate the files listed in the manifest and print the SHA-256
merger.merge('final_video.mp4', verify=True)
处理特殊格式文件
def merge_text_files_with_encoding(input_files, output_file, encoding='utf-8'):
    """
    Concatenate text files, decoding and re-encoding with the given encoding.

    A newline is written after every input file, including the last one.

    :param input_files: text file paths, in the order they must be written
    :param output_file: path of the merged file (created or overwritten)
    :param encoding: encoding used both to read the inputs and write the output
    """
    # Local import: streaming copy helper not otherwise used by this file.
    import shutil

    with open(output_file, 'w', encoding=encoding) as outfile:
        for file_path in input_files:
            with open(file_path, 'r', encoding=encoding) as infile:
                # Stream in buffered chunks rather than loading the whole file.
                shutil.copyfileobj(infile, outfile)
            outfile.write('\n')  # separator after each part
def merge_csv_files(input_files, output_file, exclude_header=False):
    """
    Concatenate CSV files row by row into one UTF-8 CSV file.

    :param input_files: CSV file paths, in the order they must be written
    :param output_file: path of the merged file (created or overwritten)
    :param exclude_header: when true, keep only the first header row that is
        encountered and drop the first row of every later file
    """
    import csv

    # Tracks whether a header row has reached the output yet; keying on the
    # file index instead would lose the header when the first file is empty.
    header_written = False
    with open(output_file, 'w', newline='', encoding='utf-8') as outfile:
        writer = csv.writer(outfile)
        for file_path in input_files:
            # newline='' lets the csv module handle line endings itself, as
            # required for quoted fields that contain line breaks.
            with open(file_path, 'r', newline='', encoding='utf-8') as infile:
                for row_index, row in enumerate(csv.reader(infile)):
                    if row_index == 0:
                        if exclude_header and header_written:
                            continue
                        header_written = True
                    writer.writerow(row)
# Usage example: merge two CSV parts, dropping the second file's header row
merge_csv_files(['data_part1.csv', 'data_part2.csv'],
'merged_data.csv',
exclude_header=True)
命令行工具版本
#!/usr/bin/env python3
import argparse
import sys
def main():
    """
    Command-line entry point: merge the given part files into one output file.

    Parts are merged in exactly the order they appear on the command line.
    Exits through argparse's error handling if an input file does not exist.
    """
    parser = argparse.ArgumentParser(description='文件分片合并工具')
    parser.add_argument('output', help='输出文件路径')
    parser.add_argument('input', nargs='+', help='输入分片文件列表')
    # NOTE(review): --chunk-size is parsed but not forwarded; the
    # SmartFileMerger.merge call below takes only the output path and the
    # verify flag.
    parser.add_argument('--chunk-size', type=int, default=8192,
                        help='读取缓冲区大小(字节)')
    parser.add_argument('--verify', action='store_true',
                        help='计算并显示SHA256校验和')
    args = parser.parse_args()

    # Report missing parts as a usage error instead of an uncaught traceback.
    missing = [path for path in args.input if not os.path.isfile(path)]
    if missing:
        parser.error(f"分片文件不存在: {', '.join(missing)}")

    merger = SmartFileMerger()
    # Keep the caller's order: a lexicographic sort would place "part10"
    # before "part2" and corrupt the merged file.
    merger.manifest['files'] = [
        {'name': path, 'path': path, 'size': os.path.getsize(path)}
        for path in args.input
    ]
    merger.manifest['total_size'] = sum(
        entry['size'] for entry in merger.manifest['files']
    )
    merger.merge(args.output, args.verify)


if __name__ == '__main__':
    main()
# Command-line usage:
# python file_merger.py output.mp4 part1 part2 part3 --verify
关键要点
- 文件顺序:确保按正确顺序合并
- 缓冲区大小:根据文件大小选择合适的缓冲区
- 内存管理:分批读取,避免内存溢出
- 完整性验证:使用校验和确保文件完整
- 错误处理:添加适当的异常处理
选择哪种方法取决于你的具体需求:文件大小、分片方式、是否需要进度显示或完整性校验等。