# NOTE: the "1108 lines / 39 KiB / Python" header lines were viewer/scrape
# metadata, not part of the module source.
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from app.config import APP_DATA_DIR
|
|
|
|
|
|
class SessionService:
    """SQLite-backed store for per-session UI state.

    Persists selected episodes/files, rename run history, app settings,
    and a small list of recently selected ("remembered") series.
    """

    # app_settings key: "1"/"0" flag — stamp renamed files with the first-aired date.
    FILE_DATE_SETTING_KEY = "set_file_date_to_first_aired_date"
    # app_settings key: optional default media root path ("" means unset).
    DEFAULT_ROOT_SETTING_KEY = "default_media_root_path"
    # app_settings key: cap on how many remembered series are kept.
    REMEMBER_MAX_SERIES_KEY = "remember_max_series"
    # Maximum length (characters) of a generated filename, extension included.
    MAX_FILENAME_LEN = 220
    # Fallback cap used when the remember_max_series setting is absent/unparseable.
    DEFAULT_REMEMBER_MAX_SERIES = 10
|
|
|
def __init__(self) -> None:
|
|
self._db_path = Path(APP_DATA_DIR) / "session_state.sqlite3"
|
|
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self._init_db()
|
|
|
|
def _connect(self) -> sqlite3.Connection:
|
|
conn = sqlite3.connect(self._db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
def _init_db(self) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS selected_episodes (
|
|
selection_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT NOT NULL,
|
|
position INTEGER NOT NULL,
|
|
payload_json TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_selected_episodes_session_position
|
|
ON selected_episodes(session_id, position)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_selected_episodes_session
|
|
ON selected_episodes(session_id)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS selected_files (
|
|
selection_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT NOT NULL,
|
|
position INTEGER NOT NULL,
|
|
payload_json TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_selected_files_session_position
|
|
ON selected_files(session_id, position)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_selected_files_session
|
|
ON selected_files(session_id)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS rename_runs (
|
|
run_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT NOT NULL,
|
|
created_at TEXT NOT NULL,
|
|
confirm INTEGER NOT NULL,
|
|
executed INTEGER NOT NULL,
|
|
preflight_ok INTEGER NOT NULL,
|
|
episodes_count INTEGER NOT NULL,
|
|
files_count INTEGER NOT NULL,
|
|
duration_ms INTEGER NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_rename_runs_session
|
|
ON rename_runs(session_id)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS rename_run_items (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
run_id INTEGER NOT NULL,
|
|
item_index INTEGER NOT NULL,
|
|
episode_selection_id INTEGER,
|
|
file_selection_id INTEGER,
|
|
source_path TEXT NOT NULL,
|
|
destination_path TEXT NOT NULL,
|
|
proposed_filename TEXT NOT NULL,
|
|
status TEXT NOT NULL,
|
|
errors_json TEXT NOT NULL,
|
|
FOREIGN KEY(run_id) REFERENCES rename_runs(run_id)
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_rename_run_items_run
|
|
ON rename_run_items(run_id, item_index)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS app_settings (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT NOT NULL,
|
|
updated_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS remembered_series (
|
|
series_id TEXT PRIMARY KEY,
|
|
payload_json TEXT NOT NULL,
|
|
last_selected_at TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_remembered_series_last_selected
|
|
ON remembered_series(last_selected_at DESC)
|
|
"""
|
|
)
|
|
|
|
def get_settings(self) -> dict:
|
|
with self._connect() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT key, value
|
|
FROM app_settings
|
|
"""
|
|
).fetchall()
|
|
|
|
values = {str(row["key"]): str(row["value"]) for row in rows}
|
|
remember_max_raw = values.get(self.REMEMBER_MAX_SERIES_KEY, str(self.DEFAULT_REMEMBER_MAX_SERIES))
|
|
try:
|
|
remember_max = int(remember_max_raw)
|
|
except ValueError:
|
|
remember_max = self.DEFAULT_REMEMBER_MAX_SERIES
|
|
remember_max = max(1, min(100, remember_max))
|
|
return {
|
|
self.FILE_DATE_SETTING_KEY: values.get(self.FILE_DATE_SETTING_KEY, "0") == "1",
|
|
self.DEFAULT_ROOT_SETTING_KEY: values.get(self.DEFAULT_ROOT_SETTING_KEY) or None,
|
|
self.REMEMBER_MAX_SERIES_KEY: remember_max,
|
|
}
|
|
|
|
def update_settings(self, settings: dict) -> dict:
|
|
updated_at = datetime.now(timezone.utc).isoformat()
|
|
current = self.get_settings()
|
|
|
|
allowed_keys = {
|
|
self.FILE_DATE_SETTING_KEY,
|
|
self.DEFAULT_ROOT_SETTING_KEY,
|
|
self.REMEMBER_MAX_SERIES_KEY,
|
|
}
|
|
unknown_keys = [key for key in settings.keys() if key not in allowed_keys]
|
|
if unknown_keys:
|
|
raise ValueError(f"unknown setting key: {unknown_keys[0]}")
|
|
|
|
merged = dict(current)
|
|
merged.update(settings)
|
|
|
|
file_date_value = merged.get(self.FILE_DATE_SETTING_KEY)
|
|
if not isinstance(file_date_value, bool):
|
|
raise ValueError(f"{self.FILE_DATE_SETTING_KEY} must be boolean")
|
|
|
|
default_root_path = merged.get(self.DEFAULT_ROOT_SETTING_KEY)
|
|
if default_root_path is not None and not isinstance(default_root_path, str):
|
|
raise ValueError(f"{self.DEFAULT_ROOT_SETTING_KEY} must be string or null")
|
|
default_root_path = (default_root_path or "").strip() or None
|
|
|
|
remember_max_value = merged.get(self.REMEMBER_MAX_SERIES_KEY, self.DEFAULT_REMEMBER_MAX_SERIES)
|
|
if not isinstance(remember_max_value, int):
|
|
raise ValueError(f"{self.REMEMBER_MAX_SERIES_KEY} must be integer")
|
|
if remember_max_value < 1 or remember_max_value > 100:
|
|
raise ValueError(f"{self.REMEMBER_MAX_SERIES_KEY} must be between 1 and 100")
|
|
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO app_settings (key, value, updated_at)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET
|
|
value = excluded.value,
|
|
updated_at = excluded.updated_at
|
|
""",
|
|
(self.FILE_DATE_SETTING_KEY, "1" if file_date_value else "0", updated_at),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO app_settings (key, value, updated_at)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET
|
|
value = excluded.value,
|
|
updated_at = excluded.updated_at
|
|
""",
|
|
(self.DEFAULT_ROOT_SETTING_KEY, default_root_path or "", updated_at),
|
|
)
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO app_settings (key, value, updated_at)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET
|
|
value = excluded.value,
|
|
updated_at = excluded.updated_at
|
|
""",
|
|
(self.REMEMBER_MAX_SERIES_KEY, str(remember_max_value), updated_at),
|
|
)
|
|
|
|
self._enforce_remembered_series_limit(remember_max_value)
|
|
|
|
return self.get_settings()
|
|
|
|
def list_remembered_series(self) -> list[dict]:
|
|
with self._connect() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT series_id, payload_json, last_selected_at
|
|
FROM remembered_series
|
|
ORDER BY last_selected_at DESC
|
|
"""
|
|
).fetchall()
|
|
|
|
items = []
|
|
for row in rows:
|
|
payload = json.loads(row["payload_json"])
|
|
items.append(
|
|
{
|
|
"series_id": row["series_id"],
|
|
"last_selected_at": row["last_selected_at"],
|
|
"series": payload,
|
|
}
|
|
)
|
|
return items
|
|
|
|
def remember_series(self, item: dict) -> list[dict]:
|
|
series_id = str(item.get("id") or "").strip()
|
|
if not series_id:
|
|
raise ValueError("series id is required")
|
|
|
|
# Preserve only expected display fields plus raw payload.
|
|
payload = {
|
|
"id": item.get("id"),
|
|
"name": item.get("name"),
|
|
"year": item.get("year"),
|
|
"display_name": item.get("display_name"),
|
|
"raw": item.get("raw", {}),
|
|
}
|
|
now = datetime.now(timezone.utc).isoformat()
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO remembered_series (series_id, payload_json, last_selected_at)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(series_id) DO UPDATE SET
|
|
payload_json = excluded.payload_json,
|
|
last_selected_at = excluded.last_selected_at
|
|
""",
|
|
(series_id, json.dumps(payload, ensure_ascii=True), now),
|
|
)
|
|
|
|
remember_max = int(self.get_settings().get(self.REMEMBER_MAX_SERIES_KEY, self.DEFAULT_REMEMBER_MAX_SERIES))
|
|
self._enforce_remembered_series_limit(remember_max)
|
|
return self.list_remembered_series()
|
|
|
|
def clear_remembered_series(self) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute("DELETE FROM remembered_series")
|
|
|
|
def remove_remembered_series(self, series_id: str) -> list[dict]:
|
|
normalized = str(series_id or "").strip()
|
|
if not normalized:
|
|
raise ValueError("series_id is required")
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"DELETE FROM remembered_series WHERE series_id = ?",
|
|
(normalized,),
|
|
)
|
|
return self.list_remembered_series()
|
|
|
|
def _enforce_remembered_series_limit(self, limit: int) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
DELETE FROM remembered_series
|
|
WHERE series_id NOT IN (
|
|
SELECT series_id
|
|
FROM remembered_series
|
|
ORDER BY last_selected_at DESC
|
|
LIMIT ?
|
|
)
|
|
""",
|
|
(int(limit),),
|
|
)
|
|
|
|
def list_selected_episodes(self, session_id: str) -> list[dict]:
|
|
with self._connect() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT selection_id, position, payload_json
|
|
FROM selected_episodes
|
|
WHERE session_id = ?
|
|
ORDER BY position ASC
|
|
""",
|
|
(session_id,),
|
|
).fetchall()
|
|
|
|
items = []
|
|
for row in rows:
|
|
payload = json.loads(row["payload_json"])
|
|
items.append(
|
|
{
|
|
"selection_id": row["selection_id"],
|
|
"position": row["position"],
|
|
"episode": payload,
|
|
}
|
|
)
|
|
return items
|
|
|
|
def add_selected_episodes(self, session_id: str, items: list[dict]) -> list[dict]:
|
|
if not items:
|
|
return self.list_selected_episodes(session_id)
|
|
|
|
with self._connect() as conn:
|
|
current_max = conn.execute(
|
|
"""
|
|
SELECT COALESCE(MAX(position), -1) AS max_position
|
|
FROM selected_episodes
|
|
WHERE session_id = ?
|
|
""",
|
|
(session_id,),
|
|
).fetchone()
|
|
next_position = int(current_max["max_position"]) + 1
|
|
|
|
for item in items:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO selected_episodes (session_id, position, payload_json)
|
|
VALUES (?, ?, ?)
|
|
""",
|
|
(session_id, next_position, json.dumps(item, ensure_ascii=True)),
|
|
)
|
|
next_position += 1
|
|
|
|
return self.list_selected_episodes(session_id)
|
|
|
|
def clear_selected_episodes(self, session_id: str) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"DELETE FROM selected_episodes WHERE session_id = ?",
|
|
(session_id,),
|
|
)
|
|
|
|
def remove_selected_episode(self, session_id: str, selection_id: int) -> list[dict]:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
DELETE FROM selected_episodes
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(session_id, selection_id),
|
|
)
|
|
return self._compact_positions(session_id)
|
|
|
|
def reorder_selected_episodes(
|
|
self,
|
|
session_id: str,
|
|
from_index: int,
|
|
to_index: int,
|
|
) -> list[dict]:
|
|
current_items = self.list_selected_episodes(session_id)
|
|
|
|
if from_index < 0 or from_index >= len(current_items):
|
|
raise ValueError("from_index out of range")
|
|
if to_index < 0 or to_index >= len(current_items):
|
|
raise ValueError("to_index out of range")
|
|
if from_index == to_index:
|
|
return current_items
|
|
|
|
moved = current_items.pop(from_index)
|
|
current_items.insert(to_index, moved)
|
|
|
|
with self._connect() as conn:
|
|
# Two-phase update avoids transient UNIQUE conflicts on (session_id, position).
|
|
for position, item in enumerate(current_items):
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_episodes
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(-(position + 1), session_id, item["selection_id"]),
|
|
)
|
|
|
|
for position, item in enumerate(current_items):
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_episodes
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(position, session_id, item["selection_id"]),
|
|
)
|
|
|
|
return self.list_selected_episodes(session_id)
|
|
|
|
def _compact_positions(self, session_id: str) -> list[dict]:
|
|
items = self.list_selected_episodes(session_id)
|
|
with self._connect() as conn:
|
|
for position, item in enumerate(items):
|
|
if item["position"] == position:
|
|
continue
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_episodes
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(position, session_id, item["selection_id"]),
|
|
)
|
|
return self.list_selected_episodes(session_id)
|
|
|
|
def list_selected_files(self, session_id: str) -> list[dict]:
|
|
with self._connect() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT selection_id, position, payload_json
|
|
FROM selected_files
|
|
WHERE session_id = ?
|
|
ORDER BY position ASC
|
|
""",
|
|
(session_id,),
|
|
).fetchall()
|
|
|
|
items = []
|
|
for row in rows:
|
|
payload = json.loads(row["payload_json"])
|
|
items.append(
|
|
{
|
|
"selection_id": row["selection_id"],
|
|
"position": row["position"],
|
|
"file": payload,
|
|
}
|
|
)
|
|
return items
|
|
|
|
def add_selected_files(self, session_id: str, items: list[dict]) -> list[dict]:
|
|
if not items:
|
|
return self.list_selected_files(session_id)
|
|
|
|
with self._connect() as conn:
|
|
current_max = conn.execute(
|
|
"""
|
|
SELECT COALESCE(MAX(position), -1) AS max_position
|
|
FROM selected_files
|
|
WHERE session_id = ?
|
|
""",
|
|
(session_id,),
|
|
).fetchone()
|
|
next_position = int(current_max["max_position"]) + 1
|
|
|
|
for item in items:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO selected_files (session_id, position, payload_json)
|
|
VALUES (?, ?, ?)
|
|
""",
|
|
(session_id, next_position, json.dumps(item, ensure_ascii=True)),
|
|
)
|
|
next_position += 1
|
|
|
|
return self.list_selected_files(session_id)
|
|
|
|
def clear_selected_files(self, session_id: str) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"DELETE FROM selected_files WHERE session_id = ?",
|
|
(session_id,),
|
|
)
|
|
|
|
def remove_selected_file(self, session_id: str, selection_id: int) -> list[dict]:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
DELETE FROM selected_files
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(session_id, selection_id),
|
|
)
|
|
return self._compact_file_positions(session_id)
|
|
|
|
def reorder_selected_files(
|
|
self,
|
|
session_id: str,
|
|
from_index: int,
|
|
to_index: int,
|
|
) -> list[dict]:
|
|
current_items = self.list_selected_files(session_id)
|
|
|
|
if from_index < 0 or from_index >= len(current_items):
|
|
raise ValueError("from_index out of range")
|
|
if to_index < 0 or to_index >= len(current_items):
|
|
raise ValueError("to_index out of range")
|
|
if from_index == to_index:
|
|
return current_items
|
|
|
|
moved = current_items.pop(from_index)
|
|
current_items.insert(to_index, moved)
|
|
|
|
with self._connect() as conn:
|
|
# Two-phase update avoids transient UNIQUE conflicts on (session_id, position).
|
|
for position, item in enumerate(current_items):
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_files
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(-(position + 1), session_id, item["selection_id"]),
|
|
)
|
|
|
|
for position, item in enumerate(current_items):
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_files
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(position, session_id, item["selection_id"]),
|
|
)
|
|
|
|
return self.list_selected_files(session_id)
|
|
|
|
def _compact_file_positions(self, session_id: str) -> list[dict]:
|
|
items = self.list_selected_files(session_id)
|
|
with self._connect() as conn:
|
|
for position, item in enumerate(items):
|
|
if item["position"] == position:
|
|
continue
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_files
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(position, session_id, item["selection_id"]),
|
|
)
|
|
return self.list_selected_files(session_id)
|
|
|
|
def build_mapping_preview(self, session_id: str) -> dict:
|
|
episodes = self.list_selected_episodes(session_id)
|
|
files = self.list_selected_files(session_id)
|
|
|
|
if len(episodes) != len(files):
|
|
raise ValueError(
|
|
"Selected episodes and selected files count mismatch: "
|
|
f"{len(episodes)} episodes vs {len(files)} files"
|
|
)
|
|
|
|
mappings = []
|
|
for index, (episode_item, file_item) in enumerate(zip(episodes, files)):
|
|
mappings.append(
|
|
{
|
|
"index": index,
|
|
"episode_selection_id": episode_item["selection_id"],
|
|
"file_selection_id": file_item["selection_id"],
|
|
"episode": episode_item["episode"],
|
|
"file": file_item["file"],
|
|
}
|
|
)
|
|
|
|
return {
|
|
"session_id": session_id,
|
|
"counts": {
|
|
"episodes": len(episodes),
|
|
"files": len(files),
|
|
},
|
|
"mappings": mappings,
|
|
}
|
|
|
|
    def build_filename_preview(self, session_id: str) -> dict:
        """Render the proposed filename for every episode/file pair.

        Builds on build_mapping_preview (which raises ValueError on a
        count mismatch) and applies the template
        "{series} ({year}) - S{season:02}E{episode:02} - {title}{ext}".
        """
        mapping_preview = self.build_mapping_preview(session_id)
        previews = []

        for item in mapping_preview["mappings"]:
            episode = item["episode"]
            file_payload = item["file"]

            # Fall through the known alternate keys for the series name.
            # NOTE: `or` also skips falsy values (empty string), not just
            # missing keys — that appears intentional here.
            series = (
                episode.get("series")
                or episode.get("series_name")
                or episode.get("show")
                or "Unknown Series"
            )
            year = episode.get("year") or "0000"
            # Strip a trailing "(year)" from the series so the template
            # doesn't duplicate it, then make both parts filesystem-safe.
            series = self._normalize_series_name(series, year)
            series = self.sanitize_filename_component(series)
            title = episode.get("title") or "Untitled"
            title = self.sanitize_filename_component(title)

            # Alternate key spellings again; 0 acts as the "unknown" marker.
            season_raw = episode.get("season_number") or episode.get("season") or 0
            episode_raw = episode.get("episode_number") or episode.get("number") or 0

            try:
                season_number = int(season_raw)
            except (TypeError, ValueError):
                season_number = 0
            try:
                episode_number = int(episode_raw)
            except (TypeError, ValueError):
                episode_number = 0

            # Keep the original file extension (empty if none can be found).
            source_name = file_payload.get("name") or file_payload.get("path") or ""
            ext = Path(source_name).suffix

            proposed_filename = (
                f"{series} ({year}) - S{season_number:02}E{episode_number:02} - {title}{ext}"
            )
            # Re-trim whitespace/dots and clamp to MAX_FILENAME_LEN.
            proposed_filename = self._finalize_filename(proposed_filename, ext)

            previews.append(
                {
                    "index": item["index"],
                    "episode_selection_id": item["episode_selection_id"],
                    "file_selection_id": item["file_selection_id"],
                    "episode": episode,
                    "file": file_payload,
                    "proposed_filename": proposed_filename,
                }
            )

        return {
            "session_id": mapping_preview["session_id"],
            "counts": mapping_preview["counts"],
            "template": "{series} ({year}) - S{season:02}E{episode:02} - {title}{ext}",
            "items": previews,
        }
