""" Combat Service - Orchestrates turn-based combat encounters. This service manages the full combat lifecycle including: - Starting combat with characters and enemies - Executing player and enemy actions - Processing damage, effects, and state changes - Distributing rewards on victory """ import random from dataclasses import dataclass, field from typing import Dict, Any, List, Optional, Tuple from uuid import uuid4 from app.models.combat import Combatant, CombatEncounter from app.models.character import Character from app.models.enemy import EnemyTemplate, EnemyDifficulty from app.models.stats import Stats from app.models.abilities import Ability, AbilityLoader from app.models.effects import Effect from app.models.items import Item from app.models.enums import CombatStatus, AbilityType, DamageType, EffectType from app.services.damage_calculator import DamageCalculator, DamageResult from app.services.enemy_loader import EnemyLoader, get_enemy_loader from app.services.session_service import get_session_service from app.services.character_service import get_character_service from app.services.combat_loot_service import ( get_combat_loot_service, CombatLootService, LootContext ) from app.services.combat_repository import ( get_combat_repository, CombatRepository ) from app.utils.logging import get_logger logger = get_logger(__file__) # ============================================================================= # Supporting Dataclasses # ============================================================================= @dataclass class CombatAction: """ Represents a combat action to be executed. Attributes: action_type: Type of action (attack, ability, defend, flee, item) target_ids: List of target combatant IDs ability_id: Ability to use (for ability actions) item_id: Item to use (for item actions) """ action_type: str target_ids: List[str] = field(default_factory=list) ability_id: Optional[str] = None item_id: Optional[str] = None @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'CombatAction': """Create CombatAction from dictionary.""" return cls( action_type=data["action_type"], target_ids=data.get("target_ids", []), ability_id=data.get("ability_id"), item_id=data.get("item_id"), ) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "action_type": self.action_type, "target_ids": self.target_ids, "ability_id": self.ability_id, "item_id": self.item_id, } @dataclass class ActionResult: """ Result of executing a combat action. Attributes: success: Whether the action succeeded message: Human-readable result message damage_results: List of damage calculation results effects_applied: List of effects applied to targets combat_ended: Whether combat ended as a result combat_status: Final combat status if ended next_combatant_id: ID of combatant whose turn is next turn_effects: Effects that triggered at turn start/end """ success: bool message: str damage_results: List[DamageResult] = field(default_factory=list) effects_applied: List[Dict[str, Any]] = field(default_factory=list) combat_ended: bool = False combat_status: Optional[CombatStatus] = None next_combatant_id: Optional[str] = None next_is_player: bool = True # True if next turn is player's turn_effects: List[Dict[str, Any]] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for API response.""" return { "success": self.success, "message": self.message, "damage_results": [ { "target_id": dr.target_id if hasattr(dr, 'target_id') else None, "total_damage": dr.total_damage, "is_critical": dr.is_critical, "is_miss": dr.is_miss, "message": dr.message, } for dr in self.damage_results ], "effects_applied": self.effects_applied, "combat_ended": self.combat_ended, "combat_status": self.combat_status.value if self.combat_status else None, "next_combatant_id": self.next_combatant_id, "next_is_player": self.next_is_player, "turn_effects": self.turn_effects, } @dataclass class CombatRewards: """ Rewards distributed after combat victory. Attributes: experience: Total XP earned gold: Total gold earned items: List of item drops level_ups: Character IDs that leveled up """ experience: int = 0 gold: int = 0 items: List[Dict[str, Any]] = field(default_factory=list) level_ups: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { "experience": self.experience, "gold": self.gold, "items": self.items, "level_ups": self.level_ups, } # ============================================================================= # Combat Service Exceptions # ============================================================================= class CombatError(Exception): """Base exception for combat errors.""" pass class NotInCombatError(CombatError): """Raised when action requires combat but session is not in combat.""" pass class AlreadyInCombatError(CombatError): """Raised when trying to start combat but already in combat.""" pass class InvalidActionError(CombatError): """Raised when action is invalid (wrong turn, invalid target, etc.).""" pass class InsufficientResourceError(CombatError): """Raised when combatant lacks resources (mana, cooldown, etc.).""" pass # ============================================================================= # Combat Service # ============================================================================= class CombatService: """ Orchestrates turn-based combat encounters. This service manages: - Combat initialization with players and enemies - Turn-by-turn action execution - Damage calculation via DamageCalculator - Effect processing and state management - Victory/defeat detection and reward distribution """ def __init__(self): """Initialize the combat service with dependencies.""" self.session_service = get_session_service() self.character_service = get_character_service() self.enemy_loader = get_enemy_loader() self.ability_loader = AbilityLoader() self.loot_service = get_combat_loot_service() self.combat_repository = get_combat_repository() logger.info("CombatService initialized") # ========================================================================= # Combat Lifecycle # ========================================================================= def start_combat( self, session_id: str, user_id: str, enemy_ids: List[str], ) -> CombatEncounter: """ Start a new combat encounter. Args: session_id: Game session ID user_id: User ID for authorization enemy_ids: List of enemy template IDs to spawn Returns: Initialized CombatEncounter Raises: AlreadyInCombatError: If session is already in combat ValueError: If enemy templates not found """ logger.info("Starting combat", session_id=session_id, enemy_count=len(enemy_ids)) # Get session session = self.session_service.get_session(session_id, user_id) # Check not already in combat if session.is_in_combat(): raise AlreadyInCombatError("Session is already in combat") # Create combatants from player character(s) combatants = [] if session.is_solo(): # Solo session - single character character = self.character_service.get_character( session.solo_character_id, user_id ) player_combatant = self._create_combatant_from_character(character) combatants.append(player_combatant) else: # Multiplayer - all party members for char_id in session.party_member_ids: # Note: In multiplayer, we'd need to handle different user_ids # For now, assume all characters belong to session owner character = self.character_service.get_character(char_id, user_id) player_combatant = self._create_combatant_from_character(character) combatants.append(player_combatant) # Create combatants from enemies for i, enemy_id in enumerate(enemy_ids): enemy_template = self.enemy_loader.load_enemy(enemy_id) if not enemy_template: raise ValueError(f"Enemy template not found: {enemy_id}") enemy_combatant = self._create_combatant_from_enemy( enemy_template, instance_index=i ) combatants.append(enemy_combatant) # Create encounter encounter = CombatEncounter( encounter_id=f"enc_{uuid4().hex[:12]}", combatants=combatants, ) # Initialize combat (roll initiative, set turn order) encounter.initialize_combat() # Save encounter to dedicated table self.combat_repository.create_encounter( encounter=encounter, session_id=session_id, user_id=user_id ) # Update session with reference to encounter (not inline data) session.active_combat_encounter_id = encounter.encounter_id session.combat_encounter = None # Clear legacy inline storage session.update_activity() self.session_service.update_session(session) logger.info("Combat started", session_id=session_id, encounter_id=encounter.encounter_id, player_count=len([c for c in combatants if c.is_player]), enemy_count=len([c for c in combatants if not c.is_player])) return encounter def get_combat_state( self, session_id: str, user_id: str ) -> Optional[CombatEncounter]: """ Get current combat state for a session. Uses the new database-backed storage, with fallback to legacy inline session storage for backward compatibility. Args: session_id: Game session ID user_id: User ID for authorization Returns: CombatEncounter if in combat, None otherwise """ session = self.session_service.get_session(session_id, user_id) # New system: Check for reference to combat_encounters table if session.active_combat_encounter_id: encounter = self.combat_repository.get_encounter( session.active_combat_encounter_id ) if encounter: return encounter # Reference exists but encounter not found - clear stale reference logger.warning("Stale combat encounter reference, clearing", session_id=session_id, encounter_id=session.active_combat_encounter_id) session.active_combat_encounter_id = None self.session_service.update_session(session) return None # Legacy fallback: Check inline combat data and migrate if present if session.combat_encounter: return self._migrate_inline_encounter(session, user_id) return None def _migrate_inline_encounter( self, session, user_id: str ) -> CombatEncounter: """ Migrate legacy inline combat encounter to database table. This provides backward compatibility by automatically migrating existing inline combat data to the new database-backed system on first access. Args: session: GameSession with inline combat_encounter user_id: User ID Returns: The migrated CombatEncounter """ encounter = session.combat_encounter logger.info("Migrating inline combat encounter to database", session_id=session.session_id, encounter_id=encounter.encounter_id) # Save to repository self.combat_repository.create_encounter( encounter=encounter, session_id=session.session_id, user_id=user_id ) # Update session to use reference session.active_combat_encounter_id = encounter.encounter_id session.combat_encounter = None # Clear inline data self.session_service.update_session(session) return encounter def end_combat( self, session_id: str, user_id: str, outcome: CombatStatus ) -> CombatRewards: """ End combat and distribute rewards if victorious. Args: session_id: Game session ID user_id: User ID for authorization outcome: Combat outcome (VICTORY, DEFEAT, FLED) Returns: CombatRewards containing XP, gold, items earned """ logger.info("Ending combat", session_id=session_id, outcome=outcome.value) session = self.session_service.get_session(session_id, user_id) if not session.is_in_combat(): raise NotInCombatError("Session is not in combat") # Get encounter from repository (or legacy inline) encounter = self.get_combat_state(session_id, user_id) if not encounter: raise NotInCombatError("Combat encounter not found") encounter.status = outcome # Calculate rewards if victory rewards = CombatRewards() if outcome == CombatStatus.VICTORY: rewards = self._calculate_rewards(encounter, session, user_id) # End encounter in repository if session.active_combat_encounter_id: self.combat_repository.end_encounter( encounter_id=session.active_combat_encounter_id, status=outcome ) # Clear session combat reference session.active_combat_encounter_id = None session.combat_encounter = None # Also clear legacy field session.update_activity() self.session_service.update_session(session) logger.info("Combat ended", session_id=session_id, encounter_id=encounter.encounter_id, outcome=outcome.value, xp_earned=rewards.experience, gold_earned=rewards.gold) return rewards # ========================================================================= # Action Execution # ========================================================================= def execute_action( self, session_id: str, user_id: str, combatant_id: str, action: CombatAction ) -> ActionResult: """ Execute a combat action for a combatant. Args: session_id: Game session ID user_id: User ID for authorization combatant_id: ID of combatant taking action action: Action to execute Returns: ActionResult with outcome details Raises: NotInCombatError: If session not in combat InvalidActionError: If action is invalid """ logger.info("Executing action", session_id=session_id, combatant_id=combatant_id, action_type=action.action_type) session = self.session_service.get_session(session_id, user_id) if not session.is_in_combat(): raise NotInCombatError("Session is not in combat") # Get encounter from repository encounter = self.get_combat_state(session_id, user_id) if not encounter: raise NotInCombatError("Combat encounter not found") # Validate it's this combatant's turn current = encounter.get_current_combatant() if not current or current.combatant_id != combatant_id: raise InvalidActionError( f"Not {combatant_id}'s turn. Current turn: " f"{current.combatant_id if current else 'None'}" ) # Process start-of-turn effects turn_effects = encounter.start_turn() # Check if combatant is stunned if current.is_stunned(): result = ActionResult( success=False, message=f"{current.name} is stunned and cannot act!", turn_effects=[{"type": "stun", "combatant": current.name}], ) self._advance_turn_and_save(encounter, session, user_id) result.next_combatant_id = encounter.get_current_combatant().combatant_id return result # Execute based on action type if action.action_type == "attack": result = self._execute_attack(encounter, current, action.target_ids) elif action.action_type == "ability": result = self._execute_ability( encounter, current, action.ability_id, action.target_ids ) elif action.action_type == "defend": result = self._execute_defend(encounter, current) elif action.action_type == "flee": result = self._execute_flee(encounter, current, session, user_id) elif action.action_type == "item": result = self._execute_use_item( encounter, current, action.item_id, action.target_ids ) else: raise InvalidActionError(f"Unknown action type: {action.action_type}") # Add turn effects result.turn_effects = [ {"type": e.get("type", "effect"), "message": e.get("message", "")} for e in turn_effects ] # Check for combat end status = encounter.check_end_condition() if status != CombatStatus.ACTIVE: result.combat_ended = True result.combat_status = status # Auto-end combat and distribute rewards if status == CombatStatus.VICTORY: rewards = self._calculate_rewards(encounter, session, user_id) result.message += f" Victory! Earned {rewards.experience} XP and {rewards.gold} gold." # End encounter in repository if session.active_combat_encounter_id: self.combat_repository.end_encounter( encounter_id=session.active_combat_encounter_id, status=status ) # Clear session combat reference session.active_combat_encounter_id = None session.combat_encounter = None else: # Advance turn and save to repository self._advance_turn_and_save(encounter, session, user_id) next_combatant = encounter.get_current_combatant() if next_combatant: result.next_combatant_id = next_combatant.combatant_id result.next_is_player = next_combatant.is_player else: result.next_combatant_id = None result.next_is_player = True # Save session state self.session_service.update_session(session) return result def execute_enemy_turn( self, session_id: str, user_id: str ) -> ActionResult: """ Execute an enemy's turn using basic AI. Args: session_id: Game session ID user_id: User ID for authorization Returns: ActionResult with enemy action outcome """ session = self.session_service.get_session(session_id, user_id) if not session.is_in_combat(): raise NotInCombatError("Session is not in combat") # Get encounter from repository encounter = self.get_combat_state(session_id, user_id) if not encounter: raise NotInCombatError("Combat encounter not found") current = encounter.get_current_combatant() if not current: raise InvalidActionError("No current combatant") if current.is_player: raise InvalidActionError("Current combatant is a player, not an enemy") # Check if the enemy is dead (shouldn't happen with fixed advance_turn, but defensive) if current.is_dead(): # Skip this dead enemy's turn and advance self._advance_turn_and_save(encounter, session, user_id) next_combatant = encounter.get_current_combatant() return ActionResult( success=True, message=f"{current.name} is defeated and cannot act.", next_combatant_id=next_combatant.combatant_id if next_combatant else None, next_is_player=next_combatant.is_player if next_combatant else True, ) # Process start-of-turn effects turn_effects = encounter.start_turn() # Check if enemy died from DoT effects at turn start if current.is_dead(): # Check if combat ended combat_status = encounter.check_end_condition() if combat_status in [CombatStatus.VICTORY, CombatStatus.DEFEAT]: encounter.status = combat_status result = ActionResult( success=True, message=f"{current.name} was defeated by damage over time!", combat_ended=True, combat_status=combat_status, turn_effects=turn_effects, ) if session.active_combat_encounter_id: self.combat_repository.end_encounter( encounter_id=session.active_combat_encounter_id, status=combat_status ) session.active_combat_encounter_id = None session.combat_encounter = None self.session_service.update_session(session) return result else: # Advance past the dead enemy self._advance_turn_and_save(encounter, session, user_id) next_combatant = encounter.get_current_combatant() return ActionResult( success=True, message=f"{current.name} was defeated by damage over time!", next_combatant_id=next_combatant.combatant_id if next_combatant else None, next_is_player=next_combatant.is_player if next_combatant else True, turn_effects=turn_effects, ) # Check if stunned if current.is_stunned(): result = ActionResult( success=False, message=f"{current.name} is stunned and cannot act!", turn_effects=[{"type": "stun", "combatant": current.name}], ) self._advance_turn_and_save(encounter, session, user_id) result.next_combatant_id = encounter.get_current_combatant().combatant_id return result # Enemy AI: Choose action action, targets = self._choose_enemy_action(encounter, current) # Execute chosen action if action == "ability" and current.abilities: # Try to use an ability ability_id = self._choose_enemy_ability(current) if ability_id: result = self._execute_ability( encounter, current, ability_id, targets ) else: # Fallback to basic attack result = self._execute_attack(encounter, current, targets) else: # Basic attack result = self._execute_attack(encounter, current, targets) # Add turn effects result.turn_effects = [ {"type": e.get("type", "effect"), "message": e.get("message", "")} for e in turn_effects ] # Check for combat end status = encounter.check_end_condition() if status != CombatStatus.ACTIVE: result.combat_ended = True result.combat_status = status # End encounter in repository if session.active_combat_encounter_id: self.combat_repository.end_encounter( encounter_id=session.active_combat_encounter_id, status=status ) # Clear session combat reference session.active_combat_encounter_id = None session.combat_encounter = None else: logger.info("Combat still active, advancing turn", session_id=session_id, encounter_id=encounter.encounter_id) self._advance_turn_and_save(encounter, session, user_id) next_combatant = encounter.get_current_combatant() logger.info("Next combatant determined", next_combatant_id=next_combatant.combatant_id if next_combatant else None, next_is_player=next_combatant.is_player if next_combatant else None) if next_combatant: result.next_combatant_id = next_combatant.combatant_id result.next_is_player = next_combatant.is_player else: result.next_combatant_id = None result.next_is_player = True self.session_service.update_session(session) logger.info("Enemy turn complete, returning result", session_id=session_id, next_combatant_id=result.next_combatant_id, next_is_player=result.next_is_player) return result # ========================================================================= # Specific Action Implementations # ========================================================================= def _execute_attack( self, encounter: CombatEncounter, attacker: Combatant, target_ids: List[str] ) -> ActionResult: """Execute a basic attack action.""" if not target_ids: # Auto-target: first alive enemy/player target = self._get_default_target(encounter, attacker) if not target: return ActionResult( success=False, message="No valid targets available" ) target_ids = [target.combatant_id] target = encounter.get_combatant(target_ids[0]) if not target or target.is_dead(): return ActionResult( success=False, message="Invalid or dead target" ) # Check if this is an elemental weapon attack if attacker.elemental_ratio > 0.0 and attacker.elemental_damage_type: # Elemental weapon: split damage between physical and elemental damage_result = DamageCalculator.calculate_elemental_weapon_damage( attacker_stats=attacker.stats, defender_stats=target.stats, weapon_crit_chance=attacker.weapon_crit_chance, weapon_crit_multiplier=attacker.weapon_crit_multiplier, physical_ratio=attacker.physical_ratio, elemental_ratio=attacker.elemental_ratio, elemental_type=attacker.elemental_damage_type, ) else: # Normal physical attack damage_result = DamageCalculator.calculate_physical_damage( attacker_stats=attacker.stats, defender_stats=target.stats, weapon_crit_chance=attacker.weapon_crit_chance, weapon_crit_multiplier=attacker.weapon_crit_multiplier, ) # Add target_id to result for tracking damage_result.target_id = target.combatant_id # Apply damage if not damage_result.is_miss: actual_damage = target.take_damage(damage_result.total_damage) message = f"{attacker.name} attacks {target.name}! {damage_result.message}" if actual_damage < damage_result.total_damage: message += f" (Shield absorbed {damage_result.total_damage - actual_damage})" else: message = f"{attacker.name} attacks {target.name}! {damage_result.message}" # Log action encounter.log_action( "attack", attacker.combatant_id, message, {"damage": damage_result.total_damage, "target": target.combatant_id} ) return ActionResult( success=True, message=message, damage_results=[damage_result], ) def _execute_ability( self, encounter: CombatEncounter, caster: Combatant, ability_id: str, target_ids: List[str] ) -> ActionResult: """Execute an ability action.""" # Load ability ability = self.ability_loader.load_ability(ability_id) if not ability: return ActionResult( success=False, message=f"Unknown ability: {ability_id}" ) # Check if ability is available if not caster.can_use_ability(ability_id, ability): if caster.current_mp < ability.mana_cost: return ActionResult( success=False, message=f"Not enough mana ({caster.current_mp}/{ability.mana_cost})" ) if ability_id in caster.cooldowns and caster.cooldowns[ability_id] > 0: return ActionResult( success=False, message=f"Ability on cooldown ({caster.cooldowns[ability_id]} turns)" ) return ActionResult( success=False, message=f"Cannot use ability: {ability.name}" ) # Determine targets if ability.is_aoe: # AoE: Target all enemies (or allies for heals) if ability.ability_type in [AbilityType.HEAL, AbilityType.BUFF]: targets = [c for c in encounter.combatants if c.is_player == caster.is_player and c.is_alive()] else: targets = [c for c in encounter.combatants if c.is_player != caster.is_player and c.is_alive()] # Limit by target_count if specified if ability.target_count > 0: targets = targets[:ability.target_count] else: # Single target if not target_ids: target = self._get_default_target(encounter, caster) if not target: return ActionResult(success=False, message="No valid targets") targets = [target] else: targets = [encounter.get_combatant(tid) for tid in target_ids] targets = [t for t in targets if t and t.is_alive()] if not targets: return ActionResult(success=False, message="No valid targets") # Apply mana cost and cooldown caster.use_ability_cost(ability, ability_id) # Execute ability based on type damage_results = [] effects_applied = [] messages = [] for target in targets: if ability.ability_type in [AbilityType.ATTACK, AbilityType.SPELL]: # Damage ability damage_result = DamageCalculator.calculate_magical_damage( attacker_stats=caster.stats, defender_stats=target.stats, ability_base_power=ability.calculate_power(caster.stats), weapon_crit_chance=self._get_crit_chance(caster), ) damage_result.target_id = target.combatant_id if not damage_result.is_miss: target.take_damage(damage_result.total_damage) messages.append( f"{target.name} takes {damage_result.total_damage} damage" ) else: messages.append(f"{target.name} dodges!") damage_results.append(damage_result) elif ability.ability_type == AbilityType.HEAL: # Healing ability heal_amount = ability.calculate_power(caster.stats) actual_heal = target.heal(heal_amount) messages.append(f"{target.name} heals for {actual_heal}") # Apply effects for effect in ability.get_effects_to_apply(): target.add_effect(effect) effects_applied.append({ "target": target.combatant_id, "effect": effect.name, "duration": effect.duration, }) messages.append(f"{target.name} is affected by {effect.name}") message = f"{caster.name} uses {ability.name}! " + "; ".join(messages) # Log action encounter.log_action( "ability", caster.combatant_id, message, { "ability": ability_id, "targets": [t.combatant_id for t in targets], "mana_cost": ability.mana_cost, } ) return ActionResult( success=True, message=message, damage_results=damage_results, effects_applied=effects_applied, ) def _execute_defend( self, encounter: CombatEncounter, combatant: Combatant ) -> ActionResult: """Execute a defend action (increases defense for one turn).""" # Add a temporary defense buff defense_buff = Effect( effect_id=f"defend_{uuid4().hex[:8]}", name="Defending", effect_type=EffectType.BUFF, duration=1, power=5, # +5 defense stat_affected="constitution", source="defend_action", ) combatant.add_effect(defense_buff) message = f"{combatant.name} takes a defensive stance! (+5 Defense)" encounter.log_action( "defend", combatant.combatant_id, message, {} ) return ActionResult( success=True, message=message, effects_applied=[{ "target": combatant.combatant_id, "effect": "Defending", "duration": 1, }], ) def _execute_flee( self, encounter: CombatEncounter, combatant: Combatant, session, user_id: str ) -> ActionResult: """Execute a flee action.""" # Calculate flee chance (base 50%, modified by DEX vs enemy DEX) base_flee_chance = 0.50 # Get average enemy DEX enemies = [c for c in encounter.combatants if not c.is_player and c.is_alive()] if enemies: avg_enemy_dex = sum(e.stats.dexterity for e in enemies) / len(enemies) dex_modifier = (combatant.stats.dexterity - avg_enemy_dex) * 0.02 flee_chance = max(0.10, min(0.90, base_flee_chance + dex_modifier)) else: flee_chance = 1.0 # No enemies, always succeed # Roll for flee roll = random.random() success = roll < flee_chance if success: encounter.status = CombatStatus.FLED encounter.log_action( "flee", combatant.combatant_id, f"{combatant.name} successfully flees from combat!", {"roll": roll, "chance": flee_chance} ) return ActionResult( success=True, message=f"{combatant.name} successfully flees from combat!", combat_ended=True, combat_status=CombatStatus.FLED, ) else: encounter.log_action( "flee_failed", combatant.combatant_id, f"{combatant.name} fails to flee!", {"roll": roll, "chance": flee_chance} ) return ActionResult( success=False, message=f"{combatant.name} fails to flee! (Roll: {roll:.2f}, Needed: {flee_chance:.2f})", ) def _execute_use_item( self, encounter: CombatEncounter, combatant: Combatant, item_id: str, target_ids: List[str] ) -> ActionResult: """Execute an item use action.""" # TODO: Implement item usage from inventory # For now, return not implemented return ActionResult( success=False, message="Item usage not yet implemented" ) # ========================================================================= # Enemy AI # ========================================================================= def _choose_enemy_action( self, encounter: CombatEncounter, enemy: Combatant ) -> Tuple[str, List[str]]: """ Choose an action for an enemy combatant. Returns: Tuple of (action_type, target_ids) """ # Get valid player targets players = [c for c in encounter.combatants if c.is_player and c.is_alive()] if not players: return ("attack", []) # Simple AI: Attack lowest HP player target = min(players, key=lambda p: p.current_hp) # 30% chance to use ability if available if enemy.abilities and enemy.current_mp > 0 and random.random() < 0.3: return ("ability", [target.combatant_id]) return ("attack", [target.combatant_id]) def _choose_enemy_ability(self, enemy: Combatant) -> Optional[str]: """Choose an ability for an enemy to use.""" for ability_id in enemy.abilities: ability = self.ability_loader.load_ability(ability_id) if ability and enemy.can_use_ability(ability_id, ability): return ability_id return None # ========================================================================= # Rewards # ========================================================================= def _calculate_rewards( self, encounter: CombatEncounter, session, user_id: str ) -> CombatRewards: """ Calculate and distribute rewards after victory. Uses CombatLootService for loot generation, supporting both static items (consumables) and procedural equipment. Args: encounter: Completed combat encounter session: Game session user_id: User ID for character updates Returns: CombatRewards with totals """ rewards = CombatRewards() # Build loot context from encounter loot_context = self._build_loot_context(encounter) # Sum up rewards from defeated enemies for combatant in encounter.combatants: if not combatant.is_player and combatant.is_dead(): # Get enemy template for rewards enemy_id = combatant.combatant_id.split("_")[0] # Extract base ID enemy = self.enemy_loader.load_enemy(enemy_id) if enemy: rewards.experience += enemy.experience_reward rewards.gold += enemy.get_gold_reward() # Generate loot using the loot service # Update context with this specific enemy's difficulty enemy_context = LootContext( party_average_level=loot_context.party_average_level, enemy_difficulty=enemy.difficulty, luck_stat=loot_context.luck_stat, loot_bonus=loot_context.loot_bonus ) # Use boss loot for boss enemies if enemy.is_boss(): loot_items = self.loot_service.generate_boss_loot( enemy, enemy_context ) else: loot_items = self.loot_service.generate_loot_from_enemy( enemy, enemy_context ) # Convert Item objects to dicts for serialization for item in loot_items: rewards.items.append(item.to_dict()) # Distribute rewards to player characters player_combatants = [c for c in encounter.combatants if c.is_player] xp_per_player = rewards.experience // max(1, len(player_combatants)) gold_per_player = rewards.gold // max(1, len(player_combatants)) for player in player_combatants: if session.is_solo(): char_id = session.solo_character_id else: char_id = player.combatant_id try: character = self.character_service.get_character(char_id, user_id) # Add XP and check for level up old_level = character.level character.experience += xp_per_player # TODO: Add level up logic based on XP thresholds # Add gold character.gold += gold_per_player # Save character self.character_service.update_character(character, user_id) if character.level > old_level: rewards.level_ups.append(char_id) except Exception as e: logger.error("Failed to distribute rewards to character", char_id=char_id, error=str(e)) logger.info("Rewards distributed", total_xp=rewards.experience, total_gold=rewards.gold, items=len(rewards.items), level_ups=rewards.level_ups) return rewards def _build_loot_context(self, encounter: CombatEncounter) -> LootContext: """ Build loot generation context from a combat encounter. Calculates: - Party average level - Party average luck stat - Default difficulty (uses EASY, specific enemies override) Args: encounter: Combat encounter with player combatants Returns: LootContext for loot generation """ player_combatants = [c for c in encounter.combatants if c.is_player] # Calculate party average level if player_combatants: # Use combatant's level if available, otherwise default to 1 levels = [] for p in player_combatants: # Try to get level from stats or combatant level = getattr(p, 'level', 1) levels.append(level) avg_level = sum(levels) // len(levels) if levels else 1 else: avg_level = 1 # Calculate party average luck if player_combatants: luck_values = [p.stats.luck for p in player_combatants] avg_luck = sum(luck_values) // len(luck_values) if luck_values else 8 else: avg_luck = 8 return LootContext( party_average_level=avg_level, enemy_difficulty=EnemyDifficulty.EASY, # Default; overridden per-enemy luck_stat=avg_luck, loot_bonus=0.0 # Future: add buffs/abilities bonus ) # ========================================================================= # Helper Methods # ========================================================================= def _create_combatant_from_character( self, character: Character ) -> Combatant: """Create a Combatant from a player Character.""" effective_stats = character.get_effective_stats() # Get abilities from unlocked skills abilities = ["basic_attack"] # All characters have basic attack abilities.extend(character.unlocked_skills) # Extract weapon properties from equipped weapon weapon = character.equipped.get("weapon") weapon_crit_chance = 0.05 weapon_crit_multiplier = 2.0 weapon_damage_type = DamageType.PHYSICAL elemental_damage_type = None physical_ratio = 1.0 elemental_ratio = 0.0 if weapon and weapon.is_weapon(): weapon_crit_chance = weapon.crit_chance weapon_crit_multiplier = weapon.crit_multiplier weapon_damage_type = weapon.damage_type or DamageType.PHYSICAL if weapon.is_elemental_weapon(): elemental_damage_type = weapon.elemental_damage_type physical_ratio = weapon.physical_ratio elemental_ratio = weapon.elemental_ratio return Combatant( combatant_id=character.character_id, name=character.name, is_player=True, current_hp=effective_stats.hit_points, max_hp=effective_stats.hit_points, current_mp=effective_stats.mana_points, max_mp=effective_stats.mana_points, stats=effective_stats, abilities=abilities, weapon_crit_chance=weapon_crit_chance, weapon_crit_multiplier=weapon_crit_multiplier, weapon_damage_type=weapon_damage_type, elemental_damage_type=elemental_damage_type, physical_ratio=physical_ratio, elemental_ratio=elemental_ratio, ) def _create_combatant_from_enemy( self, template: EnemyTemplate, instance_index: int = 0 ) -> Combatant: """Create a Combatant from an EnemyTemplate.""" # Create unique ID for this instance combatant_id = f"{template.enemy_id}_{instance_index}" # Add variation to name if multiple of same type name = template.name if instance_index > 0: name = f"{template.name} #{instance_index + 1}" # Copy stats and populate damage_bonus with base_damage stats = template.base_stats.copy() stats.damage_bonus = template.base_damage return Combatant( combatant_id=combatant_id, name=name, is_player=False, current_hp=stats.hit_points, max_hp=stats.hit_points, current_mp=stats.mana_points, max_mp=stats.mana_points, stats=stats, abilities=template.abilities.copy(), weapon_crit_chance=template.crit_chance, weapon_crit_multiplier=2.0, weapon_damage_type=DamageType.PHYSICAL, ) def _get_crit_chance(self, combatant: Combatant) -> float: """Get critical hit chance for a combatant.""" # Weapon crit chance + LUK bonus return combatant.weapon_crit_chance + combatant.stats.crit_bonus def _get_default_target( self, encounter: CombatEncounter, attacker: Combatant ) -> Optional[Combatant]: """Get a default target for an attacker.""" # Target opposite side, first alive for combatant in encounter.combatants: if combatant.is_player != attacker.is_player and combatant.is_alive(): return combatant return None def _advance_turn_and_save( self, encounter: CombatEncounter, session, user_id: str ) -> None: """Advance the turn and save encounter state to repository.""" logger.info("_advance_turn_and_save called", encounter_id=encounter.encounter_id, before_turn_index=encounter.current_turn_index, combat_log_count=len(encounter.combat_log)) encounter.advance_turn() logger.info("Turn advanced, now saving", encounter_id=encounter.encounter_id, after_turn_index=encounter.current_turn_index, combat_log_count=len(encounter.combat_log)) # Save encounter to repository self.combat_repository.update_encounter(encounter) logger.info("Encounter saved", encounter_id=encounter.encounter_id) # ============================================================================= # Global Instance # ============================================================================= _service_instance: Optional[CombatService] = None def get_combat_service() -> CombatService: """ Get the global CombatService instance. Returns: Singleton CombatService instance """ global _service_instance if _service_instance is None: _service_instance = CombatService() return _service_instance