Files
Code_of_Conquest/api/app/services/combat_service.py
Phillip Tarrant a38906b445 feat(api): integrate equipment stats into combat damage system
Equipment-Combat Integration:
- Update Stats damage formula from STR//2 to int(STR*0.75) for better scaling
- Add spell_power system for magical weapons (staves, wands)
- Add spell_power_bonus field to Stats model with spell_power property
- Add spell_power field to Item model with is_magical_weapon() method
- Update Character.get_effective_stats() to populate spell_power_bonus

Combatant Model Updates:
- Add weapon property fields (crit_chance, crit_multiplier, damage_type)
- Add elemental weapon support (elemental_damage_type, physical_ratio, elemental_ratio)
- Update serialization to handle new weapon properties

DamageCalculator Refactoring:
- Remove weapon_damage parameter from calculate_physical_damage()
- Use attacker_stats.damage directly (includes weapon bonus)
- Use attacker_stats.spell_power for magical damage calculations

Combat Service Updates:
- Extract weapon properties in _create_combatant_from_character()
- Use stats.damage_bonus for enemy combatants from templates
- Remove hardcoded _get_weapon_damage() method
- Handle elemental weapons with split damage in _execute_attack()

Item Generation Updates:
- Add base_spell_power to BaseItemTemplate dataclass
- Add ARCANE damage type to DamageType enum
- Add magical weapon templates (wizard_staff, arcane_staff, wand, crystal_wand)

Test Updates:
- Update test_stats.py for new damage formula (0.75 scaling)
- Update test_character.py for equipment bonus calculations
- Update test_damage_calculator.py for new API signatures
- Update test_combat_service.py mock fixture for equipped attribute

Tests: 174 passing
2025-11-26 19:54:58 -06:00

1097 lines
38 KiB
Python

"""
Combat Service - Orchestrates turn-based combat encounters.
This service manages the full combat lifecycle including:
- Starting combat with characters and enemies
- Executing player and enemy actions
- Processing damage, effects, and state changes
- Distributing rewards on victory
"""
import random
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional, Tuple
from uuid import uuid4
from app.models.combat import Combatant, CombatEncounter
from app.models.character import Character
from app.models.enemy import EnemyTemplate
from app.models.stats import Stats
from app.models.abilities import Ability, AbilityLoader
from app.models.effects import Effect
from app.models.items import Item
from app.models.enums import CombatStatus, AbilityType, DamageType, EffectType
from app.services.damage_calculator import DamageCalculator, DamageResult
from app.services.enemy_loader import EnemyLoader, get_enemy_loader
from app.services.session_service import get_session_service
from app.services.character_service import get_character_service
from app.utils.logging import get_logger
# Module-level logger shared by everything in this file; call sites below pass
# their context (session_id, combatant_id, ...) as keyword arguments.
logger = get_logger(__file__)
# =============================================================================
# Supporting Dataclasses
# =============================================================================
@dataclass
class CombatAction:
    """
    Represents a combat action to be executed.

    Attributes:
        action_type: Type of action (attack, ability, defend, flee, item)
        target_ids: List of target combatant IDs
        ability_id: Ability to use (for ability actions)
        item_id: Item to use (for item actions)
    """

    action_type: str
    target_ids: List[str] = field(default_factory=list)
    ability_id: Optional[str] = None
    item_id: Optional[str] = None

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CombatAction':
        """
        Create CombatAction from dictionary.

        Args:
            data: Mapping with a required "action_type" key and optional
                "target_ids", "ability_id" and "item_id" keys.

        Returns:
            A new CombatAction.

        Raises:
            KeyError: If "action_type" is missing.
        """
        # A missing key and an explicit null are both treated as "no targets",
        # and the list is copied so the action never aliases the caller's
        # payload (mutating one must not change the other).
        raw_targets = data.get("target_ids") or []
        return cls(
            action_type=data["action_type"],
            target_ids=list(raw_targets),
            ability_id=data.get("ability_id"),
            item_id=data.get("item_id"),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            "action_type": self.action_type,
            "target_ids": self.target_ids,
            "ability_id": self.ability_id,
            "item_id": self.item_id,
        }
