From 70d29f9f95e080762b25ba92875d98d215d3ff32 Mon Sep 17 00:00:00 2001 From: Phillip Tarrant Date: Wed, 20 Aug 2025 21:22:28 +0000 Subject: [PATCH] first commit --- .env.example | 10 + .gitignore | 2 + Dockerfile | 34 +++ Readme.md | 92 +++++++ app/__init__.py | 82 +++++++ app/browser.py | 400 +++++++++++++++++++++++++++++++ app/config/bec_words.yaml | 5 + app/config/settings.yaml | 9 + app/config/suspicious_rules.yaml | 80 +++++++ app/enrichment.py | 137 +++++++++++ app/routes.py | 125 ++++++++++ app/static/style.css | 288 ++++++++++++++++++++++ app/templates/base.html | 33 +++ app/templates/index.html | 149 ++++++++++++ app/templates/result.html | 268 +++++++++++++++++++++ app/utils/cache_db.py | 128 ++++++++++ app/utils/io_helpers.py | 115 +++++++++ app/utils/rules_engine.py | 132 ++++++++++ app/utils/settings.py | 144 +++++++++++ app/wsgi.py | 10 + docker-compose.yaml | 13 + docs/roadmap.md | 71 ++++++ entrypoint.sh | 22 ++ openapi/openapi.yaml | 94 ++++++++ requirements.txt | 14 ++ sandbox.sh | 101 ++++++++ 26 files changed, 2558 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Readme.md create mode 100644 app/__init__.py create mode 100644 app/browser.py create mode 100644 app/config/bec_words.yaml create mode 100644 app/config/settings.yaml create mode 100644 app/config/suspicious_rules.yaml create mode 100644 app/enrichment.py create mode 100644 app/routes.py create mode 100644 app/static/style.css create mode 100644 app/templates/base.html create mode 100644 app/templates/index.html create mode 100644 app/templates/result.html create mode 100644 app/utils/cache_db.py create mode 100644 app/utils/io_helpers.py create mode 100644 app/utils/rules_engine.py create mode 100644 app/utils/settings.py create mode 100644 app/wsgi.py create mode 100644 docker-compose.yaml create mode 100644 docs/roadmap.md create mode 100644 entrypoint.sh create mode 100644 openapi/openapi.yaml create mode 
100644 requirements.txt create mode 100755 sandbox.sh diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..1cf47d5 --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +# Flask Configuration +FLASK_ENV=production +SECRET_KEY=changeme_super_long_random_secret +PYTHONUNBUFFERED=1 + +# Playwright (browser automation) +PLAYWRIGHT_BROWSERS_PATH=/ms-playwright + +# Sandbox Storage +SANDBOX_STORAGE=/data diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0737481 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +/data/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..adc9235 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,34 @@ +# Use the official Playwright image with browsers preinstalled +FROM mcr.microsoft.com/playwright/python:v1.45.0-jammy + +# Create a non-root user (the base image already has pwuser, we'll keep it) +USER root + +# System deps (whois, dig, etc. β€” handy for later stages) +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + whois dnsutils iputils-ping ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Copy requirements first to leverage Docker layer caching +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code (the double app is needed because the app folder needs to be inside the app folder) +COPY app/ /app/app/ + +COPY entrypoint.sh ./entrypoint.sh +RUN chmod +x /app/entrypoint.sh + +# Create data dir for screenshots/artifacts +RUN mkdir -p /data && chown -R pwuser:pwuser /data /app + +USER pwuser + +# Expose port +EXPOSE 8000 + +# Start server +ENTRYPOINT ["/app/entrypoint.sh"] \ No newline at end of file diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..04f47e3 --- /dev/null +++ b/Readme.md @@ -0,0 +1,92 @@ +# URL Sandbox + +A lightweight web-based sandbox for analyzing websites and domains. 
+It performs WHOIS lookups, GeoIP enrichment, script/form inspection, and provides analyst-friendly output. + +--- + +## πŸš€ Features + +- **Domain & IP Enrichment** + - WHOIS lookups with fallback to raw text when fields are missing + - Explicit handling of privacy-protected WHOIS records (`N/A` or `Possible Privacy`) + - GeoIP (City, Region, Country, Latitude/Longitude) + - ASN, ISP, and network details +- **Flagged Content Analysis** + - Suspicious script detection + - Suspicious form detection + - Nested bullet-style reporting for clarity +- **Improved UX** + - Automatic addition of `http://`, `https://`, and `www.` if only a domain is provided + - Modal spinner to indicate background analysis (`Analyzing website…`) +- **Resilient GeoLite2 Database Management** + - Downloads the MaxMind GeoLite2-City database on first startup + - Checks file age and only re-downloads if older than **14 days** (configurable via environment variable) + +--- + +## βš™οΈ Setup Instructions + +### 1. Clone the Repository +```bash +git clone https://github.com/yourusername/url-sandbox.git +cd url-sandbox +``` + +### 2. Create a MaxMind Account & License Key +1. Go to [MaxMind GeoLite2](https://dev.maxmind.com/geoip/geolite2-free-geolocation-data) +2. Sign up for a free account +3. Navigate to **Account > Manage License Keys** +4. Generate a new license key + +### 3. Configure Environment Variables +All environment variables are loaded from a `.env` file. + +1. Copy the sample file: +```bash + cp .env.example .env +```` + +2. Edit `.env` and set your values (see [`.env.example`](./.env.example) for available options). + +Make sure to add your **MaxMind License Key** under `MAXMIND_LICENSE_KEY`. + + +### 4. Run with Docker Compose +```bash +docker-compose up --build +``` + +This will: +- Build the app +- Download the GeoLite2 database if not present or too old +- Start the web interface + +--- + +## πŸ“ Example Output + +**WHOIS Info** +- Registrar: MarkMonitor, Inc. 
+- Organization: Possible Privacy +- Creation: 1997-09-15 +- Expiration: 2028-09-14 + +**GeoIP Info** +- IP: 172.66.159.20 + - City: N/A + - Region: N/A + - Country: United States + - Coordinates: (37.751, -97.822) + - ASN: 13335 + - ISP: Cloudflare, Inc. + +--- + +## πŸ“Œ Roadmap +See [Next Steps Checklist](docs/roadmap.md) for planned features: +- Improved UI templates +- Artifact cleanup +- Proxy support (optional) + +--- \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..1d63dbf --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,82 @@ +""" +app/__init__.py + +Application factory and startup hooks for SneakyScope. + +Responsibilities: +- Create the Flask app. +- Load settings (YAML -> dataclasses) with safe defaults. +- Initialize and load the Suspicious Rules Engine from YAML. +- Register blueprints (routes). +- Configure core paths (e.g., SANDBOX_STORAGE). +""" + +import os +import logging +from pathlib import Path +from flask import Flask + +# Local imports +from .utils.settings import get_settings +from .utils import io_helpers # if you need logging/setup later +from .utils import cache_db # available for future injections +from .utils.rules_engine import RuleEngine, load_rules_from_yaml # rules engine +from . import routes # blueprint + + +def create_app() -> Flask: + """ + Create and configure the Flask application instance. + + Returns: + Flask: The configured Flask app. 
+ """ + # Basic app object + app = Flask(__name__, template_folder="templates", static_folder="static") + + # Load settings (safe fallback to defaults if file missing) + settings = get_settings() + + # Secret key loaded from env + app.secret_key = os.getenv("SECRET_KEY") + + # Configure storage directory (bind-mount is still handled by sandbox.sh) + sandbox_storage_default = Path("/data") + app.config["SANDBOX_STORAGE"] = str(sandbox_storage_default) + + # Initialize Suspicious Rules Engine at startup + # Determine rules file path relative to this package + base_dir = Path(__file__).resolve().parent + rules_path = base_dir / "config" / "suspicious_rules.yaml" + + # Create an engine instance (even if file missing, we still want an engine) + engine = RuleEngine() + + # Try to load from YAML if present; log clearly if not + if rules_path.exists(): + try: + loaded_rules = load_rules_from_yaml(rules_path) + # Add rules one-by-one (explicit) + for rule in loaded_rules: + engine.add_rule(rule) + app.logger.info(f"[+] Loaded {len(loaded_rules)} suspicious rules from {rules_path}") + except Exception as e: + app.logger.warning(f"[!] Failed loading rules from {rules_path}: {e}") + else: + app.logger.warning(f"[!] Rules file not found at {rules_path}. 
Engine will start with zero rules.") + + # Store engine on app config so it is accessible via current_app + app.config["RULE_ENGINE"] = engine + + # Make app name/version available for templates here if you want it globally + app.config["APP_NAME"] = settings.app.name + app.config["APP_VERSION"] = f"v{settings.app.version_major}.{settings.app.version_minor}" + + # Register blueprints + app.register_blueprint(routes.bp) + + # Example log line so we know we booted cleanly + app.logger.info(f"SneakyScope started: {app.config['APP_NAME']} {app.config['APP_VERSION']}") + app.logger.info(f"SANDBOX_STORAGE: {app.config['SANDBOX_STORAGE']}") + + return app diff --git a/app/browser.py b/app/browser.py new file mode 100644 index 0000000..4858f9b --- /dev/null +++ b/app/browser.py @@ -0,0 +1,400 @@ +import re +import uuid +import json +from pathlib import Path +from bs4 import BeautifulSoup +from datetime import datetime +from urllib.parse import urlparse +from typing import Dict, Any, Optional +from playwright.async_api import async_playwright, TimeoutError as PWTimeoutError + +from flask import current_app # access the rule engine from app config + +from app.utils.io_helpers import safe_write +from .enrichment import enrich_url + +def get_rule_engine(): + """ + Retrieve the rules engine instance from the Flask application config. + + Returns: + RuleEngine or None: The engine if available, or None if not configured. + """ + try: + # current_app is only available during an active request context + engine = current_app.config.get("RULE_ENGINE") + return engine + except Exception: + # If called outside a Flask request context, fail gracefully + return None + + +def run_rule_checks(text, category): + """ + Run all rules for a given category against the provided text. + + Args: + text (str): The content to test (e.g., form snippet, inline JS). + category (str): The rule category to run (e.g., 'form' or 'script'). 
+ + Returns: + dict: { + "checks": [ { "rule": str, "category": str, "matched": bool, "reason": Optional[str] }, ... ], + "summary": { "matched_count": int, "total_rules": int } + } + """ + result = { + "checks": [], + "summary": { + "matched_count": 0, + "total_rules": 0 + } + } + + engine = get_rule_engine() + if engine is None: + # No engine configured; return empty but well-formed structure + return result + + try: + # Run engine rules for the specified category + check_results = engine.run_all(text, category=category) + + # Normalize results into the expected structure + total = 0 + matched = 0 + + for item in check_results: + # item is expected to contain: rule, category, matched, reason (optional) + total = total + 1 + if bool(item.get("matched")): + matched = matched + 1 + + normalized = { + "rule": item.get("rule"), + "category": item.get("category"), + "matched": bool(item.get("matched")), + "reason": item.get("reason") + } + result["checks"].append(normalized) + + result["summary"]["matched_count"] = matched + result["summary"]["total_rules"] = total + + except Exception as e: + # If anything goes wrong, keep structure and add a fake failure note + result["checks"].append({ + "rule": "engine_error", + "category": category, + "matched": False, + "reason": f"Rule engine error: {e}" + }) + result["summary"]["matched_count"] = 0 + result["summary"]["total_rules"] = 0 + + return result + + +def analyze_forms(html: str, base_url: str): + """ + Parse forms from the page HTML and apply heuristic flags and rule-based checks. + + Args: + html (str): The full page HTML. + base_url (str): The final URL of the page (used for hostname comparisons). 
+ + Returns: + list[dict]: A list of form analysis dictionaries, each including: + - action, method, inputs + - flagged (bool), flag_reasons (list[str]), status (str) + - rule_checks: dict with "checks" (list) and "summary" (dict) + """ + soup = BeautifulSoup(html, "lxml") + forms_info = [] + page_hostname = urlparse(base_url).hostname + + for form in soup.find_all("form"): + action = form.get("action") + method = form.get("method", "get").lower() + + # Build explicit inputs list + inputs = [] + for inp in form.find_all("input"): + input_name = inp.get("name") + input_type = inp.get("type", "text") + inputs.append({ + "name": input_name, + "type": input_type + }) + + flagged_reasons = [] + + # No action specified + if not action or str(action).strip() == "": + flagged_reasons.append("No action specified") + + # External host + else: + try: + action_host = urlparse(action).hostname + if not str(action).startswith("/") and action_host != page_hostname: + flagged_reasons.append("Submits to a different host") + except Exception: + # If hostname parsing fails, skip this condition quietly + pass + + # HTTP form on HTTPS page + try: + if urlparse(action).scheme == "http" and urlparse(base_url).scheme == "https": + flagged_reasons.append("Submits over insecure HTTP") + except Exception: + # If scheme parsing fails, ignore + pass + + # Hidden password / suspicious hidden inputs + for hidden in form.find_all("input", type="hidden"): + name_value = hidden.get("name") or "" + if "password" in name_value.lower(): + flagged_reasons.append("Hidden password field") + + flagged = bool(flagged_reasons) + + # Serialize a simple form snippet for the rules engine to analyze (category='form') + snippet_lines = [] + snippet_lines.append(f"action={action}") + snippet_lines.append(f"method={method}") + snippet_lines.append("inputs=") + for item in inputs: + snippet_lines.append(f" - name={item.get('name')} type={item.get('type')}") + form_snippet = "\n".join(snippet_lines) + + rule_checks 
= run_rule_checks(form_snippet, category="form") + + forms_info.append({ + "action": action, + "method": method, + "inputs": inputs, + "flagged": flagged, + "flag_reasons": flagged_reasons, + "status": "flagged" if flagged else "possibly safe", + "rule_checks": rule_checks + }) + + return forms_info + + +def analyze_scripts(html: str, base_url: str = "", engine=None) -> list[dict]: + """ + Analyze + + + +{% endblock %} diff --git a/app/templates/result.html b/app/templates/result.html new file mode 100644 index 0000000..717dcce --- /dev/null +++ b/app/templates/result.html @@ -0,0 +1,268 @@ +{% extends "base.html" %} +{% block content %} + + +
+

