本文目录导读:

我来为你详细介绍Python实现文件分片上传的完整方案。
分片上传核心思路
graph LR
A[大文件] --> B[分割成N个分片]
B --> C[上传分片1]
B --> D[上传分片2]
B --> E[上传分片N]
C --> F[服务端合并]
D --> F
E --> F
F --> G[完整文件]
客户端分片上传实现
1 基础分片上传客户端
import os
import hashlib
import requests
from typing import List, Dict
import math
class FileChunkUploader:
    """Upload a file to an HTTP server in fixed-size chunks.

    Every chunk is POSTed to ``<upload_url>/<chunk_index>`` together with its
    MD5 digest; after all chunks were accepted a merge request is POSTed to
    ``merge_url`` so the server can reassemble and verify the file.
    """

    # Pause between two attempts of the same chunk, in seconds.
    RETRY_DELAY_SECONDS = 1.0
    # The merge response only arrives after the server has reassembled the
    # file, so it gets a more generous timeout than a single chunk upload.
    MERGE_TIMEOUT_SECONDS = 300.0

    def __init__(self, chunk_size: int = 5 * 1024 * 1024,
                 max_retries: int = 3, timeout: float = 30.0):
        """Configure the uploader.

        Args:
            chunk_size: Size of one chunk in bytes (default 5 MB).
            max_retries: How many times a failed chunk is retried after the
                first attempt. Negative values are treated as 0.
            timeout: Timeout in seconds passed to ``requests`` for each chunk
                upload.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive number of bytes")
        self.chunk_size = chunk_size
        self.max_retries = max(0, max_retries)
        self.timeout = timeout
        self.upload_url = "http://your-server.com/upload"
        self.merge_url = "http://your-server.com/merge"

    def calculate_file_hash(self, file_path: str) -> str:
        """Return the hex MD5 digest of the file, read in small blocks."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for block in iter(lambda: f.read(64 * 1024), b""):
                hash_md5.update(block)
        return hash_md5.hexdigest()

    def _iter_chunks(self, file_path: str):
        """Yield the file's content as successive ``chunk_size``-byte pieces."""
        with open(file_path, "rb") as f:
            yield from iter(lambda: f.read(self.chunk_size), b"")

    def split_file(self, file_path: str) -> List[bytes]:
        """Return all chunks of the file as a list.

        This holds the whole file in memory; ``upload_file`` streams the
        chunks instead and does not call this method.
        """
        return list(self._iter_chunks(file_path))

    def upload_chunk(self, chunk_data: bytes, chunk_index: int,
                     total_chunks: int, file_name: str, file_hash: str) -> bool:
        """Upload one chunk, retrying on failure.

        A chunk counts as uploaded only when the server answers HTTP 200.
        Network errors and any other status code trigger a retry until
        ``max_retries`` retries have been used.

        Returns:
            True if the server accepted the chunk, False otherwise.
        """
        import time  # local import: only needed for the retry pause

        data = {
            'chunk_index': chunk_index,
            'total_chunks': total_chunks,
            'file_name': file_name,
            'file_hash': file_hash,
            # Lets the server detect corruption of this chunk in transit.
            'chunk_hash': hashlib.md5(chunk_data).hexdigest(),
        }
        attempts = self.max_retries + 1
        for attempt in range(1, attempts + 1):
            files = {
                'chunk': (f'chunk_{chunk_index}', chunk_data,
                          'application/octet-stream')
            }
            try:
                response = requests.post(
                    self.upload_url + f"/{chunk_index}",
                    files=files,
                    data=data,
                    timeout=self.timeout,
                )
                if response.status_code == 200:
                    return True
                reason = f"HTTP {response.status_code}"
            except requests.RequestException as e:
                reason = str(e)
            print(f"上传分片 {chunk_index} 失败: {reason}")
            if attempt < attempts:
                time.sleep(self.RETRY_DELAY_SECONDS)
        return False

    def upload_file(self, file_path: str) -> bool:
        """Upload the file chunk by chunk, then ask the server to merge it.

        Chunks are read from disk one at a time, so memory use stays at about
        one chunk regardless of file size. The upload stops at the first
        chunk that still fails after all retries.

        Returns:
            True if every chunk and the merge request succeeded.
        """
        # isfile() also rejects directories, which exists() would let through.
        if not os.path.isfile(file_path):
            print("文件不存在")
            return False
        file_name = os.path.basename(file_path)
        file_hash = self.calculate_file_hash(file_path)
        file_size = os.path.getsize(file_path)
        total_chunks = math.ceil(file_size / self.chunk_size)
        print(f"文件大小: {file_size} bytes, 分片数: {total_chunks}")
        for i, chunk_data in enumerate(self._iter_chunks(file_path)):
            print(f"正在上传第 {i+1}/{total_chunks} 个分片...")
            success = self.upload_chunk(
                chunk_data, i, total_chunks,
                file_name, file_hash
            )
            if not success:
                print(f"第 {i+1} 个分片上传失败")
                return False
        return self.request_merge(file_name, total_chunks, file_hash)

    def request_merge(self, file_name: str, total_chunks: int,
                      file_hash: str) -> bool:
        """Ask the server to merge the uploaded chunks.

        Returns:
            True if the server answered HTTP 200, False on any other status
            or on a network error.
        """
        try:
            response = requests.post(
                self.merge_url,
                data={
                    'file_name': file_name,
                    'total_chunks': total_chunks,
                    'file_hash': file_hash
                },
                timeout=self.MERGE_TIMEOUT_SECONDS,
            )
            return response.status_code == 200
        except requests.RequestException as e:
            print(f"合并请求失败: {str(e)}")
            return False
