from __future__ import annotations import os import subprocess import xml.etree.ElementTree as ET from pathlib import Path from typing import Iterable, List, Dict, Optional, Tuple from utils.models import HostResult, PortFinding class nmap_scanner: TCP_REPORT_PATH = Path() / "data" / "nmap-tcp-results.xml" UDP_REPORT_PATH = Path() / "data" / "nmap-udp-results.xml" NMAP_RESULTS_PATH = Path() / "data" / "nmap-results.xml" def __init__(self, targets:Iterable[str],scan_udp=False): self.targets = list(targets) self.scan_udp = scan_udp pass def scan_targets(self): tcp_results = self.run_nmap_tcp_all() if self.scan_udp: udp_results = self.run_nmap_udp() all_results = List[HostResult] = self.merge_host_results(tcp_results,udp_results) else: all_results = tcp_results return all_results def run_nmap_tcp_all(self, min_rate: int = 1000, assume_up: bool = True) -> List[HostResult]: """ Run a TCP SYN scan across all ports (0-65535) for the given targets and parse results. Returns a list of HostResult objects. """ targets_list = self.targets if not targets_list: return [] cmd = [ "nmap", "-sS", # TCP SYN scan "-p-", # all TCP ports "-T4", "--min-rate", str(min_rate), "-oX", str(self.TCP_REPORT_PATH), ] if assume_up: cmd.append("-Pn") cmd.extend(targets_list) self._run_nmap(cmd) return self.parse_nmap_xml(self.TCP_REPORT_PATH) def run_nmap_udp(self, ports: Optional[Iterable[int]] = None, min_rate: int = 500, assume_up: bool = True) -> List[HostResult]: """ Run a UDP scan for the provided ports (recommended to keep this list small). If 'ports' is None, nmap defaults to its "top" UDP ports; full -p- UDP is very slow. """ targets_list = self.targets if not targets_list: return [] cmd = [ "nmap", "-sU", # UDP scan "-T3", # less aggressive timing by default for UDP "--min-rate", str(min_rate), "-oX", str(self.UDP_REPORT_PATH), ] if assume_up: cmd.append("-Pn") if ports: # Explicit port set port_list = sorted(set(int(p) for p in ports)) port_str = ",".join(str(p) for p in port_list) cmd.extend(["-p", port_str]) cmd.extend(targets_list) self._run_nmap(cmd) return self.parse_nmap_xml(self.UDP_REPORT_PATH) def merge_host_results(self, *result_sets: List[HostResult]) -> List[HostResult]: """ Merge multiple lists of HostResult (e.g., TCP set + UDP set) by address. Ports are combined; hostnames preserved if found in any set. """ merged: Dict[str, HostResult] = {} for results in result_sets: for hr in results: if hr.address not in merged: merged[hr.address] = HostResult(address=hr.address, host=hr.host, ports=list(hr.ports)) else: existing = merged[hr.address] # prefer a hostname if we didn't have one yet if not existing.host and hr.host: existing.host = hr.host # merge ports (avoid dupes) existing_ports_key = {(p.protocol, p.port, p.state, p.service) for p in existing.ports} for p in hr.ports: key = (p.protocol, p.port, p.state, p.service) if key not in existing_ports_key: existing.ports.append(p) existing_ports_key.add(key) # Sort ports in each host for stable output for hr in merged.values(): hr.ports.sort(key=lambda p: (p.protocol, p.port)) return sorted(merged.values(), key=lambda h: h.address) def parse_nmap_xml(self, xml_path: Path) -> List[HostResult]: """ Parse an Nmap XML file into a list of HostResult objects. Captures per-port protocol, state, and optional service name. """ tree = ET.parse(str(xml_path)) root = tree.getroot() results: List[HostResult] = [] for host_el in root.findall("host"): # Address addr_el = host_el.find("address") if addr_el is None: continue address = addr_el.get("addr", "") # Hostname (if present) hostname: Optional[str] = None hostnames_el = host_el.find("hostnames") if hostnames_el is not None: hn_el = hostnames_el.find("hostname") if hn_el is not None: hostname = hn_el.get("name") findings: List[PortFinding] = [] ports_el = host_el.find("ports") if ports_el is not None: for port_el in ports_el.findall("port"): protocol = (port_el.get("protocol") or "").lower() # 'tcp' or 'udp' portid_str = port_el.get("portid", "0") try: portid = int(portid_str) except ValueError: continue state_el = port_el.find("state") state = (state_el.get("state") if state_el is not None else "unknown").lower() # optional service info service_el = port_el.find("service") service_name = service_el.get("name") if service_el is not None else None findings.append(PortFinding(port=portid, protocol=protocol, state=state, service=service_name)) results.append(HostResult(address=address, host=hostname, ports=findings)) return results def _run_nmap(self, cmd: List[str]) -> None: """ Run a command and raise on non-zero exit with a readable message. """ try: subprocess.run(cmd, check=True) except subprocess.CalledProcessError as exc: raise RuntimeError(f"Command failed ({exc.returncode}): {' '.join(cmd)}") from exc def cleanup(self): if self.TCP_REPORT_PATH.exists(): self.TCP_REPORT_PATH.unlink() if self.UDP_REPORT_PATH.exists(): self.UDP_REPORT_PATH.unlink() if self.NMAP_RESULTS_PATH.exists: self.NMAP_RESULTS_PATH.unlink()