Jump to Section

+ +
+ + +
+

URL Overview

+

Submitted URL: {{ submitted_url }}

+

Final URL: {{ final_url }}

+

Permalink: + + {{ request.host_url }}results/{{ uuid }} + +

+

Back to top

+
+ + +
+

Enrichment

+ + + {% if enrichment.whois %} +

WHOIS

+ + + + + + + + + {% for k, v in enrichment.whois.items() %} + + + + + {% endfor %} + +
FieldValue
{{ k.replace('_', ' ').title() }}{{ v }}
+ {% endif %} + + {% if enrichment.raw_whois %} +

Raw WHOIS

+
{{ enrichment.raw_whois }}
+ {% endif %} + + + {% if enrichment.geoip %} +

GeoIP

+ {% for ip, info in enrichment.geoip.items() %} +
+ {{ ip }} + + + {% for key, val in info.items() %} + + + + + {% endfor %} + +
{{ key.replace('_', ' ').title() }}{{ val }}
+
+ {% endfor %} + {% endif %} + + + {% if enrichment.bec_words %} +

BEC Words Detected

+ + + + + + {% for word in enrichment.bec_words %} + + {% endfor %} + +
Word
{{ word }}
+ {% endif %} + + {% if not enrichment.whois and not enrichment.raw_whois and not enrichment.geoip and not enrichment.bec_words %} +