|
|
|
def _normalize_series_name(self, series: str, year: int | str) -> str:
|
|
text = str(series or "").strip()
|
|
year_str = str(year or "").strip()
|
|
if not text or not year_str:
|
|
return text
|
|
|
|
# Strip trailing " (YEAR)" to avoid duplicate year in the template output.
|
|
pattern = re.compile(rf"\s*\({re.escape(year_str)}\)\s*$")
|
|
return pattern.sub("", text).strip()
|
|
|
|
def sanitize_filename_component(self, value: str) -> str:
|
|
text = str(value or "")
|
|
# Replace Windows/SMB disallowed characters with spaces.
|
|
text = re.sub(r'[\\/:*?"<>|]', " ", text)
|
|
# Normalize any repeated whitespace and trim trailing/leading dot/space.
|
|
text = re.sub(r"\s+", " ", text).strip(" .")
|
|
return text or "Untitled"
|
|
|
|
def _finalize_filename(self, filename: str, ext: str) -> str:
|
|
extension = str(ext or "")
|
|
stem = filename[: -len(extension)] if extension and filename.endswith(extension) else filename
|
|
stem = re.sub(r"\s+", " ", stem).strip(" .")
|
|
if not stem:
|
|
stem = "Untitled"
|
|
|
|
max_stem_len = max(1, self.MAX_FILENAME_LEN - len(extension))
|
|
if len(stem) > max_stem_len:
|
|
stem = stem[:max_stem_len].rstrip(" .")
|
|
if not stem:
|
|
stem = "Untitled"
|
|
|
|
return f"{stem}{extension}"
|
|
|
|
    def execute_rename(self, session_id: str, confirm: bool) -> dict:
        """Preflight and, if every pair passes, execute all pending renames.

        All-or-nothing preflight: if any pair fails validation, nothing is
        renamed and the failing items are reported. Every run (aborted or
        executed) is persisted via _log_rename_run.

        Raises ValueError unless confirm is True.
        """
        if not confirm:
            raise ValueError("confirm=true is required to execute rename")

        started_at = time.perf_counter()
        preview = self.build_filename_preview(session_id)
        settings = self.get_settings()
        set_file_date_to_first_aired = bool(settings.get(self.FILE_DATE_SETTING_KEY, False))
        allowed_roots = self._allowed_media_roots()
        preflight_items = []
        preflight_errors = 0
        # index -> "aired" value, consulted after each successful rename.
        aired_by_index = {}

        for preview_item in preview["items"]:
            aired_by_index[int(preview_item["index"])] = preview_item["episode"].get("aired")

        for item in preview["items"]:
            source_path_str = str(item["file"].get("path") or "").strip()
            proposed_filename = item["proposed_filename"]
            source_path = Path(source_path_str)

            # Rename in place: the destination keeps the source's directory.
            destination_path = source_path.with_name(proposed_filename) if source_path_str else Path("")
            errors = self._preflight_errors(
                source_path=source_path,
                destination_path=destination_path,
                proposed_filename=proposed_filename,
                allowed_roots=allowed_roots,
            )

            status = "ready"
            if errors:
                status = "preflight_error"
                preflight_errors += 1

            preflight_items.append(
                {
                    "index": item["index"],
                    "episode_selection_id": item["episode_selection_id"],
                    "file_selection_id": item["file_selection_id"],
                    "source_path": source_path_str,
                    "destination_path": str(destination_path) if source_path_str else "",
                    "proposed_filename": proposed_filename,
                    "status": status,
                    "errors": errors,
                    # Placeholders; overwritten per item if renames execute.
                    "file_date_status": "file_date_skipped",
                    "file_date_detail": "rename not executed due to preflight failure",
                }
            )

        if preflight_errors > 0:
            # Abort without touching the filesystem; log and report.
            result = {
                "session_id": session_id,
                "confirm": confirm,
                "executed": False,
                "preflight_ok": False,
                "counts": preview["counts"],
                "items": preflight_items,
            }
            self._log_rename_run(
                session_id=session_id,
                result=result,
                duration_ms=int((time.perf_counter() - started_at) * 1000),
            )
            return result

        results = []
        for item in preflight_items:
            source_path = Path(item["source_path"])
            destination_path = Path(item["destination_path"])
            # NOTE(review): os.replace may raise mid-loop, leaving earlier
            # renames applied but this run unlogged — confirm acceptable.
            os.replace(str(source_path), str(destination_path))

            # Best-effort mtime/atime stamping; never fails the rename.
            file_date_status, file_date_detail = self._apply_file_date_after_rename(
                enabled=set_file_date_to_first_aired,
                aired_value=aired_by_index.get(int(item["index"])),
                destination_path=destination_path,
            )
            results.append(
                {
                    **item,
                    "status": "renamed",
                    "errors": [],
                    "file_date_status": file_date_status,
                    "file_date_detail": file_date_detail,
                }
            )

        result = {
            "session_id": session_id,
            "confirm": confirm,
            "executed": True,
            "preflight_ok": True,
            "counts": preview["counts"],
            "items": results,
        }
        self._log_rename_run(
            session_id=session_id,
            result=result,
            duration_ms=int((time.perf_counter() - started_at) * 1000),
        )
        return result
