fase 6 - Controlled Rename Execute API (bevestigde rename op basis van preview) afgerond

This commit is contained in:
kodi
2026-03-07 15:14:43 +01:00
parent 00c32f9ee6
commit cfd18d7535
7 changed files with 409 additions and 11 deletions
+150
View File
@@ -1,4 +1,5 @@
import json
import os
import sqlite3
from pathlib import Path
@@ -401,3 +402,152 @@ class SessionService:
"template": "{series} ({year}) - S{season:02}E{episode:02} - {title}{ext}",
"items": previews,
}
def execute_rename(self, session_id: str, confirm: bool) -> dict:
if not confirm:
raise ValueError("confirm=true is required to execute rename")
preview = self.build_filename_preview(session_id)
allowed_roots = self._allowed_media_roots()
preflight_items = []
preflight_errors = 0
for item in preview["items"]:
source_path_str = str(item["file"].get("path") or "").strip()
proposed_filename = item["proposed_filename"]
source_path = Path(source_path_str)
destination_path = source_path.with_name(proposed_filename) if source_path_str else Path("")
errors = self._preflight_errors(
source_path=source_path,
destination_path=destination_path,
proposed_filename=proposed_filename,
allowed_roots=allowed_roots,
)
status = "ready"
if errors:
status = "preflight_error"
preflight_errors += 1
preflight_items.append(
{
"index": item["index"],
"episode_selection_id": item["episode_selection_id"],
"file_selection_id": item["file_selection_id"],
"source_path": source_path_str,
"destination_path": str(destination_path) if source_path_str else "",
"proposed_filename": proposed_filename,
"status": status,
"errors": errors,
}
)
if preflight_errors > 0:
return {
"session_id": session_id,
"confirm": confirm,
"executed": False,
"preflight_ok": False,
"counts": preview["counts"],
"items": preflight_items,
}
results = []
for item in preflight_items:
source_path = Path(item["source_path"])
destination_path = Path(item["destination_path"])
os.replace(str(source_path), str(destination_path))
results.append(
{
**item,
"status": "renamed",
"errors": [],
}
)
return {
"session_id": session_id,
"confirm": confirm,
"executed": True,
"preflight_ok": True,
"counts": preview["counts"],
"items": results,
}
def _allowed_media_roots(self) -> list[Path]:
raw = os.getenv("ALLOWED_MEDIA_ROOTS", "").strip()
if raw:
candidates = [p.strip() for p in raw.split(",") if p.strip()]
else:
media_root = os.getenv("MEDIA_ROOT", "").strip()
if media_root:
candidates = [media_root]
else:
candidates = [
"/Volumes/8TB/Shared_Folders/TV_Shows",
"/Volumes/8TB_RAID1/Shared_Folders/Library/TV_Shows",
]
roots = []
for candidate in candidates:
try:
roots.append(Path(candidate).resolve())
except Exception:
continue
return roots
def _is_within_allowed_roots(self, path: Path, allowed_roots: list[Path]) -> bool:
try:
resolved = path.resolve()
except Exception:
return False
for root in allowed_roots:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
return False
def _preflight_errors(
self,
source_path: Path,
destination_path: Path,
proposed_filename: str,
allowed_roots: list[Path],
) -> list[str]:
errors = []
if not str(source_path):
errors.append("source path missing")
return errors
if ".." in source_path.parts:
errors.append("source path traversal is not allowed")
if ".." in Path(proposed_filename).parts:
errors.append("destination filename traversal is not allowed")
if Path(proposed_filename).name != proposed_filename:
errors.append("destination filename must not contain path separators")
if not self._is_within_allowed_roots(source_path, allowed_roots):
errors.append("source path is outside allowed media roots")
if not self._is_within_allowed_roots(destination_path, allowed_roots):
errors.append("destination path is outside allowed media roots")
if not source_path.exists():
errors.append("source file does not exist")
if source_path.exists() and not source_path.is_file():
errors.append("source path is not a file")
if source_path == destination_path:
errors.append("source and destination paths are equal")
if destination_path.exists():
errors.append("destination file already exists")
if not destination_path.parent.exists():
errors.append("destination parent directory does not exist")
return errors