Eerste commit voor rename-mvp

This commit is contained in:
kodi
2026-03-07 09:59:27 +01:00
commit 27cee7395f
12 changed files with 510 additions and 0 deletions
+57
View File
@@ -0,0 +1,57 @@
from fastapi import APIRouter, HTTPException, Query
from app.services.tvdb_auth_service import TvdbAuthService
from app.services.tvdb_client import TvdbClient
router = APIRouter()
@router.get("/auth-status")
def auth_status():
service = TvdbAuthService()
return service.auth_status()
@router.post("/login")
def force_login():
service = TvdbAuthService()
try:
state = service.login_and_store_token()
return {
"status": "ok",
"issued_at": state["issued_at"],
"expires_at": state["expires_at"],
"renew_after": state["renew_after"],
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/search")
def search_series(q: str = Query(..., min_length=1)):
"""
Eenvoudige smoke test richting TVDB.
Deze proxy gebruikt de backend token flow en filtert series uit het resultaat.
"""
client = TvdbClient()
try:
# In TVDB v4 wordt search via de API ondersteund; deze proxy is bewust dun.
payload = client.get("/search", params={"query": q, "type": "series"})
items = payload.get("data", [])
normalized = []
for item in items:
name = item.get("name") or item.get("seriesName") or item.get("slug")
year = item.get("year")
normalized.append(
{
"id": item.get("tvdb_id") or item.get("id"),
"name": name,
"year": year,
"display_name": f"{name} ({year})" if name and year else name,
"raw": item,
}
)
return {"items": normalized}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
+21
View File
@@ -0,0 +1,21 @@
from pathlib import Path
import os
APP_DATA_DIR = Path(os.getenv("APP_DATA_DIR", "/app/data"))
TVDB_AUTH_STATE_FILE = Path(
os.getenv("TVDB_AUTH_STATE_FILE", str(APP_DATA_DIR / "tvdb_auth.json"))
)
TVDB_API_KEY = os.getenv("TVDB_API_KEY", "").strip()
TVDB_PIN = os.getenv("TVDB_PIN", "").strip()
TVDB_BASE_URL = os.getenv("TVDB_BASE_URL", "https://api4.thetvdb.com/v4").rstrip("/")
TVDB_TOKEN_RENEW_MARGIN_SECONDS = int(
os.getenv("TVDB_TOKEN_RENEW_MARGIN_SECONDS", "259200")
)
TVDB_REQUEST_TIMEOUT_SECONDS = int(
os.getenv("TVDB_REQUEST_TIMEOUT_SECONDS", "20")
)
APP_DATA_DIR.mkdir(parents=True, exist_ok=True)
TVDB_AUTH_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
+11
View File
@@ -0,0 +1,11 @@
from fastapi import FastAPI
from app.api.tvdb import router as tvdb_router
app = FastAPI(title="Rename MVP")
app.include_router(tvdb_router, prefix="/api/tvdb", tags=["tvdb"])
@app.get("/api/health")
def health():
return {"status": "ok"}
+142
View File
@@ -0,0 +1,142 @@
import json
from pathlib import Path
import httpx
from app.config import (
TVDB_API_KEY,
TVDB_PIN,
TVDB_BASE_URL,
TVDB_AUTH_STATE_FILE,
TVDB_TOKEN_RENEW_MARGIN_SECONDS,
TVDB_REQUEST_TIMEOUT_SECONDS,
)
from app.utils.jwt_utils import decode_jwt_payload, unix_to_iso_utc, now_utc_ts
class TvdbAuthService:
def __init__(self, auth_file: Path = TVDB_AUTH_STATE_FILE):
self.auth_file = auth_file
def load_state(self) -> dict:
if not self.auth_file.exists():
return self.empty_state()
try:
return json.loads(self.auth_file.read_text(encoding="utf-8"))
except Exception:
return self.empty_state()
def save_state(self, state: dict) -> None:
self.auth_file.write_text(
json.dumps(state, indent=2, ensure_ascii=False),
encoding="utf-8",
)
@staticmethod
def empty_state() -> dict:
return {
"token": None,
"token_type": "Bearer",
"issued_at": None,
"expires_at": None,
"expires_at_unix": None,
"renew_after": None,
"renew_after_unix": None,
"last_login_attempt_at": None,
"last_login_success_at": None,
"last_login_status": None,
"last_login_error": None,
"jwt_payload": {},
}
def login_and_store_token(self) -> dict:
if not TVDB_API_KEY:
raise RuntimeError("TVDB_API_KEY is not configured")
state = self.load_state()
state["last_login_attempt_at"] = unix_to_iso_utc(now_utc_ts())
state["last_login_status"] = "pending"
state["last_login_error"] = None
self.save_state(state)
payload = {"apikey": TVDB_API_KEY}
if TVDB_PIN:
payload["pin"] = TVDB_PIN
with httpx.Client(timeout=TVDB_REQUEST_TIMEOUT_SECONDS) as client:
response = client.post(f"{TVDB_BASE_URL}/login", json=payload)
response.raise_for_status()
data = response.json()
token = data["data"]["token"]
claims = decode_jwt_payload(token)
exp = claims.get("exp")
iat = claims.get("iat")
if not exp:
raise RuntimeError("JWT exp claim is missing")
renew_after_unix = int(exp) - TVDB_TOKEN_RENEW_MARGIN_SECONDS
new_state = {
"token": token,
"token_type": "Bearer",
"issued_at": unix_to_iso_utc(iat),
"expires_at": unix_to_iso_utc(exp),
"expires_at_unix": int(exp),
"renew_after": unix_to_iso_utc(renew_after_unix),
"renew_after_unix": renew_after_unix,
"last_login_attempt_at": state["last_login_attempt_at"],
"last_login_success_at": unix_to_iso_utc(now_utc_ts()),
"last_login_status": "ok",
"last_login_error": None,
"jwt_payload": {
"exp": claims.get("exp"),
"iat": claims.get("iat"),
},
}
self.save_state(new_state)
return new_state
def get_valid_token(self) -> str:
state = self.load_state()
now_ts = now_utc_ts()
token = state.get("token")
exp = state.get("expires_at_unix")
renew_after = state.get("renew_after_unix")
if not token or not exp or not renew_after:
return self.login_and_store_token()["token"]
if now_ts >= int(exp):
return self.login_and_store_token()["token"]
if now_ts >= int(renew_after):
return self.login_and_store_token()["token"]
return token
def auth_status(self) -> dict:
state = self.load_state()
now_ts = now_utc_ts()
exp = state.get("expires_at_unix")
renew_after = state.get("renew_after_unix")
return {
"configured": bool(TVDB_API_KEY),
"has_token": bool(state.get("token")),
"issued_at": state.get("issued_at"),
"expires_at": state.get("expires_at"),
"expires_at_unix": exp,
"renew_after": state.get("renew_after"),
"renew_after_unix": renew_after,
"is_expired": bool(exp and now_ts >= int(exp)),
"is_renewal_due": bool(renew_after and now_ts >= int(renew_after)),
"last_login_attempt_at": state.get("last_login_attempt_at"),
"last_login_success_at": state.get("last_login_success_at"),
"last_login_status": state.get("last_login_status"),
"last_login_error": state.get("last_login_error"),
"jwt_payload": state.get("jwt_payload", {}),
}
+147
View File
@@ -0,0 +1,147 @@
import json
from pathlib import Path
import httpx
from app.config import (
TVDB_API_KEY,
TVDB_PIN,
TVDB_BASE_URL,
TVDB_AUTH_STATE_FILE,
TVDB_TOKEN_RENEW_MARGIN_SECONDS,
TVDB_REQUEST_TIMEOUT_SECONDS,
)
from app.utils.jwt_utils import decode_jwt_payload, unix_to_iso_utc, now_utc_ts
class TvdbAuthService:
def __init__(self, auth_file: Path = TVDB_AUTH_STATE_FILE):
self.auth_file = auth_file
def load_state(self) -> dict:
if not self.auth_file.exists():
return self.empty_state()
try:
return json.loads(self.auth_file.read_text(encoding="utf-8"))
except Exception:
return self.empty_state()
def save_state(self, state: dict) -> None:
self.auth_file.write_text(
json.dumps(state, indent=2, ensure_ascii=False),
encoding="utf-8",
)
@staticmethod
def empty_state() -> dict:
return {
"token": None,
"token_type": "Bearer",
"issued_at": None,
"expires_at": None,
"expires_at_unix": None,
"renew_after": None,
"renew_after_unix": None,
"last_login_attempt_at": None,
"last_login_success_at": None,
"last_login_status": None,
"last_login_error": None,
"jwt_payload": {},
"token_source": "none",
}
def login_and_store_token(self) -> dict:
if not TVDB_API_KEY:
raise RuntimeError("TVDB_API_KEY is not configured")
state = self.load_state()
state["last_login_attempt_at"] = unix_to_iso_utc(now_utc_ts())
state["last_login_status"] = "pending"
state["last_login_error"] = None
self.save_state(state)
payload = {"apikey": TVDB_API_KEY}
if TVDB_PIN:
payload["pin"] = TVDB_PIN
with httpx.Client(timeout=TVDB_REQUEST_TIMEOUT_SECONDS) as client:
response = client.post(f"{TVDB_BASE_URL}/login", json=payload)
response.raise_for_status()
data = response.json()
token = data["data"]["token"]
claims = decode_jwt_payload(token)
exp = claims.get("exp")
iat = claims.get("iat")
if not exp:
raise RuntimeError("JWT exp claim is missing")
renew_after_unix = int(exp) - TVDB_TOKEN_RENEW_MARGIN_SECONDS
new_state = {
"token": token,
"token_type": "Bearer",
"issued_at": unix_to_iso_utc(iat),
"expires_at": unix_to_iso_utc(exp),
"expires_at_unix": int(exp),
"renew_after": unix_to_iso_utc(renew_after_unix),
"renew_after_unix": renew_after_unix,
"last_login_attempt_at": state["last_login_attempt_at"],
"last_login_success_at": unix_to_iso_utc(now_utc_ts()),
"last_login_status": "ok",
"last_login_error": None,
"jwt_payload": {
"exp": claims.get("exp"),
"iat": claims.get("iat"),
},
"token_source": "renewed" if state.get("token") else "login",
}
self.save_state(new_state)
return new_state
def get_valid_token(self) -> str:
state = self.load_state()
now_ts = now_utc_ts()
token = state.get("token")
exp = state.get("expires_at_unix")
renew_after = state.get("renew_after_unix")
if not token or not exp or not renew_after:
return self.login_and_store_token()["token"]
if now_ts >= int(exp):
return self.login_and_store_token()["token"]
if now_ts >= int(renew_after):
return self.login_and_store_token()["token"]
state["token_source"] = "cached"
self.save_state(state)
return token
def auth_status(self) -> dict:
state = self.load_state()
now_ts = now_utc_ts()
exp = state.get("expires_at_unix")
renew_after = state.get("renew_after_unix")
return {
"configured": bool(TVDB_API_KEY),
"has_token": bool(state.get("token")),
"issued_at": state.get("issued_at"),
"expires_at": state.get("expires_at"),
"expires_at_unix": exp,
"renew_after": state.get("renew_after"),
"renew_after_unix": renew_after,
"is_expired": bool(exp and now_ts >= int(exp)),
"is_renewal_due": bool(renew_after and now_ts >= int(renew_after)),
"last_login_attempt_at": state.get("last_login_attempt_at"),
"last_login_success_at": state.get("last_login_success_at"),
"last_login_status": state.get("last_login_status"),
"last_login_error": state.get("last_login_error"),
"jwt_payload": state.get("jwt_payload", {}),
"token_source": state.get("token_source"),
}
+23
View File
@@ -0,0 +1,23 @@
import httpx
from app.config import TVDB_BASE_URL, TVDB_REQUEST_TIMEOUT_SECONDS
from app.services.tvdb_auth_service import TvdbAuthService
class TvdbClient:
def __init__(self):
self.auth = TvdbAuthService()
def get(self, path: str, params: dict | None = None) -> dict:
token = self.auth.get_valid_token()
headers = {"Authorization": f"Bearer {token}"}
with httpx.Client(timeout=TVDB_REQUEST_TIMEOUT_SECONDS) as client:
response = client.get(f"{TVDB_BASE_URL}{path}", params=params, headers=headers)
if response.status_code == 401:
token = self.auth.login_and_store_token()["token"]
headers = {"Authorization": f"Bearer {token}"}
response = client.get(f"{TVDB_BASE_URL}{path}", params=params, headers=headers)
response.raise_for_status()
return response.json()
+24
View File
@@ -0,0 +1,24 @@
import base64
import json
from datetime import datetime, timezone
def decode_jwt_payload(token: str) -> dict:
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")
payload_b64 = parts[1]
padding = "=" * (-len(payload_b64) % 4)
raw = base64.urlsafe_b64decode(payload_b64 + padding)
return json.loads(raw.decode("utf-8"))
def unix_to_iso_utc(value: int | None) -> str | None:
if value is None:
return None
return datetime.fromtimestamp(value, tz=timezone.utc).isoformat()
def now_utc_ts() -> int:
return int(datetime.now(tz=timezone.utc).timestamp())