Implement complete database schema and Flask application structure for SneakyScan web interface. This establishes the foundation for web-based scan management, scheduling, and visualization. Database & ORM: - Add 11 SQLAlchemy models for comprehensive scan data storage (Scan, ScanSite, ScanIP, ScanPort, ScanService, ScanCertificate, ScanTLSVersion, Schedule, Alert, AlertRule, Setting) - Configure Alembic migrations system with initial schema migration - Add init_db.py script for database initialization and password setup - Support both migration-based and direct table creation Settings System: - Implement SettingsManager with automatic encryption for sensitive values - Add Fernet encryption for SMTP passwords and API tokens - Implement PasswordManager with bcrypt password hashing (work factor 12) - Initialize default settings for SMTP, authentication, and retention Flask Application: - Create Flask app factory pattern with scoped session management - Add 4 API blueprints: scans, schedules, alerts, settings - Implement functional Settings API (GET/PUT/DELETE endpoints) - Add CORS support, error handlers, and request/response logging - Configure development and production logging to file and console Docker & Deployment: - Update Dockerfile to install Flask dependencies - Add docker-compose-web.yml for web application deployment - Configure volume mounts for database, output, and logs persistence - Expose port 5000 for Flask web server Testing & Validation: - Add validate_phase1.py script to verify all deliverables - Validate directory structure, Python syntax, models, and endpoints - All validation checks passing Documentation: - Add PHASE1_COMPLETE.md with comprehensive Phase 1 summary - Update ROADMAP.md with Phase 1 completion status - Update .gitignore to exclude database files and documentation Files changed: 21 files - New: web/ directory with complete Flask app structure - New: migrations/ with Alembic configuration - New: requirements-web.txt with Flask 
dependencies - Modified: Dockerfile, ROADMAP.md, .gitignore
346 lines
15 KiB
Python
346 lines
15 KiB
Python
"""
|
|
SQLAlchemy models for the SneakyScanner database.

This module defines all database tables for storing scan results, schedules,
alerts, and application settings. The schema supports the full scanning workflow,
from port discovery through service detection and SSL/TLS analysis.
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import (
|
|
Boolean,
|
|
Column,
|
|
DateTime,
|
|
Float,
|
|
ForeignKey,
|
|
Integer,
|
|
String,
|
|
Text,
|
|
UniqueConstraint,
|
|
)
|
|
from sqlalchemy.orm import DeclarativeBase, relationship
|
|
|
|
|
|
class Base(DeclarativeBase):
    """Common declarative base that every model in this module inherits from."""
|
|
|
|
|
|
# ============================================================================
# Core Scan Tables
# ============================================================================
|
|
|
|
|
|
class Scan(Base):
    """
    Metadata for a single scan execution.

    This is the root of the result tree: sites, IPs, ports, services,
    certificates, TLS versions and alerts each reference a scan through
    their ``scan_id`` column. All of those child collections are declared
    with ``all, delete-orphan`` cascade, so removing a scan through the ORM
    removes its children as well.
    """

    __tablename__ = "scans"

    # --- Identity, timing and state ---
    id = Column(Integer, autoincrement=True, primary_key=True)
    timestamp = Column(
        DateTime,
        index=True,
        nullable=False,
        comment="Scan start time (UTC)",
    )
    duration = Column(
        Float,
        nullable=True,
        comment="Total scan duration in seconds",
    )
    status = Column(
        String(20),
        default="running",
        nullable=False,
        comment="running, completed, failed",
    )

    # --- Input configuration and generated artifacts ---
    config_file = Column(Text, nullable=True, comment="Path to YAML config used")
    title = Column(Text, nullable=True, comment="Scan title from config")
    json_path = Column(Text, nullable=True, comment="Path to JSON report")
    html_path = Column(Text, nullable=True, comment="Path to HTML report")
    zip_path = Column(Text, nullable=True, comment="Path to ZIP archive")
    screenshot_dir = Column(
        Text,
        nullable=True,
        comment="Path to screenshot directory",
    )

    # --- Provenance ---
    # datetime.utcnow is passed uncalled so it is evaluated per insert.
    created_at = Column(
        DateTime,
        default=datetime.utcnow,
        nullable=False,
        comment="Record creation time",
    )
    triggered_by = Column(
        String(50),
        default="manual",
        nullable=False,
        comment="manual, scheduled, api",
    )
    schedule_id = Column(
        Integer,
        ForeignKey("schedules.id"),
        nullable=True,
        comment="FK to schedules if triggered by schedule",
    )

    # --- Relationships: child collections owned by this scan ---
    sites = relationship(
        "ScanSite", cascade="all, delete-orphan", back_populates="scan"
    )
    ips = relationship(
        "ScanIP", cascade="all, delete-orphan", back_populates="scan"
    )
    ports = relationship(
        "ScanPort", cascade="all, delete-orphan", back_populates="scan"
    )
    services = relationship(
        "ScanService", cascade="all, delete-orphan", back_populates="scan"
    )
    certificates = relationship(
        "ScanCertificate", cascade="all, delete-orphan", back_populates="scan"
    )
    tls_versions = relationship(
        "ScanTLSVersion", cascade="all, delete-orphan", back_populates="scan"
    )
    alerts = relationship(
        "Alert", cascade="all, delete-orphan", back_populates="scan"
    )

    # The schedule is not owned by the scan, so no cascade is configured.
    schedule = relationship("Schedule", back_populates="scans")

    def __repr__(self) -> str:
        """Return a short debug representation (id, title, status)."""
        return f"<Scan(id={self.id}, title='{self.title}', status='{self.status}')>"
|
|
|
|
|
|
class ScanSite(Base):
    """
    A named group of IPs within one scan.

    Sites are the logical network segments or locations (e.g., "Production DC",
    "DMZ", "Branch Office") defined in the scan configuration.
    """

    __tablename__ = "scan_sites"

    id = Column(Integer, autoincrement=True, primary_key=True)
    scan_id = Column(
        Integer,
        ForeignKey("scans.id"),
        index=True,
        nullable=False,
    )
    site_name = Column(
        String(255),
        nullable=False,
        comment="Site name from config",
    )

    # Parent scan, and the IPs that belong to this site.
    scan = relationship("Scan", back_populates="sites")
    ips = relationship(
        "ScanIP", cascade="all, delete-orphan", back_populates="site"
    )

    def __repr__(self) -> str:
        """Return a short debug representation (id, site name)."""
        return f"<ScanSite(id={self.id}, site_name='{self.site_name}')>"
|
|
|
|
|
|
class ScanIP(Base):
    """
    A target IP address within one scan.

    Records the address together with its expected and actual ping
    response, so the two can be compared.
    """

    __tablename__ = "scan_ips"

    # An IP address may appear at most once per scan; the unique constraint
    # also serves IP lookups within a scan.
    __table_args__ = (
        UniqueConstraint("scan_id", "ip_address", name="uix_scan_ip"),
    )

    id = Column(Integer, autoincrement=True, primary_key=True)
    scan_id = Column(
        Integer,
        ForeignKey("scans.id"),
        index=True,
        nullable=False,
    )
    site_id = Column(
        Integer,
        ForeignKey("scan_sites.id"),
        index=True,
        nullable=False,
    )
    # 45 characters is enough for a textual IPv6 address.
    ip_address = Column(
        String(45),
        nullable=False,
        comment="IPv4 or IPv6 address",
    )
    ping_expected = Column(
        Boolean,
        nullable=True,
        comment="Expected ping response",
    )
    ping_actual = Column(
        Boolean,
        nullable=True,
        comment="Actual ping response",
    )

    # Parents (scan and site) and the ports discovered on this IP.
    scan = relationship("Scan", back_populates="ips")
    site = relationship("ScanSite", back_populates="ips")
    ports = relationship(
        "ScanPort", cascade="all, delete-orphan", back_populates="ip"
    )

    def __repr__(self) -> str:
        """Return a short debug representation (id, IP address)."""
        return f"<ScanIP(id={self.id}, ip_address='{self.ip_address}')>"
|
|
|
|
|
|
class ScanPort(Base):
    """
    A TCP or UDP port discovered on a scanned IP.

    Holds the ports found during the masscan phase, plus an expected flag
    and a state so that drift from the expected configuration can be detected.
    """

    __tablename__ = "scan_ports"

    # One row per (scan, IP, port, protocol); the unique constraint also
    # serves port lookups.
    __table_args__ = (
        UniqueConstraint(
            "scan_id", "ip_id", "port", "protocol", name="uix_scan_ip_port"
        ),
    )

    id = Column(Integer, autoincrement=True, primary_key=True)
    scan_id = Column(
        Integer,
        ForeignKey("scans.id"),
        index=True,
        nullable=False,
    )
    ip_id = Column(
        Integer,
        ForeignKey("scan_ips.id"),
        index=True,
        nullable=False,
    )
    port = Column(Integer, nullable=False, comment="Port number (1-65535)")
    protocol = Column(String(10), nullable=False, comment="tcp or udp")
    expected = Column(
        Boolean,
        nullable=True,
        comment="Was this port expected?",
    )
    state = Column(
        String(20),
        default="open",
        nullable=False,
        comment="open, closed, filtered",
    )

    # Parents (scan and IP) and the services detected on this port.
    scan = relationship("Scan", back_populates="ports")
    ip = relationship("ScanIP", back_populates="ports")
    services = relationship(
        "ScanService", cascade="all, delete-orphan", back_populates="port"
    )

    def __repr__(self) -> str:
        """Return a short debug representation (id, port, protocol)."""
        return f"<ScanPort(id={self.id}, port={self.port}, protocol='{self.protocol}')>"
|
|
|
|
|
|
class ScanService(Base):
    """
    A service detected on an open port.

    Captures nmap service-detection output (service name, product, version,
    extra info, OS type) and, for web services, the HTTP/HTTPS protocol and
    a screenshot path.
    """

    __tablename__ = "scan_services"

    id = Column(Integer, autoincrement=True, primary_key=True)
    scan_id = Column(
        Integer,
        ForeignKey("scans.id"),
        index=True,
        nullable=False,
    )
    port_id = Column(
        Integer,
        ForeignKey("scan_ports.id"),
        index=True,
        nullable=False,
    )

    # --- Service detection results (all optional) ---
    service_name = Column(
        String(100),
        nullable=True,
        comment="Service name (e.g., ssh, http)",
    )
    product = Column(
        String(255),
        nullable=True,
        comment="Product name (e.g., OpenSSH)",
    )
    version = Column(String(100), nullable=True, comment="Version string")
    extrainfo = Column(Text, nullable=True, comment="Additional nmap info")
    ostype = Column(String(100), nullable=True, comment="OS type if detected")

    # --- Web-service details (all optional) ---
    http_protocol = Column(
        String(10),
        nullable=True,
        comment="http or https (if web service)",
    )
    screenshot_path = Column(
        Text,
        nullable=True,
        comment="Relative path to screenshot",
    )

    # Parents (scan and port) and the certificates found on this service.
    scan = relationship("Scan", back_populates="services")
    port = relationship("ScanPort", back_populates="services")
    certificates = relationship(
        "ScanCertificate", cascade="all, delete-orphan", back_populates="service"
    )

    def __repr__(self) -> str:
        """Return a short debug representation (id, service name, product)."""
        return f"<ScanService(id={self.id}, service_name='{self.service_name}', product='{self.product}')>"
|
|
|
|
|
|
class ScanCertificate(Base):
    """
    SSL/TLS certificates discovered on HTTPS services.

    Stores certificate details including validity periods, subject/issuer,
    SANs, and a flag for self-signed certificates.
    """

    __tablename__ = 'scan_certificates'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    service_id = Column(Integer, ForeignKey('scan_services.id'), nullable=False, index=True)
    subject = Column(Text, nullable=True, comment="Certificate subject (CN)")
    issuer = Column(Text, nullable=True, comment="Certificate issuer")
    serial_number = Column(Text, nullable=True, comment="Serial number")
    not_valid_before = Column(DateTime, nullable=True, comment="Validity start date")
    # Indexed so expiration queries do not scan the whole table. The table
    # comment below has always announced this index, but it was never
    # actually declared.
    # NOTE: databases created before this index existed need an Alembic
    # migration to add it.
    not_valid_after = Column(DateTime, nullable=True, index=True, comment="Validity end date")
    days_until_expiry = Column(Integer, nullable=True, comment="Days until expiration")
    sans = Column(Text, nullable=True, comment="JSON array of SANs")
    is_self_signed = Column(Boolean, nullable=True, default=False, comment="Self-signed certificate flag")

    # Relationships
    scan = relationship('Scan', back_populates='certificates')
    service = relationship('ScanService', back_populates='certificates')
    tls_versions = relationship('ScanTLSVersion', back_populates='certificate', cascade='all, delete-orphan')

    # This dict only sets the table-level comment; it does not create any
    # index by itself (the index comes from index=True on not_valid_after).
    __table_args__ = (
        {'comment': 'Index on expiration date for alert queries'},
    )

    def __repr__(self) -> str:
        """Return a short debug representation (id, subject, days to expiry)."""
        return f"<ScanCertificate(id={self.id}, subject='{self.subject}', days_until_expiry={self.days_until_expiry})>"
|
|
|
|
|
|
class ScanTLSVersion(Base):
    """
    Support status of one TLS version for a certificate's service.

    One row per TLS version (1.0, 1.1, 1.2, 1.3) records whether that
    version is supported and which cipher suites were accepted.
    """

    __tablename__ = "scan_tls_versions"

    id = Column(Integer, autoincrement=True, primary_key=True)
    scan_id = Column(
        Integer,
        ForeignKey("scans.id"),
        index=True,
        nullable=False,
    )
    certificate_id = Column(
        Integer,
        ForeignKey("scan_certificates.id"),
        index=True,
        nullable=False,
    )
    tls_version = Column(
        String(20),
        nullable=False,
        comment="TLS 1.0, TLS 1.1, TLS 1.2, TLS 1.3",
    )
    supported = Column(
        Boolean,
        nullable=False,
        comment="Is this version supported?",
    )
    cipher_suites = Column(
        Text,
        nullable=True,
        comment="JSON array of cipher suites",
    )

    # Parents: the owning scan and the certificate this row describes.
    scan = relationship("Scan", back_populates="tls_versions")
    certificate = relationship("ScanCertificate", back_populates="tls_versions")

    def __repr__(self) -> str:
        """Return a short debug representation (id, TLS version, supported)."""
        return f"<ScanTLSVersion(id={self.id}, tls_version='{self.tls_version}', supported={self.supported})>"
|
|
|
|
|
|
# ============================================================================
# Scheduling & Notifications Tables
# ============================================================================
|
|
|
|
|
|
class Schedule(Base):
    """
    Configuration of a scheduled (recurring) scan.

    Holds a cron-like expression and the scan config to run, for automated
    periodic scanning of network infrastructure.
    """

    __tablename__ = "schedules"

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(
        String(255),
        nullable=False,
        comment="Schedule name (e.g., 'Daily prod scan')",
    )
    config_file = Column(Text, nullable=False, comment="Path to YAML config")
    cron_expression = Column(
        String(100),
        nullable=False,
        comment="Cron-like schedule (e.g., '0 2 * * *')",
    )
    enabled = Column(
        Boolean,
        default=True,
        nullable=False,
        comment="Is schedule active?",
    )

    # --- Execution bookkeeping ---
    last_run = Column(DateTime, nullable=True, comment="Last execution time")
    next_run = Column(
        DateTime,
        nullable=True,
        comment="Next scheduled execution",
    )

    # --- Audit timestamps (callables are evaluated per insert/update) ---
    created_at = Column(
        DateTime,
        default=datetime.utcnow,
        nullable=False,
        comment="Schedule creation time",
    )
    updated_at = Column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False,
        comment="Last modification time",
    )

    # Scans linked to this schedule; no cascade is configured on this side.
    scans = relationship("Scan", back_populates="schedule")

    def __repr__(self) -> str:
        """Return a short debug representation (id, name, enabled)."""
        return f"<Schedule(id={self.id}, name='{self.name}', enabled={self.enabled})>"
|
|
|
|
|
|
class Alert(Base):
    """
    Alert history and notifications sent.

    Stores all alerts generated by the alert rule engine, including severity
    levels and email notification status.
    """

    __tablename__ = 'alerts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    # alert_type and severity are indexed so alerts can be filtered by them
    # without a full table scan. The table comment below has always
    # announced these indexes, but they were never actually declared.
    # NOTE: databases created before these indexes existed need an Alembic
    # migration to add them.
    alert_type = Column(String(50), nullable=False, index=True, comment="new_port, cert_expiry, service_change, ping_failed")
    severity = Column(String(20), nullable=False, index=True, comment="info, warning, critical")
    message = Column(Text, nullable=False, comment="Human-readable alert message")
    ip_address = Column(String(45), nullable=True, comment="Related IP (optional)")
    port = Column(Integer, nullable=True, comment="Related port (optional)")
    email_sent = Column(Boolean, nullable=False, default=False, comment="Was email notification sent?")
    email_sent_at = Column(DateTime, nullable=True, comment="Email send timestamp")
    created_at = Column(DateTime, nullable=False, default=datetime.utcnow, comment="Alert creation time")

    # Relationships
    scan = relationship('Scan', back_populates='alerts')

    # This dict only sets the table-level comment; it does not create any
    # index by itself (the indexes come from index=True on the columns).
    __table_args__ = (
        {'comment': 'Indexes for alert filtering'},
    )

    def __repr__(self) -> str:
        """Return a short debug representation (id, alert type, severity)."""
        return f"<Alert(id={self.id}, alert_type='{self.alert_type}', severity='{self.severity}')>"
|
|
|
|
|
|
class AlertRule(Base):
    """
    A user-defined rule that decides when alerts are raised.

    Rules are evaluated against scan results (e.g., certificates expiring
    in <30 days, unexpected ports opened).
    """

    __tablename__ = "alert_rules"

    id = Column(Integer, autoincrement=True, primary_key=True)
    rule_type = Column(
        String(50),
        nullable=False,
        comment="unexpected_port, cert_expiry, service_down, etc.",
    )
    enabled = Column(
        Boolean,
        default=True,
        nullable=False,
        comment="Is rule active?",
    )
    threshold = Column(
        Integer,
        nullable=True,
        comment="Threshold value (e.g., days for cert expiry)",
    )
    email_enabled = Column(
        Boolean,
        default=False,
        nullable=False,
        comment="Send email for this rule?",
    )
    # datetime.utcnow is passed uncalled so it is evaluated per insert.
    created_at = Column(
        DateTime,
        default=datetime.utcnow,
        nullable=False,
        comment="Rule creation time",
    )

    def __repr__(self) -> str:
        """Return a short debug representation (id, rule type, enabled)."""
        return f"<AlertRule(id={self.id}, rule_type='{self.rule_type}', enabled={self.enabled})>"
|
|
|
|
|
|
# ============================================================================
# Settings Table
# ============================================================================
|
|
|
|
|
|
class Setting(Base):
    """
    Application configuration key-value store.

    Stores application settings including SMTP configuration, authentication,
    and retention policies. Values stored as JSON for complex data types.
    """

    __tablename__ = 'settings'

    id = Column(Integer, primary_key=True, autoincrement=True)
    key = Column(String(255), nullable=False, unique=True, index=True, comment="Setting key (e.g., smtp_server)")
    value = Column(Text, nullable=True, comment="Setting value (JSON for complex values)")
    updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, comment="Last modification time")

    def __repr__(self) -> str:
        """
        Return a short debug representation (key and a value preview).

        The value preview is limited to 50 characters. An ellipsis is
        appended only when the value was actually truncated, and a missing
        value is shown as a bare ``None`` rather than a quoted string.
        """
        preview_limit = 50
        if self.value is None:
            shown = "None"
        elif len(self.value) > preview_limit:
            shown = f"'{self.value[:preview_limit]}...'"
        else:
            shown = f"'{self.value}'"
        return f"<Setting(key='{self.key}', value={shown})>"
|