removing flask_login, fixed many appwriter issues with custom class

This commit is contained in:
2025-10-30 21:20:42 -05:00
parent 8405edd191
commit 695efdd193
9 changed files with 291 additions and 245 deletions

View File

@@ -1,84 +1,123 @@
# app/services/appwrite_client.py
from __future__ import annotations
import os
from typing import Optional, Dict, Any, Mapping, Union
from typing import Optional, Dict, Any, Mapping, Union, List
from flask import current_app, has_request_context, request, session
from flask import session, redirect, url_for
from appwrite.client import Client
from appwrite.services.account import Account
from appwrite.id import ID
ENDPOINT = os.getenv("APPWRITE_ENDPOINT")
PROJECT_ID = os.getenv("APPWRITE_PROJECT_ID")
API_KEY = os.getenv("APPWRITE_API_KEY")
class AppwriteAccountClient:
def __init__(self, cookies: Optional[Union[str, Mapping[str, str]]] = None, use_admin: bool = False) -> None:
endpoint = current_app.config.get("APPWRITE_ENDPOINT") or os.getenv("APPWRITE_ENDPOINT")
project_id = current_app.config.get("APPWRITE_PROJECT_ID") or os.getenv("APPWRITE_PROJECT_ID")
api_key = current_app.config.get("APPWRITE_API_KEY") or os.getenv("APPWRITE_API_KEY")
if not endpoint or not project_id:
raise RuntimeError("APPWRITE_ENDPOINT and APPWRITE_PROJECT_ID must be configured")
# SESSION USER OBJECT DICT NOTES
# {
# "$id": "6902663c000efa514a81",
# "$createdAt": "2025-10-29T19:08:44.483+00:00",
# "$updatedAt": "2025-10-31T00:28:26.422+00:00",
# "name": "Test Account",
# "registration": "2025-10-29T19:08:44.482+00:00",
# "status": true,
# "labels": [],
# "passwordUpdate": "2025-10-29T19:08:44.482+00:00",
# "email": "ptarrant@gmail.com",
# "phone": "",
# "emailVerification": false,
# "phoneVerification": false,
# "mfa": false,
# "prefs": {},
# "targets": [
# {
# "$id": "6902663c81f9f1a63f4c",
# "$createdAt": "2025-10-29T19:08:44.532+00:00",
# "$updatedAt": "2025-10-29T19:08:44.532+00:00",
# "name": "",
# "userId": "6902663c000efa514a81",
# "providerId": null,
# "providerType": "email",
# "identifier": "ptarrant@gmail.com",
# "expired": false
# }
# ],
# "accessedAt": "2025-10-31T00:28:26.418+00:00"
# }
self.endpoint = endpoint
self.project_id = project_id
class AppWriteClient:
def __init__(self):
self.session_key = f"a_session_{PROJECT_ID}"
self.client = Client()
self.client.set_endpoint(self.endpoint)
self.client.set_project(self.project_id)
def _get_admin_client(self):
return (Client()
.set_endpoint(ENDPOINT)
.set_project(PROJECT_ID)
.set_key(API_KEY)
)
def _get_user_client(self):
client = (Client()
.set_endpoint(ENDPOINT)
.set_project(PROJECT_ID)
)
# If we need admin privileges (to get session.secret), set the API key
if use_admin:
if not api_key:
raise RuntimeError("APPWRITE_API_KEY is required when use_admin=True")
self.client.set_key(api_key)
if session[self.session_key] is not None:
client.set_session(session[self.session_key])
# Bind session if available (explicit → browser cookie → Flask session)
bound = False
if cookies:
bound = self._bind_session_from(cookies)
if not bound and has_request_context():
bound = self._bind_session_from(request.cookies)
if not bound and has_request_context():
secret = session.get("appwrite_cookies")
if secret:
self.client.set_session(secret)
bound = True
return client
self.account = Account(self.client)
def create_new_user(self, email:str, password:str, name:Optional[str]):
admin_client = self._get_admin_client()
try:
admin_account = Account(admin_client)
admin_account.create(user_id=ID.unique(),email=email,password=password,name=name)
return True, ""
except Exception as e:
return False, e
def _refresh_user_session_data(self):
user_client = self._get_user_client()
user_account = Account(user_client)
user = user_account.get()
session['user']=user
@staticmethod
def session_cookie_key(project_id: str) -> str:
return f"a_session_{project_id}"
def log_user_in(self, email:str,password:str):
admin_client = self._get_admin_client()
try:
admin_account = Account(admin_client)
user_session = admin_account.create_email_password_session(email,password)
session[self.session_key]=user_session['secret']
def _bind_session_from(self, cookies: Union[str, Mapping[str, str]]) -> bool:
if isinstance(cookies, str):
self.client.set_session(cookies); return True
key = f"a_session_{self.project_id}"
if key in cookies and cookies.get(key):
self.client.set_session(cookies[key]); return True
for v in cookies.values():
if v: self.client.set_session(v); return True
return False
self._refresh_user_session_data()
return True, ""
except Exception as e:
return False, str(e)
def log_user_out(self):
try:
user_client = self._get_user_client()
user_account = Account(user_client)
user_account.delete_sessions()
return True
except Exception as e:
return True
# --- Auth & account helpers ---
def create_user(self, email: str, password: str, name: Optional[str] = None, user_id: Optional[str] = None) -> Dict[str, Any]:
return dict(self.account.create(user_id=user_id or ID.unique(), email=email, password=password, name=name))
def create_email_password_session(self, email: str, password: str) -> Dict[str, Any]:
return dict(self.account.create_email_password_session(email=email, password=password))
def create_jwt(self) -> Dict[str, Any]:
return dict(self.account.create_jwt())
def get_account(self) -> Dict[str, Any]:
return dict(self.account.get())
def logout_current(self) -> bool:
self.account.delete_session("current")
return True
# --- Email verification ---
def send_verification(self, callback_url: str) -> Dict[str, Any]:
return dict(self.account.create_verification(url=callback_url))
def complete_verification(self, user_id: str, secret: str) -> Dict[str, Any]:
return dict(self.account.update_verification(user_id=user_id, secret=secret))
def send_email_verification(self):
user_client = self._get_user_client()
user_account = Account(user_client)
callback_url = url_for('auth.callback', _external=True)
user_account.create_verification(url=callback_url)
def verify_email(self, user_id:str, secret:str):
if session[self.session_key] is None:
return False
try:
user_client = self._get_user_client()
user_account = Account(user_client)
user_account.update_email_verification(user_id,secret)
self._refresh_user_session_data()
return True
except Exception as e:
return False