No enrichment data available.

+ {% endif %} + +

Back to top

+
+ + +
+

Redirects

+ {% if redirects %} + + + + + + + + + {% for r in redirects %} + + + + + {% endfor %} + +
StatusURL
{{ r.status }}{{ r.url }}
+ {% else %} +

No redirects detected.

+ {% endif %} +

Back to top

+
+ + +
+

Forms

+ {% if forms %} + {% for form in forms %} +
+ {{ form.status }} β€” Action: {{ form.action }} ({{ form.method | upper }}) + + + + + + + + + {% for inp in form.inputs %} + + + + + {% endfor %} + +
Input NameType
{{ inp.name }}{{ inp.type }}
+ {% if form.flagged %} +

Flag Reasons:

+
    + {% for reason in form.flag_reasons %} +
  • {{ reason }}
  • + {% endfor %} +
+ {% endif %} +
+ {% endfor %} + {% else %} +

No forms detected.

+ {% endif %} +

Back to top

+
+ + +
+

Suspicious Scripts

+ + {% if suspicious_scripts %} + + + + + + + + + + + {% for s in suspicious_scripts %} + + + + + + + + + + + + + + {% endfor %} + +
TypeSource URLContent SnippetMatches (Rules & Heuristics)
{{ s.type or 'unknown' }} + {% if s.src %} + {{ s.src }} + {% else %} + N/A + {% endif %} + + {% if s.content_snippet %} +
+ View snippet +
{{ s.content_snippet }}
+
+ {% else %} + N/A + {% endif %} +
+ {% set has_rules = s.rules and s.rules|length > 0 %} + {% set has_heur = s.heuristics and s.heuristics|length > 0 %} + + {% if has_rules %} + Rules +
    + {% for r in s.rules %} +
  • + {{ r.name }} + {% if r.description %} + β€” {{ r.description }} + {% endif %} +
  • + {% endfor %} +
