Files
webmanager-mvp/webui/backend/app/db/history_repository.py
T
2026-03-14 17:20:36 +01:00

170 lines
5.5 KiB
Python

from __future__ import annotations
import sqlite3
import uuid
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
VALID_HISTORY_STATUSES = {"queued", "completed", "failed", "requested", "ready", "preflight_failed", "cancelled"}
VALID_HISTORY_OPERATIONS = {"mkdir", "rename", "delete", "copy", "move", "upload", "download", "duplicate"}
class HistoryRepository:
def __init__(self, db_path: str):
self._db_path = db_path
self._ensure_schema()
def create_entry(
self,
*,
operation: str,
status: str,
source: str | None = None,
destination: str | None = None,
path: str | None = None,
error_code: str | None = None,
error_message: str | None = None,
created_at: str | None = None,
finished_at: str | None = None,
entry_id: str | None = None,
) -> dict:
if operation not in VALID_HISTORY_OPERATIONS:
raise ValueError("invalid operation")
if status not in VALID_HISTORY_STATUSES:
raise ValueError("invalid status")
history_id = entry_id or str(uuid.uuid4())
created_value = created_at or self._now_iso()
with self._connection() as conn:
conn.execute(
"""
INSERT INTO history (
id, operation, status, source, destination, path,
error_code, error_message, created_at, finished_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
history_id,
operation,
status,
source,
destination,
path,
error_code,
error_message,
created_value,
finished_at,
),
)
row = conn.execute("SELECT * FROM history WHERE id = ?", (history_id,)).fetchone()
return self._to_dict(row)
def update_entry(
self,
*,
entry_id: str,
status: str,
error_code: str | None = None,
error_message: str | None = None,
finished_at: str | None = None,
) -> None:
if status not in VALID_HISTORY_STATUSES:
raise ValueError("invalid status")
finished_value = finished_at or self._now_iso()
with self._connection() as conn:
conn.execute(
"""
UPDATE history
SET status = ?, error_code = ?, error_message = ?, finished_at = ?
WHERE id = ?
""",
(status, error_code, error_message, finished_value, entry_id),
)
def list_history(self, limit: int = 100) -> list[dict]:
max_limit = max(1, min(limit, 200))
with self._connection() as conn:
rows = conn.execute(
"""
SELECT * FROM history
ORDER BY created_at DESC
LIMIT ?
""",
(max_limit,),
).fetchall()
return [self._to_dict(row) for row in rows]
def insert_entry_for_testing(self, entry: dict) -> None:
with self._connection() as conn:
conn.execute(
"""
INSERT INTO history (
id, operation, status, source, destination, path,
error_code, error_message, created_at, finished_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
entry["id"],
entry["operation"],
entry["status"],
entry.get("source"),
entry.get("destination"),
entry.get("path"),
entry.get("error_code"),
entry.get("error_message"),
entry["created_at"],
entry.get("finished_at"),
),
)
def _ensure_schema(self) -> None:
db_path = Path(self._db_path)
if db_path.parent and str(db_path.parent) not in {"", "."}:
db_path.parent.mkdir(parents=True, exist_ok=True)
with self._connection() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS history (
id TEXT PRIMARY KEY,
operation TEXT NOT NULL,
status TEXT NOT NULL,
source TEXT NULL,
destination TEXT NULL,
path TEXT NULL,
error_code TEXT NULL,
error_message TEXT NULL,
created_at TEXT NOT NULL,
finished_at TEXT NULL
)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_history_created_at_desc
ON history(created_at DESC)
"""
)
@contextmanager
def _connection(self):
conn = sqlite3.connect(self._db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
@staticmethod
def _to_dict(row: sqlite3.Row | None) -> dict | None:
if row is None:
return None
return dict(row)
@staticmethod
def _now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")