import os from pathlib import Path class FileDiscoveryService: def __init__(self) -> None: self._allowed_extensions = self._load_allowed_extensions() self._roots = self._load_allowed_roots() def list_roots(self) -> list[dict]: roots = [] for root in self._roots: roots.append( { "id": root["id"], "path": str(root["path"]), "exists": root["path"].exists(), "readable": os.access(root["path"], os.R_OK), } ) return roots def discover( self, root_id: str, subpath: str = "", recursive: bool = False, limit: int = 200, ) -> dict: root = self._get_root_by_id(root_id) target = self._resolve_target(root["path"], subpath) files = [] if not target.exists(): return { "root_id": root["id"], "root_path": str(root["path"]), "subpath": subpath, "recursive": recursive, "limit": limit, "items": files, } if not target.is_dir(): raise ValueError("resolved target is not a directory") if recursive: iterator = target.rglob("*") else: iterator = target.iterdir() for entry in iterator: if len(files) >= limit: break if not entry.is_file(): continue ext = entry.suffix.lower() if ext not in self._allowed_extensions: continue try: relative_to_root = entry.resolve().relative_to(root["path"]) except ValueError: continue stat = entry.stat() files.append( { "name": entry.name, "path": str(entry), "relative_path": str(relative_to_root), "extension": ext, "size_bytes": int(stat.st_size), "modified_at_unix": int(stat.st_mtime), } ) return { "root_id": root["id"], "root_path": str(root["path"]), "subpath": subpath, "recursive": recursive, "limit": limit, "items": files, } def _load_allowed_extensions(self) -> set[str]: raw = os.getenv("ALLOWED_EXTENSIONS", "").strip() if raw: values = [x.strip().lower() for x in raw.split(",") if x.strip()] normalized = set() for value in values: if not value.startswith("."): value = f".{value}" normalized.add(value) if normalized: return normalized return {".mkv", ".mp4", ".avi", ".m4v", ".srt"} def _load_allowed_roots(self) -> list[dict]: raw = os.getenv("ALLOWED_MEDIA_ROOTS", "").strip() if raw: candidates = [p.strip() for p in raw.split(",") if p.strip()] else: media_root = os.getenv("MEDIA_ROOT", "").strip() if media_root: candidates = [media_root] else: candidates = [ "/Volumes/8TB/Shared_Folders/TV_Shows", "/Volumes/8TB_RAID1/Shared_Folders/Library/TV_Shows", ] roots = [] seen = set() for idx, candidate in enumerate(candidates, start=1): try: resolved = Path(candidate).resolve() except Exception: continue if resolved in seen: continue seen.add(resolved) roots.append( { "id": f"root{idx}", "path": resolved, } ) return roots def _get_root_by_id(self, root_id: str) -> dict: matches = [root for root in self._roots if root["id"] == root_id] if len(matches) != 1: raise ValueError("invalid root_id") return matches[0] def _resolve_target(self, root_path: Path, subpath: str) -> Path: safe_subpath = subpath.strip() sub = Path(safe_subpath) if safe_subpath else Path(".") if sub.is_absolute(): raise ValueError("subpath must be relative") if ".." in sub.parts: raise ValueError("subpath traversal is not allowed") target = (root_path / sub).resolve(strict=False) try: target.relative_to(root_path) except ValueError: raise ValueError("resolved target is outside selected root") return target