"""HTTP endpoints exposing the allow-listed media roots and file discovery."""

from fastapi import APIRouter, HTTPException, Query

from app.services.file_discovery_service import FileDiscoveryService

router = APIRouter()


@router.get("/roots")
def get_roots():
    """Return every configured media root wrapped as ``{"items": [...]}``.

    A fresh ``FileDiscoveryService`` is built per request, so the root list
    reflects the environment variables as they are at call time.
    """
    return {"items": FileDiscoveryService().list_roots()}


@router.get("/discover")
def discover_files(
    root_id: str = Query(..., min_length=1),
    subpath: str = Query(""),
    recursive: bool = Query(False),
    limit: int = Query(200, ge=1, le=1000),
):
    """List media files below ``subpath`` of the root identified by ``root_id``.

    Query parameters:
        root_id: Identifier of a root as returned by ``/roots`` (required,
            non-empty).
        subpath: Directory relative to the root; empty means the root itself.
        recursive: Descend into sub-directories when true.
        limit: Maximum number of items to return (1-1000, default 200).

    Raises:
        HTTPException: 400 when the service rejects the request with a
            ``ValueError`` (unknown root, absolute or traversing subpath,
            target that is not a directory).
    """
    service = FileDiscoveryService()
    try:
        return service.discover(
            root_id=root_id,
            subpath=subpath,
            recursive=recursive,
            limit=limit,
        )
    except ValueError as exc:
        # Chain the cause so server-side tracebacks keep the original error.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
"""Discovery of media files beneath a configured set of allowed roots."""

import os
from pathlib import Path


