Files
rename-mvp/app/services/session_service.py
T
2026-03-13 16:41:04 +01:00

1138 lines
40 KiB
Python

import json
import os
import re
import sqlite3
import time
from datetime import datetime, timezone
from pathlib import Path
from app.config import APP_DATA_DIR
class SessionService:
    """SQLite-backed persistence for per-session episode/file selections,
    app settings, remembered series, and rename-run history."""

    # Settings key: when "1", renamed files get their mtime/atime set to the
    # episode's first-aired date (see _apply_file_date_after_rename).
    FILE_DATE_SETTING_KEY = "set_file_date_to_first_aired_date"
    # Settings key: default media root path (stored as "" when unset, exposed as None).
    DEFAULT_ROOT_SETTING_KEY = "default_media_root_path"
    # Settings key: cap on how many recently selected series are remembered.
    REMEMBER_MAX_SERIES_KEY = "remember_max_series"
    # Maximum length (characters) of a generated filename, extension included.
    MAX_FILENAME_LEN = 220
    # Fallback for REMEMBER_MAX_SERIES_KEY when unset or unparseable.
    DEFAULT_REMEMBER_MAX_SERIES = 10
def __init__(self) -> None:
    """Locate (and create, if needed) the SQLite store and ensure the schema exists."""
    db_file = Path(APP_DATA_DIR) / "session_state.sqlite3"
    db_file.parent.mkdir(parents=True, exist_ok=True)
    self._db_path = db_file
    self._init_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
