295 lines
9.7 KiB
Python
295 lines
9.7 KiB
Python
"""
Webhook Template Service

Provides Jinja2 template rendering for webhook payloads with a sandboxed
environment and comprehensive context building from scan/alert/rule data.
"""
|
|
|
|
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple

from jinja2 import BaseLoader, Environment, TemplateError, meta
from jinja2.sandbox import SandboxedEnvironment
|
|
|
|
|
|
class TemplateService:
    """
    Service for rendering webhook templates safely using Jinja2.

    Features:
    - Sandboxed Jinja2 environment to prevent code execution
    - Rich context with alert, scan, rule, service, cert, tls data
    - Support for both JSON and text output formats
    - Template validation and error handling
    """

    def __init__(self):
        """Initialize the sandboxed Jinja2 environment."""
        self.env = SandboxedEnvironment(
            loader=BaseLoader(),
            autoescape=False,  # We control the output format
            trim_blocks=True,
            lstrip_blocks=True,
        )

        # Add custom filters
        self.env.filters['isoformat'] = self._isoformat_filter

    @staticmethod
    def _utc_now() -> datetime:
        """
        Return the current UTC time as a naive datetime.

        Produces the same value as ``datetime.utcnow()`` (deprecated since
        Python 3.12); tzinfo is stripped so rendered timestamps keep the
        format they had before.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _isoformat_filter(self, value):
        """
        Custom filter to convert datetime to ISO format.

        Args:
            value: Any template value

        Returns:
            ``value.isoformat()`` for datetime instances, ``str(value)`` for
            everything else (so ``None`` renders as the text "None").
        """
        if isinstance(value, datetime):
            return value.isoformat()
        return str(value)

    def build_context(
        self,
        alert,
        scan,
        rule,
        app_name: str = "SneakyScanner",
        app_version: str = "1.0.0",
        app_url: str = "https://github.com/sneakygeek/SneakyScan",
        *,
        service: Optional[Dict[str, Any]] = None,
        cert: Optional[Dict[str, Any]] = None,
        tls: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """
        Build the template context from alert, scan, and rule objects.

        Args:
            alert: Alert model instance
            scan: Scan model instance
            rule: AlertRule model instance
            app_name: Application name
            app_version: Application version
            app_url: Application repository URL
            service: Optional service details exposed to templates as
                ``service`` (keyword-only; defaults to None)
            cert: Optional certificate details exposed as ``cert``
                (keyword-only; defaults to None)
            tls: Optional TLS details exposed as ``tls``
                (keyword-only; defaults to None)

        Returns:
            Dictionary with all available template variables. The keys
            ``service``, ``cert`` and ``tls`` are always present so templates
            can test them even when no data was supplied.
        """
        return {
            "alert": {
                "id": alert.id,
                "type": alert.alert_type,
                "severity": alert.severity,
                "message": alert.message,
                "ip_address": alert.ip_address,
                "port": alert.port,
                "acknowledged": alert.acknowledged,
                "acknowledged_at": alert.acknowledged_at,
                "acknowledged_by": alert.acknowledged_by,
                "created_at": alert.created_at,
                "email_sent": alert.email_sent,
                "email_sent_at": alert.email_sent_at,
                "webhook_sent": alert.webhook_sent,
                "webhook_sent_at": alert.webhook_sent_at,
            },
            "scan": {
                "id": scan.id,
                "title": scan.title,
                "timestamp": scan.timestamp,
                "duration": scan.duration,
                "status": scan.status,
                "config_file": scan.config_file,
                "triggered_by": scan.triggered_by,
                "started_at": scan.started_at,
                "completed_at": scan.completed_at,
                "error_message": scan.error_message,
            },
            "rule": {
                "id": rule.id,
                "name": rule.name,
                "type": rule.rule_type,
                "threshold": rule.threshold,
                "severity": rule.severity,
                "enabled": rule.enabled,
            },
            "app": {
                "name": app_name,
                "version": app_version,
                "url": app_url,
            },
            "timestamp": self._utc_now(),
            # Supplied by the caller for service-related alerts; None otherwise.
            "service": service,
            "cert": cert,
            "tls": tls,
        }

    def render(
        self,
        template_string: str,
        context: Dict[str, Any],
        template_format: str = 'json'
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Render a template with the given context.

        Context values are interpolated as-is (autoescape is disabled), so a
        JSON template is responsible for quoting/escaping its own values; the
        rendered text is only checked for being parseable JSON afterwards.

        Args:
            template_string: The Jinja2 template string
            context: Template context dictionary
            template_format: Output format ('json' or 'text')

        Returns:
            Tuple of (rendered_output, error_message)
            - If successful: (rendered_string, None)
            - If failed: (None, error_message)
        """
        try:
            template = self.env.from_string(template_string)
            rendered = template.render(context)

            # For JSON format, validate that the output is valid JSON
            if template_format == 'json':
                try:
                    # Parse to validate JSON structure; the parsed value is
                    # discarded and the rendered text is returned unchanged.
                    json.loads(rendered)
                except json.JSONDecodeError as e:
                    return None, f"Template rendered invalid JSON: {e}"

            return rendered, None

        except TemplateError as e:
            return None, f"Template rendering error: {e}"
        except Exception as e:
            # Boundary handler: callers expect an error tuple, never a raise.
            return None, f"Unexpected error rendering template: {e}"

    def validate_template(
        self,
        template_string: str,
        template_format: str = 'json'
    ) -> Tuple[bool, Optional[str]]:
        """
        Validate a template without rendering it.

        Args:
            template_string: The Jinja2 template string to validate
            template_format: Expected output format ('json' or 'text')

        Returns:
            Tuple of (is_valid, error_message)
            - If valid: (True, None)
            - If invalid: (False, error_message)
        """
        try:
            # Parse the template to check syntax
            self.env.parse(template_string)

            # For JSON templates, only check the leading structure marker
            # (this is a basic check - full validation happens during render).
            # Note that Jinja2 tags also begin with "{", so they pass too.
            if template_format == 'json':
                if not template_string.strip().startswith(('{', '[')):
                    return False, "JSON template must start with { or ["

            return True, None

        except TemplateError as e:
            return False, f"Template syntax error: {e}"
        except Exception as e:
            return False, f"Template validation error: {e}"

    def get_template_variables(self, template_string: str) -> set:
        """
        Extract all variables used in a template.

        Args:
            template_string: The Jinja2 template string

        Returns:
            Set of undeclared variable names referenced by the template;
            an empty set if the template cannot be parsed.
        """
        try:
            ast = self.env.parse(template_string)
            return meta.find_undeclared_variables(ast)
        except Exception:
            # Deliberately best-effort: an unparseable template has no
            # discoverable variables.
            return set()

    def _sample_context(self) -> Dict[str, Any]:
        """
        Build the sample context used for template previews.

        Mirrors the top-level keys produced by build_context() so a template
        that works in preview sees the same variable names at delivery time.
        The service/cert/tls entries are illustrative values only.
        """
        now = self._utc_now()
        return {
            "alert": {
                "id": 123,
                "type": "unexpected_port",
                "severity": "warning",
                "message": "Unexpected port 8080 found open on 192.168.1.100",
                "ip_address": "192.168.1.100",
                "port": 8080,
                "acknowledged": False,
                "acknowledged_at": None,
                "acknowledged_by": None,
                "created_at": now,
                "email_sent": False,
                "email_sent_at": None,
                "webhook_sent": False,
                "webhook_sent_at": None,
            },
            "scan": {
                "id": 456,
                "title": "Production Infrastructure Scan",
                "timestamp": now,
                "duration": 125.5,
                "status": "completed",
                "config_file": "production-scan.yaml",
                "triggered_by": "schedule",
                "started_at": now,
                "completed_at": now,
                "error_message": None,
            },
            "rule": {
                "id": 789,
                "name": "Unexpected Port Detection",
                "type": "unexpected_port",
                "threshold": None,
                "severity": "warning",
                "enabled": True,
            },
            "service": {
                "name": "http",
                "product": "nginx",
                "version": "1.20.0",
            },
            "cert": {
                "subject": "CN=example.com",
                "issuer": "CN=Let's Encrypt Authority X3",
                "days_until_expiry": 15,
            },
            "tls": {
                "version": "TLSv1.3",
            },
            "app": {
                "name": "SneakyScanner",
                "version": "1.0.0-phase5",
                "url": "https://github.com/sneakygeek/SneakyScan",
            },
            "timestamp": now,
        }

    def render_test_payload(
        self,
        template_string: str,
        template_format: str = 'json'
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Render a template with sample/test data for preview purposes.

        Args:
            template_string: The Jinja2 template string
            template_format: Output format ('json' or 'text')

        Returns:
            Tuple of (rendered_output, error_message), as returned by render()
        """
        return self.render(template_string, self._sample_context(), template_format)
|
|
|
|
|
|
# Module-wide slot holding the lazily created service; None until first use.
_template_service = None


def get_template_service() -> TemplateService:
    """Return the shared TemplateService, creating it on the first call."""
    global _template_service
    if _template_service is not None:
        return _template_service
    _template_service = TemplateService()
    return _template_service
|