Files

323 lines
10 KiB
Python

from fastapi import APIRouter, HTTPException, Query
from app.services.tvdb_auth_service import TvdbAuthService
from app.services.tvdb_client import TvdbClient
router = APIRouter()
@router.get("/auth-status")
def auth_status():
service = TvdbAuthService()
return service.auth_status()
@router.post("/login")
def force_login():
service = TvdbAuthService()
try:
state = service.login_and_store_token()
return {
"status": "ok",
"issued_at": state["issued_at"],
"expires_at": state["expires_at"],
"renew_after": state["renew_after"],
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/search")
def search_series(q: str = Query(..., min_length=1)):
"""
Eenvoudige smoke test richting TVDB.
Deze proxy gebruikt de backend token flow en filtert series uit het resultaat.
"""
client = TvdbClient()
try:
# In TVDB v4 wordt search via de API ondersteund; deze proxy is bewust dun.
payload = client.get("/search", params={"query": q, "type": "series"})
items = payload.get("data", [])
normalized = []
for item in items:
name = item.get("name") or item.get("seriesName") or item.get("slug")
year = item.get("year")
normalized.append(
{
"id": item.get("tvdb_id") or item.get("id"),
"name": name,
"year": year,
"display_name": f"{name} ({year})" if name and year else name,
"raw": item,
}
)
return {"items": normalized}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def _series_display_name(name: str | None, year: int | str | None) -> str | None:
if name and year:
return f"{name} ({year})"
return name
def _extract_episodes(payload: dict) -> list[dict]:
data = payload.get("data")
if isinstance(data, list):
return data
if isinstance(data, dict):
episodes = data.get("episodes")
if isinstance(episodes, list):
return episodes
return []
def _fetch_aired_episodes(client: TvdbClient, series_id: int) -> list[dict]:
attempts = [
(f"/series/{series_id}/episodes/default", {"seasonType": "aired"}),
(f"/series/{series_id}/episodes/official", None),
]
errors = []
for path, params in attempts:
try:
payload = client.get(path, params=params)
items = _extract_episodes(payload)
if items:
return items
except Exception as exc:
errors.append(f"{path}: {exc}")
raise RuntimeError(
"Unable to load aired episodes from TVDB; tried known episode endpoints"
if not errors
else f"Unable to load aired episodes from TVDB ({'; '.join(errors)})"
)
def _status_name(value) -> str | None:
if isinstance(value, dict):
name = value.get("name")
if isinstance(name, str) and name.strip():
return name.strip()
if isinstance(value, str) and value.strip():
return value.strip()
return None
def _fetch_artwork_type_labels(client: TvdbClient) -> dict[int, str]:
payload = client.get("/artwork/types")
items = payload.get("data", []) if isinstance(payload, dict) else []
labels: dict[int, str] = {}
for item in items:
if not isinstance(item, dict):
continue
type_id = item.get("id")
try:
type_id = int(type_id)
except (TypeError, ValueError):
continue
label = (
item.get("slug")
or item.get("name")
or ""
)
label = label.lower().strip() if isinstance(label, str) else ""
labels[type_id] = label
return labels
def _pick_banner_url(artworks: list[dict], artwork_type_labels: dict[int, str]) -> str | None:
strict_banner_candidates = []
wide_fallback_candidates = []
generic_wide_candidates = []
for artwork in artworks:
if not isinstance(artwork, dict):
continue
image = artwork.get("image") or artwork.get("thumbnail")
if not isinstance(image, str) or not image.strip():
continue
width = artwork.get("width")
height = artwork.get("height")
score = artwork.get("score") or 0
try:
width = int(width) if width is not None else 0
height = int(height) if height is not None else 0
except (TypeError, ValueError):
width, height = 0, 0
try:
score = float(score)
except (TypeError, ValueError):
score = 0.0
ratio = (width / height) if width > 0 and height > 0 else 0.0
record = (score, width, image.strip())
type_label = ""
try:
artwork_type = int(artwork.get("type"))
type_label = artwork_type_labels.get(artwork_type, "")
except (TypeError, ValueError):
type_label = ""
if "banner" in type_label:
strict_banner_candidates.append(record)
elif any(token in type_label for token in ("fanart", "background", "landscape")):
wide_fallback_candidates.append(record)
elif ratio >= 1.4:
generic_wide_candidates.append(record)
if strict_banner_candidates:
strict_banner_candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
return strict_banner_candidates[0][2]
if wide_fallback_candidates:
wide_fallback_candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
return wide_fallback_candidates[0][2]
if generic_wide_candidates:
generic_wide_candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
return generic_wide_candidates[0][2]
return None
@router.get("/series/{series_id}/summary")
def get_series_summary(series_id: int):
client = TvdbClient()
try:
extended_payload = client.get(f"/series/{series_id}/extended")
base_payload = client.get(f"/series/{series_id}")
extended_data = extended_payload.get("data", {}) if isinstance(extended_payload, dict) else {}
base_data = base_payload.get("data", {}) if isinstance(base_payload, dict) else {}
artworks = extended_data.get("artworks")
artworks = artworks if isinstance(artworks, list) else []
artwork_type_labels = _fetch_artwork_type_labels(client)
banner_url = _pick_banner_url(artworks, artwork_type_labels)
poster_url = (
base_data.get("image")
or extended_data.get("image")
or None
)
first_aired = (
extended_data.get("firstAired")
or base_data.get("firstAired")
or None
)
network = (
extended_data.get("latestNetwork", {}).get("name")
if isinstance(extended_data.get("latestNetwork"), dict)
else None
) or extended_data.get("network") or base_data.get("network")
status = _status_name(extended_data.get("status")) or _status_name(base_data.get("status"))
overview = (
extended_data.get("overview")
or base_data.get("overview")
or None
)
slug = extended_data.get("slug") or base_data.get("slug")
tvdb_id = extended_data.get("id") or base_data.get("id") or series_id
return {
"id": tvdb_id,
"banner_url": banner_url,
"poster_url": poster_url,
"first_aired": first_aired,
"network": network,
"status": status,
"overview": overview,
"slug": slug,
"tvdb_url": f"https://www.thetvdb.com/series/{slug}" if slug else None,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/series/{series_id}/episodes")
def get_series_episodes(
series_id: int,
order_type: str = Query("aired", min_length=1),
):
if order_type.lower() != "aired":
raise HTTPException(
status_code=400,
detail="Unsupported order_type. Only 'aired' is supported in this MVP.",
)
client = TvdbClient()
try:
series_payload = client.get(f"/series/{series_id}")
series_data = series_payload.get("data", {}) if isinstance(series_payload, dict) else {}
series_name = (
series_data.get("name")
or series_data.get("seriesName")
or series_data.get("slug")
)
series_year = series_data.get("year")
episodes = _fetch_aired_episodes(client, series_id)
normalized = []
for item in episodes:
season_number = (
item.get("seasonNumber")
or item.get("airedSeason")
or item.get("season")
or 0
)
episode_number = (
item.get("number")
or item.get("airedEpisodeNumber")
or item.get("episodeNumber")
or 0
)
title = item.get("name") or item.get("episodeName") or ""
aired_on = item.get("aired") or item.get("firstAired")
label = None
try:
season_num = int(season_number)
episode_num = int(episode_number)
label = f"S{season_num:02}E{episode_num:02} - {title}"
if aired_on:
label = f"{label} - {aired_on}"
except (TypeError, ValueError):
pass
normalized.append(
{
"id": item.get("id"),
"season_number": season_number,
"episode_number": episode_number,
"title": title,
"aired": aired_on,
"label": label,
"raw": item,
}
)
return {
"series": {
"id": series_data.get("id") or series_id,
"name": series_name,
"year": series_year,
"display_name": _series_display_name(series_name, series_year),
},
"order_type": "aired",
"items": normalized,
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))