438 lines
16 KiB
Python
438 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
from backend.app.db.history_repository import HistoryRepository
|
|
from backend.app.db.task_repository import TaskRepository
|
|
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
|
|
|
|
|
class TaskRunner:
    """Run copy, move and duplicate tasks on background daemon threads.

    Every ``enqueue_*`` method starts one daemon thread and returns
    immediately.  The worker performs the operation through the filesystem
    adapter and reports state to the task repository (``mark_running``,
    ``update_progress``, ``mark_completed``, ``mark_failed``).  When a history
    repository was supplied, the history entry whose id equals the task id is
    updated to ``completed`` or ``failed`` as well.

    Only ``OSError`` is translated into a failed task (always with error code
    ``"io_error"``); any other exception propagates out of the worker thread.
    """

    def __init__(
        self,
        repository: TaskRepository,
        filesystem: FilesystemAdapter,
        history_repository: HistoryRepository | None = None,
    ):
        """Store the collaborators; ``history_repository`` is optional."""
        self._repository = repository
        self._filesystem = filesystem
        self._history_repository = history_repository

    # ------------------------------------------------------------------
    # Public API: schedule work on a background thread
    # ------------------------------------------------------------------

    def enqueue_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None:
        """Copy one file in the background, reporting byte progress."""
        self._spawn(self._run_copy_file, task_id, source, destination, total_bytes)

    def enqueue_copy_directory(self, task_id: str, source: str, destination: str) -> None:
        """Copy one directory in the background (tracked as a single item)."""
        self._spawn(self._run_copy_directory, task_id, source, destination)

    def enqueue_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Copy several items in the background.

        Each item is a dict with the keys ``"source"``, ``"destination"`` and
        ``"kind"``; a kind of ``"directory"`` selects the directory operation,
        anything else is handled as a file.
        """
        self._spawn(self._run_copy_batch, task_id, items)

    def enqueue_move_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        same_root: bool,
    ) -> None:
        """Move one file in the background.

        With ``same_root`` true the adapter's ``move_file`` is used; otherwise
        the file is copied (with byte progress) and the source is deleted.
        """
        self._spawn(self._run_move_file, task_id, source, destination, total_bytes, same_root)

    def enqueue_move_directory(self, task_id: str, source: str, destination: str) -> None:
        """Move one directory in the background (tracked as a single item)."""
        self._spawn(self._run_move_directory, task_id, source, destination)

    def enqueue_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Move several items in the background (same item shape as copy batch)."""
        self._spawn(self._run_move_batch, task_id, items)

    def enqueue_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Duplicate several items in the background (same item shape as copy batch)."""
        self._spawn(self._run_duplicate_batch, task_id, items)

    def enqueue_archive_prepare(self, worker) -> None:
        """Run the zero-argument callable ``worker`` on a background daemon thread."""
        self._spawn(worker)

    @staticmethod
    def _spawn(target, *args) -> None:
        """Start ``target(*args)`` on a new daemon thread and return immediately."""
        threading.Thread(target=target, args=args, daemon=True).start()

    # ------------------------------------------------------------------
    # Shared reporting helpers
    # ------------------------------------------------------------------

    def _byte_progress(self, task_id: str, source: str, total_bytes: int):
        """Return ``(state, callback)`` for byte-level progress reporting.

        ``callback(done_bytes)`` stores the latest value in ``state["done"]``
        and forwards it to the repository, so a later failure can report how
        far the transfer got.
        """
        state = {"done": 0}

        def on_progress(done_bytes: int) -> None:
            state["done"] = done_bytes
            self._repository.update_progress(
                task_id=task_id,
                done_bytes=done_bytes,
                total_bytes=total_bytes,
                current_item=source,
            )

        return state, on_progress

    def _complete(self, task_id: str, **fields) -> None:
        """Mark the task completed with ``fields`` and update the history entry."""
        self._repository.mark_completed(task_id=task_id, **fields)
        self._update_history_completed(task_id)

    def _fail(self, task_id: str, exc: OSError, **fields) -> None:
        """Mark the task failed with error code ``io_error`` and update the history entry."""
        message = str(exc)
        self._repository.mark_failed(
            task_id=task_id,
            error_code="io_error",
            error_message=message,
            **fields,
        )
        self._update_history_failed(task_id, message)

    # ------------------------------------------------------------------
    # Workers (executed on the background thread)
    # ------------------------------------------------------------------

    def _run_copy_file(self, task_id: str, source: str, destination: str, total_bytes: int) -> None:
        """Copy a single file and record running / progress / completed / failed."""
        self._repository.mark_running(
            task_id=task_id,
            done_bytes=0,
            total_bytes=total_bytes,
            current_item=source,
        )
        progress, on_progress = self._byte_progress(task_id, source, total_bytes)
        try:
            self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress)
            self._complete(task_id, done_bytes=total_bytes, total_bytes=total_bytes)
        except OSError as exc:
            self._fail(
                task_id,
                exc,
                failed_item=source,
                done_bytes=progress["done"],
                total_bytes=total_bytes,
            )

    def _run_copy_directory(self, task_id: str, source: str, destination: str) -> None:
        """Copy a single directory, tracked as one item."""
        self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=1,
            current_item=source,
        )
        try:
            self._filesystem.copy_directory(source=source, destination=destination)
            self._complete(task_id, done_items=1, total_items=1)
        except OSError as exc:
            self._fail(
                task_id,
                exc,
                failed_item=source,
                done_bytes=None,
                total_bytes=None,
                done_items=0,
                total_items=1,
            )

    def _run_copy_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Copy every item in order; stop at the first ``OSError``."""
        self._run_batch(
            task_id,
            items,
            directory_op=lambda src, dst: self._filesystem.copy_directory(source=src, destination=dst),
            file_op=lambda src, dst: self._filesystem.copy_file(source=src, destination=dst),
        )

    def _run_move_file(
        self,
        task_id: str,
        source: str,
        destination: str,
        total_bytes: int,
        same_root: bool,
    ) -> None:
        """Move a single file.

        ``same_root`` selects the adapter's ``move_file``; otherwise the file
        is copied with byte progress and the source is deleted afterwards.
        """
        self._repository.mark_running(
            task_id=task_id,
            done_bytes=0,
            total_bytes=total_bytes,
            current_item=source,
        )
        progress, on_progress = self._byte_progress(task_id, source, total_bytes)
        try:
            if same_root:
                self._filesystem.move_file(source=source, destination=destination)
            else:
                self._filesystem.copy_file(source=source, destination=destination, on_progress=on_progress)
                # The source is removed only after the copy returned without error.
                self._filesystem.delete_file(Path(source))
            self._complete(task_id, done_bytes=total_bytes, total_bytes=total_bytes)
        except OSError as exc:
            self._fail(
                task_id,
                exc,
                failed_item=source,
                done_bytes=progress["done"],
                total_bytes=total_bytes,
            )

    def _run_move_directory(self, task_id: str, source: str, destination: str) -> None:
        """Move a single directory, tracked as one item."""
        self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=1,
            current_item=source,
        )
        try:
            self._filesystem.move_directory(source=source, destination=destination)
            self._complete(task_id, done_items=1, total_items=1)
        except OSError as exc:
            self._fail(task_id, exc, failed_item=source, done_items=0, total_items=1)

    def _run_move_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Move every item in order; stop at the first ``OSError``."""
        self._run_batch(
            task_id,
            items,
            directory_op=lambda src, dst: self._filesystem.move_directory(source=src, destination=dst),
            file_op=lambda src, dst: self._filesystem.move_file(source=src, destination=dst),
        )

    def _run_duplicate_batch(self, task_id: str, items: list[dict[str, str]]) -> None:
        """Duplicate every item in order; stop at the first ``OSError``.

        Directories go through ``_duplicate_directory``; files are copied with
        the adapter.  A partially written destination is removed on failure,
        except when the failure is ``FileExistsError`` (see ``_run_batch``).
        """
        self._run_batch(
            task_id,
            items,
            directory_op=lambda src, dst: self._duplicate_directory(source=Path(src), destination=Path(dst)),
            file_op=lambda src, dst: self._filesystem.copy_file(source=src, destination=dst),
            cleanup_partial=True,
        )

    def _run_batch(self, task_id: str, items: list[dict[str, str]], directory_op, file_op, *, cleanup_partial: bool = False) -> None:
        """Shared driver for the copy / move / duplicate batch workers.

        ``directory_op(source, destination)`` is called for items whose
        ``"kind"`` is ``"directory"`` and ``file_op(source, destination)`` for
        all others.  After each item the repository receives the number of
        finished items and the source of the next one (or of the last item
        once the batch is exhausted).  The first ``OSError`` marks the task
        failed and aborts the rest of the batch.

        With ``cleanup_partial`` the failed item's destination is removed
        before the failure is recorded, with two safeguards:

        * ``FileExistsError`` means the destination was already present and
          therefore was not created by this task, so it is left untouched.
        * A failing cleanup never prevents the task from being marked failed.
        """
        total_items = len(items)
        self._repository.mark_running(
            task_id=task_id,
            done_items=0,
            total_items=total_items,
            current_item=items[0]["source"] if items else None,
        )

        completed_items = 0
        for index, item in enumerate(items):
            source = item["source"]
            destination = item["destination"]
            try:
                if item["kind"] == "directory":
                    directory_op(source, destination)
                else:
                    file_op(source, destination)
                completed_items = index + 1
                has_next = completed_items < total_items
                self._repository.update_progress(
                    task_id=task_id,
                    done_items=completed_items,
                    total_items=total_items,
                    current_item=items[completed_items]["source"] if has_next else source,
                )
            except OSError as exc:
                if cleanup_partial and not isinstance(exc, FileExistsError):
                    try:
                        self._cleanup_partial_duplicate(Path(destination))
                    except OSError:
                        # Best effort: the original error below is the one worth
                        # reporting, and the task must not stay "running".
                        pass
                self._fail(
                    task_id,
                    exc,
                    failed_item=source,
                    done_bytes=None,
                    total_bytes=None,
                    done_items=completed_items,
                    total_items=total_items,
                )
                return

        self._complete(task_id, done_items=total_items, total_items=total_items)

    # ------------------------------------------------------------------
    # Duplicate helpers
    # ------------------------------------------------------------------

    def _duplicate_directory(self, source: Path, destination: Path) -> None:
        """Recreate the tree under ``source`` at ``destination``.

        Entries whose name starts with ``"._"`` are skipped (directories are
        pruned from the walk).  Any symlink inside the tree aborts the
        operation with ``OSError``.  Directory metadata is copied with
        ``shutil.copystat`` once all files are in place.

        Raises:
            FileExistsError: ``destination`` already exists.  This is raised
                before anything was created, so nothing is cleaned up.
            Exception: anything raised while copying; ``destination`` is
                removed first and the exception is re-raised.
        """
        # Deliberately outside the try: if this fails, nothing was created by us.
        destination.mkdir()
        copied_directories: list[tuple[Path, Path]] = [(source, destination)]
        try:
            for root, dirnames, filenames in os.walk(source, topdown=True, followlinks=False):
                root_path = Path(root)
                target_root = destination / root_path.relative_to(source)
                # In-place assignment prunes the walk (requires topdown=True).
                dirnames[:] = [name for name in dirnames if not name.startswith("._")]

                for name in dirnames:
                    source_dir = root_path / name
                    if source_dir.is_symlink():
                        raise OSError("Source directory must not contain symlinks")
                    target_dir = target_root / name
                    target_dir.mkdir()
                    copied_directories.append((source_dir, target_dir))

                for name in filenames:
                    if name.startswith("._"):
                        continue
                    source_file = root_path / name
                    if source_file.is_symlink():
                        raise OSError("Source directory must not contain symlinks")
                    self._filesystem.copy_file(
                        source=str(source_file),
                        destination=str(target_root / name),
                    )

            # Metadata is applied last, visiting the most recently created directories first.
            for source_dir, target_dir in reversed(copied_directories):
                shutil.copystat(source_dir, target_dir, follow_symlinks=False)
        except Exception:
            self._cleanup_partial_duplicate(destination)
            raise

    def _cleanup_partial_duplicate(self, path: Path) -> None:
        """Remove ``path`` if it exists: directories recursively, anything else by unlink."""
        if not path.exists():
            return
        if path.is_dir():
            shutil.rmtree(path)
        else:
            path.unlink()

    # ------------------------------------------------------------------
    # History bookkeeping
    # ------------------------------------------------------------------

    def _update_history_completed(self, task_id: str) -> None:
        """Set the history entry for ``task_id`` to ``completed`` (no-op without a history repository)."""
        # ``is not None`` rather than truthiness: a repository object that is
        # falsy (for example one defining ``__len__``) must still be updated.
        if self._history_repository is not None:
            self._history_repository.update_entry(entry_id=task_id, status="completed")

    def _update_history_failed(self, task_id: str, error_message: str) -> None:
        """Set the history entry for ``task_id`` to ``failed`` (no-op without a history repository)."""
        if self._history_repository is not None:
            self._history_repository.update_entry(
                entry_id=task_id,
                status="failed",
                error_code="io_error",
                error_message=error_message,
            )