323 lines
10 KiB
Python
323 lines
10 KiB
Python
from fastapi import APIRouter, HTTPException, Query
|
|
from app.services.tvdb_auth_service import TvdbAuthService
|
|
from app.services.tvdb_client import TvdbClient
|
|
|
|
# Router on which the TVDB proxy endpoints defined in this module are registered.
router = APIRouter()
|
|
|
|
|
|
@router.get("/auth-status")
def auth_status():
    """Report the current TVDB authentication state.

    The response is whatever ``TvdbAuthService.auth_status()`` returns; its
    shape is defined by that service, not by this route.
    """
    return TvdbAuthService().auth_status()
|
|
|
|
|
|
@router.post("/login")
def force_login():
    """Force a fresh TVDB login and report the lifetime of the stored token.

    Returns:
        A dict with ``status`` set to ``"ok"`` plus the ``issued_at``,
        ``expires_at`` and ``renew_after`` values taken from the state
        returned by ``TvdbAuthService.login_and_store_token()``.

    Raises:
        HTTPException: status 500 carrying the stringified error when the
            login fails or the returned state lacks an expected key. An
            ``HTTPException`` raised further down is passed through as is.
    """
    service = TvdbAuthService()
    try:
        state = service.login_and_store_token()
        return {
            "status": "ok",
            "issued_at": state["issued_at"],
            "expires_at": state["expires_at"],
            "renew_after": state["renew_after"],
        }
    except HTTPException:
        # Keep an already well-formed HTTP error (status + detail) intact,
        # the same way get_series_episodes does.
        raise
    except Exception as e:
        # Chain the original error so the traceback keeps the root cause.
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
@router.get("/search")
def search_series(q: str = Query(..., min_length=1)):
    """Search TVDB for series matching ``q`` (simple smoke test against TVDB).

    This proxy relies on the backend token flow and asks TVDB for results of
    type ``series`` only.

    Args:
        q: Search text; must be at least one character long.

    Returns:
        ``{"items": [...]}`` where every entry carries ``id``, ``name``,
        ``year``, ``display_name`` and the untouched TVDB record as ``raw``.
        A missing, null or non-list ``data`` field yields an empty list.

    Raises:
        HTTPException: status 500 carrying the stringified error when the
            TVDB call or the normalization fails. An ``HTTPException``
            raised further down is passed through as is.
    """
    client = TvdbClient()
    try:
        # TVDB v4 supports search through the API; this proxy is deliberately thin.
        payload = client.get("/search", params={"query": q, "type": "series"})
        data = payload.get("data") if isinstance(payload, dict) else None
        # A missing or null "data" means "no results", not a server error.
        items = data if isinstance(data, list) else []

        normalized = []
        for item in items:
            if not isinstance(item, dict):
                # Skip malformed entries instead of failing the whole search.
                continue
            name = item.get("name") or item.get("seriesName") or item.get("slug")
            year = item.get("year")
            normalized.append(
                {
                    "id": item.get("tvdb_id") or item.get("id"),
                    "name": name,
                    "year": year,
                    "display_name": _series_display_name(name, year),
                    "raw": item,
                }
            )

        return {"items": normalized}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
def _series_display_name(name: str | None, year: int | str | None) -> str | None:
|
|
if name and year:
|
|
return f"{name} ({year})"
|
|
return name
|
|
|
|
|
|
def _extract_episodes(payload: dict) -> list[dict]:
    """Pull the episode list out of an episodes response payload.

    Two layouts are recognised: ``data`` being the list itself, or ``data``
    being a dict that holds the list under ``episodes``. Any other layout
    yields an empty list.
    """
    data = payload.get("data")
    candidate = data.get("episodes") if isinstance(data, dict) else data
    return candidate if isinstance(candidate, list) else []
|
|
|
|
|
|
def _fetch_aired_episodes(client: TvdbClient, series_id: int) -> list[dict]:
    """Load the aired-order episodes of a series, trying known endpoints in turn.

    Each candidate endpoint is queried once; the first one that yields a
    non-empty episode list wins.

    Args:
        client: TVDB client used for the requests.
        series_id: TVDB id of the series.

    Returns:
        The non-empty list of raw episode records from the first endpoint
        that produced any.

    Raises:
        RuntimeError: when no endpoint produced episodes. The message lists
            every per-endpoint failure, and the most recent failure is
            attached as ``__cause__`` so the root traceback is not lost.
    """
    # NOTE(review): only one response per endpoint is read. If TVDB paginates
    # these endpoints, later pages are never fetched -- confirm against the API.
    attempts = [
        (f"/series/{series_id}/episodes/default", {"seasonType": "aired"}),
        (f"/series/{series_id}/episodes/official", None),
    ]
    errors: list[str] = []
    last_error: Exception | None = None

    for path, params in attempts:
        try:
            episodes = _extract_episodes(client.get(path, params=params))
        except Exception as exc:
            # Best effort: remember the failure and move on to the next endpoint.
            errors.append(f"{path}: {exc}")
            last_error = exc
            continue
        if episodes:
            return episodes

    if not errors:
        # Every endpoint answered, but none of them contained episodes.
        raise RuntimeError(
            "Unable to load aired episodes from TVDB; tried known episode endpoints"
        )
    raise RuntimeError(
        f"Unable to load aired episodes from TVDB ({'; '.join(errors)})"
    ) from last_error
