"""Background runner for filesystem copy, move, duplicate and delete tasks."""

from __future__ import annotations

import os
import shutil
import threading
from collections.abc import Callable
from pathlib import Path

from backend.app.db.history_repository import HistoryRepository
from backend.app.db.task_repository import TaskRepository
from backend.app.fs.filesystem_adapter import FilesystemAdapter


class TaskRunner:
    """Run filesystem tasks on daemon threads and record their outcome.

    Every ``enqueue_*`` method starts one daemon thread that executes the
    matching ``_run_*`` method. The runner reports state transitions and
    progress to the task repository and, when a history repository was
    supplied, mirrors the terminal status (completed / failed / cancelled)
    into the history entry whose id equals the task id.
    """

    def __init__(
        self,
        repository: TaskRepository,
        filesystem: FilesystemAdapter,
        history_repository: HistoryRepository | None = None,
    ):
        """Store the collaborators; ``history_repository`` is optional."""
        self._repository = repository
        self._filesystem = filesystem
        self._history_repository = history_repository

    # ------------------------------------------------------------------
    # Public API: each call returns immediately after starting a thread.
    # ------------------------------------------------------------------

    def enqueue_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None:
        """Start a background copy of a single file."""
        self._spawn(self._run_copy_file, task_id, source, destination, total_bytes)

    def enqueue_copy_directory(self, task_id: str, source: str, destination: str) -> None:
        """Start a background copy of a single directory."""
        self._spawn(self._run_copy_directory, task_id, source, destination)

    def enqueue_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Start a background copy of several items.

        Each item is a dict read via the keys ``"source"``, ``"destination"``
        and ``"kind"`` (``"directory"`` selects the directory operation; any
        other value selects the file operation).
        """
        self._spawn(self._run_copy_batch, task_id, items)

    def enqueue_move_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        same_root: bool,
    ) -> None:
        """Start a background move of a single file.

        When ``same_root`` is false the move is performed as a copy followed
        by deleting the source.
        """
        self._spawn(self._run_move_file, task_id, source, destination, total_bytes, same_root)

    def enqueue_move_directory(self, task_id: str, source: str, destination: str) -> None:
        """Start a background move of a single directory."""
        self._spawn(self._run_move_directory, task_id, source, destination)

    def enqueue_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Start a background move of several items (same item shape as copy batch)."""
        self._spawn(self._run_move_batch, task_id, items)

    def enqueue_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Start a background duplication of several items (same item shape as copy batch)."""
        self._spawn(self._run_duplicate_batch, task_id, items)

    def enqueue_delete_path(self, task_id: str, target: str, kind: str, recursive: bool) -> None:
        """Start a background delete of a file or directory."""
        self._spawn(self._run_delete_path, task_id, target, kind, recursive)

    def enqueue_archive_prepare(self, worker: Callable[[], object]) -> None:
        """Run the caller-supplied ``worker`` callable on a daemon thread."""
        self._spawn(worker)

    def _spawn(self, target: Callable[..., object], *args: object) -> None:
        """Start a daemon thread that calls ``target(*args)``."""
        threading.Thread(target=target, args=args, daemon=True).start()

    # ------------------------------------------------------------------
    # Single-file tasks (progress measured in bytes)
    # ------------------------------------------------------------------

    def _run_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None:
        """Copy one file, reporting byte progress; record failure on ``OSError``."""
        if not self._repository.mark_running(
            task_id=task_id,
            done_bytes=0,
            total_bytes=total_bytes,
            current_item=source,
        ):
            # The task could not be moved to "running"; settle a pending cancellation if any.
            self._finalize_if_already_cancelled(task_id, done_bytes=0, total_bytes=total_bytes)
            return

        progress, on_progress = self._make_progress_callback(task_id, source, total_bytes)
        try:
            self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress)
            self._complete_or_cancel_file_task(
                task_id=task_id,
                done_bytes=total_bytes,
                total_bytes=total_bytes,
            )
        except OSError as exc:
            self._record_file_failure(task_id, exc, source, progress["done"], total_bytes)

    def _run_move_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        same_root: bool,
    ) -> None:
        """Move one file; record failure on ``OSError``.

        With ``same_root`` the filesystem adapter's ``move_file`` is used and
        no intermediate progress is reported. Otherwise the file is copied
        with byte progress and the source is deleted afterwards.
        """
        if not self._repository.mark_running(
            task_id=task_id,
            done_bytes=0,
            total_bytes=total_bytes,
            current_item=source,
        ):
            self._finalize_if_already_cancelled(task_id, done_bytes=0, total_bytes=total_bytes)
            return

        progress, on_progress = self._make_progress_callback(task_id, source, total_bytes)
        try:
            if same_root:
                self._filesystem.move_file(source=source, destination=destination)
            else:
                self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress)
                self._filesystem.delete_file(Path(source))
            self._complete_or_cancel_file_task(
                task_id=task_id,
                done_bytes=total_bytes,
                total_bytes=total_bytes,
            )
        except OSError as exc:
            self._record_file_failure(task_id, exc, source, progress["done"], total_bytes)

    def _make_progress_callback(
        self,
        task_id: str,
        source: str,
        total_bytes: int,
    ) -> tuple[dict[str, int], Callable[[int], None]]:
        """Build a byte-progress callback and the dict holding the last value.

        The returned dict's ``"done"`` entry starts at 0 and is overwritten
        on every callback invocation so a failure handler can report how far
        the transfer got.
        """
        progress = {"done": 0}

        def on_progress(done_bytes: int) -> None:
            progress["done"] = done_bytes
            self._repository.update_progress(
                task_id=task_id,
                done_bytes=done_bytes,
                total_bytes=total_bytes,
                current_item=source,
            )

        return progress, on_progress

    def _record_file_failure(
        self,
        task_id: str,
        exc: OSError,
        source: str,
        done_bytes: int,
        total_bytes: int,
    ) -> None:
        """Mark a byte-measured task as failed and mirror that into history."""
        self._repository.mark_failed(
            task_id=task_id,
            error_code="io_error",
            error_message=str(exc),
            failed_item=source,
            done_bytes=done_bytes,
            total_bytes=total_bytes,
        )
        self._update_history_failed(task_id, str(exc))

    # ------------------------------------------------------------------
    # Single-item tasks (progress measured in items, always 0 or 1 of 1)
    # ------------------------------------------------------------------

    def _run_copy_directory(self, task_id: str, source: str, destination: str) -> None:
        """Copy one directory; record failure on ``OSError``."""
        if not self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=1,
            current_item=source,
        ):
            self._finalize_if_already_cancelled(task_id, done_items=0, total_items=1)
            return

        try:
            self._filesystem.copy_directory(source=source, destination=destination)
            self._complete_or_cancel_item_task(
                task_id=task_id,
                done_items=1,
                total_items=1,
            )
        except OSError as exc:
            self._repository.mark_failed(
                task_id=task_id,
                error_code="io_error",
                error_message=str(exc),
                failed_item=source,
                done_bytes=None,
                total_bytes=None,
                done_items=0,
                total_items=1,
            )
            self._update_history_failed(task_id, str(exc))

    def _run_move_directory(self, task_id: str, source: str, destination: str) -> None:
        """Move one directory; record failure on ``OSError``."""
        if not self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=1,
            current_item=source,
        ):
            self._finalize_if_already_cancelled(task_id, done_items=0, total_items=1)
            return

        try:
            self._filesystem.move_directory(source=source, destination=destination)
            self._complete_or_cancel_item_task(
                task_id=task_id,
                done_items=1,
                total_items=1,
            )
        except OSError as exc:
            self._repository.mark_failed(
                task_id=task_id,
                error_code="io_error",
                error_message=str(exc),
                failed_item=source,
                done_items=0,
                total_items=1,
            )
            self._update_history_failed(task_id, str(exc))

    def _run_delete_path(self, task_id: str, target: str, kind: str, recursive: bool) -> None:
        """Delete a file or directory; record failure on ``OSError``.

        ``kind == "file"`` deletes a file. Any other kind is treated as a
        directory: removed recursively when ``recursive`` is true, otherwise
        via the adapter's empty-directory delete.
        """
        if not self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=1,
            current_item=target,
        ):
            self._finalize_if_already_cancelled(task_id, done_items=0, total_items=1)
            return

        try:
            path = Path(target)
            if kind == "file":
                self._filesystem.delete_file(path)
            elif recursive:
                self._filesystem.delete_directory_recursive(path)
            else:
                self._filesystem.delete_empty_directory(path)
            self._complete_or_cancel_item_task(
                task_id=task_id,
                done_items=1,
                total_items=1,
            )
        except OSError as exc:
            self._repository.mark_failed(
                task_id=task_id,
                error_code="io_error",
                error_message=str(exc),
                failed_item=target,
                done_bytes=None,
                total_bytes=None,
                done_items=0,
                total_items=1,
            )
            self._update_history_failed(task_id, str(exc))

    # ------------------------------------------------------------------
    # Batch tasks
    # ------------------------------------------------------------------

    def _run_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Copy every item of the batch in order."""
        self._run_batch(task_id, items, self._copy_item)

    def _run_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Move every item of the batch in order."""
        self._run_batch(task_id, items, self._move_item)

    def _run_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Duplicate every item of the batch in order."""
        self._run_batch(task_id, items, self._duplicate_item)

    def _run_batch(
        self,
        task_id: str,
        items: list[dict[str, str]],
        process_item: Callable[[str, str, str], None],
    ) -> None:
        """Shared driver for the batch tasks.

        Processes ``items`` in order by calling
        ``process_item(kind, source, destination)``. Cancellation is checked
        before each item and again after each item's progress update. The
        first ``OSError`` marks the task failed (with the number of items
        completed so far) and stops the batch.
        """
        total_items = len(items)
        first_source = items[0]["source"] if items else None
        if not self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=total_items,
            current_item=first_source,
        ):
            self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
            return

        completed_items = 0
        for index, item in enumerate(items):
            if self._is_cancel_requested(task_id):
                self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
                return

            source = item["source"]
            destination = item["destination"]
            try:
                process_item(item["kind"], source, destination)
                completed_items = index + 1
                # Advertise the upcoming item; on the last item keep showing the current one.
                next_item = items[index + 1]["source"] if index + 1 < total_items else source
                self._repository.update_progress(
                    task_id=task_id,
                    done_items=completed_items,
                    total_items=total_items,
                    current_item=next_item,
                )
                if self._is_cancel_requested(task_id):
                    self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
                    return
            except OSError as exc:
                self._repository.mark_failed(
                    task_id=task_id,
                    error_code="io_error",
                    error_message=str(exc),
                    failed_item=source,
                    done_bytes=None,
                    total_bytes=None,
                    done_items=completed_items,
                    total_items=total_items,
                )
                self._update_history_failed(task_id, str(exc))
                return

        self._complete_or_cancel_item_task(
            task_id=task_id,
            done_items=total_items,
            total_items=total_items,
        )

    def _copy_item(self, kind: str, source: str, destination: str) -> None:
        """Copy one batch item using the directory or file operation."""
        if kind == "directory":
            self._filesystem.copy_directory(source=source, destination=destination)
        else:
            self._filesystem.copy_file(source=source, destination=destination)

    def _move_item(self, kind: str, source: str, destination: str) -> None:
        """Move one batch item using the directory or file operation."""
        if kind == "directory":
            self._filesystem.move_directory(source=source, destination=destination)
        else:
            self._filesystem.move_file(source=source, destination=destination)

    def _duplicate_item(self, kind: str, source: str, destination: str) -> None:
        """Duplicate one batch item, removing a partial result on failure.

        Cleanup only happens when ``destination`` did not exist before this
        attempt, so a failure such as ``FileExistsError`` can never delete
        data that this runner did not create. An ``OSError`` raised by the
        cleanup itself propagates to the caller like any other I/O failure,
        which lets the batch driver record the task as failed.
        """
        target = Path(destination)
        # lexists also reports dangling symlinks, which exists() would miss.
        existed_before = os.path.lexists(target)
        try:
            if kind == "directory":
                self._duplicate_directory(source=Path(source), destination=target)
            else:
                self._filesystem.copy_file(source=source, destination=destination)
        except OSError:
            if not existed_before:
                self._cleanup_partial_duplicate(target)
            raise

    def _duplicate_directory(self, source: Path, destination: Path) -> None:
        """Recreate ``source`` under ``destination`` file by file.

        Entries whose name starts with ``"._"`` are skipped. A symlink found
        among the directories or files raises ``OSError``. After all content
        is copied, ``shutil.copystat`` is applied to every created directory.
        On any exception after ``destination`` was created, the partial
        destination tree is removed and the exception is re-raised.

        Raises:
            OSError: when ``destination`` cannot be created (for example it
                already exists), a symlink is encountered, or a copy fails.
        """
        # Created outside the try block: if this fails, nothing was made by us
        # and therefore nothing may be cleaned up here.
        destination.mkdir()
        copied_directories: list[tuple[Path, Path]] = [(source, destination)]
        try:
            for root, dirnames, filenames in os.walk(source, topdown=True, followlinks=False):
                root_path = Path(root)
                target_root = destination / root_path.relative_to(source)

                # In-place prune so os.walk does not descend into skipped directories.
                dirnames[:] = [name for name in dirnames if not name.startswith("._")]
                for name in dirnames:
                    source_dir = root_path / name
                    if source_dir.is_symlink():
                        raise OSError("Source directory must not contain symlinks")
                    target_dir = target_root / name
                    target_dir.mkdir()
                    copied_directories.append((source_dir, target_dir))

                for name in filenames:
                    if name.startswith("._"):
                        continue
                    source_file = root_path / name
                    if source_file.is_symlink():
                        raise OSError("Source directory must not contain symlinks")
                    self._filesystem.copy_file(
                        source=str(source_file),
                        destination=str(target_root / name),
                    )

            # Stats are copied last, in reverse creation order
            # (presumably so later writes do not disturb directory timestamps).
            for source_dir, target_dir in reversed(copied_directories):
                shutil.copystat(source_dir, target_dir, follow_symlinks=False)
        except Exception:
            self._cleanup_partial_duplicate(destination)
            raise

    def _cleanup_partial_duplicate(self, path: Path) -> None:
        """Remove ``path`` (directory tree or single file) if it exists."""
        if not path.exists():
            return
        if path.is_dir():
            shutil.rmtree(path)
            return
        path.unlink()

    # ------------------------------------------------------------------
    # Status helpers
    # ------------------------------------------------------------------

    def _is_cancel_requested(self, task_id: str) -> bool:
        """Return True when the stored task exists and its status is ``"cancelling"``."""
        task = self._repository.get_task(task_id)
        return bool(task) and task["status"] == "cancelling"

    def _finalize_if_already_cancelled(
        self,
        task_id: str,
        *,
        done_bytes: int | None = None,
        total_bytes: int | None = None,
        done_items: int | None = None,
        total_items: int | None = None,
    ) -> None:
        """Settle a task that is already cancelled or being cancelled.

        A ``"cancelled"`` task only gets its history entry updated; a
        ``"cancelling"`` task is finalized with the given progress figures.
        Any other status (or a missing task) is left untouched.
        """
        task = self._repository.get_task(task_id)
        if task and task["status"] == "cancelled":
            self._update_history_cancelled(task_id)
            return
        if task and task["status"] == "cancelling":
            self._finalize_cancelled(
                task_id,
                done_bytes=done_bytes,
                total_bytes=total_bytes,
                done_items=done_items,
                total_items=total_items,
            )

    def _complete_or_cancel_file_task(
        self,
        *,
        task_id: str,
        done_bytes: int | None,
        total_bytes: int | None,
    ) -> None:
        """Finish a byte-measured task as completed, or as cancelled if requested."""
        if self._is_cancel_requested(task_id):
            self._finalize_cancelled(task_id, done_bytes=done_bytes, total_bytes=total_bytes)
            return
        if self._repository.mark_completed(
            task_id=task_id,
            done_bytes=done_bytes,
            total_bytes=total_bytes,
        ):
            self._update_history_completed(task_id)
            return
        # Completion was refused; the task may have been cancelled in the meantime.
        self._finalize_if_already_cancelled(task_id, done_bytes=done_bytes, total_bytes=total_bytes)

    def _complete_or_cancel_item_task(
        self,
        *,
        task_id: str,
        done_items: int | None,
        total_items: int | None,
    ) -> None:
        """Finish an item-measured task as completed, or as cancelled if requested."""
        if self._is_cancel_requested(task_id):
            self._finalize_cancelled(task_id, done_items=done_items, total_items=total_items)
            return
        if self._repository.mark_completed(
            task_id=task_id,
            done_items=done_items,
            total_items=total_items,
        ):
            self._update_history_completed(task_id)
            return
        # Completion was refused; the task may have been cancelled in the meantime.
        self._finalize_if_already_cancelled(task_id, done_items=done_items, total_items=total_items)

    def _finalize_cancelled(
        self,
        task_id: str,
        *,
        done_bytes: int | None = None,
        total_bytes: int | None = None,
        done_items: int | None = None,
        total_items: int | None = None,
    ) -> None:
        """Finalize the cancellation in the repository and, on success, in history."""
        if self._repository.finalize_cancelled(
            task_id=task_id,
            done_bytes=done_bytes,
            total_bytes=total_bytes,
            done_items=done_items,
            total_items=total_items,
        ):
            self._update_history_cancelled(task_id)

    # ------------------------------------------------------------------
    # History mirroring (no-ops when no history repository was supplied)
    # ------------------------------------------------------------------

    def _update_history_completed(self, task_id: str) -> None:
        """Set the history entry for ``task_id`` to ``"completed"``."""
        if self._history_repository:
            self._history_repository.update_entry(entry_id=task_id, status="completed")

    def _update_history_failed(self, task_id: str, error_message: str) -> None:
        """Set the history entry for ``task_id`` to ``"failed"`` with an ``io_error`` code."""
        if self._history_repository:
            self._history_repository.update_entry(
                entry_id=task_id,
                status="failed",
                error_code="io_error",
                error_message=error_message,
            )

    def _update_history_cancelled(self, task_id: str) -> None:
        """Set the history entry for ``task_id`` to ``"cancelled"``."""
        if self._history_repository:
            self._history_repository.update_entry(entry_id=task_id, status="cancelled")