commit 60e169c99f88617df8f2253818219aa8f3d6baac Author: Phillip Tarrant Date: Mon Nov 10 11:37:13 2025 -0600 first commit diff --git a/README.md b/README.md new file mode 100644 index 0000000..0331756 --- /dev/null +++ b/README.md @@ -0,0 +1,268 @@ +# Wazuh API Helper (Python) + +A lightweight helper around the Wazuh/OpenSearch APIs with a tiny DSL for building queries. It includes: + +* `wazuh_dsl_query` – build OpenSearch-style bool/range/terms queries programmatically +* `wazuh_api` – convenience wrapper for common Wazuh operations (search, SQL, counts, snapshots, MITRE rollups, vuln reporting, etc.) + +> Note: Some hooks in the code reference proprietary utilities (e.g., Jira helpers) that are not included. Those paths are noted in comments and are safe to remove or replace. + +--- + +## Features at a Glance + +* Simple DSL to compose filters, ranges, exists checks, wildcard/multi-match, aggregations +* Search helpers for `wazuh-alerts-*` indices +* OpenSearch SQL endpoint helper +* Aggregation parsers (agents, MITRE tactics/techniques, actionable alert groupings) +* Vulnerability detector report helpers (with `jq`-based shaping) +* Token-based calls to the Wazuh Security API (overview endpoints) +* Snapshot existence/creation helpers (example uses `AWS-S3` repo) +* Alert retrieval by ID and recent alerts by level/time window + +--- + +## Requirements + +* Python 3.10+ recommended +* System dependency for `jq` Python package: + + * Linux: `sudo apt-get install -y libjq1` (Debian/Ubuntu) or `sudo yum install jq` (RHEL/CentOS) + * macOS: `brew install jq` +* Python packages (see `requirements.txt` below) + +### `requirements.txt` + +```txt +requests>=2.31.0 +jq>=1.6.0 +``` + +--- + +## Installation + +```bash +python -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +``` + +If you see `Unable to load external libraries`, ensure the virtualenv is active and `libjq/jq` is installed. + +--- + +## Quick Start + +```python +from wazuh_api import wazuh_api, wazuh_dsl_query # adjust if your filename/module is different + +api = wazuh_api( + indexer_url="https://wazuh-indexer.example.com:9200", + analyst_url="https://wazuh.example.com", + admin_api_url="https://wazuh.example.com:55000", + user="wazuh_reader", + password="******", + admin_username="wazuh_admin", + admin_password="******", +) + +# Basic health check +assert api.settings_valid(), "Wazuh API settings are not valid" + +# Build a DSL query for alerts between two ISO timestamps +q = wazuh_dsl_query("2025-01-01T00:00:00.000Z", "2025-01-01T23:59:59.999Z") +q.set_size(10) +q.add_filter_range("rule.level", 12, 16) # actionable levels +q.add_filter_field_must_exist("rule.level") +query = q.get_query(exclude_size=False) + +hits = api.do_wazuh_search(query) # returns hits list +print(f"Found {len(hits)} hits") +``` + +--- + +## Common Recipes + +### 1) Get Alerts in the Last N Minutes + +```python +recent = api.get_alerts( + index="wazuh-alerts-*", + min_alert_level=12, + max_alert_level=16, + max_alerts=200, + minutes_back=60, +) +``` + +### 2) Count Alerts in a Time Range + +```python +count_resp = api.get_alerts_count_for_time_range( + "2025-01-01T00:00:00.000Z", + "2025-01-01T12:00:00.000Z", +) +print(count_resp) # OpenSearch _count response +``` + +### 3) Search Logs Related to a Specific Alert (by field/value) + +```python +related = api.search_logs_related_to_alert( + timestamp="2025-01-01T05:15:30.000Z", + minutes_around=2, + filter_label="rule.id", + filter_value="123456", +) +``` + +### 4) MITRE TTP Rollups (Aggregations) + +```python +ttp = api.get_mitre_attack_data( + "2025-01-01T00:00:00.000Z", + "2025-01-02T00:00:00.000Z", +) +# Returns dict with buckets for 'mitre_tactics' and 'mitre_tech' +``` + +### 5) Top Agents with Alerts + +```python +agents = api.get_top_agents_with_alerts( + "2025-01-01T00:00:00.000Z", + "2025-01-02T00:00:00.000Z", +) +``` + +### 6) Vulnerability Detector Report (example) + +```python +vuln_rows = api.get_vuln_report_data_v2( + start_date_iso="2025-01-01T00:00:00.000Z", + end_date_iso="2025-01-31T23:59:59.999Z", + limit=True, # only sample a few agents +) +# List of dicts like "Machine Name", "IP", "Application", "CVE ID", etc. +``` + +### 7) OpenSearch SQL + +```python +resp = api.do_wazuh_sql_query("SELECT agent.name FROM wazuh-alerts-* LIMIT 5") +print(resp) +``` + +### 8) Alert by Document ID + +```python +alert = api.get_alert_by_id(alert_id="Aw123...docid...", date="2025-02-11") +``` + +--- + +## Class Overview + +### `wazuh_dsl_query` + +* `set_size(n)` – set result size +* `add_filter_range(field, gte, lte, timestamp=False)` – range filter; set `timestamp=True` to include OpenSearch date format +* `add_filter_exact_match(field, value)` – exact match via `match_phrase` +* `add_filter_value_in_field(field, values)` – OR of several exact matches +* `add_filter_field_must_exist(field)` – `exists` filter +* `add_filter_wildcard_match(value)` – `multi_match` (lenient) text search +* `add_filter_exclude(field, value)` – append `must_not` for a value +* `add_agg_count(name, field)` – value_count aggregation +* `add_agg_count_group(name, field, max_size=20)` – terms aggregation +* `get_query(exclude_size=True)` – build final query dict + +### `wazuh_api` + +Core settings: + +* `indexer_url` – OpenSearch/Indexer base (e.g., `https://host:9200`) +* `analyst_url` – Wazuh web (used in some orgs; not strictly required in helper methods) +* `admin_api_url` – Wazuh API (e.g., `https://host:55000`) +* `user/password` – basic auth for indexer +* `admin_username/admin_password` – admin creds for Wazuh Security API token calls + +Key methods: + +* Search & SQL: `do_wazuh_search`, `do_wazuh_sql_query`, `get_count_for_wazuh_query` +* Snapshots: `check_snapshot_exits(date)`, `do_snapshot(date)` +* Helpers: `get_time_around_wazuh_alert_time`, `search_logs_related_to_alert` +* Reporting/Aggs: `get_top_agents_with_alerts`, `get_mitre_attack_data`, `get_data_for_actionable_alerts` +* Security API: `get_token`, `get_agent_overview` +* Alert utils: `get_alert_by_id`, `get_alerts` +* Vuln parsing: `get_vuln_report_data_v2`, `format_cve_list`, `__parse_vuln_record` (uses `jq`) + +--- + +## Configuration Tips + +* The code initializes a basic logger: + + ```python + logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s") + ``` + + Replace with your own logging setup in production. + +* TLS verification is disabled (`verify=False`) in requests for convenience. **Strongly** consider enabling verification and providing a CA bundle in production. + +* The snapshot helpers assume a repository named `AWS-S3`. Snapshot repo name is configurable via snapshot_repo in the constructor (default: AWS-S3).. + +* Integration-specific parsers can be added by defining methods named parse_{integration}_integration (Wazuh’s data.integration with - replaced by _). A fallback parse_default_integration is provided. +--- + +## Security Notes + +* Avoid committing real credentials. Use environment variables or a secrets manager. +* Consider rotating to token-based auth for indexer access as supported by your deployment. +* Re-enable certificate verification and pin CA where possible. + +--- + +## Error Handling + +* Methods return `{}`, `[]`, or `False` on errors and log details with `logger.critical`/`logger.error`. +* Wrap calls in your code to handle these cases gracefully. + +--- + +## Known Gaps / TODO + +* Expand unit tests and add type hints where appropriate for stricter usage. + +--- + +## Project Structure (suggested) + +``` +your-repo/ +├─ wazuh_api.py # this module (rename as needed) +├─ requirements.txt +├─ examples/ +│ └─ quickstart.py +└─ README.md +``` + +--- + +## License + +MIT (suggested). Replace with your preferred license. + +--- + +## Changelog + +* **v0.1.0** – Initial public extraction: search/SQL helpers, DSL, aggregations, vuln report shaping, snapshot helpers. + +--- + +## Support + +Open an issue or PR on this repo. If you extend parsers for additional Wazuh integrations, consider contributing back minimal, non-proprietary versions. diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..d40accf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +certifi==2025.10.5 +charset-normalizer==3.4.4 +idna==3.11 +jq==1.10.0 +requests==2.32.5 +urllib3==2.5.0 diff --git a/wazuh_api.py b/wazuh_api.py new file mode 100644 index 0000000..2e7513a --- /dev/null +++ b/wazuh_api.py @@ -0,0 +1,620 @@ +import os +import json +import logging +from datetime import datetime, timedelta + +# Note - several things in this file are tied to Jira or other utils that are not included in this file due to proprietary info in those files. + +# Configure logging once, this is just a basic logger - implement your own and adjust as needed. +logging.basicConfig( + level=logging.INFO, # or DEBUG for development + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) + +logger = logging.getLogger(__file__) + +# External Imports +try: + import jq + import requests + from requests.auth import HTTPBasicAuth + from requests.packages.urllib3.exceptions import InsecureRequestWarning # type: ignore + + requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + +except ImportError as e: + print(f"Unable to load external libraries - {e}") + print("Run pip install -r requirements.txt") + exit(1) + +class wazuh_dsl_query: + + def __init__(self, query_start_time_iso:str, query_end_time_iso:str): + self.size = 1 + self.aggs = {} + self.filters = [ + {"match_all": {}} + ] + self.excludes = [] + self.add_filter_range("timestamp",query_start_time_iso,query_end_time_iso) + + pass + + def set_size(self,new_size:int): + self.size = new_size + + def add_agg_count(self, agg_name:str, agg_field_name:str): + agg = { + agg_name:{ + "value_count":{ + "field":agg_field_name + } + } + } + + self.aggs.update(agg) + + def add_agg_count_group(self, agg_name:str, agg_field_name:str, max_size=20): + agg = { + agg_name:{ + "terms":{ + "field":agg_field_name, + "size": max_size + } + } + } + + self.aggs.update(agg) + + def add_filter_range(self, field_name:str, greater_than, less_than, timestamp=False): + if timestamp: + self.filters.append({"range": { + field_name: { + "gte": greater_than, + "lte": less_than, + "format": "strict_date_optional_time" + } + }}) + else: + self.filters.append({"range": { + field_name: { + "gte": greater_than, + "lte": less_than, + } + }}) + + def add_filter_exact_match(self,field_name:str, field_value:str): + self.filters.append({ + "match_phrase": { + field_name: field_value + } + }) + + def add_filter_value_in_field(self,field_name:str,field_values_list:list): + should_list = [] + for field_value in field_values_list: + entry = {"match_phrase": {field_name: field_value}} + should_list.append(entry) + + self.filters.append({ + "bool": { + "should":should_list, + "minimum_should_match": 1 + }}) + + def add_filter_field_must_exist(self,field_name:str): + self.filters.append({ + "exists": { + "field": field_name + } + }) + + def add_filter_wildcard_match(self,value:str): + self.filters.append({ + "multi_match": { + "type": "best_fields", + "query": value, + "lenient": True + }}) + + def add_filter_exclude(self,field_name:str,value:str): + e = { + "match_phrase": { + field_name: value + } + } + self.excludes.append(e) + + def get_query(self,exclude_size=True): + query = {} + + query.update({"filter":self.filters}) + if len(self.excludes) > 0: + query.update({"must_not": self.excludes}) + + r = { + "query": {"bool":query} + } + + if not exclude_size: + r.update({"size":self.size}) + + # if we have aggrigates + if len(self.aggs) > 0: + r.update({"aggs": self.aggs}) + + # if we have excludes + + + return r + + +class wazuh_api: + + ALERT_LOWER=12 + ALERT_HIGHEST=16 + + def __init__(self, indexer_url:str, analyst_url:str, admin_api_url:str, user:str, password:str, admin_username:str, admin_password:str,snapshot_repo: str = "AWS-S3"): + self.base_url = indexer_url + self.security_url = admin_api_url + self.analyst_url = analyst_url + self.user = user + self.password = password + self.admin_username = admin_username + self.admin_password = admin_password + self.snapshot_repo = snapshot_repo + + self.headers = { + "Accept": "application/json", + "Content-Type": "application/json" + } + + pass + + def settings_valid(self): + if (self.base_url is None) or (self.user is None) or (self.password is None): + logger.critical("API settings are not valid. Unable to run Ingestion. Please check config.") + return False + else: + return True + + def get_time_around_wazuh_alert_time(self, wazuh_alert_time:str, minutes_around=2): + # Convert the date string to a datetime object + original_date = datetime.strptime(wazuh_alert_time, '%Y-%m-%dT%H:%M:%S.%fZ') + original_date = original_date.replace(second=0, microsecond=0) + + # Calculate the dates x minutes before and after the original date + date_before = original_date - timedelta(minutes=minutes_around) + date_after = original_date + timedelta(minutes=minutes_around) + + # Return the results as strings in the same format as the input + return date_before.strftime('%Y-%m-%dT%H:%M:%S.%fZ'), date_after.strftime('%Y-%m-%dT%H:%M:%S.%fZ') + + def search_logs_related_to_alert(self, timestamp:str,minutes_around:int, filter_label:str, filter_value:str): + start,end = self.get_time_around_wazuh_alert_time(timestamp,minutes_around) + q = wazuh_dsl_query(start,end) + q.set_size(50) + q.add_filter_exact_match(filter_label,filter_value) + query = q.get_query(exclude_size=False) + return self.do_wazuh_search(query) + + def do_wazuh_search(self, query:dict, index="wazuh-alerts-*",hits_only=True): + try: + search_url = f"{self.base_url}/{index}/_search" + logger.info(f"Searching Wazuh - {search_url}",extra={"query":query}) + resp = self.__get_json_payload_request(search_url,query) + hits = resp.get("hits",{}).get("hits",[]) + if hits_only: + return hits + else: + return resp + except Exception as e: + logger.error(f"Error while searching Wazuh! - {e}", extra={"query": query, "url":search_url}) + return [] + + def do_wazuh_sql_query(self,query:str): + logger.info(f"Executing SQL Search in Wazuh",extra={"query":query}) + try: + sql_url = f"{self.base_url}/_plugins/_sql?format=json" + payload = {"query":query} + resp = self._post_json_request(sql_url,payload) + return resp + except Exception as e: + logger.error(f"Error while doing SQL Post to Wazuh! - {e}", extra={"query": query, "url":sql_url}) + return [] + + def check_snapshot_exists(self, date:str): + url = f"{self.base_url}/_snapshot/{self.snapshot_repo}/{date}" + + try: + resp = self._get_param_request(url,None) + snapshots = resp.get("snapshots",[]) + if len(snapshots) > 0: + state = snapshots[0].get("state") + return state + return state + + except Exception as e: + return False + + def do_snapshot(self, date:str): + url = f"{self.base_url}/_snapshot/AWS-S3/{date}" + payload = { + "indices": "wazuh-alerts-*" + } + + try: + resp = self._post_json_request(url,payload) + return True + + except Exception as e: + return False + + def get_count_for_wazuh_query(self,query:dict, index="wazuh-alerts-*"): + try: + search_url = f"{self.base_url}/{index}/_count" + logger.info(f"Getting Count From Wazuah - {search_url}",extra={"query":query}) + resp = self.__get_json_payload_request(search_url,query) + return resp + except Exception as e: + logger.error(f"Error while searching Wazuh! - {e}", extra={"query": query, "url":search_url}) + return [] + + def get_token(self): + url = f"{self.security_url}/security/user/authenticate" + payload = None + resp = self._post_json_request(url,payload,True) + token = resp.get("data",{}).get("token") + + if token is not None: + logger.info("Successfully generated auth token") + else: + logger.critical(f"Unable to get token for request to url",extra={"url":url,"payload":payload}) + + return token + + def get_agent_overview(self): + url = f"{self.security_url}/overview/agents" + parms = {} + resp = self.__get_json_request_w_token(url,parms) + data = resp.get("data") + return data + + def get_alerts_count_for_time_range(self,start_date_iso:str,end_date_iso:str): + q = wazuh_dsl_query(start_date_iso,end_date_iso) + query = q.get_query(exclude_size=True) + return self.get_count_for_wazuh_query(query) + + def parse_aggs_return(self,aggs:dict) -> dict: + buckets = {} + if not isinstance(aggs,dict): + return {} + + for group_name,dict_values in aggs.items(): + items = [] + for entry in dict_values.get("buckets",[]): + items.append({entry.get("key"):entry.get("doc_count")}) + buckets.update({group_name:items}) + + return buckets + + def format_cve_list(self, entry): + # Step 1: Extract the list of CVEs from the data + buckets = entry.get('data.vulnerability.cve', {}).get('buckets', []) + cve_items = [] + + for bucket in buckets: + key = bucket.get('key') + if key: + cve_items.append(key) + + # Step 2: Format the list into a string that looks like a tuple + if cve_items: + quoted_items = ['"{}"'.format(cve) for cve in cve_items] + cve_string = ', '.join(quoted_items) + return f'({cve_string})' + else: + return '' + + def get_vuln_report_data_v2(self,start_date_iso:str, end_date_iso:str, limit=False): + query = f"""SELECT agent.name FROM wazuh-alerts-* WHERE timestamp >= '{start_date_iso}' + AND timestamp <= '{end_date_iso}' AND location = 'vulnerability-detector' AND data.vulnerability.status = 'Active' + GROUP BY agent.name, data.vulnerability.cve HAVING SUM(CASE WHEN data.vulnerability.status = 'Solved' + THEN 1 ELSE 0 END) = 0""" + + resp = self.do_wazuh_sql_query(query) + agent_groups = resp.get("aggregations",{}).get("agent.name",{}).get("buckets",{}) + + agents_with_vulns = [] + if len(agent_groups) > 0: + x=0 + for entry in agent_groups: + x = x+1 + if ( limit and (x >=6) ): + return agents_with_vulns + + agent_name = entry['key'] + cve_list = self.format_cve_list(entry) + + query = f"""SELECT d.agent.name, d.agent.ip, d.data.vulnerability.package.name, d.data.vulnerability.package.version, + d.data.vulnerability.severity, d.data.vulnerability.cve, d.data.vulnerability.cvss.cvss3.base_score, d.data.vulnerability.rationale, + d.data.vulnerability.package.condition, d.data.vulnerability.references, d.data.vulnerability.updated + FROM wazuh-alerts-* d + WHERE agent.name = '{agent_name}' AND timestamp >= '{start_date_iso}' AND timestamp <= '{end_date_iso}' AND data.vulnerability.cve IN {cve_list}""" + + resp = self.do_wazuh_sql_query(query) + data = resp.get("hits",{}).get("hits",{}) + parsed_data = self.__parse_vuln_record(data) + agents_with_vulns.extend(parsed_data) + + return agents_with_vulns + + def __parse_vuln_record(self,vuln_record:list): + + jq_script = """ + .[] | { + "Machine Name": ._source.agent.name, + "IP": ._source.agent.ip, + "Application": ._source.data.vulnerability.package.name, + "App Version": ._source.data.vulnerability.package.version, + "Severity": ._source.data.vulnerability.severity, + "CVE ID": ._source.data.vulnerability.cve, + "CVE Score": ._source.data.vulnerability.cvss.cvss3.base_score, + "CVE Summary": ._source.data.vulnerability.rationale, + "CVE Condition": ._source.data.vulnerability.package.condition, + "CVE References": (._source.data.vulnerability.references // []) | join(", "), + "CVE Updated Date": ._source.data.vulnerability.updated + } + """ + parsed_doc = jq.compile(jq_script).input_value(vuln_record).all() + return parsed_doc + + def get_top_agents_with_alerts(self,start_date_iso:str,end_date_iso:str): + q = wazuh_dsl_query(start_date_iso,end_date_iso) + q.set_size(0) + q.add_filter_range("rule.level",self.ALERT_LOWER,self.ALERT_HIGHEST) + q.add_filter_field_must_exist("rule.level") + q.add_filter_exclude("agent.id","000") + q.add_agg_count_group("agents","agent.name",10) + query = q.get_query(exclude_size=False) + # do a search (we are not interested in hits, and the size was 0 any, so no records were returned) + resp = self.do_wazuh_search(query,hits_only=False) + + # get the aggregations and parse them + aggs = resp.get("aggregations",{}) + bucket_data = self.parse_aggs_return(aggs) + return bucket_data + + def get_mitre_attack_data(self,start_date_iso:str,end_date_iso:str): + # build a query that searches for alerts, grouping by mitre tactic and technique + q = wazuh_dsl_query(start_date_iso,end_date_iso) + q.set_size(0) + q.add_filter_range("rule.level",self.ALERT_LOWER,self.ALERT_HIGHEST) + q.add_filter_field_must_exist("rule.mitre.tactic") + q.add_agg_count_group("mitre_tactics","rule.mitre.tactic",10) + q.add_agg_count_group("mitre_tech","rule.mitre.technique",10) + query = q.get_query(exclude_size=False) + + # do a search (we are not interested in hits, and the size was 0 any, so no records were returned) + resp = self.do_wazuh_search(query,hits_only=False) + + # get the aggregations and parse them + aggs = resp.get("aggregations",{}) + bucket_data = self.parse_aggs_return(aggs) + + return bucket_data + + def get_alert_by_id(self,alert_id:str,date="2025-02-11"): + query = { + "size": 1, + "query": { + "bool": { + "must": [], + "filter": [ + { + "match_all": {} + }, + { + "match_phrase": { + "_id": f"{alert_id}" + } + }, + { + "range": { + "timestamp": { + "gte": f"{date}T00:00:00.000Z", + "lte": f"{date}T23:23:59.999Z", + "format": "strict_date_optional_time" + } + } + } + ], + "should": [], + "must_not": [] + } + } + } + + alert = self.do_wazuh_search(query) + return alert + + def get_alerts(self, index="wazuh-alerts-*", min_alert_level:int=12, max_alert_level:int=16,max_alerts:int=1000,minutes_back:int=60): + query = { + "size": max_alerts, + "query": { + "bool": { + "must": [], + "filter": [ + { + "bool": { + "should": [ + { + "range": { + "rule.level": { + "gte": min_alert_level, + "lte": max_alert_level + } + } + } + ], + "minimum_should_match": 1 + } + }, + { + "range": { + "timestamp": { + "gte": f"now-{minutes_back}m", + "lte": "now", + "format": "strict_date_optional_time" + } + } + } + ], + "should": [], + "must_not": [] + } + } + } + return self.do_wazuh_search(query,index) + + def get_integration_type(self, record:dict): + record_integration = record.get("_source",{}).get("data",{}).get("integration").replace("-","_") + return record_integration + + def handle_default_wazuh(self, alert_record:dict) -> dict: + # print(json.dumps(alert_record,indent=True)) + # exit() + jq_script = """ + { + "timestamp":._source.timestamp, + "human_date": (._source.timestamp | split("T")[0]), + "human_time": (._source.timestamp | split("T")[1] | split(".")[0]), + "priority":._source.rule.level, + "summary":._source.rule.description, + "mitre_tactics":(._source.rule.mitre.tactic // []), + "mitre_techniques":(._source.rule.mitre.technique // []), + "rule_id":._source.rule.id, + "rule_fired_times":(._source.rule.firedtimes // 1), + "integration":(._source.data.integration // "default"), + "source_user":(._source.data.srcuser // ""), + "source_ip":(._source.data.srcip // ""), + } + """ + jq_script = "{"+jq_script+"}" + try: + r = jq.compile(jq_script).input_value(alert_record).first() + return r + except Exception as e: + logger.error(f"Unable to parse native Wazuh alert",extra={"alert":alert_record}) + return {} + + def handle_json(self,alert_record:dict) -> dict: + integration_type = self.get_integration_type(alert_record) + parser = f"parse_{integration_type}_integration" + + try: + if callable(getattr(self,parser)): + parsed_data = getattr(self, parser)(alert_record) + return parsed_data + except AttributeError as e: + logger.error(f"No parser found for integration {integration_type}, create one at {parser}") + return {} + + def parse_default_integration(self, alert_record: dict) -> dict: + """Fallback parser that just uses the standard handler.""" + return self.handle_default_wazuh(alert_record) + + def _get_param_request(self,url,params): + try: + if params is None: + response = requests.request("GET",url,headers=self.headers,auth=(self.user, self.password), verify=False, timeout=10) + else: + response = requests.request("GET",url,headers=self.headers,params=params,auth=(self.user, self.password), verify=False, timeout=10) + if not response.ok: + logger.critical("Error doing Get Request",extra={"api_response":response.text,"status_code":response.status_code,"params":params,"url":url}) + return {} + else: + return response.json() + + except Exception as error: + logger.critical(f"Get Call Not sent {params} - error:{error}") + return {} + + def __get_json_payload_request(self,url, payload): + try: + if payload is None: + response = requests.request("GET",url,headers=self.headers,auth=(self.user, self.password), verify=False, timeout=10) + else: + response = requests.request("GET",url,headers=self.headers,json=payload,auth=(self.user, self.password), verify=False, timeout=10) + if not response.ok: + logger.critical("Error doing Get Request",extra={"api_response":response.text,"status_code":response.status_code,"payload":payload,"url":url}) + return {} + else: + return response.json() + + except Exception as error: + logger.critical(f"Get Call Not sent {url} - error:{error}",extra={"payload":payload}) + return {} + + def _post_json_request(self, url, payload,use_admin_creds=False): + try: + if use_admin_creds: + response = requests.post(url, json=payload, headers=self.headers, auth=(self.admin_username, self.admin_password), verify=False, timeout=10) + else: + response = requests.post(url, json=payload, headers=self.headers, auth=(self.user, self.password), verify=False, timeout=10) + + if not response.ok: + logger.critical("Error posting Request",extra={"api_response":response.text,"status_code":response.status_code,"payload":payload,"url":url}) + return {} + + else: + return response.json() + + except Exception as error: + logger.critical(f"Payload not delivered {payload} - error:{error}",extra={"payload":payload,"url":url}) + return {} + + def __post_json_request_w_token(self, url, payload): + try: + auth_token = self.get_token() + if auth_token is None: + return {} + + auth_headers = self.headers + auth_headers.update({"Authorization":f"Bearer {auth_token}"}) + + response = requests.post(url, json=payload, headers=auth_headers, verify=False, timeout=10) + + if not response.ok: + logger.critical("Error posting Request",extra={"api_response":response.text,"status_code":response.status_code,"payload":payload,"url":url}) + return {} + + else: + return response.json() + + except Exception as error: + logger.critical(f"Payload not delivered {payload} - error:{error}",extra={"payload":payload,"url":url}) + return {} + + def __get_json_request_w_token(self, url, params:dict): + try: + auth_token = self.get_token() + if auth_token is None: + return {} + + auth_headers = self.headers + auth_headers.update({"Authorization":f"Bearer {auth_token}"}) + + response = requests.get(url, params=params, headers=auth_headers, verify=False, timeout=10) + + if not response.ok: + logger.critical("Error posting Request",extra={"api_response":response.text,"status_code":response.status_code,"params":params,"url":url}) + return {} + + else: + return response.json() + + except Exception as error: + logger.critical(f"Unable to get URL {url} - error:{error}",extra={"params":params,"url":url}) + return {}