|
|
|
|
|
|
def _status_name(value) -> str | None:
|
|
if isinstance(value, dict):
|
|
name = value.get("name")
|
|
if isinstance(name, str) and name.strip():
|
|
return name.strip()
|
|
if isinstance(value, str) and value.strip():
|
|
return value.strip()
|
|
return None
|
|
|
|
|
|
def _fetch_artwork_type_labels(client: TvdbClient) -> dict[int, str]:
    """Fetch ``/artwork/types`` and map each artwork type id to a label.

    The label is the entry's ``slug`` (or, failing that, its ``name``),
    lower-cased and stripped; entries without a usable text label map to an
    empty string. Entries that are not dicts or whose ``id`` cannot be
    converted to ``int`` are skipped.

    Args:
        client: TVDB client used for the request.

    Returns:
        Mapping of artwork type id to normalized label. Empty when the
        payload has no list under ``data``.
    """
    payload = client.get("/artwork/types")
    data = payload.get("data") if isinstance(payload, dict) else None
    # A null or otherwise non-list "data" is treated as an empty catalogue
    # instead of blowing up while iterating over it.
    items = data if isinstance(data, list) else []

    labels: dict[int, str] = {}
    for item in items:
        if not isinstance(item, dict):
            continue
        try:
            type_id = int(item.get("id"))
        except (TypeError, ValueError):
            continue
        label = item.get("slug") or item.get("name") or ""
        labels[type_id] = label.lower().strip() if isinstance(label, str) else ""

    return labels
