203 lines
6.1 KiB
Python
203 lines
6.1 KiB
Python
import os
|
|
from pathlib import Path
|
|
|
|
|
|
class FileDiscoveryService:
|
|
def __init__(self) -> None:
|
|
self._allowed_extensions = self._load_allowed_extensions()
|
|
self._roots = self._load_allowed_roots()
|
|
|
|
def list_roots(self) -> list[dict]:
|
|
roots = []
|
|
for root in self._roots:
|
|
roots.append(
|
|
{
|
|
"id": root["id"],
|
|
"path": str(root["path"]),
|
|
"exists": root["path"].exists(),
|
|
"readable": os.access(root["path"], os.R_OK),
|
|
}
|
|
)
|
|
return roots
|
|
|
|
def discover(
|
|
self,
|
|
root_id: str,
|
|
subpath: str = "",
|
|
recursive: bool = False,
|
|
limit: int = 200,
|
|
) -> dict:
|
|
root = self._get_root_by_id(root_id)
|
|
target = self._resolve_target(root["path"], subpath)
|
|
|
|
files = []
|
|
if not target.exists():
|
|
return {
|
|
"root_id": root["id"],
|
|
"root_path": str(root["path"]),
|
|
"subpath": subpath,
|
|
"recursive": recursive,
|
|
"limit": limit,
|
|
"items": files,
|
|
}
|
|
if not target.is_dir():
|
|
raise ValueError("resolved target is not a directory")
|
|
|
|
if recursive:
|
|
iterator = target.rglob("*")
|
|
else:
|
|
iterator = target.iterdir()
|
|
|
|
for entry in iterator:
|
|
if len(files) >= limit:
|
|
break
|
|
if not entry.is_file():
|
|
continue
|
|
ext = entry.suffix.lower()
|
|
if ext not in self._allowed_extensions:
|
|
continue
|
|
|
|
try:
|
|
relative_to_root = entry.resolve().relative_to(root["path"])
|
|
except ValueError:
|
|
continue
|
|
|
|
stat = entry.stat()
|
|
files.append(
|
|
{
|
|
"name": entry.name,
|
|
"path": str(entry),
|
|
"relative_path": str(relative_to_root),
|
|
"extension": ext,
|
|
"size_bytes": int(stat.st_size),
|
|
"modified_at_unix": int(stat.st_mtime),
|
|
}
|
|
)
|
|
|
|
return {
|
|
"root_id": root["id"],
|
|
"root_path": str(root["path"]),
|
|
"subpath": subpath,
|
|
"recursive": recursive,
|
|
"limit": limit,
|
|
"items": files,
|
|
}
|
|
|
|
def list_folders(
|
|
self,
|
|
root_id: str,
|
|
subpath: str = "",
|
|
limit: int = 5000,
|
|
) -> dict:
|
|
root = self._get_root_by_id(root_id)
|
|
target = self._resolve_target(root["path"], subpath)
|
|
|
|
folders = []
|
|
if not target.exists():
|
|
return {
|
|
"root_id": root["id"],
|
|
"root_path": str(root["path"]),
|
|
"subpath": subpath,
|
|
"limit": limit,
|
|
"items": folders,
|
|
}
|
|
if not target.is_dir():
|
|
raise ValueError("resolved target is not a directory")
|
|
|
|
for entry in target.iterdir():
|
|
if len(folders) >= limit:
|
|
break
|
|
if not entry.is_dir():
|
|
continue
|
|
|
|
try:
|
|
relative_to_root = entry.resolve().relative_to(root["path"])
|
|
except ValueError:
|
|
continue
|
|
|
|
folders.append(
|
|
{
|
|
"name": entry.name,
|
|
"subpath": str(relative_to_root),
|
|
"path": str(entry),
|
|
}
|
|
)
|
|
|
|
folders.sort(key=lambda x: x["name"].lower())
|
|
return {
|
|
"root_id": root["id"],
|
|
"root_path": str(root["path"]),
|
|
"subpath": subpath,
|
|
"limit": limit,
|
|
"items": folders,
|
|
}
|
|
|
|
def _load_allowed_extensions(self) -> set[str]:
|
|
raw = os.getenv("ALLOWED_EXTENSIONS", "").strip()
|
|
if raw:
|
|
values = [x.strip().lower() for x in raw.split(",") if x.strip()]
|
|
normalized = set()
|
|
for value in values:
|
|
if not value.startswith("."):
|
|
value = f".{value}"
|
|
normalized.add(value)
|
|
if normalized:
|
|
return normalized
|
|
return {".mkv", ".mp4", ".avi", ".m4v", ".srt"}
|
|
|
|
def _load_allowed_roots(self) -> list[dict]:
|
|
raw = os.getenv("ALLOWED_MEDIA_ROOTS", "").strip()
|
|
if raw:
|
|
candidates = [p.strip() for p in raw.split(",") if p.strip()]
|
|
else:
|
|
media_root = os.getenv("MEDIA_ROOT", "").strip()
|
|
if media_root:
|
|
candidates = [media_root]
|
|
else:
|
|
candidates = [
|
|
"/Volumes/8TB/Shared_Folders/TV_Shows",
|
|
"/Volumes/8TB_RAID1/Shared_Folders/Library/TV_Shows",
|
|
]
|
|
|
|
roots = []
|
|
seen = set()
|
|
for idx, candidate in enumerate(candidates, start=1):
|
|
try:
|
|
resolved = Path(candidate).resolve()
|
|
except Exception:
|
|
continue
|
|
if resolved in seen:
|
|
continue
|
|
seen.add(resolved)
|
|
roots.append(
|
|
{
|
|
"id": f"root{idx}",
|
|
"path": resolved,
|
|
}
|
|
)
|
|
return roots
|
|
|
|
def _get_root_by_id(self, root_id: str) -> dict:
|
|
matches = [root for root in self._roots if root["id"] == root_id]
|
|
if len(matches) != 1:
|
|
raise ValueError("invalid root_id")
|
|
return matches[0]
|
|
|
|
def _resolve_target(self, root_path: Path, subpath: str) -> Path:
|
|
safe_subpath = subpath.strip()
|
|
sub = Path(safe_subpath) if safe_subpath else Path(".")
|
|
|
|
if sub.is_absolute():
|
|
raise ValueError("subpath must be relative")
|
|
if ".." in sub.parts:
|
|
raise ValueError("subpath traversal is not allowed")
|
|
|
|
target = (root_path / sub).resolve(strict=False)
|
|
|
|
try:
|
|
target.relative_to(root_path)
|
|
except ValueError:
|
|
raise ValueError("resolved target is outside selected root")
|
|
|
|
return target
|