commit cd3cea12bed68b4b3c9784b0312f7206ab472e88 Author: Phillip Tarrant Date: Mon Nov 10 14:20:27 2025 -0600 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7d8080b --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +venv/ +.venv/ +__pycache__ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..965ad1b --- /dev/null +++ b/README.md @@ -0,0 +1,36 @@ +# SneakySleuth +## “Be the sleuth — gather facts from remote Linux hosts.” +Collect forensic artifacts from a remote Linux host (Debian/RHEL aware) via SSH/SFTP. + +Features: +- SSH auth via private key or password +- SFTP file download and remote command capture (stdout -> files) +- Optional sudo support (provide sudo password or require key-based sudo) +- Default artifact lists for Debian-style and RHEL-style systems (easily extended) +- Creates a local case directory like: ./case_123_20251110T1320Z/ +- Writes a manifest.json and checksums.sha256 for integrity and audit +- Detailed logging + +Requirements: +- Python 3.8+ +- Paramiko (`pip install paramiko`) +- (optional) colorama for nicer console coloring, but not required + +NOTES & CAUTION: +- Some artifacts (e.g., /etc/shadow) are extremely sensitive. Only collect when permitted. +- Sudo password is sent only over the SSH channel to the remote host (not stored). + +## Examples: +```bash +# Basic SSH Key based +python3 remote_forensics_collect.py 10.0.0.5 --case 123 --user ubuntu --key ~/.ssh/id_rsa + +# Prompt for Password +python3 remote_forensics_collect.py 10.0.0.5 --case 123 --user admin --password --sudo + +# collect an extra file and run an extra command ad-hoc +python3 remote_forensics_collect.py 10.0.0.5 --case 123 --user root --key ~/.ssh/id_rsa --extra-file /etc/hosts --extra-cmd "ss -tunap" + +# collect sensitive files (/etc/shadow etc...) +python3 remote_forensics_collect.py 10.0.0.5 --case 123 --user root --key ~/.ssh/id_rsa --collect-sensitive +``` \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0aed832 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +bcrypt==5.0.0 +cffi==2.0.0 +cryptography==46.0.3 +invoke==2.2.1 +paramiko==4.0.0 +pycparser==2.23 +PyNaCl==1.6.1 diff --git a/sneakysleuth.py b/sneakysleuth.py new file mode 100644 index 0000000..ae205b4 --- /dev/null +++ b/sneakysleuth.py @@ -0,0 +1,539 @@ +#!/usr/bin/env python3 +""" +remote_forensics_collect.py + +Collect forensic artifacts from a remote Linux host (Debian/RHEL aware) via SSH/SFTP. + +Features: +- SSH auth via private key or password +- SFTP file download and remote command capture (stdout -> files) +- Optional sudo support (provide sudo password or require key-based sudo) +- Default artifact lists for Debian-style and RHEL-style systems (easily extended) +- Creates a local case directory like: ./case_123_20251110T1320Z/ +- Writes a manifest.json and checksums.sha256 for integrity and audit +- Detailed logging + +Requirements: +- Python 3.8+ +- Paramiko (`pip install paramiko`) +- (optional) colorama for nicer console coloring, but not required + +NOTES & CAUTION: +- Some artifacts (e.g., /etc/shadow) are extremely sensitive. Only collect when permitted. +- Sudo password is sent only over the SSH channel to the remote host (not stored). +""" + +from __future__ import annotations +import argparse +import getpass +import json +import logging +import os +import stat +import hashlib +import pathlib +import sys +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import List, Dict, Optional, Tuple + +import paramiko + +# ----------------------- +# Configuration dataclass +# ----------------------- +@dataclass +class CollectorConfig: + host: str + port: int = 22 + username: str = "root" + key_filename: Optional[str] = None # path to private key file + password: Optional[str] = None # SSH password (if not using key) + sudo_password: Optional[str] = None # password to use with sudo -S if needed + case_id: str = "000" + outdir: pathlib.Path = pathlib.Path.cwd() + timeout: int = 30 + accept_unknown_host_key: bool = True + artifact_set: str = "auto" # "debian", "rhel", or "auto" + verbose: bool = False + collect_sensitive: bool = False # if True, collects e.g., /etc/shadow + extra_files: List[str] = field(default_factory=list) + extra_cmds: List[str] = field(default_factory=list) + +# ----------------------- +# Useful defaults +# ----------------------- +DEBIAN_FILES = [ + "/var/log/auth.log", + "/var/log/syslog", + "/etc/ssh/sshd_config", + "/etc/ssh/ssh_config", + "/etc/apt/sources.list", + "/var/log/apt/history.log", + "/var/log/apt/term.log", + "/etc/crontab", + "/var/spool/cron/crontabs", # per-user cron (dir) + "/etc/cron.d", + "/etc/cron.daily", + "/etc/cron.hourly", + "/etc/passwd", + "/etc/group", + "/etc/sudoers", + "/var/log/messages", # sometimes present +] + +RHEL_FILES = [ + "/var/log/secure", + "/var/log/messages", + "/etc/ssh/sshd_config", + "/etc/ssh/ssh_config", + "/etc/yum.repos.d", + "/etc/crontab", + "/var/spool/cron", + "/etc/cron.d", + "/etc/passwd", + "/etc/group", + "/etc/sudoers", +] + +# extremely sensitive - only collect if explicitly allowed +SENSITIVE_FILES = [ + "/etc/shadow", + "/root/.bash_history", +] + +# useful commands to snapshot system state +DEFAULT_COMMANDS = [ + "uname -a", + "uptime", + "who -a", + "w", + "ps auxww", + "ss -tulpen || netstat -tulpen", + "last -n 200", + "df -h", + "mount", + "ip addr show", + "ip route show", + "lsblk -f", + "cat /proc/cpuinfo", + "cat /proc/meminfo", + "journalctl -n 500 --no-pager", # systemd logs (may require sudo) + "dmesg --level=err,warn || true", # kernel messages + "crontab -l || true", + "sudo -l || true", +] + +# ----------------------- +# Helper functions +# ----------------------- +def iso_ts_now() -> str: + return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + + +def sha256_of_file(path: pathlib.Path) -> str: + h = hashlib.sha256() + with path.open("rb") as fh: + for chunk in iter(lambda: fh.read(8192), b""): + h.update(chunk) + return h.hexdigest() + + +# ----------------------- +# Main Collector Class +# ----------------------- +class RemoteForensicsCollector: + def __init__(self, cfg: CollectorConfig): + self.cfg = cfg + self.logger = logging.getLogger("RFC") + if cfg.verbose: + logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s") + else: + logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") + + self.case_dir = self._make_case_dir() + self.manifest: Dict[str, Dict] = { + "case_id": cfg.case_id, + "host": cfg.host, + "port": cfg.port, + "user": cfg.username, + "started_at": iso_ts_now(), + "collected": [], + "errors": [], + } + self.ssh_client: Optional[paramiko.SSHClient] = None + self.sftp: Optional[paramiko.SFTPClient] = None + + def _make_case_dir(self) -> pathlib.Path: + ts = iso_ts_now() + name = f"case_{self.cfg.case_id}_{ts}" + path = (self.cfg.outdir / name).resolve() + path.mkdir(parents=True, exist_ok=False) + self.logger.info("Creating case directory: %s", path) + return path + + def _connect(self) -> None: + self.logger.info("Connecting to %s@%s:%d ...", self.cfg.username, self.cfg.host, self.cfg.port) + client = paramiko.SSHClient() + + if self.cfg.accept_unknown_host_key: + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + else: + client.load_system_host_keys() + + try: + pkey = None + key_filename = self.cfg.key_filename + if key_filename: + # Paramiko will attempt to use the key file automatically if key_filename passed in connect() + self.logger.debug("Using key file: %s", key_filename) + + client.connect( + hostname=self.cfg.host, + port=self.cfg.port, + username=self.cfg.username, + password=self.cfg.password, + key_filename=key_filename, + timeout=self.cfg.timeout, + look_for_keys=bool(key_filename is None), + allow_agent=True, + ) + except Exception as exc: + self.logger.exception("SSH connection failed: %s", exc) + raise + + self.ssh_client = client + self.sftp = client.open_sftp() + self.logger.info("Connected and SFTP opened.") + + def _close(self) -> None: + try: + if self.sftp: + self.sftp.close() + if self.ssh_client: + self.ssh_client.close() + self.logger.info("SSH connection closed.") + except Exception: + self.logger.debug("Error closing SSH client", exc_info=True) + + def _remote_stat(self, path: str) -> Optional[paramiko.SFTPAttributes]: + try: + return self.sftp.stat(path) + except IOError: + return None + + def _download_file(self, remote_path: str, local_path: pathlib.Path) -> bool: + """Download single remote file via SFTP. Create parent dirs as needed.""" + try: + local_path.parent.mkdir(parents=True, exist_ok=True) + self.logger.debug("Attempting SFTP get: %s -> %s", remote_path, local_path) + self.sftp.get(remote_path, str(local_path)) + self.logger.info("Downloaded: %s", remote_path) + self._record_collected(remote_path, local_path, method="sftp") + return True + except IOError as exc: + self.logger.warning("SFTP get failed for %s: %s", remote_path, exc) + return False + except Exception as exc: + self.logger.exception("Unexpected error downloading %s: %s", remote_path, exc) + return False + + def _record_collected(self, remote_path: str, local_path: pathlib.Path, method: str = "cmd"): + entry = { + "remote_path": remote_path, + "local_path": str(local_path.relative_to(self.case_dir)), + "method": method, + "size": local_path.stat().st_size if local_path.exists() else None, + "sha256": sha256_of_file(local_path) if local_path.exists() else None, + "collected_at": iso_ts_now(), + } + self.manifest["collected"].append(entry) + + def _run_remote_cmd(self, cmd: str, sudo: bool = False, capture_stdout_to: Optional[pathlib.Path] = None) -> Tuple[int, str, str]: + """ + Run command via SSH. If sudo=True and sudo_password provided, uses 'sudo -S -p "" ' and feeds password. + Returns (exit_status, stdout, stderr) + """ + if self.ssh_client is None: + raise RuntimeError("SSH client not connected") + + effective_cmd = cmd + need_password = False + if sudo: + # Use -S (read password from stdin) and -p "" to avoid prompt text + effective_cmd = f"sudo -S -p '' {cmd}" + need_password = bool(self.cfg.sudo_password) + + self.logger.debug("Running remote cmd: %s (sudo=%s)", cmd, sudo) + stdin, stdout, stderr = self.ssh_client.exec_command(effective_cmd, timeout=self.cfg.timeout, get_pty=True) + + if need_password: + # send sudo password followed by newline + self.logger.debug("Sending sudo password to remote") + stdin.write(self.cfg.sudo_password + "\n") + stdin.flush() + + stdout_str = stdout.read().decode(errors="replace") + stderr_str = stderr.read().decode(errors="replace") + exit_status = stdout.channel.recv_exit_status() + self.logger.debug("Command exit: %s", exit_status) + + if capture_stdout_to: + capture_stdout_to.parent.mkdir(parents=True, exist_ok=True) + with capture_stdout_to.open("wb") as fh: + fh.write(stdout_str.encode()) + self._record_collected(f"{'(sudo) ' if sudo else ''}{cmd}", capture_stdout_to, method="cmd") + + return exit_status, stdout_str, stderr_str + + def detect_os_family(self) -> str: + """Try to detect if target is Debian-family or RHEL-family. Returns 'debian', 'rhel', or 'unknown'.""" + try: + self.logger.debug("Detecting OS family") + rc, out, err = self._run_remote_cmd("cat /etc/os-release || true", sudo=False) + if "ID_LIKE=" in out: + if "debian" in out.lower(): + return "debian" + if "rhel" in out.lower() or "fedora" in out.lower() or "centos" in out.lower(): + return "rhel" + if "ID=" in out: + if "debian" in out.lower() or "ubuntu" in out.lower(): + return "debian" + if "rhel" in out.lower() or "fedora" in out.lower() or "centos" in out.lower() or "redhat" in out.lower(): + return "rhel" + # fallback - check for yum/dnf/apt + rc, out, err = self._run_remote_cmd("which apt-get || which yum || which dnf || true") + if "apt-get" in out: + return "debian" + if "yum" in out or "dnf" in out: + return "rhel" + except Exception: + self.logger.debug("OS detection failed", exc_info=True) + return "unknown" + + def collect_files(self, file_list: List[str]) -> None: + """Try to get each entry. If it's a dir, recursively fetch contents (non-recursive SFTP requires walking)""" + for remote_path in file_list: + try: + statobj = self._remote_stat(remote_path) + if statobj is None: + self.logger.info("Not present on remote: %s", remote_path) + self.manifest["errors"].append({"path": remote_path, "error": "not found"}) + continue + + # If directory, walk & fetch files + if stat.S_ISDIR(statobj.st_mode): + self.logger.info("Remote path is directory — walking: %s", remote_path) + self._download_directory(remote_path) + else: + local_path = self.case_dir / "files" / remote_path.lstrip("/") + ok = self._download_file(remote_path, local_path) + if not ok: + # fallback: try to cat the file via remote command (use sudo) + self.logger.info("Attempting fallback 'cat' for: %s", remote_path) + out_file = self.case_dir / "files" / (remote_path.lstrip("/").replace("/", "_") + ".txt") + rc, out, err = self._run_remote_cmd(f"cat {remote_path}", sudo=True, capture_stdout_to=out_file) + if rc != 0: + self.logger.warning("Fallback cat failed for %s (rc=%s)", remote_path, rc) + self.manifest["errors"].append({"path": remote_path, "error": f"download & cat failed rc={rc}"}) + # continue to next + except Exception as exc: + self.logger.exception("Error collecting file %s: %s", remote_path, exc) + self.manifest["errors"].append({"path": remote_path, "error": str(exc)}) + + def _download_directory(self, remote_dir: str) -> None: + """Recursively walk a remote directory and download files. Danger: can be large; use responsibly.""" + # Simple recursive walker using SFTP.listdir_attr + def walk(rdir: str, local_parent: pathlib.Path): + try: + entries = self.sftp.listdir_attr(rdir) + except IOError as exc: + self.logger.warning("Failed to listdir %s: %s", rdir, exc) + self.manifest["errors"].append({"path": rdir, "error": str(exc)}) + return + for e in entries: + name = e.filename + rpath = rdir.rstrip("/") + "/" + name + if stat.S_ISDIR(e.st_mode): + walk(rpath, local_parent / name) + else: + local_file = local_parent / name + local_file.parent.mkdir(parents=True, exist_ok=True) + try: + self.logger.debug("Downloading file in dir: %s", rpath) + self.sftp.get(rpath, str(local_file)) + self._record_collected(rpath, local_file, method="sftp") + except Exception as exc: + self.logger.warning("Failed downloading %s: %s", rpath, exc) + self.manifest["errors"].append({"path": rpath, "error": str(exc)}) + + local_root = self.case_dir / "files" / remote_dir.lstrip("/") + walk(remote_dir, local_root) + + def collect_commands(self, commands: List[str]) -> None: + cmds_dir = self.case_dir / "commands" + for i, cmd in enumerate(commands, start=1): + # if command likely requires sudo (journalctl) we will try sudo first and then fallback without + cmd_label = cmd.replace(" ", "_").replace("/", "_").replace("|", "_")[:80] + out_file = cmds_dir / f"{i:02d}_{cmd_label}.txt" + try: + # prefer sudo if configured + sudo_first = False + if "journalctl" in cmd or cmd.startswith("dmesg") or "ss" in cmd: + sudo_first = True + + if sudo_first: + rc, out, err = self._run_remote_cmd(cmd, sudo=True, capture_stdout_to=out_file) + if rc != 0: + self.logger.debug("Sudo cmd failed rc=%s, retrying without sudo: %s", rc, cmd) + rc2, out2, err2 = self._run_remote_cmd(cmd, sudo=False, capture_stdout_to=out_file) + else: + rc, out, err = self._run_remote_cmd(cmd, sudo=False, capture_stdout_to=out_file) + except Exception as exc: + self.logger.exception("Error running command %s: %s", cmd, exc) + self.manifest["errors"].append({"cmd": cmd, "error": str(exc)}) + + def finalize(self) -> None: + # manifest and checksums + self.manifest["finished_at"] = iso_ts_now() + manifest_path = self.case_dir / "manifest.json" + with manifest_path.open("w") as fh: + json.dump(self.manifest, fh, indent=2) + self.logger.info("Wrote manifest: %s", manifest_path) + + # create checksums file of everything collected under case_dir + checksums_path = self.case_dir / "checksums.sha256" + with checksums_path.open("w") as outfh: + for p in sorted(self.case_dir.rglob("*")): + if p.is_file(): + h = sha256_of_file(p) + outfh.write(f"{h} {p.relative_to(self.case_dir)}\n") + self.logger.info("Wrote checksums: %s", checksums_path) + + def run(self) -> None: + # Connect + try: + self._connect() + except Exception as exc: + self.logger.error("Could not connect to remote host: %s", exc) + self.manifest["errors"].append({"connect": str(exc)}) + self.finalize() + return + + try: + # detect OS if requested + os_family = "unknown" + if self.cfg.artifact_set == "auto": + os_family = self.detect_os_family() + self.logger.info("Detected OS family: %s", os_family) + else: + os_family = self.cfg.artifact_set + + # prepare artifact list + files_to_collect = [] + if os_family == "debian": + files_to_collect.extend(DEBIAN_FILES) + elif os_family == "rhel": + files_to_collect.extend(RHEL_FILES) + else: + files_to_collect.extend(DEBIAN_FILES + RHEL_FILES) + + # include extra files user requested + if self.cfg.extra_files: + files_to_collect.extend(self.cfg.extra_files) + + if self.cfg.collect_sensitive: + files_to_collect.extend(SENSITIVE_FILES) + else: + # Only log that we skipped them + for sf in SENSITIVE_FILES: + self.logger.info("Skipping sensitive file by default (set --collect-sensitive to include): %s", sf) + + # dedupe + files_to_collect = sorted(list(dict.fromkeys(files_to_collect))) + + # collect files (SFTP preferred) + self.collect_files(files_to_collect) + + # run commands + cmds = DEFAULT_COMMANDS.copy() + if self.cfg.extra_cmds: + cmds.extend(self.cfg.extra_cmds) + # dedupe commands + seen = set() + cmds = [c for c in cmds if not (c in seen or seen.add(c))] + + self.collect_commands(cmds) + + except Exception as exc: + self.logger.exception("Unhandled exception during collection: %s", exc) + self.manifest["errors"].append({"unhandled": str(exc)}) + finally: + self._close() + self.finalize() + + +# ----------------------- +# CLI +# ----------------------- +def parse_args() -> CollectorConfig: + ap = argparse.ArgumentParser(description="Remote Forensics Collector (SSH/SFTP)") + ap.add_argument("host", help="remote host (IP or hostname)") + ap.add_argument("--case", "-c", dest="case_id", default=f"{int(time.time())}", help="case id/name") + ap.add_argument("--user", "-u", dest="username", default=getpass.getuser(), help="username to SSH as") + ap.add_argument("--port", "-p", dest="port", type=int, default=22) + ap.add_argument("--key", dest="key_filename", help="private key file path (e.g. ~/.ssh/id_rsa)") + ap.add_argument("--password", dest="password", action="store_true", help="prompt for SSH password (if not using key)") + ap.add_argument("--sudo", dest="sudo", action="store_true", help="prompt for sudo password to use for privileged commands (if needed)") + ap.add_argument("--outdir", dest="outdir", default=".", help="where to place case_ directory") + ap.add_argument("--artifact-set", dest="artifact_set", choices=["auto", "debian", "rhel", "all"], default="auto", help="artifact list to use") + ap.add_argument("--collect-sensitive", dest="collect_sensitive", action="store_true", help="also collect sensitive files (e.g., /etc/shadow) - use with caution") + ap.add_argument("--extra-file", dest="extra_files", action="append", help="extra remote file or directory to pull (can be specified multiple times)") + ap.add_argument("--extra-cmd", dest="extra_cmds", action="append", help="extra remote command to run and save output (can be specified multiple times)") + ap.add_argument("--no-accept-hostkey", dest="accept_unknown_host_key", action="store_false", help="do NOT auto-accept unknown host key (load system keys instead)") + ap.add_argument("--verbose", "-v", dest="verbose", action="store_true") + args = ap.parse_args() + + ssh_pass = None + if args.password: + ssh_pass = getpass.getpass(prompt=f"SSH password for {args.username}@{args.host}: ") + + sudo_pass = None + if args.sudo: + sudo_pass = getpass.getpass(prompt="sudo password for remote user (will be sent over SSH channel if needed): ") + + outdir_path = pathlib.Path(args.outdir).expanduser().resolve() + + cfg = CollectorConfig( + host=args.host, + port=args.port, + username=args.username, + key_filename=args.key_filename if args.key_filename else None, + password=ssh_pass, + sudo_password=sudo_pass, + case_id=args.case_id, + outdir=outdir_path, + accept_unknown_host_key=args.accept_unknown_host_key, + artifact_set=(args.artifact_set if args.artifact_set != "all" else "auto"), + verbose=args.verbose, + collect_sensitive=args.collect_sensitive, + extra_files=args.extra_files or [], + extra_cmds=args.extra_cmds or [], + ) + return cfg + + +def main(): + cfg = parse_args() + collector = RemoteForensicsCollector(cfg) + try: + collector.run() + except KeyboardInterrupt: + print("\nInterrupted by user - finalizing partial results") + collector.finalize() + sys.exit(1) + + +if __name__ == "__main__": + main()