Files
SneakyMon/app/utils/common_utils.py
2025-11-06 03:25:44 -06:00

255 lines
8.0 KiB
Python

import re
import os
import sys
import csv
import json
import base64
import logging
import zipfile
import functools
from pathlib import Path
from typing import List
# Module-level logger. Named after the module's import path (not its file
# path) so it sits inside the application's dotted logger hierarchy.
logger = logging.getLogger(__name__)

# requests and PyYAML are third-party packages. Without them this module
# cannot work, so report the problem on stderr and in the log, then stop.
try:
    import requests
    import yaml
except ModuleNotFoundError:
    msg = (
        "common_utils.py - Required modules are not installed. "
        "Can not continue with module / application loading.\n"
        "Install it with: pip install -r requirements"
    )
    print(msg, file=sys.stderr)
    logger.error(msg)
    # Use sys.exit rather than the interactive-only ``exit`` builtin (which
    # is injected by the ``site`` module and may be absent), and exit with a
    # non-zero status so shells and supervisors see the failure.
    sys.exit(1)
# ---------- SINGLETON DECORATOR ----------
# Empty placeholder class named "T"; not referenced by the rest of the
# visible code.
T = type("T", (), {})


def singleton_loader(func):
    """Decorate ``func`` so that it is executed only once.

    The first successful call forwards its arguments to ``func`` and stores
    the result. Every later call returns that stored object and ignores the
    arguments it receives. If ``func`` raises, nothing is stored and the
    next call runs ``func`` again.
    """
    instances = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        slot = func.__name__
        # Guard clause: hand back the stored object when one exists.
        if slot in instances:
            return instances[slot]
        created = func(*args, **kwargs)
        instances[slot] = created
        return created

    return wrapper
# ---------- UTILITY CLASSES ----------
class FileUtils:
    """File and directory utilities."""

    @staticmethod
    def ensure_directory(path):
        """Create ``path`` (including missing parents) if nothing exists there.

        Args:
            path: Directory path as a ``str`` or ``Path``.

        Returns:
            bool: True when the directory was created by this call, False
            when something already existed at ``path``.
        """
        dir_path = Path(path)
        if not dir_path.exists():
            # exist_ok covers another process creating it between the check
            # and the mkdir call.
            dir_path.mkdir(parents=True, exist_ok=True)
            logger.info(f"Created directory: {dir_path}")
            return True
        return False

    @staticmethod
    def create_dir_if_not_exist(dir_to_create):
        """Alias for :meth:`ensure_directory`; returns the same boolean."""
        return FileUtils.ensure_directory(dir_to_create)

    @staticmethod
    def list_files_with_ext(directory="/tmp", ext="docx"):
        """List entry names directly inside ``directory`` that end with ``ext``.

        This is a plain suffix match on the name, so pass ``".docx"`` to
        require the dot. Sub-directories are not searched.
        """
        return [f for f in os.listdir(directory) if f.endswith(ext)]

    @staticmethod
    def list_files_in_dir(directory="/tmp"):
        """List the names of all entries directly inside ``directory``."""
        return os.listdir(directory)

    @staticmethod
    def list_files_in_dir_w_subs(directory: str) -> List[str]:
        """
        Recursively list all files in the given directory and its subdirectories.

        Args:
            directory (str): The path to the directory to search.

        Returns:
            List[str]: File paths, each joined onto the directory it was
            found in.
        """
        return [
            os.path.join(root, filename)
            for root, _, filenames in os.walk(directory)
            for filename in filenames
        ]

    @staticmethod
    def download_file(url, dest_path, timeout=30):
        """Download a file from a URL to a local path.

        Args:
            url: Address to fetch.
            dest_path: Local file to write; overwritten if it exists.
            timeout: Seconds to wait for the server (passed to
                ``requests.get``). Pass ``None`` to wait indefinitely.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.RequestException: On connection problems or timeout.
        """
        # The context manager releases the streamed connection even when
        # writing the file fails part-way through.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        logger.info(f"File Downloaded to: {dest_path} from {url}")

    @staticmethod
    def unzip_file(zip_path, extract_to="."):
        """Extract every member of the archive ``zip_path`` into ``extract_to``."""
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        logger.info(f"{zip_path} Extracted to: {extract_to}")

    @staticmethod
    def verify_file_exist(filepath: Path, exit_if_false=False):
        """Check whether ``filepath`` exists.

        Args:
            filepath: Path to test; a ``Path`` or a plain string.
            exit_if_false: When True, a missing path writes a fatal message
                to stderr and terminates the process with status 1.

        Returns:
            bool: True if the path exists, otherwise False.
        """
        # Coerce so that string paths work as well as Path objects.
        if not Path(filepath).exists():
            if exit_if_false:
                sys.stderr.write(f"[FATAL] File not found: {filepath}\n")
                sys.exit(1)
            return False
        return True

    @staticmethod
    def read_yaml_file(full_file_path: Path):
        """Read a YAML file with ``yaml.safe_load``.

        Returns:
            The parsed document. An empty dict is returned when the file is
            missing, cannot be read or parsed, or contains no document.
        """
        if not FileUtils.verify_file_exist(full_file_path):
            logger.error(f"Unable to read yaml - {full_file_path} does not exist")
            return {}
        try:
            with open(full_file_path, 'r', encoding='utf-8') as yfile:
                loaded = yaml.safe_load(yfile)
        except Exception as e:
            logger.error(f"Unable to read yaml due to: {e}")
            return {}
        # safe_load yields None for an empty file; keep the return type
        # consistent with the error paths above.
        return {} if loaded is None else loaded

    @staticmethod
    def delete_list_of_files(files_to_delete: list):
        """Delete each listed file, logging failures instead of raising."""
        for file_path in files_to_delete:
            try:
                os.remove(file_path)
                logger.info(f"Deleted {file_path}")
            except FileNotFoundError:
                logger.warning(f"File not found: {file_path}")
            except PermissionError:
                logger.warning(f"Permission denied: {file_path}")
            except Exception as e:
                logger.error(f"Error deleting {file_path}: {e}")