return conn
def _init_db(self) -> None:
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS selected_episodes (
selection_id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
position INTEGER NOT NULL,
payload_json TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS idx_selected_episodes_session_position
ON selected_episodes(session_id, position)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_selected_episodes_session
ON selected_episodes(session_id)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS selected_files (
selection_id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
position INTEGER NOT NULL,
payload_json TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE UNIQUE INDEX IF NOT EXISTS idx_selected_files_session_position
ON selected_files(session_id, position)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_selected_files_session
ON selected_files(session_id)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS rename_runs (
run_id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
created_at TEXT NOT NULL,
confirm INTEGER NOT NULL,
executed INTEGER NOT NULL,
preflight_ok INTEGER NOT NULL,
episodes_count INTEGER NOT NULL,
files_count INTEGER NOT NULL,
duration_ms INTEGER NOT NULL
)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_rename_runs_session
ON rename_runs(session_id)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS rename_run_items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id INTEGER NOT NULL,
item_index INTEGER NOT NULL,
episode_selection_id INTEGER,
file_selection_id INTEGER,
source_path TEXT NOT NULL,
destination_path TEXT NOT NULL,
proposed_filename TEXT NOT NULL,
status TEXT NOT NULL,
errors_json TEXT NOT NULL,
FOREIGN KEY(run_id) REFERENCES rename_runs(run_id)
)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_rename_run_items_run
ON rename_run_items(run_id, item_index)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS app_settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS remembered_series (
series_id TEXT PRIMARY KEY,
payload_json TEXT NOT NULL,
last_selected_at TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_remembered_series_last_selected
ON remembered_series(last_selected_at DESC)
"""
)
def get_settings(self) -> dict:
with self._connect() as conn:
rows = conn.execute(
"""
SELECT key, value
FROM app_settings
"""
).fetchall()
values = {str(row["key"]): str(row["value"]) for row in rows}
remember_max_raw = values.get(self.REMEMBER_MAX_SERIES_KEY, str(self.DEFAULT_REMEMBER_MAX_SERIES))
try:
remember_max = int(remember_max_raw)
except ValueError:
remember_max = self.DEFAULT_REMEMBER_MAX_SERIES
remember_max = max(1, min(100, remember_max))
return {
self.FILE_DATE_SETTING_KEY: values.get(self.FILE_DATE_SETTING_KEY, "0") == "1",
self.DEFAULT_ROOT_SETTING_KEY: values.get(self.DEFAULT_ROOT_SETTING_KEY) or None,
self.REMEMBER_MAX_SERIES_KEY: remember_max,
}
def update_settings(self, settings: dict) -> dict:
updated_at = datetime.now(timezone.utc).isoformat()
current = self.get_settings()
allowed_keys = {
self.FILE_DATE_SETTING_KEY,
self.DEFAULT_ROOT_SETTING_KEY,
self.REMEMBER_MAX_SERIES_KEY,
}
unknown_keys = [key for key in settings.keys() if key not in allowed_keys]
if unknown_keys:
raise ValueError(f"unknown setting key: {unknown_keys[0]}")
merged = dict(current)
merged.update(settings)
file_date_value = merged.get(self.FILE_DATE_SETTING_KEY)
if not isinstance(file_date_value, bool):
raise ValueError(f"{self.FILE_DATE_SETTING_KEY} must be boolean")
default_root_path = merged.get(self.DEFAULT_ROOT_SETTING_KEY)
if default_root_path is not None and not isinstance(default_root_path, str):
raise ValueError(f"{self.DEFAULT_ROOT_SETTING_KEY} must be string or null")
default_root_path = (default_root_path or "").strip() or None
remember_max_value = merged.get(self.REMEMBER_MAX_SERIES_KEY, self.DEFAULT_REMEMBER_MAX_SERIES)
if not isinstance(remember_max_value, int):
raise ValueError(f"{self.REMEMBER_MAX_SERIES_KEY} must be integer")
if remember_max_value < 1 or remember_max_value > 100:
raise ValueError(f"{self.REMEMBER_MAX_SERIES_KEY} must be between 1 and 100")
with self._connect() as conn:
conn.execute(
"""
INSERT INTO app_settings (key, value, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
""",
(self.FILE_DATE_SETTING_KEY, "1" if file_date_value else "0", updated_at),
)
conn.execute(
"""
INSERT INTO app_settings (key, value, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
""",
(self.DEFAULT_ROOT_SETTING_KEY, default_root_path or "", updated_at),
)
conn.execute(
"""
INSERT INTO app_settings (key, value, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
""",
(self.REMEMBER_MAX_SERIES_KEY, str(remember_max_value), updated_at),
)
self._enforce_remembered_series_limit(remember_max_value)
return self.get_settings()
def list_remembered_series(self) -> list[dict]:
with self._connect() as conn:
rows = conn.execute(
"""
SELECT series_id, payload_json, last_selected_at
FROM remembered_series
ORDER BY last_selected_at DESC
"""
).fetchall()
items = []
for row in rows:
payload = json.loads(row["payload_json"])
items.append(
{
"series_id": row["series_id"],
"last_selected_at": row["last_selected_at"],
"series": payload,
}
)
return items
def remember_series(self, item: dict) -> list[dict]:
series_id = str(item.get("id") or "").strip()
if not series_id:
raise ValueError("series id is required")
# Preserve only expected display fields plus raw payload.
payload = {
"id": item.get("id"),
"name": item.get("name"),
"year": item.get("year"),
"display_name": item.get("display_name"),
"raw": item.get("raw", {}),
}
now = datetime.now(timezone.utc).isoformat()
with self._connect() as conn:
conn.execute(
"""
INSERT INTO remembered_series (series_id, payload_json, last_selected_at)
VALUES (?, ?, ?)
ON CONFLICT(series_id) DO UPDATE SET
payload_json = excluded.payload_json,
last_selected_at = excluded.last_selected_at
""",
(series_id, json.dumps(payload, ensure_ascii=True), now),
)
remember_max = int(self.get_settings().get(self.REMEMBER_MAX_SERIES_KEY, self.DEFAULT_REMEMBER_MAX_SERIES))
self._enforce_remembered_series_limit(remember_max)
return self.list_remembered_series()
def clear_remembered_series(self) -> None:
with self._connect() as conn:
conn.execute("DELETE FROM remembered_series")
def remove_remembered_series(self, series_id: str) -> list[dict]:
normalized = str(series_id or "").strip()
if not normalized:
raise ValueError("series_id is required")
with self._connect() as conn:
conn.execute(
"DELETE FROM remembered_series WHERE series_id = ?",
(normalized,),
)
return self.list_remembered_series()
def _enforce_remembered_series_limit(self, limit: int) -> None:
with self._connect() as conn:
conn.execute(
"""
DELETE FROM remembered_series
WHERE series_id NOT IN (
SELECT series_id
FROM remembered_series
ORDER BY last_selected_at DESC
LIMIT ?
)
""",
(int(limit),),
)
def list_selected_episodes(self, session_id: str) -> list[dict]:
with self._connect() as conn:
rows = conn.execute(
"""
SELECT selection_id, position, payload_json
FROM selected_episodes
WHERE session_id = ?
ORDER BY position ASC
""",
(session_id,),
).fetchall()
items = []
for row in rows:
payload = json.loads(row["payload_json"])
items.append(
{
"selection_id": row["selection_id"],
"position": row["position"],
"episode": payload,
}
)
return items
def add_selected_episodes(self, session_id: str, items: list[dict]) -> list[dict]:
if not items:
return self.list_selected_episodes(session_id)
with self._connect() as conn:
existing_rows = conn.execute(
"""
SELECT payload_json
FROM selected_episodes
WHERE session_id = ?
""",
(session_id,),
).fetchall()
existing_episode_ids: set[str] = set()
for row in existing_rows:
payload = json.loads(row["payload_json"])
episode_id = str(payload.get("id") or "").strip()
if episode_id:
existing_episode_ids.add(episode_id)
current_max = conn.execute(
"""
SELECT COALESCE(MAX(position), -1) AS max_position
FROM selected_episodes
WHERE session_id = ?
""",
(session_id,),
).fetchone()
next_position = int(current_max["max_position"]) + 1
for item in items:
episode_id = str(item.get("id") or "").strip()
if episode_id and episode_id in existing_episode_ids:
continue
conn.execute(
"""
INSERT INTO selected_episodes (session_id, position, payload_json)
VALUES (?, ?, ?)
""",
(session_id, next_position, json.dumps(item, ensure_ascii=True)),
)
if episode_id:
existing_episode_ids.add(episode_id)
next_position += 1
return self.list_selected_episodes(session_id)
def clear_selected_episodes(self, session_id: str) -> None:
with self._connect() as conn:
conn.execute(
"DELETE FROM selected_episodes WHERE session_id = ?",
(session_id,),
)
def remove_selected_episode(self, session_id: str, selection_id: int) -> list[dict]:
with self._connect() as conn:
conn.execute(
"""
DELETE FROM selected_episodes
WHERE session_id = ? AND selection_id = ?
""",
(session_id, selection_id),
)
return self._compact_positions(session_id)
def reorder_selected_episodes(
self,
session_id: str,
from_index: int,
to_index: int,
) -> list[dict]:
current_items = self.list_selected_episodes(session_id)
if from_index < 0 or from_index >= len(current_items):
raise ValueError("from_index out of range")
if to_index < 0 or to_index >= len(current_items):
raise ValueError("to_index out of range")
if from_index == to_index:
return current_items
moved = current_items.pop(from_index)
current_items.insert(to_index, moved)
with self._connect() as conn:
# Two-phase update avoids transient UNIQUE conflicts on (session_id, position).
for position, item in enumerate(current_items):
conn.execute(
"""
UPDATE selected_episodes
SET position = ?
WHERE session_id = ? AND selection_id = ?
""",
(-(position + 1), session_id, item["selection_id"]),
)
for position, item in enumerate(current_items):
conn.execute(
"""
UPDATE selected_episodes
SET position = ?
WHERE session_id = ? AND selection_id = ?
""",
(position, session_id, item["selection_id"]),
)
return self.list_selected_episodes(session_id)
def _compact_positions(self, session_id: str) -> list[dict]:
items = self.list_selected_episodes(session_id)
with self._connect() as conn:
for position, item in enumerate(items):
if item["position"] == position:
continue
conn.execute(
"""
UPDATE selected_episodes
SET position = ?
WHERE session_id = ? AND selection_id = ?
""",
(position, session_id, item["selection_id"]),
)
return self.list_selected_episodes(session_id)
def list_selected_files(self, session_id: str) -> list[dict]:
with self._connect() as conn:
rows = conn.execute(
"""
SELECT selection_id, position, payload_json
FROM selected_files
WHERE session_id = ?
ORDER BY position ASC
""",
(session_id,),
).fetchall()
items = []
for row in rows:
payload = json.loads(row["payload_json"])
items.append(
{
"selection_id": row["selection_id"],
"position": row["position"],
"file": payload,
}
)
return items
def add_selected_files(self, session_id: str, items: list[dict]) -> list[dict]:
if not items:
return self.list_selected_files(session_id)
with self._connect() as conn:
current_max = conn.execute(
"""
SELECT COALESCE(MAX(position), -1) AS max_position
FROM selected_files
WHERE session_id = ?
""",
(session_id,),
).fetchone()
next_position = int(current_max["max_position"]) + 1
for item in items:
conn.execute(
"""
INSERT INTO selected_files (session_id, position, payload_json)
VALUES (?, ?, ?)
""",
(session_id, next_position, json.dumps(item, ensure_ascii=True)),
)
next_position += 1
return self.list_selected_files(session_id)
def clear_selected_files(self, session_id: str) -> None:
with self._connect() as conn:
conn.execute(
"DELETE FROM selected_files WHERE session_id = ?",
(session_id,),
)
def remove_selected_file(self, session_id: str, selection_id: int) -> list[dict]:
with self._connect() as conn:
conn.execute(
"""
DELETE FROM selected_files
WHERE session_id = ? AND selection_id = ?
""",
(session_id, selection_id),
)
return self._compact_file_positions(session_id)
def reorder_selected_files(
self,
session_id: str,
from_index: int,
to_index: int,
) -> list[dict]:
current_items = self.list_selected_files(session_id)
if from_index < 0 or from_index >= len(current_items):
raise ValueError("from_index out of range")
if to_index < 0 or to_index >= len(current_items):
raise ValueError("to_index out of range")
if from_index == to_index:
return current_items
moved = current_items.pop(from_index)
current_items.insert(to_index, moved)
with self._connect() as conn:
# Two-phase update avoids transient UNIQUE conflicts on (session_id, position).
for position, item in enumerate(current_items):
conn.execute(
"""
UPDATE selected_files
SET position = ?
WHERE session_id = ? AND selection_id = ?
""",
(-(position + 1), session_id, item["selection_id"]),
)
for position, item in enumerate(current_items):
conn.execute(
"""
UPDATE selected_files
SET position = ?
WHERE session_id = ? AND selection_id = ?
""",
(position, session_id, item["selection_id"]),
)
return self.list_selected_files(session_id)
def _compact_file_positions(self, session_id: str) -> list[dict]:
items = self.list_selected_files(session_id)
with self._connect() as conn:
for position, item in enumerate(items):
if item["position"] == position:
continue
conn.execute(
"""
UPDATE selected_files
SET position = ?
WHERE session_id = ? AND selection_id = ?
""",
(position, session_id, item["selection_id"]),
)
return self.list_selected_files(session_id)
def build_mapping_preview(self, session_id: str) -> dict:
episodes = self.list_selected_episodes(session_id)
files = self.list_selected_files(session_id)
if len(episodes) != len(files):
raise ValueError(
"Selected episodes and selected files count mismatch: "
f"{len(episodes)} episodes vs {len(files)} files"
)
mappings = []
for index, (episode_item, file_item) in enumerate(zip(episodes, files)):
mappings.append(
{
"index": index,
"episode_selection_id": episode_item["selection_id"],
"file_selection_id": file_item["selection_id"],
"episode": episode_item["episode"],
"file": file_item["file"],
}
)
return {
"session_id": session_id,
"counts": {
"episodes": len(episodes),
"files": len(files),
},
"mappings": mappings,
}
def build_filename_preview(self, session_id: str) -> dict:
mapping_preview = self.build_mapping_preview(session_id)
previews = []
for item in mapping_preview["mappings"]:
episode = item["episode"]
file_payload = item["file"]
series = (
episode.get("series")
or episode.get("series_name")
or episode.get("show")
or "Unknown Series"
)
year = episode.get("year") or "0000"
series = self._normalize_series_name(series, year)
series = self.sanitize_filename_component(series)
title = episode.get("title") or "Untitled"
title = self.sanitize_filename_component(title)
season_raw = episode.get("season_number") or episode.get("season") or 0
episode_raw = episode.get("episode_number") or episode.get("number") or 0
try:
season_number = int(season_raw)
except (TypeError, ValueError):
season_number = 0
try:
episode_number = int(episode_raw)
except (TypeError, ValueError):
episode_number = 0
source_name = file_payload.get("name") or file_payload.get("path") or ""
ext = Path(source_name).suffix
proposed_filename = (
f"{series} ({year}) - S{season_number:02}E{episode_number:02} - {title}{ext}"
)
proposed_filename = self._finalize_filename(proposed_filename, ext)
previews.append(
{
"index": item["index"],
"episode_selection_id": item["episode_selection_id"],
"file_selection_id": item["file_selection_id"],
"episode": episode,
"file": file_payload,
"proposed_filename": proposed_filename,
}
)
return {
"session_id": mapping_preview["session_id"],
"counts": mapping_preview["counts"],
"template": "{series} ({year}) - S{season:02}E{episode:02} - {title}{ext}",
"items": previews,
}
def _normalize_series_name(self, series: str, year: int | str) -> str:
text = str(series or "").strip()
year_str = str(year or "").strip()
if not text or not year_str:
return text
# Strip trailing " (YEAR)" to avoid duplicate year in the template output.
pattern = re.compile(rf"\s*\({re.escape(year_str)}\)\s*$")
return pattern.sub("", text).strip()
def sanitize_filename_component(self, value: str) -> str:
text = str(value or "")
# Replace Windows/SMB disallowed characters with spaces.
text = re.sub(r'[\\/:*?"<>|]', " ", text)
# Normalize any repeated whitespace and trim trailing/leading dot/space.
text = re.sub(r"\s+", " ", text).strip(" .")
return text or "Untitled"
def _finalize_filename(self, filename: str, ext: str) -> str:
extension = str(ext or "")
stem = filename[: -len(extension)] if extension and filename.endswith(extension) else filename
stem = re.sub(r"\s+", " ", stem).strip(" .")
if not stem:
stem = "Untitled"
max_stem_len = max(1, self.MAX_FILENAME_LEN - len(extension))
if len(stem) > max_stem_len:
stem = stem[:max_stem_len].rstrip(" .")
if not stem:
stem = "Untitled"
return f"{stem}{extension}"
def execute_rename(self, session_id: str, confirm: bool) -> dict:
    """Preflight every episode/file mapping and, only if ALL pass, perform the
    renames, optionally stamp the aired date onto each file, and log the run.

    All-or-nothing: a single preflight failure aborts without touching disk.
    Raises ValueError unless confirm is True. Returns a result dict carrying
    executed/preflight_ok flags, counts, and per-item statuses.
    """
    if not confirm:
        raise ValueError("confirm=true is required to execute rename")
    started_at = time.perf_counter()
    preview = self.build_filename_preview(session_id)
    settings = self.get_settings()
    set_file_date_to_first_aired = bool(settings.get(self.FILE_DATE_SETTING_KEY, False))
    allowed_roots = self._allowed_media_roots()
    preflight_items = []
    preflight_errors = 0
    # Aired dates keyed by mapping index, consumed after each rename succeeds.
    aired_by_index = {}
    for preview_item in preview["items"]:
        aired_by_index[int(preview_item["index"])] = preview_item["episode"].get("aired")
    for item in preview["items"]:
        source_path_str = str(item["file"].get("path") or "").strip()
        proposed_filename = item["proposed_filename"]
        source_path = Path(source_path_str)
        # Destination keeps the source directory; only the filename changes.
        destination_path = source_path.with_name(proposed_filename) if source_path_str else Path("")
        errors = self._preflight_errors(
            source_path=source_path,
            destination_path=destination_path,
            proposed_filename=proposed_filename,
            allowed_roots=allowed_roots,
        )
        # A file already carrying the proposed name is a no-op, not an error.
        if source_path_str and source_path == destination_path:
            errors = [err for err in errors if err != "source and destination paths are equal"]
        status = "ready"
        if errors:
            status = "preflight_error"
            preflight_errors += 1
        preflight_items.append(
            {
                "index": item["index"],
                "episode_selection_id": item["episode_selection_id"],
                "file_selection_id": item["file_selection_id"],
                "source_path": source_path_str,
                "destination_path": str(destination_path) if source_path_str else "",
                "proposed_filename": proposed_filename,
                "status": status,
                "errors": errors,
                # Placeholders; overwritten below when the run actually executes.
                "file_date_status": "file_date_skipped",
                "file_date_detail": "rename not executed due to preflight failure",
            }
        )
    if preflight_errors > 0:
        # Abort the whole run; report every item's preflight outcome.
        result = {
            "session_id": session_id,
            "confirm": confirm,
            "executed": False,
            "preflight_ok": False,
            "counts": preview["counts"],
            "items": preflight_items,
        }
        self._log_rename_run(
            session_id=session_id,
            result=result,
            duration_ms=int((time.perf_counter() - started_at) * 1000),
        )
        return result
    results = []
    for item in preflight_items:
        source_path = Path(item["source_path"])
        destination_path = Path(item["destination_path"])
        rename_needed = source_path != destination_path
        if rename_needed:
            # os.replace is atomic when source and destination share a filesystem.
            os.replace(str(source_path), str(destination_path))
            target_path = destination_path
            item_status = "renamed"
        else:
            target_path = source_path
            item_status = "unchanged"
        file_date_status, file_date_detail = self._apply_file_date_after_rename(
            enabled=set_file_date_to_first_aired,
            aired_value=aired_by_index.get(int(item["index"])),
            destination_path=target_path,
        )
        results.append(
            {
                **item,
                "status": item_status,
                "errors": [],
                "file_date_status": file_date_status,
                "file_date_detail": file_date_detail,
            }
        )
    result = {
        "session_id": session_id,
        "confirm": confirm,
        "executed": True,
        "preflight_ok": True,
        "counts": preview["counts"],
        "items": results,
    }
    self._log_rename_run(
        session_id=session_id,
        result=result,
        duration_ms=int((time.perf_counter() - started_at) * 1000),
    )
    return result
def _apply_file_date_after_rename(
self,
enabled: bool,
aired_value,
destination_path: Path,
) -> tuple[str, str]:
if not enabled:
return ("file_date_skipped", "setting disabled")
ts = self._aired_to_local_noon_timestamp(aired_value)
if ts is None:
return ("file_date_skipped", "aired date missing or invalid")
try:
os.utime(destination_path, (ts, ts))
return ("file_date_updated", "mtime+atime set to aired date at 12:00 local time")
except Exception as exc:
return ("file_date_error", str(exc))
def _aired_to_local_noon_timestamp(self, aired_value) -> float | None:
if aired_value is None:
return None
text = str(aired_value).strip()
if not text:
return None
date_text = text[:10]
try:
date_part = datetime.strptime(date_text, "%Y-%m-%d")
except ValueError:
return None
# Convert with local-time semantics (host/container local timezone),
# avoiding implicit UTC conversion paths.
local_struct = (
date_part.year,
date_part.month,
date_part.day,
12,
0,
0,
-1,
-1,
-1,
)
return time.mktime(local_struct)
def _log_rename_run(self, session_id: str, result: dict, duration_ms: int) -> None:
created_at = datetime.now(timezone.utc).isoformat()
counts = result.get("counts", {})
with self._connect() as conn:
cursor = conn.execute(
"""
INSERT INTO rename_runs (
session_id,
created_at,
confirm,
executed,
preflight_ok,
episodes_count,
files_count,
duration_ms
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
session_id,
created_at,
1 if result.get("confirm") else 0,
1 if result.get("executed") else 0,
1 if result.get("preflight_ok") else 0,
int(counts.get("episodes") or 0),
int(counts.get("files") or 0),
int(duration_ms),
),
)
run_id = int(cursor.lastrowid)
for item in result.get("items", []):
conn.execute(
"""
INSERT INTO rename_run_items (
run_id,
item_index,
episode_selection_id,
file_selection_id,
source_path,
destination_path,
proposed_filename,
status,
errors_json
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
run_id,
int(item.get("index") or 0),
item.get("episode_selection_id"),
item.get("file_selection_id"),
str(item.get("source_path") or ""),
str(item.get("destination_path") or ""),
str(item.get("proposed_filename") or ""),
str(item.get("status") or ""),
json.dumps(item.get("errors") or [], ensure_ascii=True),
),
)
def list_rename_runs(self, session_id: str, limit: int = 20) -> list[dict]:
with self._connect() as conn:
rows = conn.execute(
"""
SELECT
run_id,
session_id,
created_at,
confirm,
executed,
preflight_ok,
episodes_count,
files_count,
duration_ms
FROM rename_runs
WHERE session_id = ?
ORDER BY run_id DESC
LIMIT ?
""",
(session_id, limit),
).fetchall()
item_counts = conn.execute(
"""
SELECT run_id, COUNT(*) AS item_count
FROM rename_run_items
WHERE run_id IN (
SELECT run_id
FROM rename_runs
WHERE session_id = ?
ORDER BY run_id DESC
LIMIT ?
)
GROUP BY run_id
""",
(session_id, limit),
).fetchall()
counts_by_run = {int(row["run_id"]): int(row["item_count"]) for row in item_counts}
runs = []
for row in rows:
run_id = int(row["run_id"])
runs.append(
{
"run_id": run_id,
"session_id": row["session_id"],
"created_at": row["created_at"],
"confirm": bool(row["confirm"]),
"executed": bool(row["executed"]),
"preflight_ok": bool(row["preflight_ok"]),
"counts": {
"episodes": int(row["episodes_count"]),
"files": int(row["files_count"]),
},
"duration_ms": int(row["duration_ms"]),
"items_count": counts_by_run.get(run_id, 0),
}
)
return runs
def get_rename_run(self, run_id: int) -> dict | None:
with self._connect() as conn:
run = conn.execute(
"""
SELECT
run_id,
session_id,
created_at,
confirm,
executed,
preflight_ok,
episodes_count,
files_count,
duration_ms
FROM rename_runs
WHERE run_id = ?
""",
(run_id,),
).fetchone()
if run is None:
return None
item_rows = conn.execute(
"""
SELECT
item_index,
episode_selection_id,
file_selection_id,
source_path,
destination_path,
proposed_filename,
status,
errors_json
FROM rename_run_items
WHERE run_id = ?
ORDER BY item_index ASC
""",
(run_id,),
).fetchall()
items = []
for row in item_rows:
items.append(
{
"index": int(row["item_index"]),
"episode_selection_id": row["episode_selection_id"],
"file_selection_id": row["file_selection_id"],
"source_path": row["source_path"],
"destination_path": row["destination_path"],
"proposed_filename": row["proposed_filename"],
"status": row["status"],
"errors": json.loads(row["errors_json"]),
}
)
return {
"run_id": int(run["run_id"]),
"session_id": run["session_id"],
"created_at": run["created_at"],
"confirm": bool(run["confirm"]),
"executed": bool(run["executed"]),
"preflight_ok": bool(run["preflight_ok"]),
"counts": {
"episodes": int(run["episodes_count"]),
"files": int(run["files_count"]),
},
"duration_ms": int(run["duration_ms"]),
"items": items,
}
def _allowed_media_roots(self) -> list[Path]:
raw = os.getenv("ALLOWED_MEDIA_ROOTS", "").strip()
if raw:
candidates = [p.strip() for p in raw.split(",") if p.strip()]
else:
media_root = os.getenv("MEDIA_ROOT", "").strip()
if media_root:
candidates = [media_root]
else:
candidates = [
"/Volumes/8TB/Shared_Folders/TV_Shows",
"/Volumes/8TB_RAID1/Shared_Folders/Library/TV_Shows",
]
roots = []
for candidate in candidates:
try:
roots.append(Path(candidate).resolve())
except Exception:
continue
return roots
def _is_within_allowed_roots(self, path: Path, allowed_roots: list[Path]) -> bool:
try:
resolved = path.resolve()
except Exception:
return False
for root in allowed_roots:
try:
resolved.relative_to(root)
return True
except ValueError:
continue
return False
def _preflight_errors(
self,
source_path: Path,
destination_path: Path,
proposed_filename: str,
allowed_roots: list[Path],
) -> list[str]:
errors = []
if not str(source_path):
errors.append("source path missing")
return errors
if ".." in source_path.parts:
errors.append("source path traversal is not allowed")
if ".." in Path(proposed_filename).parts:
errors.append("destination filename traversal is not allowed")
if Path(proposed_filename).name != proposed_filename:
errors.append("destination filename must not contain path separators")
if not self._is_within_allowed_roots(source_path, allowed_roots):
errors.append("source path is outside allowed media roots")
if not self._is_within_allowed_roots(destination_path, allowed_roots):
errors.append("destination path is outside allowed media roots")
if not source_path.exists():
errors.append("source file does not exist")
if source_path.exists() and not source_path.is_file():
errors.append("source path is not a file")
if source_path == destination_path:
errors.append("source and destination paths are equal")
if destination_path.exists():
errors.append("destination file already exists")
if not destination_path.parent.exists():
errors.append("destination parent directory does not exist")
return errors