+ {% endif %} + + {% if has_heur %} + Heuristics +
    + {% for h in s.heuristics %} +
  • {{ h }}
  • + {% endfor %} +
+ {% endif %} + + {% if not has_rules and not has_heur %} + N/A + {% endif %} +
+ {% else %} +

No suspicious scripts detected.

+ {% endif %} + +

Back to top

+
+ + + +
+

Screenshot

+ Screenshot +

Back to top

+
+ + +
+

Source

+

View Source

+

Back to top

+
+ +{% endblock %} diff --git a/app/utils/cache_db.py b/app/utils/cache_db.py new file mode 100644 index 0000000..c752bbb --- /dev/null +++ b/app/utils/cache_db.py @@ -0,0 +1,128 @@ +import json +import time +import sqlite3 +import threading +import functools +from pathlib import Path +from typing import Any, Optional + + +# ---------- SINGLETON DECORATOR ---------- +T = Any + +def singleton_loader(func): + """Ensure only one cache instance exists.""" + cache: dict[str, T] = {} + lock = threading.Lock() + + @functools.wraps(func) + def wrapper(*args, **kwargs) -> T: + with lock: + if func.__name__ not in cache: + cache[func.__name__] = func(*args, **kwargs) + return cache[func.__name__] + return wrapper + +# ---------- CACHE CLASS ---------- +class CacheDB: + """SQLite-backed cache with expiration in minutes, CRUD, auto-cleanup, singleton support.""" + + TABLE_NAME = "cache" + + def __init__(self, db_path: str | Path = "cache.db", default_expiration_minutes: int = 1440): + """ + :param default_expiration_minutes: default expiration in minutes (default 24 hours) + """ + self.db_path = Path(db_path) + self.default_expiration = default_expiration_minutes * 60 # convert minutes -> seconds + + self.conn = sqlite3.connect(self.db_path, check_same_thread=False) + self.conn.row_factory = sqlite3.Row + self._lock = threading.Lock() + self._create_table() + + def _create_table(self): + """Create the cache table if it doesn't exist.""" + with self._lock: + self.conn.execute(f""" + CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} ( + key TEXT PRIMARY KEY, + value TEXT, + expires_at INTEGER + ) + """) + self.conn.commit() + + def _cleanup_expired(self): + """Delete expired rows.""" + now = int(time.time()) + with self._lock: + self.conn.execute( + f"DELETE FROM {self.TABLE_NAME} WHERE expires_at IS NOT NULL AND expires_at < ?", (now,) + ) + self.conn.commit() + + # ---------- CRUD ---------- + def create(self, key: str, value: Any, expires_in_minutes: Optional[int] = None): + 
"""Insert or update a cache entry. expires_in_minutes overrides default expiration.""" + self._cleanup_expired() + if expires_in_minutes is None: + expires_in_seconds = self.default_expiration + else: + expires_in_seconds = expires_in_minutes * 60 + expires_at = int(time.time()) + expires_in_seconds + + value_json = json.dumps(value) + with self._lock: + self.conn.execute( + f"INSERT OR REPLACE INTO {self.TABLE_NAME} (key, value, expires_at) VALUES (?, ?, ?)", + (key, value_json, expires_at) + ) + self.conn.commit() + + def read(self, key: str) -> Optional[Any]: + """Read a cache entry. Auto-cleans expired items.""" + self._cleanup_expired() + with self._lock: + row = self.conn.execute( + f"SELECT * FROM {self.TABLE_NAME} WHERE key = ?", (key,) + ).fetchone() + if not row: + return None + return json.loads(row["value"]) + + def update(self, key: str, value: Any, expires_in_minutes: Optional[int] = None): + """Update a cache entry. Optional expiration in minutes.""" + if expires_in_minutes is None: + expires_in_seconds = self.default_expiration + else: + expires_in_seconds = expires_in_minutes * 60 + expires_at = int(time.time()) + expires_in_seconds + + value_json = json.dumps(value) + with self._lock: + self.conn.execute( + f"UPDATE {self.TABLE_NAME} SET value = ?, expires_at = ? 
WHERE key = ?", + (value_json, expires_at, key) + ) + self.conn.commit() + + def delete(self, key: str): + with self._lock: + self.conn.execute(f"DELETE FROM {self.TABLE_NAME} WHERE key = ?", (key,)) + self.conn.commit() + + def clear(self): + """Delete all rows from the cache table.""" + with self._lock: + self.conn.execute(f"DELETE FROM {self.TABLE_NAME}") + self.conn.commit() + + def close(self): + self.conn.close() + + +# ---------- SINGLETON INSTANCE ---------- +@singleton_loader +def get_cache(db_path: str = "cache.db", default_expiration_minutes: int = 1440) -> CacheDB: + return CacheDB(db_path=db_path, default_expiration_minutes=default_expiration_minutes) diff --git a/app/utils/io_helpers.py b/app/utils/io_helpers.py new file mode 100644 index 0000000..0243be3 --- /dev/null +++ b/app/utils/io_helpers.py @@ -0,0 +1,115 @@ +import json +import logging +from pathlib import Path +from datetime import datetime + +logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s") + +def safe_write(path: Path | str, content: str, mode="w", encoding="utf-8"): + """Write content to a file safely with logging.""" + path = Path(path) + try: + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, mode, encoding=encoding) as f: + f.write(content) + logging.info(f"[+] Wrote file: {path}") + except Exception as e: + logging.error(f"[!] Failed writing {path}: {e}") + raise + +def get_recent_results(storage_dir: Path, limit: int, logger) -> list[dict]: + """ + Scan the SANDBOX_STORAGE directory for run folders (UUIDs), read each + run's results.json, and return the most recent N entries by file mtime. + + Args: + storage_dir (Path): Base path where UUID run directories live. + limit (int): Maximum number of recent items to return. + logger: Flask or stdlib logger to record non-fatal issues. 
+ + Returns: + list[dict]: Each item includes: + { + "uuid": str, + "submitted_url": str | None, + "final_url": str | None, + "timestamp": str (ISO 8601), + } + Returns an empty list if no runs are found or on error. + """ + items = [] + + try: + # Ensure the storage dir exists + storage_dir.mkdir(parents=True, exist_ok=True) + + # Iterate directories directly under storage_dir + for entry in storage_dir.iterdir(): + try: + if not entry.is_dir(): + # Skip non-directories + continue + + # Expect results.json inside each UUID directory + results_path = entry / "results.json" + if not results_path.exists(): + # Skip folders without results.json + continue + + # Read file metadata (mtime) for sorting and display + stat_info = results_path.stat() + mtime_epoch = stat_info.st_mtime + mtime_iso = datetime.fromtimestamp(mtime_epoch).isoformat(timespec="seconds") + + # Parse a small subset of the JSON for display + submitted_url = None + final_url = None + run_uuid = entry.name + + try: + with open(results_path, "r", encoding="utf-8") as f: + data = json.load(f) + + if isinstance(data, dict): + submitted_url = data.get("submitted_url") + final_url = data.get("final_url") + except Exception as read_err: + # If JSON is malformed or unreadable, log and continue + if logger: + logger.warning(f"[recent] Failed reading {results_path}: {read_err}") + + item = { + "uuid": run_uuid, + "submitted_url": submitted_url, + "final_url": final_url, + "timestamp": mtime_iso + } + + items.append((mtime_epoch, item)) + except Exception as inner_err: + # Keep going; a single bad folder should not break the list + if logger: + logger.warning(f"[recent] Skipping {entry}: {inner_err}") + + # Sort by mtime desc + try: + items.sort(key=lambda t: t[0], reverse=True) + except Exception as sort_err: + if logger: + logger.warning(f"[recent] Sort failed: {sort_err}") + + # Trim to limit without list comprehensions + trimmed = [] + count = 0 + for tup in items: + if count >= limit: + break + 
trimmed.append(tup[1]) + count = count + 1 + + return trimmed + + except Exception as outer_err: + if logger: + logger.error(f"[recent] Unexpected error while scanning {storage_dir}: {outer_err}") + return [] diff --git a/app/utils/rules_engine.py b/app/utils/rules_engine.py new file mode 100644 index 0000000..ee04eb5 --- /dev/null +++ b/app/utils/rules_engine.py @@ -0,0 +1,132 @@ +""" +rules_engine.py + +A flexible rule-based engine for detecting suspicious patterns in scripts, forms, +or other web artifacts inside SneakyScope. + +Each rule is defined as: + - name: str # Rule identifier + - description: str # Human-readable reason for analysts + - category: str # e.g., 'script', 'form', 'text', 'generic' + - type: str # 'regex' or 'function' + - pattern: str # Regex pattern (if type=regex) + - function: callable # Python function returning (bool, str) (if type=function) + +The framework returns a list of results, with pass/fail and reasoning. +""" + +import re +from pathlib import Path +from typing import Callable, Dict, List, Tuple, Union + +import yaml + + +class Rule: + """Represents a single detection rule.""" + + def __init__( + self, + name: str, + description: str, + category: str, + rule_type: str = "regex", + pattern: str = None, + function: Callable = None, + ): + self.name = name + self.description = description + self.category = category + self.rule_type = rule_type + self.pattern = pattern + self.function = function + + def run(self, text: str) -> Tuple[bool, str]: + """ + Run the rule on given text. 
+ + Returns: + (matched: bool, reason: str) + """ + if self.rule_type == "regex" and self.pattern: + if re.search(self.pattern, text, re.IGNORECASE): + return True, f"Matched regex '{self.pattern}' β†’ {self.description}" + else: + return False, "No match" + elif self.rule_type == "function" and callable(self.function): + return self.function(text) + else: + return False, "Invalid rule configuration" + + +class RuleEngine: + """Loads and executes rules against provided text.""" + + def __init__(self, rules: List[Rule] = None): + self.rules = rules or [] + + def add_rule(self, rule: Rule): + """Add a new rule at runtime.""" + self.rules.append(rule) + + def run_all(self, text: str, category: str = None) -> List[Dict]: + """ + Run all rules against text. + + Args: + text: str β†’ the content to test + category: str β†’ optional, only run rules in this category + + Returns: + List of dicts with rule results. + """ + results = [] + for rule in self.rules: + if category and rule.category != category: + continue + + matched, reason = rule.run(text) + results.append( + { + "rule": rule.name, + "category": rule.category, + "matched": matched, + "reason": reason if matched else None, + } + ) + return results + + +def load_rules_from_yaml(yaml_file: Union[str, Path]) -> List[Rule]: + """ + Load rules from a YAML file. 
+ + Example YAML format: + - name: suspicious_eval + description: "Use of eval() in script" + category: script + type: regex + pattern: "\\beval\\(" + + - name: password_reset + description: "Password reset wording" + category: text + type: regex + pattern: "reset password" + + """ + rules = [] + with open(yaml_file, "r", encoding="utf-8") as f: + data = yaml.safe_load(f) + + for item in data: + rule = Rule( + name=item["name"], + description=item["description"], + category=item["category"], + rule_type=item.get("type", "regex"), + pattern=item.get("pattern"), + ) + rules.append(rule) + + return rules diff --git a/app/utils/settings.py b/app/utils/settings.py new file mode 100644 index 0000000..76a2cbc --- /dev/null +++ b/app/utils/settings.py @@ -0,0 +1,144 @@ +# +# Note the settings file is hardcoded in this class at the top after imports. +# +# To make a new settings section, just add the setting dict to your yaml +# and then define the data class below in the config data classes area. +# +# Example use from anywhere - this will always return the same singleton +# from settings import get_settings +# def main(): +# settings = get_settings() +# print(settings.database.host) # Autocomplete works +# print(settings.logging.level) + +# if __name__ == "__main__": +# main() + +import functools +from pathlib import Path +from typing import Any, Callable, TypeVar +from dataclasses import dataclass, fields, is_dataclass, field, MISSING + +import logging +import sys +logger = logging.getLogger(__file__) + +try: + import yaml +except ModuleNotFoundError: + msg = ( + "Required modules are not installed. 
" + "Can not continue with module / application loading.\n" + "Install it with: pip install -r requirements" + ) + print(msg, file=sys.stderr) + logger.error(msg) + exit() + +BASE_DIR = Path(__file__).resolve().parent.parent +DEFAULT_SETTINGS_FILE = BASE_DIR / "config" / "settings.yaml" + +# ---------- CONFIG DATA CLASSES ---------- +@dataclass +class Cache_Config: + whois_cache_days: int = 7 + geoip_cache_days: int = 7 + recent_runs_count: int = 10 + + +@dataclass +class AppConfig: + name: str = "MyApp" + version_major: int = 1 + version_minor: int = 0 + + +@dataclass +class Settings: + cache: Cache_Config = field(default_factory=Cache_Config) + app: AppConfig = field(default_factory=AppConfig) + + @classmethod + def from_yaml(cls, path: str | Path) -> "Settings": + try: + """Load settings from YAML file into a Settings object.""" + with open(path, "r", encoding="utf-8") as f: + raw: dict[str, Any] = yaml.safe_load(f) or {} + except FileNotFoundError: + logger.warning(f"Settings file {path} not found! 
Using default settings.") + raw = {} + + init_kwargs = {} + for f_def in fields(cls): + yaml_value = raw.get(f_def.name, None) + + # Determine default value from default_factory or default + if f_def.default_factory is not MISSING: + default_value = f_def.default_factory() + elif f_def.default is not MISSING: + default_value = f_def.default + else: + default_value = None + + # Handle nested dataclasses + if is_dataclass(f_def.type): + if isinstance(yaml_value, dict): + # Merge YAML values with defaults + merged_data = {fld.name: getattr(default_value, fld.name) for fld in fields(f_def.type)} + merged_data.update(yaml_value) + init_kwargs[f_def.name] = f_def.type(**merged_data) + else: + init_kwargs[f_def.name] = default_value + else: + init_kwargs[f_def.name] = yaml_value if yaml_value is not None else default_value + + return cls(**init_kwargs) + + +# ---------- SINGLETON DECORATOR ---------- +T = TypeVar("T") + +def singleton_loader(func: Callable[..., T]) -> Callable[..., T]: + """Ensure the function only runs once, returning the cached value.""" + cache: dict[str, T] = {} + + @functools.wraps(func) + def wrapper(*args, **kwargs) -> T: + if func.__name__ not in cache: + cache[func.__name__] = func(*args, **kwargs) + return cache[func.__name__] + + return wrapper + + +# ---------- SINGLETON DECORATOR ---------- +T = TypeVar("T") + +def singleton_loader(func: Callable[..., T]) -> Callable[..., T]: + """Decorator to ensure the settings are loaded only once.""" + cache: dict[str, T] = {} + + @functools.wraps(func) + def wrapper(*args, **kwargs) -> T: + if func.__name__ not in cache: + cache[func.__name__] = func(*args, **kwargs) + return cache[func.__name__] + + return wrapper + + +@singleton_loader +def get_settings(config_path: str | Path | None = None) -> Settings: + """ + Returns the singleton Settings instance. + + Args: + config_path: Optional path to the YAML config file. If not provided, + defaults to 'config/settings.yaml' in the current working directory. 
+ """ + if config_path is None: + config_path = DEFAULT_SETTINGS_FILE + else: + config_path = Path(config_path) + + return Settings.from_yaml(config_path) \ No newline at end of file diff --git a/app/wsgi.py b/app/wsgi.py new file mode 100644 index 0000000..745c665 --- /dev/null +++ b/app/wsgi.py @@ -0,0 +1,10 @@ +""" +app/wsgi.py + +Gunicorn entrypoint for SneakyScope. +""" + +from . import create_app + +# Gunicorn will look for "app" +app = create_app() diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..6bbe6d7 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,13 @@ +services: + web: + build: . + container_name: url-sandbox-web + ports: + - "8000:8000" + env_file: + - .env + volumes: + - ./data:/data + security_opt: + - no-new-privileges:true + restart: unless-stopped \ No newline at end of file diff --git a/docs/roadmap.md b/docs/roadmap.md new file mode 100644 index 0000000..e6ccf3f --- /dev/null +++ b/docs/roadmap.md @@ -0,0 +1,71 @@ + +## Priority 1 – Core Functionality / Stability + +**Permissions / Storage Paths** + +* βœ… `/data` and other mounted volumes setup handled by `sandbox.sh` +* βœ… Downloads, screenshots, and HTML artifacts are written correctly (`safe_write` in `io_helpers.py`) + +--- + +## Priority 2 – Data Accuracy / Enrichment + +**WHOIS & GeoIP Enhancements** + +* βœ… Implemented Python-based WHOIS parsing with fallback to raw WHOIS text +* βœ… Default `"Possible Privacy"` or `"N/A"` for missing WHOIS fields +* βœ… GeoIP + ASN + ISP info displayed per IP in **accordion tables** +* βœ… Cache WHOIS and GeoIP results to reduce repeated queries + +**Suspicious Scripts & Forms** + +* [ ] Expand flagged script and form output with reasons for analysts +* [ ] Show each check and if it triggered flags (pass/fail for each check) + +**Add Suspicious BEC words** + +* βœ… Look for things like `"reset password"` +* βœ… Make configurable via a config file (yaml doc with rules) + +--- + +## Priority 3 – User 
Interface / UX
+
+**Front Page / Input Handling**
+
+* [ ] Automatically prepend `http://`, `https://`, and/or `www.` if a user only enters a domain
+
+**Result Templates / Cards**
+* [ ] Load the page source code in a code-editor view or code block so that it is easier to read
+* [ ] Update result cards with clear, analyst-friendly explanations
+* [ ] Include flagged logic and reason lists for scripts and forms
+* ✅ Display GeoIP results in accordion tables
+
+---
+
+## Priority 4 – API Layer
+
+**API Endpoints**
+
+* [ ] Add `/screenshot` endpoint
+* [ ] Add `/source` endpoint
+* [ ] Add `/analyse` endpoint
+
+**OpenAPI + Docs**
+
+* [ ] Create initial `openapi/openapi.yaml` spec file
+* [ ] Serve spec at `/api/openapi.yaml`
+* [ ] Wire up Swagger UI or Redoc at `/docs` for interactive API exploration
+
+---
+
+## Priority 5 – Optional / Cleanup
+
+**Artifact Management**
+* [ ] Implement saving of results from a UUID as "results.json" so we don't rerun all the rules and just load from cache.
+* [ ] Implement cleanup or retention policy for old artifacts
+* [ ] Optional: Add periodic maintenance scripts for storage
+
+**Extra Features**
+
+* [ ] Placeholder for additional features (e.g., bulk URL analysis, alerting, integrations)
diff --git a/entrypoint.sh b/entrypoint.sh
new file mode 100644
index 0000000..9113bf9
--- /dev/null
+++ b/entrypoint.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# Fail fast if the playwright package is missing (browsers ship with the base image)
+python - <<'PY'
+# Import check only: the base image already has the browsers installed,
+# so no separate "playwright install" step is required here.
+import playwright  # noqa: F401
+PY
+
+# Run the app via gunicorn
+# graceful-timeout - 300 ensures long page loads aren't killed prematurely
+# threads - 8 gives us more threads to work with
+# gthread allows each worker to handle multiple threads, so async/blocking tasks like Playwright won't block the whole worker
+exec gunicorn \
+    --bind 0.0.0.0:8000 \
+    --workers 2 \
+    --threads 8 \
+    --worker-class gthread \
+    --timeout 300 \
+    --graceful-timeout 300 \
+    "app.wsgi:app"
diff --git a/openapi/openapi.yaml b/openapi/openapi.yaml
new file mode 100644
index 0000000..d2f1a7b
--- /dev/null
+++ b/openapi/openapi.yaml
@@ -0,0 +1,94 @@
+openapi: 3.0.3
+info:
+  title: URL Sandbox API
+  version: 0.1.0
+  description: API for analyzing and extracting website artifacts.
+
+servers:
+  - url: http://localhost:8000/api
+    description: Local development
+
+paths:
+  /screenshot:
+    post:
+      summary: Capture a screenshot of a website
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              required:
+                - url
+              properties:
+                url:
+                  type: string
+                  example: "http://example.com"
+      responses:
+        '200':
+          description: Screenshot image returned
+          content:
+            image/png: {}
+        '400':
+          description: Invalid request
+
+  /source:
+    post:
+      summary: Retrieve HTML source of a website
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              required:
+                - url
+              properties:
+                url:
+                  type: string
+                  example: "http://example.com"
+      responses:
+        '200':
+          description: Raw HTML source
+          content:
+            text/html:
+              schema:
+                type: string
+        '400':
+          description: Invalid request
+
+  /analyse:
+    post:
+      summary: Run full analysis on a website
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              required:
+                - url
+              properties:
+                url:
+                  type: string
+                  example: "http://example.com"
+      responses:
+        '200':
+          description: JSON with enrichment and analysis results
+          content:
+            application/json:
+              schema:
+                type: object
+                properties:
+                  url:
+                    type: string
+                  whois:
+                    type: object
+                  geoip:
+                    type: object
+                  flags:
+                    type: array
+                    items:
+                      type: string
+        '400':
+          description: Invalid request
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..10ee22a
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,14 @@
+Flask>=3.0.3
+Jinja2>=3.1.4
+Werkzeug>=3.0.3
+itsdangerous>=2.2.0
+click>=8.1.7
+lxml>=5.3.0
+playwright==1.45.0       # Playwright stack
+beautifulsoup4>=4.12.3   # HTML parsing, etc.
+gunicorn>=22.0.0         # Production server
+python-whois             # For WHOIS lookups
+geoip2                   # MaxMind GeoLite2 database for IP geolocation
+dnspython                # For DNS lookups, including A/AAAA records
+ipwhois
+PyYAML
\ No newline at end of file
diff --git a/sandbox.sh b/sandbox.sh
new file mode 100755
index 0000000..ba9d9b3
--- /dev/null
+++ b/sandbox.sh
@@ -0,0 +1,103 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+# --- CONFIG ---
+SANDBOX_STORAGE="${SANDBOX_STORAGE:-./data}"
+APP_URL="${APP_URL:-http://localhost:8000}"
+
+# --- FUNCTIONS ---
+prepare_storage() {
+  echo "[*] Checking storage path: $SANDBOX_STORAGE"
+  if [ ! -d "$SANDBOX_STORAGE" ]; then
+    echo " -> Creating $SANDBOX_STORAGE on host"
+    sudo mkdir -p "$SANDBOX_STORAGE"
+  fi
+
+  echo " -> Setting ownership to Playwright user (pwuser / UID 1000)"
+  sudo chown -R 1000:1000 "$SANDBOX_STORAGE"
+  # u+rwX,go+rX sets the execute bit only on directories (capital X), unlike
+  # "-R 755" which would also mark every stored artifact as executable.
+  sudo chmod -R u+rwX,go+rX "$SANDBOX_STORAGE"
+
+  echo "[+] Storage ready."
+}
+
+start_stack() {
+  prepare_storage
+  echo "[*] Building Docker image..."
+  docker compose build
+
+  if [[ "${1:-}" == "-d" ]]; then
+    echo "[*] Starting services in detached mode..."
+    docker compose up -d
+  else
+    echo "[*] Starting services (attached)..."
+    docker compose up
+  fi
+}
+
+stop_stack() {
+  echo "[*] Stopping services..."
+  docker compose down
+}
+
+clean_stack() {
+  echo "[*] Removing containers, networks, and volumes..."
+  docker compose down -v --remove-orphans
+}
+
+restart_stack() {
+  stop_stack
+  echo "[*] Restarting services..."
+  start_stack -d
+}
+
+logs_stack() {
+  echo "[*] Showing logs (Ctrl+C to exit)..."
+  docker compose logs -f
+}
+
+status_stack() {
+  echo "[*] Current service status:"
+  docker compose ps
+}
+
+healthcheck_stack() {
+  echo "[*] Running health check on $APP_URL ..."
+  if curl -fsS "$APP_URL" > /dev/null; then
+    echo "[+] Service is healthy and reachable."
+  else
+    echo "[!] Service is NOT reachable at $APP_URL"
+    exit 1
+  fi
+}
+
+# --- MAIN ---
+case "${1:-}" in
+  start)
+    shift
+    start_stack "$@"
+    ;;
+  stop)
+    stop_stack
+    ;;
+  restart)
+    restart_stack
+    ;;
+  clean)
+    clean_stack
+    ;;
+  logs)
+    logs_stack
+    ;;
+  status)
+    status_stack
+    ;;
+  healthcheck)
+    healthcheck_stack
+    ;;
+  *)
+    echo "Usage: $0 {start [-d for detached mode] | stop | restart | clean | logs | status | healthcheck}"
+    exit 1
+    ;;
+esac