class FileDiscoveryService:
    """List media files that live under allow-listed root directories.

    Configuration is read from the environment when the instance is created:

    * ``ALLOWED_EXTENSIONS`` - comma-separated extensions (leading dot
      optional); falls back to a built-in video/subtitle set.
    * ``ALLOWED_MEDIA_ROOTS`` - comma-separated directories; falls back to
      ``MEDIA_ROOT`` and then to two hard-coded volume paths.
    """

    def __init__(self) -> None:
        self._allowed_extensions = self._load_allowed_extensions()
        self._roots = self._load_allowed_roots()

    def list_roots(self) -> list[dict]:
        """Return one dict per root: ``id``, ``path``, ``exists``, ``readable``."""
        return [
            {
                "id": root["id"],
                "path": str(root["path"]),
                "exists": root["path"].exists(),
                "readable": os.access(root["path"], os.R_OK),
            }
            for root in self._roots
        ]

    def discover(
        self,
        root_id: str,
        subpath: str = "",
        recursive: bool = False,
        limit: int = 200,
    ) -> dict:
        """Collect up to ``limit`` allowed files below ``subpath`` of a root.

        Args:
            root_id: Identifier of a configured root (see ``list_roots``).
            subpath: Directory relative to the root; empty means the root.
            recursive: Walk the whole sub-tree instead of one directory level.
            limit: Maximum number of items to collect.

        Returns:
            A dict echoing the request (``root_id``, ``root_path``,
            ``subpath``, ``recursive``, ``limit``) plus ``items``. ``items``
            is empty when the target does not exist. Entries appear in the
            order the filesystem yields them; no sorting is applied.

        Raises:
            ValueError: Unknown ``root_id``, absolute or traversing
                ``subpath``, a target outside the root, or a target that
                exists but is not a directory.
        """
        root = self._get_root_by_id(root_id)
        target = self._resolve_target(root["path"], subpath)

        items: list[dict] = []
        response = {
            "root_id": root["id"],
            "root_path": str(root["path"]),
            "subpath": subpath,
            "recursive": recursive,
            "limit": limit,
            "items": items,
        }

        if not target.exists():
            return response
        if not target.is_dir():
            raise ValueError("resolved target is not a directory")

        iterator = target.rglob("*") if recursive else target.iterdir()
        for entry in iterator:
            if len(items) >= limit:
                break
            described = self._describe_entry(entry, root["path"])
            if described is not None:
                items.append(described)

        return response

    def _describe_entry(self, entry: Path, root_path: Path) -> "dict | None":
        """Build the item dict for ``entry``, or ``None`` when it is skipped.

        An entry is skipped when it is not a regular file, has a disallowed
        extension, resolves outside ``root_path`` (e.g. a symlink pointing
        elsewhere), or cannot be inspected.
        """
        try:
            if not entry.is_file():
                return None
            ext = entry.suffix.lower()
            if ext not in self._allowed_extensions:
                return None
            # relative_to raises ValueError when the resolved file escapes the root.
            relative_to_root = entry.resolve().relative_to(root_path)
            stat = entry.stat()
        except (OSError, ValueError):
            # A file that vanished or became unreadable mid-scan must not
            # fail the whole listing.
            return None

        return {
            "name": entry.name,
            "path": str(entry),
            "relative_path": str(relative_to_root),
            "extension": ext,
            "size_bytes": int(stat.st_size),
            "modified_at_unix": int(stat.st_mtime),
        }

    def _load_allowed_extensions(self) -> set[str]:
        """Parse ``ALLOWED_EXTENSIONS`` into lower-case, dot-prefixed suffixes."""
        raw = os.getenv("ALLOWED_EXTENSIONS", "")
        normalized = {
            ext if ext.startswith(".") else f".{ext}"
            for ext in (part.strip().lower() for part in raw.split(","))
            if ext
        }
        # An unset or effectively empty variable selects the built-in default set.
        return normalized or {".mkv", ".mp4", ".avi", ".m4v", ".srt"}

    def _load_allowed_roots(self) -> list[dict]:
        """Resolve the configured root directories into ``{"id", "path"}`` dicts.

        IDs are ``root<N>`` where N is the 1-based position in the configured
        list, so an ID does not shift when an earlier duplicate or
        unresolvable entry is dropped.
        """
        raw = os.getenv("ALLOWED_MEDIA_ROOTS", "").strip()
        if raw:
            candidates = [p.strip() for p in raw.split(",") if p.strip()]
        else:
            media_root = os.getenv("MEDIA_ROOT", "").strip()
            if media_root:
                candidates = [media_root]
            else:
                candidates = [
                    "/Volumes/8TB/Shared_Folders/TV_Shows",
                    "/Volumes/8TB_RAID1/Shared_Folders/Library/TV_Shows",
                ]

        roots = []
        seen = set()
        for idx, candidate in enumerate(candidates, start=1):
            try:
                resolved = Path(candidate).resolve()
            except (OSError, RuntimeError, ValueError):
                # Unresolvable entry (I/O error, symlink loop, embedded NUL): skip it.
                continue
            if resolved in seen:
                continue
            seen.add(resolved)
            roots.append({"id": f"root{idx}", "path": resolved})
        return roots

    def _get_root_by_id(self, root_id: str) -> dict:
        """Return the root whose ``id`` equals ``root_id``.

        Raises:
            ValueError: When no root, or more than one, matches.
        """
        matches = [root for root in self._roots if root["id"] == root_id]
        if len(matches) != 1:
            raise ValueError("invalid root_id")
        return matches[0]

    def _resolve_target(self, root_path: Path, subpath: str) -> Path:
        """Join ``subpath`` onto ``root_path`` and ensure it stays inside the root.

        Raises:
            ValueError: ``subpath`` is absolute, contains ``..``, or resolves
                (for example through a symlink) to a location outside
                ``root_path``.
        """
        safe_subpath = subpath.strip()
        sub = Path(safe_subpath) if safe_subpath else Path(".")

        if sub.is_absolute():
            raise ValueError("subpath must be relative")
        if ".." in sub.parts:
            raise ValueError("subpath traversal is not allowed")

        target = (root_path / sub).resolve(strict=False)

        try:
            target.relative_to(root_path)
        except ValueError:
            raise ValueError("resolved target is outside selected root") from None

        return target
#!/usr/bin/env bash
# Feature tests for the /api/files endpoints of a running service instance.
# BASE_URL may be supplied by the caller; otherwise two well-known hosts are
# probed via the health endpoint and the first one that answers is used.
set -euo pipefail

if [ -z "${BASE_URL:-}" ]; then
  for candidate in "http://127.0.0.1:8085" "http://host.containers.internal:8085"; do
    if curl --silent --fail "${candidate}/api/health" >/dev/null 2>&1; then
      BASE_URL="${candidate}"
      break
    fi
  done
  if [ -z "${BASE_URL:-}" ]; then
    echo "ERROR: could not determine BASE_URL. Tried 127.0.0.1 and host.containers.internal." >&2
    exit 1
  fi
fi

# Scratch directory for response bodies; removed on any exit.
TMP_DIR="$(mktemp -d)"
trap 'rm -rf "$TMP_DIR"' EXIT

