Implemented full schedule management system with API endpoints and user interface for creating, editing, and managing scheduled scans. API Implementation: - Implemented all 6 schedules API endpoints (list, get, create, update, delete, trigger) - Added comprehensive error handling and validation - Integrated with ScheduleService and SchedulerService - Added manual trigger endpoint for on-demand execution Schedule Management UI: - Created schedules list page with stats cards and enable/disable toggles - Built schedule creation form with cron expression builder and quick templates - Implemented schedule edit page with execution history - Added "Schedules" navigation link to main menu - Real-time validation and human-readable cron descriptions Config File Path Resolution: - Fixed config file path handling to support relative filenames - Updated validators.py to resolve relative paths to /app/configs/ - Modified schedule_service.py, scan_service.py, and scan_job.py for consistency - Ensures UI can use simple filenames while backend uses absolute paths Scheduler Integration: - Completed scheduled scan execution in scheduler_service.py - Added cron job management with APScheduler - Implemented automatic schedule loading on startup - Updated run times after each execution Testing: - Added comprehensive API integration tests (test_schedule_api.py) - 22+ test cases covering all endpoints and workflows Progress: Phase 3 Steps 1-4 complete (36% - 5/14 days) Next: Step 5 - Enhanced Dashboard with Charts
640 lines
22 KiB
Python
640 lines
22 KiB
Python
"""
|
|
Integration tests for Schedule API endpoints.
|
|
|
|
Tests all schedule management endpoints including creating, listing,
|
|
updating, deleting schedules, and manually triggering scheduled scans.
|
|
"""
|
|
|
|
import json
|
|
import pytest
|
|
from datetime import datetime
|
|
|
|
from web.models import Schedule, Scan
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_schedule(db, sample_config_file):
|
|
"""
|
|
Create a sample schedule in the database for testing.
|
|
|
|
Args:
|
|
db: Database session fixture
|
|
sample_config_file: Path to test config file
|
|
|
|
Returns:
|
|
Schedule model instance
|
|
"""
|
|
schedule = Schedule(
|
|
name='Daily Test Scan',
|
|
config_file=sample_config_file,
|
|
cron_expression='0 2 * * *',
|
|
enabled=True,
|
|
last_run=None,
|
|
next_run=datetime(2025, 11, 15, 2, 0, 0),
|
|
created_at=datetime.utcnow(),
|
|
updated_at=datetime.utcnow()
|
|
)
|
|
|
|
db.add(schedule)
|
|
db.commit()
|
|
db.refresh(schedule)
|
|
|
|
return schedule
|
|
|
|
|
|
class TestScheduleAPIEndpoints:
|
|
"""Test suite for schedule API endpoints."""
|
|
|
|
def test_list_schedules_empty(self, client, db):
|
|
"""Test listing schedules when database is empty."""
|
|
response = client.get('/api/schedules')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['schedules'] == []
|
|
assert data['total'] == 0
|
|
assert data['page'] == 1
|
|
assert data['per_page'] == 20
|
|
|
|
def test_list_schedules_populated(self, client, db, sample_schedule):
|
|
"""Test listing schedules with existing data."""
|
|
response = client.get('/api/schedules')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['total'] == 1
|
|
assert len(data['schedules']) == 1
|
|
assert data['schedules'][0]['id'] == sample_schedule.id
|
|
assert data['schedules'][0]['name'] == sample_schedule.name
|
|
assert data['schedules'][0]['cron_expression'] == sample_schedule.cron_expression
|
|
|
|
def test_list_schedules_pagination(self, client, db, sample_config_file):
|
|
"""Test schedule list pagination."""
|
|
# Create 25 schedules
|
|
for i in range(25):
|
|
schedule = Schedule(
|
|
name=f'Schedule {i}',
|
|
config_file=sample_config_file,
|
|
cron_expression='0 2 * * *',
|
|
enabled=True,
|
|
created_at=datetime.utcnow()
|
|
)
|
|
db.add(schedule)
|
|
db.commit()
|
|
|
|
# Test page 1
|
|
response = client.get('/api/schedules?page=1&per_page=10')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['total'] == 25
|
|
assert len(data['schedules']) == 10
|
|
assert data['page'] == 1
|
|
assert data['per_page'] == 10
|
|
assert data['pages'] == 3
|
|
|
|
# Test page 2
|
|
response = client.get('/api/schedules?page=2&per_page=10')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert len(data['schedules']) == 10
|
|
assert data['page'] == 2
|
|
|
|
def test_list_schedules_filter_enabled(self, client, db, sample_config_file):
|
|
"""Test filtering schedules by enabled status."""
|
|
# Create enabled and disabled schedules
|
|
for i in range(3):
|
|
schedule = Schedule(
|
|
name=f'Enabled Schedule {i}',
|
|
config_file=sample_config_file,
|
|
cron_expression='0 2 * * *',
|
|
enabled=True,
|
|
created_at=datetime.utcnow()
|
|
)
|
|
db.add(schedule)
|
|
|
|
for i in range(2):
|
|
schedule = Schedule(
|
|
name=f'Disabled Schedule {i}',
|
|
config_file=sample_config_file,
|
|
cron_expression='0 3 * * *',
|
|
enabled=False,
|
|
created_at=datetime.utcnow()
|
|
)
|
|
db.add(schedule)
|
|
db.commit()
|
|
|
|
# Filter by enabled=true
|
|
response = client.get('/api/schedules?enabled=true')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['total'] == 3
|
|
for schedule in data['schedules']:
|
|
assert schedule['enabled'] is True
|
|
|
|
# Filter by enabled=false
|
|
response = client.get('/api/schedules?enabled=false')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['total'] == 2
|
|
for schedule in data['schedules']:
|
|
assert schedule['enabled'] is False
|
|
|
|
def test_get_schedule(self, client, db, sample_schedule):
|
|
"""Test getting schedule details."""
|
|
response = client.get(f'/api/schedules/{sample_schedule.id}')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['id'] == sample_schedule.id
|
|
assert data['name'] == sample_schedule.name
|
|
assert data['config_file'] == sample_schedule.config_file
|
|
assert data['cron_expression'] == sample_schedule.cron_expression
|
|
assert data['enabled'] == sample_schedule.enabled
|
|
assert 'history' in data
|
|
|
|
def test_get_schedule_not_found(self, client, db):
|
|
"""Test getting non-existent schedule."""
|
|
response = client.get('/api/schedules/99999')
|
|
assert response.status_code == 404
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
assert 'not found' in data['error'].lower()
|
|
|
|
def test_create_schedule(self, client, db, sample_config_file):
|
|
"""Test creating a new schedule."""
|
|
schedule_data = {
|
|
'name': 'New Test Schedule',
|
|
'config_file': sample_config_file,
|
|
'cron_expression': '0 3 * * *',
|
|
'enabled': True
|
|
}
|
|
|
|
response = client.post(
|
|
'/api/schedules',
|
|
data=json.dumps(schedule_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
data = json.loads(response.data)
|
|
assert 'schedule_id' in data
|
|
assert data['message'] == 'Schedule created successfully'
|
|
assert 'schedule' in data
|
|
|
|
# Verify schedule in database
|
|
schedule = db.query(Schedule).filter(Schedule.id == data['schedule_id']).first()
|
|
assert schedule is not None
|
|
assert schedule.name == schedule_data['name']
|
|
assert schedule.cron_expression == schedule_data['cron_expression']
|
|
|
|
def test_create_schedule_missing_fields(self, client, db):
|
|
"""Test creating schedule with missing required fields."""
|
|
# Missing cron_expression
|
|
schedule_data = {
|
|
'name': 'Incomplete Schedule',
|
|
'config_file': '/app/configs/test.yaml'
|
|
}
|
|
|
|
response = client.post(
|
|
'/api/schedules',
|
|
data=json.dumps(schedule_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
assert 'missing' in data['error'].lower()
|
|
|
|
def test_create_schedule_invalid_cron(self, client, db, sample_config_file):
|
|
"""Test creating schedule with invalid cron expression."""
|
|
schedule_data = {
|
|
'name': 'Invalid Cron Schedule',
|
|
'config_file': sample_config_file,
|
|
'cron_expression': 'invalid cron'
|
|
}
|
|
|
|
response = client.post(
|
|
'/api/schedules',
|
|
data=json.dumps(schedule_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
assert 'invalid' in data['error'].lower() or 'cron' in data['error'].lower()
|
|
|
|
def test_create_schedule_invalid_config(self, client, db):
|
|
"""Test creating schedule with non-existent config file."""
|
|
schedule_data = {
|
|
'name': 'Invalid Config Schedule',
|
|
'config_file': '/nonexistent/config.yaml',
|
|
'cron_expression': '0 2 * * *'
|
|
}
|
|
|
|
response = client.post(
|
|
'/api/schedules',
|
|
data=json.dumps(schedule_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
assert 'not found' in data['error'].lower()
|
|
|
|
def test_update_schedule(self, client, db, sample_schedule):
|
|
"""Test updating schedule fields."""
|
|
update_data = {
|
|
'name': 'Updated Schedule Name',
|
|
'cron_expression': '0 4 * * *'
|
|
}
|
|
|
|
response = client.put(
|
|
f'/api/schedules/{sample_schedule.id}',
|
|
data=json.dumps(update_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['message'] == 'Schedule updated successfully'
|
|
assert data['schedule']['name'] == update_data['name']
|
|
assert data['schedule']['cron_expression'] == update_data['cron_expression']
|
|
|
|
# Verify in database
|
|
db.refresh(sample_schedule)
|
|
assert sample_schedule.name == update_data['name']
|
|
assert sample_schedule.cron_expression == update_data['cron_expression']
|
|
|
|
def test_update_schedule_not_found(self, client, db):
|
|
"""Test updating non-existent schedule."""
|
|
update_data = {'name': 'New Name'}
|
|
|
|
response = client.put(
|
|
'/api/schedules/99999',
|
|
data=json.dumps(update_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 404
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
|
|
def test_update_schedule_invalid_cron(self, client, db, sample_schedule):
|
|
"""Test updating schedule with invalid cron expression."""
|
|
update_data = {'cron_expression': 'invalid'}
|
|
|
|
response = client.put(
|
|
f'/api/schedules/{sample_schedule.id}',
|
|
data=json.dumps(update_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
|
|
def test_update_schedule_toggle_enabled(self, client, db, sample_schedule):
|
|
"""Test enabling/disabling schedule."""
|
|
# Disable schedule
|
|
response = client.put(
|
|
f'/api/schedules/{sample_schedule.id}',
|
|
data=json.dumps({'enabled': False}),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['schedule']['enabled'] is False
|
|
|
|
# Enable schedule
|
|
response = client.put(
|
|
f'/api/schedules/{sample_schedule.id}',
|
|
data=json.dumps({'enabled': True}),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['schedule']['enabled'] is True
|
|
|
|
def test_update_schedule_no_data(self, client, db, sample_schedule):
|
|
"""Test updating schedule with no data."""
|
|
response = client.put(
|
|
f'/api/schedules/{sample_schedule.id}',
|
|
data=json.dumps({}),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 400
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
|
|
def test_delete_schedule(self, client, db, sample_schedule):
|
|
"""Test deleting a schedule."""
|
|
schedule_id = sample_schedule.id
|
|
|
|
response = client.delete(f'/api/schedules/{schedule_id}')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['message'] == 'Schedule deleted successfully'
|
|
assert data['schedule_id'] == schedule_id
|
|
|
|
# Verify deletion in database
|
|
schedule = db.query(Schedule).filter(Schedule.id == schedule_id).first()
|
|
assert schedule is None
|
|
|
|
def test_delete_schedule_not_found(self, client, db):
|
|
"""Test deleting non-existent schedule."""
|
|
response = client.delete('/api/schedules/99999')
|
|
assert response.status_code == 404
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
|
|
def test_delete_schedule_preserves_scans(self, client, db, sample_schedule, sample_config_file):
|
|
"""Test that deleting schedule preserves associated scans."""
|
|
# Create a scan associated with the schedule
|
|
scan = Scan(
|
|
timestamp=datetime.utcnow(),
|
|
status='completed',
|
|
config_file=sample_config_file,
|
|
title='Test Scan',
|
|
triggered_by='scheduled',
|
|
schedule_id=sample_schedule.id
|
|
)
|
|
db.add(scan)
|
|
db.commit()
|
|
scan_id = scan.id
|
|
|
|
# Delete schedule
|
|
response = client.delete(f'/api/schedules/{sample_schedule.id}')
|
|
assert response.status_code == 200
|
|
|
|
# Verify scan still exists
|
|
scan = db.query(Scan).filter(Scan.id == scan_id).first()
|
|
assert scan is not None
|
|
assert scan.schedule_id is None # Schedule ID becomes null
|
|
|
|
def test_trigger_schedule(self, client, db, sample_schedule):
|
|
"""Test manually triggering a scheduled scan."""
|
|
response = client.post(f'/api/schedules/{sample_schedule.id}/trigger')
|
|
assert response.status_code == 201
|
|
|
|
data = json.loads(response.data)
|
|
assert data['message'] == 'Scan triggered successfully'
|
|
assert 'scan_id' in data
|
|
assert data['schedule_id'] == sample_schedule.id
|
|
|
|
# Verify scan was created
|
|
scan = db.query(Scan).filter(Scan.id == data['scan_id']).first()
|
|
assert scan is not None
|
|
assert scan.triggered_by == 'manual'
|
|
assert scan.schedule_id == sample_schedule.id
|
|
assert scan.config_file == sample_schedule.config_file
|
|
|
|
def test_trigger_schedule_not_found(self, client, db):
|
|
"""Test triggering non-existent schedule."""
|
|
response = client.post('/api/schedules/99999/trigger')
|
|
assert response.status_code == 404
|
|
|
|
data = json.loads(response.data)
|
|
assert 'error' in data
|
|
|
|
def test_get_schedule_with_history(self, client, db, sample_schedule, sample_config_file):
|
|
"""Test getting schedule includes execution history."""
|
|
# Create some scans for this schedule
|
|
for i in range(5):
|
|
scan = Scan(
|
|
timestamp=datetime.utcnow(),
|
|
status='completed',
|
|
config_file=sample_config_file,
|
|
title=f'Scheduled Scan {i}',
|
|
triggered_by='scheduled',
|
|
schedule_id=sample_schedule.id
|
|
)
|
|
db.add(scan)
|
|
db.commit()
|
|
|
|
response = client.get(f'/api/schedules/{sample_schedule.id}')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert 'history' in data
|
|
assert len(data['history']) == 5
|
|
|
|
def test_schedule_workflow_integration(self, client, db, sample_config_file):
|
|
"""Test complete schedule workflow: create → update → trigger → delete."""
|
|
# 1. Create schedule
|
|
schedule_data = {
|
|
'name': 'Integration Test Schedule',
|
|
'config_file': sample_config_file,
|
|
'cron_expression': '0 2 * * *',
|
|
'enabled': True
|
|
}
|
|
|
|
response = client.post(
|
|
'/api/schedules',
|
|
data=json.dumps(schedule_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 201
|
|
schedule_id = json.loads(response.data)['schedule_id']
|
|
|
|
# 2. Get schedule
|
|
response = client.get(f'/api/schedules/{schedule_id}')
|
|
assert response.status_code == 200
|
|
|
|
# 3. Update schedule
|
|
response = client.put(
|
|
f'/api/schedules/{schedule_id}',
|
|
data=json.dumps({'name': 'Updated Integration Test'}),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
# 4. Trigger schedule
|
|
response = client.post(f'/api/schedules/{schedule_id}/trigger')
|
|
assert response.status_code == 201
|
|
scan_id = json.loads(response.data)['scan_id']
|
|
|
|
# 5. Verify scan was created
|
|
scan = db.query(Scan).filter(Scan.id == scan_id).first()
|
|
assert scan is not None
|
|
|
|
# 6. Delete schedule
|
|
response = client.delete(f'/api/schedules/{schedule_id}')
|
|
assert response.status_code == 200
|
|
|
|
# 7. Verify schedule deleted
|
|
response = client.get(f'/api/schedules/{schedule_id}')
|
|
assert response.status_code == 404
|
|
|
|
# 8. Verify scan still exists
|
|
scan = db.query(Scan).filter(Scan.id == scan_id).first()
|
|
assert scan is not None
|
|
|
|
def test_list_schedules_ordering(self, client, db, sample_config_file):
|
|
"""Test that schedules are ordered by next_run time."""
|
|
# Create schedules with different next_run times
|
|
schedules = []
|
|
for i in range(3):
|
|
schedule = Schedule(
|
|
name=f'Schedule {i}',
|
|
config_file=sample_config_file,
|
|
cron_expression='0 2 * * *',
|
|
enabled=True,
|
|
next_run=datetime(2025, 11, 15 + i, 2, 0, 0),
|
|
created_at=datetime.utcnow()
|
|
)
|
|
db.add(schedule)
|
|
schedules.append(schedule)
|
|
|
|
# Create a disabled schedule (next_run is None)
|
|
disabled_schedule = Schedule(
|
|
name='Disabled Schedule',
|
|
config_file=sample_config_file,
|
|
cron_expression='0 3 * * *',
|
|
enabled=False,
|
|
next_run=None,
|
|
created_at=datetime.utcnow()
|
|
)
|
|
db.add(disabled_schedule)
|
|
db.commit()
|
|
|
|
response = client.get('/api/schedules')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
returned_schedules = data['schedules']
|
|
|
|
# Schedules with next_run should come before those without
|
|
# Within those with next_run, they should be ordered by time
|
|
assert returned_schedules[0]['id'] == schedules[0].id
|
|
assert returned_schedules[1]['id'] == schedules[1].id
|
|
assert returned_schedules[2]['id'] == schedules[2].id
|
|
assert returned_schedules[3]['id'] == disabled_schedule.id
|
|
|
|
def test_create_schedule_with_disabled(self, client, db, sample_config_file):
|
|
"""Test creating a disabled schedule."""
|
|
schedule_data = {
|
|
'name': 'Disabled Schedule',
|
|
'config_file': sample_config_file,
|
|
'cron_expression': '0 2 * * *',
|
|
'enabled': False
|
|
}
|
|
|
|
response = client.post(
|
|
'/api/schedules',
|
|
data=json.dumps(schedule_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 201
|
|
|
|
data = json.loads(response.data)
|
|
assert data['schedule']['enabled'] is False
|
|
assert data['schedule']['next_run'] is None # Disabled schedules have no next_run
|
|
|
|
|
|
class TestScheduleAPIAuthentication:
|
|
"""Test suite for schedule API authentication."""
|
|
|
|
def test_schedules_require_authentication(self, app):
|
|
"""Test that all schedule endpoints require authentication."""
|
|
# Create unauthenticated client
|
|
client = app.test_client()
|
|
|
|
endpoints = [
|
|
('GET', '/api/schedules'),
|
|
('GET', '/api/schedules/1'),
|
|
('POST', '/api/schedules'),
|
|
('PUT', '/api/schedules/1'),
|
|
('DELETE', '/api/schedules/1'),
|
|
('POST', '/api/schedules/1/trigger')
|
|
]
|
|
|
|
for method, endpoint in endpoints:
|
|
if method == 'GET':
|
|
response = client.get(endpoint)
|
|
elif method == 'POST':
|
|
response = client.post(
|
|
endpoint,
|
|
data=json.dumps({}),
|
|
content_type='application/json'
|
|
)
|
|
elif method == 'PUT':
|
|
response = client.put(
|
|
endpoint,
|
|
data=json.dumps({}),
|
|
content_type='application/json'
|
|
)
|
|
elif method == 'DELETE':
|
|
response = client.delete(endpoint)
|
|
|
|
# Should redirect to login or return 401
|
|
assert response.status_code in [302, 401], \
|
|
f"{method} {endpoint} should require authentication"
|
|
|
|
|
|
class TestScheduleAPICronValidation:
|
|
"""Test suite for cron expression validation."""
|
|
|
|
def test_valid_cron_expressions(self, client, db, sample_config_file):
|
|
"""Test various valid cron expressions."""
|
|
valid_expressions = [
|
|
'0 2 * * *', # Daily at 2am
|
|
'*/15 * * * *', # Every 15 minutes
|
|
'0 0 * * 0', # Weekly on Sunday
|
|
'0 0 1 * *', # Monthly on 1st
|
|
'0 */4 * * *', # Every 4 hours
|
|
]
|
|
|
|
for cron_expr in valid_expressions:
|
|
schedule_data = {
|
|
'name': f'Schedule for {cron_expr}',
|
|
'config_file': sample_config_file,
|
|
'cron_expression': cron_expr
|
|
}
|
|
|
|
response = client.post(
|
|
'/api/schedules',
|
|
data=json.dumps(schedule_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 201, \
|
|
f"Valid cron expression '{cron_expr}' should be accepted"
|
|
|
|
def test_invalid_cron_expressions(self, client, db, sample_config_file):
|
|
"""Test various invalid cron expressions."""
|
|
invalid_expressions = [
|
|
'invalid',
|
|
'60 2 * * *', # Invalid minute
|
|
'0 25 * * *', # Invalid hour
|
|
'0 0 32 * *', # Invalid day
|
|
'0 0 * 13 *', # Invalid month
|
|
'0 0 * * 8', # Invalid day of week
|
|
]
|
|
|
|
for cron_expr in invalid_expressions:
|
|
schedule_data = {
|
|
'name': f'Schedule for {cron_expr}',
|
|
'config_file': sample_config_file,
|
|
'cron_expression': cron_expr
|
|
}
|
|
|
|
response = client.post(
|
|
'/api/schedules',
|
|
data=json.dumps(schedule_data),
|
|
content_type='application/json'
|
|
)
|
|
assert response.status_code == 400, \
|
|
f"Invalid cron expression '{cron_expr}' should be rejected"
|