"""
SQLAlchemy models for SneakyScanner database.

This module defines all database tables for storing scan results, schedules,
alerts, and application settings. The schema supports the full scanning
workflow from port discovery through service detection and SSL/TLS analysis.

All DateTime columns are naive; Python-side defaults store UTC wall-clock time.
"""

from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Float,
    ForeignKey,
    Integer,
    String,
    Text,
    UniqueConstraint,
)
from sqlalchemy.orm import DeclarativeBase, relationship


def _utcnow():
    """Return the current UTC time as a naive ``datetime``.

    Drop-in replacement for ``datetime.utcnow`` (deprecated since Python 3.12).
    The tzinfo is stripped so stored values stay identical in shape to what the
    naive ``DateTime`` columns below have always held.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class Base(DeclarativeBase):
    """Base class for all SQLAlchemy models."""
    pass


# ============================================================================
# Core Scan Tables
# ============================================================================


class Scan(Base):
    """
    Stores metadata about each scan execution.

    This is the parent table that ties together all scan results including
    sites, IPs, ports, services, certificates, and TLS configuration.
    Deleting a Scan through the ORM cascades to all of its child result rows.
    """
    __tablename__ = 'scans'

    id = Column(Integer, primary_key=True, autoincrement=True)
    timestamp = Column(DateTime, nullable=False, index=True,
                       comment="Scan start time (UTC)")
    duration = Column(Float, nullable=True,
                      comment="Total scan duration in seconds")
    status = Column(String(20), nullable=False, default='running',
                    comment="running, finalizing, completed, failed, cancelled")
    config_id = Column(Integer, ForeignKey('scan_configs.id'), nullable=True,
                       index=True, comment="FK to scan_configs table")
    title = Column(Text, nullable=True, comment="Scan title from config")
    json_path = Column(Text, nullable=True, comment="Path to JSON report")
    html_path = Column(Text, nullable=True, comment="Path to HTML report")
    zip_path = Column(Text, nullable=True, comment="Path to ZIP archive")
    screenshot_dir = Column(Text, nullable=True,
                            comment="Path to screenshot directory")
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Record creation time")
    triggered_by = Column(String(50), nullable=False, default='manual',
                          comment="manual, scheduled, api")
    schedule_id = Column(Integer, ForeignKey('schedules.id'), nullable=True,
                         comment="FK to schedules if triggered by schedule")
    started_at = Column(DateTime, nullable=True,
                        comment="Scan execution start time")
    completed_at = Column(DateTime, nullable=True,
                          comment="Scan execution completion time")
    error_message = Column(Text, nullable=True,
                           comment="Error message if scan failed")

    # Progress tracking fields
    current_phase = Column(
        String(50), nullable=True,
        comment="Current scan phase: ping, tcp_scan, udp_scan, service_detection, http_analysis")
    total_ips = Column(Integer, nullable=True,
                       comment="Total number of IPs to scan")
    completed_ips = Column(Integer, nullable=True, default=0,
                           comment="Number of IPs completed in current phase")

    # Relationships
    sites = relationship('ScanSite', back_populates='scan',
                         cascade='all, delete-orphan')
    ips = relationship('ScanIP', back_populates='scan',
                       cascade='all, delete-orphan')
    ports = relationship('ScanPort', back_populates='scan',
                         cascade='all, delete-orphan')
    services = relationship('ScanService', back_populates='scan',
                            cascade='all, delete-orphan')
    certificates = relationship('ScanCertificate', back_populates='scan',
                                cascade='all, delete-orphan')
    tls_versions = relationship('ScanTLSVersion', back_populates='scan',
                                cascade='all, delete-orphan')
    alerts = relationship('Alert', back_populates='scan',
                          cascade='all, delete-orphan')
    schedule = relationship('Schedule', back_populates='scans')
    config = relationship('ScanConfig', back_populates='scans')
    site_associations = relationship('ScanSiteAssociation',
                                     back_populates='scan',
                                     cascade='all, delete-orphan')
    progress_entries = relationship('ScanProgress', back_populates='scan',
                                    cascade='all, delete-orphan')

    def __repr__(self):
        return f"Scan(id={self.id!r}, title={self.title!r}, status={self.status!r})"


class ScanSite(Base):
    """
    Logical grouping of IPs by site.

    Sites represent logical network segments or locations (e.g., "Production DC",
    "DMZ", "Branch Office") as defined in the scan configuration.
    """
    __tablename__ = 'scan_sites'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    site_name = Column(String(255), nullable=False,
                       comment="Site name from config")

    # Relationships
    scan = relationship('Scan', back_populates='sites')
    ips = relationship('ScanIP', back_populates='site',
                       cascade='all, delete-orphan')

    def __repr__(self):
        return f"ScanSite(id={self.id!r}, site_name={self.site_name!r})"


class ScanIP(Base):
    """
    IP addresses scanned in each scan.

    Stores the target IPs and their ping response status (expected vs. actual).
    """
    __tablename__ = 'scan_ips'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    site_id = Column(Integer, ForeignKey('scan_sites.id'), nullable=False,
                     index=True)
    ip_address = Column(String(45), nullable=False,
                        comment="IPv4 or IPv6 address")
    ping_expected = Column(Boolean, nullable=True,
                           comment="Expected ping response")
    ping_actual = Column(Boolean, nullable=True,
                         comment="Actual ping response")

    # Relationships
    scan = relationship('Scan', back_populates='ips')
    site = relationship('ScanSite', back_populates='ips')
    ports = relationship('ScanPort', back_populates='ip',
                         cascade='all, delete-orphan')

    # An IP may appear at most once per scan
    __table_args__ = (
        UniqueConstraint('scan_id', 'ip_address', name='uix_scan_ip'),
    )

    def __repr__(self):
        return f"ScanIP(id={self.id!r}, ip_address={self.ip_address!r})"


class ScanPort(Base):
    """
    Discovered TCP/UDP ports.

    Stores all open ports found during masscan phase, along with expected vs.
    actual status for drift detection.
    """
    __tablename__ = 'scan_ports'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    ip_id = Column(Integer, ForeignKey('scan_ips.id'), nullable=False,
                   index=True)
    port = Column(Integer, nullable=False, comment="Port number (1-65535)")
    protocol = Column(String(10), nullable=False, comment="tcp or udp")
    expected = Column(Boolean, nullable=True, comment="Was this port expected?")
    state = Column(String(20), nullable=False, default='open',
                   comment="open, closed, filtered")

    # Relationships
    scan = relationship('Scan', back_populates='ports')
    ip = relationship('ScanIP', back_populates='ports')
    services = relationship('ScanService', back_populates='port',
                            cascade='all, delete-orphan')

    # A (port, protocol) pair may appear at most once per IP per scan
    __table_args__ = (
        UniqueConstraint('scan_id', 'ip_id', 'port', 'protocol',
                         name='uix_scan_ip_port'),
    )

    def __repr__(self):
        return (f"ScanPort(id={self.id!r}, port={self.port!r}, "
                f"protocol={self.protocol!r})")


class ScanService(Base):
    """
    Detected services on open ports.

    Stores nmap service detection results including product names, versions,
    and HTTP/HTTPS information with screenshots.
    """
    __tablename__ = 'scan_services'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    port_id = Column(Integer, ForeignKey('scan_ports.id'), nullable=False,
                     index=True)
    service_name = Column(String(100), nullable=True,
                          comment="Service name (e.g., ssh, http)")
    product = Column(String(255), nullable=True,
                     comment="Product name (e.g., OpenSSH)")
    version = Column(String(100), nullable=True, comment="Version string")
    extrainfo = Column(Text, nullable=True, comment="Additional nmap info")
    ostype = Column(String(100), nullable=True, comment="OS type if detected")
    http_protocol = Column(String(10), nullable=True,
                           comment="http or https (if web service)")
    screenshot_path = Column(Text, nullable=True,
                             comment="Relative path to screenshot")

    # Relationships
    scan = relationship('Scan', back_populates='services')
    port = relationship('ScanPort', back_populates='services')
    certificates = relationship('ScanCertificate', back_populates='service',
                                cascade='all, delete-orphan')

    def __repr__(self):
        return (f"ScanService(id={self.id!r}, "
                f"service_name={self.service_name!r}, "
                f"product={self.product!r})")


class ScanCertificate(Base):
    """
    SSL/TLS certificates discovered on HTTPS services.

    Stores certificate details including validity periods, subject/issuer,
    and flags for self-signed certificates.
    """
    __tablename__ = 'scan_certificates'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    service_id = Column(Integer, ForeignKey('scan_services.id'), nullable=False,
                        index=True)
    subject = Column(Text, nullable=True, comment="Certificate subject (CN)")
    issuer = Column(Text, nullable=True, comment="Certificate issuer")
    serial_number = Column(Text, nullable=True, comment="Serial number")
    not_valid_before = Column(DateTime, nullable=True,
                              comment="Validity start date")
    not_valid_after = Column(DateTime, nullable=True,
                             comment="Validity end date")
    days_until_expiry = Column(Integer, nullable=True,
                               comment="Days until expiration")
    sans = Column(Text, nullable=True, comment="JSON array of SANs")
    is_self_signed = Column(Boolean, nullable=True, default=False,
                            comment="Self-signed certificate flag")

    # Relationships
    scan = relationship('Scan', back_populates='certificates')
    service = relationship('ScanService', back_populates='certificates')
    tls_versions = relationship('ScanTLSVersion', back_populates='certificate',
                                cascade='all, delete-orphan')

    # NOTE(review): this only attaches a table-level comment. No index on the
    # expiration column is declared in this module, despite what the text says.
    __table_args__ = (
        {'comment': 'Index on expiration date for alert queries'},
    )

    def __repr__(self):
        return (f"ScanCertificate(id={self.id!r}, subject={self.subject!r}, "
                f"not_valid_after={self.not_valid_after!r})")


class ScanTLSVersion(Base):
    """
    TLS version support and cipher suites.

    Stores which TLS versions (1.0, 1.1, 1.2, 1.3) are supported by each
    HTTPS service, along with accepted cipher suites.
    """
    __tablename__ = 'scan_tls_versions'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    certificate_id = Column(Integer, ForeignKey('scan_certificates.id'),
                            nullable=False, index=True)
    tls_version = Column(String(20), nullable=False,
                         comment="TLS 1.0, TLS 1.1, TLS 1.2, TLS 1.3")
    supported = Column(Boolean, nullable=False,
                       comment="Is this version supported?")
    cipher_suites = Column(Text, nullable=True,
                           comment="JSON array of cipher suites")

    # Relationships
    scan = relationship('Scan', back_populates='tls_versions')
    certificate = relationship('ScanCertificate', back_populates='tls_versions')

    def __repr__(self):
        return (f"ScanTLSVersion(id={self.id!r}, "
                f"tls_version={self.tls_version!r}, "
                f"supported={self.supported!r})")


class ScanProgress(Base):
    """
    Real-time progress tracking for individual IPs during scan execution.

    Stores intermediate results as they become available, allowing users to
    see progress and results before the full scan completes.
    """
    __tablename__ = 'scan_progress'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    ip_address = Column(String(45), nullable=False,
                        comment="IP address being scanned")
    site_name = Column(String(255), nullable=True,
                       comment="Site name this IP belongs to")
    phase = Column(
        String(50), nullable=False,
        comment="Phase: ping, tcp_scan, udp_scan, service_detection, http_analysis")
    status = Column(String(20), nullable=False, default='pending',
                    comment="pending, in_progress, completed, failed")

    # Results data (port and service lists are stored as JSON text)
    ping_result = Column(Boolean, nullable=True, comment="Ping response result")
    tcp_ports = Column(Text, nullable=True,
                       comment="JSON array of discovered TCP ports")
    udp_ports = Column(Text, nullable=True,
                       comment="JSON array of discovered UDP ports")
    services = Column(Text, nullable=True,
                      comment="JSON array of detected services")

    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Entry creation time")
    updated_at = Column(DateTime, nullable=False, default=_utcnow,
                        onupdate=_utcnow, comment="Last update time")

    # Relationships
    scan = relationship('Scan', back_populates='progress_entries')

    # One progress row per IP per scan
    __table_args__ = (
        UniqueConstraint('scan_id', 'ip_address', name='uix_scan_progress_ip'),
    )

    def __repr__(self):
        return (f"ScanProgress(scan_id={self.scan_id!r}, "
                f"ip_address={self.ip_address!r}, phase={self.phase!r}, "
                f"status={self.status!r})")


# ============================================================================
# Reusable Site Definition Tables
# ============================================================================


class Site(Base):
    """
    Master site definition (reusable across scans).

    Sites represent logical network segments (e.g., "Production DC", "DMZ",
    "Branch Office") that can be reused across multiple scans. Each site
    owns a set of SiteIP rows.
    """
    __tablename__ = 'sites'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=False, unique=True, index=True,
                  comment="Unique site name")
    description = Column(Text, nullable=True, comment="Site description")
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Site creation time")
    updated_at = Column(DateTime, nullable=False, default=_utcnow,
                        onupdate=_utcnow, comment="Last modification time")

    # Relationships
    ips = relationship('SiteIP', back_populates='site',
                       cascade='all, delete-orphan')
    scan_associations = relationship('ScanSiteAssociation',
                                     back_populates='site')
    config_associations = relationship('ScanConfigSite', back_populates='site')

    def __repr__(self):
        return f"Site(id={self.id!r}, name={self.name!r})"


class SiteIP(Base):
    """
    Individual IP addresses with their own settings.

    Each IP is directly associated with a site and has its own port and ping
    settings. IPs are standalone entities - CIDRs are only used as a
    convenience for bulk creation.
    """
    __tablename__ = 'site_ips'

    id = Column(Integer, primary_key=True, autoincrement=True)
    site_id = Column(Integer, ForeignKey('sites.id'), nullable=False,
                     index=True, comment="FK to sites")
    ip_address = Column(String(45), nullable=False,
                        comment="IPv4 or IPv6 address")
    expected_ping = Column(Boolean, nullable=True,
                           comment="Expected ping response for this IP")
    expected_tcp_ports = Column(Text, nullable=True,
                                comment="JSON array of expected TCP ports")
    expected_udp_ports = Column(Text, nullable=True,
                                comment="JSON array of expected UDP ports")
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="IP creation time")

    # Relationships
    site = relationship('Site', back_populates='ips')

    # Prevent duplicate IPs within a site
    __table_args__ = (
        UniqueConstraint('site_id', 'ip_address', name='uix_site_ip_address'),
    )

    def __repr__(self):
        return (f"SiteIP(id={self.id!r}, ip_address={self.ip_address!r}, "
                f"site_id={self.site_id!r})")


class ScanSiteAssociation(Base):
    """
    Many-to-many relationship between scans and sites.

    Tracks which sites were included in which scans. This allows sites to be
    reused across multiple scans.
    """
    __tablename__ = 'scan_site_associations'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    site_id = Column(Integer, ForeignKey('sites.id'), nullable=False, index=True)
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Association creation time")

    # Relationships
    scan = relationship('Scan', back_populates='site_associations')
    site = relationship('Site', back_populates='scan_associations')

    # Prevent duplicate associations
    __table_args__ = (
        UniqueConstraint('scan_id', 'site_id', name='uix_scan_site'),
    )

    def __repr__(self):
        return (f"ScanSiteAssociation(scan_id={self.scan_id!r}, "
                f"site_id={self.site_id!r})")


# ============================================================================
# Scan Configuration Tables
# ============================================================================


class ScanConfig(Base):
    """
    Scan configurations stored in database (replaces YAML files).

    Stores reusable scan configurations that reference sites from the sites
    table. Configs define what sites to scan together.
    """
    __tablename__ = 'scan_configs'

    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(255), nullable=False, comment="Configuration title")
    description = Column(Text, nullable=True,
                         comment="Configuration description")
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Config creation time")
    updated_at = Column(DateTime, nullable=False, default=_utcnow,
                        onupdate=_utcnow, comment="Last modification time")

    # Relationships
    site_associations = relationship('ScanConfigSite', back_populates='config',
                                     cascade='all, delete-orphan')
    scans = relationship('Scan', back_populates='config')
    schedules = relationship('Schedule', back_populates='config')

    def __repr__(self):
        return f"ScanConfig(id={self.id!r}, title={self.title!r})"


class ScanConfigSite(Base):
    """
    Many-to-many relationship between scan configs and sites.

    Links scan configurations to the sites they should scan. A config can
    reference multiple sites, and sites can be used in multiple configs.
    """
    __tablename__ = 'scan_config_sites'

    id = Column(Integer, primary_key=True, autoincrement=True)
    config_id = Column(Integer, ForeignKey('scan_configs.id'), nullable=False,
                       index=True)
    site_id = Column(Integer, ForeignKey('sites.id'), nullable=False, index=True)
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Association creation time")

    # Relationships
    config = relationship('ScanConfig', back_populates='site_associations')
    site = relationship('Site', back_populates='config_associations')

    # Prevent duplicate associations
    __table_args__ = (
        UniqueConstraint('config_id', 'site_id', name='uix_config_site'),
    )

    def __repr__(self):
        return (f"ScanConfigSite(config_id={self.config_id!r}, "
                f"site_id={self.site_id!r})")


# ============================================================================
# Scheduling & Notifications Tables
# ============================================================================


class Schedule(Base):
    """
    Scheduled scan configurations.

    Stores cron-like schedules for automated periodic scanning of network
    infrastructure.
    """
    __tablename__ = 'schedules'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=False,
                  comment="Schedule name (e.g., 'Daily prod scan')")
    config_id = Column(Integer, ForeignKey('scan_configs.id'), nullable=True,
                       index=True, comment="FK to scan_configs table")
    cron_expression = Column(String(100), nullable=False,
                             comment="Cron-like schedule (e.g., '0 2 * * *')")
    enabled = Column(Boolean, nullable=False, default=True,
                     comment="Is schedule active?")
    last_run = Column(DateTime, nullable=True, comment="Last execution time")
    next_run = Column(DateTime, nullable=True,
                      comment="Next scheduled execution")
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Schedule creation time")
    updated_at = Column(DateTime, nullable=False, default=_utcnow,
                        onupdate=_utcnow, comment="Last modification time")

    # Relationships
    scans = relationship('Scan', back_populates='schedule')
    config = relationship('ScanConfig', back_populates='schedules')

    def __repr__(self):
        return (f"Schedule(id={self.id!r}, name={self.name!r}, "
                f"enabled={self.enabled!r})")


class Alert(Base):
    """
    Alert history and notifications sent.

    Stores all alerts generated by the alert rule engine, including severity
    levels and email/webhook notification status.
    """
    __tablename__ = 'alerts'

    id = Column(Integer, primary_key=True, autoincrement=True)
    scan_id = Column(Integer, ForeignKey('scans.id'), nullable=False, index=True)
    rule_id = Column(Integer, ForeignKey('alert_rules.id'), nullable=True,
                     index=True, comment="Associated alert rule")
    alert_type = Column(
        String(50), nullable=False,
        comment="unexpected_port, drift_detection, cert_expiry, service_change, ping_failed")
    severity = Column(String(20), nullable=False,
                      comment="info, warning, critical")
    message = Column(Text, nullable=False,
                     comment="Human-readable alert message")
    ip_address = Column(String(45), nullable=True,
                        comment="Related IP (optional)")
    port = Column(Integer, nullable=True, comment="Related port (optional)")
    email_sent = Column(Boolean, nullable=False, default=False,
                        comment="Was email notification sent?")
    email_sent_at = Column(DateTime, nullable=True,
                           comment="Email send timestamp")
    webhook_sent = Column(Boolean, nullable=False, default=False,
                          comment="Was webhook sent?")
    webhook_sent_at = Column(DateTime, nullable=True,
                             comment="Webhook send timestamp")
    acknowledged = Column(Boolean, nullable=False, default=False, index=True,
                          comment="Was alert acknowledged?")
    acknowledged_at = Column(DateTime, nullable=True,
                             comment="Acknowledgment timestamp")
    acknowledged_by = Column(String(255), nullable=True,
                             comment="User who acknowledged")
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Alert creation time")

    # Relationships
    scan = relationship('Scan', back_populates='alerts')
    rule = relationship('AlertRule', back_populates='alerts')

    # NOTE(review): this only attaches a table-level comment. No composite
    # index on alert_type/severity is declared in this module.
    __table_args__ = (
        {'comment': 'Indexes for alert filtering'},
    )

    def __repr__(self):
        return (f"Alert(id={self.id!r}, alert_type={self.alert_type!r}, "
                f"severity={self.severity!r})")


class AlertRule(Base):
    """
    User-defined alert rules.

    Configurable rules that trigger alerts based on scan results (e.g.,
    certificates expiring in under 30 days, unexpected ports opened).
    """
    __tablename__ = 'alert_rules'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=True, comment="User-friendly rule name")
    rule_type = Column(
        String(50), nullable=False,
        comment="unexpected_port, cert_expiry, service_down, drift_detection, etc.")
    enabled = Column(Boolean, nullable=False, default=True,
                     comment="Is rule active?")
    threshold = Column(Integer, nullable=True,
                       comment="Threshold value (e.g., days for cert expiry)")
    email_enabled = Column(Boolean, nullable=False, default=False,
                           comment="Send email for this rule?")
    webhook_enabled = Column(Boolean, nullable=False, default=False,
                             comment="Send webhook for this rule?")
    severity = Column(String(20), nullable=True,
                      comment="Alert severity: critical, warning, info")
    filter_conditions = Column(Text, nullable=True,
                               comment="JSON filter conditions for the rule")
    config_id = Column(Integer, ForeignKey('scan_configs.id'), nullable=True,
                       index=True,
                       comment="Optional: specific config this rule applies to")
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Rule creation time")
    # Stays NULL until the first UPDATE; onupdate then stamps it automatically
    # (Python-side only, so no schema change is involved).
    updated_at = Column(DateTime, nullable=True, onupdate=_utcnow,
                        comment="Last update time")

    # Relationships
    alerts = relationship("Alert", back_populates="rule",
                          cascade="all, delete-orphan")
    # backref creates ScanConfig.alert_rules implicitly
    config = relationship("ScanConfig", backref="alert_rules")

    def __repr__(self):
        return (f"AlertRule(id={self.id!r}, name={self.name!r}, "
                f"rule_type={self.rule_type!r}, enabled={self.enabled!r})")


class Webhook(Base):
    """
    Webhook configurations for alert notifications.

    Stores webhook endpoints and authentication details for sending alert
    notifications to external systems.
    """
    __tablename__ = 'webhooks'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=False, comment="Webhook name")
    url = Column(Text, nullable=False, comment="Webhook URL")
    enabled = Column(Boolean, nullable=False, default=True,
                     comment="Is webhook enabled?")
    auth_type = Column(String(20), nullable=True,
                       comment="Authentication type: none, bearer, basic, custom")
    auth_token = Column(Text, nullable=True,
                        comment="Encrypted authentication token")
    custom_headers = Column(Text, nullable=True, comment="JSON custom headers")
    alert_types = Column(Text, nullable=True,
                         comment="JSON array of alert types to trigger on")
    severity_filter = Column(Text, nullable=True,
                             comment="JSON array of severities to trigger on")
    timeout = Column(Integer, nullable=True, default=10,
                     comment="Request timeout in seconds")
    retry_count = Column(Integer, nullable=True, default=3,
                         comment="Number of retry attempts")
    template = Column(Text, nullable=True,
                      comment="Jinja2 template for webhook payload")
    template_format = Column(String(20), nullable=True, default='json',
                             comment="Template output format: json, text")
    content_type_override = Column(String(100), nullable=True,
                                   comment="Optional custom Content-Type header")
    created_at = Column(DateTime, nullable=False, default=_utcnow,
                        comment="Creation time")
    # onupdate added so this column actually tracks modifications, matching
    # the other updated_at columns in this module.
    updated_at = Column(DateTime, nullable=False, default=_utcnow,
                        onupdate=_utcnow, comment="Last update time")

    # Relationships
    delivery_logs = relationship("WebhookDeliveryLog", back_populates="webhook",
                                 cascade="all, delete-orphan")

    def __repr__(self):
        # url and auth_token are deliberately omitted: they may carry secrets
        # and reprs routinely end up in logs.
        return (f"Webhook(id={self.id!r}, name={self.name!r}, "
                f"enabled={self.enabled!r})")


class WebhookDeliveryLog(Base):
    """
    Webhook delivery tracking.

    Logs all webhook delivery attempts for auditing and debugging purposes.
    """
    __tablename__ = 'webhook_delivery_log'

    id = Column(Integer, primary_key=True, autoincrement=True)
    webhook_id = Column(Integer, ForeignKey('webhooks.id'), nullable=False,
                        index=True, comment="Associated webhook")
    alert_id = Column(Integer, ForeignKey('alerts.id'), nullable=False,
                      index=True, comment="Associated alert")
    status = Column(String(20), nullable=True, index=True,
                    comment="Delivery status: success, failed, retrying")
    response_code = Column(Integer, nullable=True, comment="HTTP response code")
    response_body = Column(Text, nullable=True,
                           comment="Response body from webhook")
    error_message = Column(Text, nullable=True,
                           comment="Error message if failed")
    attempt_number = Column(Integer, nullable=True,
                            comment="Which attempt this was")
    delivered_at = Column(DateTime, nullable=False, default=_utcnow,
                          comment="Delivery timestamp")

    # Relationships
    webhook = relationship("Webhook", back_populates="delivery_logs")
    # One-directional: Alert has no matching collection attribute
    alert = relationship("Alert")

    def __repr__(self):
        return (f"WebhookDeliveryLog(id={self.id!r}, "
                f"webhook_id={self.webhook_id!r}, status={self.status!r})")


# ============================================================================
# Settings Table
# ============================================================================


class Setting(Base):
    """
    Application configuration key-value store.

    Stores application settings including SMTP configuration, authentication,
    and retention policies. Values stored as JSON for complex data types.
    """
    __tablename__ = 'settings'

    id = Column(Integer, primary_key=True, autoincrement=True)
    key = Column(String(255), nullable=False, unique=True, index=True,
                 comment="Setting key (e.g., smtp_server)")
    value = Column(Text, nullable=True,
                   comment="Setting value (JSON for complex values)")
    updated_at = Column(DateTime, nullable=False, default=_utcnow,
                        onupdate=_utcnow, comment="Last modification time")

    def __repr__(self):
        # value is deliberately omitted: settings may hold credentials.
        return f"Setting(id={self.id!r}, key={self.key!r})"