Files
SneakyScan/app/migrations/versions/005_add_webhook_templates.py
2025-11-18 15:29:23 -06:00

84 lines
2.5 KiB
Python

"""Add webhook template support
Revision ID: 005
Revises: 004
Create Date: 2025-11-18
"""
from alembic import op
import sqlalchemy as sa
import json
# revision identifiers, used by Alembic
revision = '005'
down_revision = '004'
branch_labels = None
depends_on = None
# Default template that matches the current JSON payload structure
DEFAULT_TEMPLATE = """{
"event": "alert.created",
"alert": {
"id": {{ alert.id }},
"type": "{{ alert.type }}",
"severity": "{{ alert.severity }}",
"message": "{{ alert.message }}",
{% if alert.ip_address %}"ip_address": "{{ alert.ip_address }}",{% endif %}
{% if alert.port %}"port": {{ alert.port }},{% endif %}
"acknowledged": {{ alert.acknowledged|lower }},
"created_at": "{{ alert.created_at.isoformat() }}"
},
"scan": {
"id": {{ scan.id }},
"title": "{{ scan.title }}",
"timestamp": "{{ scan.timestamp.isoformat() }}",
"status": "{{ scan.status }}"
},
"rule": {
"id": {{ rule.id }},
"name": "{{ rule.name }}",
"type": "{{ rule.type }}",
"threshold": {{ rule.threshold if rule.threshold else 'null' }}
}
}"""
def upgrade():
"""
Add webhook template fields:
- template: Jinja2 template for payload
- template_format: Output format (json, text)
- content_type_override: Optional custom Content-Type
"""
# Add new columns to webhooks table
with op.batch_alter_table('webhooks') as batch_op:
batch_op.add_column(sa.Column('template', sa.Text(), nullable=True, comment='Jinja2 template for webhook payload'))
batch_op.add_column(sa.Column('template_format', sa.String(20), nullable=True, server_default='json', comment='Template output format: json, text'))
batch_op.add_column(sa.Column('content_type_override', sa.String(100), nullable=True, comment='Optional custom Content-Type header'))
# Populate existing webhooks with default template
# This ensures backward compatibility by converting existing webhooks to use the
# same JSON structure they're currently sending
connection = op.get_bind()
connection.execute(
sa.text("""
UPDATE webhooks
SET template = :template,
template_format = 'json'
WHERE template IS NULL
"""),
{"template": DEFAULT_TEMPLATE}
)
def downgrade():
"""Remove webhook template fields."""
with op.batch_alter_table('webhooks') as batch_op:
batch_op.drop_column('content_type_override')
batch_op.drop_column('template_format')
batch_op.drop_column('template')