Files
rename-mvp/app/services/tvdb_auth_service.py
2026-03-07 09:59:27 +01:00

148 lines
4.8 KiB
Python

import json
from pathlib import Path
import httpx
from app.config import (
TVDB_API_KEY,
TVDB_PIN,
TVDB_BASE_URL,
TVDB_AUTH_STATE_FILE,
TVDB_TOKEN_RENEW_MARGIN_SECONDS,
TVDB_REQUEST_TIMEOUT_SECONDS,
)
from app.utils.jwt_utils import decode_jwt_payload, unix_to_iso_utc, now_utc_ts
class TvdbAuthService:
    """Obtain, cache and renew the TVDB bearer token.

    The token and its bookkeeping (expiry, renewal deadline, outcome of the
    last login attempt) are persisted as a JSON object in ``auth_file`` so
    they can be reused between calls.
    """

    def __init__(self, auth_file: Path = TVDB_AUTH_STATE_FILE):
        """Remember where the auth state is persisted.

        Args:
            auth_file: Path of the JSON state file.
        """
        self.auth_file = auth_file

    def load_state(self) -> dict:
        """Return the persisted auth state.

        Falls back to :meth:`empty_state` when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
        """
        try:
            loaded = json.loads(self.auth_file.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError: covers both
            # json.JSONDecodeError and UnicodeDecodeError. A damaged cache is
            # treated like "no cache" so that the next call simply logs in.
            return self.empty_state()
        if not isinstance(loaded, dict):
            # Valid JSON that is not an object (list, string, number, ...)
            # cannot be used as state by the other methods.
            return self.empty_state()
        return loaded

    def save_state(self, state: dict) -> None:
        """Write ``state`` to the auth file as pretty-printed UTF-8 JSON.

        The parent directory is created when it does not exist yet.

        Raises:
            OSError: If the directory or file cannot be written.
            TypeError: If ``state`` is not JSON-serializable.
        """
        # Serialize first so an unserializable state never touches the file.
        serialized = json.dumps(state, indent=2, ensure_ascii=False)
        self.auth_file.parent.mkdir(parents=True, exist_ok=True)
        self.auth_file.write_text(serialized, encoding="utf-8")

    @staticmethod
    def empty_state() -> dict:
        """Return a fresh state dict describing "no token stored"."""
        return {
            "token": None,
            "token_type": "Bearer",
            "issued_at": None,
            "expires_at": None,
            "expires_at_unix": None,
            "renew_after": None,
            "renew_after_unix": None,
            "last_login_attempt_at": None,
            "last_login_success_at": None,
            "last_login_status": None,
            "last_login_error": None,
            "jwt_payload": {},
            "token_source": "none",
        }

    def login_and_store_token(self) -> dict:
        """Log in, persist the new token and return the new state.

        The attempt is recorded as ``"pending"`` before the request is sent.
        On success the state is replaced by the new token data with status
        ``"ok"``. On failure the previous state is kept, the status is set to
        ``"error"``, the reason is stored in ``last_login_error`` and the
        original exception is re-raised unchanged.

        Returns:
            The state dict that was written to the auth file.

        Raises:
            RuntimeError: If ``TVDB_API_KEY`` is not configured or the token
                carries no ``exp`` claim.
            Exception: Whatever the HTTP request, response parsing or token
                decoding raised (for example ``httpx.HTTPError`` or
                ``KeyError``).
        """
        if not TVDB_API_KEY:
            raise RuntimeError("TVDB_API_KEY is not configured")

        state = self.load_state()
        state["last_login_attempt_at"] = unix_to_iso_utc(now_utc_ts())
        state["last_login_status"] = "pending"
        state["last_login_error"] = None
        self.save_state(state)

        try:
            token = self._request_token()
            new_state = self._build_token_state(token, state)
        except Exception as exc:
            # Record the failure instead of leaving the status stuck on
            # "pending", then let the caller see the original exception.
            self._record_login_failure(state, exc)
            raise

        self.save_state(new_state)
        return new_state

    def _request_token(self) -> str:
        """POST the API key (and PIN, if configured) to ``/login``; return the token."""
        payload = {"apikey": TVDB_API_KEY}
        if TVDB_PIN:
            payload["pin"] = TVDB_PIN
        with httpx.Client(timeout=TVDB_REQUEST_TIMEOUT_SECONDS) as client:
            response = client.post(f"{TVDB_BASE_URL}/login", json=payload)
            response.raise_for_status()
            data = response.json()
        return data["data"]["token"]

    def _build_token_state(self, token: str, previous_state: dict) -> dict:
        """Derive the state dict for a freshly issued ``token`` from its claims."""
        claims = decode_jwt_payload(token)
        exp = claims.get("exp")
        iat = claims.get("iat")
        if not exp:
            raise RuntimeError("JWT exp claim is missing")

        expires_at_unix = int(exp)
        # Renew a configurable margin before the token actually expires.
        renew_after_unix = expires_at_unix - TVDB_TOKEN_RENEW_MARGIN_SECONDS
        return {
            "token": token,
            "token_type": "Bearer",
            # NOTE(review): iat may be absent from the claims; confirm that
            # unix_to_iso_utc accepts None.
            "issued_at": unix_to_iso_utc(iat),
            "expires_at": unix_to_iso_utc(exp),
            "expires_at_unix": expires_at_unix,
            "renew_after": unix_to_iso_utc(renew_after_unix),
            "renew_after_unix": renew_after_unix,
            "last_login_attempt_at": previous_state["last_login_attempt_at"],
            "last_login_success_at": unix_to_iso_utc(now_utc_ts()),
            "last_login_status": "ok",
            "last_login_error": None,
            "jwt_payload": {
                "exp": exp,
                "iat": iat,
            },
            # A token was already stored, so this login replaced it.
            "token_source": "renewed" if previous_state.get("token") else "login",
        }

    def _record_login_failure(self, state: dict, exc: Exception) -> None:
        """Persist ``exc`` as the outcome of the current login attempt."""
        state["last_login_status"] = "error"
        state["last_login_error"] = f"{type(exc).__name__}: {exc}"
        try:
            self.save_state(state)
        except OSError:
            # Best effort only: a failure to write the diagnostics must not
            # replace the login error the caller is about to receive.
            pass

    @staticmethod
    def _deadline_reached(deadline, now_ts) -> bool:
        """Tell whether ``now_ts`` is at or past the stored ``deadline``.

        A missing (falsy) deadline counts as not reached. A deadline that
        cannot be converted to ``int`` counts as reached, so that a damaged
        state file leads to a fresh login rather than an exception.
        """
        if not deadline:
            return False
        try:
            return now_ts >= int(deadline)
        except (TypeError, ValueError):
            return True

    def get_valid_token(self) -> str:
        """Return a usable token, logging in again when necessary.

        A new login is performed when no token is stored, when the expiry or
        renewal timestamps are missing or unreadable, or when either of them
        has been reached. Otherwise the stored token is returned and its
        ``token_source`` is marked as ``"cached"``.

        Raises:
            Exception: Anything raised by :meth:`login_and_store_token`.
        """
        state = self.load_state()
        now_ts = now_utc_ts()
        token = state.get("token")
        exp = state.get("expires_at_unix")
        renew_after = state.get("renew_after_unix")

        login_required = (
            not token
            or not exp
            or not renew_after
            or self._deadline_reached(exp, now_ts)
            or self._deadline_reached(renew_after, now_ts)
        )
        if login_required:
            return self.login_and_store_token()["token"]

        # Only touch the file when the marker actually changes; rewriting an
        # identical state on every call is needless I/O.
        if state.get("token_source") != "cached":
            state["token_source"] = "cached"
            self.save_state(state)
        return token

    def auth_status(self) -> dict:
        """Return a read-only summary of the stored auth state.

        Besides the persisted fields, the result reports whether an API key is
        configured, whether a token is stored, and whether the expiry and
        renewal deadlines have been reached at the time of the call.
        """
        state = self.load_state()
        now_ts = now_utc_ts()
        exp = state.get("expires_at_unix")
        renew_after = state.get("renew_after_unix")
        return {
            "configured": bool(TVDB_API_KEY),
            "has_token": bool(state.get("token")),
            "issued_at": state.get("issued_at"),
            "expires_at": state.get("expires_at"),
            "expires_at_unix": exp,
            "renew_after": state.get("renew_after"),
            "renew_after_unix": renew_after,
            "is_expired": self._deadline_reached(exp, now_ts),
            "is_renewal_due": self._deadline_reached(renew_after, now_ts),
            "last_login_attempt_at": state.get("last_login_attempt_at"),
            "last_login_success_at": state.get("last_login_success_at"),
            "last_login_status": state.get("last_login_status"),
            "last_login_error": state.get("last_login_error"),
            "jwt_payload": state.get("jwt_payload", {}),
            "token_source": state.get("token_source"),
        }