feat: add missing import service and templates
All checks were successful
Build and Push Docker Image / build-and-push (push) Successful in 12s

These files were untracked and missing from prior commits, causing
ModuleNotFoundError on fresh deploys.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-13 15:46:37 -05:00
parent df8d5c65fb
commit bae0bc9dee
3 changed files with 335 additions and 0 deletions

View File

@@ -0,0 +1,281 @@
"""Import service for loading workout history from an old database.
Opens the old SQLite DB read-only, maps IDs by name, and imports
workout_sessions, workout_logs, and progress_log into the current DB.
"""
import sqlite3
from dataclasses import dataclass, field
from datetime import date, datetime
import structlog
from sqlmodel import Session, select
from app.models.exercise import Exercise
from app.models.progress_log import ProgressLog
from app.models.user import User
from app.models.workout_day import WorkoutDay
from app.models.workout_log import WorkoutLog
from app.models.workout_session import WorkoutSession
logger = structlog.get_logger(__name__)
@dataclass
class ImportResult:
"""Tracks counts and warnings from an import operation."""
sessions_imported: int = 0
sessions_skipped: int = 0
logs_imported: int = 0
logs_skipped: int = 0
progress_imported: int = 0
progress_skipped: int = 0
warnings: list[str] = field(default_factory=list)
class ImportService:
"""Imports workout history from an old SneakySwole database.
Args:
session: An active SQLModel Session for the current DB.
"""
def __init__(self, session: Session) -> None:
self._session = session
def import_from_db(self, db_path: str) -> ImportResult:
"""Import workout data from an old database file.
Matches users, exercises, and workout days by name between
old and new databases. Skips duplicates for idempotency.
Args:
db_path: Path to the old SQLite database file.
Returns:
ImportResult with counts and any warnings.
"""
result = ImportResult()
old_db = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
old_db.row_factory = sqlite3.Row
try:
user_map = self._build_user_map(old_db, result)
exercise_map = self._build_exercise_map(old_db, result)
workout_day_map = self._build_workout_day_map(old_db, result)
session_map = self._import_sessions(
old_db, user_map, workout_day_map, result,
)
self._import_logs(old_db, session_map, exercise_map, result)
self._import_progress(old_db, user_map, exercise_map, result)
self._session.commit()
finally:
old_db.close()
logger.info(
"import_complete",
sessions=result.sessions_imported,
logs=result.logs_imported,
progress=result.progress_imported,
warnings=len(result.warnings),
)
return result
def _build_user_map(
self, old_db: sqlite3.Connection, result: ImportResult,
) -> dict[int, int]:
"""Map old user IDs to new user IDs by matching on username."""
new_users = self._session.exec(select(User)).all()
new_user_by_name = {u.username: u.id for u in new_users}
user_map: dict[int, int] = {}
for row in old_db.execute("SELECT id, username FROM users"):
new_id = new_user_by_name.get(row["username"])
if new_id is not None:
user_map[row["id"]] = new_id
else:
result.warnings.append(
f"User '{row['username']}' (old id={row['id']}) not found in new DB — skipping their data",
)
return user_map
def _build_exercise_map(
self, old_db: sqlite3.Connection, result: ImportResult,
) -> dict[int, int]:
"""Map old exercise IDs to new exercise IDs by matching on name."""
new_exercises = self._session.exec(select(Exercise)).all()
new_ex_by_name = {e.name: e.id for e in new_exercises}
exercise_map: dict[int, int] = {}
for row in old_db.execute("SELECT id, name FROM exercises"):
new_id = new_ex_by_name.get(row["name"])
if new_id is not None:
exercise_map[row["id"]] = new_id
else:
result.warnings.append(
f"Exercise '{row['name']}' (old id={row['id']}) not found in new DB — skipping its logs",
)
return exercise_map
def _build_workout_day_map(
self, old_db: sqlite3.Connection, result: ImportResult,
) -> dict[int, int]:
"""Map old workout_day IDs to new ones by matching on name."""
new_days = self._session.exec(select(WorkoutDay)).all()
new_day_by_name = {d.name: d.id for d in new_days}
day_map: dict[int, int] = {}
for row in old_db.execute("SELECT id, name FROM workout_days"):
new_id = new_day_by_name.get(row["name"])
if new_id is not None:
day_map[row["id"]] = new_id
else:
result.warnings.append(
f"Workout day '{row['name']}' (old id={row['id']}) not found in new DB",
)
return day_map
def _import_sessions(
self,
old_db: sqlite3.Connection,
user_map: dict[int, int],
workout_day_map: dict[int, int],
result: ImportResult,
) -> dict[int, int]:
"""Import workout_sessions, returning old_id -> new_id map."""
session_map: dict[int, int] = {}
rows = old_db.execute(
"SELECT id, user_id, workout_day_id, date, notes, created_at "
"FROM workout_sessions ORDER BY id",
).fetchall()
for row in rows:
new_user_id = user_map.get(row["user_id"])
new_day_id = workout_day_map.get(row["workout_day_id"])
if new_user_id is None or new_day_id is None:
result.sessions_skipped += 1
continue
# Check for duplicate by (user_id, workout_day_id, date)
existing = self._session.exec(
select(WorkoutSession).where(
WorkoutSession.user_id == new_user_id,
WorkoutSession.workout_day_id == new_day_id,
WorkoutSession.date == row["date"],
),
).first()
if existing:
session_map[row["id"]] = existing.id
result.sessions_skipped += 1
continue
new_session = WorkoutSession(
user_id=new_user_id,
workout_day_id=new_day_id,
date=date.fromisoformat(row["date"]),
notes=row["notes"],
created_at=datetime.fromisoformat(row["created_at"]),
)
self._session.add(new_session)
self._session.flush()
session_map[row["id"]] = new_session.id
result.sessions_imported += 1
return session_map
def _import_logs(
self,
old_db: sqlite3.Connection,
session_map: dict[int, int],
exercise_map: dict[int, int],
result: ImportResult,
) -> None:
"""Import workout_logs using mapped session and exercise IDs."""
rows = old_db.execute(
"SELECT session_id, exercise_id, set_number, reps_completed, "
"weight_used, felt_easy, notes, created_at "
"FROM workout_logs ORDER BY id",
).fetchall()
for row in rows:
new_session_id = session_map.get(row["session_id"])
new_exercise_id = exercise_map.get(row["exercise_id"])
if new_session_id is None or new_exercise_id is None:
result.logs_skipped += 1
continue
existing = self._session.exec(
select(WorkoutLog).where(
WorkoutLog.session_id == new_session_id,
WorkoutLog.exercise_id == new_exercise_id,
WorkoutLog.set_number == row["set_number"],
),
).first()
if existing:
result.logs_skipped += 1
continue
self._session.add(WorkoutLog(
session_id=new_session_id,
exercise_id=new_exercise_id,
set_number=row["set_number"],
reps_completed=row["reps_completed"],
weight_used=row["weight_used"],
felt_easy=bool(row["felt_easy"]),
notes=row["notes"],
created_at=datetime.fromisoformat(row["created_at"]),
))
result.logs_imported += 1
def _import_progress(
self,
old_db: sqlite3.Connection,
user_map: dict[int, int],
exercise_map: dict[int, int],
result: ImportResult,
) -> None:
"""Import progress_log using mapped user and exercise IDs."""
rows = old_db.execute(
"SELECT user_id, exercise_id, date, suggested_reps, "
"suggested_weight, actual_reps, actual_weight, "
"progression_applied, created_at "
"FROM progress_log ORDER BY id",
).fetchall()
for row in rows:
new_user_id = user_map.get(row["user_id"])
new_exercise_id = exercise_map.get(row["exercise_id"])
if new_user_id is None or new_exercise_id is None:
result.progress_skipped += 1
continue
existing = self._session.exec(
select(ProgressLog).where(
ProgressLog.user_id == new_user_id,
ProgressLog.exercise_id == new_exercise_id,
ProgressLog.date == row["date"],
),
).first()
if existing:
result.progress_skipped += 1
continue
self._session.add(ProgressLog(
user_id=new_user_id,
exercise_id=new_exercise_id,
date=date.fromisoformat(row["date"]),
suggested_reps=row["suggested_reps"],
suggested_weight=row["suggested_weight"],
actual_reps=row["actual_reps"],
actual_weight=row["actual_weight"],
progression_applied=row["progression_applied"],
created_at=datetime.fromisoformat(row["created_at"]),
))
result.progress_imported += 1

