#!/usr/bin/env python3
"""Scheduled nmap-based port-scan runner.

Loads scan configurations, runs nmap against the configured targets,
diffs discovered open ports against per-host expectations, and renders
an HTML report.  In production mode the scans are scheduled; in dev
mode a single scan is run immediately.
"""
# TODO:
# LOGGING - make better format
# TLS SCANNING
# TLS Version PROBE
# EMAIL
import logging
import time
from ipaddress import ip_address
from pathlib import Path
from typing import Any, Dict, List, Set

from utils.scan_config_loader import ScanConfigRepository, ScanConfigFile
from utils.schedule_manager import ScanScheduler
from utils.scanner import nmap_scanner
from utils.models import HostResult, HostReport, GroupedReports
from reporting_jinja import write_html_report_jinja
from utils.settings import get_settings
from utils.common import get_common_utils

logging.basicConfig(level=logging.INFO)
# FIX: was getLogger(__file__) — __name__ gives a stable, hierarchical
# logger name instead of a filesystem-path-dependent one.
logger = logging.getLogger(__name__)

utils = get_common_utils()
settings = get_settings()


def results_to_open_sets(
    results: List[HostResult],
    count_as_open: Set[str] = frozenset({"open", "open|filtered"}),
) -> Dict[str, Dict[str, Set[int]]]:
    """
    Convert a HostResult list to:
        { ip: {"tcp": {open ports}, "udp": {open ports}} }

    Only ports whose state (case-insensitive) is in ``count_as_open``
    are included.  The default is a frozenset, so it is safe as a
    default argument.

    Note: any port whose protocol is not "tcp" is bucketed as "udp"
    (matches original behavior; nmap results here are tcp/udp only).
    """
    out: Dict[str, Dict[str, Set[int]]] = {}
    for hr in results:
        tcp: Set[int] = set()
        udp: Set[int] = set()
        for p in hr.ports:
            if p.state.lower() in count_as_open:
                (tcp if p.protocol == "tcp" else udp).add(p.port)
        out[hr.address] = {"tcp": tcp, "udp": udp}
    return out


# Build the "reports" dict (what the HTML renderer expects)
def build_reports(
    scan_config: "ScanConfigFile",
    discovered: Dict[str, Dict[str, Set[int]]],
) -> GroupedReports:
    """
    Build per-IP deltas and return a grouped, template-friendly result.

    Returns:
        GroupedReports:
            - issues: hosts with any deltas (sorted by IP)
            - expected: hosts with no deltas (sorted by IP)
            - by_ip: mapping of ip -> HostReport for random access

    Notes:
        - Works with `scan_config.scan_targets` where each target has:
          ip, expected_tcp (List[int]), expected_udp (List[int]).
        - `discovered` is expected to be
          { ip: { "tcp": Set[int], "udp": Set[int] } }.
          Lists are accepted and coerced to sets.
        - Supports IPv4 and IPv6 sorting.
          Falls back to string compare if ip parsing fails.
    """
    # ---- 1) Build expectations from scan_config ----
    expected: Dict[str, Dict[str, Set[int]]] = {}
    cfg_targets = getattr(scan_config, "scan_targets", []) or []
    for t in cfg_targets:
        # Support dataclass-like or dict-like objects
        ip = getattr(t, "ip", None) if hasattr(t, "ip") else t.get("ip")
        if not ip:
            continue
        raw_tcp = (
            getattr(t, "expected_tcp", None)
            if hasattr(t, "expected_tcp")
            else t.get("expected_tcp", [])
        )
        raw_udp = (
            getattr(t, "expected_udp", None)
            if hasattr(t, "expected_udp")
            else t.get("expected_udp", [])
        )
        expected[ip] = {
            "expected_tcp": {int(p) for p in (raw_tcp or [])},
            "expected_udp": {int(p) for p in (raw_udp or [])},
        }

    # ---- 2) Union of IPs present in either expectations or discoveries ----
    all_ips = set(expected.keys()) | set(discovered.keys())

    # ---- 3) Compute per-host deltas into HostReport objects ----
    by_ip: Dict[str, HostReport] = {}
    for ip in all_ips:
        # Expected sets (default to empty sets if not present)
        exp_tcp: Set[int] = expected.get(ip, {}).get("expected_tcp", set()) or set()
        exp_udp: Set[int] = expected.get(ip, {}).get("expected_udp", set()) or set()

        # Discovered sets (default to empty sets if not present); coerce lists -> sets
        disc_tcp = discovered.get(ip, {}).get("tcp", set()) or set()
        disc_udp = discovered.get(ip, {}).get("udp", set()) or set()
        if not isinstance(disc_tcp, set):
            disc_tcp = set(disc_tcp)
        if not isinstance(disc_udp, set):
            disc_udp = set(disc_udp)

        by_ip[ip] = HostReport(
            ip=ip,
            unexpected_tcp=sorted(disc_tcp - exp_tcp),
            missing_tcp=sorted(exp_tcp - disc_tcp),
            unexpected_udp=sorted(disc_udp - exp_udp),
            missing_udp=sorted(exp_udp - disc_udp),
        )

    # ---- 4) Split into issues vs expected ----
    issues: List[HostReport] = []
    expected_clean: List[HostReport] = []
    for hr in by_ip.values():
        if hr.has_issues():
            issues.append(hr)
        else:
            expected_clean.append(hr)

    # ---- 5) Sort both lists by numeric IP (IPv4/IPv6); fallback to string ----
    def ip_sort_key(hr: HostReport):
        try:
            return ip_address(hr.ip)
        except ValueError:
            return hr.ip  # non-IP strings (unlikely) fall back to lexical

    issues.sort(key=ip_sort_key)
    expected_clean.sort(key=ip_sort_key)

    return GroupedReports(
        issues=issues,
        expected=expected_clean,
        by_ip=by_ip,
    )


