实用脚本能批量高TFTP吗?

wen 实用脚本 64

本文目录导读:

实用脚本能批量高TFTP吗?

  1. Python脚本(推荐)
  2. 使用示例
  3. 使用前准备
  4. 注意事项

我可以帮您了解如何批量传输文件到TFTP服务器,以下是一个实用的Python脚本和一个批处理脚本,可以实现批量文件到TFTP服务器的高效传输。

Python脚本(推荐)

#!/usr/bin/env python3
"""
批量TFTP文件上传/下载脚本
支持多线程并发传输
"""
import os
import sys
import argparse
import threading
import time
from queue import Queue
from tftpy import TftpClient
class TftpBatchTransfer:
    def __init__(self, host, port=69, threads=4, retries=3):
        self.host = host
        self.port = port
        self.threads = threads
        self.retries = retries
        self.success_count = 0
        self.fail_count = 0
        self.lock = threading.Lock()
    def upload_file(self, local_file, remote_file):
        """上传单个文件"""
        client = TftpClient(self.host, self.port)
        for attempt in range(self.retries):
            try:
                client.upload(remote_file, local_file)
                with self.lock:
                    self.success_count += 1
                    print(f"[SUCCESS] 上传: {local_file} -> {remote_file}")
                return True
            except Exception as e:
                if attempt < self.retries - 1:
                    print(f"[RETRY] {attempt+1}/{self.retries}: {local_file} - {str(e)}")
                else:
                    with self.lock:
                        self.fail_count += 1
                        print(f"[FAIL] 上传失败: {local_file} - {str(e)}")
        return False
    def download_file(self, remote_file, local_file):
        """下载单个文件"""
        client = TftpClient(self.host, self.port)
        for attempt in range(self.retries):
            try:
                client.download(remote_file, local_file)
                with self.lock:
                    self.success_count += 1
                    print(f"[SUCCESS] 下载: {remote_file} -> {local_file}")
                return True
            except Exception as e:
                if attempt < self.retries - 1:
                    print(f"[RETRY] {attempt+1}/{self.retries}: {remote_file} - {str(e)}")
                else:
                    with self.lock:
                        self.fail_count += 1
                        print(f"[FAIL] 下载失败: {remote_file} - {str(e)}")
        return False
    def worker_upload(self, queue):
        """上传工作线程"""
        while not queue.empty():
            try:
                local_file, remote_file = queue.get_nowait()
                self.upload_file(local_file, remote_file)
            except queue.Empty:
                break
    def batch_upload(self, file_list, local_dir=".", remote_dir=""):
        """批量上传文件
        file_list: 文件列表(相对于local_dir)
        """
        queue = Queue()
        for filename in file_list:
            local_path = os.path.join(local_dir, filename)
            remote_path = os.path.join(remote_dir, filename).replace('\\', '/')
            if os.path.isfile(local_path):
                queue.put((local_path, remote_path))
        threads = []
        for _ in range(min(self.threads, queue.qsize())):
            t = threading.Thread(target=self.worker_upload, args=(queue,))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
    def batch_download(self, file_list, remote_dir=".", local_dir=""):
        """批量下载文件"""
        os.makedirs(local_dir, exist_ok=True)
        queue = Queue()
        for filename in file_list:
            remote_path = os.path.join(remote_dir, filename).replace('\\', '/')
            local_path = os.path.join(local_dir, filename)
            queue.put((remote_path, local_path))
        threads = []
        for _ in range(min(self.threads, queue.qsize())):
            t = threading.Thread(target=self.worker_download, args=(queue,))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
    def worker_download(self, queue):
        """下载工作线程"""
        while not queue.empty():
            try:
                remote_file, local_file = queue.get_nowait()
                self.download_file(remote_file, local_file)
            except queue.Empty:
                break
def parse_args():
    parser = argparse.ArgumentParser(description='批量TFTP文件传输工具')
    parser.add_argument('host', help='TFTP服务器地址')
    parser.add_argument('-p', '--port', type=int, default=69, help='TFTP端口号 (默认: 69)')
    parser.add_argument('-t', '--threads', type=int, default=4, help='并发线程数 (默认: 4)')
    parser.add_argument('-r', '--retries', type=int, default=3, help='重试次数 (默认: 3)')
    # 操作模式
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument('-u', '--upload', action='store_true', help='上传模式')
    mode.add_argument('-d', '--download', action='store_true', help='下载模式')
    # 文件参数
    parser.add_argument('-f', '--files', nargs='+', help='文件列表 (空格分隔)')
    parser.add_argument('--file-list', help='包含文件名的文件 (每行一个文件名)')
    parser.add_argument('--local-dir', default='.', help='本地目录 (默认: 当前目录)')
    parser.add_argument('--remote-dir', default='.', help='远程目录 (默认: 根目录)')
    parser.add_argument('--extension', help='文件扩展名过滤 ( .bin .cfg)')
    return parser.parse_args()
