"""Unit tests for ``reconcile_persisted_incomplete_tasks``.

Each test seeds task (and, where relevant, history) rows through the
repositories, runs the reconciliation function, and checks which task ids
it reports and what the stored rows look like afterwards.
"""

from __future__ import annotations

import sys
import tempfile
import unittest
from pathlib import Path

# Put the fourth ancestor directory of this file on the import path so the
# ``backend`` package resolves when the module is run directly.
# NOTE(review): presumably that directory is the repository root -- confirm.
sys.path.insert(0, str(Path(__file__).resolve().parents[3]))

from backend.app.db.history_repository import HistoryRepository
from backend.app.db.task_repository import TaskRepository
from backend.app.services.task_recovery_service import reconcile_persisted_incomplete_tasks


class TaskRecoveryServiceTest(unittest.TestCase):
    """Exercise task reconciliation against repositories on a throwaway path."""

    def setUp(self) -> None:
        """Create a temporary directory and point both repositories at it."""
        self.temp_dir = tempfile.TemporaryDirectory()
        db_file = Path(self.temp_dir.name) / "tasks.db"
        self.db_path = str(db_file)
        # Both repositories are constructed from the same path.
        self.task_repo = TaskRepository(self.db_path)
        self.history_repo = HistoryRepository(self.db_path)

    def tearDown(self) -> None:
        """Remove the temporary directory created in ``setUp``."""
        self.temp_dir.cleanup()

    def test_reconcile_persisted_incomplete_tasks_marks_old_non_terminal_tasks_failed(self) -> None:
        # A task persisted as "running", with a history entry that is still
        # "queued" under the same id.
        running_task = {
            "id": "task-running",
            "operation": "copy",
            "status": "running",
            "source": "storage1/a.txt",
            "destination": "storage2/a.txt",
            "created_at": "2026-03-10T10:00:00Z",
            "started_at": "2026-03-10T10:00:01Z",
            "current_item": "storage1/a.txt",
        }
        self.task_repo.insert_task_for_testing(running_task)
        self.history_repo.create_entry(
            entry_id="task-running",
            operation="copy",
            status="queued",
            source="storage1/a.txt",
            destination="storage2/a.txt",
            created_at="2026-03-10T10:00:00Z",
        )

        # A second task persisted as "ready" with a finish timestamp; the
        # assertions below expect it to be left alone.
        ready_task = {
            "id": "task-ready",
            "operation": "download",
            "status": "ready",
            "source": "single_directory_zip",
            "destination": "docs.zip",
            "created_at": "2026-03-10T10:02:00Z",
            "finished_at": "2026-03-10T10:03:00Z",
        }
        self.task_repo.insert_task_for_testing(ready_task)

        reconciled_ids = reconcile_persisted_incomplete_tasks(self.task_repo, self.history_repo)

        # Only the running task is reported as changed.
        self.assertEqual(reconciled_ids, ["task-running"])

        # The stored task row now carries the interruption error details.
        interrupted_row = self.task_repo.get_task("task-running")
        self.assertEqual(interrupted_row["status"], "failed")
        self.assertEqual(interrupted_row["error_code"], "task_interrupted")
        self.assertEqual(interrupted_row["error_message"], "Task was interrupted before completion")
        self.assertIsNone(interrupted_row["current_item"])

        # The first listed history entry mirrors the failure.
        history_rows = self.history_repo.list_history(limit=5)
        first_entry = history_rows[0]
        self.assertEqual(first_entry["id"], "task-running")
        self.assertEqual(first_entry["status"], "failed")
        self.assertEqual(first_entry["error_code"], "task_interrupted")

        # The "ready" task keeps its status.
        ready_row = self.task_repo.get_task("task-ready")
        self.assertEqual(ready_row["status"], "ready")

    def test_reconcile_persisted_incomplete_tasks_is_noop_when_all_tasks_terminal(self) -> None:
        # The only stored task is already "completed".
        completed_task = {
            "id": "task-completed",
            "operation": "move",
            "status": "completed",
            "source": "storage1/a.txt",
            "destination": "storage2/a.txt",
            "created_at": "2026-03-10T10:00:00Z",
            "finished_at": "2026-03-10T10:00:02Z",
        }
        self.task_repo.insert_task_for_testing(completed_task)

        reconciled_ids = reconcile_persisted_incomplete_tasks(self.task_repo, self.history_repo)

        # Nothing is reported and the stored status is unchanged.
        self.assertEqual(reconciled_ids, [])
        completed_row = self.task_repo.get_task("task-completed")
        self.assertEqual(completed_row["status"], "completed")

    def test_reconcile_persisted_incomplete_tasks_marks_stale_delete_task_failed(self) -> None:
        # A "running" delete task with an empty destination and no history
        # entry seeded alongside it.
        delete_task = {
            "id": "task-delete",
            "operation": "delete",
            "status": "running",
            "source": "storage1/trash.txt",
            "destination": "",
            "created_at": "2026-03-10T10:00:00Z",
            "started_at": "2026-03-10T10:00:01Z",
            "current_item": "storage1/trash.txt",
        }
        self.task_repo.insert_task_for_testing(delete_task)

        reconciled_ids = reconcile_persisted_incomplete_tasks(self.task_repo, self.history_repo)

        self.assertEqual(reconciled_ids, ["task-delete"])
        delete_row = self.task_repo.get_task("task-delete")
        self.assertEqual(delete_row["status"], "failed")
        self.assertEqual(delete_row["error_code"], "task_interrupted")


if __name__ == "__main__":
    unittest.main()