319 lines
11 KiB
Python
319 lines
11 KiB
Python
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
|
|
from app.config import APP_DATA_DIR
|
|
|
|
|
|
class SessionService:
|
|
def __init__(self) -> None:
|
|
self._db_path = Path(APP_DATA_DIR) / "session_state.sqlite3"
|
|
self._db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self._init_db()
|
|
|
|
def _connect(self) -> sqlite3.Connection:
|
|
conn = sqlite3.connect(self._db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
def _init_db(self) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS selected_episodes (
|
|
selection_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT NOT NULL,
|
|
position INTEGER NOT NULL,
|
|
payload_json TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_selected_episodes_session_position
|
|
ON selected_episodes(session_id, position)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_selected_episodes_session
|
|
ON selected_episodes(session_id)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS selected_files (
|
|
selection_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
session_id TEXT NOT NULL,
|
|
position INTEGER NOT NULL,
|
|
payload_json TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_selected_files_session_position
|
|
ON selected_files(session_id, position)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE INDEX IF NOT EXISTS idx_selected_files_session
|
|
ON selected_files(session_id)
|
|
"""
|
|
)
|
|
|
|
def list_selected_episodes(self, session_id: str) -> list[dict]:
|
|
with self._connect() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT selection_id, position, payload_json
|
|
FROM selected_episodes
|
|
WHERE session_id = ?
|
|
ORDER BY position ASC
|
|
""",
|
|
(session_id,),
|
|
).fetchall()
|
|
|
|
items = []
|
|
for row in rows:
|
|
payload = json.loads(row["payload_json"])
|
|
items.append(
|
|
{
|
|
"selection_id": row["selection_id"],
|
|
"position": row["position"],
|
|
"episode": payload,
|
|
}
|
|
)
|
|
return items
|
|
|
|
def add_selected_episodes(self, session_id: str, items: list[dict]) -> list[dict]:
|
|
if not items:
|
|
return self.list_selected_episodes(session_id)
|
|
|
|
with self._connect() as conn:
|
|
current_max = conn.execute(
|
|
"""
|
|
SELECT COALESCE(MAX(position), -1) AS max_position
|
|
FROM selected_episodes
|
|
WHERE session_id = ?
|
|
""",
|
|
(session_id,),
|
|
).fetchone()
|
|
next_position = int(current_max["max_position"]) + 1
|
|
|
|
for item in items:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO selected_episodes (session_id, position, payload_json)
|
|
VALUES (?, ?, ?)
|
|
""",
|
|
(session_id, next_position, json.dumps(item, ensure_ascii=True)),
|
|
)
|
|
next_position += 1
|
|
|
|
return self.list_selected_episodes(session_id)
|
|
|
|
def clear_selected_episodes(self, session_id: str) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"DELETE FROM selected_episodes WHERE session_id = ?",
|
|
(session_id,),
|
|
)
|
|
|
|
def remove_selected_episode(self, session_id: str, selection_id: int) -> list[dict]:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
DELETE FROM selected_episodes
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(session_id, selection_id),
|
|
)
|
|
return self._compact_positions(session_id)
|
|
|
|
def reorder_selected_episodes(
|
|
self,
|
|
session_id: str,
|
|
from_index: int,
|
|
to_index: int,
|
|
) -> list[dict]:
|
|
current_items = self.list_selected_episodes(session_id)
|
|
|
|
if from_index < 0 or from_index >= len(current_items):
|
|
raise ValueError("from_index out of range")
|
|
if to_index < 0 or to_index >= len(current_items):
|
|
raise ValueError("to_index out of range")
|
|
if from_index == to_index:
|
|
return current_items
|
|
|
|
moved = current_items.pop(from_index)
|
|
current_items.insert(to_index, moved)
|
|
|
|
with self._connect() as conn:
|
|
# Two-phase update avoids transient UNIQUE conflicts on (session_id, position).
|
|
for position, item in enumerate(current_items):
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_episodes
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(-(position + 1), session_id, item["selection_id"]),
|
|
)
|
|
|
|
for position, item in enumerate(current_items):
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_episodes
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(position, session_id, item["selection_id"]),
|
|
)
|
|
|
|
return self.list_selected_episodes(session_id)
|
|
|
|
def _compact_positions(self, session_id: str) -> list[dict]:
|
|
items = self.list_selected_episodes(session_id)
|
|
with self._connect() as conn:
|
|
for position, item in enumerate(items):
|
|
if item["position"] == position:
|
|
continue
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_episodes
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(position, session_id, item["selection_id"]),
|
|
)
|
|
return self.list_selected_episodes(session_id)
|
|
|
|
def list_selected_files(self, session_id: str) -> list[dict]:
|
|
with self._connect() as conn:
|
|
rows = conn.execute(
|
|
"""
|
|
SELECT selection_id, position, payload_json
|
|
FROM selected_files
|
|
WHERE session_id = ?
|
|
ORDER BY position ASC
|
|
""",
|
|
(session_id,),
|
|
).fetchall()
|
|
|
|
items = []
|
|
for row in rows:
|
|
payload = json.loads(row["payload_json"])
|
|
items.append(
|
|
{
|
|
"selection_id": row["selection_id"],
|
|
"position": row["position"],
|
|
"file": payload,
|
|
}
|
|
)
|
|
return items
|
|
|
|
def add_selected_files(self, session_id: str, items: list[dict]) -> list[dict]:
|
|
if not items:
|
|
return self.list_selected_files(session_id)
|
|
|
|
with self._connect() as conn:
|
|
current_max = conn.execute(
|
|
"""
|
|
SELECT COALESCE(MAX(position), -1) AS max_position
|
|
FROM selected_files
|
|
WHERE session_id = ?
|
|
""",
|
|
(session_id,),
|
|
).fetchone()
|
|
next_position = int(current_max["max_position"]) + 1
|
|
|
|
for item in items:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO selected_files (session_id, position, payload_json)
|
|
VALUES (?, ?, ?)
|
|
""",
|
|
(session_id, next_position, json.dumps(item, ensure_ascii=True)),
|
|
)
|
|
next_position += 1
|
|
|
|
return self.list_selected_files(session_id)
|
|
|
|
def clear_selected_files(self, session_id: str) -> None:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"DELETE FROM selected_files WHERE session_id = ?",
|
|
(session_id,),
|
|
)
|
|
|
|
def remove_selected_file(self, session_id: str, selection_id: int) -> list[dict]:
|
|
with self._connect() as conn:
|
|
conn.execute(
|
|
"""
|
|
DELETE FROM selected_files
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(session_id, selection_id),
|
|
)
|
|
return self._compact_file_positions(session_id)
|
|
|
|
def reorder_selected_files(
|
|
self,
|
|
session_id: str,
|
|
from_index: int,
|
|
to_index: int,
|
|
) -> list[dict]:
|
|
current_items = self.list_selected_files(session_id)
|
|
|
|
if from_index < 0 or from_index >= len(current_items):
|
|
raise ValueError("from_index out of range")
|
|
if to_index < 0 or to_index >= len(current_items):
|
|
raise ValueError("to_index out of range")
|
|
if from_index == to_index:
|
|
return current_items
|
|
|
|
moved = current_items.pop(from_index)
|
|
current_items.insert(to_index, moved)
|
|
|
|
with self._connect() as conn:
|
|
# Two-phase update avoids transient UNIQUE conflicts on (session_id, position).
|
|
for position, item in enumerate(current_items):
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_files
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(-(position + 1), session_id, item["selection_id"]),
|
|
)
|
|
|
|
for position, item in enumerate(current_items):
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_files
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(position, session_id, item["selection_id"]),
|
|
)
|
|
|
|
return self.list_selected_files(session_id)
|
|
|
|
def _compact_file_positions(self, session_id: str) -> list[dict]:
|
|
items = self.list_selected_files(session_id)
|
|
with self._connect() as conn:
|
|
for position, item in enumerate(items):
|
|
if item["position"] == position:
|
|
continue
|
|
conn.execute(
|
|
"""
|
|
UPDATE selected_files
|
|
SET position = ?
|
|
WHERE session_id = ? AND selection_id = ?
|
|
""",
|
|
(position, session_id, item["selection_id"]),
|
|
)
|
|
return self.list_selected_files(session_id)
|