adding missing files

This commit is contained in:
2025-11-03 21:43:13 -06:00
parent efdf3570c5
commit 0443d1553f
36 changed files with 1765 additions and 0 deletions

232
app/services/coc_api.py Normal file
View File

@@ -0,0 +1,232 @@
from __future__ import annotations
import json
import logging
from typing import Any, Dict, Optional
import requests
from requests import Response, Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from urllib.parse import urljoin
from app.services.appwrite_client import AppWriteClient
from app.utils.logging import get_logger
logger = get_logger(__file__)
class CoCApi:
"""
Centralized API client for Code of Conquest.
All HTTP interactions go through _request() for consistent behavior and logging.
"""
def __init__(
self,
base_url: str = "http://localhost:8000",
default_timeout: tuple[float, float] = (5.0, 10.0),
max_retries: int = 3,
) -> None:
"""
:param base_url: Base URL for the API (no trailing slash needed).
:param default_timeout: (connect_timeout, read_timeout)
:param max_retries: Number of retries for transient network/server errors.
"""
self.base_url = base_url.rstrip("/")
self.default_timeout = default_timeout
self._aw = AppWriteClient()
# Base headers for JSON APIs.
self._base_headers: Dict[str, str] = {
"Accept": "application/json",
"Content-Type": "application/json",
"User-Agent": "CoC-Client/1.0",
}
# Pre-configured Session with retries.
self._session = self._build_session(max_retries=max_retries)
# ---------- Public convenience methods ----------
def create_char(self, name:str, origin_story:str, race_id:str, profession_id:str) -> Dict[str, Any]:
payload = {
"name":name,
"origin_story":origin_story,
"race_id":race_id,
"profession_id":profession_id
}
result = self.post("/char/new",payload=payload)
player_uuid = result.get("result")
return player_uuid
def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return self._request("GET", path, params=params)
def post(self, path: str, payload: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return self._request("POST", path, json_body=payload)
def patch(self, path: str, json_body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return self._request("PATCH", path, json_body=json_body)
# ---------- Internal helpers ----------
def _build_session(self, max_retries: int) -> Session:
"""
Create a Session with sane retries for transient network/server failures.
We retry idempotent methods and some 5xx responses.
"""
session = requests.Session()
retries = Retry(
total=max_retries,
connect=max_retries,
read=max_retries,
backoff_factor=0.5,
status_forcelist=(502, 503, 504),
allowed_methods=frozenset(["GET", "HEAD", "OPTIONS", "TRACE"]),
raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def _mint_jwt(self) -> str:
"""Mint a JWT from AppWrite; empty string if unavailable (dont block requests)."""
try:
token = self._aw.mint_jwt() # expected to return dict-like
return token.get("jwt", "") if token else ""
except Exception as e:
logger.warning("Failed to mint JWT", extra={"error": str(e)})
return ""
def _auth_headers(self) -> Dict[str, str]:
"""
Build per-call headers with Authorization if a JWT is available.
Avoid mutating shared headers.
"""
headers = dict(self._base_headers)
jwt = self._mint_jwt()
if jwt:
headers["Authorization"] = f"Bearer {jwt}"
return headers
def _resolve_url(self, path_or_url: str) -> str:
"""
Accept either a full URL or a path (e.g., '/char/'). Join with base_url when needed.
"""
if path_or_url.lower().startswith(("http://", "https://")):
return path_or_url
return urljoin(self.base_url + "/", path_or_url.lstrip("/"))
def _safe_json(self, resp: Response) -> Dict[str, Any]:
"""
Attempt to parse JSON. If body is empty or not JSON, return {}.
"""
# 204 No Content or truly empty payloads
if resp.status_code == 204 or not resp.content:
return {}
try:
return resp.json()
except ValueError:
# Non-JSON payload; log preview and return empty.
preview = ""
try:
preview = resp.text[:400]
except Exception:
pass
logger.warning(
"Non-JSON response body",
extra={
"url": resp.request.url if resp.request else None,
"status": resp.status_code,
"body_preview": preview,
},
)
return {}
def _request(
self,
method: str,
path: str,
*,
params: Optional[Dict[str, Any]] = None,
json_body: Optional[Dict[str, Any]] = None,
timeout: Optional[tuple[float, float]] = None,
) -> Dict[str, Any]:
"""
Central request executor. Never raises to the caller.
Returns parsed JSON on success or {} on any error.
"""
url = self._resolve_url(path)
headers = self._auth_headers()
to = timeout or self.default_timeout
try:
resp = self._session.request(
method=method.upper(),
url=url,
headers=headers,
params=params,
json=json_body,
timeout=to,
)
# Log and return {} on non-2xx
if not (200 <= resp.status_code < 300):
# Truncate body in logs to avoid huge entries
preview = ""
try:
preview = resp.text[:400]
except Exception:
pass
logger.warning(
"HTTP request failed",
extra={
"method": method.upper(),
"url": resp.request.url if resp.request else url,
"status": resp.status_code,
"params": params,
"json_body": json_body,
"body_preview": preview,
},
)
return {}
# Success path: parse JSON safely
return self._safe_json(resp)
except requests.exceptions.RequestException as e:
# Network/timeout/connection errors
logger.warning(
"Network error during HTTP request",
extra={
"method": method.upper(),
"url": url,
"params": params,
"json_body": json_body,
"error_type": type(e).__name__,
"error": str(e),
},
exc_info=True,
)
return {}
except Exception as e:
# Absolute last-resort guardrail
logger.error(
"Unexpected error during HTTP request",
extra={
"method": method.upper(),
"url": url,
"params": params,
"json_body": json_body,
"error_type": type(e).__name__,
"error": str(e),
},
exc_info=True,
)
return {}