# Wazuh API Helper (Python) A lightweight helper around the Wazuh/OpenSearch APIs with a tiny DSL for building queries. It includes: * `wazuh_dsl_query` – build OpenSearch-style bool/range/terms queries programmatically * `wazuh_api` – convenience wrapper for common Wazuh operations (search, SQL, counts, snapshots, MITRE rollups, vuln reporting, etc.) > Note: Some hooks in the code reference proprietary utilities (e.g., Jira helpers) that are not included. Those paths are noted in comments and are safe to remove or replace. --- ## Features at a Glance * Simple DSL to compose filters, ranges, exists checks, wildcard/multi-match, aggregations * Search helpers for `wazuh-alerts-*` indices * OpenSearch SQL endpoint helper * Aggregation parsers (agents, MITRE tactics/techniques, actionable alert groupings) * Vulnerability detector report helpers (with `jq`-based shaping) * Token-based calls to the Wazuh Security API (overview endpoints) * Snapshot existence/creation helpers (example uses `AWS-S3` repo) * Alert retrieval by ID and recent alerts by level/time window --- ## Requirements * Python 3.10+ recommended * System dependency for `jq` Python package: * Linux: `sudo apt-get install -y libjq1` (Debian/Ubuntu) or `sudo yum install jq` (RHEL/CentOS) * macOS: `brew install jq` * Python packages (see `requirements.txt` below) ### `requirements.txt` ```txt requests>=2.31.0 jq>=1.6.0 ``` --- ## Installation ```bash python -m venv .venv source .venv/bin/activate pip install -r requirements.txt ``` If you see `Unable to load external libraries`, ensure the virtualenv is active and `libjq/jq` is installed. --- ## Quick Start ```python from wazuh_api import wazuh_api, wazuh_dsl_query # adjust if your filename/module is different api = wazuh_api( indexer_url="https://wazuh-indexer.example.com:9200", analyst_url="https://wazuh.example.com", admin_api_url="https://wazuh.example.com:55000", user="wazuh_reader", password="******", admin_username="wazuh_admin", admin_password="******", ) # Basic health check assert api.settings_valid(), "Wazuh API settings are not valid" # Build a DSL query for alerts between two ISO timestamps q = wazuh_dsl_query("2025-01-01T00:00:00.000Z", "2025-01-01T23:59:59.999Z") q.set_size(10) q.add_filter_range("rule.level", 12, 16) # actionable levels q.add_filter_field_must_exist("rule.level") query = q.get_query(exclude_size=False) hits = api.do_wazuh_search(query) # returns hits list print(f"Found {len(hits)} hits") ``` --- ## Common Recipes ### 1) Get Alerts in the Last N Minutes ```python recent = api.get_alerts( index="wazuh-alerts-*", min_alert_level=12, max_alert_level=16, max_alerts=200, minutes_back=60, ) ``` ### 2) Count Alerts in a Time Range ```python count_resp = api.get_alerts_count_for_time_range( "2025-01-01T00:00:00.000Z", "2025-01-01T12:00:00.000Z", ) print(count_resp) # OpenSearch _count response ``` ### 3) Search Logs Related to a Specific Alert (by field/value) ```python related = api.search_logs_related_to_alert( timestamp="2025-01-01T05:15:30.000Z", minutes_around=2, filter_label="rule.id", filter_value="123456", ) ``` ### 4) MITRE TTP Rollups (Aggregations) ```python ttp = api.get_mitre_attack_data( "2025-01-01T00:00:00.000Z", "2025-01-02T00:00:00.000Z", ) # Returns dict with buckets for 'mitre_tactics' and 'mitre_tech' ``` ### 5) Top Agents with Alerts ```python agents = api.get_top_agents_with_alerts( "2025-01-01T00:00:00.000Z", "2025-01-02T00:00:00.000Z", ) ``` ### 6) Vulnerability Detector Report (example) ```python vuln_rows = api.get_vuln_report_data_v2( start_date_iso="2025-01-01T00:00:00.000Z", end_date_iso="2025-01-31T23:59:59.999Z", limit=True, # only sample a few agents ) # List of dicts like "Machine Name", "IP", "Application", "CVE ID", etc. ``` ### 7) OpenSearch SQL ```python resp = api.do_wazuh_sql_query("SELECT agent.name FROM wazuh-alerts-* LIMIT 5") print(resp) ``` ### 8) Alert by Document ID ```python alert = api.get_alert_by_id(alert_id="Aw123...docid...", date="2025-02-11") ``` --- ## Class Overview ### `wazuh_dsl_query` * `set_size(n)` – set result size * `add_filter_range(field, gte, lte, timestamp=False)` – range filter; set `timestamp=True` to include OpenSearch date format * `add_filter_exact_match(field, value)` – exact match via `match_phrase` * `add_filter_value_in_field(field, values)` – OR of several exact matches * `add_filter_field_must_exist(field)` – `exists` filter * `add_filter_wildcard_match(value)` – `multi_match` (lenient) text search * `add_filter_exclude(field, value)` – append `must_not` for a value * `add_agg_count(name, field)` – value_count aggregation * `add_agg_count_group(name, field, max_size=20)` – terms aggregation * `get_query(exclude_size=True)` – build final query dict ### `wazuh_api` Core settings: * `indexer_url` – OpenSearch/Indexer base (e.g., `https://host:9200`) * `analyst_url` – Wazuh web (used in some orgs; not strictly required in helper methods) * `admin_api_url` – Wazuh API (e.g., `https://host:55000`) * `user/password` – basic auth for indexer * `admin_username/admin_password` – admin creds for Wazuh Security API token calls Key methods: * Search & SQL: `do_wazuh_search`, `do_wazuh_sql_query`, `get_count_for_wazuh_query` * Snapshots: `check_snapshot_exits(date)`, `do_snapshot(date)` * Helpers: `get_time_around_wazuh_alert_time`, `search_logs_related_to_alert` * Reporting/Aggs: `get_top_agents_with_alerts`, `get_mitre_attack_data`, `get_data_for_actionable_alerts` * Security API: `get_token`, `get_agent_overview` * Alert utils: `get_alert_by_id`, `get_alerts` * Vuln parsing: `get_vuln_report_data_v2`, `format_cve_list`, `__parse_vuln_record` (uses `jq`) --- ## Configuration Tips * The code initializes a basic logger: ```python logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") ``` Replace with your own logging setup in production. * TLS verification is disabled (`verify=False`) in requests for convenience. **Strongly** consider enabling verification and providing a CA bundle in production. * The snapshot helpers assume a repository named `AWS-S3`. Snapshot repo name is configurable via snapshot_repo in the constructor (default: AWS-S3).. * Integration-specific parsers can be added by defining methods named parse_{integration}_integration (Wazuh’s data.integration with - replaced by _). A fallback parse_default_integration is provided. --- ## Security Notes * Avoid committing real credentials. Use environment variables or a secrets manager. * Consider rotating to token-based auth for indexer access as supported by your deployment. * Re-enable certificate verification and pin CA where possible. --- ## Error Handling * Methods return `{}`, `[]`, or `False` on errors and log details with `logger.critical`/`logger.error`. * Wrap calls in your code to handle these cases gracefully. --- ## Known Gaps / TODO * Expand unit tests and add type hints where appropriate for stricter usage. --- ## Project Structure (suggested) ``` your-repo/ ├─ wazuh_api.py # this module (rename as needed) ├─ requirements.txt ├─ examples/ │ └─ quickstart.py └─ README.md ``` --- ## License MIT (suggested). Replace with your preferred license. --- ## Changelog * **v0.1.0** – Initial public extraction: search/SQL helpers, DSL, aggregations, vuln report shaping, snapshot helpers. --- ## Support Open an issue or PR on this repo. If you extend parsers for additional Wazuh integrations, consider contributing back minimal, non-proprietary versions.