Phase 3 Step 7: Scan Comparison Features & UX Improvements

Implemented comprehensive scan comparison functionality with historical
analysis and improved user experience for scan triggering.

Features Added:
- Scan comparison engine with ports, services, and certificates analysis
- Drift score calculation (0.0-1.0 scale) for infrastructure changes
- Side-by-side comparison UI with color-coded changes (added/removed/changed)
- Historical trend charts showing port counts over time
- "Compare with Previous" button on scan detail pages
- Scan history API endpoint for trending data

API Endpoints:
- GET /api/scans/<id1>/compare/<id2> - Compare two scans
- GET /api/stats/scan-history/<id> - Historical scan data for charts

UI Improvements:
- Replaced config file text inputs with dropdown selectors
- Added config file selection to dashboard and scans pages
- Improved delete scan confirmation with proper async handling
- Enhanced error messages with detailed validation feedback
- Added 2-second delay before redirect to ensure deletion completes

Comparison Features:
- Port changes: tracks added, removed, and unchanged ports
- Service changes: detects version updates and service modifications
- Certificate changes: monitors SSL/TLS certificate updates
- Interactive historical charts with clickable data points
- Automatic detection of previous scan for comparison

Bug Fixes:
- Fixed scan deletion UI alert appearing on successful deletion
- Prevented config file path duplication (configs/configs/...)
- Improved error handling for failed API responses
- Added proper JSON response parsing with fallback handling

Testing:
- Created comprehensive test suite for comparison functionality
- Tests cover comparison API, service methods, and drift scoring
- Added edge case tests for identical scans and missing data
This commit is contained in:
2025-11-14 16:15:13 -06:00
parent 9b88f42297
commit 6792d69eb1
10 changed files with 1581 additions and 36 deletions

View File