class TextUtils:
    """Helpers for pulling text out of raw bytes and transforming strings."""

    @staticmethod
    def extract_strings(data: bytes, min_length: int = 4):
        """Collect printable runs of at least ``min_length`` characters.

        Two scans are made over ``data``: first for runs of printable ASCII
        bytes (space through tilde), then for runs of printable ASCII bytes
        each followed by a NUL byte, which are decoded as UTF-16LE.

        Returns:
            list[str]: All ASCII hits followed by all wide hits.
        """
        narrow_pattern = rb"[ -~]{%d,}" % min_length
        found = [
            hit.group().decode("ascii", errors="ignore")
            for hit in re.finditer(narrow_pattern, data)
        ]
        wide_pattern = rb"(?:[ -~]\x00){%d,}" % min_length
        found.extend(
            hit.group().decode("utf-16le", errors="ignore")
            for hit in re.finditer(wide_pattern, data)
        )
        return found

    @staticmethod
    def defang_url(url: str) -> str:
        """Bracket every ``.`` and ``:`` in ``url`` so it is not clickable."""
        # One pass over the string; neither replacement introduces the other
        # character, so this matches two chained replace() calls.
        substitutions = str.maketrans({'.': '[.]', ':': '[:]'})
        return url.translate(substitutions)

    @staticmethod
    def load_dirty_json(json_text: str):
        """Parse ``json_text``; log a warning and return None if parsing fails."""
        try:
            parsed = json.loads(json_text)
        except Exception as e:
            logger.warning(f"Failed to parse JSON: {e}")
            return None
        else:
            return parsed

    @staticmethod
    def encode_base64(text: str) -> str:
        """
        Return the Base64 form of ``text``.

        Args:
            text (str): Text to encode; it is converted to UTF-8 bytes first.

        Returns:
            str: The Base64 output as a string.
        """
        return base64.b64encode(text.encode("utf-8")).decode("utf-8")

    @staticmethod
    def decode_base64(encoded_text: str) -> str:
        """
        Reverse :meth:`encode_base64`.

        Args:
            encoded_text (str): Base64 text.

        Returns:
            str: The decoded bytes interpreted as UTF-8.
        """
        raw = base64.b64decode(encoded_text.encode("utf-8"))
        return raw.decode("utf-8")
class DataUtils:
    """Helpers for lists of dictionaries and CSV export."""

    @staticmethod
    def sort_dict_list(dict_list, key):
        """Return a new list of the dictionaries ordered by their ``key`` value."""
        ordered = list(dict_list)
        ordered.sort(key=lambda entry: entry[key])
        return ordered

    @staticmethod
    def write_to_csv(data, headers, filename):
        """
        Write a list of dictionaries to ``filename`` as CSV.

        ``headers`` is written verbatim as the first row. The columns of each
        following row are taken from the keys of the first dictionary, in
        that dictionary's order; keys missing from a later dictionary become
        empty cells. List values are joined with ", ", dict values are
        JSON-encoded, and anything else is passed through ``str``.

        If ``data`` is empty a warning is logged and no file is written.
        """
        if not data:
            logger.warning("No data provided to write to CSV")
            return

        def as_cell(value):
            # Render a single value as the text of one CSV cell.
            if isinstance(value, list):
                return ", ".join(str(part) for part in value)
            if isinstance(value, dict):
                return json.dumps(value)
            return str(value)

        with open(filename, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(headers)
            columns = list(data[0].keys())
            for record in data:
                writer.writerow(
                    [as_cell(record.get(column, "")) for column in columns]
                )
# ---------- SINGLETON FACTORY ----------
@singleton_loader
def get_common_utils():
    """
    Build the shared object that bundles the utility classes.

    Because of the ``singleton_loader`` decorator, the body runs on the first
    call only; later calls receive the same object.

    Usage:
        utils = get_common_utils()
        utils.FileUtils.ensure_directory("/tmp/data")
        utils.TextUtils.defang_url("http://example.com")
    """
    class _CommonUtils:
        # Each attribute is the utility class itself (not an instance), so
        # its static methods are called directly through the bundle.
        FileUtils = FileUtils
        TextUtils = TextUtils
        DataUtils = DataUtils

    bundle = _CommonUtils()
    return bundle