feat: B3 uit voor veilige archive-downloads - cancel knop toegevoegd

This commit is contained in:
kodi
2026-03-14 14:39:57 +01:00
parent d463b3977d
commit 2981ac2796
24 changed files with 471 additions and 37 deletions
+9 -1
View File
@@ -4,7 +4,7 @@ from fastapi import APIRouter, Depends, File, Form, Query, Request, UploadFile
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
from backend.app.api.schemas import ArchivePrepareRequest, DeleteRequest, DeleteResponse, FileInfoResponse, MkdirRequest, MkdirResponse, RenameRequest, RenameResponse, SaveRequest, SaveResponse, TaskCreateResponse, UploadResponse, ViewResponse
from backend.app.api.schemas import ArchivePrepareRequest, DeleteRequest, DeleteResponse, FileInfoResponse, MkdirRequest, MkdirResponse, RenameRequest, RenameResponse, SaveRequest, SaveResponse, TaskCreateResponse, TaskDetailResponse, UploadResponse, ViewResponse
from backend.app.dependencies import get_archive_download_task_service, get_file_ops_service
from backend.app.services.archive_download_task_service import ArchiveDownloadTaskService
from backend.app.services.file_ops_service import FileOpsService
@@ -100,6 +100,14 @@ async def archive_download(
)
@router.post("/download/archive/{task_id}/cancel", response_model=TaskDetailResponse)
async def archive_cancel(
task_id: str,
service: ArchiveDownloadTaskService = Depends(get_archive_download_task_service),
) -> TaskDetailResponse:
return TaskDetailResponse(**service.cancel_archive_prepare_task(task_id=task_id))
@router.get("/video")
async def video(
path: str,
+1 -1
View File
@@ -6,7 +6,7 @@ from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
VALID_HISTORY_STATUSES = {"queued", "completed", "failed", "requested", "ready", "preflight_failed"}
VALID_HISTORY_STATUSES = {"queued", "completed", "failed", "requested", "ready", "preflight_failed", "cancelled"}
VALID_HISTORY_OPERATIONS = {"mkdir", "rename", "delete", "copy", "move", "upload", "download"}
+59 -9
View File
@@ -6,7 +6,7 @@ from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
VALID_STATUSES = {"queued", "running", "completed", "failed", "requested", "preparing", "ready"}
VALID_STATUSES = {"queued", "running", "completed", "failed", "requested", "preparing", "ready", "cancelled"}
VALID_OPERATIONS = {"copy", "move", "download"}
TASK_MIGRATION_COLUMNS: dict[str, str] = {
"operation": "TEXT NOT NULL DEFAULT 'copy'",
@@ -160,17 +160,18 @@ class TaskRepository:
done_items: int | None = None,
total_items: int | None = None,
current_item: str | None = None,
) -> None:
) -> bool:
started_at = self._now_iso()
with self._connection() as conn:
conn.execute(
cursor = conn.execute(
"""
UPDATE tasks
SET status = ?, started_at = COALESCE(started_at, ?), done_items = ?, total_items = ?, current_item = ?
WHERE id = ?
WHERE id = ? AND status = ?
""",
("preparing", started_at, done_items, total_items, current_item, task_id),
("preparing", started_at, done_items, total_items, current_item, task_id, "requested"),
)
return cursor.rowcount > 0
def update_progress(
self,
@@ -215,17 +216,18 @@ class TaskRepository:
task_id: str,
done_items: int | None = None,
total_items: int | None = None,
) -> None:
) -> bool:
finished_at = self._now_iso()
with self._connection() as conn:
conn.execute(
cursor = conn.execute(
"""
UPDATE tasks
SET status = ?, finished_at = ?, done_items = ?, total_items = ?, current_item = NULL
WHERE id = ?
WHERE id = ? AND status = ?
""",
("ready", finished_at, done_items, total_items, task_id),
("ready", finished_at, done_items, total_items, task_id, "preparing"),
)
return cursor.rowcount > 0
def mark_failed(
self,
@@ -260,6 +262,54 @@ class TaskRepository:
),
)
def mark_failed_if_not_cancelled(
self,
task_id: str,
error_code: str,
error_message: str,
failed_item: str | None,
done_bytes: int | None,
total_bytes: int | None,
done_items: int | None = None,
total_items: int | None = None,
) -> bool:
finished_at = self._now_iso()
with self._connection() as conn:
cursor = conn.execute(
"""
UPDATE tasks
SET status = ?, finished_at = ?, error_code = ?, error_message = ?, failed_item = ?, done_bytes = ?, total_bytes = ?, done_items = ?, total_items = ?, current_item = NULL
WHERE id = ? AND status != ?
""",
(
"failed",
finished_at,
error_code,
error_message,
failed_item,
done_bytes,
total_bytes,
done_items,
total_items,
task_id,
"cancelled",
),
)
return cursor.rowcount > 0
def mark_cancelled(self, task_id: str) -> bool:
finished_at = self._now_iso()
with self._connection() as conn:
cursor = conn.execute(
"""
UPDATE tasks
SET status = ?, finished_at = ?, current_item = NULL
WHERE id = ? AND status IN (?, ?)
""",
("cancelled", finished_at, task_id, "requested", "preparing"),
)
return cursor.rowcount > 0
def _ensure_schema(self) -> None:
db_path = Path(self._db_path)
if db_path.parent and str(db_path.parent) not in {"", "."}:
@@ -17,6 +17,10 @@ from backend.app.tasks_runner import TaskRunner
ARCHIVE_DOWNLOAD_TTL_SECONDS = 30 * 60
class ArchivePrepareCancelled(Exception):
pass
class ArchiveDownloadTaskService:
def __init__(
self,
@@ -103,6 +107,13 @@ class ArchiveDownloadTaskService:
status_code=400,
details={"task_id": task_id},
)
if task["status"] == "cancelled":
raise AppError(
code="download_cancelled",
message="Archive download was cancelled",
status_code=409,
details={"task_id": task_id},
)
if task["status"] != "ready":
raise AppError(
code="download_not_ready",
@@ -147,6 +158,58 @@ class ArchiveDownloadTaskService:
"content_type": "application/zip",
}
def cancel_archive_prepare_task(self, task_id: str) -> dict:
self.sweep_artifacts()
task = self._repository.get_task(task_id)
if not task:
raise AppError(
code="task_not_found",
message="Task was not found",
status_code=404,
details={"task_id": task_id},
)
if task["operation"] != "download":
raise AppError(
code="invalid_request",
message="Task is not an archive download",
status_code=400,
details={"task_id": task_id},
)
if task["status"] == "ready":
raise AppError(
code="download_not_cancellable",
message="Archive download is already ready",
status_code=409,
details={"task_id": task_id, "status": task["status"]},
)
if task["status"] in {"failed", "cancelled"}:
raise AppError(
code="download_not_cancellable",
message="Archive download cannot be cancelled",
status_code=409,
details={"task_id": task_id, "status": task["status"]},
)
if not self._repository.mark_cancelled(task_id):
current = self._repository.get_task(task_id)
current_status = current["status"] if current else task["status"]
raise AppError(
code="download_not_cancellable",
message="Archive download cannot be cancelled",
status_code=409,
details={"task_id": task_id, "status": current_status},
)
self._cleanup_task_artifacts(task_id)
self._update_history_cancelled(task_id)
cancelled_task = self._repository.get_task(task_id)
if not cancelled_task:
raise AppError(
code="task_not_found",
message="Task was not found",
status_code=404,
details={"task_id": task_id},
)
return cancelled_task
def sweep_artifacts(self) -> None:
self._artifact_root.mkdir(parents=True, exist_ok=True)
referenced_paths: set[Path] = set()
@@ -177,37 +240,59 @@ class ArchiveDownloadTaskService:
total_items = len(target_paths)
try:
self._repository.mark_preparing(
self._raise_if_cancelled(task_id)
if not self._repository.mark_preparing(
task_id=task_id,
done_items=0,
total_items=total_items,
current_item=target_paths[0] if target_paths else None,
)
):
self._raise_if_cancelled(task_id)
return
resolved_targets = [self._path_guard.resolve_existing_path(path) for path in target_paths]
self._raise_if_cancelled(task_id)
self._file_ops_service._validate_zip_download_archive_names(resolved_targets)
self._file_ops_service._run_zip_download_preflight(resolved_targets)
self._raise_if_cancelled(task_id)
with zipfile.ZipFile(partial_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for resolved_target in resolved_targets:
self._file_ops_service._write_download_target_to_zip(archive, resolved_target)
for index, resolved_target in enumerate(resolved_targets):
self._raise_if_cancelled(task_id)
self._repository.update_progress(
task_id=task_id,
done_items=index,
total_items=total_items,
current_item=resolved_target.relative,
)
self._file_ops_service._write_download_target_to_zip(
archive,
resolved_target,
on_each_item=lambda: self._raise_if_cancelled(task_id),
)
self._raise_if_cancelled(task_id)
os.replace(partial_path, final_path)
self._raise_if_cancelled(task_id)
self._repository.upsert_artifact(
task_id=task_id,
file_path=str(final_path),
file_name=archive_name,
expires_at=self._expires_at_iso(),
)
self._repository.mark_ready(
if not self._repository.mark_ready(
task_id=task_id,
done_items=total_items,
total_items=total_items,
)
):
self._cleanup_task_artifacts(task_id)
self._raise_if_cancelled(task_id)
return
self._update_history_ready(task_id)
except ArchivePrepareCancelled:
self._cleanup_task_artifacts(task_id)
except AppError as exc:
self._delete_artifact_record_and_file(task_id, str(partial_path))
self._delete_artifact_record_and_file(task_id, str(final_path))
self._repository.mark_failed(
self._cleanup_task_artifacts(task_id)
if self._repository.mark_failed_if_not_cancelled(
task_id=task_id,
error_code=exc.code,
error_message=exc.message,
@@ -216,12 +301,11 @@ class ArchiveDownloadTaskService:
total_bytes=None,
done_items=0,
total_items=total_items,
)
self._update_history_failed(task_id, exc.code, exc.message)
):
self._update_history_failed(task_id, exc.code, exc.message)
except OSError as exc:
self._delete_artifact_record_and_file(task_id, str(partial_path))
self._delete_artifact_record_and_file(task_id, str(final_path))
self._repository.mark_failed(
self._cleanup_task_artifacts(task_id)
if self._repository.mark_failed_if_not_cancelled(
task_id=task_id,
error_code="io_error",
error_message=str(exc),
@@ -230,8 +314,12 @@ class ArchiveDownloadTaskService:
total_bytes=None,
done_items=0,
total_items=total_items,
)
self._update_history_failed(task_id, "io_error", str(exc))
):
self._update_history_failed(task_id, "io_error", str(exc))
def _cleanup_task_artifacts(self, task_id: str) -> None:
self._delete_artifact_record_and_file(task_id, str(self._artifact_root / f"{task_id}.partial.zip"))
self._delete_artifact_record_and_file(task_id, str(self._artifact_root / f"{task_id}.zip"))
def _delete_artifact_record_and_file(self, task_id: str, file_path: str) -> None:
self._repository.delete_artifact(task_id)
@@ -254,6 +342,10 @@ class ArchiveDownloadTaskService:
error_message=error_message,
)
def _update_history_cancelled(self, task_id: str) -> None:
if self._history_repository:
self._history_repository.update_entry(entry_id=task_id, status="cancelled")
def _record_history(self, **kwargs) -> None:
if self._history_repository:
self._history_repository.create_entry(**kwargs)
@@ -264,3 +356,8 @@ class ArchiveDownloadTaskService:
@staticmethod
def _is_expired(expires_at: str) -> bool:
return datetime.now(timezone.utc) >= datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
def _raise_if_cancelled(self, task_id: str) -> None:
task = self._repository.get_task(task_id)
if task and task["status"] == "cancelled":
raise ArchivePrepareCancelled()
@@ -1005,14 +1005,18 @@ class FileOpsService:
details={"reason": reason, **details},
)
def _write_download_target_to_zip(self, archive: zipfile.ZipFile, resolved_target) -> None:
def _write_download_target_to_zip(self, archive: zipfile.ZipFile, resolved_target, on_each_item=None) -> None:
root_name = resolved_target.absolute.name
if resolved_target.absolute.is_file():
if on_each_item:
on_each_item()
archive.write(resolved_target.absolute, arcname=root_name)
return
archive.writestr(f"{root_name}/", b"")
for child in sorted(resolved_target.absolute.rglob("*")):
if on_each_item:
on_each_item()
arcname = f"{root_name}/{child.relative_to(resolved_target.absolute).as_posix()}"
if child.is_dir():
archive.writestr(f"{arcname}/", b"")