import json from pathlib import Path import httpx from app.config import ( TVDB_API_KEY, TVDB_PIN, TVDB_BASE_URL, TVDB_AUTH_STATE_FILE, TVDB_TOKEN_RENEW_MARGIN_SECONDS, TVDB_REQUEST_TIMEOUT_SECONDS, ) from app.utils.jwt_utils import decode_jwt_payload, unix_to_iso_utc, now_utc_ts class TvdbAuthService: def __init__(self, auth_file: Path = TVDB_AUTH_STATE_FILE): self.auth_file = auth_file def load_state(self) -> dict: if not self.auth_file.exists(): return self.empty_state() try: return json.loads(self.auth_file.read_text(encoding="utf-8")) except Exception: return self.empty_state() def save_state(self, state: dict) -> None: self.auth_file.write_text( json.dumps(state, indent=2, ensure_ascii=False), encoding="utf-8", ) @staticmethod def empty_state() -> dict: return { "token": None, "token_type": "Bearer", "issued_at": None, "expires_at": None, "expires_at_unix": None, "renew_after": None, "renew_after_unix": None, "last_login_attempt_at": None, "last_login_success_at": None, "last_login_status": None, "last_login_error": None, "jwt_payload": {}, "token_source": "none", } def login_and_store_token(self) -> dict: if not TVDB_API_KEY: raise RuntimeError("TVDB_API_KEY is not configured") state = self.load_state() state["last_login_attempt_at"] = unix_to_iso_utc(now_utc_ts()) state["last_login_status"] = "pending" state["last_login_error"] = None self.save_state(state) payload = {"apikey": TVDB_API_KEY} if TVDB_PIN: payload["pin"] = TVDB_PIN with httpx.Client(timeout=TVDB_REQUEST_TIMEOUT_SECONDS) as client: response = client.post(f"{TVDB_BASE_URL}/login", json=payload) response.raise_for_status() data = response.json() token = data["data"]["token"] claims = decode_jwt_payload(token) exp = claims.get("exp") iat = claims.get("iat") if not exp: raise RuntimeError("JWT exp claim is missing") renew_after_unix = int(exp) - TVDB_TOKEN_RENEW_MARGIN_SECONDS new_state = { "token": token, "token_type": "Bearer", "issued_at": unix_to_iso_utc(iat), "expires_at": unix_to_iso_utc(exp), "expires_at_unix": int(exp), "renew_after": unix_to_iso_utc(renew_after_unix), "renew_after_unix": renew_after_unix, "last_login_attempt_at": state["last_login_attempt_at"], "last_login_success_at": unix_to_iso_utc(now_utc_ts()), "last_login_status": "ok", "last_login_error": None, "jwt_payload": { "exp": claims.get("exp"), "iat": claims.get("iat"), }, "token_source": "renewed" if state.get("token") else "login", } self.save_state(new_state) return new_state def get_valid_token(self) -> str: state = self.load_state() now_ts = now_utc_ts() token = state.get("token") exp = state.get("expires_at_unix") renew_after = state.get("renew_after_unix") if not token or not exp or not renew_after: return self.login_and_store_token()["token"] if now_ts >= int(exp): return self.login_and_store_token()["token"] if now_ts >= int(renew_after): return self.login_and_store_token()["token"] state["token_source"] = "cached" self.save_state(state) return token def auth_status(self) -> dict: state = self.load_state() now_ts = now_utc_ts() exp = state.get("expires_at_unix") renew_after = state.get("renew_after_unix") return { "configured": bool(TVDB_API_KEY), "has_token": bool(state.get("token")), "issued_at": state.get("issued_at"), "expires_at": state.get("expires_at"), "expires_at_unix": exp, "renew_after": state.get("renew_after"), "renew_after_unix": renew_after, "is_expired": bool(exp and now_ts >= int(exp)), "is_renewal_due": bool(renew_after and now_ts >= int(renew_after)), "last_login_attempt_at": state.get("last_login_attempt_at"), "last_login_success_at": state.get("last_login_success_at"), "last_login_status": state.get("last_login_status"), "last_login_error": state.get("last_login_error"), "jwt_payload": state.get("jwt_payload", {}), "token_source": state.get("token_source"), }