first commit
This commit is contained in:
1
src/__init__.py
Normal file
1
src/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# Phishtest - Phishing Test System for Proofpoint TAP Testing
|
||||
27
src/config.py
Normal file
27
src/config.py
Normal file
@@ -0,0 +1,27 @@
|
||||
"""Configuration management for phishtest."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
# Load .env file from project root
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
load_dotenv(PROJECT_ROOT / ".env")
|
||||
|
||||
|
||||
def get_config() -> dict:
|
||||
"""Get configuration from environment variables."""
|
||||
return {
|
||||
"smtp2go_api_key": os.getenv("SMTP2GO_API_KEY"),
|
||||
"sender_email": os.getenv("SENDER_EMAIL"),
|
||||
"sender_name": os.getenv("SENDER_NAME", "Phishing Test"),
|
||||
"company_name": os.getenv("COMPANY_NAME", "Your Company"),
|
||||
}
|
||||
|
||||
|
||||
def validate_config(config: dict) -> list[str]:
|
||||
"""Validate required configuration values. Returns list of missing keys."""
|
||||
required = ["smtp2go_api_key", "sender_email"]
|
||||
missing = [key for key in required if not config.get(key)]
|
||||
return missing
|
||||
77
src/csv_loader.py
Normal file
77
src/csv_loader.py
Normal file
@@ -0,0 +1,77 @@
|
||||
"""CSV file loading for recipient lists."""
|
||||
|
||||
import csv
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
EMAIL_REGEX = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
|
||||
|
||||
|
||||
class Recipient:
|
||||
"""Represents an email recipient."""
|
||||
|
||||
def __init__(self, email: str, name: str = ""):
|
||||
self.email = email
|
||||
self.name = name
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {
|
||||
"recipient_email": self.email,
|
||||
"recipient_name": self.name or self.email.split("@")[0],
|
||||
}
|
||||
|
||||
|
||||
def validate_email(email: str) -> bool:
|
||||
"""Validate email format."""
|
||||
return bool(EMAIL_REGEX.match(email))
|
||||
|
||||
|
||||
def load_recipients_from_csv(csv_path: str | Path) -> list[Recipient]:
|
||||
"""
|
||||
Load recipients from a CSV file.
|
||||
|
||||
Expected columns: email (required), name (optional)
|
||||
Returns list of Recipient objects.
|
||||
"""
|
||||
csv_path = Path(csv_path)
|
||||
|
||||
if not csv_path.exists():
|
||||
raise FileNotFoundError(f"CSV file not found: {csv_path}")
|
||||
|
||||
recipients = []
|
||||
errors = []
|
||||
|
||||
with open(csv_path, newline="", encoding="utf-8") as f:
|
||||
reader = csv.DictReader(f)
|
||||
|
||||
# Check for required column
|
||||
if "email" not in (reader.fieldnames or []):
|
||||
raise ValueError("CSV file must have an 'email' column")
|
||||
|
||||
for row_num, row in enumerate(reader, start=2): # start=2 accounts for header
|
||||
email = row.get("email", "").strip()
|
||||
name = row.get("name", "").strip()
|
||||
|
||||
if not email:
|
||||
errors.append(f"Row {row_num}: empty email")
|
||||
continue
|
||||
|
||||
if not validate_email(email):
|
||||
errors.append(f"Row {row_num}: invalid email format '{email}'")
|
||||
continue
|
||||
|
||||
recipients.append(Recipient(email=email, name=name))
|
||||
|
||||
if errors:
|
||||
error_msg = "\n".join(errors)
|
||||
raise ValueError(f"CSV validation errors:\n{error_msg}")
|
||||
|
||||
return recipients
|
||||
|
||||
|
||||
def create_recipient(email: str, name: str = "") -> Recipient:
|
||||
"""Create a single recipient, validating the email."""
|
||||
if not validate_email(email):
|
||||
raise ValueError(f"Invalid email format: {email}")
|
||||
return Recipient(email=email, name=name)
|
||||
103
src/email_sender.py
Normal file
103
src/email_sender.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Email sending via smtp2go API."""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from smtp2go.core import Smtp2goClient
|
||||
|
||||
|
||||
@dataclass
|
||||
class SendResult:
|
||||
"""Result of sending an email."""
|
||||
|
||||
recipient: str
|
||||
success: bool
|
||||
error: str | None = None
|
||||
|
||||
|
||||
class EmailSender:
|
||||
"""Sends emails via smtp2go API."""
|
||||
|
||||
def __init__(self, api_key: str):
|
||||
self.client = Smtp2goClient(api_key=api_key)
|
||||
|
||||
def send_email(
|
||||
self,
|
||||
sender_email: str,
|
||||
sender_name: str,
|
||||
recipient_email: str,
|
||||
subject: str,
|
||||
html_body: str = "",
|
||||
text_body: str = "",
|
||||
) -> SendResult:
|
||||
"""Send a single email."""
|
||||
# Format sender with display name
|
||||
sender = f"{sender_name} <{sender_email}>" if sender_name else sender_email
|
||||
|
||||
payload = {
|
||||
"sender": sender,
|
||||
"recipients": [recipient_email],
|
||||
"subject": subject,
|
||||
}
|
||||
|
||||
if html_body:
|
||||
payload["html"] = html_body
|
||||
if text_body:
|
||||
payload["text"] = text_body
|
||||
|
||||
# Must have at least one body
|
||||
if not html_body and not text_body:
|
||||
return SendResult(
|
||||
recipient=recipient_email,
|
||||
success=False,
|
||||
error="Email must have HTML or text body",
|
||||
)
|
||||
|
||||
try:
|
||||
response = self.client.send(**payload)
|
||||
|
||||
if response.success:
|
||||
return SendResult(recipient=recipient_email, success=True)
|
||||
else:
|
||||
error_msg = ", ".join(response.errors) if response.errors else "Unknown error"
|
||||
return SendResult(
|
||||
recipient=recipient_email, success=False, error=error_msg
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
return SendResult(recipient=recipient_email, success=False, error=str(e))
|
||||
|
||||
def send_batch(
|
||||
self,
|
||||
sender_email: str,
|
||||
sender_name: str,
|
||||
recipients: list[dict],
|
||||
template_renderer: callable,
|
||||
) -> list[SendResult]:
|
||||
"""
|
||||
Send emails to multiple recipients.
|
||||
|
||||
Args:
|
||||
sender_email: Sender email address
|
||||
sender_name: Sender display name
|
||||
recipients: List of recipient dicts with keys like 'recipient_email', 'recipient_name'
|
||||
template_renderer: Function that takes recipient dict and returns (subject, html, text)
|
||||
|
||||
Returns:
|
||||
List of SendResult objects
|
||||
"""
|
||||
results = []
|
||||
|
||||
for recipient in recipients:
|
||||
subject, html_body, text_body = template_renderer(recipient)
|
||||
|
||||
result = self.send_email(
|
||||
sender_email=sender_email,
|
||||
sender_name=sender_name,
|
||||
recipient_email=recipient["recipient_email"],
|
||||
subject=subject,
|
||||
html_body=html_body,
|
||||
text_body=text_body,
|
||||
)
|
||||
results.append(result)
|
||||
|
||||
return results
|
||||
171
src/main.py
Normal file
171
src/main.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""CLI interface for phishtest."""
|
||||
|
||||
import sys
|
||||
|
||||
import click
|
||||
|
||||
from .config import get_config, validate_config
|
||||
from .csv_loader import create_recipient, load_recipients_from_csv
|
||||
from .email_sender import EmailSender
|
||||
from .template_loader import list_templates, load_template, render_template
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli():
|
||||
"""Phishtest - Phishing email testing tool for Proofpoint TAP."""
|
||||
pass
|
||||
|
||||
|
||||
@cli.command("list-templates")
|
||||
def list_templates_cmd():
|
||||
"""List all available email templates."""
|
||||
templates = list_templates()
|
||||
|
||||
if not templates:
|
||||
click.echo("No templates found in templates/ directory.")
|
||||
click.echo("Create a template folder with metadata.json, template.html, and template.txt")
|
||||
return
|
||||
|
||||
click.echo("Available templates:")
|
||||
for name in templates:
|
||||
try:
|
||||
template = load_template(name)
|
||||
click.echo(f" - {name}: {template.subject}")
|
||||
except Exception as e:
|
||||
click.echo(f" - {name}: (error loading: {e})")
|
||||
|
||||
|
||||
@cli.command("preview")
|
||||
@click.option("--template", "-t", required=True, help="Template name to preview")
|
||||
@click.option("--name", "-n", default="Test User", help="Recipient name for preview")
|
||||
@click.option("--email", "-e", default="test@example.com", help="Recipient email for preview")
|
||||
def preview_cmd(template: str, name: str, email: str):
|
||||
"""Preview a template with sample data."""
|
||||
config = get_config()
|
||||
|
||||
try:
|
||||
tmpl = load_template(template)
|
||||
except ValueError as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
variables = {
|
||||
"recipient_name": name,
|
||||
"recipient_email": email,
|
||||
"company_name": config["company_name"],
|
||||
}
|
||||
|
||||
subject, html_body, text_body = render_template(tmpl, variables)
|
||||
|
||||
click.echo("=" * 60)
|
||||
click.echo(f"Subject: {subject}")
|
||||
click.echo("=" * 60)
|
||||
|
||||
if html_body:
|
||||
click.echo("\n--- HTML Body ---")
|
||||
click.echo(html_body)
|
||||
|
||||
if text_body:
|
||||
click.echo("\n--- Text Body ---")
|
||||
click.echo(text_body)
|
||||
|
||||
|
||||
@cli.command("send")
|
||||
@click.option("--template", "-t", required=True, help="Template name to use")
|
||||
@click.option("--to", "recipient_email", help="Single recipient email address")
|
||||
@click.option("--csv", "csv_file", type=click.Path(exists=True), help="CSV file with recipients")
|
||||
@click.option("--name", "-n", default="", help="Recipient name (for single recipient)")
|
||||
@click.option("--dry-run", is_flag=True, help="Preview without sending")
|
||||
def send_cmd(template: str, recipient_email: str, csv_file: str, name: str, dry_run: bool):
|
||||
"""Send phishing test emails."""
|
||||
# Validate we have either --to or --csv
|
||||
if not recipient_email and not csv_file:
|
||||
click.echo("Error: Must specify either --to or --csv", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
if recipient_email and csv_file:
|
||||
click.echo("Error: Cannot specify both --to and --csv", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
# Validate config
|
||||
config = get_config()
|
||||
missing = validate_config(config)
|
||||
if missing:
|
||||
click.echo(f"Error: Missing required config: {', '.join(missing)}", err=True)
|
||||
click.echo("Set these in .env file or as environment variables.")
|
||||
sys.exit(1)
|
||||
|
||||
# Load template
|
||||
try:
|
||||
tmpl = load_template(template)
|
||||
except ValueError as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
# Load recipients
|
||||
recipients = []
|
||||
try:
|
||||
if recipient_email:
|
||||
recipient = create_recipient(recipient_email, name)
|
||||
recipients = [recipient]
|
||||
else:
|
||||
recipients = load_recipients_from_csv(csv_file)
|
||||
except (ValueError, FileNotFoundError) as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
if not recipients:
|
||||
click.echo("No recipients to send to.")
|
||||
sys.exit(1)
|
||||
|
||||
click.echo(f"Template: {template}")
|
||||
click.echo(f"Recipients: {len(recipients)}")
|
||||
click.echo(f"Sender: {config['sender_name']} <{config['sender_email']}>")
|
||||
|
||||
if dry_run:
|
||||
click.echo("\n[DRY RUN] Would send to:")
|
||||
for r in recipients:
|
||||
click.echo(f" - {r.name} <{r.email}>" if r.name else f" - {r.email}")
|
||||
return
|
||||
|
||||
# Confirm before sending
|
||||
if not click.confirm("\nProceed with sending?"):
|
||||
click.echo("Aborted.")
|
||||
return
|
||||
|
||||
# Send emails
|
||||
sender = EmailSender(api_key=config["smtp2go_api_key"])
|
||||
|
||||
# Get sender name from template or config
|
||||
sender_name = tmpl.sender_name or config["sender_name"]
|
||||
|
||||
def render_for_recipient(recipient_dict: dict) -> tuple[str, str, str]:
|
||||
variables = {
|
||||
**recipient_dict,
|
||||
"company_name": config["company_name"],
|
||||
}
|
||||
return render_template(tmpl, variables)
|
||||
|
||||
recipient_dicts = [r.to_dict() for r in recipients]
|
||||
results = sender.send_batch(
|
||||
sender_email=config["sender_email"],
|
||||
sender_name=sender_name,
|
||||
recipients=recipient_dicts,
|
||||
template_renderer=render_for_recipient,
|
||||
)
|
||||
|
||||
# Report results
|
||||
success_count = sum(1 for r in results if r.success)
|
||||
fail_count = len(results) - success_count
|
||||
|
||||
click.echo(f"\nResults: {success_count} sent, {fail_count} failed")
|
||||
|
||||
for result in results:
|
||||
if result.success:
|
||||
click.echo(f" [OK] {result.recipient}")
|
||||
else:
|
||||
click.echo(f" [FAIL] {result.recipient}: {result.error}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
111
src/template_loader.py
Normal file
111
src/template_loader.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""Template loading and rendering for phishing emails."""
|
||||
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from jinja2 import Environment, FileSystemLoader, select_autoescape
|
||||
|
||||
|
||||
PROJECT_ROOT = Path(__file__).parent.parent
|
||||
TEMPLATES_DIR = PROJECT_ROOT / "templates"
|
||||
|
||||
|
||||
class Template:
|
||||
"""Represents a phishing email template."""
|
||||
|
||||
def __init__(self, name: str, metadata: dict, html_content: str, text_content: str):
|
||||
self.name = name
|
||||
self.metadata = metadata
|
||||
self.html_content = html_content
|
||||
self.text_content = text_content
|
||||
|
||||
@property
|
||||
def subject(self) -> str:
|
||||
return self.metadata.get("subject", "No Subject")
|
||||
|
||||
@property
|
||||
def sender_name(self) -> str | None:
|
||||
return self.metadata.get("sender_name")
|
||||
|
||||
|
||||
def list_templates() -> list[str]:
|
||||
"""List all available template names."""
|
||||
if not TEMPLATES_DIR.exists():
|
||||
return []
|
||||
|
||||
templates = []
|
||||
for path in TEMPLATES_DIR.iterdir():
|
||||
if path.is_dir() and (path / "metadata.json").exists():
|
||||
templates.append(path.name)
|
||||
|
||||
return sorted(templates)
|
||||
|
||||
|
||||
def load_template(name: str) -> Template:
|
||||
"""Load a template by name."""
|
||||
template_dir = TEMPLATES_DIR / name
|
||||
|
||||
if not template_dir.exists():
|
||||
raise ValueError(f"Template '{name}' not found in {TEMPLATES_DIR}")
|
||||
|
||||
# Load metadata
|
||||
metadata_path = template_dir / "metadata.json"
|
||||
if not metadata_path.exists():
|
||||
raise ValueError(f"Template '{name}' missing metadata.json")
|
||||
|
||||
with open(metadata_path) as f:
|
||||
metadata = json.load(f)
|
||||
|
||||
# Load HTML template
|
||||
html_path = template_dir / "template.html"
|
||||
html_content = ""
|
||||
if html_path.exists():
|
||||
html_content = html_path.read_text()
|
||||
|
||||
# Load text template
|
||||
text_path = template_dir / "template.txt"
|
||||
text_content = ""
|
||||
if text_path.exists():
|
||||
text_content = text_path.read_text()
|
||||
|
||||
if not html_content and not text_content:
|
||||
raise ValueError(f"Template '{name}' has no template.html or template.txt")
|
||||
|
||||
return Template(name, metadata, html_content, text_content)
|
||||
|
||||
|
||||
def render_template(template: Template, variables: dict) -> tuple[str, str, str]:
|
||||
"""
|
||||
Render a template with the given variables.
|
||||
|
||||
Returns: (subject, html_body, text_body)
|
||||
"""
|
||||
env = Environment(
|
||||
loader=FileSystemLoader(TEMPLATES_DIR / template.name),
|
||||
autoescape=select_autoescape(['html'])
|
||||
)
|
||||
|
||||
# Add current date to variables
|
||||
render_vars = {
|
||||
"date": datetime.now().strftime("%B %d, %Y"),
|
||||
**variables
|
||||
}
|
||||
|
||||
# Render subject
|
||||
subject_template = env.from_string(template.subject)
|
||||
subject = subject_template.render(**render_vars)
|
||||
|
||||
# Render HTML body
|
||||
html_body = ""
|
||||
if template.html_content:
|
||||
html_template = env.from_string(template.html_content)
|
||||
html_body = html_template.render(**render_vars)
|
||||
|
||||
# Render text body
|
||||
text_body = ""
|
||||
if template.text_content:
|
||||
text_template = env.from_string(template.text_content)
|
||||
text_body = text_template.render(**render_vars)
|
||||
|
||||
return subject, html_body, text_body
|
||||
Reference in New Issue
Block a user