214 lines
6.8 KiB
Python
214 lines
6.8 KiB
Python
import re
|
|
import os
|
|
import sys
|
|
import csv
|
|
import json
|
|
import logging
|
|
import zipfile
|
|
import functools
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo, available_timezones
|
|
|
|
# Module-level logger. It is named after the module (``__name__``) rather than
# the file path: logger names are dot-separated hierarchies, so a path such as
# "/opt/app/utils.py" would be split into unrelated parent/child loggers.
logger = logging.getLogger(__name__)
|
|
|
|
# Third-party dependencies are imported up front so that a missing package is
# reported once, with installation instructions, instead of surfacing later as
# a NameError deep inside a utility call.
try:
    import requests
    import yaml
except ModuleNotFoundError:
    msg = (
        "Required modules are not installed. "
        "Can not continue with module / application loading.\n"
        "Install it with: pip install -r requirements"
    )
    print(msg, file=sys.stderr)
    logger.error(msg)
    # sys.exit(1) instead of the bare exit(): exit() is an interactive helper
    # injected by the ``site`` module (absent with ``python -S`` or in frozen
    # builds) and, called without arguments, reports success (status 0) even
    # though loading has failed.
    sys.exit(1)
|
|
|
|
|
|
# ---------- SINGLETON DECORATOR ----------
|
|
# NOTE(review): plain, empty placeholder class named "T". Nothing in the
# visible code references it -- confirm whether a typing.TypeVar was intended.
class T:
    pass
|
|
def singleton_loader(func):
    """Decorate a factory so it builds its object only once.

    The first call runs ``func`` with the supplied arguments and stores the
    result; every later call returns that stored object and ignores whatever
    arguments it is given.
    """
    instances = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = func.__name__
        try:
            return instances[key]
        except KeyError:
            # Nothing stored yet: fall through and build the object below,
            # outside the handler so errors raised by ``func`` stay unchained.
            pass
        created = func(*args, **kwargs)
        instances[key] = created
        return created

    return wrapper
|
|
|
|
|
|
# ---------- UTILITY CLASSES ----------
|
|
class FileUtils:
    """File and directory utilities."""

    @staticmethod
    def ensure_directory(path):
        """Create ``path`` (including missing parents) if it does not exist.

        Returns:
            True when the directory was created by this call, False when the
            path already existed.
        """
        dir_path = Path(path)
        if dir_path.exists():
            return False
        dir_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Created directory: {dir_path}")
        return True

    @staticmethod
    def create_dir_if_not_exist(dir_to_create):
        """Alias of :meth:`ensure_directory`."""
        return FileUtils.ensure_directory(dir_to_create)

    @staticmethod
    def list_files_with_ext(directory="/tmp", ext="docx"):
        """List the entries of ``directory`` whose name has the given extension.

        Args:
            directory: Directory to scan (not recursive).
            ext: Extension with or without the leading dot (``"docx"`` and
                ``".docx"`` are equivalent), or a tuple/list of extensions.
                An empty string matches every entry.

        Returns:
            Entry names (not full paths) in ``os.listdir`` order.
        """
        if isinstance(ext, str):
            if not ext:
                return os.listdir(directory)
            wanted = (ext,)
        else:
            wanted = tuple(ext)
        # Anchor on the dot so that "docx" does not also match a name such as
        # "notadocx" that merely ends with the same letters.
        suffixes = tuple(e if e.startswith(".") else "." + e for e in wanted)
        return [name for name in os.listdir(directory) if name.endswith(suffixes)]

    @staticmethod
    def download_file(url, dest_path, timeout=30):
        """Download ``url`` to ``dest_path``, streaming in 8 KiB chunks.

        Args:
            url: Address to fetch.
            dest_path: Local file to write (overwritten if present).
            timeout: Passed to ``requests.get``; ``None`` waits indefinitely.

        Raises:
            requests.HTTPError: via ``raise_for_status`` on an error status.
        """
        # The context manager releases the streamed connection even when the
        # status check or the file write fails.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        logger.info(f"File Downloaded to: {dest_path} from {url}")

    @staticmethod
    def unzip_file(zip_path, extract_to="."):
        """Extract every member of the archive ``zip_path`` into ``extract_to``."""
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_to)
        logger.info(f"{zip_path} Extracted to: {extract_to}")

    @staticmethod
    def verify_file_exist(filepath: Path, exit_if_false=False):
        """Report whether ``filepath`` exists.

        Args:
            filepath: Path to check; plain strings are accepted as well.
            exit_if_false: When true, a missing path writes a fatal message to
                stderr and terminates the process with status 1.

        Returns:
            True if the path exists, otherwise False.
        """
        if Path(filepath).exists():
            return True
        if exit_if_false:
            sys.stderr.write(f"[FATAL] File not found: {filepath}\n")
            sys.exit(1)
        return False

    @staticmethod
    def read_yaml_file(full_file_path: Path):
        """Load a YAML file with ``yaml.safe_load``.

        Returns:
            The parsed document, or ``{}`` when the file is missing, empty or
            cannot be read/parsed (the failure is logged, not raised).
        """
        if not FileUtils.verify_file_exist(full_file_path):
            logger.error(f"Unable to read yaml - {full_file_path} does not exist")
            return {}
        try:
            # Explicit UTF-8 so the result does not depend on the locale.
            with open(full_file_path, 'r', encoding='utf-8') as yfile:
                loaded = yaml.safe_load(yfile)
        except Exception as e:
            # Deliberately broad: this helper is best-effort by contract.
            logger.error(f"Unable to read yaml due to: {e}")
            return {}
        # safe_load yields None for an empty document; return the same empty
        # mapping used for every other "nothing usable" outcome.
        return {} if loaded is None else loaded

    @staticmethod
    def delete_list_of_files(files_to_delete: list):
        """Delete each path in ``files_to_delete``.

        A failure on one file is logged and does not stop the remaining
        deletions.
        """
        for file_path in files_to_delete:
            try:
                os.remove(file_path)
                logger.info(f"Deleted {file_path}")
            except FileNotFoundError:
                logger.warning(f"File not found: {file_path}")
            except PermissionError:
                logger.warning(f"Permission denied: {file_path}")
            except Exception as e:
                logger.error(f"Error deleting {file_path}: {e}")
