Wazuh API Helper (Python)
A lightweight helper around the Wazuh/OpenSearch APIs with a tiny DSL for building queries. It includes:
wazuh_dsl_query– build OpenSearch-style bool/range/terms queries programmaticallywazuh_api– convenience wrapper for common Wazuh operations (search, SQL, counts, snapshots, MITRE rollups, vuln reporting, etc.)
Note: Some hooks in the code reference proprietary utilities (e.g., Jira helpers) that are not included. Those paths are noted in comments and are safe to remove or replace.
Features at a Glance
- Simple DSL to compose filters, ranges, exists checks, wildcard/multi-match, aggregations
- Search helpers for
wazuh-alerts-*indices - OpenSearch SQL endpoint helper
- Aggregation parsers (agents, MITRE tactics/techniques, actionable alert groupings)
- Vulnerability detector report helpers (with
jq-based shaping) - Token-based calls to the Wazuh Security API (overview endpoints)
- Snapshot existence/creation helpers (example uses
AWS-S3repo) - Alert retrieval by ID and recent alerts by level/time window
Requirements
-
Python 3.10+ recommended
-
System dependency for
jqPython package:- Linux:
sudo apt-get install -y libjq1(Debian/Ubuntu) orsudo yum install jq(RHEL/CentOS) - macOS:
brew install jq
- Linux:
-
Python packages (see
requirements.txtbelow)
requirements.txt
requests>=2.31.0
jq>=1.6.0
Installation
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
If you see Unable to load external libraries, ensure the virtualenv is active and libjq/jq is installed.
Quick Start
from wazuh_api import wazuh_api, wazuh_dsl_query # adjust if your filename/module is different
api = wazuh_api(
indexer_url="https://wazuh-indexer.example.com:9200",
analyst_url="https://wazuh.example.com",
admin_api_url="https://wazuh.example.com:55000",
user="wazuh_reader",
password="******",
admin_username="wazuh_admin",
admin_password="******",
)
# Basic health check
assert api.settings_valid(), "Wazuh API settings are not valid"
# Build a DSL query for alerts between two ISO timestamps
q = wazuh_dsl_query("2025-01-01T00:00:00.000Z", "2025-01-01T23:59:59.999Z")
q.set_size(10)
q.add_filter_range("rule.level", 12, 16) # actionable levels
q.add_filter_field_must_exist("rule.level")
query = q.get_query(exclude_size=False)
hits = api.do_wazuh_search(query) # returns hits list
print(f"Found {len(hits)} hits")
Common Recipes
1) Get Alerts in the Last N Minutes
recent = api.get_alerts(
index="wazuh-alerts-*",
min_alert_level=12,
max_alert_level=16,
max_alerts=200,
minutes_back=60,
)
2) Count Alerts in a Time Range
count_resp = api.get_alerts_count_for_time_range(
"2025-01-01T00:00:00.000Z",
"2025-01-01T12:00:00.000Z",
)
print(count_resp) # OpenSearch _count response
3) Search Logs Related to a Specific Alert (by field/value)
related = api.search_logs_related_to_alert(
timestamp="2025-01-01T05:15:30.000Z",
minutes_around=2,
filter_label="rule.id",
filter_value="123456",
)
4) MITRE TTP Rollups (Aggregations)
ttp = api.get_mitre_attack_data(
"2025-01-01T00:00:00.000Z",
"2025-01-02T00:00:00.000Z",
)
# Returns dict with buckets for 'mitre_tactics' and 'mitre_tech'
5) Top Agents with Alerts
agents = api.get_top_agents_with_alerts(
"2025-01-01T00:00:00.000Z",
"2025-01-02T00:00:00.000Z",
)
6) Vulnerability Detector Report (example)
vuln_rows = api.get_vuln_report_data_v2(
start_date_iso="2025-01-01T00:00:00.000Z",
end_date_iso="2025-01-31T23:59:59.999Z",
limit=True, # only sample a few agents
)
# List of dicts like "Machine Name", "IP", "Application", "CVE ID", etc.
7) OpenSearch SQL
resp = api.do_wazuh_sql_query("SELECT agent.name FROM wazuh-alerts-* LIMIT 5")
print(resp)
8) Alert by Document ID
alert = api.get_alert_by_id(alert_id="Aw123...docid...", date="2025-02-11")
Class Overview
wazuh_dsl_query
set_size(n)– set result sizeadd_filter_range(field, gte, lte, timestamp=False)– range filter; settimestamp=Trueto include OpenSearch date formatadd_filter_exact_match(field, value)– exact match viamatch_phraseadd_filter_value_in_field(field, values)– OR of several exact matchesadd_filter_field_must_exist(field)–existsfilteradd_filter_wildcard_match(value)–multi_match(lenient) text searchadd_filter_exclude(field, value)– appendmust_notfor a valueadd_agg_count(name, field)– value_count aggregationadd_agg_count_group(name, field, max_size=20)– terms aggregationget_query(exclude_size=True)– build final query dict
wazuh_api
Core settings:
indexer_url– OpenSearch/Indexer base (e.g.,https://host:9200)analyst_url– Wazuh web (used in some orgs; not strictly required in helper methods)admin_api_url– Wazuh API (e.g.,https://host:55000)user/password– basic auth for indexeradmin_username/admin_password– admin creds for Wazuh Security API token calls
Key methods:
- Search & SQL:
do_wazuh_search,do_wazuh_sql_query,get_count_for_wazuh_query - Snapshots:
check_snapshot_exits(date),do_snapshot(date) - Helpers:
get_time_around_wazuh_alert_time,search_logs_related_to_alert - Reporting/Aggs:
get_top_agents_with_alerts,get_mitre_attack_data,get_data_for_actionable_alerts - Security API:
get_token,get_agent_overview - Alert utils:
get_alert_by_id,get_alerts - Vuln parsing:
get_vuln_report_data_v2,format_cve_list,__parse_vuln_record(usesjq)
Configuration Tips
-
The code initializes a basic logger:
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")Replace with your own logging setup in production.
-
TLS verification is disabled (
verify=False) in requests for convenience. Strongly consider enabling verification and providing a CA bundle in production. -
The snapshot helpers assume a repository named
AWS-S3. Snapshot repo name is configurable via snapshot_repo in the constructor (default: AWS-S3).. -
Integration-specific parsers can be added by defining methods named parse_{integration}_integration (Wazuh’s data.integration with - replaced by _). A fallback parse_default_integration is provided.
Security Notes
- Avoid committing real credentials. Use environment variables or a secrets manager.
- Consider rotating to token-based auth for indexer access as supported by your deployment.
- Re-enable certificate verification and pin CA where possible.
Error Handling
- Methods return
{},[], orFalseon errors and log details withlogger.critical/logger.error. - Wrap calls in your code to handle these cases gracefully.
Known Gaps / TODO
- Expand unit tests and add type hints where appropriate for stricter usage.
Project Structure (suggested)
your-repo/
├─ wazuh_api.py # this module (rename as needed)
├─ requirements.txt
├─ examples/
│ └─ quickstart.py
└─ README.md
License
MIT (suggested). Replace with your preferred license.
Changelog
- v0.1.0 – Initial public extraction: search/SQL helpers, DSL, aggregations, vuln report shaping, snapshot helpers.
Support
Open an issue or PR on this repo. If you extend parsers for additional Wazuh integrations, consider contributing back minimal, non-proprietary versions.