282 lines
10 KiB
Python
282 lines
10 KiB
Python
from __future__ import annotations
|
|
import os
|
|
import subprocess
|
|
import xml.etree.ElementTree as ET
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Iterable, List, Dict, Optional, Tuple, Union
|
|
|
|
from utils.models import HostResult, PortFinding
|
|
from utils.scan_config_loader import ScanConfigFile,ScanTarget
|
|
|
|
class nmap_scanner:
|
|
|
|
TCP_REPORT_PATH = Path() / "data" / "nmap-tcp-results.xml"
|
|
UDP_REPORT_PATH = Path() / "data" / "nmap-udp-results.xml"
|
|
NMAP_RESULTS_PATH = Path() / "data" / "nmap-results.xml"
|
|
|
|
def __init__(self, config:ScanConfigFile):
|
|
self.scan_config = config
|
|
self.targets = config.scan_targets
|
|
self.target_list = [t.ip for t in config.scan_targets]
|
|
self.scan_udp = config.scan_options.udp_scan
|
|
pass
|
|
|
|
def scan_targets(self):
|
|
tcp_results = self.run_nmap_tcp_all()
|
|
|
|
if self.scan_udp:
|
|
udp_results = self.run_nmap_udp()
|
|
all_results: List[HostResult] = self.merge_host_results(tcp_results,udp_results)
|
|
else:
|
|
all_results = tcp_results
|
|
|
|
return all_results
|
|
|
|
def run_nmap_tcp_all(self, min_rate: int = 1000, assume_up: bool = True) -> List[HostResult]:
|
|
"""
|
|
Run a TCP SYN scan across all ports (0-65535) for the given targets and parse results.
|
|
Returns a list of HostResult objects.
|
|
"""
|
|
targets_list = self.target_list
|
|
if not targets_list:
|
|
return []
|
|
|
|
cmd = [
|
|
"nmap",
|
|
"-sS", # TCP SYN scan
|
|
"-p-", # all TCP ports
|
|
"-T4",
|
|
"--min-rate", str(min_rate),
|
|
"-oX", str(self.TCP_REPORT_PATH),
|
|
]
|
|
if assume_up:
|
|
cmd.append("-Pn")
|
|
cmd.extend(targets_list)
|
|
|
|
self._run_nmap(cmd)
|
|
return self.parse_nmap_xml(self.TCP_REPORT_PATH)
|
|
|
|
def run_nmap_udp(
|
|
self,
|
|
ports: Optional[Iterable[int]] = None,
|
|
min_rate: int = 500,
|
|
assume_up: bool = True,
|
|
) -> List[HostResult]:
|
|
"""
|
|
Run UDP scans.
|
|
|
|
Behavior:
|
|
- If `ports` is provided -> single nmap run against all targets using that port list.
|
|
- If `ports` is None ->
|
|
* For hosts with `expected_udp` defined and non-empty: scan only those ports.
|
|
* For hosts with no `expected_udp` (or empty): omit `-p` so nmap uses its default top UDP ports.
|
|
Hosts sharing the same explicit UDP port set are grouped into one nmap run.
|
|
Returns:
|
|
Merged List[HostResult] across all runs.
|
|
"""
|
|
targets_list = getattr(self, "target_list", [])
|
|
if not targets_list:
|
|
return []
|
|
|
|
# Optional logger (don't fail if not present)
|
|
logger = getattr(self, "logger", None)
|
|
def _log(msg: str) -> None:
|
|
if logger:
|
|
logger.info(msg)
|
|
else:
|
|
print(msg)
|
|
|
|
# Case 1: caller provided a global port list -> one run, all targets
|
|
if ports:
|
|
port_list = sorted({int(p) for p in ports})
|
|
port_str = ",".join(str(p) for p in port_list)
|
|
|
|
with tempfile.NamedTemporaryFile(prefix="nmap_udp_", suffix=".xml", delete=False) as tmp:
|
|
report_path = tmp.name
|
|
|
|
cmd = [
|
|
"nmap",
|
|
"-sU",
|
|
"-T3",
|
|
"--min-rate", str(min_rate),
|
|
"-oX", str(report_path),
|
|
]
|
|
if assume_up:
|
|
cmd.append("-Pn")
|
|
cmd.extend(["-p", port_str])
|
|
cmd.extend(targets_list)
|
|
|
|
_log(f"UDP scan (global ports): {port_str} on {len(targets_list)} host(s)")
|
|
self._run_nmap(cmd)
|
|
results = self.parse_nmap_xml(report_path)
|
|
try:
|
|
os.remove(report_path)
|
|
except OSError:
|
|
pass
|
|
return results
|
|
|
|
# Case 2: per-host behavior using self.scan_config.scan_targets
|
|
# Build per-IP port tuple (empty tuple => use nmap's default top UDP ports)
|
|
ip_to_ports: Dict[str, Tuple[int, ...]] = {}
|
|
|
|
# Prefer the IPs present in self.target_list (order/selection comes from there)
|
|
# Map from ScanConfigFile / ScanTarget
|
|
cfg_targets = getattr(getattr(self, "scan_config", None), "scan_targets", []) or []
|
|
# Build quick lookup from config
|
|
conf_map: Dict[str, List[int]] = {}
|
|
for t in cfg_targets:
|
|
# Support either dataclass (attrs) or dict-like
|
|
ip = getattr(t, "ip", None) if hasattr(t, "ip") else t.get("ip")
|
|
if not ip:
|
|
continue
|
|
raw_udp = getattr(t, "expected_udp", None) if hasattr(t, "expected_udp") else t.get("expected_udp", [])
|
|
conf_map[ip] = list(raw_udp or [])
|
|
|
|
for ip in targets_list:
|
|
raw = conf_map.get(ip, [])
|
|
if raw:
|
|
ip_to_ports[ip] = tuple(sorted(int(p) for p in raw))
|
|
else:
|
|
ip_to_ports[ip] = () # empty => use nmap defaults (top UDP ports)
|
|
|
|
# Group hosts by identical port tuple
|
|
groups: Dict[Tuple[int, ...], List[str]] = {}
|
|
for ip, port_tuple in ip_to_ports.items():
|
|
groups.setdefault(port_tuple, []).append(ip)
|
|
|
|
all_result_sets: List[List[HostResult]] = []
|
|
|
|
for port_tuple, ips in groups.items():
|
|
# Per-group report path
|
|
with tempfile.NamedTemporaryFile(prefix="nmap_udp_", suffix=".xml", delete=False) as tmp:
|
|
report_path = tmp.name
|
|
|
|
cmd = [
|
|
"nmap",
|
|
"-sU",
|
|
"-T3",
|
|
"--min-rate", str(min_rate),
|
|
"-oX", str(report_path),
|
|
]
|
|
if assume_up:
|
|
cmd.append("-Pn")
|
|
|
|
if port_tuple:
|
|
# explicit per-group ports
|
|
port_str = ",".join(str(p) for p in port_tuple)
|
|
cmd.extend(["-p", port_str])
|
|
_log(f"UDP scan (explicit ports {port_str}) on {len(ips)} host(s): {', '.join(ips)}")
|
|
else:
|
|
# no -p -> nmap defaults to its top UDP ports
|
|
_log(f"UDP scan (nmap top UDP ports) on {len(ips)} host(s): {', '.join(ips)}")
|
|
|
|
cmd.extend(ips)
|
|
|
|
self._run_nmap(cmd)
|
|
result = self.parse_nmap_xml(report_path)
|
|
all_result_sets.append(result)
|
|
try:
|
|
os.remove(report_path)
|
|
except OSError:
|
|
pass
|
|
|
|
if not all_result_sets:
|
|
return []
|
|
|
|
# Merge per-run results into final list
|
|
return self.merge_host_results(*all_result_sets)
|
|
|
|
def merge_host_results(self, *result_sets: List[HostResult]) -> List[HostResult]:
|
|
"""
|
|
Merge multiple lists of HostResult (e.g., TCP set + UDP set) by address.
|
|
Ports are combined; hostnames preserved if found in any set.
|
|
"""
|
|
merged: Dict[str, HostResult] = {}
|
|
for results in result_sets:
|
|
for hr in results:
|
|
if hr.address not in merged:
|
|
merged[hr.address] = HostResult(address=hr.address, host=hr.host, ports=list(hr.ports))
|
|
else:
|
|
existing = merged[hr.address]
|
|
# prefer a hostname if we didn't have one yet
|
|
if not existing.host and hr.host:
|
|
existing.host = hr.host
|
|
# merge ports (avoid dupes)
|
|
existing_ports_key = {(p.protocol, p.port, p.state, p.service) for p in existing.ports}
|
|
for p in hr.ports:
|
|
key = (p.protocol, p.port, p.state, p.service)
|
|
if key not in existing_ports_key:
|
|
existing.ports.append(p)
|
|
existing_ports_key.add(key)
|
|
# Sort ports in each host for stable output
|
|
for hr in merged.values():
|
|
hr.ports.sort(key=lambda p: (p.protocol, p.port))
|
|
return sorted(merged.values(), key=lambda h: h.address)
|
|
|
|
def parse_nmap_xml(self, xml_path: Path) -> List[HostResult]:
|
|
"""
|
|
Parse an Nmap XML file into a list of HostResult objects.
|
|
Captures per-port protocol, state, and optional service name.
|
|
"""
|
|
tree = ET.parse(str(xml_path))
|
|
root = tree.getroot()
|
|
|
|
results: List[HostResult] = []
|
|
|
|
for host_el in root.findall("host"):
|
|
# Address
|
|
addr_el = host_el.find("address")
|
|
if addr_el is None:
|
|
continue
|
|
address = addr_el.get("addr", "")
|
|
|
|
# Hostname (if present)
|
|
hostname: Optional[str] = None
|
|
hostnames_el = host_el.find("hostnames")
|
|
if hostnames_el is not None:
|
|
hn_el = hostnames_el.find("hostname")
|
|
if hn_el is not None:
|
|
hostname = hn_el.get("name")
|
|
|
|
findings: List[PortFinding] = []
|
|
ports_el = host_el.find("ports")
|
|
if ports_el is not None:
|
|
for port_el in ports_el.findall("port"):
|
|
protocol = (port_el.get("protocol") or "").lower() # 'tcp' or 'udp'
|
|
portid_str = port_el.get("portid", "0")
|
|
try:
|
|
portid = int(portid_str)
|
|
except ValueError:
|
|
continue
|
|
|
|
state_el = port_el.find("state")
|
|
state = (state_el.get("state") if state_el is not None else "unknown").lower()
|
|
|
|
# optional service info
|
|
service_el = port_el.find("service")
|
|
service_name = service_el.get("name") if service_el is not None else None
|
|
|
|
findings.append(PortFinding(port=portid, protocol=protocol, state=state, service=service_name))
|
|
|
|
results.append(HostResult(address=address, host=hostname, ports=findings))
|
|
|
|
return results
|
|
|
|
def _run_nmap(self, cmd: List[str]) -> None:
|
|
"""
|
|
Run a command and raise on non-zero exit with a readable message.
|
|
"""
|
|
try:
|
|
subprocess.run(cmd, check=True)
|
|
except subprocess.CalledProcessError as exc:
|
|
raise RuntimeError(f"Command failed ({exc.returncode}): {' '.join(cmd)}") from exc
|
|
|
|
def cleanup(self):
|
|
if self.TCP_REPORT_PATH.exists():
|
|
self.TCP_REPORT_PATH.unlink()
|
|
if self.UDP_REPORT_PATH.exists():
|
|
self.UDP_REPORT_PATH.unlink()
|
|
# if self.NMAP_RESULTS_PATH.exists:
|
|
# self.NMAP_RESULTS_PATH.unlink()
|
|
|