Files
webmanager-mvp/webui/backend/app/services/delete_task_service.py
T
2026-03-15 11:52:39 +01:00

104 lines
3.6 KiB
Python

from __future__ import annotations
import uuid
from datetime import datetime, timezone
from backend.app.api.errors import AppError
from backend.app.api.schemas import TaskCreateResponse
from backend.app.db.history_repository import HistoryRepository
from backend.app.db.task_repository import TaskRepository
from backend.app.security.path_guard import PathGuard
from backend.app.tasks_runner import TaskRunner
class DeleteTaskService:
    """Validate delete requests and queue them on the task runner.

    The service resolves the requested path through the path guard, checks
    that the target is a file or a directory that may be deleted, persists a
    task row, and enqueues the actual deletion. When a history repository is
    supplied, every queued or failed request is also written to it.
    """

    def __init__(
        self,
        path_guard: PathGuard,
        repository: TaskRepository,
        runner: TaskRunner,
        history_repository: HistoryRepository | None = None,
    ):
        """Store the collaborators.

        Args:
            path_guard: Resolves a user-supplied path to an existing target.
            repository: Persists the created task.
            runner: Receives the enqueued delete job.
            history_repository: Optional sink for history entries; history
                recording is skipped when this is ``None``.
        """
        self._path_guard = path_guard
        self._repository = repository
        self._runner = runner
        self._history_repository = history_repository

    def create_delete_task(self, path: str, recursive: bool = False) -> TaskCreateResponse:
        """Create and enqueue a delete task for ``path``.

        Args:
            path: Path as supplied by the caller; resolved via the path guard.
            recursive: When ``False``, a non-empty directory is rejected.

        Returns:
            A ``TaskCreateResponse`` carrying the id and status of the task
            returned by the task repository.

        Raises:
            AppError: ``directory_not_empty`` (409) for a non-empty directory
                without ``recursive``; ``type_conflict`` (409) for a target
                that is neither file nor directory; ``io_error`` (500) when an
                ``OSError`` occurs. Any ``AppError`` raised by a collaborator
                is recorded as a failure and re-raised unchanged.
        """
        try:
            resolved_target = self._path_guard.resolve_existing_path(path)
            kind = self._classify_target(resolved_target, recursive)

            task_id = str(uuid.uuid4())
            task = self._repository.create_task(
                operation="delete",
                source=resolved_target.relative,
                destination="",
                task_id=task_id,
            )
            self._record_history(
                entry_id=task_id,
                operation="delete",
                status="queued",
                path=resolved_target.relative,
            )
            self._runner.enqueue_delete_path(
                task_id=task["id"],
                target=str(resolved_target.absolute),
                kind=kind,
                recursive=recursive,
            )
            return TaskCreateResponse(task_id=task["id"], status=task["status"])
        except AppError as exc:
            self._record_failure(path, exc)
            raise
        except OSError as exc:
            error = AppError(
                code="io_error",
                message="Filesystem operation failed",
                status_code=500,
                details={"reason": str(exc)},
            )
            self._record_failure(path, error)
            # Chain explicitly so the originating OSError is kept as __cause__.
            raise error from exc

    @staticmethod
    def _classify_target(resolved_target, recursive: bool) -> str:
        """Return ``"file"`` or ``"directory"`` for the target, or raise ``AppError``."""
        # NOTE(review): ``absolute`` is used like a pathlib.Path here - confirm
        # against PathGuard.
        target = resolved_target.absolute
        if target.is_file():
            return "file"
        if target.is_dir():
            # any() stops at the first directory entry, so this only checks emptiness.
            if not recursive and any(target.iterdir()):
                raise AppError(
                    code="directory_not_empty",
                    message="Directory is not empty",
                    status_code=409,
                    details={"path": resolved_target.relative},
                )
            return "directory"
        raise AppError(
            code="type_conflict",
            message="Unsupported path type for delete",
            status_code=409,
            details={"path": resolved_target.relative},
        )

    def _record_failure(self, path: str, error) -> None:
        """Write a ``failed`` history entry for ``path`` from an ``AppError``."""
        self._record_history(
            operation="delete",
            status="failed",
            path=path,
            error_code=error.code,
            error_message=error.message,
            finished_at=self._now_iso(),
        )

    def _record_history(self, **kwargs) -> None:
        """Forward ``kwargs`` to the history repository, if one was supplied."""
        # Compare against None rather than relying on truthiness: a repository
        # object that defines __len__/__bool__ could be falsy while present.
        if self._history_repository is not None:
            self._history_repository.create_entry(**kwargs)

    @staticmethod
    def _now_iso() -> str:
        """Return the current UTC time, second precision, ISO 8601 with a ``Z`` suffix."""
        now = datetime.now(timezone.utc).replace(microsecond=0)
        return now.isoformat().replace("+00:00", "Z")