@@ -658,3 +658,333 @@ class ScanService:
result['cipher_suites'] = []
return result
def compare_scans(self, scan1_id: int, scan2_id: int) -> Optional[Dict[str, Any]]:
"""
Compare two scans and return the differences.
Compares ports, services, and certificates between two scans,
highlighting added, removed, and changed items.
Args:
scan1_id: ID of the first (older) scan
scan2_id: ID of the second (newer) scan
Returns:
Dictionary with comparison results, or None if either scan not found
{
'scan1': {...}, # Scan 1 summary
'scan2': {...}, # Scan 2 summary
'ports': {
'added': [...],
'removed': [...],
'unchanged': [...]
},
'services': {
'added': [...],
'removed': [...],
'changed': [...]
},
'certificates': {
'added': [...],
'removed': [...],
'changed': [...]
},
'drift_score': 0.0-1.0
}
"""
# Get both scans
scan1 = self.get_scan(scan1_id)
scan2 = self.get_scan(scan2_id)
if not scan1 or not scan2:
return None
# Extract port data
ports1 = self._extract_ports_from_scan(scan1)
ports2 = self._extract_ports_from_scan(scan2)
# Compare ports
ports_comparison = self._compare_ports(ports1, ports2)
# Extract service data
services1 = self._extract_services_from_scan(scan1)
services2 = self._extract_services_from_scan(scan2)
# Compare services
services_comparison = self._compare_services(services1, services2)
# Extract certificate data
certs1 = self._extract_certificates_from_scan(scan1)
certs2 = self._extract_certificates_from_scan(scan2)
# Compare certificates
certificates_comparison = self._compare_certificates(certs1, certs2)
# Calculate drift score (0.0 = identical, 1.0 = completely different)
drift_score = self._calculate_drift_score(
ports_comparison,
services_comparison,
certificates_comparison
)
return {
'scan1': {
'id': scan1['id'],
'timestamp': scan1['timestamp'],
'title': scan1['title'],
'status': scan1['status']
},
'scan2': {
'id': scan2['id'],
'timestamp': scan2['timestamp'],
'title': scan2['title'],
'status': scan2['status']
},
'ports': ports_comparison,
'services': services_comparison,
'certificates': certificates_comparison,
'drift_score': drift_score
}
def _extract_ports_from_scan(self, scan: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract port information from a scan.
Returns:
Dictionary mapping "ip:port:protocol" to port details
"""
ports = {}
for site in scan.get('sites', []):
for ip_data in site.get('ips', []):
ip_addr = ip_data['address']
for port_data in ip_data.get('ports', []):
key = f"{ip_addr}:{port_data['port']}:{port_data['protocol']}"
ports[key] = {
'ip': ip_addr,
'port': port_data['port'],
'protocol': port_data['protocol'],
'state': port_data.get('state', 'unknown'),
'expected': port_data.get('expected')
}
return ports
def _extract_services_from_scan(self, scan: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract service information from a scan.
Returns:
Dictionary mapping "ip:port:protocol" to service details
"""
services = {}
for site in scan.get('sites', []):
for ip_data in site.get('ips', []):
ip_addr = ip_data['address']
for port_data in ip_data.get('ports', []):
port_num = port_data['port']
protocol = port_data['protocol']
key = f"{ip_addr}:{port_num}:{protocol}"
# Get first service (usually only one per port)
port_services = port_data.get('services', [])
if port_services:
svc = port_services[0]
services[key] = {
'ip': ip_addr,
'port': port_num,
'protocol': protocol,
'service_name': svc.get('service_name'),
'product': svc.get('product'),
'version': svc.get('version'),
'extrainfo': svc.get('extrainfo')
}
return services
def _extract_certificates_from_scan(self, scan: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract certificate information from a scan.
Returns:
Dictionary mapping "ip:port" to certificate details
"""
certificates = {}
for site in scan.get('sites', []):
for ip_data in site.get('ips', []):
ip_addr = ip_data['address']
for port_data in ip_data.get('ports', []):
port_num = port_data['port']
protocol = port_data['protocol']
# Get certificates from services
for svc in port_data.get('services', []):
if svc.get('certificates'):
for cert in svc['certificates']:
key = f"{ip_addr}:{port_num}"
certificates[key] = {
'ip': ip_addr,
'port': port_num,
'subject': cert.get('subject'),
'issuer': cert.get('issuer'),
'not_valid_after': cert.get('not_valid_after'),
'days_until_expiry': cert.get('days_until_expiry'),
'is_self_signed': cert.get('is_self_signed')
}
return certificates
def _compare_ports(self, ports1: Dict, ports2: Dict) -> Dict[str, List]:
"""
Compare port sets between two scans.
Returns:
Dictionary with added, removed, and unchanged ports
"""
keys1 = set(ports1.keys())
keys2 = set(ports2.keys())
added_keys = keys2 - keys1
removed_keys = keys1 - keys2
unchanged_keys = keys1 & keys2
return {
'added': [ports2[k] for k in sorted(added_keys)],
'removed': [ports1[k] for k in sorted(removed_keys)],
'unchanged': [ports2[k] for k in sorted(unchanged_keys)]
}
def _compare_services(self, services1: Dict, services2: Dict) -> Dict[str, List]:
"""
Compare services between two scans.
Returns:
Dictionary with added, removed, and changed services
"""
keys1 = set(services1.keys())
keys2 = set(services2.keys())
added_keys = keys2 - keys1
removed_keys = keys1 - keys2
common_keys = keys1 & keys2
# Find changed services (same port, different version/product)
changed = []
for key in sorted(common_keys):
svc1 = services1[key]
svc2 = services2[key]
# Check if service details changed
if (svc1.get('product') != svc2.get('product') or
svc1.get('version') != svc2.get('version') or
svc1.get('service_name') != svc2.get('service_name')):
changed.append({
'ip': svc2['ip'],
'port': svc2['port'],
'protocol': svc2['protocol'],
'old': {
'service_name': svc1.get('service_name'),
'product': svc1.get('product'),
'version': svc1.get('version')
},
'new': {
'service_name': svc2.get('service_name'),
'product': svc2.get('product'),
'version': svc2.get('version')
}
})
return {
'added': [services2[k] for k in sorted(added_keys)],
'removed': [services1[k] for k in sorted(removed_keys)],
'changed': changed
}
def _compare_certificates(self, certs1: Dict, certs2: Dict) -> Dict[str, List]:
"""
Compare certificates between two scans.
Returns:
Dictionary with added, removed, and changed certificates
"""
keys1 = set(certs1.keys())
keys2 = set(certs2.keys())
added_keys = keys2 - keys1
removed_keys = keys1 - keys2
common_keys = keys1 & keys2
# Find changed certificates (same IP:port, different cert details)
changed = []
for key in sorted(common_keys):
cert1 = certs1[key]
cert2 = certs2[key]
# Check if certificate changed
if (cert1.get('subject') != cert2.get('subject') or
cert1.get('issuer') != cert2.get('issuer') or
cert1.get('not_valid_after') != cert2.get('not_valid_after')):
changed.append({
'ip': cert2['ip'],
'port': cert2['port'],
'old': {
'subject': cert1.get('subject'),
'issuer': cert1.get('issuer'),
'not_valid_after': cert1.get('not_valid_after'),
'days_until_expiry': cert1.get('days_until_expiry')
},
'new': {
'subject': cert2.get('subject'),
'issuer': cert2.get('issuer'),
'not_valid_after': cert2.get('not_valid_after'),
'days_until_expiry': cert2.get('days_until_expiry')
}
})
return {
'added': [certs2[k] for k in sorted(added_keys)],
'removed': [certs1[k] for k in sorted(removed_keys)],
'changed': changed
}
def _calculate_drift_score(self, ports_comp: Dict, services_comp: Dict,
certs_comp: Dict) -> float:
"""
Calculate drift score based on comparison results.
Returns:
Float between 0.0 (identical) and 1.0 (completely different)
"""
# Count total items in both scans
total_ports = (
len(ports_comp['added']) +
len(ports_comp['removed']) +
len(ports_comp['unchanged'])
)
total_services = (
len(services_comp['added']) +
len(services_comp['removed']) +
len(services_comp['changed']) +
max(0, len(ports_comp['unchanged']) - len(services_comp['changed']))
)
# Count changed items
changed_ports = len(ports_comp['added']) + len(ports_comp['removed'])
changed_services = (
len(services_comp['added']) +
len(services_comp['removed']) +
len(services_comp['changed'])
)
changed_certs = (
len(certs_comp['added']) +
len(certs_comp['removed']) +
len(certs_comp['changed'])
)
# Calculate weighted drift score
# Ports have 50% weight, services 30%, certificates 20%
port_drift = changed_ports / max(total_ports, 1)
service_drift = changed_services / max(total_services, 1)
cert_drift = changed_certs / max(len(certs_comp['added']) + len(certs_comp['removed']) + len(certs_comp['changed']), 1)
drift_score = (port_drift * 0.5) + (service_drift * 0.3) + (cert_drift * 0.2)
return round(min(drift_score, 1.0), 3) # Cap at 1.0 and round to 3 decimals