|
|
|
def _apply_file_date_after_rename(
|
|
self,
|
|
enabled: bool,
|
|
aired_value,
|
|
destination_path: Path,
|
|
) -> tuple[str, str]:
|
|
if not enabled:
|
|
return ("file_date_skipped", "setting disabled")
|
|
|
|
ts = self._aired_to_local_noon_timestamp(aired_value)
|
|
if ts is None:
|
|
return ("file_date_skipped", "aired date missing or invalid")
|
|
|
|
try:
|
|
os.utime(destination_path, (ts, ts))
|
|
return ("file_date_updated", "mtime+atime set to aired date at 12:00 local time")
|
|
except Exception as exc:
|
|
return ("file_date_error", str(exc))
|
|
|
|
def _aired_to_local_noon_timestamp(self, aired_value) -> float | None:
|
|
if aired_value is None:
|
|
return None
|
|
|
|
text = str(aired_value).strip()
|
|
if not text:
|
|
return None
|
|
date_text = text[:10]
|
|
|
|
try:
|
|
date_part = datetime.strptime(date_text, "%Y-%m-%d")
|
|
except ValueError:
|
|
return None
|
|
|
|
# Convert with local-time semantics (host/container local timezone),
|
|
# avoiding implicit UTC conversion paths.
|
|
local_struct = (
|
|
date_part.year,
|
|
date_part.month,
|
|
date_part.day,
|
|
12,
|
|
0,
|
|
0,
|
|
-1,
|
|
-1,
|
|
-1,
|
|
)
|
|
return time.mktime(local_struct)
|
|
|
|
    def _log_rename_run(self, session_id: str, result: dict, duration_ms: int) -> None:
        """Persist one rename run plus its per-item outcomes for auditing."""
        created_at = datetime.now(timezone.utc).isoformat()
        counts = result.get("counts", {})

        with self._connect() as conn:
            cursor = conn.execute(
                """
                INSERT INTO rename_runs (
                    session_id,
                    created_at,
                    confirm,
                    executed,
                    preflight_ok,
                    episodes_count,
                    files_count,
                    duration_ms
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    session_id,
                    created_at,
                    1 if result.get("confirm") else 0,
                    1 if result.get("executed") else 0,
                    1 if result.get("preflight_ok") else 0,
                    int(counts.get("episodes") or 0),
                    int(counts.get("files") or 0),
                    int(duration_ms),
                ),
            )
            # The run insert's rowid keys the per-item rows below.
            run_id = int(cursor.lastrowid)

            for item in result.get("items", []):
                conn.execute(
                    """
                    INSERT INTO rename_run_items (
                        run_id,
                        item_index,
                        episode_selection_id,
                        file_selection_id,
                        source_path,
                        destination_path,
                        proposed_filename,
                        status,
                        errors_json
                    )
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        run_id,
                        int(item.get("index") or 0),
                        item.get("episode_selection_id"),
                        item.get("file_selection_id"),
                        str(item.get("source_path") or ""),
                        str(item.get("destination_path") or ""),
                        str(item.get("proposed_filename") or ""),
                        str(item.get("status") or ""),
                        json.dumps(item.get("errors") or [], ensure_ascii=True),
                    ),
                )