|
|
|
|
|
|
def _pick_banner_url(artworks: list[dict], artwork_type_labels: dict[int, str]) -> str | None:
|
|
strict_banner_candidates = []
|
|
wide_fallback_candidates = []
|
|
generic_wide_candidates = []
|
|
|
|
for artwork in artworks:
|
|
if not isinstance(artwork, dict):
|
|
continue
|
|
image = artwork.get("image") or artwork.get("thumbnail")
|
|
if not isinstance(image, str) or not image.strip():
|
|
continue
|
|
|
|
width = artwork.get("width")
|
|
height = artwork.get("height")
|
|
score = artwork.get("score") or 0
|
|
try:
|
|
width = int(width) if width is not None else 0
|
|
height = int(height) if height is not None else 0
|
|
except (TypeError, ValueError):
|
|
width, height = 0, 0
|
|
try:
|
|
score = float(score)
|
|
except (TypeError, ValueError):
|
|
score = 0.0
|
|
|
|
ratio = (width / height) if width > 0 and height > 0 else 0.0
|
|
record = (score, width, image.strip())
|
|
type_label = ""
|
|
try:
|
|
artwork_type = int(artwork.get("type"))
|
|
type_label = artwork_type_labels.get(artwork_type, "")
|
|
except (TypeError, ValueError):
|
|
type_label = ""
|
|
|
|
if "banner" in type_label:
|
|
strict_banner_candidates.append(record)
|
|
elif any(token in type_label for token in ("fanart", "background", "landscape")):
|
|
wide_fallback_candidates.append(record)
|
|
elif ratio >= 1.4:
|
|
generic_wide_candidates.append(record)
|
|
|
|
if strict_banner_candidates:
|
|
strict_banner_candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
|
|
return strict_banner_candidates[0][2]
|
|
|
|
if wide_fallback_candidates:
|
|
wide_fallback_candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
|
|
return wide_fallback_candidates[0][2]
|
|
|
|
if generic_wide_candidates:
|
|
generic_wide_candidates.sort(key=lambda x: (x[0], x[1]), reverse=True)
|
|
return generic_wide_candidates[0][2]
|
|
|
|
return None
|
|
|
|
|
|
@router.get("/series/{series_id}/summary")
def get_series_summary(series_id: int):
    """Return a compact summary of a series built from its TVDB records.

    Both the extended record (``/series/{id}/extended``) and the base record
    (``/series/{id}``) are fetched; for most fields the extended value is
    preferred and the base value is the fallback (the poster prefers the base
    record's ``image``).

    Args:
        series_id: TVDB id of the series.

    Returns:
        A dict with ``id``, ``banner_url``, ``poster_url``, ``first_aired``,
        ``network``, ``status``, ``overview``, ``slug`` and ``tvdb_url``
        (``None`` when the series has no slug).

    Raises:
        HTTPException: status 500 carrying the stringified error when a TVDB
            call or the assembly fails. An ``HTTPException`` raised further
            down is passed through as is.
    """
    client = TvdbClient()
    try:
        extended_payload = client.get(f"/series/{series_id}/extended")
        base_payload = client.get(f"/series/{series_id}")

        # A null or non-dict "data" is treated as an empty record so the
        # field lookups below cannot fail on it.
        extended_data = extended_payload.get("data") if isinstance(extended_payload, dict) else None
        if not isinstance(extended_data, dict):
            extended_data = {}
        base_data = base_payload.get("data") if isinstance(base_payload, dict) else None
        if not isinstance(base_data, dict):
            base_data = {}

        artworks = extended_data.get("artworks")
        if isinstance(artworks, list) and artworks:
            banner_url = _pick_banner_url(artworks, _fetch_artwork_type_labels(client))
        else:
            # Nothing to rank, so the extra /artwork/types request is skipped.
            banner_url = None

        latest_network = extended_data.get("latestNetwork")
        network = (
            (latest_network.get("name") if isinstance(latest_network, dict) else None)
            or extended_data.get("network")
            or base_data.get("network")
        )
        slug = extended_data.get("slug") or base_data.get("slug")

        return {
            "id": extended_data.get("id") or base_data.get("id") or series_id,
            "banner_url": banner_url,
            "poster_url": base_data.get("image") or extended_data.get("image") or None,
            "first_aired": extended_data.get("firstAired") or base_data.get("firstAired") or None,
            "network": network,
            "status": _status_name(extended_data.get("status")) or _status_name(base_data.get("status")),
            "overview": extended_data.get("overview") or base_data.get("overview") or None,
            "slug": slug,
            "tvdb_url": f"https://www.thetvdb.com/series/{slug}" if slug else None,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
def _normalize_episode(item: dict) -> dict:
    """Map one raw episode record onto the item shape returned by the episodes route."""
    season_number = (
        item.get("seasonNumber")
        or item.get("airedSeason")
        or item.get("season")
        or 0
    )
    episode_number = (
        item.get("number")
        or item.get("airedEpisodeNumber")
        or item.get("episodeNumber")
        or 0
    )
    title = item.get("name") or item.get("episodeName") or ""
    aired_on = item.get("aired") or item.get("firstAired")

    try:
        code = f"S{int(season_number):02}E{int(episode_number):02}"
    except (TypeError, ValueError):
        # Non-numeric season/episode values: deliver the record without a label.
        label = None
    else:
        # Join only the parts that exist, so an untitled episode does not end
        # up with a dangling " - " separator.
        label = " - ".join(str(part) for part in (code, title, aired_on) if part)

    return {
        "id": item.get("id"),
        "season_number": season_number,
        "episode_number": episode_number,
        "title": title,
        "aired": aired_on,
        "label": label,
        "raw": item,
    }


@router.get("/series/{series_id}/episodes")
def get_series_episodes(
    series_id: int,
    order_type: str = Query("aired", min_length=1),
):
    """List the episodes of a series in aired order.

    Args:
        series_id: TVDB id of the series.
        order_type: Requested ordering; only ``"aired"`` (case-insensitive)
            is accepted.

    Returns:
        A dict with ``series`` (``id``, ``name``, ``year``, ``display_name``),
        ``order_type`` (always ``"aired"``) and ``items``, the normalized
        episodes. Each item carries ``id``, ``season_number``,
        ``episode_number``, ``title``, ``aired``, ``label`` and the untouched
        record as ``raw``.

    Raises:
        HTTPException: status 400 for an unsupported ``order_type``; status
            500 carrying the stringified error when a TVDB call or the
            normalization fails.
    """
    if order_type.lower() != "aired":
        raise HTTPException(
            status_code=400,
            detail="Unsupported order_type. Only 'aired' is supported in this MVP.",
        )

    client = TvdbClient()

    try:
        series_payload = client.get(f"/series/{series_id}")
        series_data = series_payload.get("data") if isinstance(series_payload, dict) else None
        if not isinstance(series_data, dict):
            # A null "data" is treated as an empty record instead of failing.
            series_data = {}

        series_name = (
            series_data.get("name")
            or series_data.get("seriesName")
            or series_data.get("slug")
        )
        series_year = series_data.get("year")

        episodes = _fetch_aired_episodes(client, series_id)
        # Malformed (non-dict) entries are skipped rather than failing the request.
        normalized = [_normalize_episode(item) for item in episodes if isinstance(item, dict)]

        return {
            "series": {
                "id": series_data.get("id") or series_id,
                "name": series_name,
                "year": series_year,
                "display_name": _series_display_name(series_name, series_year),
            },
            "order_type": "aired",
            "items": normalized,
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|