# Request /discover with the given subpath and store the response body in
# <label>.json and the HTTP status code in <label>.status. --fail is
# deliberately omitted because a 4xx answer is the expected outcome.
request_discover_status() {
  local label="$1"
  local subpath="$2"
  curl --silent --show-error \
    -o "${TMP_DIR}/${label}.json" \
    -w "%{http_code}" \
    "${BASE_URL}/api/files/discover?root_id=${ROOT_ID}&subpath=${subpath}&recursive=false&limit=10" \
    > "${TMP_DIR}/${label}.status"
}

echo "== Feature test 1: roots endpoint returns configured roots with stable fields =="
curl --fail --silent --show-error \
  "${BASE_URL}/api/files/roots" \
  -o "${TMP_DIR}/roots.json"

cat "${TMP_DIR}/roots.json"

# Validate the shape of the first root and emit its id for the later tests.
python3 - "${TMP_DIR}/roots.json" > "${TMP_DIR}/root_id.txt" <<'PY'
import json
import sys
from pathlib import Path

payload = json.loads(Path(sys.argv[1]).read_text(encoding="utf-8"))
roots = payload.get("items")
assert isinstance(roots, list), "roots.items must be a list"
assert len(roots) > 0, "roots.items must not be empty"
first = roots[0]
for key in ["id", "path", "exists", "readable"]:
    assert key in first, f"root missing key: {key}"
assert isinstance(first["id"], str) and first["id"], "root.id must be non-empty string"
assert isinstance(first["path"], str) and first["path"], "root.path must be non-empty string"
assert isinstance(first["exists"], bool), "root.exists must be boolean"
assert isinstance(first["readable"], bool), "root.readable must be boolean"
print(first["id"])
PY

ROOT_ID="$(<"${TMP_DIR}/root_id.txt")"

echo
echo "== Feature test 2: discover within root returns response shape and allowed extensions only =="
curl --fail --silent --show-error \
  "${BASE_URL}/api/files/discover?root_id=${ROOT_ID}&subpath=&recursive=false&limit=200" \
  -o "${TMP_DIR}/discover.json"

cat "${TMP_DIR}/discover.json"

python3 - "${TMP_DIR}/discover.json" "$ROOT_ID" <<'PY'
import json
import sys
from pathlib import Path

response_file, root_id = sys.argv[1], sys.argv[2]
data = json.loads(Path(response_file).read_text(encoding="utf-8"))
assert data.get("root_id") == root_id, "discover.root_id mismatch"
assert "root_path" in data and isinstance(data["root_path"], str), "discover.root_path missing/invalid"
assert "items" in data and isinstance(data["items"], list), "discover.items missing/invalid"

allowed = {".mkv", ".mp4", ".avi", ".m4v", ".srt"}
required_keys = ["name", "path", "relative_path", "extension", "size_bytes", "modified_at_unix"]
for item in data["items"]:
    for key in required_keys:
        assert key in item, f"discover item missing key: {key}"
    assert item["extension"] in allowed, f"unexpected extension: {item['extension']}"

print("discover shape validation passed")
PY

echo
echo "== Feature test 3: traversal/absolute subpath is rejected =="
request_discover_status "discover_bad_abs" "/etc"
request_discover_status "discover_bad_parent" "../secret"

cat "${TMP_DIR}/discover_bad_abs.json"
cat "${TMP_DIR}/discover_bad_parent.json"

python3 - "${TMP_DIR}/discover_bad_abs.status" "${TMP_DIR}/discover_bad_abs.json" "${TMP_DIR}/discover_bad_parent.status" "${TMP_DIR}/discover_bad_parent.json" <<'PY'
import json
import sys
from pathlib import Path


def read(path):
    return Path(path).read_text(encoding="utf-8")


status_abs = read(sys.argv[1]).strip()
json_abs = json.loads(read(sys.argv[2]))
status_parent = read(sys.argv[3]).strip()
json_parent = json.loads(read(sys.argv[4]))

assert status_abs == "400", f"absolute subpath should be 400, got {status_abs}"
assert status_parent == "400", f"parent traversal should be 400, got {status_parent}"
assert "detail" in json_abs, "absolute subpath response missing detail"
assert "detail" in json_parent, "parent traversal response missing detail"
print("subpath security validation passed")
PY

echo
echo "All file discovery feature tests passed."