from __future__ import annotations

import threading
from collections.abc import Callable
from pathlib import Path

from backend.app.db.task_repository import TaskRepository
from backend.app.fs.filesystem_adapter import FilesystemAdapter


class TaskRunner:
    """Run filesystem copy/move tasks on background daemon threads.

    Every ``enqueue_*`` method starts one daemon thread and returns right
    away. The thread performs the filesystem work through the injected
    ``FilesystemAdapter`` and records the task lifecycle (running, progress,
    completed, failed) through the injected ``TaskRepository``.

    Only ``OSError`` is translated into a failed task (error code
    ``"io_error"``). Any other exception propagates out of the worker thread
    and the task is left in whatever state the repository last recorded.
    """

    def __init__(self, repository: TaskRepository, filesystem: FilesystemAdapter):
        """Store the collaborators used by every task.

        Args:
            repository: Receives ``mark_running`` / ``update_progress`` /
                ``mark_completed`` / ``mark_failed`` calls.
            filesystem: Performs the actual copy, move and delete operations.
        """
        self._repository = repository
        self._filesystem = filesystem

    # ------------------------------------------------------------------
    # Public API: schedule work on a background thread
    # ------------------------------------------------------------------

    def enqueue_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None:
        """Start a background copy of ``source`` to ``destination``.

        Args:
            task_id: Identifier of the task record to update.
            source: Path of the file to copy.
            destination: Path the file is copied to.
            total_bytes: Size reported to the repository as the byte total.
        """
        self._start_worker(self._run_copy_file, task_id, source, destination, total_bytes)

    def enqueue_move_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        same_root: bool,
    ) -> None:
        """Start a background move of a single file.

        Args:
            task_id: Identifier of the task record to update.
            source: Path of the file to move.
            destination: Path the file is moved to.
            total_bytes: Size reported to the repository as the byte total.
            same_root: When true the adapter's ``move_file`` is used directly;
                otherwise the file is copied and the source is then deleted.
        """
        self._start_worker(
            self._run_move_file, task_id, source, destination, total_bytes, same_root
        )

    def enqueue_move_directory(self, task_id: str, source: str, destination: str) -> None:
        """Start a background move of one directory, tracked as a single item."""
        self._start_worker(self._run_move_directory, task_id, source, destination)

    def enqueue_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Start a background move of several items, processed in order.

        Args:
            task_id: Identifier of the task record to update.
            items: One dict per item with the keys ``"source"``,
                ``"destination"`` and ``"kind"``; a ``"kind"`` of
                ``"directory"`` selects a directory move, anything else a
                file move.
        """
        self._start_worker(self._run_move_batch, task_id, items)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _start_worker(self, target: Callable[..., None], *args: object) -> None:
        """Run ``target(*args)`` on a newly started daemon thread."""
        threading.Thread(target=target, args=args, daemon=True).start()

    def _run_byte_task(
        self,
        task_id: str,
        source: str,
        total_bytes: int,
        work: Callable[[Callable[[int], None]], None],
    ) -> None:
        """Drive the lifecycle of a single-file task measured in bytes.

        Marks the task running, invokes ``work`` with a progress callback,
        then marks the task completed. An ``OSError`` raised at any point
        after the task was marked running (including while recording
        completion) marks the task failed with the last byte count that the
        progress callback reported.

        Args:
            task_id: Identifier of the task record to update.
            source: Reported as the current item and, on failure, the failed
                item.
            total_bytes: Byte total reported on every repository call.
            work: Performs the filesystem operation; it is handed the
                progress callback and may call it with the bytes done so far.
        """
        self._repository.mark_running(
            task_id=task_id,
            done_bytes=0,
            total_bytes=total_bytes,
            current_item=source,
        )

        last_done = 0

        def on_progress(done_bytes: int) -> None:
            # Remember the latest count so a later failure can report it.
            nonlocal last_done
            last_done = done_bytes
            self._repository.update_progress(
                task_id=task_id,
                done_bytes=done_bytes,
                total_bytes=total_bytes,
                current_item=source,
            )

        try:
            work(on_progress)
            self._repository.mark_completed(
                task_id=task_id,
                done_bytes=total_bytes,
                total_bytes=total_bytes,
            )
        except OSError as exc:
            self._repository.mark_failed(
                task_id=task_id,
                error_code="io_error",
                error_message=str(exc),
                failed_item=source,
                done_bytes=last_done,
                total_bytes=total_bytes,
            )

    # ------------------------------------------------------------------
    # Thread targets
    # ------------------------------------------------------------------

    def _run_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None:
        """Copy one file, reporting byte progress to the repository."""

        def work(on_progress: Callable[[int], None]) -> None:
            self._filesystem.copy_file(
                source=source, destination=destination, on_progress=on_progress
            )

        self._run_byte_task(task_id, source, total_bytes, work)

    def _run_move_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        same_root: bool,
    ) -> None:
        """Move one file, either directly or as copy-then-delete.

        With ``same_root`` true the adapter's ``move_file`` is called and no
        intermediate progress is reported. Otherwise the file is copied with
        progress reporting and the source is deleted afterwards; if the
        delete raises ``OSError`` the task is marked failed even though the
        copy finished.
        """

        def work(on_progress: Callable[[int], None]) -> None:
            if same_root:
                self._filesystem.move_file(source=source, destination=destination)
                return
            self._filesystem.copy_file(
                source=source, destination=destination, on_progress=on_progress
            )
            self._filesystem.delete_file(Path(source))

        self._run_byte_task(task_id, source, total_bytes, work)

    def _run_move_directory(self, task_id: str, source: str, destination: str) -> None:
        """Move one directory, tracked as a task with exactly one item."""
        self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=1,
            current_item=source,
        )
        try:
            self._filesystem.move_directory(source=source, destination=destination)
            self._repository.mark_completed(
                task_id=task_id,
                done_items=1,
                total_items=1,
            )
        except OSError as exc:
            self._repository.mark_failed(
                task_id=task_id,
                error_code="io_error",
                error_message=str(exc),
                failed_item=source,
                done_items=0,
                total_items=1,
            )

    def _run_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Move every item in order, stopping at the first ``OSError``.

        Progress is counted in items. After each successful move the
        repository is told how many items are done and which source comes
        next (the last item's own source once nothing remains). On failure
        the task is marked failed with the number of items completed so far
        and the remaining items are not attempted. An empty ``items`` list is
        marked running and then completed with zero items.
        """
        total_items = len(items)
        first_source = items[0]["source"] if items else None
        self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=total_items,
            current_item=first_source,
        )

        completed_items = 0
        for index, item in enumerate(items):
            source = item["source"]
            destination = item["destination"]
            try:
                if item["kind"] == "directory":
                    self._filesystem.move_directory(source=source, destination=destination)
                else:
                    self._filesystem.move_file(source=source, destination=destination)

                completed_items = index + 1
                has_next = completed_items < total_items
                next_item = items[completed_items]["source"] if has_next else source
                self._repository.update_progress(
                    task_id=task_id,
                    done_items=completed_items,
                    total_items=total_items,
                    current_item=next_item,
                )
            except OSError as exc:
                self._repository.mark_failed(
                    task_id=task_id,
                    error_code="io_error",
                    error_message=str(exc),
                    failed_item=source,
                    done_bytes=None,
                    total_bytes=None,
                    done_items=completed_items,
                    total_items=total_items,
                )
                return

        self._repository.mark_completed(
            task_id=task_id,
            done_items=total_items,
            total_items=total_items,
        )