|
|
|
    def list_rename_runs(self, session_id: str, limit: int = 20) -> list[dict]:
        """Return up to *limit* most recent rename runs for the session.

        Each run summary carries an items_count computed by a second query
        over the same window of runs.
        """
        with self._connect() as conn:
            rows = conn.execute(
                """
                SELECT
                    run_id,
                    session_id,
                    created_at,
                    confirm,
                    executed,
                    preflight_ok,
                    episodes_count,
                    files_count,
                    duration_ms
                FROM rename_runs
                WHERE session_id = ?
                ORDER BY run_id DESC
                LIMIT ?
                """,
                (session_id, limit),
            ).fetchall()

            # Per-run item counts restricted to the same session/limit window.
            item_counts = conn.execute(
                """
                SELECT run_id, COUNT(*) AS item_count
                FROM rename_run_items
                WHERE run_id IN (
                    SELECT run_id
                    FROM rename_runs
                    WHERE session_id = ?
                    ORDER BY run_id DESC
                    LIMIT ?
                )
                GROUP BY run_id
                """,
                (session_id, limit),
            ).fetchall()

        counts_by_run = {int(row["run_id"]): int(row["item_count"]) for row in item_counts}
        runs = []
        for row in rows:
            run_id = int(row["run_id"])
            runs.append(
                {
                    "run_id": run_id,
                    "session_id": row["session_id"],
                    "created_at": row["created_at"],
                    "confirm": bool(row["confirm"]),
                    "executed": bool(row["executed"]),
                    "preflight_ok": bool(row["preflight_ok"]),
                    "counts": {
                        "episodes": int(row["episodes_count"]),
                        "files": int(row["files_count"]),
                    },
                    "duration_ms": int(row["duration_ms"]),
                    # Runs without logged items default to 0.
                    "items_count": counts_by_run.get(run_id, 0),
                }
            )
        return runs
