first commit

2025-11-10 11:37:13 -06:00
commit 60e169c99f
3 changed files with 894 additions and 0 deletions

README.md Normal file

@@ -0,0 +1,268 @@
# Wazuh API Helper (Python)
A lightweight helper around the Wazuh/OpenSearch APIs with a tiny DSL for building queries. It includes:
* `wazuh_dsl_query`: builds OpenSearch-style bool/range/terms queries programmatically
* `wazuh_api`: convenience wrapper for common Wazuh operations (search, SQL, counts, snapshots, MITRE rollups, vuln reporting, etc.)
> Note: Some hooks in the code reference proprietary utilities (e.g., Jira helpers) that are not included. Those paths are noted in comments and are safe to remove or replace.
---
## Features at a Glance
* Simple DSL to compose filters, ranges, exists checks, wildcard/multi-match, aggregations
* Search helpers for `wazuh-alerts-*` indices
* OpenSearch SQL endpoint helper
* Aggregation parsers (agents, MITRE tactics/techniques, actionable alert groupings)
* Vulnerability detector report helpers (with `jq`-based shaping)
* Token-based calls to the Wazuh Security API (overview endpoints)
* Snapshot existence/creation helpers (example uses `AWS-S3` repo)
* Alert retrieval by ID and recent alerts by level/time window
---
## Requirements
* Python 3.10+ recommended
* System dependency for the `jq` Python package:
  * Linux: `sudo apt-get install -y libjq1` (Debian/Ubuntu) or `sudo yum install jq` (RHEL/CentOS)
  * macOS: `brew install jq`
* Python packages (see `requirements.txt` below)
### `requirements.txt`
```txt
requests>=2.31.0
jq>=1.6.0
```
---
## Installation
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
If you see `Unable to load external libraries`, ensure the virtualenv is active and that `libjq`/`jq` is installed.
---
## Quick Start
```python
from wazuh_api import wazuh_api, wazuh_dsl_query # adjust if your filename/module is different
api = wazuh_api(
    indexer_url="https://wazuh-indexer.example.com:9200",
    analyst_url="https://wazuh.example.com",
    admin_api_url="https://wazuh.example.com:55000",
    user="wazuh_reader",
    password="******",
    admin_username="wazuh_admin",
    admin_password="******",
)
# Basic health check
assert api.settings_valid(), "Wazuh API settings are not valid"
# Build a DSL query for alerts between two ISO timestamps
q = wazuh_dsl_query("2025-01-01T00:00:00.000Z", "2025-01-01T23:59:59.999Z")
q.set_size(10)
q.add_filter_range("rule.level", 12, 16) # actionable levels
q.add_filter_field_must_exist("rule.level")
query = q.get_query(exclude_size=False)
hits = api.do_wazuh_search(query) # returns hits list
print(f"Found {len(hits)} hits")
```
---
## Common Recipes
### 1) Get Alerts in the Last N Minutes
```python
recent = api.get_alerts(
    index="wazuh-alerts-*",
    min_alert_level=12,
    max_alert_level=16,
    max_alerts=200,
    minutes_back=60,
)
```
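Each entry in the returned list is a raw OpenSearch hit, so the alert fields live under `_source`. A small consumption sketch (field paths assume the standard `wazuh-alerts-*` mapping):
```python
# Print a one-line summary per alert; rule.level, rule.description, and
# agent.name follow the standard wazuh-alerts-* document layout.
for hit in recent:
    src = hit.get("_source", {})
    rule = src.get("rule", {})
    agent = src.get("agent", {})
    print(f'{src.get("timestamp")} [{rule.get("level")}] {agent.get("name")}: {rule.get("description")}')
```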
### 2) Count Alerts in a Time Range
```python
count_resp = api.get_alerts_count_for_time_range(
    "2025-01-01T00:00:00.000Z",
    "2025-01-01T12:00:00.000Z",
)
print(count_resp) # OpenSearch _count response
```
### 3) Search Logs Related to a Specific Alert (by field/value)
```python
related = api.search_logs_related_to_alert(
    timestamp="2025-01-01T05:15:30.000Z",
    minutes_around=2,
    filter_label="rule.id",
    filter_value="123456",
)
```
### 4) MITRE TTP Rollups (Aggregations)
```python
ttp = api.get_mitre_attack_data(
    "2025-01-01T00:00:00.000Z",
    "2025-01-02T00:00:00.000Z",
)
# Returns dict with buckets for 'mitre_tactics' and 'mitre_tech'
```
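`parse_aggs_return` shapes the rollup as lists of single-key `{value: count}` dicts. A quick sketch to flatten it for display:
```python
# ttp maps each aggregation name ("mitre_tactics", "mitre_tech") to a list
# of single-key dicts, e.g. {"Defense Evasion": 42}.
for group_name, buckets in ttp.items():
    print(group_name)
    for bucket in buckets:
        for name, count in bucket.items():
            print(f"  {name}: {count}")
```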
### 5) Top Agents with Alerts
```python
agents = api.get_top_agents_with_alerts(
    "2025-01-01T00:00:00.000Z",
    "2025-01-02T00:00:00.000Z",
)
```
### 6) Vulnerability Detector Report (example)
```python
vuln_rows = api.get_vuln_report_data_v2(
    start_date_iso="2025-01-01T00:00:00.000Z",
    end_date_iso="2025-01-31T23:59:59.999Z",
    limit=True,  # only sample a few agents
)
# Returns a list of dicts with keys like "Machine Name", "IP", "Application", "CVE ID", etc.
```
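The rows are plain dicts keyed by the column names above, so they drop straight into `csv.DictWriter`. A minimal export sketch (the output filename is just an example):
```python
import csv

# Write the vulnerability rows to CSV; "vuln_report.csv" is an illustrative path.
if vuln_rows:
    with open("vuln_report.csv", "w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=list(vuln_rows[0].keys()))
        writer.writeheader()
        writer.writerows(vuln_rows)
```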
### 7) OpenSearch SQL
```python
resp = api.do_wazuh_sql_query("SELECT agent.name FROM wazuh-alerts-* LIMIT 5")
print(resp)
```
### 8) Alert by Document ID
```python
alert = api.get_alert_by_id(alert_id="Aw123...docid...", date="2025-02-11")
```
---
## Class Overview
### `wazuh_dsl_query`
* `set_size(n)`: set the result size
* `add_filter_range(field, gte, lte, timestamp=False)`: range filter; set `timestamp=True` to include the OpenSearch date format
* `add_filter_exact_match(field, value)`: exact match via `match_phrase`
* `add_filter_value_in_field(field, values)`: OR of several exact matches
* `add_filter_field_must_exist(field)`: `exists` filter
* `add_filter_wildcard_match(value)`: `multi_match` (lenient) text search
* `add_filter_exclude(field, value)`: append a `must_not` clause for a value
* `add_agg_count(name, field)`: `value_count` aggregation
* `add_agg_count_group(name, field, max_size=20)`: `terms` aggregation
* `get_query(exclude_size=True)`: build the final query dict
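A minimal composition sketch showing how the pieces combine into a query dict:
```python
import json

# Aggregation-only query: top 10 firing rules at actionable levels,
# excluding the manager itself (agent 000).
q = wazuh_dsl_query("2025-01-01T00:00:00.000Z", "2025-01-02T00:00:00.000Z")
q.set_size(0)
q.add_filter_range("rule.level", 12, 16)
q.add_filter_exclude("agent.id", "000")
q.add_agg_count_group("rules", "rule.id", 10)
print(json.dumps(q.get_query(exclude_size=False), indent=2))
```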
### `wazuh_api`
Core settings:
* `indexer_url`: OpenSearch/Indexer base URL (e.g., `https://host:9200`)
* `analyst_url`: Wazuh web UI (used in some orgs; not strictly required by the helper methods)
* `admin_api_url`: Wazuh API (e.g., `https://host:55000`)
* `user` / `password`: basic auth for the indexer
* `admin_username` / `admin_password`: admin credentials for Wazuh Security API token calls
Key methods:
* Search & SQL: `do_wazuh_search`, `do_wazuh_sql_query`, `get_count_for_wazuh_query`
* Snapshots: `check_snapshot_exists(date)`, `do_snapshot(date)`
* Helpers: `get_time_around_wazuh_alert_time`, `search_logs_related_to_alert`
* Reporting/Aggs: `get_top_agents_with_alerts`, `get_mitre_attack_data`, `get_data_for_actionable_alerts`
* Security API: `get_token`, `get_agent_overview`
* Alert utils: `get_alert_by_id`, `get_alerts`
* Vuln parsing: `get_vuln_report_data_v2`, `format_cve_list`, `__parse_vuln_record` (uses `jq`)
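For example, the snapshot helpers pair naturally (a sketch; snapshot names are plain dates, matching the helper signatures):
```python
# Take a daily snapshot only if one does not already exist.
# check_snapshot_exists returns the snapshot state (e.g. "SUCCESS") or False.
day = "2025-01-01"
if not api.check_snapshot_exists(day):
    ok = api.do_snapshot(day)
    print(f"Snapshot {day} requested: {ok}")
```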
---
## Configuration Tips
* The code initializes a basic logger:
  ```python
  logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
  ```
  Replace this with your own logging setup in production.
* TLS verification is disabled (`verify=False`) in requests for convenience. **Strongly** consider enabling verification and providing a CA bundle in production.
* The snapshot repository name is configurable via `snapshot_repo` in the constructor (default: `AWS-S3`).
* Integration-specific parsers can be added by defining methods named `parse_{integration}_integration` (Wazuh's `data.integration` with `-` replaced by `_`). A fallback `parse_default_integration` is provided; see the sketch after this list.
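A hedged sketch of a custom parser, assuming a hypothetical integration whose alerts arrive with `data.integration` set to `my-edr`:
```python
class my_wazuh_api(wazuh_api):
    # Called by handle_json for alerts where data.integration == "my-edr"
    # ("-" becomes "_" when the method name is built).
    def parse_my_edr_integration(self, alert_record: dict) -> dict:
        parsed = self.handle_default_wazuh(alert_record)  # start from the standard fields
        data = alert_record.get("_source", {}).get("data", {})
        parsed["edr_host"] = data.get("hostname", "")  # integration-specific field (assumed)
        return parsed
```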
---
## Security Notes
* Avoid committing real credentials. Use environment variables or a secrets manager (see the sketch after this list).
* Consider rotating to token-based auth for indexer access as supported by your deployment.
* Re-enable certificate verification and pin CA where possible.
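A minimal sketch of pulling credentials from the environment (the variable names are just suggestions):
```python
import os

api = wazuh_api(
    indexer_url=os.environ["WAZUH_INDEXER_URL"],
    analyst_url=os.environ["WAZUH_ANALYST_URL"],
    admin_api_url=os.environ["WAZUH_ADMIN_API_URL"],
    user=os.environ["WAZUH_USER"],
    password=os.environ["WAZUH_PASSWORD"],
    admin_username=os.environ["WAZUH_ADMIN_USER"],
    admin_password=os.environ["WAZUH_ADMIN_PASSWORD"],
)
```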
---
## Error Handling
* Methods return `{}`, `[]`, or `False` on errors and log details with `logger.critical`/`logger.error`.
* Wrap calls in your code to handle these cases gracefully, as in the sketch below.
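Since failures surface as empty or falsy return values rather than exceptions, a thin guard keeps calling code honest (a sketch; `handle_hits` stands in for your own downstream step):
```python
hits = api.do_wazuh_search(query)
if not hits:
    # Could mean "no matches" or a logged error; check the logs to distinguish.
    print("No hits returned; skipping downstream processing")
else:
    handle_hits(hits)  # hypothetical downstream handler
```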
---
## Known Gaps / TODO
* Expand unit tests and add type hints where appropriate for stricter usage.
---
## Project Structure (suggested)
```
your-repo/
├─ wazuh_api.py # this module (rename as needed)
├─ requirements.txt
├─ examples/
│ └─ quickstart.py
└─ README.md
```
---
## License
MIT (suggested). Replace with your preferred license.
---
## Changelog
* **v0.1.0**: initial public extraction with search/SQL helpers, the DSL, aggregations, vuln report shaping, and snapshot helpers.
---
## Support
Open an issue or PR on this repo. If you extend parsers for additional Wazuh integrations, consider contributing back minimal, non-proprietary versions.

requirements.txt Normal file

@@ -0,0 +1,6 @@
certifi==2025.10.5
charset-normalizer==3.4.4
idna==3.11
jq==1.10.0
requests==2.32.5
urllib3==2.5.0

wazuh_api.py Normal file

@@ -0,0 +1,620 @@
import os
import json
import logging
from datetime import datetime, timedelta

# Note: several hooks in this file are tied to Jira or other utilities that are
# not included here, because those files contain proprietary information.

# Configure logging once. This is just a basic logger - implement your own and adjust as needed.
logging.basicConfig(
    level=logging.INFO,  # or DEBUG for development
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# External imports
try:
    import jq
    import requests
    import urllib3

    # TLS verification is disabled throughout for convenience; silence the resulting warnings.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError as e:
    print(f"Unable to load external libraries - {e}")
    print("Run pip install -r requirements.txt")
    exit(1)
class wazuh_dsl_query:
    def __init__(self, query_start_time_iso: str, query_end_time_iso: str):
        self.size = 1
        self.aggs = {}
        self.filters = [
            {"match_all": {}}
        ]
        self.excludes = []
        self.add_filter_range("timestamp", query_start_time_iso, query_end_time_iso)

    def set_size(self, new_size: int):
        self.size = new_size

    def add_agg_count(self, agg_name: str, agg_field_name: str):
        agg = {
            agg_name: {
                "value_count": {
                    "field": agg_field_name
                }
            }
        }
        self.aggs.update(agg)

    def add_agg_count_group(self, agg_name: str, agg_field_name: str, max_size=20):
        agg = {
            agg_name: {
                "terms": {
                    "field": agg_field_name,
                    "size": max_size
                }
            }
        }
        self.aggs.update(agg)

    def add_filter_range(self, field_name: str, greater_than, less_than, timestamp=False):
        if timestamp:
            self.filters.append({"range": {
                field_name: {
                    "gte": greater_than,
                    "lte": less_than,
                    "format": "strict_date_optional_time"
                }
            }})
        else:
            self.filters.append({"range": {
                field_name: {
                    "gte": greater_than,
                    "lte": less_than,
                }
            }})

    def add_filter_exact_match(self, field_name: str, field_value: str):
        self.filters.append({
            "match_phrase": {
                field_name: field_value
            }
        })

    def add_filter_value_in_field(self, field_name: str, field_values_list: list):
        should_list = []
        for field_value in field_values_list:
            entry = {"match_phrase": {field_name: field_value}}
            should_list.append(entry)
        self.filters.append({
            "bool": {
                "should": should_list,
                "minimum_should_match": 1
            }})

    def add_filter_field_must_exist(self, field_name: str):
        self.filters.append({
            "exists": {
                "field": field_name
            }
        })

    def add_filter_wildcard_match(self, value: str):
        self.filters.append({
            "multi_match": {
                "type": "best_fields",
                "query": value,
                "lenient": True
            }})

    def add_filter_exclude(self, field_name: str, value: str):
        e = {
            "match_phrase": {
                field_name: value
            }
        }
        self.excludes.append(e)

    def get_query(self, exclude_size=True):
        query = {}
        query.update({"filter": self.filters})
        # if we have excludes
        if len(self.excludes) > 0:
            query.update({"must_not": self.excludes})
        r = {
            "query": {"bool": query}
        }
        if not exclude_size:
            r.update({"size": self.size})
        # if we have aggregates
        if len(self.aggs) > 0:
            r.update({"aggs": self.aggs})
        return r
class wazuh_api:
    ALERT_LOWER = 12
    ALERT_HIGHEST = 16

    def __init__(self, indexer_url: str, analyst_url: str, admin_api_url: str, user: str, password: str,
                 admin_username: str, admin_password: str, snapshot_repo: str = "AWS-S3"):
        self.base_url = indexer_url
        self.security_url = admin_api_url
        self.analyst_url = analyst_url
        self.user = user
        self.password = password
        self.admin_username = admin_username
        self.admin_password = admin_password
        self.snapshot_repo = snapshot_repo
        self.headers = {
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

    def settings_valid(self):
        if (self.base_url is None) or (self.user is None) or (self.password is None):
            logger.critical("API settings are not valid. Unable to run Ingestion. Please check config.")
            return False
        else:
            return True
    def get_time_around_wazuh_alert_time(self, wazuh_alert_time: str, minutes_around=2):
        # Convert the date string to a datetime object
        original_date = datetime.strptime(wazuh_alert_time, '%Y-%m-%dT%H:%M:%S.%fZ')
        original_date = original_date.replace(second=0, microsecond=0)
        # Calculate the dates x minutes before and after the original date
        date_before = original_date - timedelta(minutes=minutes_around)
        date_after = original_date + timedelta(minutes=minutes_around)
        # Return the results as strings in the same format as the input
        return date_before.strftime('%Y-%m-%dT%H:%M:%S.%fZ'), date_after.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

    def search_logs_related_to_alert(self, timestamp: str, minutes_around: int, filter_label: str, filter_value: str):
        start, end = self.get_time_around_wazuh_alert_time(timestamp, minutes_around)
        q = wazuh_dsl_query(start, end)
        q.set_size(50)
        q.add_filter_exact_match(filter_label, filter_value)
        query = q.get_query(exclude_size=False)
        return self.do_wazuh_search(query)
    def do_wazuh_search(self, query: dict, index="wazuh-alerts-*", hits_only=True):
        # Build the URL outside the try block so the except handler can always reference it.
        search_url = f"{self.base_url}/{index}/_search"
        try:
            logger.info(f"Searching Wazuh - {search_url}", extra={"query": query})
            resp = self.__get_json_payload_request(search_url, query)
            hits = resp.get("hits", {}).get("hits", [])
            if hits_only:
                return hits
            else:
                return resp
        except Exception as e:
            logger.error(f"Error while searching Wazuh! - {e}", extra={"query": query, "url": search_url})
            return []

    def do_wazuh_sql_query(self, query: str):
        logger.info("Executing SQL Search in Wazuh", extra={"query": query})
        sql_url = f"{self.base_url}/_plugins/_sql?format=json"
        try:
            payload = {"query": query}
            resp = self._post_json_request(sql_url, payload)
            return resp
        except Exception as e:
            logger.error(f"Error while doing SQL Post to Wazuh! - {e}", extra={"query": query, "url": sql_url})
            return {}
    def check_snapshot_exists(self, date: str):
        url = f"{self.base_url}/_snapshot/{self.snapshot_repo}/{date}"
        try:
            resp = self._get_param_request(url, None)
            snapshots = resp.get("snapshots", [])
            if len(snapshots) > 0:
                return snapshots[0].get("state")
            return False
        except Exception:
            return False

    def do_snapshot(self, date: str):
        url = f"{self.base_url}/_snapshot/{self.snapshot_repo}/{date}"
        payload = {
            "indices": "wazuh-alerts-*"
        }
        try:
            self._post_json_request(url, payload)
            return True
        except Exception:
            return False
    def get_count_for_wazuh_query(self, query: dict, index="wazuh-alerts-*"):
        search_url = f"{self.base_url}/{index}/_count"
        try:
            logger.info(f"Getting Count From Wazuh - {search_url}", extra={"query": query})
            resp = self.__get_json_payload_request(search_url, query)
            return resp
        except Exception as e:
            logger.error(f"Error while searching Wazuh! - {e}", extra={"query": query, "url": search_url})
            return {}
    def get_token(self):
        url = f"{self.security_url}/security/user/authenticate"
        payload = None
        resp = self._post_json_request(url, payload, True)
        token = resp.get("data", {}).get("token")
        if token is not None:
            logger.info("Successfully generated auth token")
        else:
            logger.critical("Unable to get token for request", extra={"url": url, "payload": payload})
        return token

    def get_agent_overview(self):
        url = f"{self.security_url}/overview/agents"
        params = {}
        resp = self.__get_json_request_w_token(url, params)
        data = resp.get("data")
        return data
    def get_alerts_count_for_time_range(self, start_date_iso: str, end_date_iso: str):
        q = wazuh_dsl_query(start_date_iso, end_date_iso)
        query = q.get_query(exclude_size=True)
        return self.get_count_for_wazuh_query(query)

    def parse_aggs_return(self, aggs: dict) -> dict:
        buckets = {}
        if not isinstance(aggs, dict):
            return {}
        for group_name, dict_values in aggs.items():
            items = []
            for entry in dict_values.get("buckets", []):
                items.append({entry.get("key"): entry.get("doc_count")})
            buckets.update({group_name: items})
        return buckets

    def format_cve_list(self, entry):
        # Step 1: Extract the list of CVEs from the data
        buckets = entry.get('data.vulnerability.cve', {}).get('buckets', [])
        cve_items = []
        for bucket in buckets:
            key = bucket.get('key')
            if key:
                cve_items.append(key)
        # Step 2: Format the list into a string that looks like a tuple
        if cve_items:
            quoted_items = ['"{}"'.format(cve) for cve in cve_items]
            cve_string = ', '.join(quoted_items)
            return f'({cve_string})'
        else:
            return ''
    def get_vuln_report_data_v2(self, start_date_iso: str, end_date_iso: str, limit=False):
        query = f"""SELECT agent.name FROM wazuh-alerts-* WHERE timestamp >= '{start_date_iso}'
        AND timestamp <= '{end_date_iso}' AND location = 'vulnerability-detector' AND data.vulnerability.status = 'Active'
        GROUP BY agent.name, data.vulnerability.cve HAVING SUM(CASE WHEN data.vulnerability.status = 'Solved'
        THEN 1 ELSE 0 END) = 0"""
        resp = self.do_wazuh_sql_query(query)
        agent_groups = resp.get("aggregations", {}).get("agent.name", {}).get("buckets", [])
        agents_with_vulns = []
        if len(agent_groups) > 0:
            x = 0
            for entry in agent_groups:
                x = x + 1
                if limit and (x >= 6):
                    return agents_with_vulns
                agent_name = entry['key']
                cve_list = self.format_cve_list(entry)
                query = f"""SELECT d.agent.name, d.agent.ip, d.data.vulnerability.package.name, d.data.vulnerability.package.version,
                d.data.vulnerability.severity, d.data.vulnerability.cve, d.data.vulnerability.cvss.cvss3.base_score, d.data.vulnerability.rationale,
                d.data.vulnerability.package.condition, d.data.vulnerability.references, d.data.vulnerability.updated
                FROM wazuh-alerts-* d
                WHERE agent.name = '{agent_name}' AND timestamp >= '{start_date_iso}' AND timestamp <= '{end_date_iso}' AND data.vulnerability.cve IN {cve_list}"""
                resp = self.do_wazuh_sql_query(query)
                data = resp.get("hits", {}).get("hits", [])
                parsed_data = self.__parse_vuln_record(data)
                agents_with_vulns.extend(parsed_data)
        return agents_with_vulns
    def __parse_vuln_record(self, vuln_record: list):
        jq_script = """
        .[] | {
            "Machine Name": ._source.agent.name,
            "IP": ._source.agent.ip,
            "Application": ._source.data.vulnerability.package.name,
            "App Version": ._source.data.vulnerability.package.version,
            "Severity": ._source.data.vulnerability.severity,
            "CVE ID": ._source.data.vulnerability.cve,
            "CVE Score": ._source.data.vulnerability.cvss.cvss3.base_score,
            "CVE Summary": ._source.data.vulnerability.rationale,
            "CVE Condition": ._source.data.vulnerability.package.condition,
            "CVE References": (._source.data.vulnerability.references // []) | join(", "),
            "CVE Updated Date": ._source.data.vulnerability.updated
        }
        """
        parsed_doc = jq.compile(jq_script).input_value(vuln_record).all()
        return parsed_doc
    def get_top_agents_with_alerts(self, start_date_iso: str, end_date_iso: str):
        q = wazuh_dsl_query(start_date_iso, end_date_iso)
        q.set_size(0)
        q.add_filter_range("rule.level", self.ALERT_LOWER, self.ALERT_HIGHEST)
        q.add_filter_field_must_exist("rule.level")
        q.add_filter_exclude("agent.id", "000")
        q.add_agg_count_group("agents", "agent.name", 10)
        query = q.get_query(exclude_size=False)
        # do a search (we are not interested in hits; size was 0 anyway, so no records are returned)
        resp = self.do_wazuh_search(query, hits_only=False)
        # get the aggregations and parse them
        aggs = resp.get("aggregations", {})
        bucket_data = self.parse_aggs_return(aggs)
        return bucket_data

    def get_mitre_attack_data(self, start_date_iso: str, end_date_iso: str):
        # build a query that searches for alerts, grouping by MITRE tactic and technique
        q = wazuh_dsl_query(start_date_iso, end_date_iso)
        q.set_size(0)
        q.add_filter_range("rule.level", self.ALERT_LOWER, self.ALERT_HIGHEST)
        q.add_filter_field_must_exist("rule.mitre.tactic")
        q.add_agg_count_group("mitre_tactics", "rule.mitre.tactic", 10)
        q.add_agg_count_group("mitre_tech", "rule.mitre.technique", 10)
        query = q.get_query(exclude_size=False)
        # do a search (we are not interested in hits; size was 0 anyway, so no records are returned)
        resp = self.do_wazuh_search(query, hits_only=False)
        # get the aggregations and parse them
        aggs = resp.get("aggregations", {})
        bucket_data = self.parse_aggs_return(aggs)
        return bucket_data
    def get_alert_by_id(self, alert_id: str, date="2025-02-11"):
        query = {
            "size": 1,
            "query": {
                "bool": {
                    "must": [],
                    "filter": [
                        {
                            "match_all": {}
                        },
                        {
                            "match_phrase": {
                                "_id": f"{alert_id}"
                            }
                        },
                        {
                            "range": {
                                "timestamp": {
                                    "gte": f"{date}T00:00:00.000Z",
                                    "lte": f"{date}T23:59:59.999Z",
                                    "format": "strict_date_optional_time"
                                }
                            }
                        }
                    ],
                    "should": [],
                    "must_not": []
                }
            }
        }
        alert = self.do_wazuh_search(query)
        return alert
    def get_alerts(self, index="wazuh-alerts-*", min_alert_level: int = 12, max_alert_level: int = 16, max_alerts: int = 1000, minutes_back: int = 60):
        query = {
            "size": max_alerts,
            "query": {
                "bool": {
                    "must": [],
                    "filter": [
                        {
                            "bool": {
                                "should": [
                                    {
                                        "range": {
                                            "rule.level": {
                                                "gte": min_alert_level,
                                                "lte": max_alert_level
                                            }
                                        }
                                    }
                                ],
                                "minimum_should_match": 1
                            }
                        },
                        {
                            "range": {
                                "timestamp": {
                                    "gte": f"now-{minutes_back}m",
                                    "lte": "now",
                                    "format": "strict_date_optional_time"
                                }
                            }
                        }
                    ],
                    "should": [],
                    "must_not": []
                }
            }
        }
        return self.do_wazuh_search(query, index)
    def get_integration_type(self, record: dict):
        # Fall back to "default" when data.integration is missing or null.
        record_integration = record.get("_source", {}).get("data", {}).get("integration") or "default"
        return record_integration.replace("-", "_")

    def handle_default_wazuh(self, alert_record: dict) -> dict:
        jq_script = """
        {
            "timestamp": ._source.timestamp,
            "human_date": (._source.timestamp | split("T")[0]),
            "human_time": (._source.timestamp | split("T")[1] | split(".")[0]),
            "priority": ._source.rule.level,
            "summary": ._source.rule.description,
            "mitre_tactics": (._source.rule.mitre.tactic // []),
            "mitre_techniques": (._source.rule.mitre.technique // []),
            "rule_id": ._source.rule.id,
            "rule_fired_times": (._source.rule.firedtimes // 1),
            "integration": (._source.data.integration // "default"),
            "source_user": (._source.data.srcuser // ""),
            "source_ip": (._source.data.srcip // "")
        }
        """
        try:
            r = jq.compile(jq_script).input_value(alert_record).first()
            return r
        except Exception:
            logger.error("Unable to parse native Wazuh alert", extra={"alert": alert_record})
            return {}

    def handle_json(self, alert_record: dict) -> dict:
        integration_type = self.get_integration_type(alert_record)
        parser_name = f"parse_{integration_type}_integration"
        parser = getattr(self, parser_name, None)
        if callable(parser):
            return parser(alert_record)
        logger.error(f"No parser found for integration {integration_type}, create one at {parser_name}")
        return {}

    def parse_default_integration(self, alert_record: dict) -> dict:
        """Fallback parser that just uses the standard handler."""
        return self.handle_default_wazuh(alert_record)
    def _get_param_request(self, url, params):
        try:
            if params is None:
                response = requests.get(url, headers=self.headers, auth=(self.user, self.password), verify=False, timeout=10)
            else:
                response = requests.get(url, headers=self.headers, params=params, auth=(self.user, self.password), verify=False, timeout=10)
            if not response.ok:
                logger.critical("Error doing GET request", extra={"api_response": response.text, "status_code": response.status_code, "params": params, "url": url})
                return {}
            else:
                return response.json()
        except Exception as error:
            logger.critical(f"GET call not sent {params} - error: {error}")
            return {}

    def __get_json_payload_request(self, url, payload):
        try:
            if payload is None:
                response = requests.get(url, headers=self.headers, auth=(self.user, self.password), verify=False, timeout=10)
            else:
                response = requests.get(url, headers=self.headers, json=payload, auth=(self.user, self.password), verify=False, timeout=10)
            if not response.ok:
                logger.critical("Error doing GET request", extra={"api_response": response.text, "status_code": response.status_code, "payload": payload, "url": url})
                return {}
            else:
                return response.json()
        except Exception as error:
            logger.critical(f"GET call not sent {url} - error: {error}", extra={"payload": payload})
            return {}

    def _post_json_request(self, url, payload, use_admin_creds=False):
        try:
            if use_admin_creds:
                response = requests.post(url, json=payload, headers=self.headers, auth=(self.admin_username, self.admin_password), verify=False, timeout=10)
            else:
                response = requests.post(url, json=payload, headers=self.headers, auth=(self.user, self.password), verify=False, timeout=10)
            if not response.ok:
                logger.critical("Error posting request", extra={"api_response": response.text, "status_code": response.status_code, "payload": payload, "url": url})
                return {}
            else:
                return response.json()
        except Exception as error:
            logger.critical(f"Payload not delivered {payload} - error: {error}", extra={"payload": payload, "url": url})
            return {}

    def __post_json_request_w_token(self, url, payload):
        try:
            auth_token = self.get_token()
            if auth_token is None:
                return {}
            # Copy the base headers so the Authorization header does not leak into self.headers.
            auth_headers = dict(self.headers)
            auth_headers.update({"Authorization": f"Bearer {auth_token}"})
            response = requests.post(url, json=payload, headers=auth_headers, verify=False, timeout=10)
            if not response.ok:
                logger.critical("Error posting request", extra={"api_response": response.text, "status_code": response.status_code, "payload": payload, "url": url})
                return {}
            else:
                return response.json()
        except Exception as error:
            logger.critical(f"Payload not delivered {payload} - error: {error}", extra={"payload": payload, "url": url})
            return {}

    def __get_json_request_w_token(self, url, params: dict):
        try:
            auth_token = self.get_token()
            if auth_token is None:
                return {}
            # Copy the base headers so the Authorization header does not leak into self.headers.
            auth_headers = dict(self.headers)
            auth_headers.update({"Authorization": f"Bearer {auth_token}"})
            response = requests.get(url, params=params, headers=auth_headers, verify=False, timeout=10)
            if not response.ok:
                logger.critical("Error doing GET request", extra={"api_response": response.text, "status_code": response.status_code, "params": params, "url": url})
                return {}
            else:
                return response.json()
        except Exception as error:
            logger.critical(f"Unable to get URL {url} - error: {error}", extra={"params": params, "url": url})
            return {}