# File-viewer metadata captured with the listing: 143 lines, 4.6 KiB, Python
import json
import logging
from pathlib import Path

import httpx

from app.config import (
    TVDB_API_KEY,
    TVDB_PIN,
    TVDB_BASE_URL,
    TVDB_AUTH_STATE_FILE,
    TVDB_TOKEN_RENEW_MARGIN_SECONDS,
    TVDB_REQUEST_TIMEOUT_SECONDS,
)
from app.utils.jwt_utils import decode_jwt_payload, unix_to_iso_utc, now_utc_ts


class TvdbAuthService:
    """Obtain, cache and renew the TVDB API bearer token.

    The token, its expiry/renewal timestamps and bookkeeping about the last
    login attempt are stored as a JSON document in ``auth_file`` and re-read
    on every call, so no state is kept on the instance besides the path.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, auth_file: Path = TVDB_AUTH_STATE_FILE):
        """Bind the service to the JSON file that holds the auth state."""
        self.auth_file = auth_file

    # ------------------------------------------------------------------
    # State persistence
    # ------------------------------------------------------------------

    def load_state(self) -> dict:
        """Return the persisted auth state, or ``empty_state()`` if unusable.

        A missing file is handled silently. An unreadable file, invalid JSON,
        or a JSON document that is not an object is logged and treated as
        "no state", so callers always receive a dict.
        """
        try:
            state = json.loads(self.auth_file.read_text(encoding="utf-8"))
        except FileNotFoundError:
            return self.empty_state()
        except (OSError, ValueError) as exc:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for a badly encoded file.
            self._log.warning(
                "Ignoring unreadable TVDB auth state %s: %s", self.auth_file, exc
            )
            return self.empty_state()

        if not isinstance(state, dict):
            # Valid JSON but not an object: callers index it like a dict.
            self._log.warning(
                "Ignoring TVDB auth state %s: expected a JSON object, got %s",
                self.auth_file,
                type(state).__name__,
            )
            return self.empty_state()
        return state

    def save_state(self, state: dict) -> None:
        """Write ``state`` to ``auth_file`` as indented UTF-8 JSON.

        The parent directory is created when missing so the first save on a
        fresh installation does not fail.

        Raises:
            OSError: if the directory or file cannot be written.
            TypeError: if ``state`` contains values ``json.dumps`` rejects.
        """
        # Serialise first: a non-serialisable state must not touch the file.
        serialized = json.dumps(state, indent=2, ensure_ascii=False)
        self.auth_file.parent.mkdir(parents=True, exist_ok=True)
        self.auth_file.write_text(serialized, encoding="utf-8")

    @staticmethod
    def empty_state() -> dict:
        """Return a fresh state dict with every known key set to its blank value."""
        return {
            "token": None,
            "token_type": "Bearer",
            "issued_at": None,
            "expires_at": None,
            "expires_at_unix": None,
            "renew_after": None,
            "renew_after_unix": None,
            "last_login_attempt_at": None,
            "last_login_success_at": None,
            "last_login_status": None,
            "last_login_error": None,
            "jwt_payload": {},
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_timestamp(value):
        """Return ``value`` as an int, or ``None`` if it is missing or not numeric."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return None

    @staticmethod
    def _request_token() -> str:
        """POST the API key (and PIN, if set) to ``/login`` and return the token.

        Errors from httpx (transport failures, non-2xx via
        ``raise_for_status``) and from an unexpected response shape
        (``KeyError``/``TypeError``) propagate unchanged.
        """
        payload = {"apikey": TVDB_API_KEY}
        if TVDB_PIN:
            payload["pin"] = TVDB_PIN

        with httpx.Client(timeout=TVDB_REQUEST_TIMEOUT_SECONDS) as client:
            response = client.post(f"{TVDB_BASE_URL}/login", json=payload)
            response.raise_for_status()
            data = response.json()
        return data["data"]["token"]

    @staticmethod
    def _state_from_token(token: str, attempt_at) -> dict:
        """Build the full state dict for a freshly issued ``token``.

        Raises:
            RuntimeError: if the decoded JWT payload has no ``exp`` claim.
        """
        claims = decode_jwt_payload(token)
        exp = claims.get("exp")
        iat = claims.get("iat")
        if not exp:
            raise RuntimeError("JWT exp claim is missing")

        expires_at_unix = int(exp)
        # Renew ahead of the real expiry by the configured safety margin.
        renew_after_unix = expires_at_unix - TVDB_TOKEN_RENEW_MARGIN_SECONDS

        return {
            "token": token,
            "token_type": "Bearer",
            "issued_at": unix_to_iso_utc(iat),
            "expires_at": unix_to_iso_utc(exp),
            "expires_at_unix": expires_at_unix,
            "renew_after": unix_to_iso_utc(renew_after_unix),
            "renew_after_unix": renew_after_unix,
            "last_login_attempt_at": attempt_at,
            "last_login_success_at": unix_to_iso_utc(now_utc_ts()),
            "last_login_status": "ok",
            "last_login_error": None,
            "jwt_payload": {
                "exp": exp,
                "iat": iat,
            },
        }

    def _record_login_failure(self, state: dict, error: Exception) -> None:
        """Persist a failed login attempt without masking the original error."""
        state["last_login_status"] = "error"
        state["last_login_error"] = f"{type(error).__name__}: {error}"
        try:
            self.save_state(state)
        except OSError:
            # The caller is about to re-raise the login error; a secondary
            # write failure must not replace it.
            self._log.exception(
                "Could not persist TVDB login failure to %s", self.auth_file
            )

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def login_and_store_token(self) -> dict:
        """Log in to TVDB, persist the new token state and return it.

        The attempt is first recorded with status ``"pending"``. On success
        the state is replaced by the new token data with status ``"ok"``. On
        any failure the status becomes ``"error"``, ``last_login_error`` is
        filled in (the previously stored token fields are left untouched),
        and the original exception is re-raised.

        Raises:
            RuntimeError: if ``TVDB_API_KEY`` is not configured or the JWT
                has no ``exp`` claim.
            Exception: whatever the HTTP call, response parsing or JWT
                decoding raised, unchanged.
        """
        if not TVDB_API_KEY:
            raise RuntimeError("TVDB_API_KEY is not configured")

        state = self.load_state()
        attempt_at = unix_to_iso_utc(now_utc_ts())
        state["last_login_attempt_at"] = attempt_at
        state["last_login_status"] = "pending"
        state["last_login_error"] = None
        self.save_state(state)

        try:
            new_state = self._state_from_token(self._request_token(), attempt_at)
        except Exception as exc:
            # Record the outcome so the state is not left at "pending",
            # then let the caller see the original exception.
            self._record_login_failure(state, exc)
            raise

        self.save_state(new_state)
        return new_state

    def get_valid_token(self) -> str:
        """Return a usable token, logging in again when necessary.

        A new login is performed when no token is stored, when the stored
        expiry or renewal timestamps are missing or not numeric, or when the
        current time has reached either of them.
        """
        state = self.load_state()
        now_ts = now_utc_ts()

        token = state.get("token")
        expires_at = self._to_timestamp(state.get("expires_at_unix"))
        renew_after = self._to_timestamp(state.get("renew_after_unix"))

        still_valid = (
            token
            and expires_at
            and renew_after
            and now_ts < expires_at
            and now_ts < renew_after
        )
        if still_valid:
            return token
        return self.login_and_store_token()["token"]

    def auth_status(self) -> dict:
        """Return a snapshot of the persisted auth state for diagnostics.

        The stored values are passed through as-is; ``is_expired`` and
        ``is_renewal_due`` are derived from the current time and are
        ``False`` when the corresponding timestamp is missing or not numeric.
        """
        state = self.load_state()
        now_ts = now_utc_ts()

        exp = state.get("expires_at_unix")
        renew_after = state.get("renew_after_unix")
        exp_ts = self._to_timestamp(exp)
        renew_ts = self._to_timestamp(renew_after)

        return {
            "configured": bool(TVDB_API_KEY),
            "has_token": bool(state.get("token")),
            "issued_at": state.get("issued_at"),
            "expires_at": state.get("expires_at"),
            "expires_at_unix": exp,
            "renew_after": state.get("renew_after"),
            "renew_after_unix": renew_after,
            "is_expired": bool(exp_ts and now_ts >= exp_ts),
            "is_renewal_due": bool(renew_ts and now_ts >= renew_ts),
            "last_login_attempt_at": state.get("last_login_attempt_at"),
            "last_login_success_at": state.get("last_login_success_at"),
            "last_login_status": state.get("last_login_status"),
            "last_login_error": state.get("last_login_error"),
            "jwt_payload": state.get("jwt_payload", {}),
        }