|
|
|
    def get_rename_run(self, run_id: int) -> dict | None:
        """Return one rename run with all of its items, or None if unknown."""
        with self._connect() as conn:
            run = conn.execute(
                """
                SELECT
                    run_id,
                    session_id,
                    created_at,
                    confirm,
                    executed,
                    preflight_ok,
                    episodes_count,
                    files_count,
                    duration_ms
                FROM rename_runs
                WHERE run_id = ?
                """,
                (run_id,),
            ).fetchone()

            if run is None:
                return None

            item_rows = conn.execute(
                """
                SELECT
                    item_index,
                    episode_selection_id,
                    file_selection_id,
                    source_path,
                    destination_path,
                    proposed_filename,
                    status,
                    errors_json
                FROM rename_run_items
                WHERE run_id = ?
                ORDER BY item_index ASC
                """,
                (run_id,),
            ).fetchall()

        items = []
        for row in item_rows:
            items.append(
                {
                    "index": int(row["item_index"]),
                    "episode_selection_id": row["episode_selection_id"],
                    "file_selection_id": row["file_selection_id"],
                    "source_path": row["source_path"],
                    "destination_path": row["destination_path"],
                    "proposed_filename": row["proposed_filename"],
                    "status": row["status"],
                    # Stored as a JSON array of strings by _log_rename_run.
                    "errors": json.loads(row["errors_json"]),
                }
            )

        return {
            "run_id": int(run["run_id"]),
            "session_id": run["session_id"],
            "created_at": run["created_at"],
            "confirm": bool(run["confirm"]),
            "executed": bool(run["executed"]),
            "preflight_ok": bool(run["preflight_ok"]),
            "counts": {
                "episodes": int(run["episodes_count"]),
                "files": int(run["files_count"]),
            },
            "duration_ms": int(run["duration_ms"]),
            "items": items,
        }
