#!/usr/bin/env python3 """ remote_forensics_collect.py Collect forensic artifacts from a remote Linux host (Debian/RHEL aware) via SSH/SFTP. Features: - SSH auth via private key or password - SFTP file download and remote command capture (stdout -> files) - Optional sudo support (provide sudo password or require key-based sudo) - Default artifact lists for Debian-style and RHEL-style systems (easily extended) - Creates a local case directory like: ./case_123_20251110T1320Z/ - Writes a manifest.json and checksums.sha256 for integrity and audit - Detailed logging Requirements: - Python 3.8+ - Paramiko (`pip install paramiko`) - (optional) colorama for nicer console coloring, but not required NOTES & CAUTION: - Some artifacts (e.g., /etc/shadow) are extremely sensitive. Only collect when permitted. - Sudo password is sent only over the SSH channel to the remote host (not stored). """ from __future__ import annotations import argparse import getpass import json import logging import os import stat import hashlib import pathlib import sys import time from dataclasses import dataclass, field from datetime import datetime, timezone from typing import List, Dict, Optional, Tuple import paramiko # ----------------------- # Configuration dataclass # ----------------------- @dataclass class CollectorConfig: host: str port: int = 22 username: str = "root" key_filename: Optional[str] = None # path to private key file password: Optional[str] = None # SSH password (if not using key) sudo_password: Optional[str] = None # password to use with sudo -S if needed case_id: str = "000" outdir: pathlib.Path = pathlib.Path.cwd() timeout: int = 30 accept_unknown_host_key: bool = True artifact_set: str = "auto" # "debian", "rhel", or "auto" verbose: bool = False collect_sensitive: bool = False # if True, collects e.g., /etc/shadow extra_files: List[str] = field(default_factory=list) extra_cmds: List[str] = field(default_factory=list) # ----------------------- # Useful defaults # ----------------------- DEBIAN_FILES = [ "/var/log/auth.log", "/var/log/syslog", "/etc/ssh/sshd_config", "/etc/ssh/ssh_config", "/etc/apt/sources.list", "/var/log/apt/history.log", "/var/log/apt/term.log", "/etc/crontab", "/var/spool/cron/crontabs", # per-user cron (dir) "/etc/cron.d", "/etc/cron.daily", "/etc/cron.hourly", "/etc/passwd", "/etc/group", "/etc/sudoers", "/var/log/messages", # sometimes present ] RHEL_FILES = [ "/var/log/secure", "/var/log/messages", "/etc/ssh/sshd_config", "/etc/ssh/ssh_config", "/etc/yum.repos.d", "/etc/crontab", "/var/spool/cron", "/etc/cron.d", "/etc/passwd", "/etc/group", "/etc/sudoers", ] # extremely sensitive - only collect if explicitly allowed SENSITIVE_FILES = [ "/etc/shadow", "/root/.bash_history", ] # useful commands to snapshot system state DEFAULT_COMMANDS = [ "uname -a", "uptime", "who -a", "w", "ps auxww", "ss -tulpen || netstat -tulpen", "last -n 200", "df -h", "mount", "ip addr show", "ip route show", "lsblk -f", "cat /proc/cpuinfo", "cat /proc/meminfo", "journalctl -n 500 --no-pager", # systemd logs (may require sudo) "dmesg --level=err,warn || true", # kernel messages "crontab -l || true", "sudo -l || true", ] # ----------------------- # Helper functions # ----------------------- def iso_ts_now() -> str: return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") def sha256_of_file(path: pathlib.Path) -> str: h = hashlib.sha256() with path.open("rb") as fh: for chunk in iter(lambda: fh.read(8192), b""): h.update(chunk) return h.hexdigest() # ----------------------- # Main Collector Class # ----------------------- class RemoteForensicsCollector: def __init__(self, cfg: CollectorConfig): self.cfg = cfg self.logger = logging.getLogger("RFC") if cfg.verbose: logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s") else: logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") self.case_dir = self._make_case_dir() self.manifest: Dict[str, Dict] = { "case_id": cfg.case_id, "host": cfg.host, "port": cfg.port, "user": cfg.username, "started_at": iso_ts_now(), "collected": [], "errors": [], } self.ssh_client: Optional[paramiko.SSHClient] = None self.sftp: Optional[paramiko.SFTPClient] = None def _make_case_dir(self) -> pathlib.Path: ts = iso_ts_now() name = f"case_{self.cfg.case_id}_{ts}" path = (self.cfg.outdir / name).resolve() path.mkdir(parents=True, exist_ok=False) self.logger.info("Creating case directory: %s", path) return path def _connect(self) -> None: self.logger.info("Connecting to %s@%s:%d ...", self.cfg.username, self.cfg.host, self.cfg.port) client = paramiko.SSHClient() if self.cfg.accept_unknown_host_key: client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) else: client.load_system_host_keys() try: pkey = None key_filename = self.cfg.key_filename if key_filename: # Paramiko will attempt to use the key file automatically if key_filename passed in connect() self.logger.debug("Using key file: %s", key_filename) client.connect( hostname=self.cfg.host, port=self.cfg.port, username=self.cfg.username, password=self.cfg.password, key_filename=key_filename, timeout=self.cfg.timeout, look_for_keys=bool(key_filename is None), allow_agent=True, ) except Exception as exc: self.logger.exception("SSH connection failed: %s", exc) raise self.ssh_client = client self.sftp = client.open_sftp() self.logger.info("Connected and SFTP opened.") def _close(self) -> None: try: if self.sftp: self.sftp.close() if self.ssh_client: self.ssh_client.close() self.logger.info("SSH connection closed.") except Exception: self.logger.debug("Error closing SSH client", exc_info=True) def _remote_stat(self, path: str) -> Optional[paramiko.SFTPAttributes]: try: return self.sftp.stat(path) except IOError: return None def _download_file(self, remote_path: str, local_path: pathlib.Path) -> bool: """Download single remote file via SFTP. Create parent dirs as needed.""" try: local_path.parent.mkdir(parents=True, exist_ok=True) self.logger.debug("Attempting SFTP get: %s -> %s", remote_path, local_path) self.sftp.get(remote_path, str(local_path)) self.logger.info("Downloaded: %s", remote_path) self._record_collected(remote_path, local_path, method="sftp") return True except IOError as exc: self.logger.warning("SFTP get failed for %s: %s", remote_path, exc) return False except Exception as exc: self.logger.exception("Unexpected error downloading %s: %s", remote_path, exc) return False def _record_collected(self, remote_path: str, local_path: pathlib.Path, method: str = "cmd"): entry = { "remote_path": remote_path, "local_path": str(local_path.relative_to(self.case_dir)), "method": method, "size": local_path.stat().st_size if local_path.exists() else None, "sha256": sha256_of_file(local_path) if local_path.exists() else None, "collected_at": iso_ts_now(), } self.manifest["collected"].append(entry) def _run_remote_cmd(self, cmd: str, sudo: bool = False, capture_stdout_to: Optional[pathlib.Path] = None) -> Tuple[int, str, str]: """ Run command via SSH. If sudo=True and sudo_password provided, uses 'sudo -S -p "" ' and feeds password. Returns (exit_status, stdout, stderr) """ if self.ssh_client is None: raise RuntimeError("SSH client not connected") effective_cmd = cmd need_password = False if sudo: # Use -S (read password from stdin) and -p "" to avoid prompt text effective_cmd = f"sudo -S -p '' {cmd}" need_password = bool(self.cfg.sudo_password) self.logger.debug("Running remote cmd: %s (sudo=%s)", cmd, sudo) stdin, stdout, stderr = self.ssh_client.exec_command(effective_cmd, timeout=self.cfg.timeout, get_pty=True) if need_password: # send sudo password followed by newline self.logger.debug("Sending sudo password to remote") stdin.write(self.cfg.sudo_password + "\n") stdin.flush() stdout_str = stdout.read().decode(errors="replace") stderr_str = stderr.read().decode(errors="replace") exit_status = stdout.channel.recv_exit_status() self.logger.debug("Command exit: %s", exit_status) if capture_stdout_to: capture_stdout_to.parent.mkdir(parents=True, exist_ok=True) with capture_stdout_to.open("wb") as fh: fh.write(stdout_str.encode()) self._record_collected(f"{'(sudo) ' if sudo else ''}{cmd}", capture_stdout_to, method="cmd") return exit_status, stdout_str, stderr_str def detect_os_family(self) -> str: """Try to detect if target is Debian-family or RHEL-family. Returns 'debian', 'rhel', or 'unknown'.""" try: self.logger.debug("Detecting OS family") rc, out, err = self._run_remote_cmd("cat /etc/os-release || true", sudo=False) if "ID_LIKE=" in out: if "debian" in out.lower(): return "debian" if "rhel" in out.lower() or "fedora" in out.lower() or "centos" in out.lower(): return "rhel" if "ID=" in out: if "debian" in out.lower() or "ubuntu" in out.lower(): return "debian" if "rhel" in out.lower() or "fedora" in out.lower() or "centos" in out.lower() or "redhat" in out.lower(): return "rhel" # fallback - check for yum/dnf/apt rc, out, err = self._run_remote_cmd("which apt-get || which yum || which dnf || true") if "apt-get" in out: return "debian" if "yum" in out or "dnf" in out: return "rhel" except Exception: self.logger.debug("OS detection failed", exc_info=True) return "unknown" def collect_files(self, file_list: List[str]) -> None: """Try to get each entry. If it's a dir, recursively fetch contents (non-recursive SFTP requires walking)""" for remote_path in file_list: try: statobj = self._remote_stat(remote_path) if statobj is None: self.logger.info("Not present on remote: %s", remote_path) self.manifest["errors"].append({"path": remote_path, "error": "not found"}) continue # If directory, walk & fetch files if stat.S_ISDIR(statobj.st_mode): self.logger.info("Remote path is directory — walking: %s", remote_path) self._download_directory(remote_path) else: local_path = self.case_dir / "files" / remote_path.lstrip("/") ok = self._download_file(remote_path, local_path) if not ok: # fallback: try to cat the file via remote command (use sudo) self.logger.info("Attempting fallback 'cat' for: %s", remote_path) out_file = self.case_dir / "files" / (remote_path.lstrip("/").replace("/", "_") + ".txt") rc, out, err = self._run_remote_cmd(f"cat {remote_path}", sudo=True, capture_stdout_to=out_file) if rc != 0: self.logger.warning("Fallback cat failed for %s (rc=%s)", remote_path, rc) self.manifest["errors"].append({"path": remote_path, "error": f"download & cat failed rc={rc}"}) # continue to next except Exception as exc: self.logger.exception("Error collecting file %s: %s", remote_path, exc) self.manifest["errors"].append({"path": remote_path, "error": str(exc)}) def _download_directory(self, remote_dir: str) -> None: """Recursively walk a remote directory and download files. Danger: can be large; use responsibly.""" # Simple recursive walker using SFTP.listdir_attr def walk(rdir: str, local_parent: pathlib.Path): try: entries = self.sftp.listdir_attr(rdir) except IOError as exc: self.logger.warning("Failed to listdir %s: %s", rdir, exc) self.manifest["errors"].append({"path": rdir, "error": str(exc)}) return for e in entries: name = e.filename rpath = rdir.rstrip("/") + "/" + name if stat.S_ISDIR(e.st_mode): walk(rpath, local_parent / name) else: local_file = local_parent / name local_file.parent.mkdir(parents=True, exist_ok=True) try: self.logger.debug("Downloading file in dir: %s", rpath) self.sftp.get(rpath, str(local_file)) self._record_collected(rpath, local_file, method="sftp") except Exception as exc: self.logger.warning("Failed downloading %s: %s", rpath, exc) self.manifest["errors"].append({"path": rpath, "error": str(exc)}) local_root = self.case_dir / "files" / remote_dir.lstrip("/") walk(remote_dir, local_root) def collect_commands(self, commands: List[str]) -> None: cmds_dir = self.case_dir / "commands" for i, cmd in enumerate(commands, start=1): # if command likely requires sudo (journalctl) we will try sudo first and then fallback without cmd_label = cmd.replace(" ", "_").replace("/", "_").replace("|", "_")[:80] out_file = cmds_dir / f"{i:02d}_{cmd_label}.txt" try: # prefer sudo if configured sudo_first = False if "journalctl" in cmd or cmd.startswith("dmesg") or "ss" in cmd: sudo_first = True if sudo_first: rc, out, err = self._run_remote_cmd(cmd, sudo=True, capture_stdout_to=out_file) if rc != 0: self.logger.debug("Sudo cmd failed rc=%s, retrying without sudo: %s", rc, cmd) rc2, out2, err2 = self._run_remote_cmd(cmd, sudo=False, capture_stdout_to=out_file) else: rc, out, err = self._run_remote_cmd(cmd, sudo=False, capture_stdout_to=out_file) except Exception as exc: self.logger.exception("Error running command %s: %s", cmd, exc) self.manifest["errors"].append({"cmd": cmd, "error": str(exc)}) def finalize(self) -> None: # manifest and checksums self.manifest["finished_at"] = iso_ts_now() manifest_path = self.case_dir / "manifest.json" with manifest_path.open("w") as fh: json.dump(self.manifest, fh, indent=2) self.logger.info("Wrote manifest: %s", manifest_path) # create checksums file of everything collected under case_dir checksums_path = self.case_dir / "checksums.sha256" with checksums_path.open("w") as outfh: for p in sorted(self.case_dir.rglob("*")): if p.is_file(): h = sha256_of_file(p) outfh.write(f"{h} {p.relative_to(self.case_dir)}\n") self.logger.info("Wrote checksums: %s", checksums_path) def run(self) -> None: # Connect try: self._connect() except Exception as exc: self.logger.error("Could not connect to remote host: %s", exc) self.manifest["errors"].append({"connect": str(exc)}) self.finalize() return try: # detect OS if requested os_family = "unknown" if self.cfg.artifact_set == "auto": os_family = self.detect_os_family() self.logger.info("Detected OS family: %s", os_family) else: os_family = self.cfg.artifact_set # prepare artifact list files_to_collect = [] if os_family == "debian": files_to_collect.extend(DEBIAN_FILES) elif os_family == "rhel": files_to_collect.extend(RHEL_FILES) else: files_to_collect.extend(DEBIAN_FILES + RHEL_FILES) # include extra files user requested if self.cfg.extra_files: files_to_collect.extend(self.cfg.extra_files) if self.cfg.collect_sensitive: files_to_collect.extend(SENSITIVE_FILES) else: # Only log that we skipped them for sf in SENSITIVE_FILES: self.logger.info("Skipping sensitive file by default (set --collect-sensitive to include): %s", sf) # dedupe files_to_collect = sorted(list(dict.fromkeys(files_to_collect))) # collect files (SFTP preferred) self.collect_files(files_to_collect) # run commands cmds = DEFAULT_COMMANDS.copy() if self.cfg.extra_cmds: cmds.extend(self.cfg.extra_cmds) # dedupe commands seen = set() cmds = [c for c in cmds if not (c in seen or seen.add(c))] self.collect_commands(cmds) except Exception as exc: self.logger.exception("Unhandled exception during collection: %s", exc) self.manifest["errors"].append({"unhandled": str(exc)}) finally: self._close() self.finalize() # ----------------------- # CLI # ----------------------- def parse_args() -> CollectorConfig: ap = argparse.ArgumentParser(description="Remote Forensics Collector (SSH/SFTP)") ap.add_argument("host", help="remote host (IP or hostname)") ap.add_argument("--case", "-c", dest="case_id", default=f"{int(time.time())}", help="case id/name") ap.add_argument("--user", "-u", dest="username", default=getpass.getuser(), help="username to SSH as") ap.add_argument("--port", "-p", dest="port", type=int, default=22) ap.add_argument("--key", dest="key_filename", help="private key file path (e.g. ~/.ssh/id_rsa)") ap.add_argument("--password", dest="password", action="store_true", help="prompt for SSH password (if not using key)") ap.add_argument("--sudo", dest="sudo", action="store_true", help="prompt for sudo password to use for privileged commands (if needed)") ap.add_argument("--outdir", dest="outdir", default=".", help="where to place case_ directory") ap.add_argument("--artifact-set", dest="artifact_set", choices=["auto", "debian", "rhel", "all"], default="auto", help="artifact list to use") ap.add_argument("--collect-sensitive", dest="collect_sensitive", action="store_true", help="also collect sensitive files (e.g., /etc/shadow) - use with caution") ap.add_argument("--extra-file", dest="extra_files", action="append", help="extra remote file or directory to pull (can be specified multiple times)") ap.add_argument("--extra-cmd", dest="extra_cmds", action="append", help="extra remote command to run and save output (can be specified multiple times)") ap.add_argument("--no-accept-hostkey", dest="accept_unknown_host_key", action="store_false", help="do NOT auto-accept unknown host key (load system keys instead)") ap.add_argument("--verbose", "-v", dest="verbose", action="store_true") args = ap.parse_args() ssh_pass = None if args.password: ssh_pass = getpass.getpass(prompt=f"SSH password for {args.username}@{args.host}: ") sudo_pass = None if args.sudo: sudo_pass = getpass.getpass(prompt="sudo password for remote user (will be sent over SSH channel if needed): ") outdir_path = pathlib.Path(args.outdir).expanduser().resolve() cfg = CollectorConfig( host=args.host, port=args.port, username=args.username, key_filename=args.key_filename if args.key_filename else None, password=ssh_pass, sudo_password=sudo_pass, case_id=args.case_id, outdir=outdir_path, accept_unknown_host_key=args.accept_unknown_host_key, artifact_set=(args.artifact_set if args.artifact_set != "all" else "auto"), verbose=args.verbose, collect_sensitive=args.collect_sensitive, extra_files=args.extra_files or [], extra_cmds=args.extra_cmds or [], ) return cfg def main(): cfg = parse_args() collector = RemoteForensicsCollector(cfg) try: collector.run() except KeyboardInterrupt: print("\nInterrupted by user - finalizing partial results") collector.finalize() sys.exit(1) if __name__ == "__main__": main()