2 带断点续传的客户端
import json
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
from urllib.parse import quote
class ResumeFileUploader(FileChunkUploader):
    """Chunk uploader with resume support and concurrent uploads.

    Before uploading it asks the server which chunks it already holds, then
    uploads only the missing ones through a small thread pool. The indices
    uploaded so far are also written to a local JSON progress file.
    """

    # Seconds to wait for the server's status response.
    STATUS_TIMEOUT_SECONDS = 30
    # Upper bound on simultaneous chunk uploads.
    MAX_CONCURRENT_UPLOADS = 3

    def __init__(self, chunk_size: int = 5 * 1024 * 1024):
        super().__init__(chunk_size)
        self.progress_file = "upload_progress.json"
        # Serializes updates of the shared progress state across workers.
        self.lock = threading.Lock()

    def _read_progress(self) -> dict:
        """Return the stored progress mapping, or ``{}`` if it is unusable.

        A missing, unreadable or truncated progress file is treated as "no
        progress recorded" so that one bad write cannot block later uploads.
        """
        try:
            with open(self.progress_file, 'r') as f:
                progress = json.load(f)
        except (OSError, ValueError):  # JSONDecodeError is a ValueError
            return {}
        return progress if isinstance(progress, dict) else {}

    def save_progress(self, file_name: str, uploaded_chunks: List[int]):
        """Record the uploaded chunk indices of ``file_name`` on disk."""
        progress = self._read_progress()
        progress[file_name] = uploaded_chunks
        # Write to a sibling file and rename it so an interrupted write never
        # leaves a half-written progress file behind.
        tmp_path = self.progress_file + ".tmp"
        with open(tmp_path, 'w') as f:
            json.dump(progress, f)
        os.replace(tmp_path, self.progress_file)

    def load_progress(self, file_name: str) -> Optional[List[int]]:
        """Return the recorded chunk indices of ``file_name``, or None."""
        if not os.path.exists(self.progress_file):
            return None
        return self._read_progress().get(file_name)

    def check_chunk_status(self, file_name: str, total_chunks: int) -> List[int]:
        """Ask the server which chunk indices it already holds.

        Returns:
            The server's ``uploaded_chunks`` list, or an empty list when the
            request fails, the status is not 200 or the body is not JSON.
        """
        # Quote the name so characters such as '#', '?' or spaces cannot
        # change the meaning of the URL.
        url = f"{self.upload_url}/status/{quote(file_name, safe='')}/{total_chunks}"
        try:
            response = requests.get(url, timeout=self.STATUS_TIMEOUT_SECONDS)
            if response.status_code == 200:
                return response.json().get('uploaded_chunks', [])
        except (requests.RequestException, ValueError) as e:
            print(f"查询上传状态失败: {str(e)}")
        return []

    def upload_file_with_resume(self, file_path: str) -> bool:
        """Upload the missing chunks concurrently and request the merge.

        Each worker reads only its own chunk from disk, so memory use is
        bounded by ``MAX_CONCURRENT_UPLOADS`` chunks.

        Returns:
            True if all chunks are on the server and the merge succeeded.
        """
        if not os.path.isfile(file_path):
            print("文件不存在")
            return False
        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        total_chunks = math.ceil(file_size / self.chunk_size)
        # Hash the file once; every chunk request and the merge reuse it.
        file_hash = self.calculate_file_hash(file_path)

        # A set ignores duplicates, and the range filter drops indices that
        # do not belong to this file, so the final count check is reliable.
        completed = {
            index
            for index in self.check_chunk_status(file_name, total_chunks)
            if isinstance(index, int) and 0 <= index < total_chunks
        }
        print(f"已上传 {len(completed)}/{total_chunks} 个分片")
        pending = [i for i in range(total_chunks) if i not in completed]

        def upload_worker(chunk_index: int) -> bool:
            try:
                with open(file_path, 'rb') as f:
                    f.seek(chunk_index * self.chunk_size)
                    chunk_data = f.read(self.chunk_size)
                success = self.upload_chunk(
                    chunk_data, chunk_index, total_chunks,
                    file_name, file_hash
                )
                if success:
                    with self.lock:
                        completed.add(chunk_index)
                        self.save_progress(file_name, sorted(completed))
                return success
            except OSError as e:
                print(f"上传分片 {chunk_index} 失败: {str(e)}")
                return False

        # The pool blocks idle workers instead of polling thread liveness.
        with ThreadPoolExecutor(max_workers=self.MAX_CONCURRENT_UPLOADS) as pool:
            list(pool.map(upload_worker, pending))

        if len(completed) == total_chunks:
            return self.request_merge(file_name, total_chunks, file_hash)
        print(f"上传未完成: {len(completed)}/{total_chunks}")
        return False
