- Fix exercise_id undefined error in log_form.html by using scalar exercise_id instead of exercise.id object reference - Clean up orphaned WorkoutSession records when all logs are deleted - Filter empty sessions from dashboard stats (sessions, volume, streak) - Replace broken HTTPException auth redirect with custom exception handler that properly returns 302 to /login Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
182 lines
5.4 KiB
Python
182 lines
5.4 KiB
Python
"""Analytics service for progress dashboard and chart data.
|
|
|
|
Aggregates workout log data into stats, trends, and chart-ready formats.
|
|
"""
|
|
|
|
import re
|
|
from datetime import date, timedelta
|
|
|
|
import structlog
|
|
from sqlmodel import Session, select
|
|
|
|
from app.models.workout_day import WorkoutDay
|
|
from app.models.workout_log import WorkoutLog
|
|
from app.models.workout_session import WorkoutSession
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
|
|
def _weight_to_float(weight_str: str) -> float:
|
|
"""Convert weight string to float for volume calculations.
|
|
|
|
Args:
|
|
weight_str: Weight like '30 lbs' or 'BW'.
|
|
|
|
Returns:
|
|
Numeric weight, or 0.0 for bodyweight.
|
|
"""
|
|
if not weight_str or weight_str.upper() == "BW":
|
|
return 0.0
|
|
match = re.search(r"(\d+(?:\.\d+)?)", weight_str)
|
|
return float(match.group(1)) if match else 0.0
|
|
|
|
|
|
class AnalyticsService:
    """Aggregates workout data for dashboards and charts.

    Every public method loads the user's sessions with one query and the
    matching log entries with a second, joined query; the logs are then
    grouped by session in Python instead of being fetched per session.

    Args:
        session: An active SQLModel Session.
    """

    # Upper bound on how many consecutive weeks the streak counter walks back.
    _MAX_STREAK_WEEKS = 52

    def __init__(self, session: Session) -> None:
        self._session = session

    def _logs_by_session(self, user_id: int, exercise_id=None) -> dict:
        """Fetch a user's log entries in one query, grouped by session id.

        Args:
            user_id: Owner of the sessions whose logs are loaded.
            exercise_id: When given, only logs for this exercise are loaded.

        Returns:
            Dict mapping session id to a non-empty list of WorkoutLog rows.
            Sessions without matching logs have no key.
        """
        statement = (
            select(WorkoutLog)
            .join(WorkoutSession, WorkoutLog.session_id == WorkoutSession.id)
            .where(WorkoutSession.user_id == user_id)
        )
        if exercise_id is not None:
            statement = statement.where(WorkoutLog.exercise_id == exercise_id)

        grouped = {}
        for log_entry in self._session.exec(statement).all():
            grouped.setdefault(log_entry.session_id, []).append(log_entry)
        return grouped

    @staticmethod
    def _volume(logs) -> float:
        """Return the sum of reps x numeric weight over the given logs."""
        return sum(
            log_entry.reps_completed * _weight_to_float(log_entry.weight_used)
            for log_entry in logs
        )

    @classmethod
    def _weekly_streak(cls, session_dates, today) -> int:
        """Count consecutive weeks, ending with the current one, that have a session.

        Weeks run Monday to Sunday. The count starts at the week containing
        ``today``, so it is 0 until a session exists in the current week.
        """
        # Normalise every session date to the Monday of its week so that a
        # week can be tested with a single set lookup.
        active_weeks = {
            day - timedelta(days=day.weekday()) for day in session_dates
        }
        week_start = today - timedelta(days=today.weekday())
        streak = 0
        while streak < cls._MAX_STREAK_WEEKS and week_start in active_weeks:
            streak += 1
            week_start -= timedelta(weeks=1)
        return streak

    def get_user_stats(self, user_id: int) -> dict:
        """Get summary statistics for a user.

        Sessions that no longer have any log entries are ignored for every
        figure, including the streak and the last workout date.

        Returns:
            Dict with keys: total_sessions, total_volume, total_sets,
            current_streak, last_workout_date. ``total_volume`` is rounded
            to an integer; ``current_streak`` is a number of weeks (at most
            52); ``last_workout_date`` is None when there are no sessions.
        """
        all_sessions = self._session.exec(
            select(WorkoutSession)
            .where(WorkoutSession.user_id == user_id)
            .order_by(WorkoutSession.date.desc())
        ).all()
        logs_by_session = self._logs_by_session(user_id)

        # Only count sessions that still have log entries
        sessions = [ws for ws in all_sessions if logs_by_session.get(ws.id)]

        total_sets = 0
        total_volume = 0.0
        for ws in sessions:
            logs = logs_by_session[ws.id]
            total_sets += len(logs)
            total_volume += self._volume(logs)

        return {
            "total_sessions": len(sessions),
            "total_volume": round(total_volume),
            "total_sets": total_sets,
            # Read today's date once so the whole streak uses one reference day.
            "current_streak": self._weekly_streak(
                [ws.date for ws in sessions], date.today()
            ),
            # Sessions are ordered newest first, so index 0 is the latest.
            "last_workout_date": sessions[0].date if sessions else None,
        }

    def get_exercise_progress(
        self, user_id: int, exercise_id: int,
    ) -> dict:
        """Get chart-ready progress data for a specific exercise.

        One data point is produced per session (oldest first) that has at
        least one log for the exercise.

        Returns:
            Dict with keys: dates, reps, weights, volumes. The lists are
            parallel: ISO date string, average reps rounded to one decimal,
            numeric weight, and volume rounded to an integer.
        """
        sessions = self._session.exec(
            select(WorkoutSession)
            .where(WorkoutSession.user_id == user_id)
            .order_by(WorkoutSession.date.asc())
        ).all()
        logs_by_session = self._logs_by_session(user_id, exercise_id)

        dates = []
        reps = []
        weights = []
        volumes = []

        for ws in sessions:
            logs = logs_by_session.get(ws.id)
            if not logs:
                continue

            avg_reps = sum(
                log_entry.reps_completed for log_entry in logs
            ) / len(logs)

            dates.append(ws.date.isoformat())
            reps.append(round(avg_reps, 1))
            # NOTE(review): the charted weight is taken from the first log
            # returned for the session; the query has no ORDER BY, so which
            # set that is depends on the database. Confirm this is intended.
            weights.append(_weight_to_float(logs[0].weight_used))
            volumes.append(round(self._volume(logs)))

        return {
            "dates": dates,
            "reps": reps,
            "weights": weights,
            "volumes": volumes,
        }

    def get_volume_by_day(self, user_id: int) -> dict:
        """Get total volume broken down by workout day.

        Sessions whose workout day id is not found are grouped under
        "Unknown". Sessions without log entries are skipped.

        Returns:
            Dict mapping workout day name to total volume, rounded to an
            integer once per day.
        """
        days = self._session.exec(select(WorkoutDay)).all()
        day_map = {d.id: d.name for d in days}

        sessions = self._session.exec(
            select(WorkoutSession)
            .where(WorkoutSession.user_id == user_id)
        ).all()
        logs_by_session = self._logs_by_session(user_id)

        raw_totals = {}
        for ws in sessions:
            logs = logs_by_session.get(ws.id)
            if not logs:
                continue

            day_name = day_map.get(ws.workout_day_id, "Unknown")
            raw_totals[day_name] = (
                raw_totals.get(day_name, 0.0) + self._volume(logs)
            )

        # Round once per day rather than once per session: rounding each
        # session first lets half-to-even errors accumulate in the total.
        return {name: round(total) for name, total in raw_totals.items()}
|