def get_file_list(args):
    """获取要传输的文件列表"""
    files = []
    if args.files:
        files.extend(args.files)
    if args.file_list:
        if not os.path.exists(args.file_list):
            print(f"错误: 文件列表 {args.file_list} 不存在")
            sys.exit(1)
        with open(args.file_list, 'r') as f:
            files.extend([line.strip() for line in f if line.strip()])
    if not files:
        # 如果没有指定文件,扫描目录
        if args.extension:
            files = [f for f in os.listdir(args.local_dir) 
                    if f.endswith(args.extension) and os.path.isfile(os.path.join(args.local_dir, f))]
        else:
            files = [f for f in os.listdir(args.local_dir) 
                    if os.path.isfile(os.path.join(args.local_dir, f))]
    if not files:
        print("错误: 没有找到要传输的文件")
        sys.exit(1)
    return files
def main():
    args = parse_args()
    files = get_file_list(args)
    print(f"TFTP服务器: {args.host}:{args.port}")
    print(f"操作模式: {'上传' if args.upload else '下载'}")
    print(f"文件数量: {len(files)}")
    print(f"并发线程: {args.threads}")
    print(f"重试次数: {args.retries}")
    print("-" * 50)
    transfer = TftpBatchTransfer(args.host, args.port, args.threads, args.retries)
    start_time = time.time()
    if args.upload:
        transfer.batch_upload(files, args.local_dir, args.remote_dir)
    else:
        transfer.batch_download(files, args.remote_dir, args.local_dir)
    elapsed_time = time.time() - start_time
    print("-" * 50)
    print(f"完成! 成功: {transfer.success_count}, 失败: {transfer.fail_count}")
    print(f"耗时: {elapsed_time:.2f}秒")
if __name__ == "__main__":
    # 确保安装了所需库: pip install tftpy
    try:
        from tftpy import TftpClient
    except ImportError:
        print("请先安装tftpy库: pip install tftpy")
        sys.exit(1)
    main()

使用示例

批量上传

# 上传当前目录所有.bin文件
python tftp_batch.py 192.168.1.100 -u --extension .bin
# 上传指定文件
python tftp_batch.py 192.168.1.100 -u -f file1.bin file2.cfg file3.img
# 使用文件列表
python tftp_batch.py 192.168.1.100 -u --file-list files.txt
# 指定并发线程数
python tftp_batch.py 192.168.1.100 -u --extension .bin -t 8

批量下载

# 下载指定文件
python tftp_batch.py 192.168.1.100 -d -f config.cfg startup.cfg
# 下载到指定目录
python tftp_batch.py 192.168.1.100 -d -f config.cfg --local-dir ./downloads

文件列表格式 (files.txt)

router-config.cfg
firmware.bin
backup.img
image-v2.bin

使用前准备

  1. 安装依赖库

    pip install tftpy
  2. Windows批处理脚本 (简单版本):

@echo off
REM 批量TFTP上传批处理脚本
set TFTP_SERVER=192.168.1.100
set LOCAL_DIR=C:\firmware
set EXTENSION=*.bin
echo 开始批量上传文件到TFTP服务器 %TFTP_SERVER%
echo 源目录: %LOCAL_DIR%
echo.
for %%f in ("%LOCAL_DIR%\%EXTENSION%") do (
    echo 上传: %%~nxf
    tftp %TFTP_SERVER% put "%%f" "%%~nxf"
    if errorlevel 1 (
        echo [失败] %%~nxf
    ) else (
        echo [成功] %%~nxf
    )
)
echo.
echo 批处理完成!
pause

注意事项

  1. TFTP服务器需要开启,且支持写操作(如果是上传)
  2. 文件权限:确保目标目录有写入权限
  3. 网络稳定性:TFTP基于UDP,大文件传输可能不稳定
  4. 文件大小限制:标准TFTP最大支持32MB传输
  5. 并行数建议:不要超过10个并发,以免网络拥塞

这个脚本应该能满足您的批量TFTP传输需求,需要我针对您的具体场景进行修改吗?

抱歉,评论功能暂时关闭!