def run_repo_scan(scan_config: ScanConfigFile) -> None:
    """Run one full scan for a single config and write its HTML report.

    Side effects: runs nmap via ``nmap_scanner``, writes the report file
    under ``data/output/``, and cleans up the scanner afterwards.
    """
    logger.info("Starting scan for %s", scan_config.name)
    logger.info(
        "Options: udp=%s tls_sec=%s tls_exp=%s",
        scan_config.scan_options.udp_scan,
        scan_config.scan_options.tls_security_scan,
        scan_config.scan_options.tls_exp_check,
    )
    logger.info("Targets: %d hosts", len(scan_config.scan_targets))

    # tack the filename on the end of our data path
    file_out_path = Path() / "data" / "output" / scan_config.reporting.report_filename

    LIGHT_TEMPLATE = "report_light.html.j2"
    DARK_TEMPLATE = "report_dark.html.j2"
    template = DARK_TEMPLATE if scan_config.reporting.dark_mode else LIGHT_TEMPLATE
    # full_details means "show everything", i.e. not only the issues
    show_only_issues = not scan_config.reporting.full_details

    logger.info("Reporting Template Set to: %s", template)
    logger.info("Reporting Only Issues: %s", show_only_issues)

    scanner = nmap_scanner(scan_config)
    scan_results = scanner.scan_targets()
    discovered_sets = results_to_open_sets(
        scan_results, count_as_open={"open", "open|filtered"}
    )
    reports = build_reports(scan_config, discovered_sets)

    # build the HTML report
    write_html_report_jinja(
        reports=reports,
        host_results=scan_results,
        out_path=file_out_path,
        title=scan_config.reporting.report_name,
        template_name=template,
        only_issues=show_only_issues,
    )
    scanner.cleanup()


def main() -> None:
    """Entry point: validate settings, load configs, schedule or run scans."""
    logger.info(
        "%s - v%s.%s Started",
        settings.app.name,
        settings.app.version_major,
        settings.app.version_minor,
    )
    logger.info(
        "Application Running Production flag set to: %s", settings.app.production
    )

    # timezone validation
    if utils.TextUtils.is_valid_timezone(settings.app.timezone):
        logger.info("Timezone set to %s", settings.app.timezone)
        app_timezone = settings.app.timezone
    else:
        logger.warning(
            "The Timezone %s is invalid, Defaulting to UTC", settings.app.timezone
        )
        # America/Danmarkshavn is UTC year-round (no DST) in the IANA db
        app_timezone = "America/Danmarkshavn"  # UTC

    # load / configure the scan repos
    repo = ScanConfigRepository()
    scan_configs = repo.load_all()

    # if in prod - run the scheduler like normal
    if settings.app.production:
        sched = ScanScheduler(timezone=app_timezone)
        sched.start()
        jobs = sched.schedule_configs(scan_configs, run_scan_fn=run_repo_scan)
        logger.info("Scheduled %d job(s).", jobs)
        try:
            while True:
                time.sleep(3600)
        except KeyboardInterrupt:
            sched.shutdown()
    else:
        # run single scan in dev mode
        # FIX: guard against an empty config list (was an IndexError)
        if not scan_configs:
            logger.error("No scan configurations found; nothing to run.")
            return
        run_repo_scan(scan_configs[0])


if __name__ == "__main__":
    main()