Files
webmanager-mvp/webui/backend/tests/unit/test_task_repository.py
T

113 lines
3.5 KiB
Python

from __future__ import annotations
import sqlite3
import sys
import tempfile
import unittest
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
from backend.app.db.task_repository import TaskRepository
class TaskRepositoryTest(unittest.TestCase):
    """Unit tests for ``TaskRepository`` running against a throwaway SQLite file.

    Each test gets its own temporary directory, so database files created by
    one test are never visible to another.
    """

    def setUp(self) -> None:
        """Create a temporary directory and a repository stored inside it."""
        self.temp_dir = tempfile.TemporaryDirectory()
        # Register the cleanup before anything else can fail: unittest does
        # not call tearDown() when setUp() raises, but registered cleanups
        # always run.
        self.addCleanup(self.temp_dir.cleanup)
        self.db_path = str(Path(self.temp_dir.name) / "tasks.db")
        self.repo = TaskRepository(self.db_path)

    def test_list_tasks_sorted_created_at_desc(self) -> None:
        """list_tasks() returns the most recently created task first."""
        self.repo.insert_task_for_testing(
            {
                "id": "task-old",
                "operation": "copy",
                "status": "queued",
                "source": "storage1/a",
                "destination": "storage2/a",
                "created_at": "2026-03-10T09:00:00Z",
            }
        )
        self.repo.insert_task_for_testing(
            {
                "id": "task-new",
                "operation": "move",
                "status": "queued",
                "source": "storage1/b",
                "destination": "storage2/b",
                "created_at": "2026-03-10T10:00:00Z",
            }
        )

        tasks = self.repo.list_tasks()

        self.assertEqual([task["id"] for task in tasks], ["task-new", "task-old"])

    def test_insert_rejects_invalid_status(self) -> None:
        """Inserting a row with an unrecognised status raises ValueError."""
        with self.assertRaises(ValueError):
            self.repo.insert_task_for_testing(
                {
                    "id": "task-x",
                    "operation": "copy",
                    "status": "unknown",
                    "source": "storage1/a",
                    "destination": "storage2/a",
                    "created_at": "2026-03-10T09:00:00Z",
                }
            )

    def test_create_download_task_with_requested_status_and_artifact(self) -> None:
        """A download task keeps its requested status and its stored artifact."""
        created = self.repo.create_task(
            operation="download",
            source="storage1/docs",
            destination="docs.zip",
            status="requested",
        )
        self.repo.upsert_artifact(
            task_id=created["id"],
            file_path="/tmp/archive.zip",
            file_name="docs.zip",
            expires_at="2026-03-10T10:30:00Z",
        )

        task = self.repo.get_task(created["id"])
        artifact = self.repo.get_artifact(created["id"])

        # Fail with a readable assertion instead of a TypeError from
        # subscripting None, should either lookup come back empty.
        self.assertIsNotNone(task)
        self.assertIsNotNone(artifact)
        self.assertEqual(task["operation"], "download")
        self.assertEqual(task["status"], "requested")
        self.assertEqual(artifact["file_name"], "docs.zip")

    def test_migrates_legacy_tasks_schema_missing_source_destination(self) -> None:
        """A repository opened on a legacy ``tasks`` table still stores paths.

        The pre-existing table has no ``source``/``destination`` columns; the
        test expects ``create_task`` to succeed and return both values anyway.
        """
        legacy_db_path = Path(self.temp_dir.name) / "legacy.db"
        conn = sqlite3.connect(legacy_db_path)
        try:
            conn.execute(
                """
                CREATE TABLE tasks (
                    id TEXT PRIMARY KEY,
                    operation TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL
                )
                """
            )
            conn.commit()
        finally:
            # Always release the file handle, even if the DDL fails, so the
            # temporary directory can be removed afterwards.
            conn.close()

        repo = TaskRepository(str(legacy_db_path))
        created = repo.create_task(
            operation="move",
            source="storage1/a.txt",
            destination="storage2/a.txt",
        )

        self.assertEqual(created["operation"], "move")
        self.assertEqual(created["source"], "storage1/a.txt")
        self.assertEqual(created["destination"], "storage2/a.txt")
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()