From 9956667c8f1a3849c32e2e59101bc6bba262c52f Mon Sep 17 00:00:00 2001 From: Phillip Tarrant Date: Fri, 17 Oct 2025 13:54:04 -0500 Subject: [PATCH] init commit --- .gitignore | 2 + Dockerfile | 30 +++++ app/main.py | 108 ++++++++++++++++ app/reporting_jinja.py | 95 +++++++++++++++ app/requirements.txt | 2 + app/templates/report.html.j2 | 116 ++++++++++++++++++ app/utils/models.py | 30 +++++ app/utils/scan_config_loader.py | 210 ++++++++++++++++++++++++++++++++ app/utils/scanner.py | 179 +++++++++++++++++++++++++++ app/utils/settings.py | 127 +++++++++++++++++++ data/expected.json | 5 + data/report.html | 121 ++++++++++++++++++ data/rw-expected.json | 34 ++++++ data/scan_targets/corp-wan.yaml | 15 +++ data/scan_targets/dmz.yaml | 23 ++++ data/settings.yaml | 11 ++ docker-compose.yaml | 16 +++ 17 files changed, 1124 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 app/main.py create mode 100644 app/reporting_jinja.py create mode 100644 app/requirements.txt create mode 100644 app/templates/report.html.j2 create mode 100644 app/utils/models.py create mode 100644 app/utils/scan_config_loader.py create mode 100644 app/utils/scanner.py create mode 100644 app/utils/settings.py create mode 100644 data/expected.json create mode 100644 data/report.html create mode 100644 data/rw-expected.json create mode 100644 data/scan_targets/corp-wan.yaml create mode 100644 data/scan_targets/dmz.yaml create mode 100644 data/settings.yaml create mode 100644 docker-compose.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1a6d212 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/venv/ +.env \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e6c24ad --- /dev/null +++ b/Dockerfile @@ -0,0 +1,30 @@ +# Ubuntu slim base +FROM ubuntu:24.04 + +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + VIRTUAL_ENV=/opt/venv \ + PATH="/opt/venv/bin:$PATH" + +# copy only requirements first so pip install can be cached +COPY app/requirements.txt /app/requirements.txt + +# Minimal runtime: python3 + nmap (+ ca certs/tzdata), clean apt cache +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + python3 python3-venv python3-pip \ + nmap ca-certificates tzdata && \ + rm -rf /var/lib/apt/lists/* + +# ---- Create & activate venv ---- +RUN python3 -m venv $VIRTUAL_ENV + +# ---- Install Python deps into venv ---- +RUN pip install --no-cache-dir -r /app/requirements.txt + +# ---- App code ---- +WORKDIR /app +COPY app/ /app/ + +# Default command +ENTRYPOINT ["python3", "/app/main.py"] diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..daf5ad4 --- /dev/null +++ b/app/main.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 +""" +port_checker.py +- expects `expected.json` in same dir (see format below) +- writes nmap XML to a temp file, parses, compares, prints a report +""" +import os +import json +import subprocess +import tempfile +from datetime import datetime +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import Dict, List, Set + +from utils.scanner import nmap_scanner +from utils.models import HostResult +from reporting_jinja import write_html_report_jinja + +EXPECTED_FILE = Path() / "data" / "expected.json" +HTML_REPORT_FILE = Path() / "data" / "report.html" + +def load_expected(path: Path) -> Dict[str, Dict[str, Set[int]]]: + with path.open() as fh: + arr = json.load(fh) + out = {} + for entry in arr: + ip = entry["ip"] + out[ip] = { + "expected_tcp": set(entry.get("expected_tcp", [])), + "expected_udp": set(entry.get("expected_udp", [])), + } + return out + +# def write_targets(expected: Dict[str, Dict[str, Set[int]]], path: Path) -> None: + path.write_text("\n".join(sorted(expected.keys())) + "\n") + +# +def results_to_open_sets( + results: List[HostResult], + count_as_open: Set[str] = frozenset({"open", "open|filtered"})) -> Dict[str, Dict[str, Set[int]]]: + """ + Convert HostResult list to: + { ip: {"tcp": {open ports}, "udp": {open ports}} } + Only include ports whose state is in `count_as_open`. + """ + out: Dict[str, Dict[str, Set[int]]] = {} + for hr in results: + tcp = set() + udp = set() + for p in hr.ports: + if p.state.lower() in count_as_open: + (tcp if p.protocol == "tcp" else udp).add(p.port) + out[hr.address] = {"tcp": tcp, "udp": udp} + return out + +# Build the "reports" dict (what the HTML renderer expects) +def build_reports( + expected: Dict[str, Dict[str, Set[int]]], + discovered: Dict[str, Dict[str, Set[int]]], +) -> Dict[str, Dict[str, List[int]]]: + """ + Create the per-IP delta structure: + { + ip: { + "unexpected_tcp": [...], + "missing_tcp": [...], + "unexpected_udp": [...], + "missing_udp": [...] + } + } + """ + reports: Dict[str, Dict[str, List[int]]] = {} + all_ips = set(expected.keys()) | set(discovered.keys()) + + for ip in sorted(all_ips): + exp_tcp = expected.get(ip, {}).get("expected_tcp", set()) + exp_udp = expected.get(ip, {}).get("expected_udp", set()) + disc_tcp = discovered.get(ip, {}).get("tcp", set()) + disc_udp = discovered.get(ip, {}).get("udp", set()) + + reports[ip] = { + "unexpected_tcp": sorted(disc_tcp - exp_tcp), + "missing_tcp": sorted(exp_tcp - disc_tcp), + "unexpected_udp": sorted(disc_udp - exp_udp), + "missing_udp": sorted(exp_udp - disc_udp), + } + return reports + +def main(): + + # repo = ScanConfigRepository() + + if not EXPECTED_FILE.exists(): + print("Expected File not found") + return + + expected = load_expected(EXPECTED_FILE) + targets = sorted(expected.keys()) + scanner = nmap_scanner(targets) + scan_results = scanner.scan_targets() + discovered_sets = results_to_open_sets(scan_results, count_as_open={"open", "open|filtered"}) + reports = build_reports(expected, discovered_sets) + write_html_report_jinja(reports=reports,host_results=scan_results,out_path=HTML_REPORT_FILE,title="Compliance Report",only_issues=True) + scanner.cleanup() + +if __name__ == "__main__": + main() diff --git a/app/reporting_jinja.py b/app/reporting_jinja.py new file mode 100644 index 0000000..e674f0d --- /dev/null +++ b/app/reporting_jinja.py @@ -0,0 +1,95 @@ +from __future__ import annotations +from datetime import datetime +from html import escape +from pathlib import Path +from typing import Dict, List + +from jinja2 import Environment, FileSystemLoader, select_autoescape + +from utils.models import HostResult + +def fmt_ports(ports: List[int]) -> str: + if not ports: + return "none" + return ", ".join(str(p) for p in sorted(set(int(x) for x in ports))) + +def badge(text: str, bg: str, fg: str = "#ffffff") -> str: + return ( + f'{escape(text)}' + ) + +def pill_state(state: str) -> str: + s = (state or "").lower() + if s == "open": + return badge("open", "#16a34a") + if s in ("open|filtered",): + return badge("open|filtered", "#0ea5e9") + if s == "filtered": + return badge("filtered", "#f59e0b", "#111111") + if s == "closed": + return badge("closed", "#ef4444") + return badge(s, "#6b7280") + +def pill_proto(proto: str) -> str: + return badge((proto or "").lower(), "#334155") + +def _env(templates_dir: Path) -> Environment: + env = Environment( + loader=FileSystemLoader(str(templates_dir)), + autoescape=select_autoescape(["html", "xml"]), + trim_blocks=True, + lstrip_blocks=True, + ) + env.globals.update(badge=badge, pill_state=pill_state, pill_proto=pill_proto, fmt_ports=fmt_ports) + return env + +def render_html_report_jinja( + reports: Dict[str, Dict[str, List[int]]], + host_results: List[HostResult], + templates_dir: Path, + template_name: str = "report.html.j2", + title: str = "Port Compliance Report", + only_issues: bool = False, +) -> str: + env = _env(templates_dir) + template = env.get_template(template_name) + + total_hosts = len(reports) + hosts_with_issues = [ip for ip, r in reports.items() if any(r.values())] + ok_hosts = total_hosts - len(hosts_with_issues) + + # No filtering — we'll show all, but only_issues changes template behavior. + by_ip = {hr.address: hr for hr in host_results} + for hr in by_ip.values(): + if hr and hr.ports: + hr.ports.sort(key=lambda p: (p.protocol, p.port)) + + html = template.render( + title=title, + generated=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + total_hosts=total_hosts, + ok_hosts=ok_hosts, + hosts_with_issues=hosts_with_issues, + reports=reports, + host_results_by_ip=by_ip, + only_issues=only_issues, + ) + return html + + +def write_html_report_jinja( + reports: Dict[str, Dict[str, List[int]]], + host_results: List[HostResult], + out_path: Path, + templates_dir: Path = Path("templates"), + template_name: str = "report.html.j2", + title: str = "Port Compliance Report", + only_issues: bool = False, +) -> Path: + html = render_html_report_jinja( + reports, host_results, templates_dir, template_name, title, only_issues + ) + out_path.write_text(html, encoding="utf-8") + return out_path diff --git a/app/requirements.txt b/app/requirements.txt new file mode 100644 index 0000000..6226544 --- /dev/null +++ b/app/requirements.txt @@ -0,0 +1,2 @@ +Jinja2==3.1.6 +MarkupSafe==3.0.3 diff --git a/app/templates/report.html.j2 b/app/templates/report.html.j2 new file mode 100644 index 0000000..d184272 --- /dev/null +++ b/app/templates/report.html.j2 @@ -0,0 +1,116 @@ + + + + + {{ title }} + + + +
+ + + + +
+
{{ title }}
+
Generated: {{ generated }}
+
+ + + + + +
+ Total hosts: {{ total_hosts }}  + Matching expected: {{ ok_hosts }}  + With issues: {{ hosts_with_issues|length }} +
+ + {% if not only_issues and ok_hosts == total_hosts and total_hosts > 0 %} +
+ All hosts matched expected ports. +
+ {% endif %} + + {% if only_issues and hosts_with_issues|length == 0 %} +
+ No hosts with issues found. ✅ +
+ {% endif %} + + {# Host sections #} + {% for ip_key, r in reports|dictsort %} + {% set has_issues = r.unexpected_tcp or r.missing_tcp or r.unexpected_udp or r.missing_udp %} + {% set hr = host_results_by_ip.get(ip_key) %} + {% set header_title = ip_key ~ ((' (' ~ hr.host ~ ')') if hr and hr.host else '') %} + + + + + + + {% if has_issues %} + + + + + {% macro delta_row(label, ports) -%} + + + + {%- endmacro %} + + {{ delta_row('Unexpected TCP open ports', r.unexpected_tcp) }} + {{ delta_row('Expected TCP ports not seen', r.missing_tcp) }} + {{ delta_row('Unexpected UDP open ports', r.unexpected_udp) }} + {{ delta_row('Expected UDP ports not seen', r.missing_udp) }} + + + + + + + + + + + + {% if hr and hr.ports %} + {% for p in hr.ports %} + + + + + + + {% endfor %} + {% else %} + + + + {% endif %} + {% else %} + {# Host has no issues #} + + + + {% endif %} +
+ {{ header_title }} {% if has_issues %} {{ badge('ISSUES', '#ef4444') }} {% else %} {{ badge('OK', '#16a34a') }} {% endif %} +
+ {{ badge('ISSUES', '#ef4444') }} +
+ {{ label }}: {{ fmt_ports(ports) }} +
+
Discovered Ports
+
ProtocolPortStateService
{{ pill_proto(p.protocol) }}{{ p.port }}{{ pill_state(p.state) }}{{ p.service or '-' }}
+ No per-port details available for this host. +
+ {{ badge('OK', '#16a34a') }}   Matches expected ports. +
+ {% endfor %} + + +
+ Report generated by mass-scan-v2 • {{ generated }} +
diff --git a/app/utils/models.py b/app/utils/models.py new file mode 100644 index 0000000..55669b8 --- /dev/null +++ b/app/utils/models.py @@ -0,0 +1,30 @@ +from __future__ import annotations +from dataclasses import dataclass, field +from typing import List, Optional + + +@dataclass +class PortFinding: + """ + A single discovered port on a host. + protocol: 'tcp' or 'udp' + state: 'open', 'closed', 'filtered', 'open|filtered', etc. + service: optional nmap-reported service name (e.g., 'ssh', 'http') + """ + port: int + protocol: str + state: str + service: Optional[str] = None + + +@dataclass +class HostResult: + """ + Results for a single host. + address: IP address (e.g., '192.0.2.10') + host: primary hostname if reported by nmap (may be None) + ports: list of PortFinding instances for this host + """ + address: str + host: Optional[str] = None + ports: List[PortFinding] = field(default_factory=list) diff --git a/app/utils/scan_config_loader.py b/app/utils/scan_config_loader.py new file mode 100644 index 0000000..bac7bc0 --- /dev/null +++ b/app/utils/scan_config_loader.py @@ -0,0 +1,210 @@ +from __future__ import annotations +from dataclasses import dataclass, field +from pathlib import Path +from typing import List, Dict, Any, Optional, Tuple +import os +import yaml +import logging + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + + +@dataclass +class ScanTarget: + """ + One IP and its expected ports. + """ + ip: str + expected_tcp: List[int] = field(default_factory=list) + expected_udp: List[int] = field(default_factory=list) + + +@dataclass +class ScanOptions: + """ + Feature toggles that affect how scans are executed. + """ + udp_scan: bool = False + tls_security_scan: bool = True + tls_exp_check: bool = True + + +@dataclass +class Reporting: + """ + Output/report preferences for this config file. + """ + report_name: str = "Scan Report" + report_filename: str = "report.html" + full_details: bool = False + + +@dataclass +class ScanConfigFile: + """ + Full configuration for a single logical scan "set" (e.g., DMZ, WAN). + """ + name: str = "Unnamed" + scan_options: ScanOptions = field(default_factory=ScanOptions) + reporting: Reporting = field(default_factory=Reporting) + scan_targets: List[ScanTarget] = field(default_factory=list) + + +class ScanConfigRepository: + """ + Loads and validates *.yaml scan configuration files from a directory. + + Search order for the config directory: + 1) Explicit path argument to load_all() + 2) Environment variable SCAN_TARGETS_DIR + 3) Default: /data/scan_targets + """ + + SUPPORTED_EXT = (".yaml", ".yml") + + def __init__(self) -> None: + self._loaded: List[ScanConfigFile] = [] + + def load_all(self, directory: Optional[Path] = None) -> List[ScanConfigFile]: + """ + Load all YAML configs from the given directory and return them. + + :param directory: Optional explicit directory path. + """ + root = self._resolve_directory(directory) + logger.info("Loading scan configs from: %s", root) + + files = sorted([p for p in root.iterdir() if p.suffix.lower() in self.SUPPORTED_EXT]) + logger.info("Found %d config file(s).", len(files)) + + configs: List[ScanConfigFile] = [] + for fpath in files: + try: + data = self._read_yaml(fpath) + cfg = self._parse_config(data, default_name=fpath.stem) + self._validate_config(cfg, source=str(fpath)) + configs.append(cfg) + logger.info("Loaded config: %s (%s targets)", cfg.name, len(cfg.scan_targets)) + except Exception as exc: + # Fail-open vs fail-fast is up to you; here we log and continue. + logger.error("Failed to load %s: %s", fpath, exc) + + self._loaded = configs + return configs + + def _resolve_directory(self, directory: Optional[Path]) -> Path: + """ + Decide which directory to load from. + """ + if directory: + return directory + env = os.getenv("SCAN_TARGETS_DIR") + if env: + return Path(env) + return Path("/data/scan_targets") + + @staticmethod + def _read_yaml(path: Path) -> Dict[str, Any]: + """ + Safely read YAML file into a Python dict. + """ + with path.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + if not isinstance(data, dict): + raise ValueError("Top-level YAML must be a mapping (dict).") + return data + + @staticmethod + def _as_int_list(value: Any, field_name: str) -> List[int]: + """ + Coerce a sequence to a list of ints; raise if invalid. + """ + if value in (None, []): + return [] + if not isinstance(value, (list, tuple)): + raise TypeError(f"'{field_name}' must be a list of integers.") + out: List[int] = [] + for v in value: + if isinstance(v, bool): + # Avoid True/False being treated as 1/0 + raise TypeError(f"'{field_name}' must contain integers, not booleans.") + try: + out.append(int(v)) + except Exception as exc: + raise TypeError(f"'{field_name}' contains a non-integer: {v!r}") from exc + return out + + def _parse_config(self, data: Dict[str, Any], default_name: str) -> ScanConfigFile: + """ + Convert a raw dict (from YAML) into a validated ScanConfigFile. + """ + name = str(data.get("name", default_name)) + + # Parse scan_options + so_raw = data.get("scan_options", {}) or {} + scan_options = ScanOptions( + udp_scan=bool(so_raw.get("udp_scan", False)), + tls_security_scan=bool(so_raw.get("tls_security_scan", True)), + tls_exp_check=bool(so_raw.get("tls_exp_check", True)), + ) + + # Parse reporting + rep_raw = data.get("reporting", {}) or {} + reporting = Reporting( + report_name=str(rep_raw.get("report_name", "Scan Report")), + report_filename=str(rep_raw.get("report_filename", "report.html")), + full_details=bool(rep_raw.get("full_details", False)), + ) + + # Parse targets + targets_raw = data.get("scan_targets", []) or [] + if not isinstance(targets_raw, list): + raise TypeError("'scan_targets' must be a list.") + targets: List[ScanTarget] = [] + for idx, item in enumerate(targets_raw, start=1): + if not isinstance(item, dict): + raise TypeError(f"scan_targets[{idx}] must be a mapping (dict).") + ip = item.get("ip") + if not ip or not isinstance(ip, str): + raise ValueError(f"scan_targets[{idx}].ip must be a non-empty string.") + expected_tcp = self._as_int_list(item.get("expected_tcp", []), "expected_tcp") + expected_udp = self._as_int_list(item.get("expected_udp", []), "expected_udp") + targets.append(ScanTarget(ip=ip, expected_tcp=expected_tcp, expected_udp=expected_udp)) + + return ScanConfigFile( + name=name, + scan_options=scan_options, + reporting=reporting, + scan_targets=targets, + ) + + @staticmethod + def _validate_config(cfg: ScanConfigFile, source: str) -> None: + """ + Lightweight semantic checks. + """ + # Example: disallow duplicate IPs within a single file + seen: Dict[str, int] = {} + for t in cfg.scan_targets: + seen[t.ip] = seen.get(t.ip, 0) + 1 + dups = [ip for ip, count in seen.items() if count > 1] + if dups: + raise ValueError(f"{source}: duplicate IP(s) in scan_targets: {dups}") + + # Optional helpers + + def list_configs(self) -> List[str]: + """ + Return names of loaded configs for UI selection. + """ + return [c.name for c in self._loaded] + + def get_by_name(self, name: str) -> Optional[ScanConfigFile]: + """ + Fetch a loaded config by its name. + """ + for c in self._loaded: + if c.name == name: + return c + return None \ No newline at end of file diff --git a/app/utils/scanner.py b/app/utils/scanner.py new file mode 100644 index 0000000..51b7573 --- /dev/null +++ b/app/utils/scanner.py @@ -0,0 +1,179 @@ +from __future__ import annotations +import os +import subprocess +import xml.etree.ElementTree as ET +from pathlib import Path +from typing import Iterable, List, Dict, Optional, Tuple + +from utils.models import HostResult, PortFinding + + +class nmap_scanner: + + TCP_REPORT_PATH = Path() / "data" / "nmap-tcp-results.xml" + UDP_REPORT_PATH = Path() / "data" / "nmap-udp-results.xml" + NMAP_RESULTS_PATH = Path() / "data" / "nmap-results.xml" + + def __init__(self, targets:Iterable[str],scan_udp=False): + self.targets = list(targets) + self.scan_udp = scan_udp + pass + + def scan_targets(self): + tcp_results = self.run_nmap_tcp_all() + + if self.scan_udp: + udp_results = self.run_nmap_udp() + all_results = List[HostResult] = self.merge_host_results(tcp_results,udp_results) + else: + all_results = tcp_results + + return all_results + + def run_nmap_tcp_all(self, min_rate: int = 1000, assume_up: bool = True) -> List[HostResult]: + """ + Run a TCP SYN scan across all ports (0-65535) for the given targets and parse results. + Returns a list of HostResult objects. + """ + targets_list = self.targets + if not targets_list: + return [] + + cmd = [ + "nmap", + "-sS", # TCP SYN scan + "-p-", # all TCP ports + "-T4", + "--min-rate", str(min_rate), + "-oX", str(self.TCP_REPORT_PATH), + ] + if assume_up: + cmd.append("-Pn") + cmd.extend(targets_list) + + self._run_nmap(cmd) + return self.parse_nmap_xml(self.TCP_REPORT_PATH) + + def run_nmap_udp(self, ports: Optional[Iterable[int]] = None, min_rate: int = 500, assume_up: bool = True) -> List[HostResult]: + """ + Run a UDP scan for the provided ports (recommended to keep this list small). + If 'ports' is None, nmap defaults to its "top" UDP ports; full -p- UDP is very slow. + """ + targets_list = self.targets + if not targets_list: + return [] + + cmd = [ + "nmap", + "-sU", # UDP scan + "-T3", # less aggressive timing by default for UDP + "--min-rate", str(min_rate), + "-oX", str(self.UDP_REPORT_PATH), + ] + if assume_up: + cmd.append("-Pn") + + if ports: + # Explicit port set + port_list = sorted(set(int(p) for p in ports)) + port_str = ",".join(str(p) for p in port_list) + cmd.extend(["-p", port_str]) + + cmd.extend(targets_list) + + self._run_nmap(cmd) + return self.parse_nmap_xml(self.UDP_REPORT_PATH) + + def merge_host_results(self, *result_sets: List[HostResult]) -> List[HostResult]: + """ + Merge multiple lists of HostResult (e.g., TCP set + UDP set) by address. + Ports are combined; hostnames preserved if found in any set. + """ + merged: Dict[str, HostResult] = {} + for results in result_sets: + for hr in results: + if hr.address not in merged: + merged[hr.address] = HostResult(address=hr.address, host=hr.host, ports=list(hr.ports)) + else: + existing = merged[hr.address] + # prefer a hostname if we didn't have one yet + if not existing.host and hr.host: + existing.host = hr.host + # merge ports (avoid dupes) + existing_ports_key = {(p.protocol, p.port, p.state, p.service) for p in existing.ports} + for p in hr.ports: + key = (p.protocol, p.port, p.state, p.service) + if key not in existing_ports_key: + existing.ports.append(p) + existing_ports_key.add(key) + # Sort ports in each host for stable output + for hr in merged.values(): + hr.ports.sort(key=lambda p: (p.protocol, p.port)) + return sorted(merged.values(), key=lambda h: h.address) + + def parse_nmap_xml(self, xml_path: Path) -> List[HostResult]: + """ + Parse an Nmap XML file into a list of HostResult objects. + Captures per-port protocol, state, and optional service name. + """ + tree = ET.parse(str(xml_path)) + root = tree.getroot() + + results: List[HostResult] = [] + + for host_el in root.findall("host"): + # Address + addr_el = host_el.find("address") + if addr_el is None: + continue + address = addr_el.get("addr", "") + + # Hostname (if present) + hostname: Optional[str] = None + hostnames_el = host_el.find("hostnames") + if hostnames_el is not None: + hn_el = hostnames_el.find("hostname") + if hn_el is not None: + hostname = hn_el.get("name") + + findings: List[PortFinding] = [] + ports_el = host_el.find("ports") + if ports_el is not None: + for port_el in ports_el.findall("port"): + protocol = (port_el.get("protocol") or "").lower() # 'tcp' or 'udp' + portid_str = port_el.get("portid", "0") + try: + portid = int(portid_str) + except ValueError: + continue + + state_el = port_el.find("state") + state = (state_el.get("state") if state_el is not None else "unknown").lower() + + # optional service info + service_el = port_el.find("service") + service_name = service_el.get("name") if service_el is not None else None + + findings.append(PortFinding(port=portid, protocol=protocol, state=state, service=service_name)) + + results.append(HostResult(address=address, host=hostname, ports=findings)) + + return results + + def _run_nmap(self, cmd: List[str]) -> None: + """ + Run a command and raise on non-zero exit with a readable message. + """ + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as exc: + raise RuntimeError(f"Command failed ({exc.returncode}): {' '.join(cmd)}") from exc + + def cleanup(self): + if self.TCP_REPORT_PATH.exists(): + self.TCP_REPORT_PATH.unlink() + if self.UDP_REPORT_PATH.exists(): + self.UDP_REPORT_PATH.unlink() + if self.NMAP_RESULTS_PATH.exists: + self.NMAP_RESULTS_PATH.unlink() + \ No newline at end of file diff --git a/app/utils/settings.py b/app/utils/settings.py new file mode 100644 index 0000000..0822d74 --- /dev/null +++ b/app/utils/settings.py @@ -0,0 +1,127 @@ +# +# Note the settings file is hardcoded in this class at the top after imports. +# +# To make a new settings section, just add the setting dict to your yaml +# and then define the data class below in the config data classes area. +# +# Example use from anywhere - this will always return the same singleton +# from settings import get_settings +# def main(): +# settings = get_settings() +# print(settings.database.host) # Autocomplete works +# print(settings.logging.level) + +# if __name__ == "__main__": +# main() + +import functools +from pathlib import Path +from typing import Any, Callable, TypeVar +from dataclasses import dataclass, fields, is_dataclass, field, MISSING + +try: + import yaml +except ModuleNotFoundError: + import logging + import sys + + logger = logging.getLogger(__file__) + msg = ( + "Required modules are not installed. " + "Can not continue with module / application loading.\n" + "Install it with: pip install -r requirements" + ) + print(msg, file=sys.stderr) + logger.error(msg) + exit() + +DEFAULT_SETTINGS_FILE = Path.cwd() / "config" /"settings.yaml" + +# ---------- CONFIG DATA CLASSES ---------- +@dataclass +class DatabaseConfig: + host: str = "localhost" + port: int = 5432 + username: str = "root" + password: str = "" + + +@dataclass +class AppConfig: + name: str = "MyApp" + version_major: int = 1 + version_minor: int = 0 + production: bool = False + enabled: bool = True + token_expiry: int = 3600 + + +@dataclass +class Settings: + database: DatabaseConfig = field(default_factory=DatabaseConfig) + app: AppConfig = field(default_factory=AppConfig) + + @classmethod + def from_yaml(cls, path: str | Path) -> "Settings": + """Load settings from YAML file into a Settings object.""" + with open(path, "r", encoding="utf-8") as f: + raw: dict[str, Any] = yaml.safe_load(f) or {} + + init_kwargs = {} + for f_def in fields(cls): + yaml_value = raw.get(f_def.name, None) + + # Determine default value from default_factory or default + if f_def.default_factory is not MISSING: + default_value = f_def.default_factory() + elif f_def.default is not MISSING: + default_value = f_def.default + else: + default_value = None + + # Handle nested dataclasses + if is_dataclass(f_def.type): + if isinstance(yaml_value, dict): + # Merge YAML values with defaults + merged_data = {fld.name: getattr(default_value, fld.name) for fld in fields(f_def.type)} + merged_data.update(yaml_value) + init_kwargs[f_def.name] = f_def.type(**merged_data) + else: + init_kwargs[f_def.name] = default_value + else: + init_kwargs[f_def.name] = yaml_value if yaml_value is not None else default_value + + return cls(**init_kwargs) + + +# ---------- SINGLETON DECORATOR ---------- +T = TypeVar("T") + +def singleton_loader(func: Callable[..., T]) -> Callable[..., T]: + """Ensure the function only runs once, returning the cached value.""" + cache: dict[str, T] = {} + + @functools.wraps(func) + def wrapper(*args, **kwargs) -> T: + if func.__name__ not in cache: + cache[func.__name__] = func(*args, **kwargs) + return cache[func.__name__] + + return wrapper + + +@singleton_loader +def get_settings(config_path: str | Path | None = None) -> Settings: + """ + Returns the singleton Settings instance. + + Args: + config_path: Optional path to the YAML config file. If not provided, + defaults to 'config/settings.yaml' in the current working directory. + """ + if config_path is None: + config_path = DEFAULT_SETTINGS_FILE + else: + config_path = Path(config_path) + + return Settings.from_yaml(config_path) \ No newline at end of file diff --git a/data/expected.json b/data/expected.json new file mode 100644 index 0000000..5b3781d --- /dev/null +++ b/data/expected.json @@ -0,0 +1,5 @@ +[ +{"ip": "10.10.99.6", "expected_tcp": [22,222,3000], "expected_udp": []}, +{"ip": "10.10.99.2", "expected_tcp": [22,222,3000], "expected_udp": []}, +{"ip": "10.10.99.10", "expected_tcp": [22,80,443], "expected_udp": []} +] \ No newline at end of file diff --git a/data/report.html b/data/report.html new file mode 100644 index 0000000..844f5dc --- /dev/null +++ b/data/report.html @@ -0,0 +1,121 @@ + + + + + Compliance Report + + + +
+ + + + +
+
Compliance Report
+
Generated: 2025-10-17 17:19:25
+
+ + + + + +
+ Total hosts: 3  + Matching expected: 2  + With issues: 1 +
+ + + + + + + + + + + + +
+ 10.10.99.10 (git.sneakygeek.net) OK
+ OK   Matches expected ports. +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ 10.10.99.2 ISSUES
+ ISSUES +
+ Unexpected TCP open ports: 80 +
+ Expected TCP ports not seen: 222, 3000 +
+ Unexpected UDP open ports: none +
+ Expected UDP ports not seen: none +
+
Discovered Ports
+
ProtocolPortStateService
tcp22openssh
tcp80openhttp
+ + + + + + + + + +
+ 10.10.99.6 OK
+ OK   Matches expected ports. +
+ + +
+ Report generated by mass-scan-v2 • 2025-10-17 17:19:25 +
\ No newline at end of file diff --git a/data/rw-expected.json b/data/rw-expected.json new file mode 100644 index 0000000..e33db70 --- /dev/null +++ b/data/rw-expected.json @@ -0,0 +1,34 @@ +[ + {"ip": "81.246.102.192", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.193", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.194", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.195", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.196", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.197", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.198", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.199", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.200", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.201", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.202", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.203", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.204", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.205", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.206", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.207", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.208", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.209", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.210", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.211", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.212", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.213", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.214", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.215", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.216", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.217", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.218", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.219", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.220", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.221", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.222", "expected_tcp": [], "expected_udp": []}, + {"ip": "81.246.102.223", "expected_tcp": [], "expected_udp": []} +] \ No newline at end of file diff --git a/data/scan_targets/corp-wan.yaml b/data/scan_targets/corp-wan.yaml new file mode 100644 index 0000000..c11d77b --- /dev/null +++ b/data/scan_targets/corp-wan.yaml @@ -0,0 +1,15 @@ +name: Corp WAN +scan_options: + udp_scan: true + tls_security_scan: false + tls_exp_check: false + +reporting: + report_name: Corporate WAN Perimeter + report_filename: corp-wan.html + full_details: true + +scan_targets: + - ip: 10.10.20.5 + expected_tcp: [22, 80] + expected_udp: [53] \ No newline at end of file diff --git a/data/scan_targets/dmz.yaml b/data/scan_targets/dmz.yaml new file mode 100644 index 0000000..bd65ef8 --- /dev/null +++ b/data/scan_targets/dmz.yaml @@ -0,0 +1,23 @@ +name: DMZ +scan_options: + udp_scan: false + tls_security_scan: true + tls_exp_check: true + +reporting: + report_name: Sneaky Geek Labs DMZ Report + report_filename: dmz-report.html + full_details: false + +scan_targets: + - ip: 10.10.99.6 + expected_tcp: [22, 222, 3000] + expected_udp: [] + + - ip: 10.10.99.2 + expected_tcp: [22, 222, 3000] + expected_udp: [] + + - ip: 10.10.99.10 + expected_tcp: [22, 80, 443] + expected_udp: [] \ No newline at end of file diff --git a/data/settings.yaml b/data/settings.yaml new file mode 100644 index 0000000..945713b --- /dev/null +++ b/data/settings.yaml @@ -0,0 +1,11 @@ +app: + +scan_options: + targets_filename: expected.json + tcp_scan_type: all + udp_scan: false + +reporting: + report_name: Compliance Report + report_filename: report.html + full_details: false \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..204ad14 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,16 @@ +services: + port-checker: + build: . + image: sneaky/port-checker:ubuntu-slim + # Host networking is recommended for scanning accuracy/speed + network_mode: host + # Raw sockets for SYN scans (-sS). NET_ADMIN helps some environments. + cap_add: + - NET_RAW + - NET_ADMIN + security_opt: + - no-new-privileges:false + volumes: + - ./data:/app/data + environment: + - PYTHONUNBUFFERED=1