827 lines
31 KiB
Python
827 lines
31 KiB
Python
from __future__ import annotations
|
|
|
|
import errno
|
|
import os
|
|
import shutil
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
from backend.app.db.history_repository import HistoryRepository
|
|
from backend.app.db.task_repository import TaskRepository
|
|
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
|
|
|
|
|
class TaskRunner:
|
|
def __init__(self, repository: TaskRepository, filesystem: FilesystemAdapter, history_repository: HistoryRepository | None = None):
|
|
self._repository = repository
|
|
self._filesystem = filesystem
|
|
self._history_repository = history_repository
|
|
|
|
def enqueue_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int, current_item: str) -> None:
|
|
thread = threading.Thread(
|
|
target=self._run_copy_file,
|
|
args=(task_id, source, destination, total_bytes, current_item),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def enqueue_copy_directory(self, task_id: str, item: dict[str, object]) -> None:
|
|
thread = threading.Thread(
|
|
target=self._run_copy_directory,
|
|
args=(task_id, item),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def enqueue_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
|
|
thread = threading.Thread(
|
|
target=self._run_copy_batch,
|
|
args=(task_id, items),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def enqueue_move_file(
|
|
self,
|
|
task_id: str,
|
|
source: str,
|
|
destination: str,
|
|
total_bytes: int,
|
|
same_root: bool,
|
|
current_item: str,
|
|
) -> None:
|
|
thread = threading.Thread(
|
|
target=self._run_move_file,
|
|
args=(task_id, source, destination, total_bytes, same_root, current_item),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def enqueue_move_directory(self, task_id: str, item: dict[str, object]) -> None:
|
|
thread = threading.Thread(
|
|
target=self._run_move_directory,
|
|
args=(task_id, item),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def enqueue_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
|
|
thread = threading.Thread(
|
|
target=self._run_move_batch,
|
|
args=(task_id, items),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def enqueue_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
|
|
thread = threading.Thread(
|
|
target=self._run_duplicate_batch,
|
|
args=(task_id, items),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def enqueue_delete_path(self, task_id: str, item: dict[str, object]) -> None:
|
|
thread = threading.Thread(
|
|
target=self._run_delete_path,
|
|
args=(task_id, item),
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def enqueue_archive_prepare(self, worker) -> None:
|
|
thread = threading.Thread(
|
|
target=worker,
|
|
daemon=True,
|
|
)
|
|
thread.start()
|
|
|
|
def _run_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int, current_item: str) -> None:
|
|
if not self._repository.mark_running(
|
|
task_id=task_id,
|
|
done_bytes=0,
|
|
total_bytes=total_bytes,
|
|
done_items=0,
|
|
total_items=1,
|
|
current_item=current_item,
|
|
):
|
|
self._finalize_if_already_cancelled(task_id, done_bytes=0, total_bytes=total_bytes, done_items=0, total_items=1)
|
|
return
|
|
|
|
progress = {"done": 0}
|
|
|
|
def on_progress(done_bytes: int) -> None:
|
|
progress["done"] = done_bytes
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_bytes=done_bytes,
|
|
total_bytes=total_bytes,
|
|
done_items=0,
|
|
total_items=1,
|
|
current_item=current_item,
|
|
)
|
|
|
|
try:
|
|
self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress)
|
|
self._complete_or_cancel_file_task(
|
|
task_id=task_id,
|
|
done_bytes=total_bytes,
|
|
total_bytes=total_bytes,
|
|
done_items=1,
|
|
total_items=1,
|
|
)
|
|
except OSError as exc:
|
|
self._repository.mark_failed(
|
|
task_id=task_id,
|
|
error_code="io_error",
|
|
error_message=str(exc),
|
|
failed_item=source,
|
|
done_bytes=progress["done"],
|
|
total_bytes=total_bytes,
|
|
done_items=0,
|
|
total_items=1,
|
|
)
|
|
self._update_history_failed(task_id, str(exc))
|
|
|
|
def _run_copy_directory(self, task_id: str, item: dict[str, object]) -> None:
|
|
files = self._file_entries(item)
|
|
directories = self._directory_entries(item)
|
|
total_items = len(files)
|
|
if not self._repository.mark_running(
|
|
task_id=task_id,
|
|
done_items=0,
|
|
total_items=total_items,
|
|
current_item=files[0]["label"] if files else None,
|
|
):
|
|
self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
|
|
return
|
|
|
|
try:
|
|
completed_items = self._copy_directory_files(
|
|
directories,
|
|
files,
|
|
task_id=task_id,
|
|
completed_items=0,
|
|
total_items=total_items,
|
|
)
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
self._complete_or_cancel_item_task(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
)
|
|
except OSError as exc:
|
|
self._repository.mark_failed(
|
|
task_id=task_id,
|
|
error_code="io_error",
|
|
error_message=str(exc),
|
|
failed_item=str(item["source"]),
|
|
done_bytes=None,
|
|
total_bytes=None,
|
|
done_items=self._completed_files(task_id),
|
|
total_items=total_items,
|
|
)
|
|
self._update_history_failed(task_id, str(exc))
|
|
|
|
def _run_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
|
|
total_items = self._total_file_count(items)
|
|
current_item = self._first_file_label(items)
|
|
if not self._repository.mark_running(
|
|
task_id=task_id,
|
|
done_items=0,
|
|
total_items=total_items,
|
|
current_item=current_item,
|
|
):
|
|
self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
|
|
return
|
|
|
|
completed_items = 0
|
|
for index, item in enumerate(items):
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
try:
|
|
if item["kind"] == "directory":
|
|
completed_items = self._copy_directory_item(task_id, item, completed_items, total_items)
|
|
else:
|
|
file_entry = self._file_entries(item)[0]
|
|
completed_items = self._copy_single_planned_file(task_id, file_entry, completed_items, total_items)
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
except OSError as exc:
|
|
self._repository.mark_failed(
|
|
task_id=task_id,
|
|
error_code="io_error",
|
|
error_message=str(exc),
|
|
failed_item=str(item["source"]),
|
|
done_bytes=None,
|
|
total_bytes=None,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
)
|
|
self._update_history_failed(task_id, str(exc))
|
|
return
|
|
|
|
self._complete_or_cancel_item_task(
|
|
task_id=task_id,
|
|
done_items=total_items,
|
|
total_items=total_items,
|
|
)
|
|
|
|
def _run_move_file(
|
|
self,
|
|
task_id: str,
|
|
source: str,
|
|
destination: str,
|
|
total_bytes: int,
|
|
same_root: bool,
|
|
current_item: str,
|
|
) -> None:
|
|
if not self._repository.mark_running(
|
|
task_id=task_id,
|
|
done_bytes=0,
|
|
total_bytes=total_bytes,
|
|
done_items=0,
|
|
total_items=1,
|
|
current_item=current_item,
|
|
):
|
|
self._finalize_if_already_cancelled(task_id, done_bytes=0, total_bytes=total_bytes, done_items=0, total_items=1)
|
|
return
|
|
|
|
progress = {"done": 0}
|
|
|
|
try:
|
|
if same_root:
|
|
self._filesystem.move_file(source=source, destination=destination)
|
|
self._complete_or_cancel_file_task(
|
|
task_id=task_id,
|
|
done_bytes=total_bytes,
|
|
total_bytes=total_bytes,
|
|
done_items=1,
|
|
total_items=1,
|
|
)
|
|
return
|
|
|
|
def on_progress(done_bytes: int) -> None:
|
|
progress["done"] = done_bytes
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_bytes=done_bytes,
|
|
total_bytes=total_bytes,
|
|
done_items=0,
|
|
total_items=1,
|
|
current_item=current_item,
|
|
)
|
|
|
|
self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress)
|
|
self._filesystem.delete_file(Path(source))
|
|
self._complete_or_cancel_file_task(
|
|
task_id=task_id,
|
|
done_bytes=total_bytes,
|
|
total_bytes=total_bytes,
|
|
done_items=1,
|
|
total_items=1,
|
|
)
|
|
except OSError as exc:
|
|
self._repository.mark_failed(
|
|
task_id=task_id,
|
|
error_code="io_error",
|
|
error_message=str(exc),
|
|
failed_item=source,
|
|
done_bytes=progress["done"],
|
|
total_bytes=total_bytes,
|
|
done_items=0,
|
|
total_items=1,
|
|
)
|
|
self._update_history_failed(task_id, str(exc))
|
|
|
|
def _run_move_directory(self, task_id: str, item: dict[str, object]) -> None:
|
|
total_items = int(item.get("progress_total_items", 1))
|
|
source_path = self._item_source_path(item)
|
|
destination_path = self._item_destination_path(item)
|
|
current_item = str(item.get("progress_label") or Path(source_path).name)
|
|
if not self._repository.mark_running(
|
|
task_id=task_id,
|
|
done_items=0,
|
|
total_items=total_items,
|
|
current_item=current_item,
|
|
):
|
|
self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
|
|
return
|
|
|
|
try:
|
|
self._filesystem.move_directory(source=source_path, destination=destination_path)
|
|
completed_items = total_items
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
self._complete_or_cancel_item_task(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
)
|
|
except OSError as exc:
|
|
self._repository.mark_failed(
|
|
task_id=task_id,
|
|
error_code="io_error",
|
|
error_message=str(exc),
|
|
failed_item=str(item["source"]),
|
|
done_items=self._completed_files(task_id),
|
|
total_items=total_items,
|
|
)
|
|
self._update_history_failed(task_id, str(exc))
|
|
|
|
def _run_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
|
|
total_items = self._total_move_work_count(items)
|
|
current_item = self._first_move_item_label(items)
|
|
if not self._repository.mark_running(
|
|
task_id=task_id,
|
|
done_items=0,
|
|
total_items=total_items,
|
|
current_item=current_item,
|
|
):
|
|
self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
|
|
return
|
|
|
|
completed_items = 0
|
|
for index, item in enumerate(items):
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
try:
|
|
if item["kind"] == "directory":
|
|
completed_items = self._move_directory_item(task_id, item, completed_items, total_items)
|
|
else:
|
|
file_entry = self._file_entries(item)[0]
|
|
completed_items = self._move_single_planned_file(
|
|
task_id,
|
|
file_entry,
|
|
completed_items,
|
|
total_items,
|
|
same_root=bool(item.get("same_root", True)),
|
|
)
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
except OSError as exc:
|
|
self._repository.mark_failed(
|
|
task_id=task_id,
|
|
error_code="io_error",
|
|
error_message=str(exc),
|
|
failed_item=str(item["source"]),
|
|
done_bytes=None,
|
|
total_bytes=None,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
)
|
|
self._update_history_failed(task_id, str(exc))
|
|
return
|
|
|
|
self._complete_or_cancel_item_task(
|
|
task_id=task_id,
|
|
done_items=total_items,
|
|
total_items=total_items,
|
|
)
|
|
|
|
def _run_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
|
|
total_items = self._total_file_count(items)
|
|
current_item = self._first_file_label(items)
|
|
if not self._repository.mark_running(
|
|
task_id=task_id,
|
|
done_items=0,
|
|
total_items=total_items,
|
|
current_item=current_item,
|
|
):
|
|
self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
|
|
return
|
|
|
|
completed_items = 0
|
|
for index, item in enumerate(items):
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
try:
|
|
if item["kind"] == "directory":
|
|
completed_items = self._copy_directory_item(task_id, item, completed_items, total_items, cleanup_on_failure=True)
|
|
else:
|
|
file_entry = self._file_entries(item)[0]
|
|
completed_items = self._copy_single_planned_file(task_id, file_entry, completed_items, total_items)
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
except OSError as exc:
|
|
if item["kind"] == "directory":
|
|
self._cleanup_partial_duplicate(Path(str(item["destination"])))
|
|
else:
|
|
self._cleanup_partial_duplicate(Path(self._file_entries(item)[0]["destination"]))
|
|
self._repository.mark_failed(
|
|
task_id=task_id,
|
|
error_code="io_error",
|
|
error_message=str(exc),
|
|
failed_item=str(item["source"]),
|
|
done_bytes=None,
|
|
total_bytes=None,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
)
|
|
self._update_history_failed(task_id, str(exc))
|
|
return
|
|
|
|
self._complete_or_cancel_item_task(
|
|
task_id=task_id,
|
|
done_items=total_items,
|
|
total_items=total_items,
|
|
)
|
|
|
|
def _run_delete_path(self, task_id: str, item: dict[str, object]) -> None:
|
|
target = str(item["target"])
|
|
kind = str(item["kind"])
|
|
recursive = bool(item["recursive"])
|
|
files = list(item.get("files", [])) # type: ignore[arg-type]
|
|
directories = list(item.get("directories", [])) # type: ignore[arg-type]
|
|
total_items = int(item.get("progress_total_items", len(files)))
|
|
current_item = str(item.get("progress_label")) if item.get("progress_label") else None
|
|
if not self._repository.mark_running(
|
|
task_id=task_id,
|
|
done_items=0,
|
|
total_items=total_items,
|
|
current_item=current_item,
|
|
):
|
|
self._finalize_if_already_cancelled(task_id, done_items=0, total_items=total_items)
|
|
return
|
|
|
|
completed_items = 0
|
|
try:
|
|
if kind == "file":
|
|
file_entry = files[0]
|
|
completed_items = self._delete_planned_file(task_id, file_entry, completed_items, total_items)
|
|
elif recursive:
|
|
for file_entry in files:
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
completed_items = self._delete_planned_file(task_id, file_entry, completed_items, total_items)
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=completed_items, total_items=total_items)
|
|
return
|
|
for directory in directories:
|
|
self._filesystem.delete_empty_directory(Path(directory))
|
|
else:
|
|
self._filesystem.delete_empty_directory(Path(target))
|
|
self._complete_or_cancel_item_task(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
)
|
|
except OSError as exc:
|
|
task = self._repository.get_task(task_id)
|
|
self._repository.mark_failed(
|
|
task_id=task_id,
|
|
error_code="io_error",
|
|
error_message=str(exc),
|
|
failed_item=(task.get("current_item") if task else None) or target,
|
|
done_bytes=None,
|
|
total_bytes=None,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
)
|
|
self._update_history_failed(task_id, str(exc))
|
|
|
|
def _cleanup_partial_duplicate(self, path: Path) -> None:
|
|
if not path.exists():
|
|
return
|
|
if path.is_dir():
|
|
shutil.rmtree(path)
|
|
return
|
|
path.unlink()
|
|
|
|
@staticmethod
|
|
def _file_entries(item: dict[str, object]) -> list[dict[str, str]]:
|
|
return list(item.get("files", [])) # type: ignore[arg-type]
|
|
|
|
@staticmethod
|
|
def _directory_entries(item: dict[str, object]) -> list[dict[str, str]]:
|
|
return list(item.get("directories", [])) # type: ignore[arg-type]
|
|
|
|
def _total_file_count(self, items: list[dict[str, object]]) -> int:
|
|
return sum(len(self._file_entries(item)) for item in items)
|
|
|
|
def _first_file_label(self, items: list[dict[str, object]]) -> str | None:
|
|
for item in items:
|
|
files = self._file_entries(item)
|
|
if files:
|
|
return files[0]["label"]
|
|
return None
|
|
|
|
def _completed_files(self, task_id: str) -> int:
|
|
task = self._repository.get_task(task_id)
|
|
if not task or task["done_items"] is None:
|
|
return 0
|
|
return int(task["done_items"])
|
|
|
|
@staticmethod
|
|
def _item_source_path(item: dict[str, object]) -> str:
|
|
value = item.get("source")
|
|
if isinstance(value, str):
|
|
return value
|
|
return str(item["source_absolute"])
|
|
|
|
@staticmethod
|
|
def _item_destination_path(item: dict[str, object]) -> str:
|
|
value = item.get("destination")
|
|
if isinstance(value, str):
|
|
return value
|
|
return str(item["destination_absolute"])
|
|
|
|
def _total_move_work_count(self, items: list[dict[str, object]]) -> int:
|
|
total = 0
|
|
for item in items:
|
|
progress_total_items = item.get("progress_total_items")
|
|
if progress_total_items is not None:
|
|
total += int(progress_total_items)
|
|
continue
|
|
total += len(self._file_entries(item))
|
|
return total
|
|
|
|
def _first_move_item_label(self, items: list[dict[str, object]]) -> str | None:
|
|
for item in items:
|
|
progress_label = item.get("progress_label")
|
|
if isinstance(progress_label, str) and progress_label:
|
|
return progress_label
|
|
files = self._file_entries(item)
|
|
if files:
|
|
return files[0]["label"]
|
|
return None
|
|
|
|
def _copy_single_planned_file(
|
|
self,
|
|
task_id: str,
|
|
file_entry: dict[str, str],
|
|
completed_items: int,
|
|
total_items: int,
|
|
) -> int:
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=file_entry["label"],
|
|
)
|
|
self._filesystem.copy_file(source=file_entry["source"], destination=file_entry["destination"])
|
|
completed_items += 1
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=self._next_item_label_after_completion(completed_items, total_items, file_entry["label"]),
|
|
)
|
|
return completed_items
|
|
|
|
def _copy_directory_item(
|
|
self,
|
|
task_id: str,
|
|
item: dict[str, object],
|
|
completed_items: int,
|
|
total_items: int,
|
|
cleanup_on_failure: bool = False,
|
|
) -> int:
|
|
directories = self._directory_entries(item)
|
|
files = self._file_entries(item)
|
|
try:
|
|
return self._copy_directory_files(directories, files, task_id=task_id, completed_items=completed_items, total_items=total_items)
|
|
except Exception:
|
|
if cleanup_on_failure:
|
|
self._cleanup_partial_duplicate(Path(str(item["destination"])))
|
|
raise
|
|
|
|
def _copy_directory_files(
|
|
self,
|
|
directories: list[dict[str, str]],
|
|
files: list[dict[str, str]],
|
|
*,
|
|
task_id: str | None = None,
|
|
completed_items: int = 0,
|
|
total_items: int = 0,
|
|
) -> int:
|
|
for directory in directories:
|
|
Path(directory["destination"]).mkdir(parents=True, exist_ok=True)
|
|
for file_entry in files:
|
|
if task_id is not None and self._is_cancel_requested(task_id):
|
|
return completed_items
|
|
if task_id is not None:
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=file_entry["label"],
|
|
)
|
|
self._filesystem.copy_file(source=file_entry["source"], destination=file_entry["destination"])
|
|
completed_items += 1
|
|
if task_id is not None:
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=self._next_item_label_after_completion(completed_items, total_items, file_entry["label"]),
|
|
)
|
|
if task_id is not None and self._is_cancel_requested(task_id):
|
|
return completed_items
|
|
for directory in reversed(directories):
|
|
shutil.copystat(Path(directory["source"]), Path(directory["destination"]), follow_symlinks=False)
|
|
return completed_items
|
|
|
|
def _move_single_planned_file(
|
|
self,
|
|
task_id: str,
|
|
file_entry: dict[str, str],
|
|
completed_items: int,
|
|
total_items: int,
|
|
*,
|
|
same_root: bool,
|
|
) -> int:
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=file_entry["label"],
|
|
)
|
|
try:
|
|
if same_root:
|
|
self._filesystem.move_file(source=file_entry["source"], destination=file_entry["destination"])
|
|
else:
|
|
self._filesystem.copy_file(source=file_entry["source"], destination=file_entry["destination"])
|
|
self._filesystem.delete_file(Path(file_entry["source"]))
|
|
except OSError as exc:
|
|
if same_root and exc.errno == errno.EXDEV:
|
|
self._filesystem.copy_file(source=file_entry["source"], destination=file_entry["destination"])
|
|
self._filesystem.delete_file(Path(file_entry["source"]))
|
|
else:
|
|
raise
|
|
completed_items += 1
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=self._next_item_label_after_completion(completed_items, total_items, file_entry["label"]),
|
|
)
|
|
return completed_items
|
|
|
|
def _delete_planned_file(
|
|
self,
|
|
task_id: str,
|
|
file_entry: dict[str, str],
|
|
completed_items: int,
|
|
total_items: int,
|
|
) -> int:
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=file_entry["label"],
|
|
)
|
|
self._filesystem.delete_file(Path(file_entry["path"]))
|
|
completed_items += 1
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=self._next_item_label_after_completion(completed_items, total_items, file_entry["label"]),
|
|
)
|
|
return completed_items
|
|
|
|
def _move_directory_item(
|
|
self,
|
|
task_id: str,
|
|
item: dict[str, object],
|
|
completed_items: int,
|
|
total_items: int,
|
|
) -> int:
|
|
progress_total_items = int(item.get("progress_total_items", 1))
|
|
source_path = self._item_source_path(item)
|
|
destination_path = self._item_destination_path(item)
|
|
progress_label = str(item.get("progress_label") or Path(source_path).name)
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=progress_label,
|
|
)
|
|
self._filesystem.move_directory(source=source_path, destination=destination_path)
|
|
completed_items += progress_total_items
|
|
self._repository.update_progress(
|
|
task_id=task_id,
|
|
done_items=completed_items,
|
|
total_items=total_items,
|
|
current_item=self._next_item_label_after_completion(completed_items, total_items, progress_label),
|
|
)
|
|
return completed_items
|
|
|
|
@staticmethod
|
|
def _next_item_label_after_completion(completed_items: int, total_items: int, current_label: str) -> str | None:
|
|
return None
|
|
|
|
def _is_cancel_requested(self, task_id: str) -> bool:
|
|
task = self._repository.get_task(task_id)
|
|
return bool(task) and task["status"] == "cancelling"
|
|
|
|
def _finalize_if_already_cancelled(
|
|
self,
|
|
task_id: str,
|
|
*,
|
|
done_bytes: int | None = None,
|
|
total_bytes: int | None = None,
|
|
done_items: int | None = None,
|
|
total_items: int | None = None,
|
|
) -> None:
|
|
task = self._repository.get_task(task_id)
|
|
if task and task["status"] == "cancelled":
|
|
self._update_history_cancelled(task_id)
|
|
return
|
|
if task and task["status"] == "cancelling":
|
|
self._finalize_cancelled(
|
|
task_id,
|
|
done_bytes=done_bytes,
|
|
total_bytes=total_bytes,
|
|
done_items=done_items,
|
|
total_items=total_items,
|
|
)
|
|
|
|
def _complete_or_cancel_file_task(
|
|
self,
|
|
*,
|
|
task_id: str,
|
|
done_bytes: int | None,
|
|
total_bytes: int | None,
|
|
done_items: int | None = None,
|
|
total_items: int | None = None,
|
|
) -> None:
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_bytes=done_bytes, total_bytes=total_bytes, done_items=done_items, total_items=total_items)
|
|
return
|
|
if self._repository.mark_completed(
|
|
task_id=task_id,
|
|
done_bytes=done_bytes,
|
|
total_bytes=total_bytes,
|
|
done_items=done_items,
|
|
total_items=total_items,
|
|
):
|
|
self._update_history_completed(task_id)
|
|
return
|
|
self._finalize_if_already_cancelled(task_id, done_bytes=done_bytes, total_bytes=total_bytes, done_items=done_items, total_items=total_items)
|
|
|
|
def _complete_or_cancel_item_task(
|
|
self,
|
|
*,
|
|
task_id: str,
|
|
done_items: int | None,
|
|
total_items: int | None,
|
|
) -> None:
|
|
if self._is_cancel_requested(task_id):
|
|
self._finalize_cancelled(task_id, done_items=done_items, total_items=total_items)
|
|
return
|
|
if self._repository.mark_completed(
|
|
task_id=task_id,
|
|
done_items=done_items,
|
|
total_items=total_items,
|
|
):
|
|
self._update_history_completed(task_id)
|
|
return
|
|
self._finalize_if_already_cancelled(task_id, done_items=done_items, total_items=total_items)
|
|
|
|
def _finalize_cancelled(
|
|
self,
|
|
task_id: str,
|
|
*,
|
|
done_bytes: int | None = None,
|
|
total_bytes: int | None = None,
|
|
done_items: int | None = None,
|
|
total_items: int | None = None,
|
|
) -> None:
|
|
if self._repository.finalize_cancelled(
|
|
task_id=task_id,
|
|
done_bytes=done_bytes,
|
|
total_bytes=total_bytes,
|
|
done_items=done_items,
|
|
total_items=total_items,
|
|
):
|
|
self._update_history_cancelled(task_id)
|
|
|
|
def _update_history_completed(self, task_id: str) -> None:
|
|
if self._history_repository:
|
|
self._history_repository.update_entry(entry_id=task_id, status="completed")
|
|
|
|
def _update_history_failed(self, task_id: str, error_message: str) -> None:
|
|
if self._history_repository:
|
|
self._history_repository.update_entry(
|
|
entry_id=task_id,
|
|
status="failed",
|
|
error_code="io_error",
|
|
error_message=error_message,
|
|
)
|
|
|
|
def _update_history_cancelled(self, task_id: str) -> None:
|
|
if self._history_repository:
|
|
self._history_repository.update_entry(entry_id=task_id, status="cancelled")
|