from __future__ import annotations

import os
import subprocess
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Iterable, List, Dict, Optional, Tuple, Union

from utils.models import HostResult, PortFinding
from utils.scan_config_loader import ScanConfigFile, ScanTarget


class nmap_scanner:
    """
    Thin wrapper around the ``nmap`` command line tool.

    Builds nmap command lines for TCP and UDP scans of the configured targets,
    runs them as subprocesses with XML output (``-oX``), and parses that XML
    into ``HostResult`` / ``PortFinding`` objects.
    """

    # Report locations, relative to the current working directory.
    TCP_REPORT_PATH = Path() / "data" / "nmap-tcp-results.xml"
    UDP_REPORT_PATH = Path() / "data" / "nmap-udp-results.xml"
    NMAP_RESULTS_PATH = Path() / "data" / "nmap-results.xml"

    def __init__(self, config: ScanConfigFile):
        """
        Store the scan configuration and derive the values the scans need.

        Args:
            config: Loaded scan configuration. Its ``scan_targets`` entries
                provide the ``ip`` values to scan and
                ``scan_options.udp_scan`` toggles the UDP pass.
        """
        self.scan_config = config
        self.targets = config.scan_targets
        self.target_list = [t.ip for t in config.scan_targets]
        self.scan_udp = config.scan_options.udp_scan

    def scan_targets(self):
        """
        Run the TCP scan and, when enabled, the UDP scan.

        Returns:
            The TCP results alone when UDP scanning is disabled, otherwise the
            TCP and UDP results merged per host address.
        """
        tcp_results = self.run_nmap_tcp_all()
        if not self.scan_udp:
            return tcp_results

        udp_results = self.run_nmap_udp()
        all_results: List[HostResult] = self.merge_host_results(tcp_results, udp_results)
        return all_results

    def run_nmap_tcp_all(self, min_rate: int = 1000, assume_up: bool = True) -> List[HostResult]:
        """
        Run a TCP SYN scan across all ports (``-p-``, i.e. 1-65535) and parse the results.

        Args:
            min_rate: Value passed to nmap's ``--min-rate``.
            assume_up: When true, add ``-Pn`` so nmap skips host discovery.

        Returns:
            A list of HostResult objects; empty when there are no targets.

        Raises:
            RuntimeError: If nmap exits with a non-zero status.
        """
        targets_list = self.target_list
        if not targets_list:
            return []

        # nmap does not create missing directories for its -oX output file.
        self.TCP_REPORT_PATH.parent.mkdir(parents=True, exist_ok=True)

        cmd = [
            "nmap",
            "-sS",  # TCP SYN scan
            "-p-",  # all TCP ports
            "-T4",
            "--min-rate", str(min_rate),
            "-oX", str(self.TCP_REPORT_PATH),
        ]
        if assume_up:
            cmd.append("-Pn")
        cmd.extend(targets_list)

        self._run_nmap(cmd)
        return self.parse_nmap_xml(self.TCP_REPORT_PATH)

    def run_nmap_udp(
        self,
        ports: Optional[Iterable[int]] = None,
        min_rate: int = 500,
        assume_up: bool = True,
    ) -> List[HostResult]:
        """
        Run UDP scans.

        Behavior:
          - If `ports` is provided and non-empty -> single nmap run against all
            targets using that port list.
          - Otherwise ->
              * For hosts with `expected_udp` defined and non-empty: scan only those ports.
              * For hosts with no `expected_udp` (or empty): omit `-p` so nmap uses
                its default top UDP ports.
            Hosts sharing the same explicit UDP port set are grouped into one nmap run.

        Args:
            ports: Optional global port list applied to every target.
            min_rate: Value passed to nmap's ``--min-rate``.
            assume_up: When true, add ``-Pn`` so nmap skips host discovery.

        Returns:
            For a global port list, the parsed results of that single run.
            Otherwise the merged List[HostResult] across all per-group runs.

        Raises:
            RuntimeError: If any nmap run exits with a non-zero status.
        """
        targets_list = getattr(self, "target_list", [])
        if not targets_list:
            return []

        # Materialise once so one-shot iterables (generators) work and an
        # iterable that turns out to be empty falls back to per-host behaviour
        # instead of producing an invalid `-p ""`.
        global_ports = sorted({int(p) for p in ports}) if ports else []

        # Case 1: caller provided a global port list -> one run, all targets
        if global_ports:
            port_str = ",".join(str(p) for p in global_ports)
            self._log(f"UDP scan (global ports): {port_str} on {len(targets_list)} host(s)")
            return self._run_udp_scan(targets_list, global_ports, min_rate, assume_up)

        # Case 2: per-host behavior using self.scan_config.scan_targets
        all_result_sets: List[List[HostResult]] = []
        for port_tuple, ips in self._group_targets_by_udp_ports(targets_list).items():
            if port_tuple:
                # explicit per-group ports
                port_str = ",".join(str(p) for p in port_tuple)
                self._log(f"UDP scan (explicit ports {port_str}) on {len(ips)} host(s): {', '.join(ips)}")
            else:
                # no -p -> nmap defaults to its top UDP ports
                self._log(f"UDP scan (nmap top UDP ports) on {len(ips)} host(s): {', '.join(ips)}")
            all_result_sets.append(self._run_udp_scan(ips, port_tuple, min_rate, assume_up))

        if not all_result_sets:
            return []

        # Merge per-run results into final list
        return self.merge_host_results(*all_result_sets)

    def _log(self, msg: str) -> None:
        """Send `msg` to ``self.logger.info`` when a logger is set, else print it."""
        logger = getattr(self, "logger", None)
        if logger:
            logger.info(msg)
        else:
            print(msg)

    def _group_targets_by_udp_ports(self, targets_list: List[str]) -> Dict[Tuple[int, ...], List[str]]:
        """
        Group target IPs by their configured `expected_udp` port set.

        The empty tuple key collects hosts without explicit UDP ports.
        """
        cfg_targets = getattr(getattr(self, "scan_config", None), "scan_targets", []) or []

        # Quick lookup ip -> configured UDP ports.
        conf_map: Dict[str, List[int]] = {}
        for t in cfg_targets:
            # Support either dataclass (attrs) or dict-like targets
            ip = getattr(t, "ip", None) if hasattr(t, "ip") else t.get("ip")
            if not ip:
                continue
            raw_udp = getattr(t, "expected_udp", None) if hasattr(t, "expected_udp") else t.get("expected_udp", [])
            conf_map[ip] = list(raw_udp or [])

        # Selection and order come from targets_list; keying by IP collapses
        # duplicate targets into one entry.
        ip_to_ports: Dict[str, Tuple[int, ...]] = {}
        for ip in targets_list:
            # De-duplicate so e.g. [53, 53] and [53] share one nmap run.
            ip_to_ports[ip] = tuple(sorted({int(p) for p in conf_map.get(ip, [])}))

        groups: Dict[Tuple[int, ...], List[str]] = {}
        for ip, port_tuple in ip_to_ports.items():
            groups.setdefault(port_tuple, []).append(ip)
        return groups

    def _run_udp_scan(
        self,
        ips: List[str],
        ports: Iterable[int],
        min_rate: int,
        assume_up: bool,
    ) -> List[HostResult]:
        """
        Run one UDP nmap invocation against `ips` and parse its XML report.

        An empty `ports` omits ``-p``. The temporary report file is removed
        even when nmap or the XML parsing fails.
        """
        # delete=False: only the name is needed here; nmap writes the file itself.
        with tempfile.NamedTemporaryFile(prefix="nmap_udp_", suffix=".xml", delete=False) as tmp:
            report_path = tmp.name

        try:
            cmd = [
                "nmap",
                "-sU",
                "-T3",
                "--min-rate", str(min_rate),
                "-oX", str(report_path),
            ]
            if assume_up:
                cmd.append("-Pn")
            port_str = ",".join(str(p) for p in ports)
            if port_str:
                cmd.extend(["-p", port_str])
            cmd.extend(ips)

            self._run_nmap(cmd)
            return self.parse_nmap_xml(report_path)
        finally:
            # Best-effort cleanup; a leftover temp file must not mask the
            # scan result or the original error.
            try:
                os.remove(report_path)
            except OSError:
                pass

    def merge_host_results(self, *result_sets: List[HostResult]) -> List[HostResult]:
        """
        Merge multiple lists of HostResult (e.g., TCP set + UDP set) by address.

        Ports are combined without duplicating entries that share protocol,
        port, state and service; a hostname is kept if found in any set.

        Returns:
            New HostResult objects sorted by address, each with its ports
            sorted by (protocol, port).
        """
        def port_key(p):
            return (p.protocol, p.port, p.state, p.service)

        merged: Dict[str, HostResult] = {}
        for results in result_sets:
            for hr in results:
                existing = merged.get(hr.address)
                if existing is None:
                    # Copy the port list so the input results are not mutated.
                    merged[hr.address] = HostResult(address=hr.address, host=hr.host, ports=list(hr.ports))
                    continue

                # prefer a hostname if we didn't have one yet
                if not existing.host and hr.host:
                    existing.host = hr.host

                # merge ports (avoid dupes)
                seen = {port_key(p) for p in existing.ports}
                for p in hr.ports:
                    key = port_key(p)
                    if key not in seen:
                        existing.ports.append(p)
                        seen.add(key)

        # Sort ports in each host for stable output
        for hr in merged.values():
            hr.ports.sort(key=lambda p: (p.protocol, p.port))

        return sorted(merged.values(), key=lambda h: h.address)

    def parse_nmap_xml(self, xml_path: Union[str, Path]) -> List[HostResult]:
        """
        Parse an Nmap XML file into a list of HostResult objects.

        Captures per-port protocol, state, and optional service name. Hosts
        without an ``<address>`` element and ports with a non-numeric
        ``portid`` are skipped.

        Args:
            xml_path: Location of the nmap ``-oX`` report.

        Returns:
            One HostResult per parsed ``<host>`` element.
        """
        root = ET.parse(str(xml_path)).getroot()

        results: List[HostResult] = []
        for host_el in root.findall("host"):
            # Address (first <address> element of the host)
            addr_el = host_el.find("address")
            if addr_el is None:
                continue
            address = addr_el.get("addr", "")

            # Hostname (if present)
            hostname: Optional[str] = None
            hn_el = host_el.find("hostnames/hostname")
            if hn_el is not None:
                hostname = hn_el.get("name")

            findings: List[PortFinding] = []
            for port_el in host_el.findall("ports/port"):
                protocol = (port_el.get("protocol") or "").lower()  # 'tcp' or 'udp'
                try:
                    portid = int(port_el.get("portid", "0"))
                except ValueError:
                    continue

                # A <state> element without a "state" attribute yields None;
                # fall back to "unknown" instead of calling .lower() on None.
                state_el = port_el.find("state")
                raw_state = state_el.get("state") if state_el is not None else None
                state = (raw_state or "unknown").lower()

                # optional service info
                service_el = port_el.find("service")
                service_name = service_el.get("name") if service_el is not None else None

                findings.append(PortFinding(port=portid, protocol=protocol, state=state, service=service_name))

            results.append(HostResult(address=address, host=hostname, ports=findings))

        return results

    def _run_nmap(self, cmd: List[str]) -> None:
        """
        Run a command and raise on non-zero exit with a readable message.

        stdout is discarded; stderr is captured so the failure reason can be
        included in the error.

        Raises:
            RuntimeError: If the command exits with a non-zero status.
        """
        try:
            subprocess.run(
                cmd,
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True,
                errors="replace",
            )
        except subprocess.CalledProcessError as exc:
            message = f"Command failed ({exc.returncode}): {' '.join(cmd)}"
            detail = (exc.stderr or "").strip()
            if detail:
                message = f"{message}\n{detail}"
            raise RuntimeError(message) from exc

    def cleanup(self):
        """Delete the TCP and UDP report files if they exist."""
        for report in (self.TCP_REPORT_PATH, self.UDP_REPORT_PATH):
            try:
                report.unlink()
            except FileNotFoundError:
                pass
        # NOTE: NMAP_RESULTS_PATH is not removed here; its removal was
        # already disabled (commented out) in the previous version.