本文目录导读:

我来详细介绍使用Python实现姿态估计的几种方法,从简单到复杂:
使用MediaPipe实现姿态估计(最简单)
MediaPipe是Google开发的开源框架,使用起来非常简便:
import cv2
import mediapipe as mp
import numpy as np
class PoseEstimator:
def __init__(self):
# 初始化MediaPipe Pose
self.mp_pose = mp.solutions.pose
self.pose = self.mp_pose.Pose(
static_image_mode=False,
model_complexity=1, # 0, 1, 2 复杂度递增
smooth_landmarks=True,
enable_segmentation=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
self.mp_drawing = mp.solutions.drawing_utils
self.mp_drawing_styles = mp.solutions.drawing_styles
def estimate_pose(self, image):
"""检测图像中的姿态"""
# 转换颜色空间 (BGR to RGB)
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image_rgb.flags.writeable = False
# 进行姿态检测
results = self.pose.process(image_rgb)
# 恢复颜色空间
image_rgb.flags.writeable = True
image_bgr = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)
# 绘制关键点
if results.pose_landmarks:
self.mp_drawing.draw_landmarks(
image_bgr,
results.pose_landmarks,
self.mp_pose.POSE_CONNECTIONS,
landmark_drawing_spec=self.mp_drawing_styles.get_default_pose_landmarks_style()
)
return image_bgr, results
def get_keypoints(self, results, image_shape):
"""获取关键点坐标"""
h, w, _ = image_shape
keypoints = []
if results.pose_landmarks:
for landmark in results.pose_landmarks.landmark:
# 转换为像素坐标
x = int(landmark.x * w)
y = int(landmark.y * h)
z = landmark.z
visibility = landmark.visibility
keypoints.append([x, y, z, visibility])
return np.array(keypoints)
def process_video():
"""处理视频流"""
cap = cv2.VideoCapture(0) # 使用摄像头
estimator = PoseEstimator()
while cap.isOpened():
success, frame = cap.read()
if not success:
break
# 镜像翻转
frame = cv2.flip(frame, 1)
# 姿态估计
annotated_frame, results = estimator.estimate_pose(frame)
# 获取关键点
if results.pose_landmarks:
keypoints = estimator.get_keypoints(results, frame.shape)
# 显示部分关键点信息
if len(keypoints) > 0:
# 获取鼻子坐标 (landmark 0)
nose = keypoints[0]
cv2.putText(annotated_frame, f"Nose: ({nose[0]}, {nose[1]})",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# 显示结果
cv2.imshow('Pose Estimation', annotated_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
process_video()
使用OpenCV DNN模块(更精确)
import cv2
import numpy as np
import urllib.request
class OpenPoseEstimator:
def __init__(self, model_path="pose/coco"):
self.BODY_PARTS = {
"Nose": 0, "Neck": 1, "RShoulder": 2, "RElbow": 3, "RWrist": 4,
"LShoulder": 5, "LElbow": 6, "LWrist": 7, "RHip": 8, "RKnee": 9,
"RAnkle": 10, "LHip": 11, "LKnee": 12, "LAnkle": 13, "REye": 14,
"LEye": 15, "REar": 16, "LEar": 17, "Background": 18
}
self.POSE_PAIRS = [
["Neck", "RShoulder"], ["Neck", "LShoulder"], ["RShoulder", "RElbow"],
["RElbow", "RWrist"], ["LShoulder", "LElbow"], ["LElbow", "LWrist"],
["Neck", "RHip"], ["RHip", "RKnee"], ["RKnee", "RAnkle"],
["Neck", "LHip"], ["LHip", "LKnee"], ["LKnee", "LAnkle"],
["Neck", "Nose"], ["Nose", "REye"], ["REye", "REar"],
["Nose", "LEye"], ["LEye", "LEar"]
]
# 下载模型文件
self.download_model()
# 加载网络
self.net = cv2.dnn.readNetFromTensorflow(
"pose/coco/graph_opt.pb",
"pose/coco/graph.pbtxt"
)
def download_model(self):
"""下载预训练模型"""
import os
if not os.path.exists("pose/coco/graph_opt.pb"):
print("下载模型文件...")
os.makedirs("pose/coco", exist_ok=True)
url = "https://github.com/opencv/opencv_extra/raw/master/testdata/dnn/openpose_pose_coco.prototxt"
urllib.request.urlretrieve(url, "pose/coco/graph.pbtxt")
url = "https://github.com/opencv/opencv_extra/raw/master/testdata/dnn/openpose_pose_coco.caffemodel"
urllib.request.urlretrieve(url, "pose/coco/graph_opt.pb")
def estimate_pose(self, frame):
"""估计姿态"""
frameHeight, frameWidth = frame.shape[:2]
# 准备输入
inpBlob = cv2.dnn.blobFromImage(
frame, 1.0 / 255, (368, 368),
(0, 0, 0), swapRB=False, crop=False
)
self.net.setInput(inpBlob)
output = self.net.forward()
# 检测关键点
points = []
for i in range(len(self.BODY_PARTS) - 1):
# 获取置信度图
probMap = output[0, i, :, :]
minVal, prob, minLoc, point = cv2.minMaxLoc(probMap)
# 缩放坐标到原图尺寸
x = (frameWidth * point[0]) / output.shape[3]
y = (frameHeight * point[1]) / output.shape[2]
if prob > 0.1: # 置信度阈值
points.append((int(x), int(y)))
cv2.circle(frame, (int(x), int(y)), 8, (0, 255, 255), thickness=-1)
else:
points.append(None)
# 绘制骨架
for pair in self.POSE_PAIRS:
partFrom = pair[0]
partTo = pair[1]
idFrom = self.BODY_PARTS[partFrom]
idTo = self.BODY_PARTS[partTo]
if points[idFrom] and points[idTo]:
cv2.line(frame, points[idFrom], points[idTo], (0, 255, 0), 3)
cv2.circle(frame, points[idFrom], 3, (0, 0, 255), thickness=-1)
cv2.circle(frame, points[idTo], 3, (0, 0, 255), thickness=-1)
return frame, points
# 使用示例
def openpose_demo():
cap = cv2.VideoCapture(0)
estimator = OpenPoseEstimator()
while True:
ret, frame = cap.read()
if not ret:
break
frame, points = estimator.estimate_pose(frame)
cv2.imshow('OpenPose', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
高级应用:计算关节角度
import cv2
import mediapipe as mp
import numpy as np
import math
class AdvancedPoseAnalyzer:
def __init__(self):
self.mp_pose = mp.solutions.pose
self.pose = self.mp_pose.Pose(
static_image_mode=False,
model_complexity=2,
smooth_landmarks=True
)
def calculate_angle(self, a, b, c):
"""计算三个点之间的角度"""
a = np.array(a[:2]) # (x, y)
b = np.array(b[:2])
c = np.array(c[:2])
radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - \
np.arctan2(a[1] - b[1], a[0] - b[0])
angle = np.abs(radians * 180.0 / np.pi)
if angle > 180.0:
angle = 360 - angle
return angle
def analyze_pose(self, image):
"""分析姿态并计算各种角度"""
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = self.pose.process(image_rgb)
if not results.pose_landmarks:
return image, None
landmarks = results.pose_landmarks.landmark
h, w, _ = image.shape
# 获取关键点坐标
def get_landmark_point(idx):
landmark = landmarks[idx]
return (int(landmark.x * w), int(landmark.y * h), landmark.visibility)
points = {
'left_shoulder': get_landmark_point(mp.solutions.pose.PoseLandmark.LEFT_SHOULDER),
'right_shoulder': get_landmark_point(mp.solutions.pose.PoseLandmark.RIGHT_SHOULDER),
'left_elbow': get_landmark_point(mp.solutions.pose.PoseLandmark.LEFT_ELBOW),
'right_elbow': get_landmark_point(mp.solutions.pose.PoseLandmark.RIGHT_ELBOW),
'left_wrist': get_landmark_point(mp.solutions.pose.PoseLandmark.LEFT_WRIST),
'right_wrist': get_landmark_point(mp.solutions.pose.PoseLandmark.RIGHT_WRIST),
'left_hip': get_landmark_point(mp.solutions.pose.PoseLandmark.LEFT_HIP),
'right_hip': get_landmark_point(mp.solutions.pose.PoseLandmark.RIGHT_HIP),
'left_knee': get_landmark_point(mp.solutions.pose.PoseLandmark.LEFT_KNEE),
'right_knee': get_landmark_point(mp.solutions.pose.PoseLandmark.RIGHT_KNEE),
'left_ankle': get_landmark_point(mp.solutions.pose.PoseLandmark.LEFT_ANKLE),
'right_ankle': get_landmark_point(mp.solutions.pose.PoseLandmark.RIGHT_ANKLE)
}
# 计算关节角度
angles = {}
# 左肘角度
if all(p[2] > 0.5 for p in [points['left_shoulder'], points['left_elbow'], points['left_wrist']]):
angles['left_elbow'] = self.calculate_angle(
points['left_shoulder'], points['left_elbow'], points['left_wrist']
)
# 右肘角度
if all(p[2] > 0.5 for p in [points['right_shoulder'], points['right_elbow'], points['right_wrist']]):
angles['right_elbow'] = self.calculate_angle(
points['right_shoulder'], points['right_elbow'], points['right_wrist']
)
# 左膝角度
if all(p[2] > 0.5 for p in [points['left_hip'], points['left_knee'], points['left_ankle']]):
angles['left_knee'] = self.calculate_angle(
points['left_hip'], points['left_knee'], points['left_ankle']
)
# 右膝角度
if all(p[2] > 0.5 for p in [points['right_hip'], points['right_knee'], points['right_ankle']]):
angles['right_knee'] = self.calculate_angle(
points['right_hip'], points['right_knee'], points['right_ankle']
)
# 显示角度信息
y_offset = 30
for joint, angle in angles.items():
cv2.putText(image, f"{joint}: {angle:.1f}°",
(10, y_offset), cv2.FONT_HERSHEY_SIMPLEX,
0.6, (255, 255, 0), 2)
y_offset += 25
# 绘制关键点和骨架
mp.solutions.drawing_utils.draw_landmarks(
image, results.pose_landmarks,
mp.solutions.pose.POSE_CONNECTIONS
)
return image, angles
# 实时姿态分析
def realtime_pose_analysis():
cap = cv2.VideoCapture(0)
analyzer = AdvancedPoseAnalyzer()
print("按 'q' 退出程序")
while cap.isOpened():
success, frame = cap.read()
if not success:
break
frame = cv2.flip(frame, 1)
# 分析姿态
annotated_frame, angles = analyzer.analyze_pose(frame)
# 显示信息
if angles:
status = "姿态检测正常" if all(a < 160 for a in angles.values()) else "异常姿态"
cv2.putText(annotated_frame, f"状态: {status}",
(10, annotated_frame.shape[0] - 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.imshow('Pose Analysis', annotated_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
realtime_pose_analysis()
人体姿势分类器
import cv2
import mediapipe as mp
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn import svm
import pickle
class PoseClassifier:
def __init__(self):
self.mp_pose = mp.solutions.pose
self.pose = self.mp_pose.Pose()
self.model = None
self.scaler = StandardScaler()
def extract_features(self, landmarks):
"""从关键点提取特征"""
features = []
# 计算相对坐标和角度
# 使用肩部作为参考点
left_shoulder = landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value]
right_shoulder = landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
# 肩部中心
center_x = (left_shoulder.x + right_shoulder.x) / 2
center_y = (left_shoulder.y + right_shoulder.y) / 2
# 提取所有关键点的相对坐标
for landmark in landmarks:
features.extend([
landmark.x - center_x,
landmark.y - center_y,
landmark.z
])
# 计算关键身体部位之间的距离
# 手腕到肩膀的距离
left_wrist = landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value]
right_wrist = landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value]
features.append(self._distance(left_wrist, left_shoulder))
features.append(self._distance(right_wrist, right_shoulder))
features.append(self._distance(left_wrist, right_wrist))
return np.array(features)
def _distance(self, point1, point2):
"""计算两点间欧氏距离"""
return np.sqrt(
(point1.x - point2.x)**2 +
(point1.y - point2.y)**2 +
(point1.z - point2.z)**2
)
def train_model(self, X_train, y_train):
"""训练分类器"""
# 特征标准化
X_train_scaled = self.scaler.fit_transform(X_train)
# 使用SVM
self.model = svm.SVC(kernel='rbf', probability=True)
self.model.fit(X_train_scaled, y_train)
return self
def predict_pose(self, landmarks):
"""预测姿态类型"""
if self.model is None:
raise ValueError("模型尚未训练")
features = self.extract_features(landmarks)
features_scaled = self.scaler.transform([features])
prediction = self.model.predict(features_scaled)
probability = self.model.predict_proba(features_scaled)
return prediction[0], probability[0]
# 使用示例:数据收集和训练
def collect_pose_data():
"""收集姿态数据用于训练"""
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()
cap = cv2.VideoCapture(0)
# 定义要收集的姿态类型
poses = {
'0': '站立',
'1': '坐下',
'2': '举手',
'3': '弯腰'
}
print("数据收集模式:")
for key, value in poses.items():
print(f"按 '{key}' 收集 {value} 姿态数据")
print("按 's' 保存数据")
print("按 'q' 退出")
collected_data = []
collected_labels = []
current_pose = None
while cap.isOpened():
success, frame = cap.read()
if not success:
break
frame = cv2.flip(frame, 1)
image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = pose.process(image_rgb)
if results.pose_landmarks:
mp.solutions.drawing_utils.draw_landmarks(
frame, results.pose_landmarks,
mp_pose.POSE_CONNECTIONS
)
key = cv2.waitKey(1) & 0xFF
if chr(key) in poses:
current_pose = poses[chr(key)]
print(f"收集 {current_pose} 姿态...")
if results.pose_landmarks:
classifier = PoseClassifier()
features = classifier.extract_features(results.pose_landmarks.landmark)
collected_data.append(features)
collected_labels.append(current_pose)
elif key == ord('s'):
# 保存数据到文件
data = {
'features': np.array(collected_data),
'labels': collected_labels
}
with open('pose_data.pkl', 'wb') as f:
pickle.dump(data, f)
print(f"已保存 {len(collected_data)} 个样本")
elif key == ord('q'):
break
# 显示当前状态
cv2.putText(frame, f"当前: {current_pose or '未选择'}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.putText(frame, f"样本数: {len(collected_data)}",
(10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.imshow('Data Collection', frame)
cap.release()
cv2.destroyAllWindows()
# 姿态估计与分类主程序
def main():
print("选择模式:")
print("1. 基础姿态检测")
print("2. 角度分析")
print("3. 数据收集")
print("4. 退出")
choice = input("请输入选择 (1-4): ")
if choice == '1':
process_video()
elif choice == '2':
realtime_pose_analysis()
elif choice == '3':
collect_pose_data()
else:
return
if __name__ == "__main__":
main()
安装依赖
# 基础依赖 pip install opencv-python pip install mediapipe pip install numpy # 高级功能所需 pip install scikit-learn pip install matplotlib # 用于可视化
性能优化建议
# 1. 使用更轻量级的模型
pose = mp.solutions.pose.Pose(
model_complexity=0, # 使用最轻量级模型
enable_segmentation=False # 不需要分割
)
# 2. 降低输入分辨率
inpBlob = cv2.dnn.blobFromImage(
frame, 1.0 / 255, (256, 256), # 降低输入尺寸
(0, 0, 0), swapRB=False, crop=False
)
# 3. 跳帧处理
frame_count = 0
process_every_n_frames = 2 # 每2帧处理一次
# 4. 使用GPU加速
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
这些代码实现了:
- ✅ 基础姿态检测
- ✅ 关节角度计算
- ✅ 实时姿态分析
- ✅ 数据收集和模型训练
- ✅ 多种实现方式对比
建议先使用MediaPipe(最简单),再根据需要切换到更精确的方案。