View File

@@ -0,0 +1,17 @@
<article>
<header><h3>Import Old Database</h3></header>
<form hx-post="/dashboard/import" hx-encoding="multipart/form-data"
hx-target="#import-results" hx-swap="innerHTML">
<div class="grid">
<label>
Old Database File (.db)
<input type="file" name="db_file" accept=".db" required>
</label>
<label>
&nbsp;
<button type="submit">Import</button>
</label>
</div>
</form>
<div id="import-results"></div>
</article>

View File

@@ -0,0 +1,37 @@
<table>
<thead>
<tr>
<th>Category</th>
<th>Imported</th>
<th>Skipped</th>
</tr>
</thead>
<tbody>
<tr>
<td>Workout Sessions</td>
<td>{{ result.sessions_imported }}</td>
<td>{{ result.sessions_skipped }}</td>
</tr>
<tr>
<td>Workout Logs</td>
<td>{{ result.logs_imported }}</td>
<td>{{ result.logs_skipped }}</td>
</tr>
<tr>
<td>Progress Log</td>
<td>{{ result.progress_imported }}</td>
<td>{{ result.progress_skipped }}</td>
</tr>
</tbody>
</table>
{% if result.warnings %}
<details>
<summary>Warnings ({{ result.warnings|length }})</summary>
<ul>
{% for warning in result.warnings %}
<li><small>{{ warning }}</small></li>
{% endfor %}
</ul>
</details>
{% endif %}