本文目录导读:

我可以帮您了解如何批量传输文件到TFTP服务器,以下是一个实用的Python脚本和一个批处理脚本,可以实现批量文件到TFTP服务器的高效传输。
Python脚本(推荐)
#!/usr/bin/env python3
"""
批量TFTP文件上传/下载脚本
支持多线程并发传输
"""
import os
import sys
import argparse
import threading
import time
from queue import Queue
from tftpy import TftpClient
class TftpBatchTransfer:
def __init__(self, host, port=69, threads=4, retries=3):
self.host = host
self.port = port
self.threads = threads
self.retries = retries
self.success_count = 0
self.fail_count = 0
self.lock = threading.Lock()
def upload_file(self, local_file, remote_file):
"""上传单个文件"""
client = TftpClient(self.host, self.port)
for attempt in range(self.retries):
try:
client.upload(remote_file, local_file)
with self.lock:
self.success_count += 1
print(f"[SUCCESS] 上传: {local_file} -> {remote_file}")
return True
except Exception as e:
if attempt < self.retries - 1:
print(f"[RETRY] {attempt+1}/{self.retries}: {local_file} - {str(e)}")
else:
with self.lock:
self.fail_count += 1
print(f"[FAIL] 上传失败: {local_file} - {str(e)}")
return False
def download_file(self, remote_file, local_file):
"""下载单个文件"""
client = TftpClient(self.host, self.port)
for attempt in range(self.retries):
try:
client.download(remote_file, local_file)
with self.lock:
self.success_count += 1
print(f"[SUCCESS] 下载: {remote_file} -> {local_file}")
return True
except Exception as e:
if attempt < self.retries - 1:
print(f"[RETRY] {attempt+1}/{self.retries}: {remote_file} - {str(e)}")
else:
with self.lock:
self.fail_count += 1
print(f"[FAIL] 下载失败: {remote_file} - {str(e)}")
return False
def worker_upload(self, queue):
"""上传工作线程"""
while not queue.empty():
try:
local_file, remote_file = queue.get_nowait()
self.upload_file(local_file, remote_file)
except queue.Empty:
break
def batch_upload(self, file_list, local_dir=".", remote_dir=""):
"""批量上传文件
file_list: 文件列表(相对于local_dir)
"""
queue = Queue()
for filename in file_list:
local_path = os.path.join(local_dir, filename)
remote_path = os.path.join(remote_dir, filename).replace('\\', '/')
if os.path.isfile(local_path):
queue.put((local_path, remote_path))
threads = []
for _ in range(min(self.threads, queue.qsize())):
t = threading.Thread(target=self.worker_upload, args=(queue,))
t.start()
threads.append(t)
for t in threads:
t.join()
def batch_download(self, file_list, remote_dir=".", local_dir=""):
"""批量下载文件"""
os.makedirs(local_dir, exist_ok=True)
queue = Queue()
for filename in file_list:
remote_path = os.path.join(remote_dir, filename).replace('\\', '/')
local_path = os.path.join(local_dir, filename)
queue.put((remote_path, local_path))
threads = []
for _ in range(min(self.threads, queue.qsize())):
t = threading.Thread(target=self.worker_download, args=(queue,))
t.start()
threads.append(t)
for t in threads:
t.join()
def worker_download(self, queue):
"""下载工作线程"""
while not queue.empty():
try:
remote_file, local_file = queue.get_nowait()
self.download_file(remote_file, local_file)
except queue.Empty:
break
def parse_args():
parser = argparse.ArgumentParser(description='批量TFTP文件传输工具')
parser.add_argument('host', help='TFTP服务器地址')
parser.add_argument('-p', '--port', type=int, default=69, help='TFTP端口号 (默认: 69)')
parser.add_argument('-t', '--threads', type=int, default=4, help='并发线程数 (默认: 4)')
parser.add_argument('-r', '--retries', type=int, default=3, help='重试次数 (默认: 3)')
# 操作模式
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument('-u', '--upload', action='store_true', help='上传模式')
mode.add_argument('-d', '--download', action='store_true', help='下载模式')
# 文件参数
parser.add_argument('-f', '--files', nargs='+', help='文件列表 (空格分隔)')
parser.add_argument('--file-list', help='包含文件名的文件 (每行一个文件名)')
parser.add_argument('--local-dir', default='.', help='本地目录 (默认: 当前目录)')
parser.add_argument('--remote-dir', default='.', help='远程目录 (默认: 根目录)')
parser.add_argument('--extension', help='文件扩展名过滤 ( .bin .cfg)')
return parser.parse_args()
def get_file_list(args):
"""获取要传输的文件列表"""
files = []
if args.files:
files.extend(args.files)
if args.file_list:
if not os.path.exists(args.file_list):
print(f"错误: 文件列表 {args.file_list} 不存在")
sys.exit(1)
with open(args.file_list, 'r') as f:
files.extend([line.strip() for line in f if line.strip()])
if not files:
# 如果没有指定文件,扫描目录
if args.extension:
files = [f for f in os.listdir(args.local_dir)
if f.endswith(args.extension) and os.path.isfile(os.path.join(args.local_dir, f))]
else:
files = [f for f in os.listdir(args.local_dir)
if os.path.isfile(os.path.join(args.local_dir, f))]
if not files:
print("错误: 没有找到要传输的文件")
sys.exit(1)
return files
def main():
args = parse_args()
files = get_file_list(args)
print(f"TFTP服务器: {args.host}:{args.port}")
print(f"操作模式: {'上传' if args.upload else '下载'}")
print(f"文件数量: {len(files)}")
print(f"并发线程: {args.threads}")
print(f"重试次数: {args.retries}")
print("-" * 50)
transfer = TftpBatchTransfer(args.host, args.port, args.threads, args.retries)
start_time = time.time()
if args.upload:
transfer.batch_upload(files, args.local_dir, args.remote_dir)
else:
transfer.batch_download(files, args.remote_dir, args.local_dir)
elapsed_time = time.time() - start_time
print("-" * 50)
print(f"完成! 成功: {transfer.success_count}, 失败: {transfer.fail_count}")
print(f"耗时: {elapsed_time:.2f}秒")
if __name__ == "__main__":
# 确保安装了所需库: pip install tftpy
try:
from tftpy import TftpClient
except ImportError:
print("请先安装tftpy库: pip install tftpy")
sys.exit(1)
main()
使用示例
批量上传
# 上传当前目录所有.bin文件 python tftp_batch.py 192.168.1.100 -u --extension .bin # 上传指定文件 python tftp_batch.py 192.168.1.100 -u -f file1.bin file2.cfg file3.img # 使用文件列表 python tftp_batch.py 192.168.1.100 -u --file-list files.txt # 指定并发线程数 python tftp_batch.py 192.168.1.100 -u --extension .bin -t 8
批量下载
# 下载指定文件 python tftp_batch.py 192.168.1.100 -d -f config.cfg startup.cfg # 下载到指定目录 python tftp_batch.py 192.168.1.100 -d -f config.cfg --local-dir ./downloads
文件列表格式 (files.txt)
router-config.cfg
firmware.bin
backup.img
image-v2.bin
使用前准备
-
安装依赖库:
pip install tftpy
-
Windows批处理脚本 (简单版本):
@echo off
REM 批量TFTP上传批处理脚本
set TFTP_SERVER=192.168.1.100
set LOCAL_DIR=C:\firmware
set EXTENSION=*.bin
echo 开始批量上传文件到TFTP服务器 %TFTP_SERVER%
echo 源目录: %LOCAL_DIR%
echo.
for %%f in ("%LOCAL_DIR%\%EXTENSION%") do (
echo 上传: %%~nxf
tftp %TFTP_SERVER% put "%%f" "%%~nxf"
if errorlevel 1 (
echo [失败] %%~nxf
) else (
echo [成功] %%~nxf
)
)
echo.
echo 批处理完成!
pause
注意事项
- TFTP服务器需要开启,且支持写操作(如果是上传)
- 文件权限:确保目标目录有写入权限
- 网络稳定性:TFTP基于UDP,大文件传输可能不稳定
- 文件大小限制:标准TFTP最大支持32MB传输
- 并行数建议:不要超过10个并发,以免网络拥塞
这个脚本应该能满足您的批量TFTP传输需求,需要我针对您的具体场景进行修改吗?