(原创作者@CSDN_伊利丹~怒风)
环境准备
手机
测试手机型号:Redmi K60 Pro
处理器:第二代骁龙8移动--8gen2
运行内存:8.0GB ,LPDDR5X-8400,67.0 GB/s
摄像头:前置16MP+后置50MP+8MP+2MP
AI算力:NPU 48Tops INT8 && GPU 1536ALU x 2 x 680MHz = 2.089 TFLOPS
提示:任意手机均可以,性能越好的手机速度越快
软件
APP:AidLux 2.0
系统环境:Ubuntu 20.04.3 LTS
提示:AidLux登录后代码运行更流畅,在代码运行时保持AidLux APP在前台运行,避免代码运行过程中被系统回收进程,另外屏幕保持常亮,一般息屏后一段时间,手机系统会进入休眠状态,如需长驻后台需要给APP权限。
算法Demo
Demo代码介绍
这段代码是一个姿态检测模型的实时姿态估计应用,它使用了两个模型级联工作:一个用于检测人体,另一个用于在检测到的人体上识别关键点。下面是添加了详细中文注释的代码:
代码功能特点介绍
双模型级联处理:使用两个模型协同工作,第一个模型负责检测人体,第二个模型负责在检测到的人体上识别详细的关键点。
自适应摄像头选择:代码会自动检测并优先使用 USB 摄像头,如果没有 USB 摄像头,则会尝试使用设备内置摄像头。
图像处理优化:
- 图像预处理包括调整大小、填充和归一化保持原始图像的宽高比,避免变形支持图像水平翻转,使显示更符合用户习惯
高性能推理:
- 使用 aidlite 框架进行模型推理姿态检测模型使用 CPU 加速关键点识别模型使用 GPU 加速多线程支持,提高处理效率
精确的姿态关键点识别:
- 检测人体 22 个关键点(上半身模型)支持关键点连接,形成完整的姿态骨架提供置信度阈值过滤,确保检测准确性
灵活的 ROI 提取:
- 基于检测结果动态提取感兴趣区域支持旋转不变性,即使人体倾斜也能准确提取自动调整 ROI 大小,适应不同距离的人体
直观的可视化:
- 清晰显示检测到的人体边界框绘制关键点和连接线,形成直观的姿态骨架支持自定义颜色和大小,便于区分不同姿态
鲁棒的错误处理:
- 摄像头打开失败自动重试模型加载和推理错误检测异常情况优雅处理,确保程序稳定运行
这个应用可以用于多种场景,如健身指导、动作分析、人机交互等,通过识别和跟踪人体关键点,可以实时分析人体姿态并提供反馈。
Demo中的算法模型分析
这段代码使用了两个模型用AidLite 框架进行人体姿态检测和关键点识别,它们分别是:
- 姿态检测模型 (pose_detection.tflite)
- 作用:从输入图像中检测人体的大致位置和姿态。输入:128×128 像素的 RGB 图像。输出:包含边界框和关键点的预测结果(896 个候选框,每个框有 12 个坐标值)。特点:
- 轻量级设计,适合实时处理。使用锚框机制提高检测精度。输出人体的粗略位置和关键点(如眼睛、耳朵、肩膀等)。采用 CPU 加速,平衡性能与精度。
- 上半身姿态关键点模型 (pose_landmark_upper_body.tflite)
- 作用:在检测到的人体区域内,精确识别上半身的 22 个关键点。输入:256×256 像素的 RGB 图像(ROI 区域)。输出:31 个关键点的坐标(每个点包含 x、y、z 坐标和可见性)。特点:
- 高精度识别肩部、肘部、手腕等关节位置。使用 GPU 加速,提升复杂模型的推理速度。支持多角度和遮挡场景下的姿态估计。输出每个关键点的置信度,用于过滤不可靠的检测结果。
模型协同工作流程
- 姿态检测:先使用第一个模型快速定位人体位置。ROI 提取:基于检测结果裁剪并旋转感兴趣区域(ROI)。关键点识别:将 ROI 输入第二个模型,获取精细的上半身关键点。坐标映射:将归一化的关键点坐标映射回原始图像空间。
这种级联模型的设计兼顾了效率和精度,适合实时视频流处理。
Demo代码
import mathimport numpy as npfrom scipy.special import expitimport time from time import sleepimport aidliteimport osimport subprocessimport aidcv as cv2 # 摄像头设备路径root_dir = "/sys/class/video4linux/" def resize_pad(img): """ 调整图像大小并填充,使其适合检测器输入 人脸和手掌检测器网络分别需要256x256和128x128的输入图像。 此函数会保持原始图像的宽高比进行缩放,并在需要时添加填充。 返回值: img1: 256x256大小的图像 img2: 128x128大小的图像 scale: 原始图像与256x256图像之间的缩放因子 pad: 原始图像中添加的填充像素 """ size0 = img.shape if size0[0]>=size0[1]: h1 = 256 w1 = 256 * size0[1] // size0[0] padh = 0 padw = 256 - w1 scale = size0[1] / w1 else: h1 = 256 * size0[0] // size0[1] w1 = 256 padh = 256 - h1 padw = 0 scale = size0[0] / h1 padh1 = padh//2 padh2 = padh//2 + padh%2 padw1 = padw//2 padw2 = padw//2 + padw%2 img1 = cv2.resize(img, (w1,h1)) img1 = np.pad(img1, ((padh1, padh2), (padw1, padw2), (0,0)), 'constant', constant_values=(0,0)) pad = (int(padh1 * scale), int(padw1 * scale)) img2 = cv2.resize(img1, (128,128)) return img1, img2, scale, pad def denormalize_detections(detections, scale, pad): """ 将归一化的检测坐标映射回原始图像坐标 人脸和手掌检测器网络需要256x256和128x128的输入图像, 因此输入图像会被填充和缩放。此函数将归一化坐标映射回原始图像坐标。 输入: detections: nxm张量。n是检测到的对象数量。 m是4+2*k,其中前4个值是边界框坐标,k是检测器输出的额外关键点数量。 scale: 用于调整图像大小的缩放因子 pad: x和y维度上的填充量 """ detections[:, 0] = detections[:, 0] * scale * 256 - pad[0] detections[:, 1] = detections[:, 1] * scale * 256 - pad[1] detections[:, 2] = detections[:, 2] * scale * 256 - pad[0] detections[:, 3] = detections[:, 3] * scale * 256 - pad[1] detections[:, 4::2] = detections[:, 4::2] * scale * 256 - pad[1] detections[:, 5::2] = detections[:, 5::2] * scale * 256 - pad[0] return detections def _decode_boxes(raw_boxes, anchors): """ 将预测结果转换为实际坐标 使用锚框将模型预测转换为实际边界框坐标,一次性处理整个批次。 """ boxes = np.zeros_like(raw_boxes) x_center = raw_boxes[..., 0] / 128.0 * anchors[:, 2] + anchors[:, 0] y_center = raw_boxes[..., 1] / 128.0 * anchors[:, 3] + anchors[:, 1] w = raw_boxes[..., 2] / 128.0 * anchors[:, 2] h = raw_boxes[..., 3] / 128.0 * anchors[:, 3] boxes[..., 0] = y_center - h / 2. # ymin boxes[..., 1] = x_center - w / 2. # xmin boxes[..., 2] = y_center + h / 2. # ymax boxes[..., 3] = x_center + w / 2. # xmax for k in range(4): offset = 4 + k*2 keypoint_x = raw_boxes[..., offset ] / 128.0 * anchors[:, 2] + anchors[:, 0] keypoint_y = raw_boxes[..., offset + 1] / 128.0 * anchors[:, 3] + anchors[:, 1] boxes[..., offset ] = keypoint_x boxes[..., offset + 1] = keypoint_y return boxes def _tensors_to_detections(raw_box_tensor, raw_score_tensor, anchors): """ 将神经网络输出转换为检测结果 神经网络输出是一个形状为(b, 896, 16)的张量,包含边界框回归预测, 以及一个形状为(b, 896, 1)的张量,包含分类置信度。 此函数将这两个"原始"张量转换为适当的检测结果。 返回一个(num_detections, 17)的张量列表,每个张量对应批次中的一张图像。 """ detection_boxes = _decode_boxes(raw_box_tensor, anchors) thresh = 100.0 raw_score_tensor = np.clip(raw_score_tensor, -thresh, thresh) detection_scores = expit(raw_score_tensor) # 注意:我们从分数张量中去掉了最后一个维度,因为只有一个类别。 # 现在我们可以简单地使用掩码来过滤掉置信度太低的框。 mask = detection_scores >= 0.75 # 由于批次中的每张图像可能有不同数量的检测结果, # 因此使用循环一次处理一个图像。 boxes = detection_boxes[mask] scores = detection_scores[mask] scores = scores[..., np.newaxis] return np.hstack((boxes, scores)) def py_cpu_nms(dets, thresh): """ 纯Python实现的非极大值抑制算法 用于过滤重叠的检测框,保留置信度最高的框。 """ x1 = dets[:, 0] y1 = dets[:, 1] x2 = dets[:, 2] y2 = dets[:, 3] scores = dets[:, 12] areas = (x2 - x1 + 1) * (y2 - y1 + 1) # 按置信度从大到小排序,获取索引 order = scores.argsort()[::-1] # keep列表存储最终保留的边框 keep = [] while order.size > 0: # order[0]是当前分数最大的窗口,肯定要保留 i = order[0] keep.append(dets[i]) # 计算窗口i与其他所有窗口的交叠部分的面积,矩阵计算 xx1 = np.maximum(x1[i], x1[order[1:]]) yy1 = np.maximum(y1[i], y1[order[1:]]) xx2 = np.minimum(x2[i], x2[order[1:]]) yy2 = np.minimum(y2[i], y2[order[1:]]) w = np.maximum(0.0, xx2 - xx1 + 1) h = np.maximum(0.0, yy2 - yy1 + 1) inter = w * h # 计算IoU(交并比) ovr = inter / (areas[i] + areas[order[1:]] - inter) # ind为所有与窗口i的IoU值小于阈值的窗口的索引 inds = np.where(ovr <= thresh)[0] # 下一次计算前要把窗口i去除,所以索引加1 order = order[inds + 1] return keep def denormalize_detections(detections, scale, pad): """ 将归一化的检测坐标映射回原始图像坐标 人脸和手掌检测器网络需要256x256和128x128的输入图像, 因此输入图像会被填充和缩放。此函数将归一化坐标映射回原始图像坐标。 输入: detections: nxm张量。n是检测到的对象数量。 m是4+2*k,其中前4个值是边界框坐标,k是检测器输出的额外关键点数量。 scale: 用于调整图像大小的缩放因子 pad: x和y维度上的填充量 """ detections[:, 0] = detections[:, 0] * scale * 256 - pad[0] detections[:, 1] = detections[:, 1] * scale * 256 - pad[1] detections[:, 2] = detections[:, 2] * scale * 256 - pad[0] detections[:, 3] = detections[:, 3] * scale * 256 - pad[1] detections[:, 4::2] = detections[:, 4::2] * scale * 256 - pad[1] detections[:, 5::2] = detections[:, 5::2] * scale * 256 - pad[0] return detections def detection2roi(detection): """ 将检测器的检测结果转换为有方向的边界框 边界框的中心和大小由检测框的中心计算得出。 旋转角度由关键点1和关键点2之间的向量相对于theta0计算得出。 边界框会按dscale进行缩放,并按dy进行偏移。 """ kp1 = 2 kp2 = 3 theta0 = 90 * np.pi / 180 dscale = 1.5 dy = 0. xc = detection[:,4+2*kp1] yc = detection[:,4+2*kp1+1] x1 = detection[:,4+2*kp2] y1 = detection[:,4+2*kp2+1] scale = np.sqrt((xc-x1)**2 + (yc-y1)**2) * 2 yc += dy * scale scale *= dscale # 计算边界框旋转角度 x0 = detection[:,4+2*kp1] y0 = detection[:,4+2*kp1+1] x1 = detection[:,4+2*kp2] y1 = detection[:,4+2*kp2+1] theta = np.arctan2(y0-y1, x0-x1) - theta0 return xc, yc, scale, theta def extract_roi(frame, xc, yc, theta, scale): """ 从原始帧中提取感兴趣区域 根据给定的中心点、旋转角度和尺度,从原始帧中提取并旋转ROI区域。 """ # 在单位正方形上取点,并根据ROI参数进行变换 points = np.array([[-1, -1, 1, 1], [-1, 1, -1, 1]], dtype=np.float32).reshape(1,2,4) points = points * scale.reshape(-1,1,1)/2 theta = theta.reshape(-1, 1, 1) R = np.concatenate(( np.concatenate((np.cos(theta), -np.sin(theta)), 2), np.concatenate((np.sin(theta), np.cos(theta)), 2), ), 1) center = np.concatenate((xc.reshape(-1,1,1), yc.reshape(-1,1,1)), 1) points = R @ points + center # 使用这些点计算仿射变换,将这些点映射回输出正方形 res = 256 points1 = np.array([[0, 0, res-1], [0, res-1, 0]], dtype=np.float32).T affines = [] imgs = [] for i in range(points.shape[0]): pts = points[i, :, :3].T print('pts', pts.shape, points1.shape, pts.dtype, points1.dtype) M = cv2.getAffineTransform(pts, points1) img = cv2.warpAffine(frame, M, (res,res))#, borderValue=127.5) imgs.append(img) affine = cv2.invertAffineTransform(M).astype('float32') affines.append(affine) if imgs: imgs = np.stack(imgs).astype(np.float32) / 255.#/ 127.5 - 1.0 affines = np.stack(affines) else: imgs = np.zeros((0, 3, res, res)) affines = np.zeros((0, 2, 3)) return imgs, affines, points def denormalize_landmarks(landmarks, affines): """ 将归一化的关键点坐标映射回原始图像坐标 使用仿射变换矩阵将关键点坐标从归一化空间映射回原始图像空间。 """ for i in range(len(landmarks)): landmark, affine = landmarks[i], affines[i] landmark = (affine[:,:2] @ landmark[:,:2].T + affine[:,2:]).T landmarks[i,:,:2] = landmark return landmarks def draw_detections(img, detections, with_keypoints=True): """ 在图像上绘制检测结果 在图像上绘制边界框和关键点。 """ if detections.ndim == 1: detections = np.expand_dims(detections, axis=0) n_keypoints = detections.shape[1] // 2 - 2 for i in range(detections.shape[0]): ymin = detections[i, 0] xmin = detections[i, 1] ymax = detections[i, 2] xmax = detections[i, 3] start_point = (int(xmin), int(ymin)) end_point = (int(xmax), int(ymax)) img = cv2.rectangle(img, start_point, end_point, (255, 0, 0), 1) if with_keypoints: for k in range(n_keypoints): kp_x = int(detections[i, 4 + k*2 ]) kp_y = int(detections[i, 4 + k*2 + 1]) cv2.circle(img, (kp_x, kp_y), 2, (0, 0, 255), thickness=2) return img def draw_roi(img, roi): """ 在图像上绘制感兴趣区域 在图像上绘制ROI的边界框。 """ for i in range(roi.shape[0]): (x1,x2,x3,x4), (y1,y2,y3,y4) = roi[i] cv2.line(img, (int(x1), int(y1)), (int(x2), int(y2)), (0,0,0), 2) cv2.line(img, (int(x1), int(y1)), (int(x3), int(y3)), (0,255,0), 2) cv2.line(img, (int(x2), int(y2)), (int(x4), int(y4)), (0,0,0), 2) cv2.line(img, (int(x3), int(y3)), (int(x4), int(y4)), (0,0,0), 2) def draw_landmarks(img, points, connections=[], color=(255, 255, 0), size=2): """ 在图像上绘制关键点和连接线 在图像上绘制检测到的关键点,并根据连接列表连接相关关键点。 """ for point in points: x, y = point x, y = int(x), int(y) cv2.circle(img, (x, y), size, color, thickness=size) for connection in connections: x0, y0 = points[connection[0]] x1, y1 = points[connection[1]] x0, y0 = int(x0), int(y0) x1, y1 = int(x1), int(y1) cv2.line(img, (x0, y0), (x1, y1), (255,255,255), size) def get_cap_id(): """ 获取可用的USB摄像头ID 使用shell命令查找连接的USB摄像头,并返回最小的摄像头ID。 """ try: # 构造命令,使用awk处理输出 cmd = "ls -l /sys/class/video4linux | awk -F ' -> ' '/usb/{sub(/.*video/, \"\", $2); print $2}'" result = subprocess.run(cmd, shell=True, capture_output=True, text=True) output = result.stdout.strip().split() # 转换所有捕获的编号为整数,找出最小值 video_numbers = list(map(int, output)) if video_numbers: return min(video_numbers) else: return None except Exception as e: print(f"An error occurred: {e}") return None # 初始化两个模型:姿态检测模型和上半身姿态关键点模型model_path = 'models/pose_detection.tflite'model_pose = 'models/pose_landmark_upper_body.tflite' # 姿态检测模型的输入和输出形状配置inShape =[[1 , 128 , 128 ,3]]outShape = [[1,896,12,4], [1,896,1,4]]# 创建Model实例对象,并设置模型相关参数model = aidlite.Model.create_instance(model_path)if model is None: print("Create pose_detection model failed !") # 设置模型属性model.set_model_properties(inShape, aidlite.DataType.TYPE_FLOAT32, outShape,aidlite.DataType.TYPE_FLOAT32) # 创建Config实例对象,并设置配置信息config = aidlite.Config.create_instance()config.implement_type = aidlite.ImplementType.TYPE_FASTconfig.framework_type = aidlite.FrameworkType.TYPE_TFLITEconfig.accelerate_type = aidlite.AccelerateType.TYPE_CPUconfig.number_of_threads = 4 # 创建推理解释器对象fast_interpreter = aidlite.InterpreterBuilder.build_interpretper_from_model_and_config(model, config)if fast_interpreter is None: print("pose_detection model build_interpretper_from_model_and_config failed !")# 完成解释器初始化result = fast_interpreter.init()if result != 0: print("pose_detection model interpreter init failed !")# 加载模型result = fast_interpreter.load_model()if result != 0: print("pose_detection model interpreter load model failed !")print("pose_detection model load success!") # 上半身姿态关键点模型的输入和输出形状配置inShape1 =[[1 , 256 , 256 ,3]]outShape1 = [[1,155,4,1], [1,1,4,1], [1,128,128,1]]# 创建Model实例对象,并设置模型相关参数model1 = aidlite.Model.create_instance(model_pose)if model1 is None: print("Create pose_landmark_upper_body model failed !")# 设置模型属性model1.set_model_properties(inShape1, aidlite.DataType.TYPE_FLOAT32, outShape1,aidlite.DataType.TYPE_FLOAT32)# 创建Config实例对象,并设置配置信息config1 = aidlite.Config.create_instance()config1.implement_type = aidlite.ImplementType.TYPE_FASTconfig1.framework_type = aidlite.FrameworkType.TYPE_TFLITEconfig1.accelerate_type = aidlite.AccelerateType.TYPE_GPUconfig1.number_of_threads = 4 # 创建推理解释器对象fast_interpreter1 = aidlite.InterpreterBuilder.build_interpretper_from_model_and_config(model1, config1)if fast_interpreter1 is None: print("pose_landmark_upper_body model build_interpretper_from_model_and_config failed !")# 完成解释器初始化result = fast_interpreter1.init()if result != 0: print("pose_landmark_upper_body model interpreter init failed !")# 加载模型result = fast_interpreter1.load_model()if result != 0: print("pose_landmark_upper_body model interpreter load model failed !")print("pose_landmark_upper_body model load success!") # 姿态关键点连接列表,定义了哪些关键点应该连接起来形成骨架POSE_CONNECTIONS = [ (0,1), (1,2), (2,3), (3,7), (0,4), (4,5), (5,6), (6,8), (9,10), (11,13), (13,15), (15,17), (17,19), (19,15), (15,21), (12,14), (14,16), (16,18), (18,20), (20,16), (16,22), (11,12), (12,24), (24,23), (23,11)]# 加载锚框数据,用于边界框解码anchors = np.load('models/anchors.npy') # 设置运行环境类型和摄像头IDaidlux_type="root"# 0-后置,1-前置camId = 1opened = False# 尝试打开摄像头,优先使用USB摄像头while not opened: if aidlux_type == "basic": cap=cv2.VideoCapture(camId, device='mipi') else: capId = get_cap_id() print("usb camera id: ", capId) if capId is None: print ("no found usb camera") # 默认用1-前置摄像头打开相机,若打开失败,请尝试修改为0-后置 cap=cv2.VideoCapture(1, device='mipi') else: camId = capId cap = cv2.VideoCapture(camId) cap.set(6, cv2.VideoWriter.fourcc('M','J','P','G')) if cap.isOpened(): opened = True else: print("open camera failed") cap.release() time.sleep(0.5) # 主循环:读取摄像头帧并进行姿态检测和关键点识别while True: ret, image=cap.read() if not ret: continue if image is None: continue # 水平翻转图像,使显示更直观 image_roi=cv2.flip(image,1) # 转换颜色空间,从BGR转为RGB frame = cv2.cvtColor(image_roi, cv2.COLOR_BGR2RGB) # 调整图像大小并填充,准备输入到模型 img1, img2, scale, pad = resize_pad(frame) img2 = img2.astype(np.float32) img2 = img2 / 255.# 127.5 - 1.0 start_time = time.time() # 设置输入数据并执行姿态检测模型推理 result = fast_interpreter.set_input_tensor(0, img2.data) if result != 0: print("pose_detection model interpreter set_input_tensor() failed") result = fast_interpreter.invoke() if result != 0: print("pose_detection model interpreter invoke() failed") # 获取姿态检测模型的输出数据 stride32 = fast_interpreter.get_output_tensor(0) if stride32 is None: print("sample : pose_detection model interpreter->get_output_tensor(0) failed !") bboxes = stride32.reshape(896, -1) scores = fast_interpreter.get_output_tensor(1) # 将模型输出转换为检测结果 detections = _tensors_to_detections(bboxes, scores, anchors) # 应用非极大值抑制,过滤重叠的检测框 normalized_pose_detections = py_cpu_nms(detections, 0.3) # 处理检测结果,将归一化坐标映射回原始图像坐标 normalized_pose_detections = np.stack(normalized_pose_detections ) if len(normalized_pose_detections ) > 0 else np.zeros((0, 12+1)) pose_detections = denormalize_detections(normalized_pose_detections, scale, pad) # 如果检测到姿态 if len(pose_detections) >0: # 从检测结果中提取感兴趣区域 xc, yc, scale, theta = detection2roi(pose_detections) img, affine, box = extract_roi(frame, xc, yc, theta, scale) # 设置输入数据并执行上半身姿态关键点模型推理 result = fast_interpreter1.set_input_tensor(0, img.data) if result != 0: print("pose_landmark_upper_body model interpreter set_input_tensor() failed") result = fast_interpreter1.invoke() if result != 0: print("pose_landmark_upper_body model interpreter invoke() failed") # 获取上半身姿态关键点模型的输出数据 stride8 = fast_interpreter1.get_output_tensor(1) if stride8 is None: print("sample : interpreter->get_output_tensor(1) failed !") flags = stride8.reshape(-1, 1) mask = fast_interpreter1.get_output_tensor(2) if mask is None: print("sample : interpreter->get_output_tensor(2) failed !") stride32 = fast_interpreter1.get_output_tensor(0) if stride32 is None: print("sample : interpreter->get_output_tensor(0) failed !") normalized_landmarks = stride32.copy().reshape(1, 31, -1) # 将归一化的关键点坐标映射回原始图像坐标 landmarks = denormalize_landmarks(normalized_landmarks, affine) # 在图像上绘制感兴趣区域和姿态关键点 draw_roi(image_roi, box) for i in range(len(flags)): landmark, flag = landmarks[i], flags[i] if flag>.5: draw_landmarks(image_roi, landmark[:,:2], POSE_CONNECTIONS, size=2) # 显示处理后的图像 cv2.imshow("",image_roi)
模型位置
/opt/aidlux/app/aid-examples/pose_detect_track