fase 8 afgerond

This commit is contained in:
kodi
2026-03-07 16:15:29 +01:00
parent 1ed4d5cf52
commit 01dfc5f2e6
5 changed files with 295 additions and 1 deletions
+30
View File
@@ -0,0 +1,30 @@
from fastapi import APIRouter, HTTPException, Query
from app.services.file_discovery_service import FileDiscoveryService
router = APIRouter()
@router.get("/roots")
def get_roots():
service = FileDiscoveryService()
return {"items": service.list_roots()}
@router.get("/discover")
def discover_files(
root_id: str = Query(..., min_length=1),
subpath: str = Query(""),
recursive: bool = Query(False),
limit: int = Query(200, ge=1, le=1000),
):
service = FileDiscoveryService()
try:
return service.discover(
root_id=root_id,
subpath=subpath,
recursive=recursive,
limit=limit,
)
except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc))
+2
View File
@@ -1,4 +1,5 @@
from fastapi import FastAPI
from app.api.files import router as files_router
from app.api.session import router as session_router
from app.api.tvdb import router as tvdb_router
@@ -6,6 +7,7 @@ app = FastAPI(title="Rename MVP")
app.include_router(tvdb_router, prefix="/api/tvdb", tags=["tvdb"])
app.include_router(session_router, prefix="/api/session", tags=["session"])
app.include_router(files_router, prefix="/api/files", tags=["files"])
@app.get("/api/health")
+153
View File
@@ -0,0 +1,153 @@
import os
from pathlib import Path
class FileDiscoveryService:
def __init__(self) -> None:
self._allowed_extensions = self._load_allowed_extensions()
self._roots = self._load_allowed_roots()
def list_roots(self) -> list[dict]:
roots = []
for root in self._roots:
roots.append(
{
"id": root["id"],
"path": str(root["path"]),
"exists": root["path"].exists(),
"readable": os.access(root["path"], os.R_OK),
}
)
return roots
def discover(
self,
root_id: str,
subpath: str = "",
recursive: bool = False,
limit: int = 200,
) -> dict:
root = self._get_root_by_id(root_id)
target = self._resolve_target(root["path"], subpath)
files = []
if not target.exists():
return {
"root_id": root["id"],
"root_path": str(root["path"]),
"subpath": subpath,
"recursive": recursive,
"limit": limit,
"items": files,
}
if not target.is_dir():
raise ValueError("resolved target is not a directory")
if recursive:
iterator = target.rglob("*")
else:
iterator = target.iterdir()
for entry in iterator:
if len(files) >= limit:
break
if not entry.is_file():
continue
ext = entry.suffix.lower()
if ext not in self._allowed_extensions:
continue
try:
relative_to_root = entry.resolve().relative_to(root["path"])
except ValueError:
continue
stat = entry.stat()
files.append(
{
"name": entry.name,
"path": str(entry),
"relative_path": str(relative_to_root),
"extension": ext,
"size_bytes": int(stat.st_size),
"modified_at_unix": int(stat.st_mtime),
}
)
return {
"root_id": root["id"],
"root_path": str(root["path"]),
"subpath": subpath,
"recursive": recursive,
"limit": limit,
"items": files,
}
def _load_allowed_extensions(self) -> set[str]:
raw = os.getenv("ALLOWED_EXTENSIONS", "").strip()
if raw:
values = [x.strip().lower() for x in raw.split(",") if x.strip()]
normalized = set()
for value in values:
if not value.startswith("."):
value = f".{value}"
normalized.add(value)
if normalized:
return normalized
return {".mkv", ".mp4", ".avi", ".m4v", ".srt"}
def _load_allowed_roots(self) -> list[dict]:
raw = os.getenv("ALLOWED_MEDIA_ROOTS", "").strip()
if raw:
candidates = [p.strip() for p in raw.split(",") if p.strip()]
else:
media_root = os.getenv("MEDIA_ROOT", "").strip()
if media_root:
candidates = [media_root]
else:
candidates = [
"/Volumes/8TB/Shared_Folders/TV_Shows",
"/Volumes/8TB_RAID1/Shared_Folders/Library/TV_Shows",
]
roots = []
seen = set()
for idx, candidate in enumerate(candidates, start=1):
try:
resolved = Path(candidate).resolve()
except Exception:
continue
if resolved in seen:
continue
seen.add(resolved)
roots.append(
{
"id": f"root{idx}",
"path": resolved,
}
)
return roots
def _get_root_by_id(self, root_id: str) -> dict:
matches = [root for root in self._roots if root["id"] == root_id]
if len(matches) != 1:
raise ValueError("invalid root_id")
return matches[0]
def _resolve_target(self, root_path: Path, subpath: str) -> Path:
safe_subpath = subpath.strip()
sub = Path(safe_subpath) if safe_subpath else Path(".")
if sub.is_absolute():
raise ValueError("subpath must be relative")
if ".." in sub.parts:
raise ValueError("subpath traversal is not allowed")
target = (root_path / sub).resolve(strict=False)
try:
target.relative_to(root_path)
except ValueError:
raise ValueError("resolved target is outside selected root")
return target