from __future__ import annotations import threading from pathlib import Path from backend.app.db.history_repository import HistoryRepository from backend.app.db.task_repository import TaskRepository from backend.app.fs.filesystem_adapter import FilesystemAdapter class TaskRunner: def __init__(self, repository: TaskRepository, filesystem: FilesystemAdapter, history_repository: HistoryRepository | None = None): self._repository = repository self._filesystem = filesystem self._history_repository = history_repository def enqueue_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None: thread = threading.Thread( target=self._run_copy_file, args=(task_id, source, destination, total_bytes), daemon=True, ) thread.start() def enqueue_copy_directory(self, task_id: str, source: str, destination: str) -> None: thread = threading.Thread( target=self._run_copy_directory, args=(task_id, source, destination), daemon=True, ) thread.start() def enqueue_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None: thread = threading.Thread( target=self._run_copy_batch, args=(task_id, items), daemon=True, ) thread.start() def enqueue_move_file( self, task_id: str, source: str, destination: str, total_bytes: int, same_root: bool, ) -> None: thread = threading.Thread( target=self._run_move_file, args=(task_id, source, destination, total_bytes, same_root), daemon=True, ) thread.start() def enqueue_move_directory(self, task_id: str, source: str, destination: str) -> None: thread = threading.Thread( target=self._run_move_directory, args=(task_id, source, destination), daemon=True, ) thread.start() def enqueue_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None: thread = threading.Thread( target=self._run_move_batch, args=(task_id, items), daemon=True, ) thread.start() def enqueue_archive_prepare(self, worker) -> None: thread = threading.Thread( target=worker, daemon=True, ) thread.start() def _run_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None: self._repository.mark_running( task_id=task_id, done_bytes=0, total_bytes=total_bytes, current_item=source, ) progress = {"done": 0} def on_progress(done_bytes: int) -> None: progress["done"] = done_bytes self._repository.update_progress( task_id=task_id, done_bytes=done_bytes, total_bytes=total_bytes, current_item=source, ) try: self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress) self._repository.mark_completed( task_id=task_id, done_bytes=total_bytes, total_bytes=total_bytes, ) self._update_history_completed(task_id) except OSError as exc: self._repository.mark_failed( task_id=task_id, error_code="io_error", error_message=str(exc), failed_item=source, done_bytes=progress["done"], total_bytes=total_bytes, ) self._update_history_failed(task_id, str(exc)) def _run_copy_directory(self, task_id: str, source: str, destination: str) -> None: self._repository.mark_running( task_id=task_id, done_items=0, total_items=1, current_item=source, ) try: self._filesystem.copy_directory(source=source, destination=destination) self._repository.mark_completed( task_id=task_id, done_items=1, total_items=1, ) self._update_history_completed(task_id) except OSError as exc: self._repository.mark_failed( task_id=task_id, error_code="io_error", error_message=str(exc), failed_item=source, done_bytes=None, total_bytes=None, done_items=0, total_items=1, ) self._update_history_failed(task_id, str(exc)) def _run_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None: total_items = len(items) current_item = items[0]["source"] if items else None self._repository.mark_running( task_id=task_id, done_items=0, total_items=total_items, current_item=current_item, ) completed_items = 0 for index, item in enumerate(items): source = item["source"] destination = item["destination"] try: if item["kind"] == "directory": self._filesystem.copy_directory(source=source, destination=destination) else: self._filesystem.copy_file(source=source, destination=destination) completed_items = index + 1 next_item = items[index + 1]["source"] if index + 1 < total_items else source self._repository.update_progress( task_id=task_id, done_items=completed_items, total_items=total_items, current_item=next_item, ) except OSError as exc: self._repository.mark_failed( task_id=task_id, error_code="io_error", error_message=str(exc), failed_item=source, done_bytes=None, total_bytes=None, done_items=completed_items, total_items=total_items, ) self._update_history_failed(task_id, str(exc)) return self._repository.mark_completed( task_id=task_id, done_items=total_items, total_items=total_items, ) self._update_history_completed(task_id) def _run_move_file( self, task_id: str, source: str, destination: str, total_bytes: int, same_root: bool, ) -> None: self._repository.mark_running( task_id=task_id, done_bytes=0, total_bytes=total_bytes, current_item=source, ) progress = {"done": 0} try: if same_root: self._filesystem.move_file(source=source, destination=destination) self._repository.mark_completed( task_id=task_id, done_bytes=total_bytes, total_bytes=total_bytes, ) self._update_history_completed(task_id) return def on_progress(done_bytes: int) -> None: progress["done"] = done_bytes self._repository.update_progress( task_id=task_id, done_bytes=done_bytes, total_bytes=total_bytes, current_item=source, ) self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress) self._filesystem.delete_file(Path(source)) self._repository.mark_completed( task_id=task_id, done_bytes=total_bytes, total_bytes=total_bytes, ) self._update_history_completed(task_id) except OSError as exc: self._repository.mark_failed( task_id=task_id, error_code="io_error", error_message=str(exc), failed_item=source, done_bytes=progress["done"], total_bytes=total_bytes, ) self._update_history_failed(task_id, str(exc)) def _run_move_directory(self, task_id: str, source: str, destination: str) -> None: self._repository.mark_running( task_id=task_id, done_items=0, total_items=1, current_item=source, ) try: self._filesystem.move_directory(source=source, destination=destination) self._repository.mark_completed( task_id=task_id, done_items=1, total_items=1, ) self._update_history_completed(task_id) except OSError as exc: self._repository.mark_failed( task_id=task_id, error_code="io_error", error_message=str(exc), failed_item=source, done_items=0, total_items=1, ) self._update_history_failed(task_id, str(exc)) def _run_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None: total_items = len(items) current_item = items[0]["source"] if items else None self._repository.mark_running( task_id=task_id, done_items=0, total_items=total_items, current_item=current_item, ) completed_items = 0 for index, item in enumerate(items): source = item["source"] destination = item["destination"] try: if item["kind"] == "directory": self._filesystem.move_directory(source=source, destination=destination) else: self._filesystem.move_file(source=source, destination=destination) completed_items = index + 1 next_item = items[index + 1]["source"] if index + 1 < total_items else source self._repository.update_progress( task_id=task_id, done_items=completed_items, total_items=total_items, current_item=next_item, ) except OSError as exc: self._repository.mark_failed( task_id=task_id, error_code="io_error", error_message=str(exc), failed_item=source, done_bytes=None, total_bytes=None, done_items=completed_items, total_items=total_items, ) self._update_history_failed(task_id, str(exc)) return self._repository.mark_completed( task_id=task_id, done_items=total_items, total_items=total_items, ) self._update_history_completed(task_id) def _update_history_completed(self, task_id: str) -> None: if self._history_repository: self._history_repository.update_entry(entry_id=task_id, status="completed") def _update_history_failed(self, task_id: str, error_message: str) -> None: if self._history_repository: self._history_repository.update_entry( entry_id=task_id, status="failed", error_code="io_error", error_message=error_message, )