Files
mass-scan2/app/utils/scanner.py

282 lines
10 KiB
Python

from __future__ import annotations
import os
import subprocess
import xml.etree.ElementTree as ET
import tempfile
from pathlib import Path
from typing import Iterable, List, Dict, Optional, Tuple, Union
from utils.models import HostResult, PortFinding
from utils.scan_config_loader import ScanConfigFile,ScanTarget
class nmap_scanner:
TCP_REPORT_PATH = Path() / "data" / "nmap-tcp-results.xml"
UDP_REPORT_PATH = Path() / "data" / "nmap-udp-results.xml"
NMAP_RESULTS_PATH = Path() / "data" / "nmap-results.xml"
def __init__(self, config:ScanConfigFile):
self.scan_config = config
self.targets = config.scan_targets
self.target_list = [t.ip for t in config.scan_targets]
self.scan_udp = config.scan_options.udp_scan
pass
def scan_targets(self):
tcp_results = self.run_nmap_tcp_all()
if self.scan_udp:
udp_results = self.run_nmap_udp()
all_results: List[HostResult] = self.merge_host_results(tcp_results,udp_results)
else:
all_results = tcp_results
return all_results
def run_nmap_tcp_all(self, min_rate: int = 1000, assume_up: bool = True) -> List[HostResult]:
"""
Run a TCP SYN scan across all ports (0-65535) for the given targets and parse results.
Returns a list of HostResult objects.
"""
targets_list = self.target_list
if not targets_list:
return []
cmd = [
"nmap",
"-sS", # TCP SYN scan
"-p-", # all TCP ports
"-T4",
"--min-rate", str(min_rate),
"-oX", str(self.TCP_REPORT_PATH),
]
if assume_up:
cmd.append("-Pn")
cmd.extend(targets_list)
self._run_nmap(cmd)
return self.parse_nmap_xml(self.TCP_REPORT_PATH)
def run_nmap_udp(
self,
ports: Optional[Iterable[int]] = None,
min_rate: int = 500,
assume_up: bool = True,
) -> List[HostResult]:
"""
Run UDP scans.
Behavior:
- If `ports` is provided -> single nmap run against all targets using that port list.
- If `ports` is None ->
* For hosts with `expected_udp` defined and non-empty: scan only those ports.
* For hosts with no `expected_udp` (or empty): omit `-p` so nmap uses its default top UDP ports.
Hosts sharing the same explicit UDP port set are grouped into one nmap run.
Returns:
Merged List[HostResult] across all runs.
"""
targets_list = getattr(self, "target_list", [])
if not targets_list:
return []
# Optional logger (don't fail if not present)
logger = getattr(self, "logger", None)
def _log(msg: str) -> None:
if logger:
logger.info(msg)
else:
print(msg)
# Case 1: caller provided a global port list -> one run, all targets
if ports:
port_list = sorted({int(p) for p in ports})
port_str = ",".join(str(p) for p in port_list)
with tempfile.NamedTemporaryFile(prefix="nmap_udp_", suffix=".xml", delete=False) as tmp:
report_path = tmp.name
cmd = [
"nmap",
"-sU",
"-T3",
"--min-rate", str(min_rate),
"-oX", str(report_path),
]
if assume_up:
cmd.append("-Pn")
cmd.extend(["-p", port_str])
cmd.extend(targets_list)
_log(f"UDP scan (global ports): {port_str} on {len(targets_list)} host(s)")
self._run_nmap(cmd)
results = self.parse_nmap_xml(report_path)
try:
os.remove(report_path)
except OSError:
pass
return results
# Case 2: per-host behavior using self.scan_config.scan_targets
# Build per-IP port tuple (empty tuple => use nmap's default top UDP ports)
ip_to_ports: Dict[str, Tuple[int, ...]] = {}
# Prefer the IPs present in self.target_list (order/selection comes from there)
# Map from ScanConfigFile / ScanTarget
cfg_targets = getattr(getattr(self, "scan_config", None), "scan_targets", []) or []
# Build quick lookup from config
conf_map: Dict[str, List[int]] = {}
for t in cfg_targets:
# Support either dataclass (attrs) or dict-like
ip = getattr(t, "ip", None) if hasattr(t, "ip") else t.get("ip")
if not ip:
continue
raw_udp = getattr(t, "expected_udp", None) if hasattr(t, "expected_udp") else t.get("expected_udp", [])
conf_map[ip] = list(raw_udp or [])
for ip in targets_list:
raw = conf_map.get(ip, [])
if raw:
ip_to_ports[ip] = tuple(sorted(int(p) for p in raw))
else:
ip_to_ports[ip] = () # empty => use nmap defaults (top UDP ports)
# Group hosts by identical port tuple
groups: Dict[Tuple[int, ...], List[str]] = {}
for ip, port_tuple in ip_to_ports.items():
groups.setdefault(port_tuple, []).append(ip)
all_result_sets: List[List[HostResult]] = []
for port_tuple, ips in groups.items():
# Per-group report path
with tempfile.NamedTemporaryFile(prefix="nmap_udp_", suffix=".xml", delete=False) as tmp:
report_path = tmp.name
cmd = [
"nmap",
"-sU",
"-T3",
"--min-rate", str(min_rate),
"-oX", str(report_path),
]
if assume_up:
cmd.append("-Pn")
if port_tuple:
# explicit per-group ports
port_str = ",".join(str(p) for p in port_tuple)
cmd.extend(["-p", port_str])
_log(f"UDP scan (explicit ports {port_str}) on {len(ips)} host(s): {', '.join(ips)}")
else:
# no -p -> nmap defaults to its top UDP ports
_log(f"UDP scan (nmap top UDP ports) on {len(ips)} host(s): {', '.join(ips)}")
cmd.extend(ips)
self._run_nmap(cmd)
result = self.parse_nmap_xml(report_path)
all_result_sets.append(result)
try:
os.remove(report_path)
except OSError:
pass
if not all_result_sets:
return []
# Merge per-run results into final list
return self.merge_host_results(*all_result_sets)
def merge_host_results(self, *result_sets: List[HostResult]) -> List[HostResult]:
"""
Merge multiple lists of HostResult (e.g., TCP set + UDP set) by address.
Ports are combined; hostnames preserved if found in any set.
"""
merged: Dict[str, HostResult] = {}
for results in result_sets:
for hr in results:
if hr.address not in merged:
merged[hr.address] = HostResult(address=hr.address, host=hr.host, ports=list(hr.ports))
else:
existing = merged[hr.address]
# prefer a hostname if we didn't have one yet
if not existing.host and hr.host:
existing.host = hr.host
# merge ports (avoid dupes)
existing_ports_key = {(p.protocol, p.port, p.state, p.service) for p in existing.ports}
for p in hr.ports:
key = (p.protocol, p.port, p.state, p.service)
if key not in existing_ports_key:
existing.ports.append(p)
existing_ports_key.add(key)
# Sort ports in each host for stable output
for hr in merged.values():
hr.ports.sort(key=lambda p: (p.protocol, p.port))
return sorted(merged.values(), key=lambda h: h.address)
def parse_nmap_xml(self, xml_path: Path) -> List[HostResult]:
"""
Parse an Nmap XML file into a list of HostResult objects.
Captures per-port protocol, state, and optional service name.
"""
tree = ET.parse(str(xml_path))
root = tree.getroot()
results: List[HostResult] = []
for host_el in root.findall("host"):
# Address
addr_el = host_el.find("address")
if addr_el is None:
continue
address = addr_el.get("addr", "")
# Hostname (if present)
hostname: Optional[str] = None
hostnames_el = host_el.find("hostnames")
if hostnames_el is not None:
hn_el = hostnames_el.find("hostname")
if hn_el is not None:
hostname = hn_el.get("name")
findings: List[PortFinding] = []
ports_el = host_el.find("ports")
if ports_el is not None:
for port_el in ports_el.findall("port"):
protocol = (port_el.get("protocol") or "").lower() # 'tcp' or 'udp'
portid_str = port_el.get("portid", "0")
try:
portid = int(portid_str)
except ValueError:
continue
state_el = port_el.find("state")
state = (state_el.get("state") if state_el is not None else "unknown").lower()
# optional service info
service_el = port_el.find("service")
service_name = service_el.get("name") if service_el is not None else None
findings.append(PortFinding(port=portid, protocol=protocol, state=state, service=service_name))
results.append(HostResult(address=address, host=hostname, ports=findings))
return results
def _run_nmap(self, cmd: List[str]) -> None:
"""
Run a command and raise on non-zero exit with a readable message.
"""
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as exc:
raise RuntimeError(f"Command failed ({exc.returncode}): {' '.join(cmd)}") from exc
def cleanup(self):
if self.TCP_REPORT_PATH.exists():
self.TCP_REPORT_PATH.unlink()
if self.UDP_REPORT_PATH.exists():
self.UDP_REPORT_PATH.unlink()
# if self.NMAP_RESULTS_PATH.exists:
# self.NMAP_RESULTS_PATH.unlink()