|
|
|
def _allowed_media_roots(self) -> list[Path]:
|
|
raw = os.getenv("ALLOWED_MEDIA_ROOTS", "").strip()
|
|
if raw:
|
|
candidates = [p.strip() for p in raw.split(",") if p.strip()]
|
|
else:
|
|
media_root = os.getenv("MEDIA_ROOT", "").strip()
|
|
if media_root:
|
|
candidates = [media_root]
|
|
else:
|
|
candidates = [
|
|
"/Volumes/8TB/Shared_Folders/TV_Shows",
|
|
"/Volumes/8TB_RAID1/Shared_Folders/Library/TV_Shows",
|
|
]
|
|
|
|
roots = []
|
|
for candidate in candidates:
|
|
try:
|
|
roots.append(Path(candidate).resolve())
|
|
except Exception:
|
|
continue
|
|
return roots
|
|
|
|
def _is_within_allowed_roots(self, path: Path, allowed_roots: list[Path]) -> bool:
|
|
try:
|
|
resolved = path.resolve()
|
|
except Exception:
|
|
return False
|
|
|
|
for root in allowed_roots:
|
|
try:
|
|
resolved.relative_to(root)
|
|
return True
|
|
except ValueError:
|
|
continue
|
|
return False
|
|
|
|
def _preflight_errors(
|
|
self,
|
|
source_path: Path,
|
|
destination_path: Path,
|
|
proposed_filename: str,
|
|
allowed_roots: list[Path],
|
|
) -> list[str]:
|
|
errors = []
|
|
|
|
if not str(source_path):
|
|
errors.append("source path missing")
|
|
return errors
|
|
|
|
if ".." in source_path.parts:
|
|
errors.append("source path traversal is not allowed")
|
|
if ".." in Path(proposed_filename).parts:
|
|
errors.append("destination filename traversal is not allowed")
|
|
if Path(proposed_filename).name != proposed_filename:
|
|
errors.append("destination filename must not contain path separators")
|
|
|
|
if not self._is_within_allowed_roots(source_path, allowed_roots):
|
|
errors.append("source path is outside allowed media roots")
|
|
if not self._is_within_allowed_roots(destination_path, allowed_roots):
|
|
errors.append("destination path is outside allowed media roots")
|
|
|
|
if not source_path.exists():
|
|
errors.append("source file does not exist")
|
|
if source_path.exists() and not source_path.is_file():
|
|
errors.append("source path is not a file")
|
|
|
|
if source_path == destination_path:
|
|
errors.append("source and destination paths are equal")
|
|
|
|
if destination_path.exists():
|
|
errors.append("destination file already exists")
|
|
|
|
if not destination_path.parent.exists():
|
|
errors.append("destination parent directory does not exist")
|
|
|
|
return errors
|