Files
Code_of_Conquest/api/app/services/combat_service.py

1484 lines
52 KiB
Python

"""
Combat Service - Orchestrates turn-based combat encounters.
This service manages the full combat lifecycle including:
- Starting combat with characters and enemies
- Executing player and enemy actions
- Processing damage, effects, and state changes
- Distributing rewards on victory
"""
import random
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional, Tuple
from uuid import uuid4
from app.models.combat import Combatant, CombatEncounter
from app.models.character import Character
from app.models.enemy import EnemyTemplate, EnemyDifficulty
from app.models.stats import Stats
from app.models.abilities import Ability, AbilityLoader
from app.models.effects import Effect
from app.models.items import Item
from app.models.enums import CombatStatus, AbilityType, DamageType, EffectType, StatType
from app.services.damage_calculator import DamageCalculator, DamageResult
from app.services.enemy_loader import EnemyLoader, get_enemy_loader
from app.services.session_service import get_session_service
from app.services.character_service import get_character_service
from app.services.combat_loot_service import (
get_combat_loot_service,
CombatLootService,
LootContext
)
from app.services.combat_repository import (
get_combat_repository,
CombatRepository
)
from app.utils.logging import get_logger
logger = get_logger(__file__)
# =============================================================================
# Supporting Dataclasses
# =============================================================================
@dataclass
class CombatAction:
"""
Represents a combat action to be executed.
Attributes:
action_type: Type of action (attack, ability, defend, flee, item)
target_ids: List of target combatant IDs
ability_id: Ability to use (for ability actions)
item_id: Item to use (for item actions)
"""
action_type: str
target_ids: List[str] = field(default_factory=list)
ability_id: Optional[str] = None
item_id: Optional[str] = None
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'CombatAction':
"""Create CombatAction from dictionary."""
return cls(
action_type=data["action_type"],
target_ids=data.get("target_ids", []),
ability_id=data.get("ability_id"),
item_id=data.get("item_id"),
)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"action_type": self.action_type,
"target_ids": self.target_ids,
"ability_id": self.ability_id,
"item_id": self.item_id,
}
@dataclass
class ActionResult:
"""
Result of executing a combat action.
Attributes:
success: Whether the action succeeded
message: Human-readable result message
damage_results: List of damage calculation results
effects_applied: List of effects applied to targets
combat_ended: Whether combat ended as a result
combat_status: Final combat status if ended
next_combatant_id: ID of combatant whose turn is next
turn_effects: Effects that triggered at turn start/end
rewards: Combat rewards if victory (XP, gold, items)
"""
success: bool
message: str
damage_results: List[DamageResult] = field(default_factory=list)
effects_applied: List[Dict[str, Any]] = field(default_factory=list)
combat_ended: bool = False
combat_status: Optional[CombatStatus] = None
next_combatant_id: Optional[str] = None
next_is_player: bool = True # True if next turn is player's
turn_effects: List[Dict[str, Any]] = field(default_factory=list)
rewards: Optional[Dict[str, Any]] = None # Populated on victory
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for API response."""
return {
"success": self.success,
"message": self.message,
"damage_results": [
{
"target_id": dr.target_id if hasattr(dr, 'target_id') else None,
"total_damage": dr.total_damage,
"is_critical": dr.is_critical,
"is_miss": dr.is_miss,
"message": dr.message,
}
for dr in self.damage_results
],
"effects_applied": self.effects_applied,
"combat_ended": self.combat_ended,
"combat_status": self.combat_status.value if self.combat_status else None,
"next_combatant_id": self.next_combatant_id,
"next_is_player": self.next_is_player,
"turn_effects": self.turn_effects,
"rewards": self.rewards,
}
@dataclass
class CombatRewards:
"""
Rewards distributed after combat victory.
Attributes:
experience: Total XP earned
gold: Total gold earned
items: List of item drops
level_ups: Character IDs that leveled up
"""
experience: int = 0
gold: int = 0
items: List[Dict[str, Any]] = field(default_factory=list)
level_ups: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"experience": self.experience,
"gold": self.gold,
"items": self.items,
"level_ups": self.level_ups,
}
# =============================================================================
# Combat Service Exceptions
# =============================================================================
class CombatError(Exception):
"""Base exception for combat errors."""
pass
class NotInCombatError(CombatError):
"""Raised when action requires combat but session is not in combat."""
pass
class AlreadyInCombatError(CombatError):
"""Raised when trying to start combat but already in combat."""
pass
class InvalidActionError(CombatError):
"""Raised when action is invalid (wrong turn, invalid target, etc.)."""
pass
class InsufficientResourceError(CombatError):
"""Raised when combatant lacks resources (mana, cooldown, etc.)."""
pass
# =============================================================================
# Combat Service
# =============================================================================
class CombatService:
"""
Orchestrates turn-based combat encounters.
This service manages:
- Combat initialization with players and enemies
- Turn-by-turn action execution
- Damage calculation via DamageCalculator
- Effect processing and state management
- Victory/defeat detection and reward distribution
"""
def __init__(self):
"""Initialize the combat service with dependencies."""
self.session_service = get_session_service()
self.character_service = get_character_service()
self.enemy_loader = get_enemy_loader()
self.ability_loader = AbilityLoader()
self.loot_service = get_combat_loot_service()
self.combat_repository = get_combat_repository()
logger.info("CombatService initialized")
# =========================================================================
# Combat Lifecycle
# =========================================================================
def start_combat(
self,
session_id: str,
user_id: str,
enemy_ids: List[str],
) -> CombatEncounter:
"""
Start a new combat encounter.
Args:
session_id: Game session ID
user_id: User ID for authorization
enemy_ids: List of enemy template IDs to spawn
Returns:
Initialized CombatEncounter
Raises:
AlreadyInCombatError: If session is already in combat
ValueError: If enemy templates not found
"""
logger.info("Starting combat",
session_id=session_id,
enemy_count=len(enemy_ids))
# Get session
session = self.session_service.get_session(session_id, user_id)
# Check not already in combat
if session.is_in_combat():
raise AlreadyInCombatError("Session is already in combat")
# Create combatants from player character(s)
combatants = []
if session.is_solo():
# Solo session - single character
character = self.character_service.get_character(
session.solo_character_id,
user_id
)
player_combatant = self._create_combatant_from_character(character)
combatants.append(player_combatant)
else:
# Multiplayer - all party members
for char_id in session.party_member_ids:
# Note: In multiplayer, we'd need to handle different user_ids
# For now, assume all characters belong to session owner
character = self.character_service.get_character(char_id, user_id)
player_combatant = self._create_combatant_from_character(character)
combatants.append(player_combatant)
# Create combatants from enemies
for i, enemy_id in enumerate(enemy_ids):
enemy_template = self.enemy_loader.load_enemy(enemy_id)
if not enemy_template:
raise ValueError(f"Enemy template not found: {enemy_id}")
enemy_combatant = self._create_combatant_from_enemy(
enemy_template,
instance_index=i
)
combatants.append(enemy_combatant)
# Create encounter
encounter = CombatEncounter(
encounter_id=f"enc_{uuid4().hex[:12]}",
combatants=combatants,
)
# Initialize combat (roll initiative, set turn order)
encounter.initialize_combat()
# Save encounter to dedicated table
self.combat_repository.create_encounter(
encounter=encounter,
session_id=session_id,
user_id=user_id
)
# Update session with reference to encounter (not inline data)
session.active_combat_encounter_id = encounter.encounter_id
session.combat_encounter = None # Clear legacy inline storage
session.update_activity()
self.session_service.update_session(session)
logger.info("Combat started",
session_id=session_id,
encounter_id=encounter.encounter_id,
player_count=len([c for c in combatants if c.is_player]),
enemy_count=len([c for c in combatants if not c.is_player]))
return encounter
def get_combat_state(
self,
session_id: str,
user_id: str
) -> Optional[CombatEncounter]:
"""
Get current combat state for a session.
Uses the new database-backed storage, with fallback to legacy
inline session storage for backward compatibility.
Args:
session_id: Game session ID
user_id: User ID for authorization
Returns:
CombatEncounter if in combat, None otherwise
"""
session = self.session_service.get_session(session_id, user_id)
# New system: Check for reference to combat_encounters table
if session.active_combat_encounter_id:
encounter = self.combat_repository.get_encounter(
session.active_combat_encounter_id
)
if encounter:
return encounter
# Reference exists but encounter not found - clear stale reference
logger.warning("Stale combat encounter reference, clearing",
session_id=session_id,
encounter_id=session.active_combat_encounter_id)
session.active_combat_encounter_id = None
self.session_service.update_session(session)
return None
# Legacy fallback: Check inline combat data and migrate if present
if session.combat_encounter:
return self._migrate_inline_encounter(session, user_id)
return None
def _migrate_inline_encounter(
self,
session,
user_id: str
) -> CombatEncounter:
"""
Migrate legacy inline combat encounter to database table.
This provides backward compatibility by automatically migrating
existing inline combat data to the new database-backed system
on first access.
Args:
session: GameSession with inline combat_encounter
user_id: User ID
Returns:
The migrated CombatEncounter
"""
encounter = session.combat_encounter
logger.info("Migrating inline combat encounter to database",
session_id=session.session_id,
encounter_id=encounter.encounter_id)
# Save to repository
self.combat_repository.create_encounter(
encounter=encounter,
session_id=session.session_id,
user_id=user_id
)
# Update session to use reference
session.active_combat_encounter_id = encounter.encounter_id
session.combat_encounter = None # Clear inline data
self.session_service.update_session(session)
return encounter
def end_combat(
self,
session_id: str,
user_id: str,
outcome: CombatStatus
) -> CombatRewards:
"""
End combat and distribute rewards if victorious.
Args:
session_id: Game session ID
user_id: User ID for authorization
outcome: Combat outcome (VICTORY, DEFEAT, FLED)
Returns:
CombatRewards containing XP, gold, items earned
"""
logger.info("Ending combat",
session_id=session_id,
outcome=outcome.value)
session = self.session_service.get_session(session_id, user_id)
if not session.is_in_combat():
raise NotInCombatError("Session is not in combat")
# Get encounter from repository (or legacy inline)
encounter = self.get_combat_state(session_id, user_id)
if not encounter:
raise NotInCombatError("Combat encounter not found")
encounter.status = outcome
# Calculate rewards if victory
rewards = CombatRewards()
if outcome == CombatStatus.VICTORY:
rewards = self._calculate_rewards(encounter, session, user_id)
# End encounter in repository
if session.active_combat_encounter_id:
self.combat_repository.end_encounter(
encounter_id=session.active_combat_encounter_id,
status=outcome
)
# Clear session combat reference
session.active_combat_encounter_id = None
session.combat_encounter = None # Also clear legacy field
session.update_activity()
self.session_service.update_session(session)
logger.info("Combat ended",
session_id=session_id,
encounter_id=encounter.encounter_id,
outcome=outcome.value,
xp_earned=rewards.experience,
gold_earned=rewards.gold)
return rewards
def check_existing_combat(
self,
session_id: str,
user_id: str
) -> Optional[Dict[str, Any]]:
"""
Check if a session has an existing active combat encounter.
Returns combat summary if exists, None otherwise.
Args:
session_id: Game session ID
user_id: User ID for authorization
Returns:
Dictionary with combat summary if in combat, None otherwise
"""
logger.info("Checking for existing combat",
session_id=session_id)
session = self.session_service.get_session(session_id, user_id)
if not session.is_in_combat():
return None
# Get encounter details
encounter = self.get_combat_state(session_id, user_id)
if not encounter:
return None
# Build summary of combatants
players = []
enemies = []
for combatant in encounter.combatants:
combatant_info = {
"name": combatant.name,
"current_hp": combatant.current_hp,
"max_hp": combatant.max_hp,
"is_alive": combatant.is_alive(),
}
if combatant.is_player:
players.append(combatant_info)
else:
enemies.append(combatant_info)
return {
"has_active_combat": True,
"encounter_id": encounter.encounter_id,
"round_number": encounter.round_number,
"status": encounter.status.value,
"players": players,
"enemies": enemies,
}
def abandon_combat(
self,
session_id: str,
user_id: str
) -> bool:
"""
Abandon an existing combat encounter without completing it.
Deletes the encounter from the database and clears the session
reference. No rewards are distributed.
Args:
session_id: Game session ID
user_id: User ID for authorization
Returns:
True if combat was abandoned, False if no combat existed
"""
logger.info("Abandoning combat",
session_id=session_id)
session = self.session_service.get_session(session_id, user_id)
if not session.is_in_combat():
logger.info("No combat to abandon",
session_id=session_id)
return False
encounter_id = session.active_combat_encounter_id
# Delete encounter from repository
if encounter_id:
try:
self.combat_repository.delete_encounter(encounter_id)
logger.info("Deleted encounter from repository",
encounter_id=encounter_id)
except Exception as e:
logger.warning("Failed to delete encounter from repository",
encounter_id=encounter_id,
error=str(e))
# Clear session combat references
session.active_combat_encounter_id = None
session.combat_encounter = None # Clear legacy field too
session.update_activity()
self.session_service.update_session(session)
logger.info("Combat abandoned",
session_id=session_id,
encounter_id=encounter_id)
return True
# =========================================================================
# Action Execution
# =========================================================================
def execute_action(
self,
session_id: str,
user_id: str,
combatant_id: str,
action: CombatAction
) -> ActionResult:
"""
Execute a combat action for a combatant.
Args:
session_id: Game session ID
user_id: User ID for authorization
combatant_id: ID of combatant taking action
action: Action to execute
Returns:
ActionResult with outcome details
Raises:
NotInCombatError: If session not in combat
InvalidActionError: If action is invalid
"""
logger.info("Executing action",
session_id=session_id,
combatant_id=combatant_id,
action_type=action.action_type)
session = self.session_service.get_session(session_id, user_id)
if not session.is_in_combat():
raise NotInCombatError("Session is not in combat")
# Get encounter from repository
encounter = self.get_combat_state(session_id, user_id)
if not encounter:
raise NotInCombatError("Combat encounter not found")
# Validate it's this combatant's turn
current = encounter.get_current_combatant()
if not current or current.combatant_id != combatant_id:
raise InvalidActionError(
f"Not {combatant_id}'s turn. Current turn: "
f"{current.combatant_id if current else 'None'}"
)
# Process start-of-turn effects
turn_effects = encounter.start_turn()
# Check if combatant is stunned
if current.is_stunned():
result = ActionResult(
success=False,
message=f"{current.name} is stunned and cannot act!",
turn_effects=[{"type": "stun", "combatant": current.name}],
)
self._advance_turn_and_save(encounter, session, user_id)
result.next_combatant_id = encounter.get_current_combatant().combatant_id
return result
# Execute based on action type
if action.action_type == "attack":
result = self._execute_attack(encounter, current, action.target_ids)
elif action.action_type == "ability":
result = self._execute_ability(
encounter, current, action.ability_id, action.target_ids
)
elif action.action_type == "defend":
result = self._execute_defend(encounter, current)
elif action.action_type == "flee":
result = self._execute_flee(encounter, current, session, user_id)
elif action.action_type == "item":
result = self._execute_use_item(
encounter, current, action.item_id, action.target_ids
)
else:
raise InvalidActionError(f"Unknown action type: {action.action_type}")
# Add turn effects
result.turn_effects = [
{"type": e.get("type", "effect"), "message": e.get("message", "")}
for e in turn_effects
]
# Check for combat end
status = encounter.check_end_condition()
if status != CombatStatus.ACTIVE:
result.combat_ended = True
result.combat_status = status
# Auto-end combat and distribute rewards
if status == CombatStatus.VICTORY:
rewards = self._calculate_rewards(encounter, session, user_id)
result.message += f" Victory! Earned {rewards.experience} XP and {rewards.gold} gold."
result.rewards = rewards.to_dict()
# End encounter in repository
if session.active_combat_encounter_id:
self.combat_repository.end_encounter(
encounter_id=session.active_combat_encounter_id,
status=status
)
# Clear session combat reference
session.active_combat_encounter_id = None
session.combat_encounter = None
else:
# Advance turn and save to repository
self._advance_turn_and_save(encounter, session, user_id)
next_combatant = encounter.get_current_combatant()
if next_combatant:
result.next_combatant_id = next_combatant.combatant_id
result.next_is_player = next_combatant.is_player
else:
result.next_combatant_id = None
result.next_is_player = True
# Save session state
self.session_service.update_session(session)
return result
def execute_enemy_turn(
self,
session_id: str,
user_id: str
) -> ActionResult:
"""
Execute an enemy's turn using basic AI.
Args:
session_id: Game session ID
user_id: User ID for authorization
Returns:
ActionResult with enemy action outcome
"""
session = self.session_service.get_session(session_id, user_id)
if not session.is_in_combat():
raise NotInCombatError("Session is not in combat")
# Get encounter from repository
encounter = self.get_combat_state(session_id, user_id)
if not encounter:
raise NotInCombatError("Combat encounter not found")
current = encounter.get_current_combatant()
if not current:
raise InvalidActionError("No current combatant")
if current.is_player:
raise InvalidActionError("Current combatant is a player, not an enemy")
# Check if the enemy is dead (shouldn't happen with fixed advance_turn, but defensive)
if current.is_dead():
# Skip this dead enemy's turn and advance
self._advance_turn_and_save(encounter, session, user_id)
next_combatant = encounter.get_current_combatant()
return ActionResult(
success=True,
message=f"{current.name} is defeated and cannot act.",
next_combatant_id=next_combatant.combatant_id if next_combatant else None,
next_is_player=next_combatant.is_player if next_combatant else True,
)
# Process start-of-turn effects
turn_effects = encounter.start_turn()
# Check if enemy died from DoT effects at turn start
if current.is_dead():
# Check if combat ended
combat_status = encounter.check_end_condition()
if combat_status in [CombatStatus.VICTORY, CombatStatus.DEFEAT]:
encounter.status = combat_status
result = ActionResult(
success=True,
message=f"{current.name} was defeated by damage over time!",
combat_ended=True,
combat_status=combat_status,
turn_effects=turn_effects,
)
if session.active_combat_encounter_id:
self.combat_repository.end_encounter(
encounter_id=session.active_combat_encounter_id,
status=combat_status
)
session.active_combat_encounter_id = None
session.combat_encounter = None
self.session_service.update_session(session)
return result
else:
# Advance past the dead enemy
self._advance_turn_and_save(encounter, session, user_id)
next_combatant = encounter.get_current_combatant()
return ActionResult(
success=True,
message=f"{current.name} was defeated by damage over time!",
next_combatant_id=next_combatant.combatant_id if next_combatant else None,
next_is_player=next_combatant.is_player if next_combatant else True,
turn_effects=turn_effects,
)
# Check if stunned
if current.is_stunned():
result = ActionResult(
success=False,
message=f"{current.name} is stunned and cannot act!",
turn_effects=[{"type": "stun", "combatant": current.name}],
)
self._advance_turn_and_save(encounter, session, user_id)
result.next_combatant_id = encounter.get_current_combatant().combatant_id
return result
# Enemy AI: Choose action
action, targets = self._choose_enemy_action(encounter, current)
# Execute chosen action
if action == "ability" and current.abilities:
# Try to use an ability
ability_id = self._choose_enemy_ability(current)
if ability_id:
result = self._execute_ability(
encounter, current, ability_id, targets
)
else:
# Fallback to basic attack
result = self._execute_attack(encounter, current, targets)
else:
# Basic attack
result = self._execute_attack(encounter, current, targets)
# Add turn effects
result.turn_effects = [
{"type": e.get("type", "effect"), "message": e.get("message", "")}
for e in turn_effects
]
# Check for combat end
status = encounter.check_end_condition()
if status != CombatStatus.ACTIVE:
result.combat_ended = True
result.combat_status = status
# Calculate and distribute rewards on victory
if status == CombatStatus.VICTORY:
rewards = self._calculate_rewards(encounter, session, user_id)
result.rewards = rewards.to_dict()
# End encounter in repository
if session.active_combat_encounter_id:
self.combat_repository.end_encounter(
encounter_id=session.active_combat_encounter_id,
status=status
)
# Clear session combat reference
session.active_combat_encounter_id = None
session.combat_encounter = None
else:
logger.info("Combat still active, advancing turn",
session_id=session_id,
encounter_id=encounter.encounter_id)
self._advance_turn_and_save(encounter, session, user_id)
next_combatant = encounter.get_current_combatant()
logger.info("Next combatant determined",
next_combatant_id=next_combatant.combatant_id if next_combatant else None,
next_is_player=next_combatant.is_player if next_combatant else None)
if next_combatant:
result.next_combatant_id = next_combatant.combatant_id
result.next_is_player = next_combatant.is_player
else:
result.next_combatant_id = None
result.next_is_player = True
self.session_service.update_session(session)
logger.info("Enemy turn complete, returning result",
session_id=session_id,
next_combatant_id=result.next_combatant_id,
next_is_player=result.next_is_player)
return result
# =========================================================================
# Specific Action Implementations
# =========================================================================
def _execute_attack(
self,
encounter: CombatEncounter,
attacker: Combatant,
target_ids: List[str]
) -> ActionResult:
"""Execute a basic attack action."""
if not target_ids:
# Auto-target: first alive enemy/player
target = self._get_default_target(encounter, attacker)
if not target:
return ActionResult(
success=False,
message="No valid targets available"
)
target_ids = [target.combatant_id]
target = encounter.get_combatant(target_ids[0])
if not target or target.is_dead():
return ActionResult(
success=False,
message="Invalid or dead target"
)
# Check if this is an elemental weapon attack
if attacker.elemental_ratio > 0.0 and attacker.elemental_damage_type:
# Elemental weapon: split damage between physical and elemental
damage_result = DamageCalculator.calculate_elemental_weapon_damage(
attacker_stats=attacker.stats,
defender_stats=target.stats,
weapon_crit_chance=attacker.weapon_crit_chance,
weapon_crit_multiplier=attacker.weapon_crit_multiplier,
physical_ratio=attacker.physical_ratio,
elemental_ratio=attacker.elemental_ratio,
elemental_type=attacker.elemental_damage_type,
)
else:
# Normal physical attack
damage_result = DamageCalculator.calculate_physical_damage(
attacker_stats=attacker.stats,
defender_stats=target.stats,
weapon_crit_chance=attacker.weapon_crit_chance,
weapon_crit_multiplier=attacker.weapon_crit_multiplier,
)
# Add target_id to result for tracking
damage_result.target_id = target.combatant_id
# Apply damage
if not damage_result.is_miss:
actual_damage = target.take_damage(damage_result.total_damage)
message = f"{attacker.name} attacks {target.name}! {damage_result.message}"
if actual_damage < damage_result.total_damage:
message += f" (Shield absorbed {damage_result.total_damage - actual_damage})"
else:
message = f"{attacker.name} attacks {target.name}! {damage_result.message}"
# Log action
encounter.log_action(
"attack",
attacker.combatant_id,
message,
{"damage": damage_result.total_damage, "target": target.combatant_id}
)
return ActionResult(
success=True,
message=message,
damage_results=[damage_result],
)
def _execute_ability(
self,
encounter: CombatEncounter,
caster: Combatant,
ability_id: str,
target_ids: List[str]
) -> ActionResult:
"""Execute an ability action."""
# Load ability
ability = self.ability_loader.load_ability(ability_id)
if not ability:
return ActionResult(
success=False,
message=f"Unknown ability: {ability_id}"
)
# Check if ability is available
if not caster.can_use_ability(ability_id, ability):
if caster.current_mp < ability.mana_cost:
return ActionResult(
success=False,
message=f"Not enough mana ({caster.current_mp}/{ability.mana_cost})"
)
if ability_id in caster.cooldowns and caster.cooldowns[ability_id] > 0:
return ActionResult(
success=False,
message=f"Ability on cooldown ({caster.cooldowns[ability_id]} turns)"
)
return ActionResult(
success=False,
message=f"Cannot use ability: {ability.name}"
)
# Determine targets
if ability.is_aoe:
# AoE: Target all enemies (or allies for heals)
if ability.ability_type in [AbilityType.HEAL, AbilityType.BUFF]:
targets = [c for c in encounter.combatants
if c.is_player == caster.is_player and c.is_alive()]
else:
targets = [c for c in encounter.combatants
if c.is_player != caster.is_player and c.is_alive()]
# Limit by target_count if specified
if ability.target_count > 0:
targets = targets[:ability.target_count]
else:
# Single target
if not target_ids:
target = self._get_default_target(encounter, caster)
if not target:
return ActionResult(success=False, message="No valid targets")
targets = [target]
else:
targets = [encounter.get_combatant(tid) for tid in target_ids]
targets = [t for t in targets if t and t.is_alive()]
if not targets:
return ActionResult(success=False, message="No valid targets")
# Apply mana cost and cooldown
caster.use_ability_cost(ability, ability_id)
# Execute ability based on type
damage_results = []
effects_applied = []
messages = []
for target in targets:
if ability.ability_type in [AbilityType.ATTACK, AbilityType.SPELL]:
# Damage ability
damage_result = DamageCalculator.calculate_magical_damage(
attacker_stats=caster.stats,
defender_stats=target.stats,
ability_base_power=ability.calculate_power(caster.stats),
weapon_crit_chance=self._get_crit_chance(caster),
)
damage_result.target_id = target.combatant_id
if not damage_result.is_miss:
target.take_damage(damage_result.total_damage)
messages.append(
f"{target.name} takes {damage_result.total_damage} damage"
)
else:
messages.append(f"{target.name} dodges!")
damage_results.append(damage_result)
elif ability.ability_type == AbilityType.HEAL:
# Healing ability
heal_amount = ability.calculate_power(caster.stats)
actual_heal = target.heal(heal_amount)
messages.append(f"{target.name} heals for {actual_heal}")
# Apply effects
for effect in ability.get_effects_to_apply():
target.add_effect(effect)
effects_applied.append({
"target": target.combatant_id,
"effect": effect.name,
"duration": effect.duration,
})
messages.append(f"{target.name} is affected by {effect.name}")
message = f"{caster.name} uses {ability.name}! " + "; ".join(messages)
# Log action
encounter.log_action(
"ability",
caster.combatant_id,
message,
{
"ability": ability_id,
"targets": [t.combatant_id for t in targets],
"mana_cost": ability.mana_cost,
}
)
return ActionResult(
success=True,
message=message,
damage_results=damage_results,
effects_applied=effects_applied,
)
def _execute_defend(
self,
encounter: CombatEncounter,
combatant: Combatant
) -> ActionResult:
"""Execute a defend action (increases defense for one turn)."""
# Add a temporary defense buff
defense_buff = Effect(
effect_id=f"defend_{uuid4().hex[:8]}",
name="Defending",
effect_type=EffectType.BUFF,
duration=1,
power=5, # +5 defense
stat_affected=StatType.CONSTITUTION,
source="defend_action",
)
combatant.add_effect(defense_buff)
message = f"{combatant.name} takes a defensive stance! (+5 Defense)"
encounter.log_action(
"defend",
combatant.combatant_id,
message,
{}
)
return ActionResult(
success=True,
message=message,
effects_applied=[{
"target": combatant.combatant_id,
"effect": "Defending",
"duration": 1,
}],
)
def _execute_flee(
self,
encounter: CombatEncounter,
combatant: Combatant,
session,
user_id: str
) -> ActionResult:
"""Execute a flee action."""
# Calculate flee chance (base 50%, modified by DEX vs enemy DEX)
base_flee_chance = 0.50
# Get average enemy DEX
enemies = [c for c in encounter.combatants if not c.is_player and c.is_alive()]
if enemies:
avg_enemy_dex = sum(e.stats.dexterity for e in enemies) / len(enemies)
dex_modifier = (combatant.stats.dexterity - avg_enemy_dex) * 0.02
flee_chance = max(0.10, min(0.90, base_flee_chance + dex_modifier))
else:
flee_chance = 1.0 # No enemies, always succeed
# Roll for flee
roll = random.random()
success = roll < flee_chance
if success:
encounter.status = CombatStatus.FLED
encounter.log_action(
"flee",
combatant.combatant_id,
f"{combatant.name} successfully flees from combat!",
{"roll": roll, "chance": flee_chance}
)
return ActionResult(
success=True,
message=f"{combatant.name} successfully flees from combat!",
combat_ended=True,
combat_status=CombatStatus.FLED,
)
else:
encounter.log_action(
"flee_failed",
combatant.combatant_id,
f"{combatant.name} fails to flee!",
{"roll": roll, "chance": flee_chance}
)
return ActionResult(
success=False,
message=f"{combatant.name} fails to flee! (Roll: {roll:.2f}, Needed: {flee_chance:.2f})",
)
def _execute_use_item(
self,
encounter: CombatEncounter,
combatant: Combatant,
item_id: str,
target_ids: List[str]
) -> ActionResult:
"""Execute an item use action."""
# TODO: Implement item usage from inventory
# For now, return not implemented
return ActionResult(
success=False,
message="Item usage not yet implemented"
)
# =========================================================================
# Enemy AI
# =========================================================================
def _choose_enemy_action(
self,
encounter: CombatEncounter,
enemy: Combatant
) -> Tuple[str, List[str]]:
"""
Choose an action for an enemy combatant.
Returns:
Tuple of (action_type, target_ids)
"""
# Get valid player targets
players = [c for c in encounter.combatants if c.is_player and c.is_alive()]
if not players:
return ("attack", [])
# Simple AI: Attack lowest HP player
target = min(players, key=lambda p: p.current_hp)
# 30% chance to use ability if available
if enemy.abilities and enemy.current_mp > 0 and random.random() < 0.3:
return ("ability", [target.combatant_id])
return ("attack", [target.combatant_id])
def _choose_enemy_ability(self, enemy: Combatant) -> Optional[str]:
"""Choose an ability for an enemy to use."""
for ability_id in enemy.abilities:
ability = self.ability_loader.load_ability(ability_id)
if ability and enemy.can_use_ability(ability_id, ability):
return ability_id
return None
# =========================================================================
# Rewards
# =========================================================================
def _calculate_rewards(
self,
encounter: CombatEncounter,
session,
user_id: str
) -> CombatRewards:
"""
Calculate and distribute rewards after victory.
Uses CombatLootService for loot generation, supporting both
static items (consumables) and procedural equipment.
Args:
encounter: Completed combat encounter
session: Game session
user_id: User ID for character updates
Returns:
CombatRewards with totals
"""
rewards = CombatRewards()
# Build loot context from encounter
loot_context = self._build_loot_context(encounter)
# Sum up rewards from defeated enemies
for combatant in encounter.combatants:
if not combatant.is_player and combatant.is_dead():
# Get enemy template for rewards
enemy_id = combatant.combatant_id.split("_")[0] # Extract base ID
enemy = self.enemy_loader.load_enemy(enemy_id)
if enemy:
rewards.experience += enemy.experience_reward
rewards.gold += enemy.get_gold_reward()
# Generate loot using the loot service
# Update context with this specific enemy's difficulty
enemy_context = LootContext(
party_average_level=loot_context.party_average_level,
enemy_difficulty=enemy.difficulty,
luck_stat=loot_context.luck_stat,
loot_bonus=loot_context.loot_bonus
)
# Use boss loot for boss enemies
if enemy.is_boss():
loot_items = self.loot_service.generate_boss_loot(
enemy, enemy_context
)
else:
loot_items = self.loot_service.generate_loot_from_enemy(
enemy, enemy_context
)
# Convert Item objects to dicts for serialization
for item in loot_items:
rewards.items.append(item.to_dict())
# Distribute rewards to player characters
player_combatants = [c for c in encounter.combatants if c.is_player]
xp_per_player = rewards.experience // max(1, len(player_combatants))
gold_per_player = rewards.gold // max(1, len(player_combatants))
for player in player_combatants:
if session.is_solo():
char_id = session.solo_character_id
else:
char_id = player.combatant_id
try:
character = self.character_service.get_character(char_id, user_id)
# Add XP and check for level up
leveled_up = character.add_experience(xp_per_player)
if leveled_up:
rewards.level_ups.append(char_id)
# Add gold
character.gold += gold_per_player
# Save character
self.character_service.update_character(character, user_id)
except Exception as e:
logger.error("Failed to distribute rewards to character",
char_id=char_id,
error=str(e))
logger.info("Rewards distributed",
total_xp=rewards.experience,
total_gold=rewards.gold,
items=len(rewards.items),
level_ups=rewards.level_ups)
return rewards
def _build_loot_context(self, encounter: CombatEncounter) -> LootContext:
"""
Build loot generation context from a combat encounter.
Calculates:
- Party average level
- Party average luck stat
- Default difficulty (uses EASY, specific enemies override)
Args:
encounter: Combat encounter with player combatants
Returns:
LootContext for loot generation
"""
player_combatants = [c for c in encounter.combatants if c.is_player]
# Calculate party average level
if player_combatants:
# Use combatant's level if available, otherwise default to 1
levels = []
for p in player_combatants:
# Try to get level from stats or combatant
level = getattr(p, 'level', 1)
levels.append(level)
avg_level = sum(levels) // len(levels) if levels else 1
else:
avg_level = 1
# Calculate party average luck
if player_combatants:
luck_values = [p.stats.luck for p in player_combatants]
avg_luck = sum(luck_values) // len(luck_values) if luck_values else 8
else:
avg_luck = 8
return LootContext(
party_average_level=avg_level,
enemy_difficulty=EnemyDifficulty.EASY, # Default; overridden per-enemy
luck_stat=avg_luck,
loot_bonus=0.0 # Future: add buffs/abilities bonus
)
# =========================================================================
# Helper Methods
# =========================================================================
def _create_combatant_from_character(
self,
character: Character
) -> Combatant:
"""Create a Combatant from a player Character."""
effective_stats = character.get_effective_stats()
# Get abilities from unlocked skills
abilities = ["basic_attack"] # All characters have basic attack
abilities.extend(character.unlocked_skills)
# Extract weapon properties from equipped weapon
weapon = character.equipped.get("weapon")
weapon_crit_chance = 0.05
weapon_crit_multiplier = 2.0
weapon_damage_type = DamageType.PHYSICAL
elemental_damage_type = None
physical_ratio = 1.0
elemental_ratio = 0.0
if weapon and weapon.is_weapon():
weapon_crit_chance = weapon.crit_chance
weapon_crit_multiplier = weapon.crit_multiplier
weapon_damage_type = weapon.damage_type or DamageType.PHYSICAL
if weapon.is_elemental_weapon():
elemental_damage_type = weapon.elemental_damage_type
physical_ratio = weapon.physical_ratio
elemental_ratio = weapon.elemental_ratio
return Combatant(
combatant_id=character.character_id,
name=character.name,
is_player=True,
current_hp=effective_stats.hit_points,
max_hp=effective_stats.hit_points,
current_mp=effective_stats.mana_points,
max_mp=effective_stats.mana_points,
stats=effective_stats,
abilities=abilities,
weapon_crit_chance=weapon_crit_chance,
weapon_crit_multiplier=weapon_crit_multiplier,
weapon_damage_type=weapon_damage_type,
elemental_damage_type=elemental_damage_type,
physical_ratio=physical_ratio,
elemental_ratio=elemental_ratio,
)
def _create_combatant_from_enemy(
self,
template: EnemyTemplate,
instance_index: int = 0
) -> Combatant:
"""Create a Combatant from an EnemyTemplate."""
# Create unique ID for this instance
combatant_id = f"{template.enemy_id}_{instance_index}"
# Add variation to name if multiple of same type
name = template.name
if instance_index > 0:
name = f"{template.name} #{instance_index + 1}"
# Copy stats and populate damage_bonus with base_damage
stats = template.base_stats.copy()
stats.damage_bonus = template.base_damage
return Combatant(
combatant_id=combatant_id,
name=name,
is_player=False,
current_hp=stats.hit_points,
max_hp=stats.hit_points,
current_mp=stats.mana_points,
max_mp=stats.mana_points,
stats=stats,
abilities=template.abilities.copy(),
weapon_crit_chance=template.crit_chance,
weapon_crit_multiplier=2.0,
weapon_damage_type=DamageType.PHYSICAL,
)
def _get_crit_chance(self, combatant: Combatant) -> float:
"""Get critical hit chance for a combatant."""
# Weapon crit chance + LUK bonus
return combatant.weapon_crit_chance + combatant.stats.crit_bonus
def _get_default_target(
self,
encounter: CombatEncounter,
attacker: Combatant
) -> Optional[Combatant]:
"""Get a default target for an attacker."""
# Target opposite side, first alive
for combatant in encounter.combatants:
if combatant.is_player != attacker.is_player and combatant.is_alive():
return combatant
return None
def _advance_turn_and_save(
self,
encounter: CombatEncounter,
session,
user_id: str
) -> None:
"""Advance the turn and save encounter state to repository."""
logger.info("_advance_turn_and_save called",
encounter_id=encounter.encounter_id,
before_turn_index=encounter.current_turn_index,
combat_log_count=len(encounter.combat_log))
encounter.advance_turn()
logger.info("Turn advanced, now saving",
encounter_id=encounter.encounter_id,
after_turn_index=encounter.current_turn_index,
combat_log_count=len(encounter.combat_log))
# Save encounter to repository
self.combat_repository.update_encounter(encounter)
logger.info("Encounter saved",
encounter_id=encounter.encounter_id)
# =============================================================================
# Global Instance
# =============================================================================
_service_instance: Optional[CombatService] = None
def get_combat_service() -> CombatService:
"""
Get the global CombatService instance.
Returns:
Singleton CombatService instance
"""
global _service_instance
if _service_instance is None:
_service_instance = CombatService()
return _service_instance