"""Service layer for admin authentication and session management. Handles password verification, session token creation/validation. Uses bcrypt for password hashing and itsdangerous for session signing. """ from typing import Optional import bcrypt import structlog from itsdangerous import BadSignature, URLSafeTimedSerializer from sqlmodel import Session, select from app.models.user import User logger = structlog.get_logger(__name__) # Session token max age: 24 hours SESSION_MAX_AGE_SECONDS = 86400 class AuthService: """Handles admin authentication and session token management. Args: session: An active SQLModel Session. secret_key: Secret key for signing session tokens. """ def __init__(self, session: Session, secret_key: str) -> None: self._session = session self._serializer = URLSafeTimedSerializer(secret_key) def authenticate(self, username: str, password: str) -> Optional[User]: """Verify admin credentials and return the User if valid. Only admin users can authenticate. Non-admin users are rejected. Args: username: The username to check. password: The plaintext password to verify. Returns: The authenticated User, or None if credentials are invalid. """ statement = select(User).where(User.username == username) user = self._session.exec(statement).first() if user is None: logger.warning("auth_failed", reason="user_not_found", username=username) return None if not user.is_admin: logger.warning("auth_failed", reason="not_admin", username=username) return None if not user.password_hash: logger.warning("auth_failed", reason="no_password_hash", username=username) return None # Verify password against bcrypt hash if not bcrypt.checkpw( password.encode("utf-8"), user.password_hash.encode("utf-8"), ): logger.warning("auth_failed", reason="wrong_password", username=username) return None logger.info("auth_success", username=username) return user def create_session_token(self, user_id: int) -> str: """Create a signed session token for the given user. Args: user_id: The authenticated user's ID. Returns: A signed, URL-safe token string. """ return self._serializer.dumps({"user_id": user_id}) def validate_session_token(self, token: str) -> Optional[int]: """Validate a session token and extract the user_id. Args: token: The session token to validate. Returns: The user_id if the token is valid, or None. """ try: data = self._serializer.loads(token, max_age=SESSION_MAX_AGE_SECONDS) return data.get("user_id") except BadSignature: logger.warning("session_invalid", reason="bad_signature") return None except Exception: logger.warning("session_invalid", reason="unknown_error") return None