import json import sqlite3 from pathlib import Path from app.config import APP_DATA_DIR class SessionService: def __init__(self) -> None: self._db_path = Path(APP_DATA_DIR) / "session_state.sqlite3" self._db_path.parent.mkdir(parents=True, exist_ok=True) self._init_db() def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect(self._db_path) conn.row_factory = sqlite3.Row return conn def _init_db(self) -> None: with self._connect() as conn: conn.execute( """ CREATE TABLE IF NOT EXISTS selected_episodes ( selection_id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, position INTEGER NOT NULL, payload_json TEXT NOT NULL ) """ ) conn.execute( """ CREATE UNIQUE INDEX IF NOT EXISTS idx_selected_episodes_session_position ON selected_episodes(session_id, position) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_selected_episodes_session ON selected_episodes(session_id) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS selected_files ( selection_id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, position INTEGER NOT NULL, payload_json TEXT NOT NULL ) """ ) conn.execute( """ CREATE UNIQUE INDEX IF NOT EXISTS idx_selected_files_session_position ON selected_files(session_id, position) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_selected_files_session ON selected_files(session_id) """ ) def list_selected_episodes(self, session_id: str) -> list[dict]: with self._connect() as conn: rows = conn.execute( """ SELECT selection_id, position, payload_json FROM selected_episodes WHERE session_id = ? ORDER BY position ASC """, (session_id,), ).fetchall() items = [] for row in rows: payload = json.loads(row["payload_json"]) items.append( { "selection_id": row["selection_id"], "position": row["position"], "episode": payload, } ) return items def add_selected_episodes(self, session_id: str, items: list[dict]) -> list[dict]: if not items: return self.list_selected_episodes(session_id) with self._connect() as conn: current_max = conn.execute( """ SELECT COALESCE(MAX(position), -1) AS max_position FROM selected_episodes WHERE session_id = ? """, (session_id,), ).fetchone() next_position = int(current_max["max_position"]) + 1 for item in items: conn.execute( """ INSERT INTO selected_episodes (session_id, position, payload_json) VALUES (?, ?, ?) """, (session_id, next_position, json.dumps(item, ensure_ascii=True)), ) next_position += 1 return self.list_selected_episodes(session_id) def clear_selected_episodes(self, session_id: str) -> None: with self._connect() as conn: conn.execute( "DELETE FROM selected_episodes WHERE session_id = ?", (session_id,), ) def remove_selected_episode(self, session_id: str, selection_id: int) -> list[dict]: with self._connect() as conn: conn.execute( """ DELETE FROM selected_episodes WHERE session_id = ? AND selection_id = ? """, (session_id, selection_id), ) return self._compact_positions(session_id) def reorder_selected_episodes( self, session_id: str, from_index: int, to_index: int, ) -> list[dict]: current_items = self.list_selected_episodes(session_id) if from_index < 0 or from_index >= len(current_items): raise ValueError("from_index out of range") if to_index < 0 or to_index >= len(current_items): raise ValueError("to_index out of range") if from_index == to_index: return current_items moved = current_items.pop(from_index) current_items.insert(to_index, moved) with self._connect() as conn: # Two-phase update avoids transient UNIQUE conflicts on (session_id, position). for position, item in enumerate(current_items): conn.execute( """ UPDATE selected_episodes SET position = ? WHERE session_id = ? AND selection_id = ? """, (-(position + 1), session_id, item["selection_id"]), ) for position, item in enumerate(current_items): conn.execute( """ UPDATE selected_episodes SET position = ? WHERE session_id = ? AND selection_id = ? """, (position, session_id, item["selection_id"]), ) return self.list_selected_episodes(session_id) def _compact_positions(self, session_id: str) -> list[dict]: items = self.list_selected_episodes(session_id) with self._connect() as conn: for position, item in enumerate(items): if item["position"] == position: continue conn.execute( """ UPDATE selected_episodes SET position = ? WHERE session_id = ? AND selection_id = ? """, (position, session_id, item["selection_id"]), ) return self.list_selected_episodes(session_id) def list_selected_files(self, session_id: str) -> list[dict]: with self._connect() as conn: rows = conn.execute( """ SELECT selection_id, position, payload_json FROM selected_files WHERE session_id = ? ORDER BY position ASC """, (session_id,), ).fetchall() items = [] for row in rows: payload = json.loads(row["payload_json"]) items.append( { "selection_id": row["selection_id"], "position": row["position"], "file": payload, } ) return items def add_selected_files(self, session_id: str, items: list[dict]) -> list[dict]: if not items: return self.list_selected_files(session_id) with self._connect() as conn: current_max = conn.execute( """ SELECT COALESCE(MAX(position), -1) AS max_position FROM selected_files WHERE session_id = ? """, (session_id,), ).fetchone() next_position = int(current_max["max_position"]) + 1 for item in items: conn.execute( """ INSERT INTO selected_files (session_id, position, payload_json) VALUES (?, ?, ?) """, (session_id, next_position, json.dumps(item, ensure_ascii=True)), ) next_position += 1 return self.list_selected_files(session_id) def clear_selected_files(self, session_id: str) -> None: with self._connect() as conn: conn.execute( "DELETE FROM selected_files WHERE session_id = ?", (session_id,), ) def remove_selected_file(self, session_id: str, selection_id: int) -> list[dict]: with self._connect() as conn: conn.execute( """ DELETE FROM selected_files WHERE session_id = ? AND selection_id = ? """, (session_id, selection_id), ) return self._compact_file_positions(session_id) def reorder_selected_files( self, session_id: str, from_index: int, to_index: int, ) -> list[dict]: current_items = self.list_selected_files(session_id) if from_index < 0 or from_index >= len(current_items): raise ValueError("from_index out of range") if to_index < 0 or to_index >= len(current_items): raise ValueError("to_index out of range") if from_index == to_index: return current_items moved = current_items.pop(from_index) current_items.insert(to_index, moved) with self._connect() as conn: # Two-phase update avoids transient UNIQUE conflicts on (session_id, position). for position, item in enumerate(current_items): conn.execute( """ UPDATE selected_files SET position = ? WHERE session_id = ? AND selection_id = ? """, (-(position + 1), session_id, item["selection_id"]), ) for position, item in enumerate(current_items): conn.execute( """ UPDATE selected_files SET position = ? WHERE session_id = ? AND selection_id = ? """, (position, session_id, item["selection_id"]), ) return self.list_selected_files(session_id) def _compact_file_positions(self, session_id: str) -> list[dict]: items = self.list_selected_files(session_id) with self._connect() as conn: for position, item in enumerate(items): if item["position"] == position: continue conn.execute( """ UPDATE selected_files SET position = ? WHERE session_id = ? AND selection_id = ? """, (position, session_id, item["selection_id"]), ) return self.list_selected_files(session_id) def build_mapping_preview(self, session_id: str) -> dict: episodes = self.list_selected_episodes(session_id) files = self.list_selected_files(session_id) if len(episodes) != len(files): raise ValueError( "Selected episodes and selected files count mismatch: " f"{len(episodes)} episodes vs {len(files)} files" ) mappings = [] for index, (episode_item, file_item) in enumerate(zip(episodes, files)): mappings.append( { "index": index, "episode_selection_id": episode_item["selection_id"], "file_selection_id": file_item["selection_id"], "episode": episode_item["episode"], "file": file_item["file"], } ) return { "session_id": session_id, "counts": { "episodes": len(episodes), "files": len(files), }, "mappings": mappings, }