84 lines
2.5 KiB
Python
84 lines
2.5 KiB
Python
"""Add webhook template support

Revision ID: 005
Revises: 004
Create Date: 2025-11-18

"""
|
|
from alembic import op
|
|
import sqlalchemy as sa
|
|
import json
|
|
|
|
|
|
# Alembic revision identifiers: this migration's id and the id of the
# migration it is applied on top of.
revision = "005"
down_revision = "004"
branch_labels = None
depends_on = None
|
|
|
|
|
|
# Default template that matches the current JSON payload structure.
# This is Jinja2 source; upgrade() stores it in webhooks.template for every
# row that has no template yet.
#
# The threshold expression falls back to JSON null only when the value is
# undefined or None, so a numeric threshold of 0 is rendered as 0 rather
# than being treated as missing by a plain truthiness test.
#
# NOTE(review): string values are interpolated between literal double quotes
# without JSON escaping, so a value containing `"` or a newline would render
# invalid JSON unless the renderer escapes it -- confirm against the renderer.
DEFAULT_TEMPLATE = """{
  "event": "alert.created",
  "alert": {
    "id": {{ alert.id }},
    "type": "{{ alert.type }}",
    "severity": "{{ alert.severity }}",
    "message": "{{ alert.message }}",
    {% if alert.ip_address %}"ip_address": "{{ alert.ip_address }}",{% endif %}
    {% if alert.port %}"port": {{ alert.port }},{% endif %}
    "acknowledged": {{ alert.acknowledged|lower }},
    "created_at": "{{ alert.created_at.isoformat() }}"
  },
  "scan": {
    "id": {{ scan.id }},
    "title": "{{ scan.title }}",
    "timestamp": "{{ scan.timestamp.isoformat() }}",
    "status": "{{ scan.status }}"
  },
  "rule": {
    "id": {{ rule.id }},
    "name": "{{ rule.name }}",
    "type": "{{ rule.type }}",
    "threshold": {{ rule.threshold if rule.threshold is defined and rule.threshold is not none else 'null' }}
  }
}"""
|
|
|
|
|
|
def upgrade():
    """
    Extend the webhooks table with template support.

    Columns added:
    - template: Jinja2 template for payload
    - template_format: Output format (json, text)
    - content_type_override: Optional custom Content-Type

    Afterwards, every row whose template is still NULL is given
    DEFAULT_TEMPLATE together with the 'json' format.
    """
    # All three columns are nullable; template_format additionally carries a
    # server-side default of 'json'.
    template_columns = (
        sa.Column(
            'template',
            sa.Text(),
            nullable=True,
            comment='Jinja2 template for webhook payload',
        ),
        sa.Column(
            'template_format',
            sa.String(20),
            nullable=True,
            server_default='json',
            comment='Template output format: json, text',
        ),
        sa.Column(
            'content_type_override',
            sa.String(100),
            nullable=True,
            comment='Optional custom Content-Type header',
        ),
    )
    with op.batch_alter_table('webhooks') as batch_op:
        for column in template_columns:
            batch_op.add_column(column)

    # Backfill pre-existing webhooks so they keep using the same JSON
    # structure they are currently sending (backward compatibility).
    backfill_params = {"template": DEFAULT_TEMPLATE}
    op.get_bind().execute(
        sa.text("""
            UPDATE webhooks
            SET template = :template,
                template_format = 'json'
            WHERE template IS NULL
        """),
        backfill_params,
    )
|
|
|
|
|
|
def downgrade():
    """Drop the webhook template columns from the webhooks table."""
    # Listed in the order they are dropped: the reverse of the order in
    # which upgrade() adds them.
    columns_to_drop = (
        'content_type_override',
        'template_format',
        'template',
    )
    with op.batch_alter_table('webhooks') as batch_op:
        for column_name in columns_to_drop:
            batch_op.drop_column(column_name)
|