init commit
This commit is contained in:
71
app/__init__.py
Normal file
71
app/__init__.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import os
|
||||
from flask import Flask, request, g, jsonify, current_app
|
||||
from app.utils.typed_flask import CoCFlask
|
||||
from typing import cast
|
||||
|
||||
from app.services.appwrite_client import AppWriteClient
|
||||
from app.utils.api_response import ApiResponder
|
||||
|
||||
from app.blueprints.main import main_bp
|
||||
from app.blueprints.char import char_bp
|
||||
|
||||
from app.utils.settings import get_settings
|
||||
|
||||
settings = get_settings()
|
||||
|
||||
def create_app() -> CoCFlask:
|
||||
app = CoCFlask(__name__)
|
||||
|
||||
app.config.update(
|
||||
SECRET_KEY=settings.flask_secret_key,
|
||||
APPWRITE_ENDPOINT=settings.appwrite_endpoint,
|
||||
APPWRITE_PROJECT_ID=settings.appwrite_project_id,
|
||||
APPWRITE_API_KEY=settings.appwrite_api_key,
|
||||
)
|
||||
|
||||
default_headers = {
|
||||
"Cache-Control": "no-store",
|
||||
"Content-Type": "application/json; charset=utf-8",
|
||||
# Add CORS if needed:
|
||||
# "Access-Control-Allow-Origin": "*",
|
||||
}
|
||||
|
||||
full_app_name = f"{settings.app_name} - v {settings.app_version}"
|
||||
app.api = ApiResponder(
|
||||
app_name=full_app_name,
|
||||
version_provider=settings.app_version,
|
||||
include_request_id=True,
|
||||
default_headers=default_headers
|
||||
)
|
||||
|
||||
app.api.register_error_handlers(app)
|
||||
|
||||
if not app.config["APPWRITE_ENDPOINT"] or not app.config["APPWRITE_PROJECT_ID"]:
|
||||
raise RuntimeError("Missing APPWRITE_ENDPOINT or APPWRITE_PROJECT_ID")
|
||||
|
||||
# Blueprints
|
||||
app.register_blueprint(main_bp)
|
||||
app.register_blueprint(char_bp)
|
||||
|
||||
@app.before_request
|
||||
def authenticate():
|
||||
auth = request.headers.get("Authorization", "")
|
||||
if not auth.startswith("Bearer "):
|
||||
return app.api.unauthorized("Missing Bearer Token")
|
||||
|
||||
token = auth.split(" ", 1)[1].strip()
|
||||
try:
|
||||
aw = AppWriteClient()
|
||||
user = aw.get_user_from_jwt_token(token)
|
||||
if isinstance(user,str):
|
||||
return app.api.unauthorized("Invalid or Expired JWT")
|
||||
except Exception as e:
|
||||
return app.api.unauthorized("Invalid or Expired JWT")
|
||||
|
||||
# make available to routes
|
||||
g.appwrite_user = user
|
||||
return None
|
||||
|
||||
|
||||
|
||||
return app
|
||||
17
app/blueprints/char.py
Normal file
17
app/blueprints/char.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from flask import Blueprint, g, jsonify, current_app
|
||||
from typing import cast
|
||||
from app.utils.typed_flask import CoCFlask
|
||||
|
||||
# from app.services.appwrite_client import AppWriteClient
|
||||
|
||||
# type cast flask to my custom flask app so the app.api methods are available in the IDE / typed correctly.
|
||||
app = cast(CoCFlask,current_app)
|
||||
|
||||
# blueprint def
|
||||
char_bp = Blueprint("char", __name__, url_prefix="/char")
|
||||
|
||||
|
||||
@char_bp.route("/", methods=["GET", "POST"])
|
||||
def char():
|
||||
return app.api.ok({"user":g.appwrite_user})
|
||||
|
||||
21
app/blueprints/main.py
Normal file
21
app/blueprints/main.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from flask import Blueprint, g, jsonify, current_app
|
||||
from typing import cast
|
||||
from app.utils.typed_flask import CoCFlask
|
||||
|
||||
# from app.services.appwrite_client import AppWriteClient
|
||||
|
||||
# type cast flask to my custom flask app so the app.api methods are available in the IDE / typed correctly.
|
||||
app = cast(CoCFlask,current_app)
|
||||
|
||||
# blueprint def
|
||||
main_bp = Blueprint("main", __name__, url_prefix="/")
|
||||
|
||||
|
||||
@main_bp.route("/health", methods=["GET", "POST"])
|
||||
def health():
|
||||
return app.api.ok({"status":"up"})
|
||||
|
||||
@main_bp.route("/me", methods=["GET", "POST"])
|
||||
def me():
|
||||
return app.api.ok({"user":g.appwrite_user})
|
||||
|
||||
56
app/game/generators/entity_factory.py
Normal file
56
app/game/generators/entity_factory.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import uuid
|
||||
from typing import Dict, List, Any, Tuple
|
||||
|
||||
from app.game.utils.common import Dice
|
||||
from app.game.models.entities import Entity
|
||||
from app.game.utils.loaders import TemplateStore
|
||||
from app.game.systems.leveling import hp_and_mp_gain
|
||||
|
||||
dice = Dice()
|
||||
|
||||
def build_char(name:str, origin_story:str, race_id:str, class_id:str, fame:int=0) -> Entity:
|
||||
|
||||
templates = TemplateStore()
|
||||
|
||||
# base_ability_scores = {"STR":10,"DEX":10,"CON":10,"INT":10,"WIS":10,"CHA":10, "LUC":10}
|
||||
|
||||
race = templates.race(race_id)
|
||||
profession = templates.profession(class_id)
|
||||
|
||||
e = Entity(
|
||||
uuid = str(uuid.uuid4()),
|
||||
name = name,
|
||||
origin_story = origin_story,
|
||||
fame = fame,
|
||||
level=1,
|
||||
race =race.get("name"),
|
||||
profession = profession.get("name")
|
||||
)
|
||||
|
||||
# assign hit dice
|
||||
e.hit_die = profession.get("hit_die")
|
||||
e.per_level.hp_die = profession.get("per_level",{}).get("hp_rule",{}).get("die","1d10")
|
||||
e.per_level.mp_die = profession.get("per_level",{}).get("mp_rule",{}).get("die","1d10")
|
||||
e.per_level.hp_rule = profession.get("per_level",{}).get("hp_rule",{}).get("rule","avg")
|
||||
e.per_level.mp_rule = profession.get("per_level",{}).get("mp_rule",{}).get("rule","avg")
|
||||
|
||||
# assign hp/mp
|
||||
e.hp = hp_and_mp_gain(1,e.per_level.hp_die,e.per_level.hp_rule)
|
||||
e.mp = hp_and_mp_gain(1,e.per_level.mp_die,e.per_level.mp_rule)
|
||||
|
||||
for stat, delta in race.get("ability_mods", {}).items():
|
||||
e.ability_scores[stat] = e.ability_scores.get(stat, 10) + int(delta)
|
||||
|
||||
# Apply class grants (spells/skills/equipment + MP base)
|
||||
grants = profession.get("grants", {})
|
||||
|
||||
# add spells to char
|
||||
grant_spells = grants.get("spells", []) or []
|
||||
for spell in grant_spells:
|
||||
spell_dict = templates.spell(spell)
|
||||
e.spells.append(spell_dict)
|
||||
|
||||
# TODO
|
||||
# e.skills.extend(grants.get("skills", []) or [])
|
||||
|
||||
return e
|
||||
38
app/game/models/entities.py
Normal file
38
app/game/models/entities.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Any, Tuple
|
||||
|
||||
@dataclass
|
||||
class PerLevel:
|
||||
hp_die: str = "1d10"
|
||||
hp_rule: str = "avg"
|
||||
mp_die: str = "1d6"
|
||||
mp_rule: str = "avg"
|
||||
|
||||
@dataclass
|
||||
class Entity:
|
||||
uuid: str = ""
|
||||
name: str = ""
|
||||
race: str = ""
|
||||
profession: str = ""
|
||||
origin_story:str = ""
|
||||
hit_die: str = "1d6"
|
||||
|
||||
hp: int = 1
|
||||
mp: int = 0
|
||||
energy_credits: int = 0
|
||||
|
||||
level: int = 0
|
||||
xp: int = 0
|
||||
xp_to_next_level:int = 100
|
||||
|
||||
fame: int = 0
|
||||
alignment: int = 0
|
||||
|
||||
per_level: PerLevel = field(default_factory=PerLevel)
|
||||
ability_scores: Dict[str, int] = field(default_factory=dict)
|
||||
weapons: List[Dict[str, int]] = field(default_factory=list)
|
||||
armor: List[Dict[str, int]] = field(default_factory=list)
|
||||
spells: List[Dict[str, int]] = field(default_factory=list)
|
||||
skills: List[Dict[str, int]] = field(default_factory=list)
|
||||
|
||||
72
app/game/systems/leveling.py
Normal file
72
app/game/systems/leveling.py
Normal file
@@ -0,0 +1,72 @@
|
||||
|
||||
from app.utils.logging import get_logger
|
||||
|
||||
from app.game.utils.common import Dice
|
||||
from app.game.models.entities import Entity
|
||||
|
||||
logger = get_logger(__file__)
|
||||
|
||||
dice = Dice()
|
||||
|
||||
|
||||
def calculate_xp_and_level(current_xp: int, xp_gain: int, base_xp: int = 100, growth_rate: float = 1.2):
|
||||
"""
|
||||
Calculates new XP and level after gaining experience.
|
||||
|
||||
Args:
|
||||
current_xp (int): The player's current total XP.
|
||||
xp_gain (int): The amount of XP gained.
|
||||
base_xp (int): XP required for level 1 → 2.
|
||||
growth_rate (float): Multiplier for each level’s XP requirement.
|
||||
|
||||
Returns:
|
||||
dict: {
|
||||
'new_xp': int,
|
||||
'new_level': int,
|
||||
'xp_to_next_level': int,
|
||||
'remaining_xp_to_next': int
|
||||
}
|
||||
"""
|
||||
# Calculate current level based on XP
|
||||
level = 1
|
||||
xp_needed = base_xp
|
||||
total_xp_for_next = xp_needed
|
||||
|
||||
# Determine current level from current_xp
|
||||
while current_xp >= total_xp_for_next:
|
||||
level += 1
|
||||
xp_needed = int(base_xp * (growth_rate ** (level - 1)))
|
||||
total_xp_for_next += xp_needed
|
||||
|
||||
# Add new XP
|
||||
new_xp = current_xp + xp_gain
|
||||
|
||||
# Check if level up(s) occurred
|
||||
new_level = level
|
||||
while new_xp >= total_xp_for_next:
|
||||
new_level += 1
|
||||
xp_needed = int(base_xp * (growth_rate ** (new_level - 1)))
|
||||
total_xp_for_next += xp_needed
|
||||
|
||||
# XP required for next level
|
||||
xp_to_next_level = xp_needed
|
||||
levels_gained = new_level - level
|
||||
|
||||
return {
|
||||
"new_xp": new_xp,
|
||||
"new_level": new_level,
|
||||
"xp_to_next_level": xp_to_next_level,
|
||||
"levels_gained": levels_gained
|
||||
}
|
||||
|
||||
def hp_and_mp_gain(level: int, hit_die: str, rule: str) -> int:
|
||||
rule = (rule or "avg").lower()
|
||||
if level == 1:
|
||||
# Common RPG convention: level 1 gets max die
|
||||
return dice.max_of_die(hit_die)
|
||||
if rule == "max":
|
||||
return dice.max_of_die(hit_die)
|
||||
if rule == "avg":
|
||||
return dice.roll_dice(hit_die)
|
||||
# default fallback
|
||||
return dice.roll_dice(hit_die)
|
||||
88
app/game/templates/classes.yaml
Normal file
88
app/game/templates/classes.yaml
Normal file
@@ -0,0 +1,88 @@
|
||||
classes:
|
||||
guardian:
|
||||
name: Guardian
|
||||
description: A protector and defender, skilled in combat and leadership.
|
||||
hit_die: "2d6"
|
||||
per_level:
|
||||
hp_rule: {"die":"1d10", "rule":"max"}
|
||||
mp_rule: {"die":"1d4", "rule":"avg"}
|
||||
grants:
|
||||
spells: []
|
||||
skills: []
|
||||
|
||||
bloodborn:
|
||||
name: Bloodborn
|
||||
description: Born from darkness and violence, they wield power with abandon.
|
||||
hit_die: "2d6"
|
||||
per_level:
|
||||
hp_rule: {"die":"1d10", "rule":"max"}
|
||||
mp_rule: {"die":"1d4", "rule":"avg"}
|
||||
grants:
|
||||
spells: []
|
||||
skills: []
|
||||
|
||||
arcanist:
|
||||
name: Arcanist
|
||||
description: Arcanists are scholars of the arcane, shaping elemental forces through study and sheer intellect. Each Arcanists past is different, some taught in grand academies, others self-taught hermits obsessed with understanding creation.
|
||||
hit_die: "1d6"
|
||||
per_level:
|
||||
hp_rule: {"die":"1d4", "rule":"avg"}
|
||||
mp_rule: {"die":"1d10", "rule":"max"}
|
||||
grants:
|
||||
spells: ["ember_flicker"]
|
||||
skills: []
|
||||
|
||||
hexist:
|
||||
name: Hexist
|
||||
description: Twisted sorcerers who wield dark magic to bend reality to their will.
|
||||
hit_die: "1d10"
|
||||
per_level:
|
||||
hp_rule: {"die":"1d4", "rule":"avg"}
|
||||
mp_rule: {"die":"1d10", "rule":"max"}
|
||||
grants:
|
||||
spells: []
|
||||
skills: []
|
||||
|
||||
assassin:
|
||||
name: Assassin
|
||||
description: Stealthy killers, trained from a young age to strike without warning.
|
||||
hit_die: "2d6"
|
||||
per_level:
|
||||
hp_rule: {"die":"1d10", "rule":"max"}
|
||||
mp_rule: {"die":"1d6", "rule":"avg"}
|
||||
grants:
|
||||
spells: []
|
||||
skills: []
|
||||
|
||||
ranger:
|
||||
name: Ranger
|
||||
description: Skilled hunters who live off the land, tracking prey and evading predators.
|
||||
hit_die: "2d6"
|
||||
per_level:
|
||||
hp_rule: {"die":"1d10", "rule":"max"}
|
||||
mp_rule: {"die":"1d6", "rule":"avg"}
|
||||
grants:
|
||||
spells: []
|
||||
skills: []
|
||||
|
||||
cleric:
|
||||
name: Cleric
|
||||
description: Holy warriors, using faith to heal and protect allies, and smite enemies.
|
||||
hit_die: "3d4"
|
||||
per_level:
|
||||
hp_rule: {"die":"1d10", "rule":"avg"}
|
||||
mp_rule: {"die":"2d6", "rule":"avg"}
|
||||
grants:
|
||||
spells: []
|
||||
skills: []
|
||||
|
||||
warlock:
|
||||
name: Warlock
|
||||
description: Bargainers with dark forces, wielding forbidden magic for mysterious purposes.
|
||||
hit_die: "3d4"
|
||||
per_level:
|
||||
hp_rule: {"die":"1d10", "rule":"avg"}
|
||||
mp_rule: {"die":"2d6", "rule":"avg"}
|
||||
grants:
|
||||
spells: []
|
||||
skills: []
|
||||
48
app/game/templates/races.yaml
Normal file
48
app/game/templates/races.yaml
Normal file
@@ -0,0 +1,48 @@
|
||||
races:
|
||||
avaline:
|
||||
name: Avaline
|
||||
description: A good and fair creature that many feel is divine, it's both powerful and intelligent.
|
||||
ability_mods: { STR: 5, DEX: 0, INT: 3, WIS: 1, LUK: -2, CHA: -1, CON: 0}
|
||||
alignment: 100
|
||||
|
||||
beastfolk:
|
||||
name: Beastfolk
|
||||
description: Humanoid creatures who are intelligent and fair natured. Half Terran / Half beast.
|
||||
ability_mods: { STR: 3, DEX: 5, INT: 1, WIS: -2, LUK: -1, CHA: 0, CON: 1}
|
||||
alignment: 50
|
||||
|
||||
dwarf:
|
||||
name: Dwarf
|
||||
description: Dwarves are stout, bearded, and skilled in mining, smithing, and engineering, often living in underground kingdoms
|
||||
ability_mods: { STR: 5, DEX: -2, INT: 0, WIS: -1, LUK: 1, CHA: 1, CON: 3}
|
||||
alignment: 50
|
||||
|
||||
elf:
|
||||
name: Elf
|
||||
description: Elves are mythological beings, often depicted as tall, slender, and agile, with pointed ears, and magical abilities.
|
||||
ability_mods: { STR: -2, DEX: 5, INT: 3, WIS: 0, LUK: 1, CHA: 1, CON: -1}
|
||||
alignment: 50
|
||||
|
||||
draconian:
|
||||
name: Draconian
|
||||
description: Draconians are massive, fire-breathing humanoids with scaly skin and noble heritage
|
||||
ability_mods: { STR: 5, DEX: -1, INT: 3, WIS: 1, LUK: -2, CHA: 0, CON: 1}
|
||||
alignment: 0
|
||||
|
||||
terran:
|
||||
name: Terran
|
||||
description: Common folk of the land, similar to what some have called Human.
|
||||
ability_mods: { STR: 1, DEX: 3, INT: 0, WIS: -2, LUK: 5, CHA: 1, CON: -1}
|
||||
alignment: 0
|
||||
|
||||
hellion:
|
||||
name: Hellion
|
||||
description: A creature from the darkest reaches of the shadow, evil.
|
||||
ability_mods: { STR: 3, DEX: 1, INT: 1, WIS: 0, LUK: -2, CHA: 5, CON: -1}
|
||||
alignment: -50
|
||||
|
||||
vorgath:
|
||||
name: Vorgath
|
||||
description: A monstrous creature that's both powerful and intelligent, it's terrifying.
|
||||
ability_mods: { STR: 5, DEX: 3, INT: 1, WIS: 0, LUK: -1, CHA: -2, CON: 3}
|
||||
alignment: -100
|
||||
184
app/game/templates/spells.yaml
Normal file
184
app/game/templates/spells.yaml
Normal file
@@ -0,0 +1,184 @@
|
||||
spells:
|
||||
ember_flicker:
|
||||
name: Ember Flicker
|
||||
description: Cast a small burst of flame, dealing minor damage to all enemies within range.
|
||||
cost_mp: 1
|
||||
element: fire
|
||||
damage: 2
|
||||
level: 1
|
||||
|
||||
spark_streak:
|
||||
name: Spark Streak
|
||||
description: Create a trail of sparks that deal minor fire damage to all enemies they pass through.
|
||||
cost_mp: 4
|
||||
element: fire
|
||||
damage: 6
|
||||
level: 5
|
||||
|
||||
flameburst:
|
||||
name: Flameburst
|
||||
description: Unleash a powerful blast of flame, dealing significant damage to all enemies within a small radius.
|
||||
cost_mp: 10
|
||||
element: fire
|
||||
damage: 20
|
||||
level: 8
|
||||
|
||||
inferno_strike:
|
||||
name: Inferno Strike
|
||||
description: Deal massive fire damage to a single target, also applying a burning effect that deals additional damage over time.
|
||||
cost_mp: 15
|
||||
element: fire
|
||||
damage: 30
|
||||
level: 12
|
||||
|
||||
blaze_wave:
|
||||
name: Blaze Wave
|
||||
description: Cast a wave of flame that deals moderate damage to all enemies it passes through, also knocking them back slightly.
|
||||
cost_mp: 8
|
||||
element: fire
|
||||
damage: 18
|
||||
level: 15
|
||||
|
||||
flame_tongue:
|
||||
name: Flame Tongue
|
||||
description: Lick your target with a stream of flame, dealing moderate fire damage and potentially applying a burning effect.
|
||||
cost_mp: 12
|
||||
element: fire
|
||||
damage: 25
|
||||
level: 18
|
||||
|
||||
incendiary_cloud:
|
||||
name: Incendiary Cloud
|
||||
description: Release a cloud of flaming particles that deal minor fire damage to all enemies within a medium radius, also causing them to take additional damage if they are not wearing any heat-resistant gear.
|
||||
cost_mp: 18
|
||||
element: fire
|
||||
damage: 35
|
||||
level: 22
|
||||
|
||||
scorching_sapling:
|
||||
name: Scorching Sapling
|
||||
description: Set a target area on fire, dealing moderate fire damage to all enemies within it and also applying a burning effect.
|
||||
cost_mp: 12
|
||||
element: fire
|
||||
damage: 25
|
||||
level: 23
|
||||
|
||||
magma_blast:
|
||||
name: Magma Blast
|
||||
description: Unleash a powerful blast of molten lava that deals significant damage to all enemies within its area of effect, also potentially causing them to become stunned.
|
||||
cost_mp: 20
|
||||
element: fire
|
||||
damage: 40
|
||||
level: 25
|
||||
|
||||
flame_ember:
|
||||
name: Flame Ember
|
||||
description: Cast a burning ember that deals minor fire damage to all enemies it passes through and applies a chance to set them on fire.
|
||||
cost_mp: 6
|
||||
element: fire
|
||||
damage: 10
|
||||
level: 30
|
||||
|
||||
blaze_lash:
|
||||
name: Blaze Lash
|
||||
description: Deal moderate fire damage to a single target, also potentially applying a burning effect and increasing your attack speed for a short duration.
|
||||
cost_mp: 15
|
||||
element: fire
|
||||
damage: 28
|
||||
level: 32
|
||||
|
||||
infernal_flames:
|
||||
name: Infernal Flames
|
||||
description: Cast a massive wave of flame that deals significant damage to all enemies within its area of effect, also potentially causing them to become stunned.
|
||||
cost_mp: 25
|
||||
element: fire
|
||||
damage: 50
|
||||
level: 35
|
||||
|
||||
burning_scorch:
|
||||
name: Burning Scorch
|
||||
description: Deal massive fire damage to a single target, also applying a burning effect and dealing additional damage over time if they are not wearing any heat-resistant gear.
|
||||
cost_mp: 25
|
||||
element: fire
|
||||
damage: 45
|
||||
level: 38
|
||||
|
||||
magma_wave:
|
||||
name: Magma Wave
|
||||
description: Cast a wave of molten lava that deals moderate fire damage to all enemies it passes through, also potentially causing them to become stunned.
|
||||
cost_mp: 20
|
||||
element: fire
|
||||
damage: 40
|
||||
level: 40
|
||||
|
||||
flame_burst:
|
||||
name: Flame Burst
|
||||
description: Unleash a powerful blast of flame that deals significant damage to all enemies within its area of effect, also potentially causing them to become stunned.
|
||||
cost_mp: 22
|
||||
element: fire
|
||||
damage: 45
|
||||
level: 42
|
||||
|
||||
inferno_wing:
|
||||
name: Inferno Wing
|
||||
description: Deal massive fire damage to a single target, also applying a burning effect and dealing additional damage over time if they are not wearing any heat-resistant gear.
|
||||
cost_mp: 30
|
||||
element: fire
|
||||
damage: 60
|
||||
level: 45
|
||||
|
||||
flame_frenzy:
|
||||
name: Flame Frenzy
|
||||
description: Cast a wave of flame that deals moderate damage to all enemies it passes through, also increasing your attack speed for a short duration and potentially applying a burning effect.
|
||||
cost_mp: 25
|
||||
element: fire
|
||||
damage: 45
|
||||
level: 48
|
||||
|
||||
infernal_tongue:
|
||||
name: Infernal Tongue
|
||||
description: Lick your target with a stream of flame that deals moderate fire damage, potentially applying a burning effect and dealing additional damage over time.
|
||||
cost_mp: 28
|
||||
element: fire
|
||||
damage: 50
|
||||
level: 50
|
||||
|
||||
magma_rampart:
|
||||
name: Magma Rampart
|
||||
description: Cast a massive wall of molten lava that deals significant damage to all enemies within its area of effect, also potentially causing them to become stunned.
|
||||
cost_mp: 35
|
||||
element: fire
|
||||
damage: 70
|
||||
level: 55
|
||||
|
||||
blazing_scorch:
|
||||
name: Blazing Scorch
|
||||
description: Deal massive fire damage to a single target, also applying a burning effect and dealing additional damage over time if they are not wearing any heat-resistant gear.
|
||||
cost_mp: 35
|
||||
element: fire
|
||||
damage: 70
|
||||
level: 58
|
||||
|
||||
infernal_cloud:
|
||||
name: Infernal Cloud
|
||||
description: Release a massive cloud of flaming particles that deal significant damage to all enemies within its area of effect, also potentially causing them to become stunned.
|
||||
cost_mp: 40
|
||||
element: fire
|
||||
damage: 90
|
||||
level: 60
|
||||
|
||||
magma_crater:
|
||||
name: Magma Crater
|
||||
description: Cast a massive wave of molten lava that deals significant damage to all enemies within its area of effect, also potentially causing them to become stunned.
|
||||
cost_mp: 40
|
||||
element: fire
|
||||
damage: 90
|
||||
level: 62
|
||||
|
||||
infernal_flare:
|
||||
name: Infernal Flare
|
||||
description: Unleash a massive blast of flame that deals significant damage to all enemies within its area of effect, also potentially causing them to become stunned.
|
||||
cost_mp: 45
|
||||
element: fire
|
||||
damage: 100
|
||||
level: 65
|
||||
24
app/game/utils/common.py
Normal file
24
app/game/utils/common.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from __future__ import annotations
|
||||
import math
|
||||
import random
|
||||
|
||||
from logging import getLogger
|
||||
|
||||
logger = getLogger(__file__)
|
||||
|
||||
class Dice:
|
||||
def roll_dice(self, spec: str = "1d6"):
|
||||
# supports "1d10", "2d6" etc
|
||||
total = 0
|
||||
try:
|
||||
n, d = map(int, spec.lower().split("d"))
|
||||
for _ in range(0,n,1):
|
||||
total += random.randint(1,d)
|
||||
return total
|
||||
except Exception as e:
|
||||
logger.error(f"Unable to roll dice spec: {spec} - please use 1d10 or similar")
|
||||
return 0
|
||||
|
||||
def max_of_die(self, spec: str) -> int:
|
||||
n, d = map(int, spec.lower().split("d"))
|
||||
return n * d
|
||||
50
app/game/utils/loaders.py
Normal file
50
app/game/utils/loaders.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any, Tuple
|
||||
from dataclasses import fields, is_dataclass
|
||||
|
||||
import yaml
|
||||
|
||||
class TemplateStore:
|
||||
def __init__(self):
|
||||
|
||||
# incase we have problems later
|
||||
# templates_dir = Path(settings.repo_root) / "coc_api" / "app" / "game" / "templates"
|
||||
templates_dir = Path() / "app" / "game" / "templates"
|
||||
|
||||
races_yml_path = templates_dir / "races.yaml"
|
||||
classes_yml_path = templates_dir / "classes.yaml"
|
||||
spells_path = templates_dir / "spells.yaml"
|
||||
|
||||
with open(races_yml_path, "r", encoding="utf-8") as f:
|
||||
self.races = yaml.safe_load(f)["races"]
|
||||
with open(classes_yml_path, "r", encoding="utf-8") as f:
|
||||
self.classes = yaml.safe_load(f)["classes"]
|
||||
with open(spells_path, "r", encoding="utf-8") as f:
|
||||
self.spells = yaml.safe_load(f)["spells"]
|
||||
|
||||
def race(self, race_id: str) -> Dict[str, Any]:
|
||||
if race_id not in self.races:
|
||||
raise KeyError(f"Unknown race: {race_id}")
|
||||
return self.races[race_id]
|
||||
|
||||
def profession(self, class_id: str) -> Dict[str, Any]:
|
||||
if class_id not in self.classes:
|
||||
raise KeyError(f"Unknown class: {class_id}")
|
||||
return self.classes[class_id]
|
||||
|
||||
def spell(self, spell_id: str) -> Dict[str, Any]:
|
||||
if spell_id not in self.spells:
|
||||
raise KeyError(f"Unknown spell: {spell_id}")
|
||||
return self.spells[spell_id]
|
||||
|
||||
def from_dict(cls, d):
|
||||
"""Recursively reconstruct dataclasses from dicts."""
|
||||
if not is_dataclass(cls):
|
||||
return d
|
||||
kwargs = {}
|
||||
for f in fields(cls):
|
||||
value = d.get(f.name)
|
||||
if is_dataclass(f.type):
|
||||
value = from_dict(f.type, value)
|
||||
kwargs[f.name] = value
|
||||
return cls(**kwargs)
|
||||
22
app/models/enums.py
Normal file
22
app/models/enums.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from enum import Enum
|
||||
|
||||
class HeroClass(str, Enum):
|
||||
GUARDIAN = "guardian"
|
||||
BLOODBORN = "bloodborn"
|
||||
RANGER = "ranger"
|
||||
ASSASSIN = "assassin"
|
||||
ARCANIST = "arcanist"
|
||||
HEXIST = "hexist"
|
||||
CLERIC = "cleric"
|
||||
WARLOCK = "warlock"
|
||||
|
||||
class Races(str, Enum):
|
||||
"""Representing various species."""
|
||||
Draconian = "draconian"
|
||||
Dwarf = "dwarves"
|
||||
Elf = "elves"
|
||||
Beastfolk = "beastfolk"
|
||||
Terran = "terran"
|
||||
Vorgath = "vorgath"
|
||||
Avaline = "avaline"
|
||||
Hellion = "hellions"
|
||||
376
app/models/hero.py
Normal file
376
app/models/hero.py
Normal file
@@ -0,0 +1,376 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from uuid import UUID, uuid4
|
||||
from datetime import datetime, timezone
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Callable, List, Optional, Dict, Any, Iterable, Mapping, TYPE_CHECKING
|
||||
|
||||
from app.models.enums import HeroClass, Race
|
||||
from app.models.primitives import Attributes, Resources, Collections
|
||||
from app.utils.merging import merge_catalogs, merge_sheets
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app.utils.hero_catalog import HeroDataRegistry
|
||||
from app.utils.race_catalog import RaceDataRegistry
|
||||
|
||||
# -----------------------------
|
||||
# Small utilities
|
||||
# -----------------------------
|
||||
|
||||
def _utcnow_iso() -> str:
|
||||
"""Return current UTC time as ISO 8601 string."""
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
def _clamp(value: int, lo: int, hi: int) -> int:
|
||||
"""Clamp integer between lo and hi."""
|
||||
return max(lo, min(hi, value))
|
||||
|
||||
# -----------------------------
|
||||
# Component dataclasses
|
||||
# -----------------------------
|
||||
|
||||
@dataclass
|
||||
class Progress:
|
||||
"""Economy, experience, level, and other progression flags."""
|
||||
level: int = 1
|
||||
gold: int = 50
|
||||
xp: int = 0
|
||||
xp_next_level: int = 100 # will be recomputed on init
|
||||
needs_sleep: int = 0
|
||||
fame: int = 0
|
||||
alignment: int = 0 # -100..+100 works nicely
|
||||
battlecount: int = 0
|
||||
|
||||
def gain_xp(self, amount: int, xp_curve: Callable[[int, int], int]) -> int:
|
||||
"""
|
||||
Add XP and compute how many levels are gained using the provided xp curve.
|
||||
Returns the number of levels gained.
|
||||
"""
|
||||
amount = max(0, int(amount))
|
||||
if amount == 0:
|
||||
return 0
|
||||
|
||||
self.xp += amount
|
||||
levels_gained = 0
|
||||
|
||||
# Level up loop (handles large XP chunks)
|
||||
while self.xp >= self.xp_next_level:
|
||||
self.level += 1
|
||||
levels_gained += 1
|
||||
self.xp_next_level = xp_curve(self.xp, self.level)
|
||||
|
||||
return levels_gained
|
||||
|
||||
@dataclass
|
||||
class Location:
|
||||
current_town: str = "Brindlemark"
|
||||
current_nation: str = "Brindlemark"
|
||||
current_continent: str = "Brindlemark"
|
||||
|
||||
@dataclass
|
||||
class Equipment:
|
||||
"""References to equipped items (IDs or simple names for now)."""
|
||||
weapon: Optional[str] = None
|
||||
armor: Optional[str] = None
|
||||
|
||||
# -----------------------------
|
||||
# Main model
|
||||
# -----------------------------
|
||||
|
||||
@dataclass
|
||||
class Hero:
|
||||
# Identity
|
||||
hero_id: UUID = field(default_factory=uuid4)
|
||||
name: str = "Unnamed"
|
||||
|
||||
# Build
|
||||
race: Optional[Race] = None
|
||||
hero_class: Optional[HeroClass] = None
|
||||
|
||||
# Stats & state
|
||||
attributes: Attributes = field(default_factory=Attributes)
|
||||
resources: Resources = field(default_factory=Resources)
|
||||
progress: Progress = field(default_factory=Progress)
|
||||
location: Location = field(default_factory=Location)
|
||||
equipment: Equipment = field(default_factory=Equipment)
|
||||
collections: Collections = field(default_factory=Collections)
|
||||
|
||||
# RP bits
|
||||
origin_story: str = ""
|
||||
|
||||
# Auditing & versioning
|
||||
version: int = 1
|
||||
created_at: str = field(default_factory=_utcnow_iso)
|
||||
updated_at: str = field(default_factory=_utcnow_iso)
|
||||
|
||||
# -------------------------
|
||||
# Lifecycle helpers
|
||||
# -------------------------
|
||||
|
||||
def initialize(self, xp_curve: Callable[[int, int], int]) -> None:
|
||||
"""
|
||||
Call once after creation or after loading if you need to recompute derived fields.
|
||||
For example: set xp_next_level using your curve.
|
||||
"""
|
||||
self.progress.xp_next_level = xp_curve(self.progress.xp, self.progress.level)
|
||||
self.touch()
|
||||
|
||||
def touch(self) -> None:
|
||||
"""Update the `updated_at` timestamp."""
|
||||
self.updated_at = _utcnow_iso()
|
||||
|
||||
@classmethod
|
||||
def from_race_and_class(
|
||||
cls,
|
||||
name: str,
|
||||
race: Race | None,
|
||||
hero_class: HeroClass | None,
|
||||
class_registry: HeroDataRegistry | None = None,
|
||||
race_registry: RaceDataRegistry | None = None,
|
||||
*,
|
||||
# Optional global registries (if you’ve split catalogs):
|
||||
skill_registry: Any | None = None,
|
||||
spell_registry: Any | None = None,
|
||||
) -> Hero:
|
||||
"""
|
||||
Factory that builds a Hero and seeds starting skills/spells from race/class.
|
||||
Validates starting IDs against global registries when provided, otherwise
|
||||
falls back to merged local catalogs from the race/class sheets.
|
||||
"""
|
||||
# Always define locals so later checks are safe
|
||||
skills_catalog: Dict[str, Any] = {}
|
||||
spells_catalog: Dict[str, Any] = {}
|
||||
race_sheet = None
|
||||
cls_sheet = None
|
||||
|
||||
if class_registry and race_registry and hero_class and race:
|
||||
cls_sheet = class_registry.for_class(hero_class)
|
||||
race_sheet = race_registry.for_race(race)
|
||||
# Merge any local catalogs for fallback validation
|
||||
skills_catalog, spells_catalog = merge_catalogs(
|
||||
race_sheet.skills, race_sheet.spells,
|
||||
cls_sheet.skills, cls_sheet.spells
|
||||
)
|
||||
|
||||
starting_skills: list[str] = []
|
||||
starting_spells: list[str] = []
|
||||
if race_sheet:
|
||||
starting_skills.extend(race_sheet.starting_skills or [])
|
||||
starting_spells.extend(race_sheet.starting_spells or [])
|
||||
if cls_sheet:
|
||||
starting_skills.extend(cls_sheet.starting_skills or [])
|
||||
starting_spells.extend(cls_sheet.starting_spells or [])
|
||||
|
||||
# Dedup while preserving order
|
||||
def _dedup(seq: list[str]) -> list[str]:
|
||||
seen: set[str] = set()
|
||||
out: list[str] = []
|
||||
for x in seq:
|
||||
if x not in seen:
|
||||
out.append(x)
|
||||
seen.add(x)
|
||||
return out
|
||||
|
||||
starting_skills = _dedup(starting_skills)
|
||||
starting_spells = _dedup(starting_spells)
|
||||
|
||||
# Validate against global registries first, fallback to locals
|
||||
_validate_ids_exist(
|
||||
starting_skills, skills_catalog, "skill",
|
||||
owner=f"{race.value if race else 'unknown'}+{hero_class.value if hero_class else 'unknown'}",
|
||||
registry=skill_registry
|
||||
)
|
||||
_validate_ids_exist(
|
||||
starting_spells, spells_catalog, "spell",
|
||||
owner=f"{race.value if race else 'unknown'}+{hero_class.value if hero_class else 'unknown'}",
|
||||
registry=spell_registry
|
||||
)
|
||||
|
||||
hero = cls(name=name, race=race, hero_class=hero_class)
|
||||
hero.collections.skills = starting_skills
|
||||
hero.collections.spells = starting_spells
|
||||
hero.touch()
|
||||
return hero
|
||||
|
||||
# -------------------------
|
||||
# Progression
|
||||
# -------------------------
|
||||
|
||||
def gain_xp(
|
||||
self,
|
||||
amount: int,
|
||||
xp_curve: Callable[[int, int], int],
|
||||
class_registry: HeroDataRegistry | None = None,
|
||||
race_registry: RaceDataRegistry | None = None,
|
||||
*,
|
||||
# Optional global registries (preferred for validation)
|
||||
skill_registry: Any | None = None,
|
||||
spell_registry: Any | None = None,
|
||||
) -> int:
|
||||
"""
|
||||
Gain XP and apply level-ups via the provided xp_curve.
|
||||
If registries are provided, also grant any class/race skills & spells
|
||||
unlocked by the newly reached level(s).
|
||||
Returns number of levels gained.
|
||||
"""
|
||||
prev_level = self.progress.level
|
||||
levels = self.progress.gain_xp(amount, xp_curve)
|
||||
|
||||
# Always prepare empty local catalogs; may be filled below
|
||||
skills_catalog: Dict[str, Any] = {}
|
||||
spells_catalog: Dict[str, Any] = {}
|
||||
|
||||
if levels > 0 and class_registry and race_registry and self.hero_class and self.race:
|
||||
cls_sheet = class_registry.for_class(self.hero_class)
|
||||
race_sheet = race_registry.for_race(self.race)
|
||||
|
||||
# Optional: merge local catalogs for fallback validation
|
||||
skills_catalog, spells_catalog = merge_catalogs(
|
||||
race_sheet.skills, race_sheet.spells,
|
||||
cls_sheet.skills, cls_sheet.spells
|
||||
)
|
||||
|
||||
unlocked_skills: list[str] = []
|
||||
unlocked_spells: list[str] = []
|
||||
now_lvl = self.progress.level
|
||||
|
||||
# Unlock skills whose min_level is newly reached
|
||||
for nodes in (cls_sheet.skill_trees or {}).values():
|
||||
for ref in nodes:
|
||||
if ref.min_level is not None and prev_level < ref.min_level <= now_lvl:
|
||||
unlocked_skills.append(ref.id)
|
||||
|
||||
# Unlock spells by each level reached in the jump (handles 1→6, etc.)
|
||||
for L in range(prev_level + 1, now_lvl + 1):
|
||||
unlocked_spells.extend((cls_sheet.spells_by_level.get(L) or []))
|
||||
|
||||
# Validate using global registries when provided, fallback to locals
|
||||
owner = f"{self.race.value}+{self.hero_class.value}"
|
||||
_validate_ids_exist(unlocked_skills, skills_catalog, "skill", owner, registry=skill_registry)
|
||||
_validate_ids_exist(unlocked_spells, spells_catalog, "spell", owner, registry=spell_registry)
|
||||
|
||||
# Grant only new ones (dedupe)
|
||||
_unique_extend(self.collections.skills, unlocked_skills)
|
||||
_unique_extend(self.collections.spells, unlocked_spells)
|
||||
|
||||
# Your existing resource gains
|
||||
hp_gain = 8 * levels
|
||||
mp_gain = 6 * levels
|
||||
self.resources.maxhp += hp_gain
|
||||
self.resources.maxmp += mp_gain
|
||||
self.resources.hp = self.resources.maxhp
|
||||
self.resources.mp = self.resources.maxmp
|
||||
|
||||
self.touch()
|
||||
return levels
|
||||
|
||||
# -------------------------
|
||||
# (De)serialization helpers
|
||||
# -------------------------
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Convert to a pure-JSON-friendly dict (Enums → value, UUID → str).
|
||||
"""
|
||||
def _enum_to_value(obj: Any) -> Any:
|
||||
if isinstance(obj, Enum):
|
||||
return obj.value
|
||||
if isinstance(obj, UUID):
|
||||
return str(obj)
|
||||
return obj
|
||||
|
||||
raw = asdict(self)
|
||||
# Fix enum & UUID fields:
|
||||
raw["hero_id"] = str(self.hero_id)
|
||||
raw["hero_class"] = self.hero_class.value if self.hero_class else None
|
||||
return _walk_map(raw, _enum_to_value)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> Hero:
|
||||
"""
|
||||
Build Hero from a dict (reverse of to_dict).
|
||||
Accepts hero_class as string or None; hero_id as str or UUID.
|
||||
"""
|
||||
data = dict(data) # shallow copy
|
||||
|
||||
# Parse UUID
|
||||
hero_id_val = data.get("hero_id")
|
||||
if hero_id_val and not isinstance(hero_id_val, UUID):
|
||||
data["hero_id"] = UUID(hero_id_val)
|
||||
|
||||
# Parse enum
|
||||
hc = data.get("hero_class")
|
||||
if isinstance(hc, str) and hc:
|
||||
data["hero_class"] = HeroClass(hc)
|
||||
elif hc is None:
|
||||
data["hero_class"] = None
|
||||
|
||||
# Rebuild nested dataclasses if passed as dicts
|
||||
for k, typ in {
|
||||
"attributes": Attributes,
|
||||
"resources": Resources,
|
||||
"progress": Progress,
|
||||
"location": Location,
|
||||
"equipment": Equipment,
|
||||
"collections": Collections,
|
||||
}.items():
|
||||
if isinstance(data.get(k), dict):
|
||||
data[k] = typ(**data[k])
|
||||
|
||||
hero: Hero = cls(**data) # type: ignore[arg-type]
|
||||
return hero
|
||||
|
||||
# -----------------------------
|
||||
# Helpers
|
||||
# -----------------------------
|
||||
|
||||
def _walk_map(obj: Any, f) -> Any:
|
||||
"""Recursively map Python structures with function f applied to leaf values as needed."""
|
||||
if isinstance(obj, dict):
|
||||
return {k: _walk_map(v, f) for k, v in obj.items()}
|
||||
if isinstance(obj, list):
|
||||
return [_walk_map(x, f) for x in obj]
|
||||
return f(obj)
|
||||
|
||||
def _unique_extend(target: list[str], new_items: Iterable[str]) -> list[str]:
|
||||
"""Append only items not already present; return the list for convenience."""
|
||||
seen = set(target)
|
||||
for x in new_items:
|
||||
if x not in seen:
|
||||
target.append(x)
|
||||
seen.add(x)
|
||||
return target
|
||||
|
||||
def default_xp_curve(current_xp: int, level: int) -> int:
|
||||
"""
|
||||
Replace this with your real XP curve (e.g., g_utils.get_xp_for_next_level).
|
||||
Simple example: mild exponential growth.
|
||||
"""
|
||||
base = 100
|
||||
return base + int(50 * (level ** 1.5)) + (level * 25)
|
||||
|
||||
def _validate_ids_exist(
|
||||
ids: Iterable[str],
|
||||
local_catalog: Mapping[str, Any] | None,
|
||||
kind: str,
|
||||
owner: str,
|
||||
*,
|
||||
registry: Any | None = None, # e.g., SkillRegistry or SpellRegistry with .get()
|
||||
) -> None:
|
||||
"""
|
||||
Validate that each id is present either in the provided local_catalog OR
|
||||
resolvable via the optional global registry (registry.get(id) not None).
|
||||
"""
|
||||
local_catalog = local_catalog or {}
|
||||
reg_get = getattr(registry, "get", None) if registry is not None else None
|
||||
|
||||
missing: list[str] = []
|
||||
for _id in ids:
|
||||
in_local = _id in local_catalog
|
||||
in_global = bool(reg_get(_id)) if reg_get else False
|
||||
if not (in_local or in_global):
|
||||
missing.append(_id)
|
||||
|
||||
if missing:
|
||||
raise ValueError(f"{owner}: undefined {kind} id(s): {', '.join(missing)}")
|
||||
49
app/models/primitives.py
Normal file
49
app/models/primitives.py
Normal file
@@ -0,0 +1,49 @@
|
||||
# app/models/primitives.py
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List
|
||||
|
||||
@dataclass
|
||||
class Attributes:
|
||||
base_str: int = 0
|
||||
base_dex: int = 0
|
||||
base_int: int = 0
|
||||
base_wis: int = 0
|
||||
base_luk: int = 0
|
||||
base_cha: int = 0
|
||||
|
||||
@dataclass
|
||||
class Resources:
|
||||
maxhp: int = 0
|
||||
hp: int = 0
|
||||
maxmp: int = 0
|
||||
mp: int = 0
|
||||
gold: int = 0
|
||||
|
||||
def apply_damage(self, amount: int) -> None:
|
||||
"""Reduce HP by amount (cannot go below 0)."""
|
||||
self.hp = _clamp(self.hp - max(0, amount), 0, self.maxhp)
|
||||
|
||||
def heal(self, amount: int) -> None:
|
||||
"""Increase HP by amount (cannot exceed maxhp)."""
|
||||
self.hp = _clamp(self.hp + max(0, amount), 0, self.maxhp)
|
||||
|
||||
def spend_mana(self, amount: int) -> bool:
|
||||
"""Try to spend MP. Returns True if successful."""
|
||||
if amount <= self.mp:
|
||||
self.mp -= amount
|
||||
return True
|
||||
return False
|
||||
|
||||
def restore_mana(self, amount: int) -> None:
|
||||
"""Increase MP by amount (cannot exceed maxmp)."""
|
||||
self.mp = _clamp(self.mp + max(0, amount), 0, self.maxmp)
|
||||
|
||||
@dataclass
|
||||
class Collections:
|
||||
skills: List[str] = field(default_factory=list)
|
||||
spells: List[str] = field(default_factory=list)
|
||||
achievements: List[str] = field(default_factory=list)
|
||||
|
||||
def _clamp(value: int, lo: int, hi: int) -> int:
|
||||
"""Clamp integer between lo and hi."""
|
||||
return max(lo, min(hi, value))
|
||||
36
app/models/races.py
Normal file
36
app/models/races.py
Normal file
@@ -0,0 +1,36 @@
|
||||
# races.py
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Any
|
||||
|
||||
# Reuse your existing types:
|
||||
|
||||
from app.utils.hero_catalog import SkillDef, SpellDef
|
||||
|
||||
from app.models.enums import Race
|
||||
from app.models.primitives import Attributes, Resources
|
||||
|
||||
@dataclass
|
||||
class RacialTrait:
|
||||
id: str
|
||||
data: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@dataclass
|
||||
class RaceSheet:
|
||||
key: Race
|
||||
display_name: str
|
||||
|
||||
# Flat bases/mods (additive by default)
|
||||
base_attributes: Attributes = field(default_factory=Attributes)
|
||||
starting_resources: Resources = field(default_factory=Resources)
|
||||
|
||||
# Starting lists (IDs only)
|
||||
starting_skills: List[str] = field(default_factory=list)
|
||||
starting_spells: List[str] = field(default_factory=list)
|
||||
|
||||
# Optional local catalogs (definitions)
|
||||
skills: Dict[str, SkillDef] = field(default_factory=dict)
|
||||
spells: Dict[str, SpellDef] = field(default_factory=dict)
|
||||
|
||||
# Racial traits/abilities with free-form data
|
||||
traits: List[RacialTrait] = field(default_factory=list)
|
||||
143
app/services/appwrite_client.py
Normal file
143
app/services/appwrite_client.py
Normal file
@@ -0,0 +1,143 @@
|
||||
# app/services/appwrite_client.py
|
||||
from __future__ import annotations
|
||||
import os
|
||||
from typing import Optional, Dict, Any, Mapping, Union, List
|
||||
|
||||
from flask import session, redirect, url_for, request
|
||||
|
||||
|
||||
from appwrite.client import Client
|
||||
from appwrite.services.account import Account
|
||||
from appwrite.services.tables_db import TablesDB
|
||||
from appwrite.id import ID
|
||||
|
||||
|
||||
ENDPOINT = os.getenv("APPWRITE_ENDPOINT")
|
||||
PROJECT_ID = os.getenv("APPWRITE_PROJECT_ID")
|
||||
API_KEY = os.getenv("APPWRITE_API_KEY")
|
||||
|
||||
# SESSION USER OBJECT DICT NOTES
|
||||
# {
|
||||
# "$id": "6902663c000efa514a81",
|
||||
# "$createdAt": "2025-10-29T19:08:44.483+00:00",
|
||||
# "$updatedAt": "2025-10-31T00:28:26.422+00:00",
|
||||
# "name": "Test Account",
|
||||
# "registration": "2025-10-29T19:08:44.482+00:00",
|
||||
# "status": true,
|
||||
# "labels": [],
|
||||
# "passwordUpdate": "2025-10-29T19:08:44.482+00:00",
|
||||
# "email": "ptarrant@gmail.com",
|
||||
# "phone": "",
|
||||
# "emailVerification": false,
|
||||
# "phoneVerification": false,
|
||||
# "mfa": false,
|
||||
# "prefs": {},
|
||||
# "targets": [
|
||||
# {
|
||||
# "$id": "6902663c81f9f1a63f4c",
|
||||
# "$createdAt": "2025-10-29T19:08:44.532+00:00",
|
||||
# "$updatedAt": "2025-10-29T19:08:44.532+00:00",
|
||||
# "name": "",
|
||||
# "userId": "6902663c000efa514a81",
|
||||
# "providerId": null,
|
||||
# "providerType": "email",
|
||||
# "identifier": "ptarrant@gmail.com",
|
||||
# "expired": false
|
||||
# }
|
||||
# ],
|
||||
# "accessedAt": "2025-10-31T00:28:26.418+00:00"
|
||||
# }
|
||||
|
||||
class AppWriteClient:
|
||||
def __init__(self):
|
||||
self.session_key = f"a_session_{PROJECT_ID}"
|
||||
|
||||
def _get_admin_client(self):
|
||||
return (Client()
|
||||
.set_endpoint(ENDPOINT)
|
||||
.set_project(PROJECT_ID)
|
||||
.set_key(API_KEY)
|
||||
)
|
||||
|
||||
def _get_user_client(self):
|
||||
client = (Client()
|
||||
.set_endpoint(ENDPOINT)
|
||||
.set_project(PROJECT_ID)
|
||||
.set_forwarded_user_agent(request.headers.get('user-agent'))
|
||||
)
|
||||
|
||||
if session[self.session_key] is not None:
|
||||
client.set_session(session[self.session_key])
|
||||
|
||||
return client
|
||||
|
||||
def _refresh_user_session_data(self):
|
||||
user_client = self._get_user_client()
|
||||
user_account = Account(user_client)
|
||||
user = user_account.get()
|
||||
session['user']=user
|
||||
|
||||
def tables_get_client(self):
|
||||
admin_client = self._get_admin_client()
|
||||
tables = TablesDB(admin_client)
|
||||
return tables
|
||||
|
||||
def create_new_user(self, email:str, password:str, name:Optional[str]):
|
||||
admin_client = self._get_admin_client()
|
||||
try:
|
||||
admin_account = Account(admin_client)
|
||||
admin_account.create(user_id=ID.unique(),email=email,password=password,name=name)
|
||||
return True, ""
|
||||
except Exception as e:
|
||||
return False, e
|
||||
|
||||
def get_user_from_jwt_token(self, jwt_token:str):
|
||||
try:
|
||||
client = (Client()
|
||||
.set_endpoint(ENDPOINT)
|
||||
.set_project(PROJECT_ID)
|
||||
.set_jwt(jwt_token)
|
||||
)
|
||||
return Account(client).get()
|
||||
except Exception as e:
|
||||
return str(e)
|
||||
|
||||
def log_user_in(self, email:str,password:str):
|
||||
admin_client = self._get_admin_client()
|
||||
try:
|
||||
admin_account = Account(admin_client)
|
||||
user_session = admin_account.create_email_password_session(email,password)
|
||||
session[self.session_key]=user_session['secret']
|
||||
|
||||
self._refresh_user_session_data()
|
||||
|
||||
return True, ""
|
||||
except Exception as e:
|
||||
return False, str(e)
|
||||
|
||||
def log_user_out(self):
|
||||
try:
|
||||
user_client = self._get_user_client()
|
||||
user_account = Account(user_client)
|
||||
user_account.delete_sessions()
|
||||
return True
|
||||
except Exception as e:
|
||||
return True
|
||||
|
||||
def send_email_verification(self):
|
||||
user_client = self._get_user_client()
|
||||
user_account = Account(user_client)
|
||||
callback_url = url_for('auth.callback', _external=True)
|
||||
user_account.create_verification(url=callback_url)
|
||||
|
||||
def verify_email(self, user_id:str, secret:str):
|
||||
if session[self.session_key] is None:
|
||||
return False
|
||||
try:
|
||||
user_client = self._get_user_client()
|
||||
user_account = Account(user_client)
|
||||
user_account.update_email_verification(user_id,secret)
|
||||
self._refresh_user_session_data()
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
279
app/utils/api_response.py
Normal file
279
app/utils/api_response.py
Normal file
@@ -0,0 +1,279 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict, is_dataclass
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Callable, Dict, Optional, Tuple, Union
|
||||
import uuid
|
||||
|
||||
from flask import jsonify, make_response, request, Flask, Response
|
||||
|
||||
|
||||
class ApiResponder:
|
||||
"""
|
||||
Centralized JSON response builder for Flask APIs.
|
||||
|
||||
This class enforces a consistent envelope for all responses:
|
||||
|
||||
{
|
||||
"app": "<APP NAME>",
|
||||
"version": "<APP VERSION>",
|
||||
"status": <HTTP STATUS CODE>,
|
||||
"timestamp": "<UTC ISO8601>",
|
||||
"request_id": "<optional request id>",
|
||||
"result": <your data OR null>,
|
||||
"error": {
|
||||
"code": "<optional machine code>",
|
||||
"message": "<human message>",
|
||||
"details": {...} # optional extras (validation fields, etc.)
|
||||
},
|
||||
"meta": { ... } # optional metadata (pagination, etc.)
|
||||
}
|
||||
|
||||
Usage:
|
||||
responder = ApiResponder(app_name="Code of Conquest",
|
||||
version_provider=lambda: CURRENT_VERSION)
|
||||
|
||||
return responder.ok({"hello": "world"})
|
||||
return responder.created({"id": 123})
|
||||
return responder.bad_request("Missing field `name`", details={"field": "name"})
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
app_name: str,
|
||||
version_provider: str,
|
||||
include_request_id: bool = True,
|
||||
default_headers: Optional[Dict[str, str]] = None,
|
||||
) -> None:
|
||||
"""
|
||||
:param app_name: Human-friendly app name included in every response.
|
||||
:param version_provider: Callable returning a version string at call time.
|
||||
:param include_request_id: When True, include request id (from X-Request-ID header if present, else generated).
|
||||
:param default_headers: Extra headers to attach to every response (e.g., CORS, caching).
|
||||
"""
|
||||
self.app_name = app_name
|
||||
self.version_provider = version_provider
|
||||
self.include_request_id = include_request_id
|
||||
self.default_headers = default_headers or {}
|
||||
|
||||
# ---------- Public helpers for common statuses ----------
|
||||
|
||||
def ok(self, result: Any = None, meta: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""200 OK."""
|
||||
return self._build(status=200, result=result, meta=meta, headers=headers)
|
||||
|
||||
def created(self, result: Any = None, meta: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""201 Created."""
|
||||
return self._build(status=201, result=result, meta=meta, headers=headers)
|
||||
|
||||
def accepted(self, result: Any = None, meta: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""202 Accepted."""
|
||||
return self._build(status=202, result=result, meta=meta, headers=headers)
|
||||
|
||||
def no_content(self, headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""
|
||||
204 No Content. Returns the standard envelope with result=null for consistency.
|
||||
(If you prefer an empty body, switch to make_response(("", 204)) in your code.)
|
||||
"""
|
||||
return self._build(status=204, result=None, headers=headers)
|
||||
|
||||
def bad_request(self, message: str, code: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""400 Bad Request."""
|
||||
return self._build_error(400, message, code, details, headers)
|
||||
|
||||
def unauthorized(self, message: str = "Unauthorized", code: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""401 Unauthorized."""
|
||||
return self._build_error(401, message, code, details, headers)
|
||||
|
||||
def forbidden(self, message: str = "Forbidden", code: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""403 Forbidden."""
|
||||
return self._build_error(403, message, code, details, headers)
|
||||
|
||||
def not_found(self, message: str = "Not Found", code: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""404 Not Found."""
|
||||
return self._build_error(404, message, code, details, headers)
|
||||
|
||||
def conflict(self, message: str = "Conflict", code: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""409 Conflict."""
|
||||
return self._build_error(409, message, code, details, headers)
|
||||
|
||||
def unprocessable(self, message: str = "Unprocessable Entity", code: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""422 Unprocessable Entity (great for validation errors)."""
|
||||
return self._build_error(422, message, code, details, headers)
|
||||
|
||||
def error(self, message: str = "Internal Server Error", code: Optional[str] = None,
|
||||
details: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""500 Internal Server Error."""
|
||||
return self._build_error(500, message, code, details, headers)
|
||||
|
||||
# ---------- Pagination helper ----------
|
||||
|
||||
def paginate(self,
|
||||
items: Any,
|
||||
total: int,
|
||||
page: int,
|
||||
per_page: int,
|
||||
extra_meta: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""
|
||||
200 OK with pagination metadata.
|
||||
|
||||
:param items: The current page of items (list or serializable value).
|
||||
:param total: Total count of items across all pages.
|
||||
:param page: 1-based page index.
|
||||
:param per_page: Page size.
|
||||
:param extra_meta: Optional extra metadata to merge into the meta block.
|
||||
"""
|
||||
# Build pagination metadata explicitly for clarity (no list comps).
|
||||
meta: Dict[str, Any] = {}
|
||||
meta["total"] = int(total)
|
||||
meta["page"] = int(page)
|
||||
meta["per_page"] = int(per_page)
|
||||
|
||||
# Compute total pages carefully with integer math.
|
||||
total_pages = int((total + per_page - 1) // per_page) if per_page > 0 else 0
|
||||
meta["total_pages"] = total_pages
|
||||
|
||||
if extra_meta is not None:
|
||||
for key in extra_meta:
|
||||
meta[key] = extra_meta[key]
|
||||
|
||||
return self._build(status=200, result=items, meta=meta, headers=headers)
|
||||
|
||||
# ---------- Exception binding (optional but handy) ----------
|
||||
|
||||
def register_error_handlers(self, app: Flask) -> None:
|
||||
"""
|
||||
Registers generic error handlers that convert exceptions into standard JSON.
|
||||
Override selectively in your app as needed.
|
||||
"""
|
||||
|
||||
@app.errorhandler(400)
|
||||
def _h400(e): # pragma: no cover
|
||||
return self.bad_request(getattr(e, "description", "Bad Request"))
|
||||
|
||||
@app.errorhandler(401)
|
||||
def _h401(e): # pragma: no cover
|
||||
return self.unauthorized(getattr(e, "description", "Unauthorized"))
|
||||
|
||||
@app.errorhandler(403)
|
||||
def _h403(e): # pragma: no cover
|
||||
return self.forbidden(getattr(e, "description", "Forbidden"))
|
||||
|
||||
@app.errorhandler(404)
|
||||
def _h404(e): # pragma: no cover
|
||||
return self.not_found(getattr(e, "description", "Not Found"))
|
||||
|
||||
@app.errorhandler(422)
|
||||
def _h422(e): # pragma: no cover
|
||||
message = getattr(e, "description", "Unprocessable Entity")
|
||||
# Marshmallow/WTF often attach data on e.data; include if present.
|
||||
details = getattr(e, "data", None)
|
||||
return self.unprocessable(message=message, details=details)
|
||||
|
||||
@app.errorhandler(500)
|
||||
def _h500(e): # pragma: no cover
|
||||
return self.error()
|
||||
|
||||
# ---------- Core builder ----------
|
||||
|
||||
def _build(self,
|
||||
status: int,
|
||||
result: Any = None,
|
||||
meta: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, str]] = None) -> Tuple[Response, int]:
|
||||
"""
|
||||
Build the canonical JSON body and Response object.
|
||||
"""
|
||||
# Convert dataclasses to plain dicts to keep jsonify happy.
|
||||
safe_result = self._to_plain(result)
|
||||
|
||||
body: Dict[str, Any] = {}
|
||||
body["app"] = self.app_name
|
||||
body["version"] = self.version_provider or ""
|
||||
body["status"] = int(status)
|
||||
body["timestamp"] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
if self.include_request_id:
|
||||
# Prefer inbound request id if the client provided one.
|
||||
req_id = request.headers.get("X-Request-ID")
|
||||
if req_id is None or req_id == "":
|
||||
req_id = str(uuid.uuid4())
|
||||
body["request_id"] = req_id
|
||||
|
||||
body["result"] = safe_result
|
||||
|
||||
if meta is not None:
|
||||
body["meta"] = meta
|
||||
|
||||
response = make_response(jsonify(body), status)
|
||||
|
||||
# Attach default headers first, then per-call overrides.
|
||||
for key in self.default_headers:
|
||||
response.headers[key] = self.default_headers[key]
|
||||
if headers is not None:
|
||||
for key in headers:
|
||||
response.headers[key] = headers[key]
|
||||
|
||||
return response, status
|
||||
|
||||
def _build_error(self,
|
||||
status: int,
|
||||
message: str,
|
||||
code: Optional[str],
|
||||
details: Optional[Dict[str, Any]],
|
||||
headers: Optional[Dict[str, str]]) -> Tuple[Response, int]:
|
||||
"""
|
||||
Build a standardized error envelope.
|
||||
"""
|
||||
error_block: Dict[str, Any] = {}
|
||||
error_block["message"] = message
|
||||
|
||||
if code is not None and code != "":
|
||||
error_block["code"] = code
|
||||
|
||||
if details is not None:
|
||||
# Convert nested dataclasses if any.
|
||||
error_block["details"] = self._to_plain(details)
|
||||
|
||||
# Errors carry result=null
|
||||
return self._build(status=status, result=None, meta={"error": error_block}, headers=headers)
|
||||
|
||||
def _to_plain(self, value: Any) -> Any:
|
||||
"""
|
||||
Convert dataclasses to dicts recursively; leave other JSON-serializable values as-is.
|
||||
"""
|
||||
if is_dataclass(value):
|
||||
return asdict(value)
|
||||
|
||||
# Handle lists/tuples without comprehensions for clarity.
|
||||
if isinstance(value, (list, tuple)):
|
||||
converted = []
|
||||
for item in value:
|
||||
converted.append(self._to_plain(item))
|
||||
return converted
|
||||
|
||||
if isinstance(value, dict):
|
||||
converted_dict: Dict[str, Any] = {}
|
||||
for k in value:
|
||||
converted_dict[k] = self._to_plain(value[k])
|
||||
return converted_dict
|
||||
|
||||
# Let Flask jsonify handle the rest (numbers, strings, None, bool).
|
||||
return value
|
||||
181
app/utils/catalogs/hero_catalog.py
Normal file
181
app/utils/catalogs/hero_catalog.py
Normal file
@@ -0,0 +1,181 @@
|
||||
# hero_catalog.py
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Literal, Any
|
||||
|
||||
import yaml # PyYAML
|
||||
|
||||
from app.models.enums import HeroClass
|
||||
from app.models.primitives import Attributes, Resources
|
||||
|
||||
TargetType = Literal["self", "ally", "all_allies", "single_enemy", "all_enemies", "jumps_3_targets"]
|
||||
|
||||
@dataclass
|
||||
class StatusSpec:
|
||||
chance: float
|
||||
potency: int
|
||||
duration_turns: int
|
||||
|
||||
@dataclass
|
||||
class SkillRef:
|
||||
id: str
|
||||
min_level: int = 1
|
||||
|
||||
@dataclass
|
||||
class SpellDef:
|
||||
id: str
|
||||
element: Optional[str] = None
|
||||
cost_mp: int = 0
|
||||
power: int = 0
|
||||
scaling: Dict[str, float] = field(default_factory=dict) # e.g., {"int": 0.9, "luk": 0.03}
|
||||
variance: float = 0.0
|
||||
target: TargetType = "single_enemy"
|
||||
tags: List[str] = field(default_factory=list)
|
||||
status_chance: Dict[str, StatusSpec] = field(default_factory=dict) # {"burn": StatusSpec(...)}
|
||||
crit_bonus: Optional[Dict[str, float]] = None # {"chance":0.05,"multiplier":1.5}
|
||||
|
||||
@staticmethod
|
||||
def from_yaml(id_: str, raw: Dict[str, Any]) -> "SpellDef":
|
||||
sc = {}
|
||||
for k, v in (raw.get("status_chance") or {}).items():
|
||||
sc[k] = StatusSpec(**v)
|
||||
return SpellDef(
|
||||
id=id_,
|
||||
element=raw.get("element"),
|
||||
cost_mp=int((raw.get("cost") or {}).get("mp", 0)),
|
||||
power=int(raw.get("power", 0)),
|
||||
scaling={k: float(v) for k, v in (raw.get("scaling") or {}).items()},
|
||||
variance=float(raw.get("variance", 0.0)),
|
||||
target=(raw.get("target") or "single_enemy"),
|
||||
tags=list(raw.get("tags") or []),
|
||||
status_chance=sc,
|
||||
crit_bonus=raw.get("crit_bonus"),
|
||||
)
|
||||
|
||||
@dataclass
|
||||
class SkillDef:
|
||||
id: str
|
||||
type: Literal["active", "passive", "buff", "debuff", "utility"] = "active"
|
||||
cost_mp: int = 0
|
||||
tags: List[str] = field(default_factory=list)
|
||||
# Free-form payloads your engine can interpret:
|
||||
passive_modifiers: Dict[str, Any] = field(default_factory=dict) # e.g., damage_multiplier_vs_tags.fire=1.10
|
||||
effects: List[Dict[str, Any]] = field(default_factory=list) # e.g., [{"kind":"buff","stat":"int","amount":2,...}]
|
||||
|
||||
@staticmethod
|
||||
def from_yaml(id_: str, raw: Dict[str, Any]) -> "SkillDef":
|
||||
return SkillDef(
|
||||
id=id_,
|
||||
type=(raw.get("type") or "active"),
|
||||
cost_mp=int((raw.get("cost") or {}).get("mp", 0)),
|
||||
tags=list(raw.get("tags") or []),
|
||||
passive_modifiers=dict(raw.get("passive_modifiers") or {}),
|
||||
effects=list(raw.get("effects") or []),
|
||||
)
|
||||
|
||||
@dataclass
|
||||
class HeroArchetype:
|
||||
key: HeroClass
|
||||
display_name: str
|
||||
background: str
|
||||
base_attributes: Attributes = field(default_factory=Attributes)
|
||||
starting_resources: Resources = field(default_factory=Resources)
|
||||
starting_skills: List[str] = field(default_factory=list)
|
||||
starting_spells: List[str] = field(default_factory=list)
|
||||
skill_trees: Dict[str, List[SkillRef]] = field(default_factory=dict)
|
||||
spells_by_level: Dict[int, List[str]] = field(default_factory=dict)
|
||||
bonus_abilities: List[str] = field(default_factory=list)
|
||||
|
||||
# Local, per-class catalogs (optional—can be empty if you centralize elsewhere)
|
||||
skills: Dict[str, SkillDef] = field(default_factory=dict)
|
||||
spells: Dict[str, SpellDef] = field(default_factory=dict)
|
||||
|
||||
|
||||
class HeroDataRegistry:
|
||||
"""
|
||||
In-memory catalog of YAML-defined class sheets.
|
||||
Keyed by HeroClass (enum).
|
||||
"""
|
||||
def __init__(self) -> None:
|
||||
self._by_class: Dict[HeroClass, HeroArchetype] = {}
|
||||
self._skills: Dict[str, SkillDef] = {}
|
||||
self._spells: Dict[str, SpellDef] = {}
|
||||
|
||||
|
||||
def load_dir(self, directory: Path | str) -> None:
|
||||
directory = Path(directory)
|
||||
for path in sorted(directory.glob("*.y*ml")):
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
raw = yaml.safe_load(f) or {}
|
||||
|
||||
# --- required ---
|
||||
key_raw = raw.get("key")
|
||||
if not key_raw:
|
||||
raise ValueError(f"{path.name}: missing required 'key' field")
|
||||
key = HeroClass(key_raw) # validate against Enum
|
||||
|
||||
# --- optional / nested ---
|
||||
base_attributes = Attributes(**(raw.get("base_attributes") or {}))
|
||||
starting_resources = Resources(**(raw.get("starting_resources") or {}))
|
||||
|
||||
# trees
|
||||
raw_trees = raw.get("skill_trees") or {}
|
||||
trees: Dict[str, List[SkillRef]] = {}
|
||||
for tree_name, nodes in raw_trees.items():
|
||||
typed_nodes: List[SkillRef] = []
|
||||
if nodes:
|
||||
for node in nodes:
|
||||
typed_nodes.append(SkillRef(**node))
|
||||
trees[tree_name] = typed_nodes
|
||||
|
||||
# spells by level (keys may be strings in YAML)
|
||||
sbl_in = raw.get("spells_by_level") or {}
|
||||
spells_by_level: Dict[int, List[str]] = {}
|
||||
for lvl_key, spell_list in sbl_in.items():
|
||||
lvl = int(lvl_key)
|
||||
spells_by_level[lvl] = list(spell_list or [])
|
||||
|
||||
arch = HeroArchetype(
|
||||
key=key,
|
||||
display_name=raw.get("display_name", key.value.title()),
|
||||
background=raw.get("background","A person with an unknown origin"),
|
||||
base_attributes=base_attributes,
|
||||
starting_resources=starting_resources,
|
||||
starting_skills=list(raw.get("starting_skills") or []),
|
||||
starting_spells=list(raw.get("starting_spells") or []),
|
||||
skill_trees=trees,
|
||||
spells_by_level=spells_by_level,
|
||||
bonus_abilities=list(raw.get("bonus_abilities") or []),
|
||||
)
|
||||
|
||||
# parse local catalogs if present:
|
||||
skills_raw = raw.get("skills") or {}
|
||||
for sid, sdef in skills_raw.items():
|
||||
arch.skills[sid] = SkillDef.from_yaml(sid, sdef)
|
||||
self._skills[sid] = arch.skills[sid] # promote to global map
|
||||
|
||||
spells_raw = raw.get("spells") or {}
|
||||
for spid, spdef in spells_raw.items():
|
||||
arch.spells[spid] = SpellDef.from_yaml(spid, spdef)
|
||||
self._spells[spid] = arch.spells[spid]
|
||||
|
||||
self._by_class[key] = arch
|
||||
|
||||
# Lookups (prefer global catalogs for cross-class reuse)
|
||||
def get_spell(self, spell_id: str) -> SpellDef:
|
||||
try:
|
||||
return self._spells[spell_id]
|
||||
except KeyError:
|
||||
raise KeyError(f"Unknown spell id '{spell_id}'")
|
||||
|
||||
def get_skill(self, skill_id: str) -> SkillDef:
|
||||
try:
|
||||
return self._skills[skill_id]
|
||||
except KeyError:
|
||||
raise KeyError(f"Unknown skill id '{skill_id}'")
|
||||
|
||||
def for_class(self, hero_class: HeroClass) -> HeroArchetype:
|
||||
if hero_class not in self._by_class:
|
||||
raise KeyError(f"No archetype loaded for class {hero_class.value}")
|
||||
return self._by_class[hero_class]
|
||||
72
app/utils/catalogs/race_catalog.py
Normal file
72
app/utils/catalogs/race_catalog.py
Normal file
@@ -0,0 +1,72 @@
|
||||
# race_catalog.py
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any
|
||||
import yaml
|
||||
|
||||
|
||||
from app.models.enums import Race
|
||||
from app.models.primitives import Attributes, Resources
|
||||
from app.models.races import RaceSheet, RacialTrait
|
||||
from app.utils.hero_catalog import SkillDef, SpellDef
|
||||
|
||||
class RaceDataRegistry:
|
||||
"""
|
||||
In-memory catalog of YAML-defined race sheets.
|
||||
"""
|
||||
def __init__(self) -> None:
|
||||
self._by_race: Dict[Race, RaceSheet] = {}
|
||||
# promoted global catalogs for cross-use at runtime (optional)
|
||||
self._skills: Dict[str, SkillDef] = {}
|
||||
self._spells: Dict[str, SpellDef] = {}
|
||||
|
||||
def load_dir(self, directory: str | Path) -> None:
|
||||
directory = Path(directory)
|
||||
for path in sorted(directory.glob("*.y*ml")):
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
raw = yaml.safe_load(f) or {}
|
||||
|
||||
key_raw = raw.get("key")
|
||||
if not key_raw:
|
||||
raise ValueError(f"{path.name}: missing required 'key'")
|
||||
race = Race(key_raw) # validates enum
|
||||
|
||||
base_attributes = Attributes(**(raw.get("base_attributes") or {}))
|
||||
starting_resources = Resources(**(raw.get("starting_resources") or {}))
|
||||
|
||||
sheet = RaceSheet(
|
||||
key=race,
|
||||
display_name=raw.get("display_name", race.value.title()),
|
||||
base_attributes=base_attributes,
|
||||
starting_resources=starting_resources,
|
||||
starting_skills=list(raw.get("starting_skills") or []),
|
||||
starting_spells=list(raw.get("starting_spells") or []),
|
||||
)
|
||||
|
||||
# Local skill/spell catalogs (optional)
|
||||
for sid, sdef in (raw.get("skills") or {}).items():
|
||||
s = SkillDef.from_yaml(sid, sdef)
|
||||
sheet.skills[sid] = s
|
||||
self._skills[sid] = s
|
||||
|
||||
for spid, spdef in (raw.get("spells") or {}).items():
|
||||
sp = SpellDef.from_yaml(spid, spdef)
|
||||
sheet.spells[spid] = sp
|
||||
self._spells[spid] = sp
|
||||
|
||||
# Traits
|
||||
traits_raw = raw.get("traits") or []
|
||||
for t in traits_raw:
|
||||
sheet.traits.append(RacialTrait(id=t.get("id"), data=dict(t.get("data") or {})))
|
||||
|
||||
self._by_race[race] = sheet
|
||||
|
||||
def for_race(self, race: Race) -> RaceSheet:
|
||||
if race not in self._by_race:
|
||||
raise KeyError(f"No race sheet loaded for {race.value}")
|
||||
return self._by_race[race]
|
||||
|
||||
# Optional global lookups (if you want to fetch a skill from race-only files)
|
||||
def get_skill(self, skill_id: str) -> SkillDef: return self._skills[skill_id]
|
||||
def get_spell(self, spell_id: str) -> SpellDef: return self._spells[spell_id]
|
||||
30
app/utils/catalogs/skill_catalog.py
Normal file
30
app/utils/catalogs/skill_catalog.py
Normal file
@@ -0,0 +1,30 @@
|
||||
# app/utils/skill_registry.py
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
import yaml
|
||||
|
||||
@dataclass
|
||||
class Skill:
|
||||
id: str
|
||||
name: str
|
||||
description: str
|
||||
tags: List[str]
|
||||
max_rank: int = 1
|
||||
|
||||
class SkillRegistry:
|
||||
def __init__(self) -> None:
|
||||
self.by_id: Dict[str, Skill] = {}
|
||||
|
||||
def load_file(self, path: Path) -> None:
|
||||
data = yaml.safe_load(path.read_text()) or []
|
||||
for raw in data:
|
||||
skill = Skill(**raw)
|
||||
self.by_id[skill.id] = skill
|
||||
|
||||
def load_dir(self, root: Path) -> None:
|
||||
for p in sorted(root.glob("*.y*ml")):
|
||||
self.load_file(p)
|
||||
|
||||
def get(self, skill_id: str) -> Optional[Skill]:
|
||||
return self.by_id.get(skill_id)
|
||||
33
app/utils/catalogs/spell_catalog.py
Normal file
33
app/utils/catalogs/spell_catalog.py
Normal file
@@ -0,0 +1,33 @@
|
||||
# app/utils/spell_registry.py
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
import yaml
|
||||
|
||||
@dataclass
|
||||
class Spell:
|
||||
id: str
|
||||
name: str
|
||||
school: str
|
||||
element: Optional[str] = None
|
||||
rank: int = 1
|
||||
cost_mp: int = 0
|
||||
description: str = ""
|
||||
aoe: Optional[str] = None
|
||||
|
||||
class SpellRegistry:
|
||||
def __init__(self) -> None:
|
||||
self.by_id: Dict[str, Spell] = {}
|
||||
|
||||
def load_file(self, path: Path) -> None:
|
||||
data = yaml.safe_load(path.read_text()) or []
|
||||
for raw in data:
|
||||
spell = Spell(**raw)
|
||||
self.by_id[spell.id] = spell
|
||||
|
||||
def load_dir(self, root: Path) -> None:
|
||||
for p in sorted(root.glob("*.y*ml")):
|
||||
self.load_file(p)
|
||||
|
||||
def get(self, spell_id: str) -> Optional[Spell]:
|
||||
return self.by_id.get(spell_id)
|
||||
149
app/utils/logging.py
Normal file
149
app/utils/logging.py
Normal file
@@ -0,0 +1,149 @@
|
||||
"""
|
||||
Structured logging setup for Code of Conquest.
|
||||
|
||||
Environment-aware behavior:
|
||||
- dev → colorful console output (human-friendly)
|
||||
- test/prod → JSON logs (machine-friendly)
|
||||
|
||||
Features:
|
||||
- Includes logger name, level, filename, and line number
|
||||
- Unified stdlib + structlog integration
|
||||
- Respects LOG_LEVEL from environment
|
||||
- Works with both ChatGPT/Ollama components and Web/TUI layers
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import structlog
|
||||
from structlog.dev import ConsoleRenderer
|
||||
from structlog.processors import JSONRenderer, TimeStamper, CallsiteParameterAdder, CallsiteParameter
|
||||
from structlog.stdlib import ProcessorFormatter
|
||||
|
||||
|
||||
@dataclass
|
||||
class _ConfiguredState:
|
||||
configured: bool = False
|
||||
level_name: str = "INFO"
|
||||
|
||||
|
||||
_state = _ConfiguredState()
|
||||
|
||||
|
||||
def _level_from_name(name: str) -> int:
|
||||
mapping = {
|
||||
"CRITICAL": logging.CRITICAL,
|
||||
"ERROR": logging.ERROR,
|
||||
"WARNING": logging.WARNING,
|
||||
"INFO": logging.INFO,
|
||||
"DEBUG": logging.DEBUG,
|
||||
"NOTSET": logging.NOTSET,
|
||||
}
|
||||
return mapping.get((name or "INFO").upper(), logging.INFO)
|
||||
|
||||
|
||||
def _shared_processors():
|
||||
"""
|
||||
Processors common to both console and JSON pipelines.
|
||||
Adds level, logger name, and callsite metadata.
|
||||
"""
|
||||
return [
|
||||
structlog.stdlib.add_log_level,
|
||||
structlog.stdlib.add_logger_name,
|
||||
CallsiteParameterAdder(
|
||||
parameters=[
|
||||
CallsiteParameter.FILENAME,
|
||||
CallsiteParameter.LINENO,
|
||||
CallsiteParameter.FUNC_NAME,
|
||||
]
|
||||
),
|
||||
structlog.processors.StackInfoRenderer(),
|
||||
structlog.processors.format_exc_info,
|
||||
]
|
||||
|
||||
|
||||
|
||||
def _console_renderer():
|
||||
"""Pretty colored logs for development."""
|
||||
return ConsoleRenderer()
|
||||
|
||||
|
||||
def _json_renderer():
|
||||
"""Machine-friendly JSON logs for test/prod."""
|
||||
return JSONRenderer(sort_keys=True)
|
||||
|
||||
|
||||
def configure_logging(settings=None) -> None:
|
||||
"""
|
||||
Configure structlog + stdlib logging once for the process.
|
||||
"""
|
||||
if _state.configured:
|
||||
return
|
||||
|
||||
if settings is None:
|
||||
from app.utils.settings import get_settings # lazy import
|
||||
settings = get_settings()
|
||||
|
||||
env = settings.env.value
|
||||
level_name = settings.log_level or "INFO"
|
||||
level = _level_from_name(level_name)
|
||||
|
||||
# Choose renderers
|
||||
if env == "dev":
|
||||
renderer = _console_renderer()
|
||||
foreign_pre_chain = _shared_processors()
|
||||
else:
|
||||
renderer = _json_renderer()
|
||||
foreign_pre_chain = _shared_processors() + [
|
||||
TimeStamper(fmt="iso", utc=True, key="ts")
|
||||
]
|
||||
|
||||
# stdlib -> structlog bridge
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setLevel(level)
|
||||
handler.setFormatter(
|
||||
ProcessorFormatter(
|
||||
processor=renderer,
|
||||
foreign_pre_chain=foreign_pre_chain,
|
||||
)
|
||||
)
|
||||
|
||||
root = logging.getLogger()
|
||||
root.handlers.clear()
|
||||
root.setLevel(level)
|
||||
root.addHandler(handler)
|
||||
|
||||
# Quiet noisy libs
|
||||
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
||||
logging.getLogger("httpx").setLevel(logging.WARNING)
|
||||
logging.getLogger("uvicorn").setLevel(logging.INFO)
|
||||
|
||||
# structlog pipeline
|
||||
structlog.configure(
|
||||
processors=[
|
||||
structlog.contextvars.merge_contextvars,
|
||||
structlog.stdlib.filter_by_level,
|
||||
*foreign_pre_chain,
|
||||
ProcessorFormatter.wrap_for_formatter, # hand off to renderer
|
||||
],
|
||||
wrapper_class=structlog.stdlib.BoundLogger,
|
||||
context_class=dict,
|
||||
logger_factory=structlog.stdlib.LoggerFactory(),
|
||||
cache_logger_on_first_use=True,
|
||||
)
|
||||
|
||||
_state.configured = True
|
||||
_state.level_name = level_name
|
||||
|
||||
|
||||
def get_logger(name: Optional[str] = None) -> structlog.stdlib.BoundLogger:
|
||||
"""
|
||||
Retrieve a structlog logger.
|
||||
"""
|
||||
if name is None:
|
||||
return structlog.get_logger()
|
||||
return structlog.get_logger(name)
|
||||
69
app/utils/merging.py
Normal file
69
app/utils/merging.py
Normal file
@@ -0,0 +1,69 @@
|
||||
# merging.py
|
||||
from __future__ import annotations
|
||||
from typing import Iterable, Dict, List, Tuple
|
||||
|
||||
from app.models.primitives import Attributes, Resources
|
||||
from app.models.races import RaceSheet
|
||||
from app.utils.hero_catalog import HeroArchetype, SkillDef, SpellDef
|
||||
|
||||
|
||||
def _add_attributes(a: Attributes, b: Attributes) -> Attributes:
|
||||
return Attributes(
|
||||
base_str=a.base_str + b.base_str,
|
||||
base_dex=a.base_dex + b.base_dex,
|
||||
base_int=a.base_int + b.base_int,
|
||||
base_wis=a.base_wis + b.base_wis,
|
||||
base_luk=a.base_luk + b.base_luk,
|
||||
base_cha=a.base_cha + b.base_cha,
|
||||
)
|
||||
|
||||
def _add_resources(a: Resources, b: Resources) -> Resources:
|
||||
return Resources(
|
||||
maxhp=a.maxhp + b.maxhp,
|
||||
hp=a.hp + b.hp,
|
||||
maxmp=a.maxmp + b.maxmp,
|
||||
mp=a.mp + b.mp,
|
||||
gold=a.gold + b.gold if hasattr(a, "gold") else getattr(b, "gold", 0),
|
||||
)
|
||||
|
||||
def _unique_chain(strings: Iterable[str]) -> List[str]:
|
||||
seen = set()
|
||||
out: List[str] = []
|
||||
for s in strings:
|
||||
if s not in seen:
|
||||
out.append(s)
|
||||
seen.add(s)
|
||||
return out
|
||||
|
||||
def merge_catalogs(
|
||||
race_skills: Dict[str, "SkillDef"],
|
||||
race_spells: Dict[str, "SpellDef"],
|
||||
class_skills: Dict[str, "SkillDef"],
|
||||
class_spells: Dict[str, "SpellDef"],
|
||||
) -> Tuple[Dict[str, "SkillDef"], Dict[str, "SpellDef"]]:
|
||||
"""
|
||||
Return merged skill and spell maps. By default, CLASS overrides RACE on conflicts.
|
||||
"""
|
||||
skills = dict(race_skills)
|
||||
skills.update(class_skills)
|
||||
|
||||
spells = dict(race_spells)
|
||||
spells.update(class_spells)
|
||||
|
||||
return skills, spells
|
||||
|
||||
def merge_sheets(race: RaceSheet, cls: HeroArchetype):
|
||||
"""
|
||||
Compute final base Attributes/Resources and starting lists, plus merged catalogs.
|
||||
"""
|
||||
attrs = _add_attributes(race.base_attributes, cls.base_attributes)
|
||||
res = _add_resources(race.starting_resources, cls.starting_resources)
|
||||
|
||||
starting_skills = _unique_chain([*race.starting_skills, *cls.starting_skills])
|
||||
starting_spells = _unique_chain([*race.starting_spells, *cls.starting_spells])
|
||||
|
||||
skills_catalog, spells_catalog = merge_catalogs(
|
||||
race.skills, race.spells, cls.skills, cls.spells
|
||||
)
|
||||
|
||||
return attrs, res, starting_skills, starting_spells, skills_catalog, spells_catalog
|
||||
129
app/utils/settings.py
Normal file
129
app/utils/settings.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""
|
||||
Environment-aware settings for Code of Conquest.
|
||||
|
||||
- Loads environment variables from OS and `.env` (OS wins).
|
||||
- Provides repo-relative default paths for data storage.
|
||||
- Validates a few key fields (env, model backend).
|
||||
- Ensures important directories exist on first load.
|
||||
- Exposes a tiny singleton: get_settings().
|
||||
|
||||
Style:
|
||||
- Python 3.11+
|
||||
- Dataclasses (no Pydantic)
|
||||
- Docstrings + inline comments
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
class Environment(str, Enum):
|
||||
DEV = "dev"
|
||||
TEST = "test"
|
||||
PROD = "prod"
|
||||
|
||||
def _repo_root_from_here() -> Path:
|
||||
"""
|
||||
Resolve the repository root by walking up from this file.
|
||||
|
||||
This file lives at: project/app/core/utils/settings.py
|
||||
So parents[3] should be the repo root:
|
||||
parents[0] = utils
|
||||
parents[1] = core
|
||||
parents[2] = app
|
||||
parents[3] = project root
|
||||
"""
|
||||
here = Path(__file__).resolve()
|
||||
repo_root = here.parents[3]
|
||||
return repo_root
|
||||
|
||||
|
||||
@dataclass
|
||||
class Settings:
|
||||
"""
|
||||
Settings container for Code of Conquest.
|
||||
|
||||
Load order:
|
||||
1) OS environment
|
||||
2) .env file (at repo root)
|
||||
3) Defaults below
|
||||
|
||||
Paths default into the repo under ./data unless overridden.
|
||||
"""
|
||||
|
||||
# --- Core Tunables---
|
||||
env: Environment = Environment.DEV
|
||||
log_level: str = "INFO"
|
||||
flask_secret_key: str = os.getenv("FLASK_SECRET_KEY","change-me-for-prod")
|
||||
|
||||
# APPWRITE Things
|
||||
appwrite_endpoint: str = os.getenv("APPWRITE_ENDPOINT","NOT SET")
|
||||
appwrite_project_id: str = os.getenv("APPWRITE_PROJECT_ID","NOT SET")
|
||||
appwrite_api_key: str = os.getenv("APPWRITE_API_KEY","NOT SET")
|
||||
|
||||
app_name: str = "Code of Conquest"
|
||||
app_version: str = "0.0.1"
|
||||
|
||||
# --- Paths (default under ./data) ---
|
||||
repo_root: Path = field(default_factory=_repo_root_from_here)
|
||||
|
||||
# --- Build paths for convenience (not env-controlled directly) ---
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
# Basic validation
|
||||
if self.env not in (Environment.DEV, Environment.TEST, Environment.PROD):
|
||||
raise ValueError(f"Invalid COC_ENV: {self.env}")
|
||||
|
||||
@staticmethod
|
||||
def _ensure_dir(path: Path) -> None:
|
||||
if path is None:
|
||||
return
|
||||
if not path.exists():
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
# ---- Singleton loader ----
|
||||
_settings_singleton: Optional[Settings] = None
|
||||
|
||||
|
||||
def get_settings() -> Settings:
|
||||
"""
|
||||
Load settings from environment and `.env` once, then reuse.
|
||||
OS env always takes precedence over `.env`.
|
||||
|
||||
Returns:
|
||||
Settings: A process-wide singleton instance.
|
||||
"""
|
||||
global _settings_singleton
|
||||
if _settings_singleton is not None:
|
||||
return _settings_singleton
|
||||
|
||||
# Load .env from repo root
|
||||
repo_root = _repo_root_from_here()
|
||||
dotenv_path = repo_root / ".env"
|
||||
load_dotenv(dotenv_path=dotenv_path, override=False)
|
||||
|
||||
# Environment
|
||||
env_str = os.getenv("COC_ENV", "dev").strip().lower()
|
||||
if env_str == "dev":
|
||||
env_val = Environment.DEV
|
||||
elif env_str == "test":
|
||||
env_val = Environment.TEST
|
||||
elif env_str == "prod":
|
||||
env_val = Environment.PROD
|
||||
else:
|
||||
raise ValueError(f"COC_ENV must be one of dev|test|prod, got '{env_str}'")
|
||||
|
||||
# Construct settings
|
||||
_settings_singleton = Settings(
|
||||
env=env_val,
|
||||
log_level=os.getenv("LOG_LEVEL", "INFO").strip().upper(),
|
||||
)
|
||||
|
||||
return _settings_singleton
|
||||
10
app/utils/typed_flask.py
Normal file
10
app/utils/typed_flask.py
Normal file
@@ -0,0 +1,10 @@
|
||||
# app/core/typed_flask.py
|
||||
from flask import Flask
|
||||
from typing import Optional
|
||||
from app.utils.api_response import ApiResponder
|
||||
|
||||
class CoCFlask(Flask):
|
||||
"""
|
||||
A typed subclass of Flask that includes an `api` attribute for IDE support.
|
||||
"""
|
||||
api: Optional[ApiResponder] = None
|
||||
Reference in New Issue
Block a user