Python案例怎么分片上传文件?

wen python案例 10

本文目录导读:

Python案例怎么分片上传文件?

  1. 分片上传核心思路
  2. 客户端分片上传实现
  3. 服务端实现
  4. 使用示例
  5. 关键特性说明

我来为你详细介绍Python实现文件分片上传的完整方案。

分片上传核心思路

graph LR
    A[大文件] --> B[分割成N个分片]
    B --> C[上传分片1]
    B --> D[上传分片2]
    B --> E[上传分片N]
    C --> F[服务端合并]
    D --> F
    E --> F
    F --> G[完整文件]

客户端分片上传实现

2.1 基础分片上传客户端

import hashlib
import math
import os
import time
from typing import Dict, Iterator, List, Optional

import requests
class FileChunkUploader:
    """Upload a local file to the server in fixed-size chunks.

    Each chunk is POSTed as multipart field ``chunk`` to
    ``{upload_url}/{chunk_index}`` together with its MD5. After every chunk
    has been accepted, ``merge_url`` is asked to reassemble the file.
    """

    def __init__(self, chunk_size: int = 5 * 1024 * 1024,  # 5 MB chunks by default
                 base_url: str = "http://your-server.com",
                 max_retries: int = 3,
                 timeout: Optional[float] = 30.0,
                 retry_delay: float = 1.0):
        """Configure the uploader.

        Args:
            chunk_size: Size of each chunk in bytes; must be positive.
            base_url: Server root; ``/upload`` and ``/merge`` are appended.
            max_retries: Extra attempts per chunk after a failed upload.
            timeout: ``requests`` timeout in seconds for each request
                (None waits indefinitely).
            retry_delay: Base back-off; retry k sleeps ``k * retry_delay`` s.

        Raises:
            ValueError: If ``chunk_size`` is not positive or ``max_retries``
                is negative.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive number of bytes")
        if max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        self.chunk_size = chunk_size
        base_url = base_url.rstrip("/")
        self.upload_url = f"{base_url}/upload"
        self.merge_url = f"{base_url}/merge"
        self.max_retries = max_retries
        self.timeout = timeout
        self.retry_delay = retry_delay

    def calculate_file_hash(self, file_path: str) -> str:
        """Return the hex MD5 of the whole file (used for verification)."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            # 1 MiB reads: far fewer read calls than 4 KiB on multi-GB files.
            for block in iter(lambda: f.read(1024 * 1024), b""):
                hash_md5.update(block)
        return hash_md5.hexdigest()

    def _iter_chunks(self, file_path: str) -> Iterator[bytes]:
        """Yield the file in ``chunk_size`` pieces, one chunk in memory at a time."""
        with open(file_path, "rb") as f:
            for chunk_data in iter(lambda: f.read(self.chunk_size), b""):
                yield chunk_data

    def split_file(self, file_path: str) -> List[bytes]:
        """Return every chunk of the file as a list.

        This holds the whole file in memory; ``upload_file`` streams chunks
        instead, and this method is kept for callers that want a list.
        """
        return list(self._iter_chunks(file_path))

    def upload_chunk(self, chunk_data: bytes, chunk_index: int,
                     total_chunks: int, file_name: str, file_hash: str) -> bool:
        """Upload one chunk, retrying up to ``max_retries`` extra times.

        Returns:
            True as soon as the server answers HTTP 200; False when every
            attempt failed with a network error or a non-200 status.
        """
        # The server re-hashes the chunk and compares against this value.
        chunk_hash = hashlib.md5(chunk_data).hexdigest()
        files = {
            'chunk': (f'chunk_{chunk_index}', chunk_data, 'application/octet-stream')
        }
        data = {
            'chunk_index': chunk_index,
            'total_chunks': total_chunks,
            'file_name': file_name,
            'file_hash': file_hash,
            'chunk_hash': chunk_hash
        }
        url = f"{self.upload_url}/{chunk_index}"
        attempts = self.max_retries + 1
        for attempt in range(1, attempts + 1):
            try:
                response = requests.post(url, files=files, data=data,
                                         timeout=self.timeout)
            except requests.RequestException as e:
                print(f"上传分片 {chunk_index} 失败: {str(e)}")
            else:
                if response.status_code == 200:
                    return True
                print(f"上传分片 {chunk_index} 失败: HTTP {response.status_code}")
            if attempt < attempts:
                # Linear back-off so a struggling server gets some breathing room.
                time.sleep(self.retry_delay * attempt)
        return False

    def upload_file(self, file_path: str) -> bool:
        """Upload ``file_path`` chunk by chunk, then ask the server to merge.

        Chunks are read lazily, so memory use stays at about one chunk.

        Returns:
            True only if every chunk and the merge request succeeded; False
            if the path is not a regular file or any step failed.
        """
        if not os.path.isfile(file_path):
            print("文件不存在")
            return False
        file_name = os.path.basename(file_path)
        file_hash = self.calculate_file_hash(file_path)
        file_size = os.path.getsize(file_path)
        # Integer ceiling division: exact even for very large files.
        total_chunks = -(-file_size // self.chunk_size)
        print(f"文件大小: {file_size} bytes, 分片数: {total_chunks}")
        for i, chunk_data in enumerate(self._iter_chunks(file_path)):
            print(f"正在上传第 {i+1}/{total_chunks} 个分片...")
            if not self.upload_chunk(chunk_data, i, total_chunks,
                                     file_name, file_hash):
                print(f"第 {i+1} 个分片上传失败")
                return False
        return self.request_merge(file_name, total_chunks, file_hash)

    def request_merge(self, file_name: str, total_chunks: int,
                      file_hash: str) -> bool:
        """Ask the server to concatenate the uploaded chunks; True on HTTP 200."""
        try:
            # Only the connect phase is bounded: merging a large file can
            # legitimately take longer than a per-chunk read timeout.
            response = requests.post(self.merge_url, data={
                'file_name': file_name,
                'total_chunks': total_chunks,
                'file_hash': file_hash
            }, timeout=(self.timeout, None))
            return response.status_code == 200
        except requests.RequestException as e:
            print(f"合并请求失败: {str(e)}")
            return False

2.2 带断点续传的客户端

import json
import threading
from typing import Optional
class ResumeFileUploader(FileChunkUploader):
    """Chunked uploader that resumes interrupted uploads.

    It asks the server which chunks it already stores, uploads only the
    missing ones on a bounded thread pool, records progress locally in a
    JSON file, and requests the merge once every chunk is present.
    """

    def __init__(self, chunk_size: int = 5 * 1024 * 1024, max_concurrent: int = 3):
        """Configure chunk size and the maximum number of parallel uploads.

        Raises:
            ValueError: If ``max_concurrent`` is less than 1.
        """
        super().__init__(chunk_size)
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be at least 1")
        self.progress_file = "upload_progress.json"
        # Guards the shared "uploaded" set and the progress file across workers.
        self.lock = threading.Lock()
        self.max_concurrent = max_concurrent
        # Seconds to wait for the (best-effort) status query.
        self.status_timeout = 10.0

    def _read_progress_file(self) -> dict:
        """Return the whole progress mapping, or {} if missing or unreadable."""
        try:
            with open(self.progress_file, 'r', encoding='utf-8') as f:
                progress = json.load(f)
        except (OSError, ValueError):
            # Missing file or corrupt JSON: start from an empty record.
            return {}
        return progress if isinstance(progress, dict) else {}

    def save_progress(self, file_name: str, uploaded_chunks: List[int]):
        """Record the uploaded chunk indices for ``file_name`` in the progress file."""
        progress = self._read_progress_file()
        progress[file_name] = uploaded_chunks
        tmp_path = self.progress_file + ".tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(progress, f)
        # Atomic swap so an interrupted write never leaves truncated JSON.
        os.replace(tmp_path, self.progress_file)

    def load_progress(self, file_name: str) -> Optional[List[int]]:
        """Return the recorded chunk indices for ``file_name``, or None if absent."""
        return self._read_progress_file().get(file_name)

    def check_chunk_status(self, file_name: str, total_chunks: int) -> List[int]:
        """Ask the server which chunk indices it already holds.

        This is best effort: on any network, HTTP or JSON problem it returns
        [], so the caller simply uploads every chunk.
        """
        from urllib.parse import quote

        # Percent-encode the name so characters like '#', '?' or spaces
        # cannot break the URL path.
        url = f"{self.upload_url}/status/{quote(file_name, safe='')}/{total_chunks}"
        try:
            response = requests.get(url, timeout=self.status_timeout)
            if response.status_code != 200:
                return []
            payload = response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"查询上传状态失败: {e}")
            return []
        if not isinstance(payload, dict):
            return []
        return payload.get('uploaded_chunks', [])

    def _read_chunk(self, file_path: str, chunk_index: int) -> bytes:
        """Read chunk ``chunk_index`` from disk using a fresh handle per call."""
        with open(file_path, "rb") as f:
            f.seek(chunk_index * self.chunk_size)
            return f.read(self.chunk_size)

    def upload_file_with_resume(self, file_path: str) -> bool:
        """Upload ``file_path``, skipping chunks the server already has.

        Returns:
            The merge result if every chunk is present, else False.

        Raises:
            OSError: If reading a chunk or writing the progress file fails
                inside a worker (propagated from the thread pool).
        """
        from concurrent.futures import ThreadPoolExecutor

        if not os.path.exists(file_path):
            print("文件不存在")
            return False
        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)
        total_chunks = -(-file_size // self.chunk_size)
        # Hash once up front instead of re-reading the whole file per chunk.
        file_hash = self.calculate_file_hash(file_path)
        uploaded = set(self.check_chunk_status(file_name, total_chunks))
        print(f"已上传 {len(uploaded)}/{total_chunks} 个分片")
        pending = [i for i in range(total_chunks) if i not in uploaded]

        def upload_worker(chunk_index: int) -> bool:
            # Each worker reads only its own chunk, so memory stays bounded.
            chunk_data = self._read_chunk(file_path, chunk_index)
            if not self.upload_chunk(chunk_data, chunk_index, total_chunks,
                                     file_name, file_hash):
                return False
            with self.lock:
                uploaded.add(chunk_index)
                self.save_progress(file_name, sorted(uploaded))
            return True

        if pending:
            # The pool bounds concurrency without busy-waiting on thread state.
            with ThreadPoolExecutor(max_workers=self.max_concurrent) as pool:
                list(pool.map(upload_worker, pending))

        if all(i in uploaded for i in range(total_chunks)):
            return self.request_merge(file_name, total_chunks, file_hash)
        print(f"上传未完成: {len(uploaded)}/{total_chunks}")
        return False

服务端实现

3.1 Flask服务端

from flask import Flask, request, jsonify
import os
import hashlib
from werkzeug.utils import secure_filename
# Flask application instance used by the route handlers below.
app = Flask(__name__)
# Merged, complete files are written here ...
UPLOAD_FOLDER = 'uploads'
# ... while in-flight chunks are staged in a per-file subdirectory here.
CHUNK_FOLDER = 'chunks'

# Create both directories at import time so handlers can assume they exist.
for _folder in (UPLOAD_FOLDER, CHUNK_FOLDER):
    os.makedirs(_folder, exist_ok=True)
@app.route('/upload/<int:chunk_index>', methods=['POST'])
def upload_chunk(chunk_index):
    """Receive one uploaded chunk and store it after verifying its MD5.

    Expected form fields are ``chunk`` (file), ``file_name``,
    ``total_chunks`` and ``chunk_hash``. The chunk is first written to a
    temporary ``.part`` file and renamed to ``chunk_{index}`` only after its
    hash matches. As a result, the status endpoint and the merge step never
    see a partial or corrupt chunk, and a failed re-upload does not destroy
    a previously stored good chunk.

    Returns:
        200 on success; 400 for missing or invalid fields, an out-of-range
        index, an unusable file name, or a hash mismatch; 500 otherwise.
    """
    import tempfile

    try:
        chunk_file = request.files['chunk']
        file_name = request.form['file_name']
        total_chunks = int(request.form['total_chunks'])
        chunk_hash = request.form['chunk_hash']
    except (KeyError, ValueError) as e:
        # Missing field or non-integer total_chunks is a client error, not a 500.
        return jsonify({'error': f'请求参数错误: {e}'}), 400
    if not 0 <= chunk_index < total_chunks:
        return jsonify({'error': '分片序号超出范围'}), 400
    # secure_filename() can return '' (e.g. for '..'), which would make the
    # chunk directory CHUNK_FOLDER itself.
    safe_name = secure_filename(file_name)
    if not safe_name:
        return jsonify({'error': '文件名无效'}), 400
    try:
        # Per-file chunk directory.
        chunk_dir = os.path.join(CHUNK_FOLDER, safe_name)
        os.makedirs(chunk_dir, exist_ok=True)
        chunk_path = os.path.join(chunk_dir, f'chunk_{chunk_index}')
        fd, tmp_path = tempfile.mkstemp(dir=chunk_dir,
                                        prefix=f'chunk_{chunk_index}.',
                                        suffix='.part')
        os.close(fd)
        try:
            chunk_file.save(tmp_path)
            hasher = hashlib.md5()
            with open(tmp_path, 'rb') as f:
                for block in iter(lambda: f.read(1024 * 1024), b''):
                    hasher.update(block)
            if hasher.hexdigest() != chunk_hash:
                return jsonify({'error': '分片校验失败'}), 400
            # Publish the verified chunk under its final name in one step.
            os.replace(tmp_path, chunk_path)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        return jsonify({
            'message': '分片上传成功',
            'chunk_index': chunk_index,
            'total_chunks': total_chunks
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/merge', methods=['POST'])
def merge_chunks():
    """Concatenate ``chunk_0 .. chunk_{total_chunks-1}`` into the final file.

    All chunks are checked for presence before anything is written. Output
    goes to a temporary file that replaces ``UPLOAD_FOLDER/<name>`` only
    when the MD5 of the merged bytes equals ``file_hash``. Therefore a
    missing chunk or a mismatch never leaves a partial file behind and never
    clobbers an existing file of the same name. The hash is computed while
    writing, so the merged file is not re-read.

    Returns:
        200 with the output path on success; 400 for missing or invalid
        fields, a missing chunk, or a hash mismatch; 500 otherwise.
    """
    import shutil
    import tempfile

    try:
        file_name = request.form['file_name']
        total_chunks = int(request.form['total_chunks'])
        file_hash = request.form['file_hash']
    except (KeyError, ValueError) as e:
        return jsonify({'error': f'请求参数错误: {e}'}), 400
    if total_chunks < 0:
        return jsonify({'error': '分片数无效'}), 400
    safe_name = secure_filename(file_name)
    if not safe_name:
        return jsonify({'error': '文件名无效'}), 400
    try:
        chunk_dir = os.path.join(CHUNK_FOLDER, safe_name)
        chunk_paths = [os.path.join(chunk_dir, f'chunk_{i}')
                       for i in range(total_chunks)]
        for i, chunk_path in enumerate(chunk_paths):
            if not os.path.exists(chunk_path):
                return jsonify({'error': f'分片 {i} 缺失'}), 400
        output_path = os.path.join(UPLOAD_FOLDER, safe_name)
        fd, tmp_path = tempfile.mkstemp(dir=UPLOAD_FOLDER,
                                        prefix=f'.{safe_name}.',
                                        suffix='.part')
        try:
            hasher = hashlib.md5()
            with os.fdopen(fd, 'wb') as output_file:
                for chunk_path in chunk_paths:
                    with open(chunk_path, 'rb') as chunk_file:
                        data = chunk_file.read()
                    hasher.update(data)
                    output_file.write(data)
            # As before, chunks are discarded whether or not the final hash
            # matches. ignore_errors covers a 0-chunk upload whose directory
            # was never created.
            shutil.rmtree(chunk_dir, ignore_errors=True)
            if hasher.hexdigest() != file_hash:
                return jsonify({'error': '文件校验失败'}), 400
            os.replace(tmp_path, output_path)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        return jsonify({
            'message': '合并成功',
            'file_name': file_name,
            'file_path': output_path
        }), 200
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/upload/status/<file_name>/<int:total_chunks>', methods=['GET'])
def get_upload_status(file_name, total_chunks):
    """Report which chunks of an in-progress upload are already stored.

    Looks for ``chunk_0`` .. ``chunk_{total_chunks-1}`` in the upload's chunk
    directory and returns the indices that exist, in ascending order. The
    list is empty when that directory does not exist yet.
    """
    chunk_dir = os.path.join(CHUNK_FOLDER, secure_filename(file_name))
    if not os.path.exists(chunk_dir):
        present = []
    else:
        present = [
            idx for idx in range(total_chunks)
            if os.path.exists(os.path.join(chunk_dir, f'chunk_{idx}'))
        ]
    return jsonify({
        'uploaded_chunks': present,
        'total_chunks': total_chunks,
    })
if __name__ == '__main__':
    # Development server on port 5000. Per the Flask docs, debug=True enables
    # the reloader and the interactive debugger; do not expose it to
    # untrusted networks.
    app.run(port=5000, debug=True)

使用示例

if __name__ == "__main__":
    # Guarded so that importing this module does not start network uploads.
    # Basic upload: 10 MB chunks sent one after another.
    uploader = FileChunkUploader(chunk_size=10*1024*1024)
    if not uploader.upload_file("large_file.zip"):
        print("基础上传失败")
    # Resumable upload: 5 MB chunks; chunks the server already has are skipped.
    resume_uploader = ResumeFileUploader(chunk_size=5*1024*1024)
    if not resume_uploader.upload_file_with_resume("large_file.zip"):
        print("断点续传上传失败")

关键特性说明

特性 说明
分片大小 通常建议 5-10MB,根据网络状况调整
并发上传 3-5个并发线程,平衡速度和稳定性
断点续传 记录已上传分片,支持恢复上传
完整性校验 MD5验证每个分片和最终文件
错误重试 建议对失败的分片自动重试(例如最多重试3次),避免偶发网络错误导致整个上传失败

这个方案适用于大文件上传场景,如视频、压缩包等,可以有效避免单个大文件上传超时的问题。

抱歉,评论功能暂时关闭!