本文目录导读:

下面是一个完整的 Python 批量加水印案例,包含详细的代码、说明和可配置项。
使用 PIL/Pillow 库(图片水印)
安装依赖
pip install Pillow
完整代码
import os
from PIL import Image, ImageDraw, ImageFont
from pathlib import Path
class ImageWatermark:
def __init__(self, input_dir="input", output_dir="output",
watermark_text="Watermark", font_size=50,
opacity=128, position="center"):
"""
图片水印处理类
Args:
input_dir: 输入文件夹
output_dir: 输出文件夹
watermark_text: 水印文字
font_size: 字体大小
opacity: 透明度 (0-255)
position: 位置 (center, top-left, top-right, bottom-left, bottom-right, tile)
"""
self.input_dir = input_dir
self.output_dir = output_dir
self.watermark_text = watermark_text
self.font_size = font_size
self.opacity = opacity
self.position = position
# 确保输出目录存在
Path(output_dir).mkdir(parents=True, exist_ok=True)
def get_font(self):
"""获取字体,尝试系统字体"""
try:
# Windows 系统字体
font_path = "C:/Windows/Fonts/msyh.ttc" # 微软雅黑
return ImageFont.truetype(font_path, self.font_size)
except:
try:
# Linux/Mac 系统字体
return ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", self.font_size)
except:
# 默认字体
return ImageFont.load_default()
def get_watermark_position(self, img, text_size):
"""计算水印位置"""
width, height = img.size
text_width, text_height = text_size
positions = {
"center": ((width - text_width) // 2, (height - text_height) // 2),
"top-left": (20, 20),
"top-right": (width - text_width - 20, 20),
"bottom-left": (20, height - text_height - 20),
"bottom-right": (width - text_width - 20, height - text_height - 20),
"tile": None # 平铺模式需要特殊处理
}
return positions.get(self.position, positions["center"])
def add_watermark_single(self, img_path):
"""为单张图片添加水印"""
try:
# 打开图片并转换为RGBA模式以支持透明度
img = Image.open(img_path).convert("RGBA")
# 创建水印层
watermark_layer = Image.new("RGBA", img.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(watermark_layer)
# 获取字体
font = self.get_font()
# 计算文字大小
text_bbox = draw.textbbox((0, 0), self.watermark_text, font=font)
text_width = text_bbox[2] - text_bbox[0]
text_height = text_bbox[3] - text_bbox[1]
text_size = (text_width, text_height)
if self.position == "tile":
# 平铺水印
x, y = 0, 0
while y < img.height:
while x < img.width:
draw.text((x, y), self.watermark_text,
font=font, fill=(255, 255, 255, self.opacity))
x += text_width + 50 # 间距
x = 0
y += text_height + 50
else:
# 单次水印
position = self.get_watermark_position(img, text_size)
draw.text(position, self.watermark_text,
font=font, fill=(255, 255, 255, self.opacity))
# 合并图片和水印层
watermarked = Image.alpha_composite(img, watermark_layer)
# 转换回RGB模式保存
output_path = os.path.join(self.output_dir, os.path.basename(img_path))
watermarked.convert("RGB").save(output_path, "JPEG", quality=95)
print(f"✅ 已处理: {os.path.basename(img_path)}")
return True
except Exception as e:
print(f"❌ 处理失败 {img_path}: {e}")
return False
def process_batch(self):
"""批量处理所有图片"""
# 支持的图片格式
image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff'}
# 获取输入目录中所有图片
images = []
for ext in image_extensions:
images.extend(Path(self.input_dir).glob(f"*{ext}"))
images.extend(Path(self.input_dir).glob(f"*{ext.upper()}"))
if not images:
print(f"⚠️ 在 {self.input_dir} 目录未找到图片文件")
return
print(f"📷 找到 {len(images)} 张图片")
print("🚀 开始批量处理...\n")
success_count = 0
for img_path in images:
if self.add_watermark_single(str(img_path)):
success_count += 1
print(f"\n✅ 处理完成! 成功: {success_count}/{len(images)}")
# 使用示例
if __name__ == "__main__":
# 创建输入输出目录
Path("input_images").mkdir(exist_ok=True)
Path("output_images").mkdir(exist_ok=True)
# 配置水印参数
watermarker = ImageWatermark(
input_dir="input_images",
output_dir="output_images",
watermark_text="© 2024 Your Company", # 水印文字
font_size=40, # 字体大小
opacity=128, # 透明度 (0=透明, 255=不透明)
position="bottom-right" # 位置
)
# 批量处理
watermarker.process_batch()
使用 OpenCV(更高级的水印)
安装依赖
pip install opencv-python numpy
图片水印(Logo方式)
import cv2
import numpy as np
from pathlib import Path
def add_logo_watermark(input_dir, output_dir, logo_path, alpha=0.3, position="bottom-right"):
"""
使用图片作为水印
Args:
input_dir: 输入目录
output_dir: 输出目录
logo_path: 水印logo路径
alpha: 透明度
position: 位置
"""
# 读取logo并确保有透明通道
logo = cv2.imread(logo_path, cv2.IMREAD_UNCHANGED)
if logo is None:
print("❌ 无法读取logo图片")
return
# 如果logo没有alpha通道,添加
if logo.shape[2] == 3:
logo = cv2.cvtColor(logo, cv2.COLOR_BGR2BGRA)
Path(output_dir).mkdir(parents=True, exist_ok=True)
# 支持的图片格式
image_extensions = {'.jpg', '.jpeg', '.png', '.bmp'}
images = []
for ext in image_extensions:
images.extend(Path(input_dir).glob(f"*{ext}"))
images.extend(Path(input_dir).glob(f"*{ext.upper()}"))
for img_path in images:
# 读取图片
img = cv2.imread(str(img_path), cv2.IMREAD_UNCHANGED)
if img.shape[2] == 3:
img = cv2.cvtColor(img, cv2.COLOR_BGR2BGRA)
# 调整logo大小(按比例)
logo_height = int(img.shape[0] * 0.1) # 图片高度的10%
aspect_ratio = logo.shape[1] / logo.shape[0]
logo_width = int(logo_height * aspect_ratio)
logo_resized = cv2.resize(logo, (logo_width, logo_height))
# 计算位置
positions = {
"top-left": (20, 20),
"top-right": (img.shape[1] - logo_width - 20, 20),
"bottom-left": (20, img.shape[0] - logo_height - 20),
"bottom-right": (img.shape[1] - logo_width - 20, img.shape[0] - logo_height - 20)
}
pos = positions.get(position, positions["bottom-right"])
# ROI区域
roi = img[pos[1]:pos[1]+logo_height, pos[0]:pos[0]+logo_width]
# 创建mask
logo_rgba = logo_resized[:, :, :]
logo_rgb = logo_rgba[:, :, :3]
mask = logo_rgba[:, :, 3] / 255.0
# 叠加水印
for c in range(3):
roi[:, :, c] = (1 - alpha * mask) * roi[:, :, c] + alpha * mask * logo_rgb[:, :, c]
# 保存结果
output_path = os.path.join(output_dir, os.path.basename(str(img_path)))
cv2.imwrite(output_path, img)
print(f"✅ 已处理: {os.path.basename(str(img_path))}")
# 使用示例
# add_logo_watermark("input_images", "output_images", "logo.png")
批量处理视频水印
import cv2
from pathlib import Path
def add_video_watermark(input_video, output_video, watermark_text, position="bottom-right"):
"""
为视频添加文字水印
Args:
input_video: 输入视频路径
output_video: 输出视频路径
watermark_text: 水印文字
position: 水印位置
"""
cap = cv2.VideoCapture(input_video)
# 获取视频属性
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video, fourcc, fps, (width, height))
# 字体设置
font = cv2.FONT_HERSHEY_SIMPLEX
font_scale = 1
font_thickness = 2
text_size = cv2.getTextSize(watermark_text, font, font_scale, font_thickness)[0]
# 计算位置
positions = {
"top-left": (20, 30),
"top-right": (width - text_size[0] - 20, 30),
"bottom-left": (20, height - 20),
"bottom-right": (width - text_size[0] - 20, height - 20)
}
pos = positions.get(position, positions["bottom-right"])
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# 添加水印
cv2.putText(frame, watermark_text, pos, font, font_scale,
(255, 255, 255), font_thickness, cv2.LINE_AA)
out.write(frame)
frame_count += 1
if frame_count % 30 == 0:
print(f"⏳ 进度: {frame_count}/{total_frames}")
cap.release()
out.release()
print(f"✅ 视频处理完成: {output_video}")
完整的命令行工具
创建一个命令行可执行的批量水印工具 batch_watermarker.py:
import argparse
import sys
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
def main():
parser = argparse.ArgumentParser(description="批量图片水印工具")
parser.add_argument("input", help="输入文件夹路径")
parser.add_argument("-o", "--output", default="output", help="输出文件夹路径")
parser.add_argument("-t", "--text", default="Watermark", help="水印文字")
parser.add_argument("-s", "--size", type=int, default=40, help="字体大小")
parser.add_argument("-a", "--alpha", type=int, default=128, help="透明度 (0-255)")
parser.add_argument("-p", "--position", default="center",
choices=["center", "top-left", "top-right", "bottom-left", "bottom-right", "tile"],
help="水印位置")
parser.add_argument("-f", "--font", help="自定义字体路径")
args = parser.parse_args()
# 检查输入目录
if not Path(args.input).exists():
print(f"❌ 输入目录不存在: {args.input}")
sys.exit(1)
# 创建水印处理器
watermarker = ImageWatermark(
input_dir=args.input,
output_dir=args.output,
watermark_text=args.text,
font_size=args.size,
opacity=args.alpha,
position=args.position
)
# 如果提供了自定义字体
if args.font and Path(args.font).exists():
from PIL import ImageFont
watermarker.font_path = args.font
# 开始处理
watermarker.process_batch()
if __name__ == "__main__":
main()
使用方法
基本使用
# 处理 input 目录中的所有图片 python batch_watermarker.py input_images # 自定义水印文字和位置 python batch_watermarker.py input_images -t "© 2024" -p "bottom-right" -s 50 # 设置透明度和字体大小 python batch_watermarker.py input_images -t "Confidential" -a 100 -s 60
目录结构
project/
├── input_images/ # 放入需要加水印的图片
│ ├── photo1.jpg
│ ├── photo2.jpg
│ └── photo3.png
├── output_images/ # 处理后的图片将保存到这里
├── batch_watermarker.py # 主程序
└── logo.png # (可选)Logo水印图片
特点
- ✅ 批量处理:自动处理整个文件夹的图片
- ✅ 多种位置:支持6种水印位置
- ✅ 透明度控制:可调整水印透明度
- ✅ 多格式支持:支持JPG、PNG、BMP等格式
- ✅ 错误处理:单个文件失败不影响其他文件
- ✅ 进度显示:实时显示处理进度
需要根据你的具体需求调整参数即可。