|
|
|
|
class TextUtils:
    """Text parsing and string utilities."""

    @staticmethod
    def extract_strings(data: bytes, min_length: int = 4):
        """Extract printable ASCII and UTF-16LE strings from binary data.

        Args:
            data: Raw bytes to scan.
            min_length: Minimum number of characters per string; values below
                1 are treated as 1.

        Returns:
            All single-byte matches (runs of bytes in the range space..``~``)
            followed by all wide matches (the same characters each followed by
            a NUL byte), decoded to ``str``.
        """
        # A bound of 0 would let the ``{n,}`` quantifier match empty strings,
        # and a negative bound stops being a quantifier at all.
        run_length = max(1, min_length)

        ascii_re = re.compile(rb"[ -~]{%d,}" % run_length)
        ascii_strings = [
            match.decode("ascii", errors="ignore") for match in ascii_re.findall(data)
        ]

        wide_re = re.compile(rb"(?:[ -~]\x00){%d,}" % run_length)
        wide_strings = [
            match.decode("utf-16le", errors="ignore") for match in wide_re.findall(data)
        ]

        return ascii_strings + wide_strings

    @staticmethod
    def defang_url(url: str) -> str:
        """Defang a URL by rewriting every ``.`` as ``[.]`` and ``:`` as ``[:]``."""
        return url.replace('.', '[.]').replace(':', '[:]')

    @staticmethod
    def load_dirty_json(json_text: str):
        """Parse ``json_text``; log a warning and return None if parsing fails."""
        try:
            return json.loads(json_text)
        except Exception as e:
            logger.warning(f"Failed to parse JSON: {e}")
            return None

    @staticmethod
    def is_valid_timezone(tz_str: str) -> bool:
        """
        Check if a timezone string is a valid IANA timezone.

        Example: 'America/Chicago', 'UTC', etc.

        Returns True when ``ZoneInfo(tz_str)`` can be constructed and False
        when it raises for any reason.
        """
        try:
            ZoneInfo(tz_str)
        except Exception:
            return False
        return True
|
|
|
|
class DataUtils:
    """Data manipulation utilities (CSV, dict lists)."""

    @staticmethod
    def sort_dict_list(dict_list, key):
        """Return a new list of the dictionaries sorted by ``key``.

        Raises:
            KeyError: if any dictionary lacks ``key``.
        """
        return sorted(dict_list, key=lambda x: x[key])

    @staticmethod
    def _to_cell(value):
        """Render one value as CSV cell text (lists joined, dicts as JSON)."""
        if isinstance(value, list):
            return ", ".join(str(v) for v in value)
        if isinstance(value, dict):
            return json.dumps(value)
        return str(value)

    @staticmethod
    def write_to_csv(data, headers, filename):
        """
        Write a list of dictionaries to a CSV file with specified headers.

        Nested dicts/lists are flattened for CSV output: lists become a
        ", "-joined string and dicts become a JSON string.

        Args:
            data: List of dictionaries, one per row. Nothing is written (and a
                warning is logged) when it is empty.
            headers: Values for the header row. If every header is a key of
                the first record, the headers also select and order the
                columns; otherwise they are treated as display labels and the
                columns follow the key order of the first record.
            filename: Destination path; written as UTF-8, overwriting any
                existing file.
        """
        if not data:
            logger.warning("No data provided to write to CSV")
            return

        headers = list(headers)
        first_record = data[0]
        # Aligning on the headers keeps each value under its own column even
        # when the header order differs from the dict's insertion order.
        if headers and all(header in first_record for header in headers):
            columns = headers
        else:
            columns = list(first_record.keys())

        with open(filename, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            writer.writerow(headers)
            for item in data:
                # Keys missing from a record produce an empty cell.
                writer.writerow(
                    [DataUtils._to_cell(item.get(column, "")) for column in columns]
                )
|
|
|
|
|
|
# ---------- SINGLETON FACTORY ----------
|
|
@singleton_loader
def get_common_utils():
    """
    Build the shared object that bundles the utility classes.

    The returned object exposes ``FileUtils``, ``TextUtils`` and ``DataUtils``
    as attributes; the ``singleton_loader`` decorator hands back the same
    object on every call.

    Usage:
        utils = get_common_utils()
        utils.FileUtils.ensure_directory("/tmp/data")
        utils.TextUtils.defang_url("http://example.com")
    """
    bundled = {
        "FileUtils": FileUtils,
        "TextUtils": TextUtils,
        "DataUtils": DataUtils,
    }

    class _CommonUtils:
        # Attributes are attached right after the class is created.
        pass

    for attr_name, utility_cls in bundled.items():
        setattr(_CommonUtils, attr_name, utility_cls)

    return _CommonUtils()
|