from __future__ import annotations

import os
import shutil
import threading
from collections.abc import Callable
from pathlib import Path

from backend.app.db.history_repository import HistoryRepository
from backend.app.db.task_repository import TaskRepository
from backend.app.fs.filesystem_adapter import FilesystemAdapter


class TaskRunner:
    """Run copy, move, duplicate and delete tasks on background daemon threads.

    Each ``enqueue_*`` method starts one daemon thread that executes the
    matching ``_run_*`` worker.  Workers report progress and the final task
    state through the task repository and, when a history repository was
    supplied, mirror the final state (completed / failed / cancelled) there.

    Workers only handle ``OSError``; any other exception propagates out of
    the worker thread.
    """

    def __init__(
        self,
        repository: TaskRepository,
        filesystem: FilesystemAdapter,
        history_repository: HistoryRepository | None = None,
    ):
        """Store the collaborators used by the workers.

        Args:
            repository: Task store used for status and progress updates.
            filesystem: Adapter that performs the actual file operations.
            history_repository: Optional history store; when ``None`` no
                history entries are updated.
        """
        self._repository = repository
        self._filesystem = filesystem
        self._history_repository = history_repository

    # ------------------------------------------------------------------
    # Enqueue API: every method starts one daemon thread and returns.
    # ------------------------------------------------------------------

    @staticmethod
    def _spawn(target: Callable[..., object], *args: object) -> None:
        """Start ``target(*args)`` on a new daemon thread."""
        threading.Thread(target=target, args=args, daemon=True).start()

    def enqueue_copy_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        current_item: str,
    ) -> None:
        """Copy a single file in the background, reporting byte progress."""
        self._spawn(self._run_copy_file, task_id, source, destination, total_bytes, current_item)

    def enqueue_copy_directory(self, task_id: str, item: dict[str, object]) -> None:
        """Copy one planned directory (its ``directories`` and ``files``) in the background."""
        self._spawn(self._run_copy_directory, task_id, item)

    def enqueue_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Copy a list of planned items (files and/or directories) in the background."""
        self._spawn(self._run_copy_batch, task_id, items)

    def enqueue_move_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        same_root: bool,
        current_item: str,
    ) -> None:
        """Move a single file in the background.

        ``same_root`` selects between a direct ``move_file`` and a
        copy-then-delete of the source.
        """
        self._spawn(
            self._run_move_file,
            task_id,
            source,
            destination,
            total_bytes,
            same_root,
            current_item,
        )

    def enqueue_move_directory(self, task_id: str, item: dict[str, object]) -> None:
        """Move one planned directory in the background."""
        self._spawn(self._run_move_directory, task_id, item)

    def enqueue_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Move a list of planned items in the background."""
        self._spawn(self._run_move_batch, task_id, items)

    def enqueue_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Duplicate a list of planned items in the background.

        Unlike a plain copy batch, a failing item has its partially written
        destination removed.
        """
        self._spawn(self._run_duplicate_batch, task_id, items)

    def enqueue_delete_path(self, task_id: str, target: str, kind: str, recursive: bool) -> None:
        """Delete a file or directory in the background."""
        self._spawn(self._run_delete_path, task_id, target, kind, recursive)

    def enqueue_archive_prepare(self, worker) -> None:
        """Run the caller-supplied zero-argument ``worker`` on a daemon thread."""
        self._spawn(worker)

    # ------------------------------------------------------------------
    # Workers
    # ------------------------------------------------------------------

    def _run_copy_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        current_item: str,
    ) -> None:
        """Worker: copy one file and record completion, cancellation or failure."""
        if not self._repository.mark_running(
            task_id=task_id,
            done_bytes=0,
            total_bytes=total_bytes,
            done_items=0,
            total_items=1,
            current_item=current_item,
        ):
            # The task could not be switched to running; settle it if that is
            # because it was cancelled in the meantime.
            self._finalize_if_already_cancelled(
                task_id, done_bytes=0, total_bytes=total_bytes, done_items=0, total_items=1
            )
            return

        progress, on_progress = self._byte_progress_reporter(task_id, total_bytes, current_item)
        try:
            self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress)
            self._complete_or_cancel_file_task(
                task_id=task_id,
                done_bytes=total_bytes,
                total_bytes=total_bytes,
                done_items=1,
                total_items=1,
            )
        except OSError as exc:
            self._record_io_failure(
                task_id,
                exc,
                failed_item=source,
                done_bytes=progress["done"],
                total_bytes=total_bytes,
                done_items=0,
                total_items=1,
            )

    def _run_copy_directory(self, task_id: str, item: dict[str, object]) -> None:
        """Worker: copy every file of one planned directory."""
        self._run_directory_transfer(task_id, item, self._copy_directory_files)

    def _run_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Worker: copy each planned item in order."""
        self._run_item_batch(
            task_id,
            items,
            handle_directory=self._copy_directory_item,
            handle_file=self._copy_single_planned_file,
        )

    def _run_move_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        same_root: bool,
        current_item: str,
    ) -> None:
        """Worker: move one file and record completion, cancellation or failure.

        With ``same_root`` the filesystem adapter's ``move_file`` is used and
        no byte progress is reported.  Otherwise the file is copied with byte
        progress and the source is deleted afterwards.
        """
        if not self._repository.mark_running(
            task_id=task_id,
            done_bytes=0,
            total_bytes=total_bytes,
            done_items=0,
            total_items=1,
            current_item=current_item,
        ):
            self._finalize_if_already_cancelled(
                task_id, done_bytes=0, total_bytes=total_bytes, done_items=0, total_items=1
            )
            return

        progress, on_progress = self._byte_progress_reporter(task_id, total_bytes, current_item)
        try:
            if same_root:
                self._filesystem.move_file(source=source, destination=destination)
            else:
                self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress)
                self._filesystem.delete_file(Path(source))
            self._complete_or_cancel_file_task(
                task_id=task_id,
                done_bytes=total_bytes,
                total_bytes=total_bytes,
                done_items=1,
                total_items=1,
            )
        except OSError as exc:
            self._record_io_failure(
                task_id,
                exc,
                failed_item=source,
                done_bytes=progress["done"],
                total_bytes=total_bytes,
                done_items=0,
                total_items=1,
            )

    def _run_move_directory(self, task_id: str, item: dict[str, object]) -> None:
        """Worker: move every file of one planned directory."""
        self._run_directory_transfer(task_id, item, self._move_directory_files)

    def _run_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Worker: move each planned item in order."""
        self._run_item_batch(
            task_id,
            items,
            handle_directory=self._move_directory_item,
            handle_file=self._move_single_planned_file,
        )

    def _run_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Worker: copy each planned item, removing partial output on failure."""

        def duplicate_directory(
            task: str, entry: dict[str, object], completed: int, total: int
        ) -> int:
            return self._copy_directory_item(task, entry, completed, total, cleanup_on_failure=True)

        self._run_item_batch(
            task_id,
            items,
            handle_directory=duplicate_directory,
            handle_file=self._copy_single_planned_file,
            cleanup_on_failure=True,
        )

    def _run_delete_path(self, task_id: str, target: str, kind: str, recursive: bool) -> None:
        """Worker: delete ``target``.

        ``kind == "file"`` deletes a file.  Any other kind is treated as a
        directory, removed recursively when ``recursive`` is true and as an
        empty directory otherwise.
        """
        if not self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=1,
            current_item=target,
        ):
            self._finalize_if_already_cancelled(task_id, done_items=0, total_items=1)
            return

        try:
            path = Path(target)
            if kind == "file":
                self._filesystem.delete_file(path)
            elif recursive:
                self._filesystem.delete_directory_recursive(path)
            else:
                self._filesystem.delete_empty_directory(path)
            self._complete_or_cancel_item_task(task_id=task_id, done_items=1, total_items=1)
        except OSError as exc:
            self._record_io_failure(task_id, exc, failed_item=target, done_items=0, total_items=1)

    # ------------------------------------------------------------------
    # Shared worker building blocks
    # ------------------------------------------------------------------

    def _byte_progress_reporter(
        self, task_id: str, total_bytes: int, current_item: str
    ) -> tuple[dict[str, int], Callable[[int], None]]:
        """Build a byte-progress callback for a single-file task.

        Returns:
            A ``(progress, on_progress)`` pair.  ``progress["done"]`` always
            holds the last byte count passed to ``on_progress`` (initially 0)
            so a failure handler can report how far the transfer got.
        """
        progress = {"done": 0}

        def on_progress(done_bytes: int) -> None:
            progress["done"] = done_bytes
            self._repository.update_progress(
                task_id=task_id,
                done_bytes=done_bytes,
                total_bytes=total_bytes,
                done_items=0,
                total_items=1,
                current_item=current_item,
            )

        return progress, on_progress

    def _record_io_failure(
        self,
        task_id: str,
        exc: OSError,
        *,
        failed_item: str,
        done_items: int | None,
        total_items: int | None,
        done_bytes: int | None = None,
        total_bytes: int | None = None,
    ) -> None:
        """Mark the task failed with ``io_error`` and mirror that into history."""
        message = str(exc)
        self._repository.mark_failed(
            task_id=task_id,
            error_code="io_error",
            error_message=message,
            failed_item=failed_item,
            done_bytes=done_bytes,
            total_bytes=total_bytes,
            done_items=done_items,
            total_items=total_items,
        )
        self._update_history_failed(task_id, message)

    def _run_directory_transfer(
        self,
        task_id: str,
        item: dict[str, object],
        transfer_files: Callable[..., int],
    ) -> None:
        """Run a single-directory copy or move task.

        Args:
            task_id: Task being executed.
            item: Planned directory; reads ``files``, ``directories`` and, on
                failure, ``source``.
            transfer_files: ``_copy_directory_files`` or
                ``_move_directory_files``.
        """
        files = self._file_entries(item)
        directories = self._directory_entries(item)
        total_items = len(files)
        if not self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=total_items,
            current_item=files[0]["label"] if files else None,
        ):
            self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
            return

        try:
            completed_items = transfer_files(
                directories,
                files,
                task_id=task_id,
                completed_items=0,
                total_items=total_items,
            )
            if self._is_cancel_requested(task_id):
                self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
                return
            self._complete_or_cancel_item_task(
                task_id=task_id,
                done_items=completed_items,
                total_items=total_items,
            )
        except OSError as exc:
            # The per-file count is not available here, so read back the last
            # progress value stored for the task.
            self._record_io_failure(
                task_id,
                exc,
                failed_item=str(item["source"]),
                done_items=self._completed_files(task_id),
                total_items=total_items,
            )

    def _run_item_batch(
        self,
        task_id: str,
        items: list[dict[str, str]],
        *,
        handle_directory: Callable[[str, dict, int, int], int],
        handle_file: Callable[[str, dict, int, int], int],
        cleanup_on_failure: bool = False,
    ) -> None:
        """Process planned items one by one for the copy/move/duplicate batches.

        Cancellation is checked before and after every item.  The first
        ``OSError`` stops the batch and marks the task failed.

        Args:
            task_id: Task being executed.
            items: Planned items; each has a ``kind`` and, for failure
                reporting, a ``source``.
            handle_directory: Called as ``(task_id, item, completed, total)``
                for items whose ``kind`` is ``"directory"``; returns the new
                completed count.
            handle_file: Called as ``(task_id, file_entry, completed, total)``
                with the item's first file entry for every other kind.
            cleanup_on_failure: When true, the failing item's destination is
                removed (best effort) before the failure is recorded.
        """
        total_items = self._total_file_count(items)
        current_item = self._first_file_label(items)
        if not self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=total_items,
            current_item=current_item,
        ):
            self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
            return

        completed_items = 0
        for item in items:
            if self._is_cancel_requested(task_id):
                self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
                return
            try:
                if item["kind"] == "directory":
                    completed_items = handle_directory(task_id, item, completed_items, total_items)
                else:
                    file_entry = self._file_entries(item)[0]
                    completed_items = handle_file(task_id, file_entry, completed_items, total_items)
                if self._is_cancel_requested(task_id):
                    self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
                    return
            except OSError as exc:
                if cleanup_on_failure:
                    if item["kind"] == "directory":
                        partial = Path(str(item["destination"]))
                    else:
                        partial = Path(self._file_entries(item)[0]["destination"])
                    # Best effort: a cleanup error must not prevent the task
                    # from being marked failed below.
                    self._discard_partial_duplicate(partial)
                self._record_io_failure(
                    task_id,
                    exc,
                    failed_item=str(item["source"]),
                    done_items=completed_items,
                    total_items=total_items,
                )
                return

        self._complete_or_cancel_item_task(
            task_id=task_id,
            done_items=total_items,
            total_items=total_items,
        )

    # ------------------------------------------------------------------
    # Cleanup and plan helpers
    # ------------------------------------------------------------------

    def _cleanup_partial_duplicate(self, path: Path) -> None:
        """Remove whatever exists at ``path``; do nothing if nothing is there.

        Symlinks are unlinked rather than followed: ``shutil.rmtree`` refuses
        to operate on a symlink to a directory, and ``Path.exists`` reports
        ``False`` for a dangling symlink, which would otherwise be left behind.

        Raises:
            OSError: If the removal fails.
        """
        if path.is_symlink():
            path.unlink()
            return
        if not path.exists():
            return
        if path.is_dir():
            shutil.rmtree(path)
            return
        path.unlink()

    def _discard_partial_duplicate(self, path: Path) -> None:
        """Remove ``path`` like ``_cleanup_partial_duplicate`` but never raise ``OSError``."""
        try:
            self._cleanup_partial_duplicate(path)
        except OSError:
            # Deliberately ignored: the caller is already handling the error
            # that made the cleanup necessary, and that one must be reported.
            pass

    @staticmethod
    def _file_entries(item: dict[str, object]) -> list[dict[str, str]]:
        """Return a new list with the item's ``files`` entries (empty if absent)."""
        return list(item.get("files", []))  # type: ignore[arg-type]

    @staticmethod
    def _directory_entries(item: dict[str, object]) -> list[dict[str, str]]:
        """Return a new list with the item's ``directories`` entries (empty if absent)."""
        return list(item.get("directories", []))  # type: ignore[arg-type]

    def _total_file_count(self, items: list[dict[str, object]]) -> int:
        """Return the number of file entries across all items."""
        return sum(len(self._file_entries(item)) for item in items)

    def _first_file_label(self, items: list[dict[str, object]]) -> str | None:
        """Return the ``label`` of the first file entry found, or ``None``."""
        for item in items:
            files = self._file_entries(item)
            if files:
                return files[0]["label"]
        return None

    def _completed_files(self, task_id: str) -> int:
        """Return the task's stored ``done_items`` as an int, or 0 if unavailable."""
        task = self._repository.get_task(task_id)
        if not task or task["done_items"] is None:
            return 0
        return int(task["done_items"])

    # ------------------------------------------------------------------
    # Per-item transfer helpers
    # ------------------------------------------------------------------

    def _copy_single_planned_file(
        self,
        task_id: str,
        file_entry: dict[str, str],
        completed_items: int,
        total_items: int,
    ) -> int:
        """Copy one planned file, updating progress before and after.

        Returns:
            ``completed_items + 1``.
        """
        self._repository.update_progress(
            task_id=task_id,
            done_items=completed_items,
            total_items=total_items,
            current_item=file_entry["label"],
        )
        self._filesystem.copy_file(source=file_entry["source"], destination=file_entry["destination"])
        completed_items += 1
        self._repository.update_progress(
            task_id=task_id,
            done_items=completed_items,
            total_items=total_items,
            current_item=self._next_item_label_after_completion(
                completed_items, total_items, file_entry["label"]
            ),
        )
        return completed_items

    def _copy_directory_item(
        self,
        task_id: str,
        item: dict[str, object],
        completed_items: int,
        total_items: int,
        cleanup_on_failure: bool = False,
    ) -> int:
        """Copy one planned directory item as part of a batch.

        Args:
            task_id: Task being executed.
            item: Planned directory; reads ``directories``, ``files`` and, for
                cleanup, ``destination``.
            completed_items: Files completed so far in the batch.
            total_items: Total files in the batch.
            cleanup_on_failure: When true, the destination is removed (best
                effort) if anything raises; the original exception is then
                re-raised unchanged.

        Returns:
            The updated completed-file count.
        """
        directories = self._directory_entries(item)
        files = self._file_entries(item)
        try:
            return self._copy_directory_files(
                directories,
                files,
                task_id=task_id,
                completed_items=completed_items,
                total_items=total_items,
            )
        except Exception:
            if cleanup_on_failure:
                # Best effort so the original error, not a cleanup error, is
                # what reaches the caller.
                self._discard_partial_duplicate(Path(str(item["destination"])))
            raise

    def _copy_directory_files(
        self,
        directories: list[dict[str, str]],
        files: list[dict[str, str]],
        *,
        task_id: str | None = None,
        completed_items: int = 0,
        total_items: int = 0,
    ) -> int:
        """Create the destination directories, copy the files, then copy directory stats.

        When ``task_id`` is given, progress is reported around every file and
        cancellation is checked before each file and once after the last one.
        On cancellation the method returns early and directory stats are not
        copied.

        Returns:
            The updated completed-file count.
        """
        for directory in directories:
            Path(directory["destination"]).mkdir(parents=True, exist_ok=True)

        for file_entry in files:
            if task_id is not None and self._is_cancel_requested(task_id):
                return completed_items
            if task_id is not None:
                self._repository.update_progress(
                    task_id=task_id,
                    done_items=completed_items,
                    total_items=total_items,
                    current_item=file_entry["label"],
                )
            self._filesystem.copy_file(source=file_entry["source"], destination=file_entry["destination"])
            completed_items += 1
            if task_id is not None:
                self._repository.update_progress(
                    task_id=task_id,
                    done_items=completed_items,
                    total_items=total_items,
                    current_item=self._next_item_label_after_completion(
                        completed_items, total_items, file_entry["label"]
                    ),
                )

        if task_id is not None and self._is_cancel_requested(task_id):
            return completed_items

        # Stats are applied last, in reverse plan order, after all files have
        # been written into the directories.
        for directory in reversed(directories):
            shutil.copystat(
                Path(directory["source"]),
                Path(directory["destination"]),
                follow_symlinks=False,
            )
        return completed_items

    def _move_single_planned_file(
        self,
        task_id: str,
        file_entry: dict[str, str],
        completed_items: int,
        total_items: int,
    ) -> int:
        """Move one planned file, updating progress before and after.

        Returns:
            ``completed_items + 1``.
        """
        self._repository.update_progress(
            task_id=task_id,
            done_items=completed_items,
            total_items=total_items,
            current_item=file_entry["label"],
        )
        self._filesystem.move_file(source=file_entry["source"], destination=file_entry["destination"])
        completed_items += 1
        self._repository.update_progress(
            task_id=task_id,
            done_items=completed_items,
            total_items=total_items,
            current_item=self._next_item_label_after_completion(
                completed_items, total_items, file_entry["label"]
            ),
        )
        return completed_items

    def _move_directory_item(
        self,
        task_id: str,
        item: dict[str, object],
        completed_items: int,
        total_items: int,
    ) -> int:
        """Move one planned directory item as part of a batch; returns the new count."""
        return self._move_directory_files(
            self._directory_entries(item),
            self._file_entries(item),
            task_id=task_id,
            completed_items=completed_items,
            total_items=total_items,
        )

    def _move_directory_files(
        self,
        directories: list[dict[str, str]],
        files: list[dict[str, str]],
        *,
        task_id: str | None = None,
        completed_items: int = 0,
        total_items: int = 0,
    ) -> int:
        """Create destination directories, move the files, then retire the sources.

        When ``task_id`` is given, progress is reported around every file and
        cancellation is checked before each file and once after the last one.
        On cancellation the method returns early; directory stats are not
        copied and source directories are not removed.

        Returns:
            The updated completed-file count.
        """
        for directory in directories:
            Path(directory["destination"]).mkdir(parents=True, exist_ok=True)

        for file_entry in files:
            if task_id is not None and self._is_cancel_requested(task_id):
                return completed_items
            if task_id is not None:
                self._repository.update_progress(
                    task_id=task_id,
                    done_items=completed_items,
                    total_items=total_items,
                    current_item=file_entry["label"],
                )
            self._filesystem.move_file(source=file_entry["source"], destination=file_entry["destination"])
            completed_items += 1
            if task_id is not None:
                self._repository.update_progress(
                    task_id=task_id,
                    done_items=completed_items,
                    total_items=total_items,
                    current_item=self._next_item_label_after_completion(
                        completed_items, total_items, file_entry["label"]
                    ),
                )

        if task_id is not None and self._is_cancel_requested(task_id):
            return completed_items

        for directory in reversed(directories):
            shutil.copystat(
                Path(directory["source"]),
                Path(directory["destination"]),
                follow_symlinks=False,
            )
        # Reverse plan order for removal.
        # NOTE(review): presumably the plan lists parents before children, so
        # this removes children first; confirm against the planner.
        for directory in reversed(directories):
            self._filesystem.delete_empty_directory(Path(directory["source"]))
        return completed_items

    @staticmethod
    def _next_item_label_after_completion(
        completed_items: int, total_items: int, current_label: str
    ) -> str | None:
        """Return the label to show once an item has finished.

        Currently always ``None``; the arguments are unused.
        """
        return None

    # ------------------------------------------------------------------
    # Task state transitions
    # ------------------------------------------------------------------

    def _is_cancel_requested(self, task_id: str) -> bool:
        """Return whether the task exists and its status is ``"cancelling"``."""
        task = self._repository.get_task(task_id)
        return bool(task) and task["status"] == "cancelling"

    def _finalize_if_already_cancelled(
        self,
        task_id: str,
        *,
        done_bytes: int | None = None,
        total_bytes: int | None = None,
        done_items: int | None = None,
        total_items: int | None = None,
    ) -> None:
        """Settle a task whose state transition was refused.

        If the task is already ``"cancelled"`` only the history entry is
        updated.  If it is ``"cancelling"`` the cancellation is finalized
        with the given progress values.  Any other state is left untouched.
        """
        task = self._repository.get_task(task_id)
        if task and task["status"] == "cancelled":
            self._update_history_cancelled(task_id)
            return
        if task and task["status"] == "cancelling":
            self._finalize_cancelled(
                task_id,
                done_bytes=done_bytes,
                total_bytes=total_bytes,
                done_items=done_items,
                total_items=total_items,
            )

    def _complete_or_cancel_file_task(
        self,
        *,
        task_id: str,
        done_bytes: int | None,
        total_bytes: int | None,
        done_items: int | None = None,
        total_items: int | None = None,
    ) -> None:
        """Finish a byte-tracked task as completed, or as cancelled if requested.

        A pending cancel request wins over completion.  If ``mark_completed``
        returns a falsy value, the task is re-checked for cancellation.
        """
        if self._is_cancel_requested(task_id):
            self._finalize_cancelled(
                task_id,
                done_bytes=done_bytes,
                total_bytes=total_bytes,
                done_items=done_items,
                total_items=total_items,
            )
            return
        if self._repository.mark_completed(
            task_id=task_id,
            done_bytes=done_bytes,
            total_bytes=total_bytes,
            done_items=done_items,
            total_items=total_items,
        ):
            self._update_history_completed(task_id)
            return
        self._finalize_if_already_cancelled(
            task_id,
            done_bytes=done_bytes,
            total_bytes=total_bytes,
            done_items=done_items,
            total_items=total_items,
        )

    def _complete_or_cancel_item_task(
        self,
        *,
        task_id: str,
        done_items: int | None,
        total_items: int | None,
    ) -> None:
        """Finish an item-tracked task as completed, or as cancelled if requested.

        A pending cancel request wins over completion.  If ``mark_completed``
        returns a falsy value, the task is re-checked for cancellation.
        """
        if self._is_cancel_requested(task_id):
            self._finalize_cancelled(task_id, done_items=done_items, total_items=total_items)
            return
        if self._repository.mark_completed(
            task_id=task_id,
            done_items=done_items,
            total_items=total_items,
        ):
            self._update_history_completed(task_id)
            return
        self._finalize_if_already_cancelled(task_id, done_items=done_items, total_items=total_items)

    def _finalize_cancelled(
        self,
        task_id: str,
        *,
        done_bytes: int | None = None,
        total_bytes: int | None = None,
        done_items: int | None = None,
        total_items: int | None = None,
    ) -> None:
        """Finalize cancellation in the repository; update history if it reports success."""
        if self._repository.finalize_cancelled(
            task_id=task_id,
            done_bytes=done_bytes,
            total_bytes=total_bytes,
            done_items=done_items,
            total_items=total_items,
        ):
            self._update_history_cancelled(task_id)

    # ------------------------------------------------------------------
    # History mirroring (no-ops when no history repository is configured)
    # ------------------------------------------------------------------

    def _update_history_completed(self, task_id: str) -> None:
        """Set the history entry for ``task_id`` to ``"completed"``."""
        if self._history_repository:
            self._history_repository.update_entry(entry_id=task_id, status="completed")

    def _update_history_failed(self, task_id: str, error_message: str) -> None:
        """Set the history entry for ``task_id`` to ``"failed"`` with an ``io_error``."""
        if self._history_repository:
            self._history_repository.update_entry(
                entry_id=task_id,
                status="failed",
                error_code="io_error",
                error_message=error_message,
            )

    def _update_history_cancelled(self, task_id: str) -> None:
        """Set the history entry for ``task_id`` to ``"cancelled"``."""
        if self._history_repository:
            self._history_repository.update_entry(entry_id=task_id, status="cancelled")