@dataclass
class ActionResult:
    """
    Outcome of a single executed combat action.

    Attributes:
        success: True when the action went through
        message: Human-readable description of what happened
        damage_results: Damage calculation results, one per affected target
        effects_applied: Effects that were placed on targets
        combat_ended: True when this action finished the encounter
        combat_status: Final combat status, set only when combat ended
        next_combatant_id: ID of the combatant who acts next
        turn_effects: Effects that fired at turn start/end
    """

    success: bool
    message: str
    damage_results: List[DamageResult] = field(default_factory=list)
    effects_applied: List[Dict[str, Any]] = field(default_factory=list)
    combat_ended: bool = False
    combat_status: Optional[CombatStatus] = None
    next_combatant_id: Optional[str] = None
    turn_effects: List[Dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the result into plain types for an API response."""
        damage_entries = []
        for entry in self.damage_results:
            damage_entries.append({
                # target_id is attached to damage results after calculation,
                # so it may be absent; report None in that case.
                "target_id": getattr(entry, "target_id", None),
                "total_damage": entry.total_damage,
                "is_critical": entry.is_critical,
                "is_miss": entry.is_miss,
                "message": entry.message,
            })

        status_value = None
        if self.combat_status:
            status_value = self.combat_status.value

        return {
            "success": self.success,
            "message": self.message,
            "damage_results": damage_entries,
            "effects_applied": self.effects_applied,
            "combat_ended": self.combat_ended,
            "combat_status": status_value,
            "next_combatant_id": self.next_combatant_id,
            "turn_effects": self.turn_effects,
        }
@dataclass
class CombatRewards:
    """
    Rewards handed out after a combat victory.

    Attributes:
        experience: Total XP earned
        gold: Total gold earned
        items: Item drops
        level_ups: IDs of characters that leveled up
    """

    experience: int = 0
    gold: int = 0
    items: List[Dict[str, Any]] = field(default_factory=list)
    level_ups: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the rewards as a plain dictionary keyed by attribute name."""
        exported = ("experience", "gold", "items", "level_ups")
        return {attr: getattr(self, attr) for attr in exported}
# =============================================================================
# Combat Service Exceptions
# =============================================================================
class CombatError(Exception):
    """Root of the combat exception hierarchy."""


class NotInCombatError(CombatError):
    """Raised when an action needs an active combat but the session has none."""


class AlreadyInCombatError(CombatError):
    """Raised when combat is started on a session that is already in combat."""


class InvalidActionError(CombatError):
    """Raised for an invalid action (wrong turn, invalid target, etc.)."""


class InsufficientResourceError(CombatError):
    """Raised when a combatant lacks resources (mana, cooldown, etc.)."""
# =============================================================================
# Combat Service
# =============================================================================
class CombatService:
    """
    Orchestrates turn-based combat encounters.

    This service manages:
    - Combat initialization with players and enemies
    - Turn-by-turn action execution
    - Damage calculation via DamageCalculator
    - Effect processing and state management
    - Victory/defeat detection and reward distribution
    """

    def __init__(self):
        """Initialize the combat service with dependencies."""
        self.session_service = get_session_service()
        self.character_service = get_character_service()
        self.enemy_loader = get_enemy_loader()
        self.ability_loader = AbilityLoader()
        logger.info("CombatService initialized")

    # =========================================================================
    # Combat Lifecycle
    # =========================================================================

    def start_combat(
        self,
        session_id: str,
        user_id: str,
        enemy_ids: List[str],
    ) -> CombatEncounter:
        """
        Start a new combat encounter.

        Args:
            session_id: Game session ID
            user_id: User ID for authorization
            enemy_ids: List of enemy template IDs to spawn

        Returns:
            Initialized CombatEncounter

        Raises:
            AlreadyInCombatError: If session is already in combat
            ValueError: If enemy templates not found
        """
        logger.info("Starting combat",
                    session_id=session_id,
                    enemy_count=len(enemy_ids))

        # Get session
        session = self.session_service.get_session(session_id, user_id)

        # Check not already in combat
        if session.is_in_combat():
            raise AlreadyInCombatError("Session is already in combat")

        # Create combatants from player character(s)
        combatants = []
        if session.is_solo():
            # Solo session - single character
            character = self.character_service.get_character(
                session.solo_character_id,
                user_id
            )
            combatants.append(self._create_combatant_from_character(character))
        else:
            # Multiplayer - all party members
            for char_id in session.party_member_ids:
                # Note: In multiplayer, we'd need to handle different user_ids
                # For now, assume all characters belong to session owner
                character = self.character_service.get_character(char_id, user_id)
                combatants.append(self._create_combatant_from_character(character))

        # Create combatants from enemies
        for i, enemy_id in enumerate(enemy_ids):
            enemy_template = self.enemy_loader.load_enemy(enemy_id)
            if not enemy_template:
                raise ValueError(f"Enemy template not found: {enemy_id}")
            combatants.append(
                self._create_combatant_from_enemy(enemy_template, instance_index=i)
            )

        # Create encounter
        encounter = CombatEncounter(
            encounter_id=f"enc_{uuid4().hex[:12]}",
            combatants=combatants,
        )

        # Initialize combat (roll initiative, set turn order)
        encounter.initialize_combat()

        # Store in session
        session.start_combat(encounter)
        self.session_service.update_session(session, user_id)

        player_count = sum(1 for c in combatants if c.is_player)
        logger.info("Combat started",
                    session_id=session_id,
                    encounter_id=encounter.encounter_id,
                    player_count=player_count,
                    enemy_count=len(combatants) - player_count)
        return encounter

    def get_combat_state(
        self,
        session_id: str,
        user_id: str
    ) -> Optional[CombatEncounter]:
        """
        Get current combat state for a session.

        Args:
            session_id: Game session ID
            user_id: User ID for authorization

        Returns:
            CombatEncounter if in combat, None otherwise
        """
        session = self.session_service.get_session(session_id, user_id)
        return session.combat_encounter

    def end_combat(
        self,
        session_id: str,
        user_id: str,
        outcome: CombatStatus
    ) -> CombatRewards:
        """
        End combat and distribute rewards if victorious.

        Args:
            session_id: Game session ID
            user_id: User ID for authorization
            outcome: Combat outcome (VICTORY, DEFEAT, FLED)

        Returns:
            CombatRewards containing XP, gold, items earned

        Raises:
            NotInCombatError: If session is not in combat
        """
        logger.info("Ending combat",
                    session_id=session_id,
                    outcome=outcome.value)

        session = self.session_service.get_session(session_id, user_id)
        if not session.is_in_combat():
            raise NotInCombatError("Session is not in combat")

        encounter = session.combat_encounter
        encounter.status = outcome

        # Calculate rewards if victory
        rewards = CombatRewards()
        if outcome == CombatStatus.VICTORY:
            rewards = self._calculate_rewards(encounter, session, user_id)

        # End combat on session
        session.end_combat()
        self.session_service.update_session(session, user_id)

        logger.info("Combat ended",
                    session_id=session_id,
                    outcome=outcome.value,
                    xp_earned=rewards.experience,
                    gold_earned=rewards.gold)
        return rewards

    # =========================================================================
    # Action Execution
    # =========================================================================

    def execute_action(
        self,
        session_id: str,
        user_id: str,
        combatant_id: str,
        action: CombatAction
    ) -> ActionResult:
        """
        Execute a combat action for a combatant.

        Args:
            session_id: Game session ID
            user_id: User ID for authorization
            combatant_id: ID of combatant taking action
            action: Action to execute

        Returns:
            ActionResult with outcome details

        Raises:
            NotInCombatError: If session not in combat
            InvalidActionError: If action is invalid
        """
        logger.info("Executing action",
                    session_id=session_id,
                    combatant_id=combatant_id,
                    action_type=action.action_type)

        session = self.session_service.get_session(session_id, user_id)
        if not session.is_in_combat():
            raise NotInCombatError("Session is not in combat")
        encounter = session.combat_encounter

        # Validate it's this combatant's turn
        current = encounter.get_current_combatant()
        if not current or current.combatant_id != combatant_id:
            raise InvalidActionError(
                f"Not {combatant_id}'s turn. Current turn: "
                f"{current.combatant_id if current else 'None'}"
            )

        # Process start-of-turn effects
        turn_effects = encounter.start_turn()

        # Check if combatant is stunned
        if current.is_stunned():
            return self._skip_stunned_turn(encounter, current, session, user_id)

        # Execute based on action type
        if action.action_type == "attack":
            result = self._execute_attack(encounter, current, action.target_ids)
        elif action.action_type == "ability":
            result = self._execute_ability(
                encounter, current, action.ability_id, action.target_ids
            )
        elif action.action_type == "defend":
            result = self._execute_defend(encounter, current)
        elif action.action_type == "flee":
            result = self._execute_flee(encounter, current, session, user_id)
        elif action.action_type == "item":
            result = self._execute_use_item(
                encounter, current, action.item_id, action.target_ids
            )
        else:
            raise InvalidActionError(f"Unknown action type: {action.action_type}")

        # Add turn effects
        result.turn_effects = self._format_turn_effects(turn_effects)

        # Check for combat end
        status = encounter.check_end_condition()
        if (
            status == CombatStatus.ACTIVE
            and result.combat_ended
            and result.combat_status is not None
        ):
            # The action itself already ended the encounter (a successful flee
            # sets combat_ended / combat_status on its result). Honour that
            # even if check_end_condition() only looks at who is still alive;
            # otherwise the session would stay in combat after the player fled.
            status = result.combat_status

        if status != CombatStatus.ACTIVE:
            result.combat_ended = True
            result.combat_status = status
            # Auto-end combat and distribute rewards
            if status == CombatStatus.VICTORY:
                rewards = self._calculate_rewards(encounter, session, user_id)
                result.message += (
                    f" Victory! Earned {rewards.experience} XP and {rewards.gold} gold."
                )
            session.end_combat()
            # Save session state
            self.session_service.update_session(session, user_id)
        else:
            # Advance turn (this also saves the session state)
            self._advance_turn_and_save(encounter, session, user_id)
            result.next_combatant_id = self._current_combatant_id(encounter)
        return result

    def execute_enemy_turn(
        self,
        session_id: str,
        user_id: str
    ) -> ActionResult:
        """
        Execute an enemy's turn using basic AI.

        Args:
            session_id: Game session ID
            user_id: User ID for authorization

        Returns:
            ActionResult with enemy action outcome

        Raises:
            NotInCombatError: If session not in combat
            InvalidActionError: If there is no current combatant or it is a player
        """
        session = self.session_service.get_session(session_id, user_id)
        if not session.is_in_combat():
            raise NotInCombatError("Session is not in combat")
        encounter = session.combat_encounter

        current = encounter.get_current_combatant()
        if not current:
            raise InvalidActionError("No current combatant")
        if current.is_player:
            raise InvalidActionError("Current combatant is a player, not an enemy")

        # Process start-of-turn effects
        turn_effects = encounter.start_turn()

        # Check if stunned
        if current.is_stunned():
            return self._skip_stunned_turn(encounter, current, session, user_id)

        # Enemy AI: Choose action
        action, targets = self._choose_enemy_action(encounter, current)

        # Execute chosen action; anything that is not a usable ability falls
        # back to a basic attack.
        ability_id = None
        if action == "ability" and current.abilities:
            ability_id = self._choose_enemy_ability(current)
        if ability_id:
            result = self._execute_ability(encounter, current, ability_id, targets)
        else:
            result = self._execute_attack(encounter, current, targets)

        # Add turn effects
        result.turn_effects = self._format_turn_effects(turn_effects)

        # Check for combat end
        status = encounter.check_end_condition()
        if status != CombatStatus.ACTIVE:
            result.combat_ended = True
            result.combat_status = status
            session.end_combat()
            self.session_service.update_session(session, user_id)
        else:
            # Advance turn (this also saves the session state)
            self._advance_turn_and_save(encounter, session, user_id)
            result.next_combatant_id = self._current_combatant_id(encounter)
        return result

    def _skip_stunned_turn(
        self,
        encounter: CombatEncounter,
        combatant: Combatant,
        session,
        user_id: str
    ) -> ActionResult:
        """Skip a stunned combatant's turn: report it, advance and persist."""
        result = ActionResult(
            success=False,
            message=f"{combatant.name} is stunned and cannot act!",
            turn_effects=[{"type": "stun", "combatant": combatant.name}],
        )
        # The skipped turn must be persisted like any other turn; returning
        # without saving would drop the turn advance.
        self._advance_turn_and_save(encounter, session, user_id)
        result.next_combatant_id = self._current_combatant_id(encounter)
        return result

    @staticmethod
    def _format_turn_effects(turn_effects) -> List[Dict[str, Any]]:
        """Reduce start-of-turn effect entries to their type and message."""
        return [
            {"type": e.get("type", "effect"), "message": e.get("message", "")}
            for e in turn_effects
        ]

    @staticmethod
    def _current_combatant_id(encounter: CombatEncounter) -> Optional[str]:
        """Return the ID of the encounter's current combatant, or None."""
        combatant = encounter.get_current_combatant()
        return combatant.combatant_id if combatant else None

    # =========================================================================
    # Specific Action Implementations
    # =========================================================================

    def _execute_attack(
        self,
        encounter: CombatEncounter,
        attacker: Combatant,
        target_ids: List[str]
    ) -> ActionResult:
        """
        Execute a basic attack action.

        Only the first entry of target_ids is attacked; when target_ids is
        empty the first living combatant on the opposing side is used.
        """
        if not target_ids:
            # Auto-target: first alive enemy/player
            target = self._get_default_target(encounter, attacker)
            if not target:
                return ActionResult(
                    success=False,
                    message="No valid targets available"
                )
            target_ids = [target.combatant_id]

        target = encounter.get_combatant(target_ids[0])
        if not target or target.is_dead():
            return ActionResult(
                success=False,
                message="Invalid or dead target"
            )

        # Check if this is an elemental weapon attack
        if attacker.elemental_ratio > 0.0 and attacker.elemental_damage_type:
            # Elemental weapon: split damage between physical and elemental
            damage_result = DamageCalculator.calculate_elemental_weapon_damage(
                attacker_stats=attacker.stats,
                defender_stats=target.stats,
                weapon_crit_chance=attacker.weapon_crit_chance,
                weapon_crit_multiplier=attacker.weapon_crit_multiplier,
                physical_ratio=attacker.physical_ratio,
                elemental_ratio=attacker.elemental_ratio,
                elemental_type=attacker.elemental_damage_type,
            )
        else:
            # Normal physical attack
            damage_result = DamageCalculator.calculate_physical_damage(
                attacker_stats=attacker.stats,
                defender_stats=target.stats,
                weapon_crit_chance=attacker.weapon_crit_chance,
                weapon_crit_multiplier=attacker.weapon_crit_multiplier,
            )

        # Add target_id to result for tracking
        damage_result.target_id = target.combatant_id

        # Apply damage
        message = f"{attacker.name} attacks {target.name}! {damage_result.message}"
        if not damage_result.is_miss:
            actual_damage = target.take_damage(damage_result.total_damage)
            if actual_damage < damage_result.total_damage:
                message += (
                    f" (Shield absorbed {damage_result.total_damage - actual_damage})"
                )

        # Log action
        encounter.log_action(
            "attack",
            attacker.combatant_id,
            message,
            {"damage": damage_result.total_damage, "target": target.combatant_id}
        )
        return ActionResult(
            success=True,
            message=message,
            damage_results=[damage_result],
        )

    def _execute_ability(
        self,
        encounter: CombatEncounter,
        caster: Combatant,
        ability_id: str,
        target_ids: List[str]
    ) -> ActionResult:
        """
        Execute an ability action.

        Returns an unsuccessful ActionResult (without spending mana) when the
        ability is unknown, unavailable, or has no valid target.
        """
        # Load ability
        ability = self.ability_loader.load_ability(ability_id)
        if not ability:
            return ActionResult(
                success=False,
                message=f"Unknown ability: {ability_id}"
            )

        # Check if ability is available
        if not caster.can_use_ability(ability_id, ability):
            if caster.current_mp < ability.mana_cost:
                return ActionResult(
                    success=False,
                    message=f"Not enough mana ({caster.current_mp}/{ability.mana_cost})"
                )
            if ability_id in caster.cooldowns and caster.cooldowns[ability_id] > 0:
                return ActionResult(
                    success=False,
                    message=f"Ability on cooldown ({caster.cooldowns[ability_id]} turns)"
                )
            return ActionResult(
                success=False,
                message=f"Cannot use ability: {ability.name}"
            )

        # Heals and buffs are aimed at the caster's own side.
        is_supportive = ability.ability_type in [AbilityType.HEAL, AbilityType.BUFF]

        # Determine targets
        if ability.is_aoe:
            # AoE: Target all enemies (or allies for heals)
            if is_supportive:
                targets = [c for c in encounter.combatants
                           if c.is_player == caster.is_player and c.is_alive()]
            else:
                targets = [c for c in encounter.combatants
                           if c.is_player != caster.is_player and c.is_alive()]
            # Limit by target_count if specified
            if ability.target_count > 0:
                targets = targets[:ability.target_count]
        else:
            # Single target
            if not target_ids:
                if is_supportive:
                    # An untargeted heal/buff goes to the caster rather than
                    # to the first living opponent.
                    target = caster
                else:
                    target = self._get_default_target(encounter, caster)
                if not target:
                    return ActionResult(success=False, message="No valid targets")
                targets = [target]
            else:
                targets = [encounter.get_combatant(tid) for tid in target_ids]
                targets = [t for t in targets if t and t.is_alive()]

        if not targets:
            return ActionResult(success=False, message="No valid targets")

        # Apply mana cost and cooldown
        caster.use_ability_cost(ability, ability_id)

        # Execute ability based on type
        damage_results = []
        effects_applied = []
        messages = []
        for target in targets:
            if ability.ability_type in [AbilityType.ATTACK, AbilityType.SPELL]:
                # Damage ability
                damage_result = DamageCalculator.calculate_magical_damage(
                    attacker_stats=caster.stats,
                    defender_stats=target.stats,
                    ability_base_power=ability.calculate_power(caster.stats),
                    weapon_crit_chance=self._get_crit_chance(caster),
                )
                damage_result.target_id = target.combatant_id
                if not damage_result.is_miss:
                    target.take_damage(damage_result.total_damage)
                    messages.append(
                        f"{target.name} takes {damage_result.total_damage} damage"
                    )
                else:
                    messages.append(f"{target.name} dodges!")
                damage_results.append(damage_result)
            elif ability.ability_type == AbilityType.HEAL:
                # Healing ability
                heal_amount = ability.calculate_power(caster.stats)
                actual_heal = target.heal(heal_amount)
                messages.append(f"{target.name} heals for {actual_heal}")

            # Apply effects
            for effect in ability.get_effects_to_apply():
                target.add_effect(effect)
                effects_applied.append({
                    "target": target.combatant_id,
                    "effect": effect.name,
                    "duration": effect.duration,
                })
                messages.append(f"{target.name} is affected by {effect.name}")

        message = f"{caster.name} uses {ability.name}! " + "; ".join(messages)

        # Log action
        encounter.log_action(
            "ability",
            caster.combatant_id,
            message,
            {
                "ability": ability_id,
                "targets": [t.combatant_id for t in targets],
                "mana_cost": ability.mana_cost,
            }
        )
        return ActionResult(
            success=True,
            message=message,
            damage_results=damage_results,
            effects_applied=effects_applied,
        )

    def _execute_defend(
        self,
        encounter: CombatEncounter,
        combatant: Combatant
    ) -> ActionResult:
        """Execute a defend action (increases defense for one turn)."""
        # Add a temporary defense buff
        defense_buff = Effect(
            effect_id=f"defend_{uuid4().hex[:8]}",
            name="Defending",
            effect_type=EffectType.BUFF,
            duration=1,
            power=5,  # +5 defense
            stat_affected="constitution",
            source="defend_action",
        )
        combatant.add_effect(defense_buff)

        message = f"{combatant.name} takes a defensive stance! (+5 Defense)"
        encounter.log_action(
            "defend",
            combatant.combatant_id,
            message,
            {}
        )
        return ActionResult(
            success=True,
            message=message,
            effects_applied=[{
                "target": combatant.combatant_id,
                "effect": "Defending",
                "duration": 1,
            }],
        )

    def _execute_flee(
        self,
        encounter: CombatEncounter,
        combatant: Combatant,
        session,
        user_id: str
    ) -> ActionResult:
        """
        Execute a flee action.

        On success the encounter status is set to FLED and the result is
        marked as combat-ending; on failure the result is unsuccessful.
        """
        # Calculate flee chance (base 50%, modified by DEX vs enemy DEX)
        base_flee_chance = 0.50

        # Get average enemy DEX
        enemies = [c for c in encounter.combatants if not c.is_player and c.is_alive()]
        if enemies:
            avg_enemy_dex = sum(e.stats.dexterity for e in enemies) / len(enemies)
            dex_modifier = (combatant.stats.dexterity - avg_enemy_dex) * 0.02
            # Clamp so fleeing is never guaranteed nor hopeless
            flee_chance = max(0.10, min(0.90, base_flee_chance + dex_modifier))
        else:
            flee_chance = 1.0  # No enemies, always succeed

        # Roll for flee
        roll = random.random()
        success = roll < flee_chance

        if success:
            encounter.status = CombatStatus.FLED
            encounter.log_action(
                "flee",
                combatant.combatant_id,
                f"{combatant.name} successfully flees from combat!",
                {"roll": roll, "chance": flee_chance}
            )
            return ActionResult(
                success=True,
                message=f"{combatant.name} successfully flees from combat!",
                combat_ended=True,
                combat_status=CombatStatus.FLED,
            )

        encounter.log_action(
            "flee_failed",
            combatant.combatant_id,
            f"{combatant.name} fails to flee!",
            {"roll": roll, "chance": flee_chance}
        )
        return ActionResult(
            success=False,
            message=f"{combatant.name} fails to flee! (Roll: {roll:.2f}, Needed: {flee_chance:.2f})",
        )

    def _execute_use_item(
        self,
        encounter: CombatEncounter,
        combatant: Combatant,
        item_id: str,
        target_ids: List[str]
    ) -> ActionResult:
        """Execute an item use action (currently always unsuccessful)."""
        # TODO: Implement item usage from inventory
        # For now, return not implemented
        return ActionResult(
            success=False,
            message="Item usage not yet implemented"
        )

    # =========================================================================
    # Enemy AI
    # =========================================================================

    def _choose_enemy_action(
        self,
        encounter: CombatEncounter,
        enemy: Combatant
    ) -> Tuple[str, List[str]]:
        """
        Choose an action for an enemy combatant.

        Returns:
            Tuple of (action_type, target_ids)
        """
        # Get valid player targets
        players = [c for c in encounter.combatants if c.is_player and c.is_alive()]
        if not players:
            return ("attack", [])

        # Simple AI: Attack lowest HP player
        target = min(players, key=lambda p: p.current_hp)

        # 30% chance to use ability if available
        if enemy.abilities and enemy.current_mp > 0 and random.random() < 0.3:
            return ("ability", [target.combatant_id])
        return ("attack", [target.combatant_id])

    def _choose_enemy_ability(self, enemy: Combatant) -> Optional[str]:
        """Return the first of the enemy's abilities it can use, or None."""
        for ability_id in enemy.abilities:
            ability = self.ability_loader.load_ability(ability_id)
            if ability and enemy.can_use_ability(ability_id, ability):
                return ability_id
        return None

    # =========================================================================
    # Rewards
    # =========================================================================

    @staticmethod
    def _enemy_template_id(combatant_id: str) -> str:
        """
        Recover the enemy template ID from an enemy combatant ID.

        Enemy combatant IDs are built as "<enemy_id>_<instance_index>", so only
        the last "_<index>" suffix is stripped; template IDs that themselves
        contain underscores (e.g. "goblin_scout") are kept intact.
        """
        return combatant_id.rsplit("_", 1)[0]

    def _calculate_rewards(
        self,
        encounter: CombatEncounter,
        session,
        user_id: str
    ) -> CombatRewards:
        """
        Calculate and distribute rewards after victory.

        Args:
            encounter: Completed combat encounter
            session: Game session
            user_id: User ID for character updates

        Returns:
            CombatRewards with totals
        """
        rewards = CombatRewards()

        # Sum up rewards from defeated enemies
        for combatant in encounter.combatants:
            if not combatant.is_player and combatant.is_dead():
                # Get enemy template for rewards
                enemy_id = self._enemy_template_id(combatant.combatant_id)
                enemy = self.enemy_loader.load_enemy(enemy_id)
                if enemy:
                    rewards.experience += enemy.experience_reward
                    rewards.gold += enemy.get_gold_reward()
                    # Roll for loot
                    rewards.items.extend(enemy.roll_loot())

        # Distribute rewards to player characters (even split, rounded down)
        player_combatants = [c for c in encounter.combatants if c.is_player]
        share_count = max(1, len(player_combatants))
        xp_per_player = rewards.experience // share_count
        gold_per_player = rewards.gold // share_count

        for player in player_combatants:
            if session.is_solo():
                char_id = session.solo_character_id
            else:
                char_id = player.combatant_id
            try:
                character = self.character_service.get_character(char_id, user_id)
                # Add XP and check for level up
                old_level = character.level
                character.experience += xp_per_player
                # TODO: Add level up logic based on XP thresholds
                # Add gold
                character.gold += gold_per_player
                # Save character
                self.character_service.update_character(character, user_id)
                if character.level > old_level:
                    rewards.level_ups.append(char_id)
            except Exception as e:
                # Best effort: one character failing to update must not stop
                # the others from receiving their share.
                logger.error("Failed to distribute rewards to character",
                             char_id=char_id,
                             error=str(e))

        logger.info("Rewards distributed",
                    total_xp=rewards.experience,
                    total_gold=rewards.gold,
                    items=len(rewards.items),
                    level_ups=rewards.level_ups)
        return rewards

    # =========================================================================
    # Helper Methods
    # =========================================================================

    def _create_combatant_from_character(
        self,
        character: Character
    ) -> Combatant:
        """Create a Combatant from a player Character."""
        effective_stats = character.get_effective_stats()

        # Get abilities from unlocked skills
        abilities = ["basic_attack"]  # All characters have basic attack
        abilities.extend(character.unlocked_skills)

        # Extract weapon properties from equipped weapon; these defaults apply
        # when nothing usable as a weapon is equipped.
        weapon = character.equipped.get("weapon")
        weapon_crit_chance = 0.05
        weapon_crit_multiplier = 2.0
        weapon_damage_type = DamageType.PHYSICAL
        elemental_damage_type = None
        physical_ratio = 1.0
        elemental_ratio = 0.0
        if weapon and weapon.is_weapon():
            weapon_crit_chance = weapon.crit_chance
            weapon_crit_multiplier = weapon.crit_multiplier
            weapon_damage_type = weapon.damage_type or DamageType.PHYSICAL
            if weapon.is_elemental_weapon():
                elemental_damage_type = weapon.elemental_damage_type
                physical_ratio = weapon.physical_ratio
                elemental_ratio = weapon.elemental_ratio

        return Combatant(
            combatant_id=character.character_id,
            name=character.name,
            is_player=True,
            current_hp=effective_stats.hit_points,
            max_hp=effective_stats.hit_points,
            current_mp=effective_stats.mana_points,
            max_mp=effective_stats.mana_points,
            stats=effective_stats,
            abilities=abilities,
            weapon_crit_chance=weapon_crit_chance,
            weapon_crit_multiplier=weapon_crit_multiplier,
            weapon_damage_type=weapon_damage_type,
            elemental_damage_type=elemental_damage_type,
            physical_ratio=physical_ratio,
            elemental_ratio=elemental_ratio,
        )

    def _create_combatant_from_enemy(
        self,
        template: EnemyTemplate,
        instance_index: int = 0
    ) -> Combatant:
        """Create a Combatant from an EnemyTemplate."""
        # Create unique ID for this instance
        combatant_id = f"{template.enemy_id}_{instance_index}"

        # Number every instance after the first. NOTE(review): start_combat
        # passes the position in the whole enemy list, so the number is not
        # per enemy type.
        name = template.name
        if instance_index > 0:
            name = f"{template.name} #{instance_index + 1}"

        # Copy stats and populate damage_bonus with base_damage
        stats = template.base_stats.copy()
        stats.damage_bonus = template.base_damage

        return Combatant(
            combatant_id=combatant_id,
            name=name,
            is_player=False,
            current_hp=stats.hit_points,
            max_hp=stats.hit_points,
            current_mp=stats.mana_points,
            max_mp=stats.mana_points,
            stats=stats,
            abilities=template.abilities.copy(),
            weapon_crit_chance=template.crit_chance,
            weapon_crit_multiplier=2.0,
            weapon_damage_type=DamageType.PHYSICAL,
        )

    def _get_crit_chance(self, combatant: Combatant) -> float:
        """Get critical hit chance for a combatant."""
        # Weapon crit chance + LUK bonus
        return combatant.weapon_crit_chance + combatant.stats.crit_bonus

    def _get_default_target(
        self,
        encounter: CombatEncounter,
        attacker: Combatant
    ) -> Optional[Combatant]:
        """Get a default target: the first living combatant on the other side."""
        for combatant in encounter.combatants:
            if combatant.is_player != attacker.is_player and combatant.is_alive():
                return combatant
        return None

    def _advance_turn_and_save(
        self,
        encounter: CombatEncounter,
        session,
        user_id: str
    ) -> None:
        """Advance the turn and save session state."""
        encounter.advance_turn()
        self.session_service.update_session(session, user_id)
# =============================================================================
# Global Instance
# =============================================================================
# Lazily created process-wide CombatService; stays None until first requested.
_service_instance: Optional[CombatService] = None


def get_combat_service() -> CombatService:
    """
    Return the shared CombatService, creating it on first use.

    Returns:
        Singleton CombatService instance
    """
    global _service_instance
    if _service_instance is not None:
        return _service_instance
    _service_instance = CombatService()
    return _service_instance