import math
from operator import itemgetter

import cv2
import numpy as np
from pythreejs import *


class Engine3js:
    """
    Render engine built on pythreejs; make sure the Jupyter Widgets
    extension is installed and enabled, otherwise nothing can be displayed.

    Attributes:
        view_width, view_height: Size of the view window.
        position: Position of the camera.
        lookAtPos: The point the camera looks at.
        axis_size: Length of the axes helper.
        grid_length: Grid size (length == width).
        grid_num: Number of grid divisions.
        grid: Whether to show the grid.
        axis: Whether to show the axes.
    """

    def __init__(
        self,
        view_width=455,
        view_height=256,
        position=[300, 100, 0],
        lookAtPos=[0, 0, 0],
        axis_size=300,
        grid_length=600,
        grid_num=20,
        grid=False,
        axis=False,
    ):
        self.view_width = view_width
        self.view_height = view_height
        self.position = position

        # set up the camera
        self.cam = PerspectiveCamera(position=self.position, aspect=self.view_width / self.view_height)
        self.cam.lookAt(lookAtPos)

        # x, y, z axes
        self.axis = AxesHelper(axis_size)  # axes length

        # set grid size
        self.gridHelper = GridHelper(grid_length, grid_num)

        # set up the scene
        self.scene = Scene(
            children=[
                self.cam,
                DirectionalLight(position=[3, 5, 1], intensity=0.6),
                AmbientLight(intensity=0.5),
            ]
        )
        # optionally add the grid and axes helpers
        if grid:
            self.scene.add(self.gridHelper)
        if axis:
            self.scene.add(self.axis)

        # renders the objects in the scene
        self.renderer = Renderer(
            camera=self.cam,
            scene=self.scene,
            controls=[OrbitControls(controlling=self.cam)],
            width=self.view_width,
            height=self.view_height,
        )

    def get_width(self):
        return self.view_width

    def plot(self):
        self.renderer.render(self.scene, self.cam)

    def scene_add(self, obj):
        self.scene.add(obj)

    def scene_remove(self, obj):
        self.scene.remove(obj)
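

def _demo_engine3js():
    """A minimal usage sketch (not part of the original pipeline): build a
    scene with the grid and axes helpers enabled. It assumes a Jupyter
    environment with the pythreejs widget extension installed; returning
    the renderer displays it as the cell output.
    """
    engine = Engine3js(grid=True, axis=True)
    return engine.renderer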


class Geometry:
    """
    Base geometry class holding a buffer geometry and a material.
    """

    def __init__(self, name="geometry"):
        self.geometry = None
        self.material = None
        self.name = name

    def get_Name(self):
        return self.name


class Skeleton(Geometry):
    """
    Draws human body poses as line segments.
    """

    def __init__(self, name="skeleton", lineWidth=3, body_edges=[]):
        super(Skeleton, self).__init__(name)
        self.material = LineBasicMaterial(vertexColors="VertexColors", linewidth=lineWidth)
        # per-vertex colors for the limb endpoints
        self.colorSet = BufferAttribute(
            np.array(
                [
                    [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                    [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                    [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                    [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                    [1, 0, 0], [1, 0, 0], [0, 1, 0],
                ],
                dtype=np.float32,
            ),
            normalized=False,
        )
        self.body_edges = body_edges

    def __call__(self, poses_3d):
        poses = []
        for pose_position_tmp in poses_3d:
            bones = []
            for edge in self.body_edges:
                # append both endpoints of the limb
                bones.append(pose_position_tmp[edge[0]])
                bones.append(pose_position_tmp[edge[1]])
            bones = np.asarray(bones, dtype=np.float32)

            # The API is documented at https://github.com/jupyter-widgets/pythreejs
            self.geometry = BufferGeometry(
                attributes={
                    "position": BufferAttribute(bones, normalized=False),
                    # limb colors
                    "color": self.colorSet,
                }
            )
            pose = LineSegments(self.geometry, self.material)
            poses.append(pose)
        return poses

    def plot(self, pose_points=None):
        return self.__call__(pose_points)
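

def _demo_skeleton():
    """A minimal usage sketch (not part of the original pipeline): draw one
    fabricated two-limb pose. Real poses and edges come from parse_poses()
    further down in this module; the coordinates here are made up.
    """
    points = np.array([[0, 0, 0], [0, 100, 0], [50, 100, 0]], dtype=np.float32)
    edges = np.array([[0, 1], [1, 2]])  # hypothetical edges: two limbs
    skeleton = Skeleton(body_edges=edges)
    return skeleton([points])  # a list with one LineSegments object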


class Cloudpoint(Geometry):
    """
    Draws point clouds.
    """

    def __init__(self, name="cloudpoint", points=[], point_size=5, line=None, points_color="blue"):
        super(Cloudpoint, self).__init__(name)
        self.material = PointsMaterial(size=point_size, color=points_color)
        self.points = points
        self.line = line

    def __call__(self, points_3d):
        self.geometry = BufferGeometry(
            attributes={
                "position": BufferAttribute(points_3d, normalized=False),
                # per-vertex colors of the points
                "color": BufferAttribute(
                    np.array(
                        [
                            [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                            [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                            [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                            [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                            [1, 0, 0], [1, 0, 0], [0, 1, 0],
                        ],
                        dtype=np.float32,
                    ),
                    normalized=False,
                ),
            },
        )
        cloud_points = Points(self.geometry, self.material)

        if self.line is not None:
            g1 = BufferGeometry(
                attributes={
                    "position": BufferAttribute(self.line, normalized=False),
                    # per-vertex colors of the line segments; they take effect
                    # when the material's vertex-colors option is enabled
                    "color": BufferAttribute(
                        np.array(
                            [
                                [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                                [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                                [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                                [1, 0, 0], [1, 0, 0], [0, 1, 0], [0, 0, 1],
                                [1, 0, 0], [1, 0, 0], [0, 1, 0],
                            ],
                            dtype=np.float32,
                        ),
                        normalized=False,
                    ),
                },
            )
            m1 = LineBasicMaterial(color="red", linewidth=3)
            facemesh = LineSegments(g1, m1)
            return [cloud_points, facemesh]
        return cloud_points
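

def _demo_cloudpoint():
    """A minimal usage sketch (not part of the original pipeline): render
    three fabricated points as a small cloud.
    """
    points = np.array([[0, 0, 0], [10, 10, 10], [20, 0, 10]], dtype=np.float32)
    cloud = Cloudpoint(point_size=8, points_color="green")
    return cloud(points)  # a pythreejs Points object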


class Box_bounding(Geometry):
    def __init__(self, name="Box", lineWidth=3):
        super(Box_bounding, self).__init__(name)
        self.material = LineBasicMaterial(vertexColors="VertexColors", linewidth=lineWidth)
        self.edge = []

    def __call__(self, points=None):
        pass


class Pose:
    num_kpts = 18
    kpt_names = [
        "neck", "nose",
        "l_sho", "l_elb", "l_wri", "l_hip", "l_knee", "l_ank",
        "r_sho", "r_elb", "r_wri", "r_hip", "r_knee", "r_ank",
        "r_eye", "l_eye", "r_ear", "l_ear",
    ]
    sigmas = (
        np.array(
            [0.79, 0.26, 0.79, 0.72, 0.62, 1.07, 0.87, 0.89, 0.79,
             0.72, 0.62, 1.07, 0.87, 0.89, 0.25, 0.25, 0.35, 0.35],
            dtype=np.float32,
        )
        / 10.0
    )
    vars = (sigmas * 2) ** 2
    last_id = -1
    color = [0, 224, 255]

    def __init__(self, keypoints, confidence):
        super().__init__()
        self.keypoints = keypoints
        self.confidence = confidence
        found_keypoints = np.zeros((np.count_nonzero(keypoints[:, 0] != -1), 2), dtype=np.int32)
        found_kpt_id = 0
        for kpt_id in range(keypoints.shape[0]):
            if keypoints[kpt_id, 0] == -1:
                continue
            found_keypoints[found_kpt_id] = keypoints[kpt_id]
            found_kpt_id += 1
        self.bbox = cv2.boundingRect(found_keypoints)
        self.id = None
        self.translation_filter = [
            OneEuroFilter(freq=80, beta=0.01),
            OneEuroFilter(freq=80, beta=0.01),
            OneEuroFilter(freq=80, beta=0.01),
        ]

    def update_id(self, id=None):
        self.id = id
        if self.id is None:
            self.id = Pose.last_id + 1
            Pose.last_id += 1

    def filter(self, translation):
        filtered_translation = []
        for coordinate_id in range(3):
            filtered_translation.append(self.translation_filter[coordinate_id](translation[coordinate_id]))
        return filtered_translation


def get_similarity(a, b, threshold=0.5):
    num_similar_kpt = 0
    for kpt_id in range(Pose.num_kpts):
        if a.keypoints[kpt_id, 0] != -1 and b.keypoints[kpt_id, 0] != -1:
            distance = np.sum((a.keypoints[kpt_id] - b.keypoints[kpt_id]) ** 2)
            area = max(a.bbox[2] * a.bbox[3], b.bbox[2] * b.bbox[3])
            similarity = np.exp(-distance / (2 * (area + np.spacing(1)) * Pose.vars[kpt_id]))
            if similarity > threshold:
                num_similar_kpt += 1
    return num_similar_kpt


def propagate_ids(previous_poses, current_poses, threshold=3):
    """Propagate pose ids from the previous frame's results. An id is
    propagated if a pose from the previous frame and a current pose share
    at least `threshold` similar keypoints.
    :param previous_poses: poses from the previous frame, with ids
    :param current_poses: poses from the current frame, to be assigned ids
    :param threshold: minimal number of similar keypoints between poses
    :return: None
    """
    current_poses_sorted_ids = list(range(len(current_poses)))
    current_poses_sorted_ids = sorted(
        current_poses_sorted_ids,
        key=lambda pose_id: current_poses[pose_id].confidence,
        reverse=True,
    )  # match confident poses first
    mask = np.ones(len(previous_poses), dtype=np.int32)
    for current_pose_id in current_poses_sorted_ids:
        best_matched_id = None
        best_matched_pose_id = None
        best_matched_iou = 0
        for previous_pose_id in range(len(previous_poses)):
            if not mask[previous_pose_id]:
                continue
            iou = get_similarity(current_poses[current_pose_id], previous_poses[previous_pose_id])
            if iou > best_matched_iou:
                best_matched_iou = iou
                best_matched_pose_id = previous_poses[previous_pose_id].id
                best_matched_id = previous_pose_id
        if best_matched_iou >= threshold:
            mask[best_matched_id] = 0
        else:  # the pose is not similar to any previous one
            best_matched_pose_id = None
        current_poses[current_pose_id].update_id(best_matched_pose_id)
        if best_matched_pose_id is not None:
            current_poses[current_pose_id].translation_filter = previous_poses[best_matched_id].translation_filter
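

def _demo_propagate_ids():
    """A minimal tracking sketch (not part of the original pipeline): one
    person seen in two fabricated frames. With widely spread keypoints that
    move by a single pixel, get_similarity() counts all 18 keypoints as
    similar, well above the default threshold, so the id carries over.
    """
    rng = np.random.default_rng(0)
    keypoints = rng.integers(0, 100, size=(Pose.num_kpts, 2)).astype(np.int32)
    prev = Pose(keypoints, confidence=10.0)
    prev.update_id()  # assigns a fresh id
    cur = Pose(keypoints + 1, confidence=9.0)  # the same person, slightly moved
    propagate_ids([prev], [cur])
    return prev.id == cur.id  # True: the id was propagated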


AVG_PERSON_HEIGHT = 180

# pelvis (the body center) is missing, id == 2
map_id_to_panoptic = [1, 0, 9, 10, 11, 3, 4, 5, 12, 13, 14, 6, 7, 8, 15, 16, 17, 18]

limbs = [[18, 17, 1], [16, 15, 1], [5, 4, 3], [8, 7, 6], [11, 10, 9], [14, 13, 12]]


def get_root_relative_poses(inference_results):
    features, heatmap, paf_map = inference_results

    upsample_ratio = 4
    found_poses = extract_poses(heatmap[0:-1], paf_map, upsample_ratio)[0]
    # scale coordinates back to feature-map space
    found_poses[:, 0:-1:3] /= upsample_ratio
    found_poses[:, 1:-1:3] /= upsample_ratio

    poses_2d = []
    num_kpt_panoptic = 19
    num_kpt = 18
    for pose_id in range(found_poses.shape[0]):
        if found_poses[pose_id, 5] == -1:  # skip the pose if its neck was not found
            continue
        pose_2d = np.ones(num_kpt_panoptic * 3 + 1, dtype=np.float32) * -1  # +1 for the pose confidence
        for kpt_id in range(num_kpt):
            if found_poses[pose_id, kpt_id * 3 + 2] != -1:
                x_2d, y_2d = found_poses[pose_id, kpt_id * 3 : kpt_id * 3 + 2]
                conf = found_poses[pose_id, kpt_id * 3 + 2]
                pose_2d[map_id_to_panoptic[kpt_id] * 3] = x_2d  # just repacking
                pose_2d[map_id_to_panoptic[kpt_id] * 3 + 1] = y_2d
                pose_2d[map_id_to_panoptic[kpt_id] * 3 + 2] = conf
        pose_2d[-1] = found_poses[pose_id, -1]
        poses_2d.append(pose_2d)

    keypoint_threshold = 0.1
    poses_3d = np.ones((len(poses_2d), num_kpt_panoptic * 4), dtype=np.float32) * -1
    for pose_id in range(len(poses_3d)):
        if poses_2d[pose_id][2] > keypoint_threshold:
            neck_2d = poses_2d[pose_id][:2].astype(int)
            # read all pose coordinates at the neck location
            for kpt_id in range(num_kpt_panoptic):
                map_3d = features[kpt_id * 3 : (kpt_id + 1) * 3]
                poses_3d[pose_id][kpt_id * 4] = map_3d[0, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 1] = map_3d[1, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 2] = map_3d[2, neck_2d[1], neck_2d[0]] * AVG_PERSON_HEIGHT
                poses_3d[pose_id][kpt_id * 4 + 3] = poses_2d[pose_id][kpt_id * 3 + 2]

            # refine keypoint coordinates at the corresponding limb locations
            for limb in limbs:
                for kpt_id_from in limb:
                    if poses_2d[pose_id][kpt_id_from * 3 + 2] > keypoint_threshold:
                        for kpt_id_where in limb:
                            kpt_from_2d = poses_2d[pose_id][kpt_id_from * 3 : kpt_id_from * 3 + 2].astype(int)
                            map_3d = features[kpt_id_where * 3 : (kpt_id_where + 1) * 3]
                            poses_3d[pose_id][kpt_id_where * 4] = map_3d[0, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                            poses_3d[pose_id][kpt_id_where * 4 + 1] = map_3d[1, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                            poses_3d[pose_id][kpt_id_where * 4 + 2] = map_3d[2, kpt_from_2d[1], kpt_from_2d[0]] * AVG_PERSON_HEIGHT
                        break

    return poses_3d, np.array(poses_2d), features.shape


previous_poses_2d = []


def parse_poses(inference_results, input_scale, stride, fx, is_video=False):
    global previous_poses_2d
    poses_3d, poses_2d, features_shape = get_root_relative_poses(inference_results)
    poses_2d_scaled = []
    for pose_2d in poses_2d:
        num_kpt = (pose_2d.shape[0] - 1) // 3
        pose_2d_scaled = np.ones(pose_2d.shape[0], dtype=np.float32) * -1  # +1 for pose confidence
        for kpt_id in range(num_kpt):
            if pose_2d[kpt_id * 3 + 2] != -1:
                pose_2d_scaled[kpt_id * 3] = int(pose_2d[kpt_id * 3] * stride / input_scale)
                pose_2d_scaled[kpt_id * 3 + 1] = int(pose_2d[kpt_id * 3 + 1] * stride / input_scale)
                pose_2d_scaled[kpt_id * 3 + 2] = pose_2d[kpt_id * 3 + 2]
        pose_2d_scaled[-1] = pose_2d[-1]
        poses_2d_scaled.append(pose_2d_scaled)

    if is_video:  # track poses ids
        current_poses_2d = []
        for pose_id in range(len(poses_2d_scaled)):
            pose_keypoints = np.ones((Pose.num_kpts, 2), dtype=np.int32) * -1
            for kpt_id in range(Pose.num_kpts):
                if poses_2d_scaled[pose_id][kpt_id * 3 + 2] != -1.0:  # keypoint is found
                    pose_keypoints[kpt_id, 0] = int(poses_2d_scaled[pose_id][kpt_id * 3 + 0])
                    pose_keypoints[kpt_id, 1] = int(poses_2d_scaled[pose_id][kpt_id * 3 + 1])
            pose = Pose(pose_keypoints, poses_2d_scaled[pose_id][-1])
            current_poses_2d.append(pose)
        propagate_ids(previous_poses_2d, current_poses_2d)
        previous_poses_2d = current_poses_2d

    translated_poses_3d = []
    # translate poses
    for pose_id in range(len(poses_3d)):
        pose_3d = poses_3d[pose_id].reshape((-1, 4)).transpose()
        pose_2d = poses_2d[pose_id][:-1].reshape((-1, 3)).transpose()
        num_valid = np.count_nonzero(pose_2d[2] != -1)
        pose_3d_valid = np.zeros((3, num_valid), dtype=np.float32)
        pose_2d_valid = np.zeros((2, num_valid), dtype=np.float32)
        valid_id = 0
        for kpt_id in range(pose_3d.shape[1]):
            if pose_2d[2, kpt_id] == -1:
                continue
            pose_3d_valid[:, valid_id] = pose_3d[0:3, kpt_id]
            pose_2d_valid[:, valid_id] = pose_2d[0:2, kpt_id]
            valid_id += 1

        pose_2d_valid[0] = pose_2d_valid[0] - features_shape[2] / 2
        pose_2d_valid[1] = pose_2d_valid[1] - features_shape[1] / 2
        mean_3d = np.expand_dims(pose_3d_valid.mean(axis=1), axis=1)
        mean_2d = np.expand_dims(pose_2d_valid.mean(axis=1), axis=1)
        numerator = np.trace(
            np.dot(
                (pose_3d_valid[:2, :] - mean_3d[:2, :]).transpose(),
                pose_3d_valid[:2, :] - mean_3d[:2, :],
            )
        ).sum()
        numerator = np.sqrt(numerator)
        denominator = np.sqrt(
            np.trace(
                np.dot(
                    (pose_2d_valid[:2, :] - mean_2d[:2, :]).transpose(),
                    pose_2d_valid[:2, :] - mean_2d[:2, :],
                )
            ).sum()
        )
        mean_2d = np.array([mean_2d[0, 0], mean_2d[1, 0], fx * input_scale / stride])
        mean_3d = np.array([mean_3d[0, 0], mean_3d[1, 0], 0])
        translation = numerator / denominator * mean_2d - mean_3d

        if is_video:
            translation = current_poses_2d[pose_id].filter(translation)
        for kpt_id in range(19):
            pose_3d[0, kpt_id] = pose_3d[0, kpt_id] + translation[0]
            pose_3d[1, kpt_id] = pose_3d[1, kpt_id] + translation[1]
            pose_3d[2, kpt_id] = pose_3d[2, kpt_id] + translation[2]
        translated_poses_3d.append(pose_3d.transpose().reshape(-1))

    return np.array(translated_poses_3d), np.array(poses_2d_scaled)
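

def _demo_parse_poses():
    """A calling-convention sketch (not part of the original pipeline). The
    shapes mimic the outputs of Open Model Zoo's human-pose-estimation-3d
    model (57 feature channels, 19 heatmaps, 38 paf channels); input_scale,
    stride and fx below are hypothetical values. The maps are near-zero
    noise, so no pose survives and both returned arrays are empty; the
    point is only the expected interface.
    """
    rng = np.random.default_rng(0)
    h, w = 32, 56
    features = rng.random((57, h, w)).astype(np.float32)
    heatmaps = rng.random((19, h, w)).astype(np.float32) * 0.05
    pafs = rng.random((38, h, w)).astype(np.float32) * 0.05
    poses_3d, poses_2d = parse_poses((features, heatmaps, pafs), input_scale=1.0, stride=8, fx=1000)
    return poses_3d.shape, poses_2d.shape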


def get_alpha(rate=30, cutoff=1):
    tau = 1 / (2 * math.pi * cutoff)
    te = 1 / rate
    return 1 / (1 + tau / te)
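
# A hand-checked example: with the defaults rate=30 and cutoff=1,
# tau = 1 / (2 * pi) ~ 0.159 and te = 1 / 30 ~ 0.0333, so
# get_alpha() = 1 / (1 + 0.159 / 0.0333) ~ 0.173, i.e. heavy smoothing.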


class LowPassFilter:
    def __init__(self):
        self.x_previous = None

    def __call__(self, x, alpha=0.5):
        if self.x_previous is None:
            self.x_previous = x
            return x
        x_filtered = alpha * x + (1 - alpha) * self.x_previous
        self.x_previous = x_filtered
        return x_filtered


class OneEuroFilter:
    def __init__(self, freq=15, mincutoff=1, beta=1, dcutoff=1):
        self.freq = freq
        self.mincutoff = mincutoff
        self.beta = beta
        self.dcutoff = dcutoff
        self.filter_x = LowPassFilter()
        self.filter_dx = LowPassFilter()
        self.x_previous = None
        self.dx = None

    def __call__(self, x):
        if self.dx is None:
            self.dx = 0
        else:
            self.dx = (x - self.x_previous) * self.freq
        dx_smoothed = self.filter_dx(self.dx, get_alpha(self.freq, self.dcutoff))
        cutoff = self.mincutoff + self.beta * abs(dx_smoothed)
        x_filtered = self.filter_x(x, get_alpha(self.freq, cutoff))
        self.x_previous = x
        return x_filtered
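

def _demo_one_euro_filter():
    """A minimal usage sketch (not part of the original pipeline): smooth a
    noisy, roughly constant signal. The one-euro idea is the adaptive
    cutoff: slow signals get heavy smoothing, fast ones keep low latency.
    """
    rng = np.random.default_rng(0)
    one_euro = OneEuroFilter(freq=30, mincutoff=1, beta=0.01)
    noisy = 5.0 + 0.5 * rng.standard_normal(100)
    smoothed = [one_euro(x) for x in noisy]
    return float(np.std(smoothed)) < float(np.std(noisy))  # True: jitter shrinks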


BODY_PARTS_KPT_IDS = [
    [1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [1, 8], [8, 9], [9, 10],
    [1, 11], [11, 12], [12, 13], [1, 0], [0, 14], [14, 16], [0, 15], [15, 17],
    [2, 16], [5, 17],
]
BODY_PARTS_PAF_IDS = (
    [12, 13], [20, 21], [14, 15], [16, 17], [22, 23], [24, 25], [0, 1],
    [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [28, 29], [30, 31], [34, 35],
    [32, 33], [36, 37], [18, 19], [26, 27],
)


def linspace2d(start, stop, n=10):
    points = 1 / (n - 1) * (stop - start)
    return points[:, None] * np.arange(n) + start[:, None]
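
# A hand-checked example: linspace2d(np.array([0, 0]), np.array([9, 18]))
# returns a 2x10 array whose columns are (0, 0), (1, 2), ..., (9, 18),
# i.e. n points evenly spaced along the segment, endpoints included.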


def extract_keypoints(heatmap, all_keypoints, total_keypoint_num):
    heatmap[heatmap < 0.1] = 0
    heatmap_with_borders = np.pad(heatmap, [(2, 2), (2, 2)], mode="constant")
    heatmap_center = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 1 : heatmap_with_borders.shape[1] - 1]
    heatmap_left = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 2 : heatmap_with_borders.shape[1]]
    heatmap_right = heatmap_with_borders[1 : heatmap_with_borders.shape[0] - 1, 0 : heatmap_with_borders.shape[1] - 2]
    heatmap_up = heatmap_with_borders[2 : heatmap_with_borders.shape[0], 1 : heatmap_with_borders.shape[1] - 1]
    heatmap_down = heatmap_with_borders[0 : heatmap_with_borders.shape[0] - 2, 1 : heatmap_with_borders.shape[1] - 1]

    # a peak is a pixel strictly greater than its four neighbors
    heatmap_peaks = (heatmap_center > heatmap_left) & (heatmap_center > heatmap_right) & (heatmap_center > heatmap_up) & (heatmap_center > heatmap_down)
    heatmap_peaks = heatmap_peaks[1 : heatmap_center.shape[0] - 1, 1 : heatmap_center.shape[1] - 1]
    keypoints = list(zip(np.nonzero(heatmap_peaks)[1], np.nonzero(heatmap_peaks)[0]))  # (w, h)
    keypoints = sorted(keypoints, key=itemgetter(0))

    suppressed = np.zeros(len(keypoints), np.uint8)
    keypoints_with_score_and_id = []
    keypoint_num = 0
    for i in range(len(keypoints)):
        if suppressed[i]:
            continue
        # suppress peaks closer than 6 pixels to an already accepted one
        for j in range(i + 1, len(keypoints)):
            if math.sqrt((keypoints[i][0] - keypoints[j][0]) ** 2 + (keypoints[i][1] - keypoints[j][1]) ** 2) < 6:
                suppressed[j] = 1
        keypoint_with_score_and_id = (
            keypoints[i][0],
            keypoints[i][1],
            heatmap[keypoints[i][1], keypoints[i][0]],
            total_keypoint_num + keypoint_num,
        )
        keypoints_with_score_and_id.append(keypoint_with_score_and_id)
        keypoint_num += 1
    all_keypoints.append(keypoints_with_score_and_id)
    return keypoint_num
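

def _demo_extract_keypoints():
    """A minimal usage sketch (not part of the original pipeline): find the
    single peak of a fabricated heatmap.
    """
    heatmap = np.zeros((10, 10), dtype=np.float32)
    heatmap[4, 6] = 0.9  # one clear peak at x=6, y=4
    all_keypoints = []
    num = extract_keypoints(heatmap, all_keypoints, total_keypoint_num=0)
    return num, all_keypoints[0]  # (1, [(6, 4, 0.9, 0)])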


def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05):
    pose_entries = []
    all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
    for part_id in range(len(BODY_PARTS_PAF_IDS)):
        part_pafs = pafs[BODY_PARTS_PAF_IDS[part_id]]
        kpts_a = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][0]]
        kpts_b = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][1]]
        num_kpts_a = len(kpts_a)
        num_kpts_b = len(kpts_b)
        kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
        kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]

        if num_kpts_a == 0 and num_kpts_b == 0:  # no keypoints for this body part
            continue
        elif num_kpts_a == 0:  # the body part has only 'b' keypoints
            for i in range(num_kpts_b):
                num = 0
                for j in range(len(pose_entries)):  # check if it is already in some pose, added by another body part
                    if pose_entries[j][kpt_b_id] == kpts_b[i][3]:
                        num += 1
                        continue
                if num == 0:
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_b_id] = kpts_b[i][3]  # keypoint index
                    pose_entry[-1] = 1  # number of keypoints in the pose
                    pose_entry[-2] = kpts_b[i][2]  # pose score
                    pose_entries.append(pose_entry)
            continue
        elif num_kpts_b == 0:  # the body part has only 'a' keypoints
            for i in range(num_kpts_a):
                num = 0
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == kpts_a[i][3]:
                        num += 1
                        continue
                if num == 0:
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = kpts_a[i][3]
                    pose_entry[-1] = 1
                    pose_entry[-2] = kpts_a[i][2]
                    pose_entries.append(pose_entry)
            continue

        connections = []
        for i in range(num_kpts_a):
            kpt_a = np.array(kpts_a[i][0:2])
            for j in range(num_kpts_b):
                kpt_b = np.array(kpts_b[j][0:2])
                mid_point = [(), ()]
                mid_point[0] = (
                    int(round((kpt_a[0] + kpt_b[0]) * 0.5)),
                    int(round((kpt_a[1] + kpt_b[1]) * 0.5)),
                )
                mid_point[1] = mid_point[0]
                vec = [kpt_b[0] - kpt_a[0], kpt_b[1] - kpt_a[1]]
                vec_norm = math.sqrt(vec[0] ** 2 + vec[1] ** 2)
                if vec_norm == 0:
                    continue
                vec[0] /= vec_norm
                vec[1] /= vec_norm
                cur_point_score = vec[0] * part_pafs[0, mid_point[0][1], mid_point[0][0]] + vec[1] * part_pafs[1, mid_point[1][1], mid_point[1][0]]

                height_n = pafs.shape[1] // 2
                success_ratio = 0
                point_num = 10  # number of points to integrate over along the paf
                ratio = 0
                if cur_point_score > -100:
                    passed_point_score = 0
                    passed_point_num = 0
                    x, y = linspace2d(kpt_a, kpt_b)
                    for point_idx in range(point_num):
                        px = int(x[point_idx])
                        py = int(y[point_idx])
                        paf = part_pafs[:, py, px]
                        cur_point_score = vec[0] * paf[0] + vec[1] * paf[1]
                        if cur_point_score > min_paf_score:
                            passed_point_score += cur_point_score
                            passed_point_num += 1
                    success_ratio = passed_point_num / point_num
                    if passed_point_num > 0:
                        ratio = passed_point_score / passed_point_num
                    ratio += min(height_n / vec_norm - 1, 0)
                if ratio > 0 and success_ratio > 0.8:
                    score_all = ratio + kpts_a[i][2] + kpts_b[j][2]
                    connections.append([i, j, ratio, score_all])
        if len(connections) > 0:
            connections = sorted(connections, key=itemgetter(2), reverse=True)

        num_connections = min(num_kpts_a, num_kpts_b)
        has_kpt_a = np.zeros(num_kpts_a, dtype=np.int32)
        has_kpt_b = np.zeros(num_kpts_b, dtype=np.int32)
        filtered_connections = []
        for row in range(len(connections)):
            if len(filtered_connections) == num_connections:
                break
            i, j, cur_point_score = connections[row][0:3]
            if not has_kpt_a[i] and not has_kpt_b[j]:
                filtered_connections.append([kpts_a[i][3], kpts_b[j][3], cur_point_score])
                has_kpt_a[i] = 1
                has_kpt_b[j] = 1
        connections = filtered_connections
        if len(connections) == 0:
            continue

        if part_id == 0:
            pose_entries = [np.ones(pose_entry_size) * -1 for _ in range(len(connections))]
            for i in range(len(connections)):
                pose_entries[i][BODY_PARTS_KPT_IDS[0][0]] = connections[i][0]
                pose_entries[i][BODY_PARTS_KPT_IDS[0][1]] = connections[i][1]
                pose_entries[i][-1] = 2
                pose_entries[i][-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
        elif part_id == 17 or part_id == 18:
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            for i in range(len(connections)):
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == connections[i][0] and pose_entries[j][kpt_b_id] == -1:
                        pose_entries[j][kpt_b_id] = connections[i][1]
                    elif pose_entries[j][kpt_b_id] == connections[i][1] and pose_entries[j][kpt_a_id] == -1:
                        pose_entries[j][kpt_a_id] = connections[i][0]
            continue
        else:
            kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
            kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
            for i in range(len(connections)):
                num = 0
                for j in range(len(pose_entries)):
                    if pose_entries[j][kpt_a_id] == connections[i][0]:
                        pose_entries[j][kpt_b_id] = connections[i][1]
                        num += 1
                        pose_entries[j][-1] += 1
                        pose_entries[j][-2] += all_keypoints[connections[i][1], 2] + connections[i][2]
                if num == 0:
                    pose_entry = np.ones(pose_entry_size) * -1
                    pose_entry[kpt_a_id] = connections[i][0]
                    pose_entry[kpt_b_id] = connections[i][1]
                    pose_entry[-1] = 2
                    pose_entry[-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
                    pose_entries.append(pose_entry)

    filtered_entries = []
    for i in range(len(pose_entries)):
        if pose_entries[i][-1] < 3 or (pose_entries[i][-2] / pose_entries[i][-1] < 0.2):
            continue
        filtered_entries.append(pose_entries[i])
    pose_entries = np.asarray(filtered_entries)
    return pose_entries, all_keypoints


def extract_poses(heatmaps, pafs, upsample_ratio):
    heatmaps = np.transpose(heatmaps, (1, 2, 0))
    pafs = np.transpose(pafs, (1, 2, 0))
    heatmaps = cv2.resize(heatmaps, dsize=None, fx=upsample_ratio, fy=upsample_ratio)
    pafs = cv2.resize(pafs, dsize=None, fx=upsample_ratio, fy=upsample_ratio)
    heatmaps = np.transpose(heatmaps, (2, 0, 1))
    pafs = np.transpose(pafs, (2, 0, 1))

    num_keypoints = heatmaps.shape[0]
    total_keypoints_num = 0
    all_keypoints_by_type = []
    for kpt_idx in range(num_keypoints):
        total_keypoints_num += extract_keypoints(heatmaps[kpt_idx], all_keypoints_by_type, total_keypoints_num)
    pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs)

    found_poses = []
    for pose_entry in pose_entries:
        if len(pose_entry) == 0:
            continue
        pose_keypoints = np.ones((num_keypoints * 3 + 1), dtype=np.float32) * -1
        for kpt_id in range(num_keypoints):
            if pose_entry[kpt_id] != -1.0:  # keypoint was found
                pose_keypoints[kpt_id * 3 + 0] = all_keypoints[int(pose_entry[kpt_id]), 0]
                pose_keypoints[kpt_id * 3 + 1] = all_keypoints[int(pose_entry[kpt_id]), 1]
                pose_keypoints[kpt_id * 3 + 2] = all_keypoints[int(pose_entry[kpt_id]), 2]
        pose_keypoints[-1] = pose_entry[18]  # pose score
        found_poses.append(pose_keypoints)

    if not found_poses:
        return np.array(found_poses, dtype=np.float32).reshape((0, 0)), None
    return np.array(found_poses, dtype=np.float32), None
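

def _demo_extract_poses():
    """A calling-convention sketch (not part of the original pipeline): the
    shapes mimic the upstream model (18 keypoint heatmaps, 38 paf channels
    for the 19 limb connections). Near-zero noise yields no poses, so the
    result is an empty (0, 0) array; with real maps it would be (N, 55),
    i.e. 18 keypoints x 3 values plus one pose score per pose.
    """
    rng = np.random.default_rng(0)
    heatmaps = rng.random((18, 32, 56)).astype(np.float32) * 0.05
    pafs = rng.random((38, 32, 56)).astype(np.float32) * 0.05
    found_poses, _ = extract_poses(heatmaps, pafs, upsample_ratio=4)
    return found_poses.shape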