185 lines
6.0 KiB
Python
185 lines
6.0 KiB
Python
import time
|
|
import numpy as np
|
|
|
|
# COCO keypoint indices used by YOLO pose
|
|
_NOSE = 0
|
|
_L_SHOULDER = 5
|
|
_R_SHOULDER = 6
|
|
_L_ELBOW = 7
|
|
_R_ELBOW = 8
|
|
_L_WRIST = 9
|
|
_R_WRIST = 10
|
|
_L_HIP = 11
|
|
_R_HIP = 12
|
|
_L_ANKLE = 15
|
|
_R_ANKLE = 16
|
|
|
|
|
|
# ── helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
def _angle(a, b, c):
|
|
a, b, c = np.array(a), np.array(b), np.array(c)
|
|
rad = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
|
|
deg = np.abs(rad * 180.0 / np.pi)
|
|
return 360 - deg if deg > 180 else deg
|
|
|
|
|
|
def _elbow(kps, conf, side):
|
|
sh, el, wr = (_L_SHOULDER, _L_ELBOW, _L_WRIST) if side == 'left' \
|
|
else (_R_SHOULDER, _R_ELBOW, _R_WRIST)
|
|
return _angle(kps[sh], kps[el], kps[wr]), min(conf[sh], conf[el], conf[wr])
|
|
|
|
|
|
def _best_elbow(kps, conf):
|
|
la, lv = _elbow(kps, conf, 'left')
|
|
ra, rv = _elbow(kps, conf, 'right')
|
|
if lv > 0.5 and rv > 0.5:
|
|
return (la + ra) / 2, max(lv, rv)
|
|
return (la, lv) if lv >= rv else (ra, rv)
|
|
|
|
|
|
def _mid(kps, a, b):
|
|
return ((kps[a][0] + kps[b][0]) / 2,
|
|
(kps[a][1] + kps[b][1]) / 2)
|
|
|
|
|
|
def _is_horizontal(kps):
|
|
"""Nose and hips at similar y → body is lying flat (side-on view)."""
|
|
nose_y = kps[_NOSE][1]
|
|
_, hip_y = _mid(kps, _L_HIP, _R_HIP)
|
|
return abs(nose_y - hip_y) < 0.38
|
|
|
|
|
|
def _wrists_above_shoulders(kps, margin=0.05):
|
|
"""At least one wrist clearly above its shoulder (front-on view)."""
|
|
l = kps[_L_WRIST][1] < kps[_L_SHOULDER][1] - margin
|
|
r = kps[_R_WRIST][1] < kps[_R_SHOULDER][1] - margin
|
|
return l or r
|
|
|
|
|
|
def _plank_deviation(kps):
|
|
"""Signed hip deviation from the shoulder-to-ankle line.
|
|
0 = straight. Positive = hips sagging. Negative = hips piking."""
|
|
sh_x, sh_y = _mid(kps, _L_SHOULDER, _R_SHOULDER)
|
|
hi_x, hi_y = _mid(kps, _L_HIP, _R_HIP)
|
|
an_x, an_y = _mid(kps, _L_ANKLE, _R_ANKLE)
|
|
dx = an_x - sh_x
|
|
if abs(dx) < 0.01:
|
|
return 0.0
|
|
t = (hi_x - sh_x) / dx
|
|
return hi_y - (sh_y + t * (an_y - sh_y))
|
|
|
|
|
|
# ── exercise update functions ─────────────────────────────────────────────────
|
|
# All return (new_stage: str | None, rep_counted: bool)
|
|
# Side-on: push-up, curl, sit-up, plank
|
|
# Front-on: pull-up, bench
|
|
|
|
def update_pushup(kps, conf, stage):
|
|
angle, vis = _best_elbow(kps, conf)
|
|
if vis < 0.4 or not _is_horizontal(kps):
|
|
return stage, False
|
|
if angle > 155 and stage != 'up':
|
|
return 'up', False
|
|
if angle < 85 and stage == 'up':
|
|
return 'down', True
|
|
return stage, False
|
|
|
|
|
|
def update_pullup(kps, conf, stage):
|
|
angle, vis = _best_elbow(kps, conf)
|
|
if vis < 0.4 or not _wrists_above_shoulders(kps):
|
|
return stage, False
|
|
if angle > 155 and stage != 'down':
|
|
return 'down', False
|
|
if angle < 90 and stage == 'down':
|
|
return 'up', True
|
|
return stage, False
|
|
|
|
|
|
def update_bench(kps, conf, stage):
|
|
"""Horizontal body + wrists above shoulders (lying on back). Count on lockout."""
|
|
angle, vis = _best_elbow(kps, conf)
|
|
if vis < 0.4 or not _is_horizontal(kps) or not _wrists_above_shoulders(kps):
|
|
return stage, False
|
|
if angle < 90 and stage != 'down':
|
|
return 'down', False
|
|
if angle > 155 and stage == 'down':
|
|
return 'up', True
|
|
return stage, False
|
|
|
|
|
|
def update_curl(kps, conf, stage):
|
|
"""Standing (not horizontal). Count at full curl."""
|
|
angle, vis = _best_elbow(kps, conf)
|
|
if vis < 0.4 or _is_horizontal(kps):
|
|
return stage, False
|
|
if angle > 150 and stage != 'down':
|
|
return 'down', False
|
|
if angle < 60 and stage == 'down':
|
|
return 'up', True
|
|
return stage, False
|
|
|
|
|
|
def update_situp(kps, conf, stage):
|
|
"""Side-on view. Count when shoulders rise well above hips."""
|
|
_, sh_y = _mid(kps, _L_SHOULDER, _R_SHOULDER)
|
|
_, hi_y = _mid(kps, _L_HIP, _R_HIP)
|
|
rise = hi_y - sh_y # larger = shoulders further above hips
|
|
if rise < 0.10 and stage != 'down':
|
|
return 'down', False
|
|
if rise > 0.28 and stage == 'down':
|
|
return 'up', True
|
|
return stage, False
|
|
|
|
|
|
class _PlankUpdater:
|
|
"""Holds a per-second tick timer so the count increments in whole seconds."""
|
|
_LIMIT = 0.08
|
|
|
|
def __init__(self):
|
|
self._tick = None
|
|
|
|
def __call__(self, kps, conf, stage):
|
|
if stage is None:
|
|
self._tick = None
|
|
|
|
dev = _plank_deviation(kps)
|
|
flat = _is_horizontal(kps)
|
|
|
|
if not flat:
|
|
self._tick = None
|
|
return 'get flat', False
|
|
if dev > self._LIMIT:
|
|
self._tick = None
|
|
return 'sagging', False
|
|
if dev < -self._LIMIT:
|
|
self._tick = None
|
|
return 'piking', False
|
|
|
|
now = time.time()
|
|
if self._tick is None:
|
|
self._tick = now
|
|
if now - self._tick >= 1.0:
|
|
self._tick = now
|
|
return 'holding', True
|
|
return 'holding', False
|
|
|
|
|
|
_plank_fn = _PlankUpdater()
|
|
|
|
def update_plank(kps, conf, stage):
|
|
return _plank_fn(kps, conf, stage)
|
|
|
|
|
|
# ── registry ──────────────────────────────────────────────────────────────────
|
|
|
|
EXERCISES = {
|
|
'p': {'name': 'PUSH-UPS', 'fn': update_pushup, 'color': (50, 220, 100), 'unit': 'REPS'},
|
|
'u': {'name': 'PULL-UPS', 'fn': update_pullup, 'color': (80, 120, 255), 'unit': 'REPS'},
|
|
'b': {'name': 'BENCH PRESS', 'fn': update_bench, 'color': (0, 200, 255), 'unit': 'REPS'},
|
|
'c': {'name': 'BICEP CURLS', 'fn': update_curl, 'color': (200, 80, 255), 'unit': 'REPS'},
|
|
's': {'name': 'SIT-UPS', 'fn': update_situp, 'color': (255, 160, 30), 'unit': 'REPS'},
|
|
'l': {'name': 'PLANK', 'fn': update_plank, 'color': (30, 200, 220), 'unit': 'SECS'},
|
|
}
|