Python案例:如何高效下载网络文件?从入门到实战详解
目录导读
- 引言:网络文件下载的常见场景与痛点
- Python下载网络文件的基础原理
- 使用urllib.request下载文件(标准库)
- 使用requests库下载文件(第三方库)
- 分块下载大文件,避免内存爆炸
- 断点续传与并发下载(进阶)
- 案例实战:批量下载图片/文档/压缩包
- 常见问题与Q&A
- 总结与最佳实践建议

网络文件下载的常见场景与痛点
在日常开发或数据采集工作中,我们经常需要从网络上获取文件——比如下载PDF报告、视频素材、软件安装包、JSON数据、图片集等,手动点击下载按钮固然简单,但当文件数量多(如1000张图片)或需要定时下载时,编写Python脚本自动化下载就成为刚需。
许多初学者会遇到以下痛点:
- 下载大文件时程序卡死或内存溢出
- 下载速度慢,不会使用多线程加速
- 下载中断后要重新从头开始(缺少断点续传)
- 遇到反爬虫机制(如User-Agent检查、cookies验证)
本文将通过多个实战案例,手把手教你用Python实现稳健、高效的网络文件下载。
Python下载网络文件的基础原理
无论是使用哪种库,网络文件下载的本质都是:
- 发送HTTP请求:客户端(你的代码)向服务器发出GET请求,指定要下载的文件URL。
- 接收响应流:服务器返回文件数据(如二进制流),Python代码逐块读取数据。
- 写入本地文件:将内存中的数据写入磁盘,保存为本地文件。
关键点:分块写入(chunked download)是处理大文件的核心,避免一次性将整个文件加载到内存中。
方法一:使用urllib.request下载文件(标准库)
Python内置的urllib.request模块无需安装,适合简单的小文件下载。
基础代码示例:
import urllib.request
def download_file(url, filename):
# 添加User-Agent模拟浏览器,避免被拒
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
req = urllib.request.Request(url, headers=headers)
# 获取文件对象并读取
with urllib.request.urlopen(req) as response, open(filename, 'wb') as out_file:
data = response.read() # 小文件直接读取
out_file.write(data)
# 使用示例
download_file('https://example.com/sample.pdf', 'sample.pdf')
print("文件下载完成!")
优点:零依赖,原生支持。
缺点:缺乏进度反馈,大文件直接read()会占满内存,不支持断点续传。
方法二:使用requests库下载文件(第三方库)
requests是Python最流行的HTTP库,语法简洁且功能全面,首先安装:
pip install requests
带进度显示的文件下载(推荐):
import requests
def download_with_progress(url, filename):
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, headers=headers, stream=True) # stream=True很重要
response.raise_for_status() # 检查请求是否成功
total_size = int(response.headers.get('content-length', 0)) # 文件总大小
block_size = 1024 # 每次读取1KB
downloaded = 0
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk: # 过滤保持连接的空白块
f.write(chunk)
downloaded += len(chunk)
# 打印进度百分比
if total_size > 0:
percent = downloaded * 100 / total_size
print(f"\r下载进度: {percent:.1f}% ({downloaded}/{total_size} bytes)", end='')
print("\n下载完成!")
download_with_progress('https://example.com/large_file.zip', 'large_file.zip')
关键点:
stream=True:让响应以流式传输,不立刻下载所有数据。response.iter_content():逐块迭代数据,对内存友好。
方法三:分块下载大文件,避免内存爆炸
当需要下载几百MB甚至GB级别的文件时,必须确保程序不会一次性把整个文件读入内存,上面的stream模式已经实现分块,但我们可以进一步优化——使用固定大小分块并写入文件。
强化版分块下载:
import requests
def download_large_file(url, filename, chunk_size=8192): # 8KB块大小
response = requests.get(url, stream=True)
response.raise_for_status()
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
f.flush() # 强制写入磁盘,降低内存占用
print("大文件下载完成!")
内存优化原理:每次只保留chunk_size字节在内存中,写入后释放。chunk_size建议范围:1KB~1MB,过小会降低速度,过大则增加内存。
方法四:断点续传与并发下载(进阶)
1 断点续传
如果下载中断(如网络断开),避免重新下载整个文件,可以利用HTTP的Range头。
实现原理:
- 先发一个
HEAD请求获取文件大小。 - 检查本地已下载文件的大小。
- 如果本地文件小于远程文件,设置
Range: bytes=已下载大小-。 - 以“追加模式”写入文件。
代码实现:
import requests
import os
def resume_download(url, filename):
headers = {'User-Agent': 'Mozilla/5.0'}
# 获取远程文件大小
head_resp = requests.head(url, headers=headers)
total_size = int(head_resp.headers.get('content-length', 0))
# 检查本地文件
if os.path.exists(filename):
downloaded_size = os.path.getsize(filename)
if downloaded_size >= total_size:
print("文件已完整下载,无需继续")
return
else:
downloaded_size = 0
# 设置断点续传的Range头
headers['Range'] = f'bytes={downloaded_size}-'
response = requests.get(url, headers=headers, stream=True)
mode = 'ab' if downloaded_size > 0 else 'wb' # 追加模式或新建
with open(filename, mode) as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"断点续传完成,文件保存为 {filename}")
2 多线程并发下载
对于大文件,多线程能将一个文件分成多个部分同时下载,显著提速,注意:部分服务器不支持多范围请求。
简化版多线程示例:
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
def download_segment(url, start, end, segment_number, filename):
headers = {'Range': f'bytes={start}-{end}'}
resp = requests.get(url, headers=headers, stream=True)
# 假设分段写入不同临时文件,最后合并
with open(f'{filename}.part{segment_number}', 'wb') as f:
for chunk in resp.iter_content(chunk_size=4096):
if chunk:
f.write(chunk)
return segment_number
def parallel_download(url, filename, num_threads=4):
head = requests.head(url)
total = int(head.headers.get('content-length', 0))
part_size = total // num_threads
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = []
for i in range(num_threads):
start = i * part_size
end = start + part_size - 1 if i < num_threads-1 else total-1
futures.append(executor.submit(download_segment, url, start, end, i, filename))
for future in as_completed(futures):
future.result() # 检查异常
# 合并所有分段文件(此处省略合并代码)
print(f"多线程下载完成,共{num_threads}个分段")
注意:生产环境中建议使用成熟库如aria2或wget的Python包装,或者aiohttp实现异步下载。
案例实战:批量下载图片/文档/压缩包
假设你有100张图片的URL列表(放在一个文本文件中),我们要把它们全部下载到本地。
完整脚本:
import requests
import os
from concurrent.futures import ThreadPoolExecutor
import time
def download_single_image(url, save_dir, filename):
"""下载单张图片并保存"""
try:
headers = {'User-Agent': 'Mozilla/5.0'}
resp = requests.get(url, headers=headers, timeout=10)
resp.raise_for_status()
filepath = os.path.join(save_dir, filename)
with open(filepath, 'wb') as f:
f.write(resp.content)
print(f"✓ 已下载: {filename}")
return True
except Exception as e:
print(f"✗ 下载失败 {filename}: {e}")
return False
def batch_download(url_file, save_dir, max_workers=5):
"""从URL列表文件批量下载"""
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# 读取URL列表
with open(url_file, 'r', encoding='utf-8') as f:
urls = [line.strip() for line in f if line.strip()]
print(f"共发现 {len(urls)} 个文件需要下载")
# 使用线程池并发下载
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for i, url in enumerate(urls, 1):
ext = url.split('.')[-1].split('?')[0] or 'jpg' # 简单获取扩展名
filename = f"image_{i:04d}.{ext}"
futures.append(executor.submit(download_single_image, url, save_dir, filename))
for future in futures:
future.result()
print(f"批量下载完成,文件保存在: {save_dir}")
# 使用示例
batch_download('image_urls.txt', 'downloaded_images', max_workers=10)
扩展建议:
- 可加入重试机制(最多3次)
- 支持从CSV、JSON读取URL
- 记录下载日志到文件
常见问题与Q&A
Q1:下载文件时,服务器返回403 Forbidden怎么办?
A:通常是因为缺少User-Agent或cookies,解决方案:
- 添加合理的
User-Agent(如上例所示)。 - 若有登录需求,先使用
requests.Session()获取cookies,并携带cookies下载。 - 某些网站需要
Referer头,可模拟从页面点击链接。
Q2:如何知道下载是否成功?
A:除了检查HTTP状态码(200或206),还可以:
- 比较本地文件大小与
content-length头是否一致。 - 计算文件的MD5/SHA256哈希值,与服务器提供的哈希对比(如果有)。
Q3:下载速度很慢,如何优化?
A:尝试以下方法:
- 使用多线程/多进程并发下载不同文件(如案例七)。
- 对单个大文件使用多线程分段下载(需要服务器支持
Range)。 - 检查网络环境,避免在高峰时段下载。
Q4:如何下载需要登录后才能访问的文件?
A:使用requests.Session():
session = requests.Session()
login_data = {'username': 'your_name', 'password': 'your_pass'}
session.post('https://example.com/login', data=login_data)
# 然后使用session.get()下载文件
response = session.get('https://example.com/protected_file.zip')
Q5:下载中途中断,是否有办法自动重试?
A:可以使用tenacity库或自己写循环:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
def safe_download(url, filename):
# 你的下载代码
pass
总结与最佳实践建议
通过本文的案例,你已掌握了从最基础的urllib到进阶的断点续传、多线程下载的完整技能,在实际项目中,建议遵循以下原则:
- 优先使用
requests库:功能丰富、社区活跃、异常处理完善。 - 大文件务必使用
stream=True和分块写入:保护内存。 - 添加User-Agent和重试机制:提升下载稳定性。
- 善用并发:批量下载时,线程数建议为5~20,过多可能导致IP被封。
- 记录日志:对于生产环境,将下载结果写入日志文件,便于排查错误。
最后提醒:下载文件时请遵守网站的robots.txt规则和版权规定,避免对服务器造成过大压力或侵犯他人权益。
掌握这些技巧,你就能轻松应对绝大多数的网络文件下载需求,去编写你的第一个自动化下载脚本吧!