init commit

2025-11-06 02:10:05 -06:00
commit 1a26e71165
15 changed files with 1057 additions and 0 deletions

app/utils/common_utils.py Normal file

@@ -0,0 +1,202 @@
import re
import os
import sys
import csv
import json
import logging
import zipfile
import functools
from pathlib import Path
logger = logging.getLogger(__name__)
try:
import requests
import yaml
except ModuleNotFoundError:
    msg = (
        "Required modules are not installed. "
        "Cannot continue with module / application loading.\n"
        "Install them with: pip install -r requirements.txt"
    )
    print(msg, file=sys.stderr)
    logger.error(msg)
    sys.exit(1)
# ---------- SINGLETON DECORATOR ----------
def singleton_loader(func):
"""Decorator to ensure a singleton instance."""
cache = {}
@functools.wraps(func)
def wrapper(*args, **kwargs):
if func.__name__ not in cache:
cache[func.__name__] = func(*args, **kwargs)
return cache[func.__name__]
return wrapper
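
# Illustrative usage (a sketch, not part of the original file): any
# zero-argument factory can be memoized with @singleton_loader. The
# get_settings() function below is hypothetical and only shows the call
# pattern.
#
#     @singleton_loader
#     def get_settings():
#         return {"env": "dev"}   # stands in for any expensive construction
#
#     a = get_settings()   # first call: factory runs and the result is cached
#     b = get_settings()   # later calls return the cached object, so a is b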
# ---------- UTILITY CLASSES ----------
class FileUtils:
"""File and directory utilities."""
@staticmethod
def ensure_directory(path):
"""Create the directory if it doesn't exist."""
dir_path = Path(path)
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Created directory: {dir_path}")
return True
return False
@staticmethod
def create_dir_if_not_exist(dir_to_create):
return FileUtils.ensure_directory(dir_to_create)
@staticmethod
def list_files_with_ext(directory="/tmp", ext="docx"):
"""List all files in a directory with a specific extension."""
        # Normalize so both "docx" and ".docx" match a real extension boundary.
        ext = ext if ext.startswith(".") else f".{ext}"
        return [f for f in os.listdir(directory) if f.endswith(ext)]
@staticmethod
def download_file(url, dest_path):
"""Download a file from a URL to a local path."""
        # A timeout keeps an unresponsive server from hanging the caller forever.
        response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(dest_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logger.info(f"File Downloaded to: {dest_path} from {url}")
@staticmethod
def unzip_file(zip_path, extract_to="."):
"""Unzip a file to the given directory."""
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_to)
logger.info(f"{zip_path} Extracted to: {extract_to}")
@staticmethod
def verify_file_exist(filepath: Path, exit_if_false=False):
"""Verify a file exists."""
        if not Path(filepath).exists():  # Path() coercion accepts str or Path
if exit_if_false:
sys.stderr.write(f"[FATAL] File not found: {filepath}\n")
sys.exit(1)
return False
return True
@staticmethod
def read_yaml_file(full_file_path: Path):
"""Read a YAML file safely."""
if not FileUtils.verify_file_exist(full_file_path):
logger.error(f"Unable to read yaml - {full_file_path} does not exist")
return {}
try:
with open(full_file_path, 'r') as yfile:
return yaml.safe_load(yfile)
except Exception as e:
logger.error(f"Unable to read yaml due to: {e}")
return {}
@staticmethod
def delete_list_of_files(files_to_delete: list):
"""Delete multiple files safely."""
for file_path in files_to_delete:
try:
os.remove(file_path)
logger.info(f"Deleted {file_path}")
except FileNotFoundError:
logger.warning(f"File not found: {file_path}")
except PermissionError:
logger.warning(f"Permission denied: {file_path}")
except Exception as e:
logger.error(f"Error deleting {file_path}: {e}")
class TextUtils:
"""Text parsing and string utilities."""
@staticmethod
def extract_strings(data: bytes, min_length: int = 4):
"""Extract ASCII and UTF-16LE strings from binary data."""
ascii_re = re.compile(rb"[ -~]{%d,}" % min_length)
ascii_strings = [match.decode("ascii", errors="ignore") for match in ascii_re.findall(data)]
wide_re = re.compile(rb"(?:[ -~]\x00){%d,}" % min_length)
wide_strings = [match.decode("utf-16le", errors="ignore") for match in wide_re.findall(data)]
return ascii_strings + wide_strings
@staticmethod
def defang_url(url: str) -> str:
"""Defang a URL to prevent it from being clickable."""
return url.replace('.', '[.]').replace(':', '[:]')
@staticmethod
def load_dirty_json(json_text: str):
"""Load JSON, return None on error."""
try:
return json.loads(json_text)
except Exception as e:
logger.warning(f"Failed to parse JSON: {e}")
return None
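
# Illustrative TextUtils usage (a sketch; the byte blob is made up):
#
#     blob = b"MZ\x90\x00http://evil.example/payload\x00h\x00i\x00"
#     TextUtils.extract_strings(blob)
#     # -> ['http://evil.example/payload']  (the 2-char runs fall below min_length)
#     TextUtils.defang_url("http://evil.example/payload")
#     # -> 'http[:]//evil[.]example/payload'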
class DataUtils:
"""Data manipulation utilities (CSV, dict lists)."""
@staticmethod
def sort_dict_list(dict_list, key):
"""Sort a list of dictionaries by a given key."""
return sorted(dict_list, key=lambda x: x[key])
@staticmethod
def write_to_csv(data, headers, filename):
"""
Write a list of dictionaries to a CSV file with specified headers.
Nested dicts/lists are flattened for CSV output.
"""
if not data:
logger.warning("No data provided to write to CSV")
return
with open(filename, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow(headers)
            # Use the caller-supplied headers as lookup keys so the header row
            # and the value columns stay aligned even when data[0].keys() has
            # a different order or extra keys.
            key_mapping = list(headers)
for item in data:
row = []
for key in key_mapping:
item_value = item.get(key, "")
if isinstance(item_value, list):
entry = ", ".join(str(v) for v in item_value)
elif isinstance(item_value, dict):
entry = json.dumps(item_value)
else:
entry = str(item_value)
row.append(entry)
writer.writerow(row)
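
# Illustrative DataUtils usage (a sketch; the rows and output path are made up):
#
#     rows = [
#         {"name": "b.docx", "tags": ["ioc", "macro"]},
#         {"name": "a.docx", "tags": []},
#     ]
#     rows = DataUtils.sort_dict_list(rows, "name")
#     DataUtils.write_to_csv(rows, ["name", "tags"], "/tmp/report.csv")
#     # list values are joined ("ioc, macro"); dict values are JSON-encoded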
# ---------- SINGLETON FACTORY ----------
@singleton_loader
def get_common_utils():
"""
Returns the singleton instance for common utilities.
Usage:
utils = get_common_utils()
utils.FileUtils.ensure_directory("/tmp/data")
utils.TextUtils.defang_url("http://example.com")
"""
# Aggregate all utility classes into one instance
class _CommonUtils:
FileUtils = FileUtils
TextUtils = TextUtils
DataUtils = DataUtils
return _CommonUtils()
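
if __name__ == "__main__":
    # Minimal smoke test (an illustrative sketch, not an application entry
    # point): exercises the singleton cache and two pure helpers without
    # touching the network or the filesystem.
    logging.basicConfig(level=logging.INFO)
    utils = get_common_utils()
    assert utils is get_common_utils(), "singleton_loader should return the cached instance"
    print(utils.TextUtils.defang_url("http://example.com/a.b"))
    print(utils.TextUtils.extract_strings(b"hello\x00world", min_length=4))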