服务端实现
1 Flask服务端
from flask import Flask, request, jsonify
import os
import hashlib
from werkzeug.utils import secure_filename
app = Flask(__name__)

# Merged files end up in UPLOAD_FOLDER; chunks awaiting a merge live in
# CHUNK_FOLDER, one sub-directory per file.
UPLOAD_FOLDER = 'uploads'
CHUNK_FOLDER = 'chunks'

# Create both storage directories up front.
for _folder in (UPLOAD_FOLDER, CHUNK_FOLDER):
    os.makedirs(_folder, exist_ok=True)
@app.route('/upload/<int:chunk_index>', methods=['POST'])
def upload_chunk(chunk_index):
    """Receive one chunk, store it and verify its MD5 digest.

    Expects the multipart file field ``chunk`` and the form fields
    ``file_name``, ``total_chunks``, ``file_hash`` and ``chunk_hash``.

    Returns:
        200 when the chunk was stored and its digest matches, 400 for
        missing/invalid parameters or a digest mismatch, 500 when the chunk
        could not be written or read back.
    """
    try:
        chunk_file = request.files['chunk']
        file_name = request.form['file_name']
        total_chunks = int(request.form['total_chunks'])
        # Not used here, but required so every chunk request carries it.
        request.form['file_hash']
        chunk_hash = request.form['chunk_hash']
    except (KeyError, ValueError):
        # Client mistakes are reported as 400 rather than a server error.
        return jsonify({'error': '请求参数缺失或无效'}), 400

    if not 0 <= chunk_index < total_chunks:
        return jsonify({'error': '分片序号超出范围'}), 400

    # secure_filename() may return '' (e.g. for a name without any ASCII
    # characters); the chunk directory would then be CHUNK_FOLDER itself.
    safe_name = secure_filename(file_name)
    if not safe_name:
        return jsonify({'error': '文件名无效'}), 400

    try:
        # Each file gets its own chunk directory.
        chunk_dir = os.path.join(CHUNK_FOLDER, safe_name)
        os.makedirs(chunk_dir, exist_ok=True)
        chunk_path = os.path.join(chunk_dir, f'chunk_{chunk_index}')
        chunk_file.save(chunk_path)

        # Hash what actually landed on disk, in blocks.
        digest = hashlib.md5()
        with open(chunk_path, 'rb') as f:
            for block in iter(lambda: f.read(64 * 1024), b''):
                digest.update(block)
        if digest.hexdigest() != chunk_hash:
            # Drop the bad chunk so the status endpoint does not report it.
            os.remove(chunk_path)
            return jsonify({'error': '分片校验失败'}), 400
    except OSError as e:
        return jsonify({'error': str(e)}), 500

    return jsonify({
        'message': '分片上传成功',
        'chunk_index': chunk_index,
        'total_chunks': total_chunks
    }), 200
