217 lines
7.8 KiB
Python
217 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
import logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
# TODO:
|
|
# LOGGING - make better format
|
|
# TLS SCANNING
|
|
# TLS Version PROBE
|
|
# EMAIL
|
|
|
|
import time
|
|
from pathlib import Path
|
|
from ipaddress import ip_address
|
|
from typing import Any, Dict, List, Set
|
|
|
|
from utils.scan_config_loader import ScanConfigRepository, ScanConfigFile
|
|
from utils.schedule_manager import ScanScheduler
|
|
from utils.scanner import nmap_scanner
|
|
from utils.models import HostResult, HostReport, GroupedReports
|
|
|
|
from reporting_jinja import write_html_report_jinja
|
|
from utils.settings import get_settings
|
|
from utils.common import get_common_utils
|
|
|
|
logger = logging.getLogger(__file__)
|
|
|
|
utils = get_common_utils()
|
|
settings = get_settings()
|
|
|
|
def results_to_open_sets(
|
|
results: List[HostResult],
|
|
count_as_open: Set[str] = frozenset({"open", "open|filtered"})) -> Dict[str, Dict[str, Set[int]]]:
|
|
"""
|
|
Convert HostResult list to:
|
|
{ ip: {"tcp": {open ports}, "udp": {open ports}} }
|
|
Only include ports whose state is in `count_as_open`.
|
|
"""
|
|
out: Dict[str, Dict[str, Set[int]]] = {}
|
|
for hr in results:
|
|
tcp = set()
|
|
udp = set()
|
|
for p in hr.ports:
|
|
if p.state.lower() in count_as_open:
|
|
(tcp if p.protocol == "tcp" else udp).add(p.port)
|
|
out[hr.address] = {"tcp": tcp, "udp": udp}
|
|
return out
|
|
|
|
# Build the grouped_reports (what the HTML renderer expects)
|
|
def build_grouped_reports(
|
|
scan_config: "ScanConfigFile",
|
|
discovered: Dict[str, Dict[str, Set[int]]],
|
|
) -> GroupedReports:
|
|
"""
|
|
Build per-IP deltas and return a grouped, template-friendly result.
|
|
|
|
Returns:
|
|
GroupedReports:
|
|
- issues: hosts with any deltas (sorted by IP)
|
|
- expected: hosts with no deltas (sorted by IP)
|
|
- by_ip: mapping of ip -> HostReport for random access
|
|
|
|
Notes:
|
|
- Works with `scan_config.scan_targets` where each target has:
|
|
ip, expected_tcp (List[int]), expected_udp (List[int]).
|
|
- `discovered` is expected to be { ip: { "tcp": Set[int], "udp": Set[int] } }.
|
|
Lists are accepted and coerced to sets.
|
|
- Supports IPv4 and IPv6 sorting. Falls back to string compare if ip parsing fails.
|
|
"""
|
|
# ---- 1) Build expectations from scan_config ----
|
|
expected: Dict[str, Dict[str, Set[int]]] = {}
|
|
cfg_targets = getattr(scan_config, "scan_targets", []) or []
|
|
|
|
for t in cfg_targets:
|
|
# Support dataclass-like or dict-like objects
|
|
ip = getattr(t, "ip", None) if hasattr(t, "ip") else t.get("ip")
|
|
if not ip:
|
|
continue
|
|
|
|
raw_tcp = getattr(t, "expected_tcp", None) if hasattr(t, "expected_tcp") else t.get("expected_tcp", [])
|
|
raw_udp = getattr(t, "expected_udp", None) if hasattr(t, "expected_udp") else t.get("expected_udp", [])
|
|
|
|
exp_tcp = set(int(p) for p in (raw_tcp or []))
|
|
exp_udp = set(int(p) for p in (raw_udp or []))
|
|
|
|
expected[ip] = {
|
|
"expected_tcp": exp_tcp,
|
|
"expected_udp": exp_udp,
|
|
}
|
|
|
|
# ---- 2) Union of IPs present in either expectations or discoveries ----
|
|
all_ips = set(expected.keys()) | set(discovered.keys())
|
|
|
|
# ---- 3) Compute per-host deltas into HostReport objects ----
|
|
by_ip: Dict[str, HostReport] = {}
|
|
|
|
for ip in all_ips:
|
|
# Expected sets (default to empty sets if not present)
|
|
exp_tcp: Set[int] = expected.get(ip, {}).get("expected_tcp", set()) or set()
|
|
exp_udp: Set[int] = expected.get(ip, {}).get("expected_udp", set()) or set()
|
|
|
|
# Discovered sets (default to empty sets if not present); coerce lists -> sets
|
|
disc_tcp = discovered.get(ip, {}).get("tcp", set()) or set()
|
|
disc_udp = discovered.get(ip, {}).get("udp", set()) or set()
|
|
if not isinstance(disc_tcp, set):
|
|
disc_tcp = set(disc_tcp)
|
|
if not isinstance(disc_udp, set):
|
|
disc_udp = set(disc_udp)
|
|
|
|
hr = HostReport(
|
|
ip=ip,
|
|
unexpected_tcp=sorted(disc_tcp - exp_tcp),
|
|
missing_tcp=sorted(exp_tcp - disc_tcp),
|
|
unexpected_udp=sorted(disc_udp - exp_udp),
|
|
missing_udp=sorted(exp_udp - disc_udp),
|
|
)
|
|
by_ip[ip] = hr
|
|
|
|
# ---- 4) Split into issues vs expected ----
|
|
issues: List[HostReport] = []
|
|
expected_clean: List[HostReport] = []
|
|
|
|
for hr in by_ip.values():
|
|
if hr.has_issues():
|
|
issues.append(hr)
|
|
else:
|
|
expected_clean.append(hr)
|
|
|
|
# ---- 5) Sort both lists by numeric IP (IPv4/IPv6); fallback to string ----
|
|
def ip_sort_key(hr: HostReport):
|
|
try:
|
|
return ip_address(hr.ip)
|
|
except ValueError:
|
|
return hr.ip # non-IP strings (unlikely) fall back to lexical
|
|
|
|
issues.sort(key=ip_sort_key)
|
|
expected_clean.sort(key=ip_sort_key)
|
|
|
|
return GroupedReports(
|
|
issues=issues,
|
|
expected=expected_clean,
|
|
by_ip=by_ip,
|
|
)
|
|
|
|
def run_repo_scan(scan_config:ScanConfigFile):
|
|
logger.info(f"Starting scan for {scan_config.name}")
|
|
logger.info(f"Options: udp={scan_config.scan_options.udp_scan} tls_sec={scan_config.scan_options.tls_security_scan} tls_exp={scan_config.scan_options.tls_exp_check}",)
|
|
logger.info("Targets: %d hosts", len(scan_config.scan_targets))
|
|
|
|
# tack the filename on the end of our data path
|
|
file_out_path = Path() / "data" / "output" / scan_config.reporting.report_filename
|
|
|
|
if scan_config.reporting.full_details:
|
|
show_only_issues = False
|
|
else:
|
|
show_only_issues = True
|
|
|
|
logger.info(f"Reporting Dark Mode set to: {scan_config.reporting.dark_mode}")
|
|
logger.info(f"Reporting Only Issues: {show_only_issues}")
|
|
|
|
scanner = nmap_scanner(scan_config)
|
|
scan_results = scanner.scan_targets()
|
|
discovered_sets = results_to_open_sets(scan_results, count_as_open={"open", "open|filtered"})
|
|
grouped_reports = build_grouped_reports(scan_config, discovered_sets)
|
|
|
|
# build the HTML report
|
|
# write_html_report_jinja(grouped=grouped_reports,
|
|
# host_results=scan_results,
|
|
# out_path=file_out_path,
|
|
# title=scan_config.reporting.report_name,
|
|
# template_name=template,
|
|
# only_issues=show_only_issues)
|
|
|
|
write_html_report_jinja(grouped=grouped_reports,
|
|
host_results=scan_results,
|
|
out_path=file_out_path,
|
|
title=scan_config.reporting.report_name,
|
|
dark_mode=scan_config.reporting.dark_mode,
|
|
only_issues=show_only_issues)
|
|
|
|
scanner.cleanup()
|
|
|
|
def main():
|
|
logger.info(f"{settings.app.name} - v{settings.app.version_major}.{settings.app.version_minor} Started")
|
|
logger.info(f"Application Running Production flag set to: {settings.app.production}")
|
|
|
|
# timezone validation
|
|
if utils.TextUtils.is_valid_timezone(settings.app.timezone):
|
|
logger.info(f"Timezone set to {settings.app.timezone}")
|
|
app_timezone = settings.app.timezone
|
|
else:
|
|
logger.warning(f"The Timezone {settings.app.timezone} is invalid, Defaulting to UTC")
|
|
app_timezone = "America/Danmarkshavn" # UTC
|
|
|
|
# load / configure the scan repos
|
|
repo = ScanConfigRepository()
|
|
scan_configs = repo.load_all()
|
|
|
|
# if in prod - run the scheduler like normal
|
|
if settings.app.production:
|
|
sched = ScanScheduler(timezone=app_timezone)
|
|
sched.start()
|
|
|
|
jobs = sched.schedule_configs(scan_configs, run_scan_fn=run_repo_scan)
|
|
logger.info("Scheduled %d job(s).", jobs)
|
|
|
|
try:
|
|
while True:
|
|
time.sleep(3600)
|
|
except KeyboardInterrupt:
|
|
sched.shutdown()
|
|
else:
|
|
# run single scan in dev mode
|
|
run_repo_scan(scan_configs[0])
|
|
|
|
if __name__ == "__main__":
|
|
main()
|