@app.route('/merge', methods=['POST'])
def merge_chunks():
    """Concatenate all chunks of a file and verify the result's MD5 digest.

    Expects the form fields ``file_name``, ``total_chunks`` and ``file_hash``.
    The merged data is written to a temporary ``.part`` file and only renamed
    to its final name after the digest matched, so a failed merge never
    leaves a truncated file behind or overwrites an earlier upload.

    Returns:
        200 on success, 400 for missing/invalid parameters, a missing chunk
        or a digest mismatch, 500 on file-system errors.
    """
    import shutil

    try:
        file_name = request.form['file_name']
        total_chunks = int(request.form['total_chunks'])
        file_hash = request.form['file_hash']
    except (KeyError, ValueError):
        return jsonify({'error': '请求参数缺失或无效'}), 400
    if total_chunks < 0:
        return jsonify({'error': '请求参数缺失或无效'}), 400

    # An empty safe name would make chunk_dir equal to CHUNK_FOLDER, and the
    # cleanup below would then delete every pending upload.
    safe_name = secure_filename(file_name)
    if not safe_name:
        return jsonify({'error': '文件名无效'}), 400

    chunk_dir = os.path.join(CHUNK_FOLDER, safe_name)
    chunk_paths = [
        os.path.join(chunk_dir, f'chunk_{i}') for i in range(total_chunks)
    ]
    # Check completeness before writing anything.
    for i, chunk_path in enumerate(chunk_paths):
        if not os.path.exists(chunk_path):
            return jsonify({'error': f'分片 {i} 缺失'}), 400

    output_path = os.path.join(UPLOAD_FOLDER, safe_name)
    temp_path = output_path + '.part'
    digest = hashlib.md5()
    try:
        with open(temp_path, 'wb') as output_file:
            for chunk_path in chunk_paths:
                with open(chunk_path, 'rb') as chunk_file:
                    # Hash while copying so the merged file is not re-read.
                    for block in iter(lambda: chunk_file.read(1024 * 1024), b''):
                        digest.update(block)
                        output_file.write(block)

        # Chunks are removed whether or not the digest matches. With zero
        # chunks the directory may not exist, hence ignore_errors.
        shutil.rmtree(chunk_dir, ignore_errors=True)

        if digest.hexdigest() != file_hash:
            os.remove(temp_path)
            return jsonify({'error': '文件校验失败'}), 400
        os.replace(temp_path, output_path)
    except OSError as e:
        # Best-effort removal of the partial file; the original error is
        # what gets reported.
        try:
            os.remove(temp_path)
        except OSError:
            pass
        return jsonify({'error': str(e)}), 500

    return jsonify({
        'message': '合并成功',
        'file_name': file_name,
        'file_path': output_path
    }), 200
@app.route('/upload/status/<file_name>/<int:total_chunks>', methods=['GET'])
def get_upload_status(file_name, total_chunks):
    """Report which chunk indices of a file are already stored.

    Looks for ``chunk_<i>`` files, with ``i`` in ``range(total_chunks)``,
    inside the file's chunk directory and returns the indices found together
    with the ``total_chunks`` value from the URL.
    """
    directory = os.path.join(CHUNK_FOLDER, secure_filename(file_name))
    # If the directory does not exist, none of the chunk paths exist either,
    # so no separate directory check is needed.
    present = [
        index
        for index in range(total_chunks)
        if os.path.exists(os.path.join(directory, f'chunk_{index}'))
    ]
    return jsonify({
        'uploaded_chunks': present,
        'total_chunks': total_chunks
    })
if __name__ == '__main__':
    # Debug mode enables Werkzeug's interactive debugger, which lets anyone
    # who can reach the port execute code. Keep it off unless it is
    # explicitly requested with FLASK_DEBUG=1.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1', port=5000)
使用示例
# Basic sequential upload with 10 MB chunks
uploader = FileChunkUploader(chunk_size=10 * 1024 * 1024)
uploader.upload_file("large_file.zip")

# Resumable, concurrent upload with 5 MB chunks
resume_uploader = ResumeFileUploader(chunk_size=5 * 1024 * 1024)
resume_uploader.upload_file_with_resume("large_file.zip")
关键特性说明
| 特性 | 说明 |
|---|---|
| 分片大小 | 通常建议 5-10MB,根据网络状况调整 |
| 并发上传 | 3-5个并发线程,平衡速度和稳定性 |
| 断点续传 | 记录已上传分片,支持恢复上传 |
| 完整性校验 | MD5验证每个分片和最终文件 |
| 错误重试 | 失败自动重试3次 |
这个方案适用于大文件上传场景,如视频、压缩包等,可以有效避免单个大文件上传超时的问题。