Files
webmanager-mvp/webui/backend/app/services/file_ops_service.py
T

1053 lines
40 KiB
Python

from __future__ import annotations
import os
import time
import zipfile
from dataclasses import dataclass
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import Callable
from backend.app.api.errors import AppError
from backend.app.api.schemas import DeleteResponse, FileInfoResponse, MkdirResponse, RenameResponse, SaveResponse, UploadResponse, ViewResponse
from backend.app.db.history_repository import HistoryRepository
from backend.app.fs.filesystem_adapter import FilesystemAdapter
from backend.app.security.path_guard import PathGuard
# Byte ceilings for text preview reads and for editable text payloads (256 KiB each).
TEXT_PREVIEW_MAX_BYTES = 256 << 10
TEXT_EDIT_MAX_BYTES = 256 << 10

# Lower-case file suffix -> MIME type for files treated as text.
TEXT_CONTENT_TYPES = {
    ".txt": "text/plain",
    ".log": "text/plain",
    ".md": "text/markdown",
    ".yml": "text/yaml",
    ".yaml": "text/yaml",
    ".json": "application/json",
    ".js": "text/javascript",
    ".py": "text/x-python",
    ".css": "text/css",
    ".html": "text/html",
}

# Lower-case whole file names (no suffix) that are also treated as text.
SPECIAL_TEXT_FILENAMES = {
    "dockerfile": "text/plain",
    "containerfile": "text/plain",
}

# Suffixes accepted by the thumbnail stream.
THUMBNAIL_CONTENT_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".webp": "image/webp",
}

# Suffixes accepted by the image stream: every thumbnail type plus three more.
IMAGE_CONTENT_TYPES = {
    **THUMBNAIL_CONTENT_TYPES,
    ".gif": "image/gif",
    ".bmp": "image/bmp",
    ".avif": "image/avif",
}

# Suffixes accepted by the video stream.
VIDEO_CONTENT_TYPES = {
    ".mp4": "video/mp4",
    ".mkv": "video/x-matroska",
}

# Suffixes accepted by the PDF stream.
PDF_CONTENT_TYPES = {
    ".pdf": "application/pdf",
}
@dataclass(frozen=True)
class ZipDownloadPreflightLimits:
    """Immutable ceilings checked while scanning inputs for a zip download."""

    # Maximum number of entries (files and directories) the scan may count.
    max_items: int = 1_000
    # Ceiling on the summed size of all input files: 2 GiB.
    max_total_input_bytes: int = 2 * 1024**3
    # Ceiling on the size of any single input file: 500 MiB.
    max_individual_file_bytes: int = 500 * 1024**2
    # Time budget for the scan, in seconds.
    scan_timeout_seconds: float = 10.0
@dataclass
class ZipDownloadPreflightState:
    """Mutable running totals accumulated during one zip preflight scan."""

    # Number of entries counted so far.
    item_count: int = 0
    # Sum of the sizes of the files recorded so far, in bytes.
    total_input_bytes: int = 0
# Default limits instance, used as the default for FileOpsService's
# ``zip_download_preflight_limits`` constructor argument.
ZIP_DOWNLOAD_PREFLIGHT_LIMITS = ZipDownloadPreflightLimits()
class FileOpsService:
def __init__(
    self,
    path_guard: PathGuard,
    filesystem: FilesystemAdapter,
    history_repository: HistoryRepository | None = None,
    zip_download_preflight_limits: ZipDownloadPreflightLimits = ZIP_DOWNLOAD_PREFLIGHT_LIMITS,
    monotonic: Callable[[], float] | None = None,
):
    """Store the collaborators used by the file operations.

    Args:
        path_guard: Resolves and validates request paths and names.
        filesystem: Adapter that performs the actual filesystem work.
        history_repository: Optional sink for operation history; when it is
            falsy no history is written.
        zip_download_preflight_limits: Ceilings applied by the zip preflight scan.
        monotonic: Clock used for the preflight timeout; ``time.monotonic``
            is used when none is supplied.
    """
    clock = monotonic if monotonic else time.monotonic
    self._path_guard = path_guard
    self._filesystem = filesystem
    self._history_repository = history_repository
    self._zip_download_preflight_limits = zip_download_preflight_limits
    self._monotonic = clock
def mkdir(self, parent_path: str, name: str) -> MkdirResponse:
    """Create directory ``name`` inside ``parent_path`` and record the outcome.

    Returns:
        MkdirResponse carrying the relative path of the created directory.

    Raises:
        AppError: ``already_exists`` (409) when the target exists or creation
            hits ``FileExistsError``; ``io_error`` (500) for any other
            ``OSError``. An ``AppError`` raised inside is re-raised unchanged.
            Each failure is written to history before it propagates.
    """
    # Unvalidated join of the request values; used to label failures in history.
    requested = self._join_relative(parent_path, name)

    def log_failure(failure: AppError) -> None:
        self._record_history_error(operation="mkdir", path=requested, error=failure)

    try:
        parent = self._path_guard.resolve_directory_path(parent_path)
        checked_name = self._path_guard.validate_name(name, field="name")
        target = self._path_guard.resolve_path(self._join_relative(parent.relative, checked_name))
        if target.absolute.exists():
            raise AppError(
                code="already_exists",
                message="Target path already exists",
                status_code=409,
                details={"path": target.relative},
            )
        self._filesystem.make_directory(target.absolute)
        self._record_history(
            operation="mkdir",
            status="completed",
            path=target.relative,
            finished_at=self._now_iso(),
        )
        return MkdirResponse(path=target.relative)
    except FileExistsError:
        # Must precede the OSError handler: FileExistsError is an OSError subclass.
        conflict = AppError(
            code="already_exists",
            message="Target path already exists",
            status_code=409,
            details={"path": requested},
        )
        log_failure(conflict)
        raise conflict
    except AppError as exc:
        log_failure(exc)
        raise
    except OSError as exc:
        io_failure = AppError(
            code="io_error",
            message="Filesystem operation failed",
            status_code=500,
            details={"reason": str(exc)},
        )
        log_failure(io_failure)
        raise io_failure
def rename(self, path: str, new_name: str) -> RenameResponse:
    """Rename the entry at ``path`` to ``new_name`` within its current parent.

    Returns:
        RenameResponse carrying the relative path of the renamed entry.

    Raises:
        AppError: ``path_not_found`` (404) on ``FileNotFoundError``,
            ``already_exists`` (409) when the target exists or the rename hits
            ``FileExistsError``, ``io_error`` (500) for any other ``OSError``.
            An ``AppError`` raised inside is re-raised unchanged. Each failure
            is written to history before it propagates.
    """

    def log_failure(failure: AppError) -> None:
        self._record_history_error(
            operation="rename",
            source=path,
            destination=new_name,
            path=path,
            error=failure,
        )

    try:
        source = self._path_guard.resolve_existing_path(path)
        checked_name = self._path_guard.validate_name(new_name, field="new_name")
        parent_relative = self._path_guard.entry_relative_path(
            source.alias,
            source.absolute.parent,
            display_style=source.display_style,
        )
        target = self._path_guard.resolve_path(self._join_relative(parent_relative, checked_name))
        if target.absolute.exists():
            raise AppError(
                code="already_exists",
                message="Target path already exists",
                status_code=409,
                details={"path": target.relative},
            )
        self._filesystem.rename_path(source.absolute, target.absolute)
        self._record_history(
            operation="rename",
            status="completed",
            source=path,
            destination=target.relative,
            path=target.relative,
            finished_at=self._now_iso(),
        )
        return RenameResponse(path=target.relative)
    except FileNotFoundError:
        missing = AppError(
            code="path_not_found",
            message="Requested path was not found",
            status_code=404,
            details={"path": path},
        )
        log_failure(missing)
        raise missing
    except FileExistsError:
        conflict = AppError(
            code="already_exists",
            message="Target path already exists",
            status_code=409,
            details={"path": new_name},
        )
        log_failure(conflict)
        raise conflict
    except AppError as exc:
        log_failure(exc)
        raise
    except OSError as exc:
        io_failure = AppError(
            code="io_error",
            message="Filesystem operation failed",
            status_code=500,
            details={"reason": str(exc)},
        )
        log_failure(io_failure)
        raise io_failure
def delete(self, path: str, recursive: bool = False) -> DeleteResponse:
    """Delete the file or directory at ``path`` and record the outcome.

    A non-empty directory is removed only when ``recursive`` is true.

    Returns:
        DeleteResponse carrying the relative path that was removed.

    Raises:
        AppError: ``directory_not_empty`` (409), ``type_conflict`` (409) for a
            path that is neither file nor directory, ``path_not_found`` (404)
            on ``FileNotFoundError``, ``io_error`` (500) for any other
            ``OSError``. Each failure is written to history before it propagates.
    """

    def log_failure(failure: AppError) -> None:
        self._record_history_error(operation="delete", path=path, error=failure)

    try:
        target = self._path_guard.resolve_existing_path(path)
        location = target.absolute
        if location.is_file():
            self._filesystem.delete_file(location)
        elif location.is_dir():
            if self._filesystem.is_directory_empty(location):
                self._filesystem.delete_empty_directory(location)
            elif recursive:
                self._filesystem.delete_directory_recursive(location)
            else:
                raise AppError(
                    code="directory_not_empty",
                    message="Directory is not empty",
                    status_code=409,
                    details={"path": target.relative},
                )
        else:
            raise AppError(
                code="type_conflict",
                message="Unsupported path type for delete",
                status_code=409,
                details={"path": target.relative},
            )
        self._record_history(
            operation="delete",
            status="completed",
            path=target.relative,
            finished_at=self._now_iso(),
        )
        return DeleteResponse(path=target.relative)
    except AppError as exc:
        log_failure(exc)
        raise
    except FileNotFoundError:
        missing = AppError(
            code="path_not_found",
            message="Requested path was not found",
            status_code=404,
            details={"path": path},
        )
        log_failure(missing)
        raise missing
    except OSError as exc:
        io_failure = AppError(
            code="io_error",
            message="Filesystem operation failed",
            status_code=500,
            details={"reason": str(exc)},
        )
        log_failure(io_failure)
        raise io_failure
def upload(self, target_path: str, upload_file, overwrite: bool = False) -> UploadResponse:
    """Store ``upload_file`` inside directory ``target_path`` and record the outcome.

    The destination name is the final path component of ``upload_file.filename``.

    Returns:
        UploadResponse with the destination's relative path, size and
        modification value as reported by the filesystem adapter.

    Raises:
        AppError: ``already_exists`` (409) when the destination exists and
            ``overwrite`` is false, ``type_conflict`` (409) when it is a
            directory, ``io_error`` (500) for ``OSError``. Each failure is
            written to history before it propagates.
    """
    # Both values are refined once the destination is known; the failure
    # logger below reads whatever is current when it is called.
    destination_relative = None
    history_path = target_path

    def log_failure(failure: AppError) -> None:
        self._record_history_error(
            operation="upload",
            destination=destination_relative,
            path=history_path,
            error=failure,
        )

    try:
        directory = self._path_guard.resolve_directory_path(target_path)
        bare_name = Path(upload_file.filename or "").name
        checked_name = self._path_guard.validate_name(bare_name, field="name")
        destination_relative = self._join_relative(directory.relative, checked_name)
        history_path = destination_relative
        destination = self._path_guard.resolve_path(destination_relative)
        if destination.absolute.exists():
            if not overwrite:
                raise AppError(
                    code="already_exists",
                    message="Target path already exists",
                    status_code=409,
                    details={"path": destination.relative},
                )
            if destination.absolute.is_dir():
                raise AppError(
                    code="type_conflict",
                    message="Cannot overwrite an existing directory",
                    status_code=409,
                    details={"path": destination.relative},
                )
        written = self._filesystem.write_uploaded_file(
            destination.absolute,
            upload_file.file,
            overwrite=overwrite,
        )
        self._record_history(
            operation="upload",
            status="completed",
            destination=destination.relative,
            path=destination.relative,
            finished_at=self._now_iso(),
        )
        return UploadResponse(
            path=destination.relative,
            size=written["size"],
            modified=written["modified"],
        )
    except AppError as exc:
        log_failure(exc)
        raise
    except OSError as exc:
        io_failure = AppError(
            code="io_error",
            message="Filesystem operation failed",
            status_code=500,
            details={"reason": str(exc)},
        )
        log_failure(io_failure)
        raise io_failure
def view(self, path: str, for_edit: bool = False) -> ViewResponse:
    """Return a UTF-8 text preview of the file at ``path``.

    Args:
        path: Request path of the file to read.
        for_edit: When true, files larger than ``TEXT_EDIT_MAX_BYTES`` are
            rejected instead of being previewed.

    Returns:
        ViewResponse built from the adapter's preview (content, size,
        modification value and truncation flag).

    Raises:
        AppError: ``type_conflict`` (409) when the path is not a regular file,
            ``unsupported_type`` (409) when no text content type matches,
            ``file_too_large`` (409) for oversized files in edit mode,
            ``io_error`` (500) when the size check or the read raises ``OSError``.
    """

    def io_failure(exc: OSError) -> AppError:
        return AppError(
            code="io_error",
            message="Filesystem operation failed",
            status_code=500,
            details={"reason": str(exc)},
        )

    resolved_target = self._path_guard.resolve_existing_path(path)
    if resolved_target.absolute.is_dir():
        raise AppError(
            code="type_conflict",
            message="Source must be a file",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    if not resolved_target.absolute.is_file():
        raise AppError(
            code="type_conflict",
            message="Unsupported path type for view",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    content_type = self._content_type_for(resolved_target.absolute)
    if content_type is None:
        raise AppError(
            code="unsupported_type",
            message="File type is not supported for preview",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    if for_edit:
        # stat() can fail (e.g. the file vanished after it was resolved);
        # report that as io_error like the read below, not as a raw OSError.
        try:
            current_size = resolved_target.absolute.stat().st_size
        except OSError as exc:
            raise io_failure(exc) from exc
        if current_size > TEXT_EDIT_MAX_BYTES:
            raise AppError(
                code="file_too_large",
                message="File is too large for edit",
                status_code=409,
                details={"path": resolved_target.relative},
            )
    try:
        preview = self._filesystem.read_text_preview(
            resolved_target.absolute,
            max_bytes=TEXT_PREVIEW_MAX_BYTES,
            encoding="utf-8",
        )
    except OSError as exc:
        raise io_failure(exc) from exc
    return ViewResponse(
        path=resolved_target.relative,
        name=resolved_target.absolute.name,
        content_type=content_type,
        encoding="utf-8",
        truncated=preview["truncated"],
        size=preview["size"],
        modified=preview["modified"],
        content=preview["content"],
    )
def info(self, path: str) -> FileInfoResponse:
    """Return metadata for the entry at ``path``.

    Most fields are copied from the filesystem adapter's ``stat_info`` result;
    ``path``, ``type`` and ``root`` come from the resolved target.
    """
    target = self._path_guard.resolve_existing_path(path)
    metadata = self._filesystem.stat_info(target.absolute)
    # Keys forwarded verbatim from the adapter's metadata mapping.
    copied_keys = (
        "name",
        "size",
        "modified",
        "extension",
        "content_type",
        "owner",
        "group",
        "width",
        "height",
    )
    copied = {key: metadata[key] for key in copied_keys}
    entry_type = "directory" if target.absolute.is_dir() else "file"
    return FileInfoResponse(
        path=target.relative,
        type=entry_type,
        root=target.alias,
        **copied,
    )
def prepare_download(self, paths: list[str]) -> dict:
    """Prepare a download for ``paths`` and track it in history.

    A single regular file is served directly; anything else (a directory or
    several paths) is packed into a zip archive.

    Returns:
        The dict produced by the single-file or zip preparation helper.

    Raises:
        AppError: ``invalid_request`` (400) when ``paths`` is empty,
            ``io_error`` (500) when an ``OSError`` occurs; an ``AppError``
            raised by a helper is re-raised unchanged. Each failure is
            written to history before it propagates.
    """
    # These start from the raw request and are refined as resolution
    # progresses; the failure logger reads whatever is current when called.
    history_entry_id: str | None = None
    history_mode = self._download_mode_from_request_paths(paths)
    history_path = self._summarize_download_targets(paths)
    history_download_name: str | None = None

    def log_failure(failure: AppError) -> None:
        self._record_download_failure(
            mode=history_mode,
            path_summary=history_path,
            download_name=history_download_name,
            error=failure,
            history_entry_id=history_entry_id,
        )

    if not paths:
        empty_request = AppError(
            code="invalid_request",
            message="At least one path is required",
            status_code=400,
        )
        log_failure(empty_request)
        raise empty_request

    try:
        targets = [self._path_guard.resolve_existing_path(requested) for requested in paths]
        history_mode = self._download_mode_from_resolved_targets(targets)
        history_path = self._summarize_download_targets([target.relative for target in targets])
        history_download_name = self._download_name_for_targets(targets)
        history_entry_id = self._record_download_status(
            status="requested",
            mode=history_mode,
            path_summary=history_path,
            download_name=history_download_name,
        )
        is_single_file = len(targets) == 1 and targets[0].absolute.is_file()
        if is_single_file:
            prepared = self._prepare_single_file_download(targets[0])
        else:
            prepared = self._prepare_zip_download(targets, history_download_name)
        self._record_download_status(
            status="ready",
            mode=history_mode,
            path_summary=history_path,
            download_name=history_download_name,
            history_entry_id=history_entry_id,
        )
        return prepared
    except AppError as error:
        log_failure(error)
        raise
    except OSError as exc:
        io_failure = AppError(
            code="io_error",
            message="Filesystem operation failed",
            status_code=500,
            details={"reason": str(exc)},
        )
        log_failure(io_failure)
        raise io_failure
def save(self, path: str, content: str, expected_modified: str) -> SaveResponse:
    """Overwrite the text file at ``path`` with ``content``.

    The write only happens when the file's current modification value equals
    ``expected_modified``, i.e. the file is unchanged since it was opened.

    Returns:
        SaveResponse with the relative path, new size and new modification value.

    Raises:
        AppError: ``type_conflict`` (409) when the path is not a regular file,
            ``unsupported_type`` (409) when no text content type matches,
            ``file_too_large`` (409) when the UTF-8 encoded content exceeds
            ``TEXT_EDIT_MAX_BYTES``, ``conflict`` (409) when the modification
            value differs, ``io_error`` (500) when reading the modification
            value or writing raises ``OSError``.
    """

    def io_failure(exc: OSError) -> AppError:
        return AppError(
            code="io_error",
            message="Filesystem operation failed",
            status_code=500,
            details={"reason": str(exc)},
        )

    resolved_target = self._path_guard.resolve_existing_path(path)
    if resolved_target.absolute.is_dir():
        raise AppError(
            code="type_conflict",
            message="Source must be a file",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    if not resolved_target.absolute.is_file():
        raise AppError(
            code="type_conflict",
            message="Unsupported path type for save",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    if self._content_type_for(resolved_target.absolute) is None:
        raise AppError(
            code="unsupported_type",
            message="File type is not supported for edit",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    if len(content.encode("utf-8")) > TEXT_EDIT_MAX_BYTES:
        raise AppError(
            code="file_too_large",
            message="File is too large for edit",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    # NOTE(review): modified_iso presumably stats the file; an OSError from it
    # is reported as io_error so it matches the write below — confirm against
    # the filesystem adapter.
    try:
        current_modified = self._filesystem.modified_iso(resolved_target.absolute)
    except OSError as exc:
        raise io_failure(exc) from exc
    if current_modified != expected_modified:
        raise AppError(
            code="conflict",
            message="File changed since it was opened",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    try:
        saved = self._filesystem.write_text_file(
            resolved_target.absolute,
            content=content,
            encoding="utf-8",
        )
    except OSError as exc:
        raise io_failure(exc) from exc
    return SaveResponse(
        path=resolved_target.relative,
        size=saved["size"],
        modified=saved["modified"],
    )
def prepare_video_stream(self, path: str, range_header: str | None = None) -> dict:
    """Prepare a (possibly ranged) byte stream for the video file at ``path``.

    Args:
        path: Request path of the video file.
        range_header: Optional HTTP ``Range`` header value; when given, the
            response is a 206 limited to the parsed byte range.

    Returns:
        Dict with ``status_code``, ``headers``, ``content_type`` and
        ``content`` (the adapter's ranged stream).

    Raises:
        AppError: ``type_conflict`` (409) when the path is not a regular file,
            ``unsupported_type`` (409) for an unknown video suffix, plus
            whatever the range parser raises for a bad header.
    """
    resolved_target = self._path_guard.resolve_existing_path(path)
    if resolved_target.absolute.is_dir():
        raise AppError(
            code="type_conflict",
            message="Source must be a file",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    if not resolved_target.absolute.is_file():
        raise AppError(
            code="type_conflict",
            message="Unsupported path type for video",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    content_type = self._video_content_type_for(resolved_target.absolute)
    if content_type is None:
        raise AppError(
            code="unsupported_type",
            message="File type is not supported for video playback",
            status_code=409,
            details={"path": resolved_target.relative},
        )
    file_size = int(resolved_target.absolute.stat().st_size)
    start = 0
    end = max(file_size - 1, 0)
    status_code = 200
    headers = {"Accept-Ranges": "bytes"}
    if range_header:
        start, end = self._parse_range_header(range_header, file_size)
        status_code = 206
        headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
    # ``end`` is inclusive and clamped to 0, so an empty file would otherwise
    # advertise one byte; never declare more bytes than the file holds.
    span = max((end - start) + 1, 0)
    headers["Content-Length"] = str(min(span, file_size))
    return {
        "status_code": status_code,
        "headers": headers,
        "content_type": content_type,
        "content": self._filesystem.stream_file_range(resolved_target.absolute, start, end),
    }
def prepare_thumbnail_stream(self, path: str) -> dict:
    """Prepare a full-file stream of the thumbnail-capable image at ``path``.

    Returns:
        Dict with ``headers`` (``Content-Length``), ``content_type`` and
        ``content`` (the adapter's file stream).

    Raises:
        AppError: ``type_conflict`` (409) when the path is not a regular file,
            ``unsupported_type`` (409) when the suffix has no thumbnail type.
    """
    target = self._path_guard.resolve_existing_path(path)
    location = target.absolute

    def reject(code: str, message: str) -> AppError:
        return AppError(code=code, message=message, status_code=409, details={"path": target.relative})

    if location.is_dir():
        raise reject("type_conflict", "Source must be a file")
    if not location.is_file():
        raise reject("type_conflict", "Unsupported path type for thumbnail")
    content_type = self._thumbnail_content_type_for(location)
    if content_type is None:
        raise reject("unsupported_type", "File type is not supported for thumbnail")
    size_in_bytes = int(location.stat().st_size)
    return {
        "headers": {"Content-Length": str(size_in_bytes)},
        "content_type": content_type,
        "content": self._filesystem.stream_file(location),
    }
def prepare_image_stream(self, path: str) -> dict:
    """Prepare a full-file stream of the image at ``path``.

    Returns:
        Dict with ``headers`` (``Content-Length``), ``content_type`` and
        ``content`` (the adapter's file stream).

    Raises:
        AppError: ``type_conflict`` (409) when the path is not a regular file,
            ``unsupported_type`` (409) when the suffix has no image type.
    """
    target = self._path_guard.resolve_existing_path(path)
    location = target.absolute

    def reject(code: str, message: str) -> AppError:
        return AppError(code=code, message=message, status_code=409, details={"path": target.relative})

    if location.is_dir():
        raise reject("type_conflict", "Source must be a file")
    if not location.is_file():
        raise reject("type_conflict", "Unsupported path type for image")
    content_type = self._image_content_type_for(location)
    if content_type is None:
        raise reject("unsupported_type", "File type is not supported for image viewing")
    size_in_bytes = int(location.stat().st_size)
    return {
        "headers": {"Content-Length": str(size_in_bytes)},
        "content_type": content_type,
        "content": self._filesystem.stream_file(location),
    }
def prepare_pdf_stream(self, path: str) -> dict:
    """Prepare a full-file stream of the PDF at ``path``.

    Returns:
        Dict with ``headers`` (``Content-Length``), ``content_type`` and
        ``content`` (the adapter's file stream).

    Raises:
        AppError: ``type_conflict`` (409) when the path is not a regular file,
            ``unsupported_type`` (409) when the suffix has no PDF type.
    """
    target = self._path_guard.resolve_existing_path(path)
    location = target.absolute

    def reject(code: str, message: str) -> AppError:
        return AppError(code=code, message=message, status_code=409, details={"path": target.relative})

    if location.is_dir():
        raise reject("type_conflict", "Source must be a file")
    if not location.is_file():
        raise reject("type_conflict", "Unsupported path type for pdf")
    content_type = self._pdf_content_type_for(location)
    if content_type is None:
        raise reject("unsupported_type", "File type is not supported for pdf viewing")
    size_in_bytes = int(location.stat().st_size)
    return {
        "headers": {"Content-Length": str(size_in_bytes)},
        "content_type": content_type,
        "content": self._filesystem.stream_file(location),
    }
@staticmethod
def _join_relative(base: str, name: str) -> str:
return f"{base}/{name}" if base else name
@staticmethod
def _content_type_for(path: Path) -> str | None:
    """Return the text MIME type for ``path`` or ``None`` when it is not text.

    The lower-cased whole file name is looked up in ``SPECIAL_TEXT_FILENAMES``
    first; otherwise the lower-cased suffix is looked up in ``TEXT_CONTENT_TYPES``.
    """
    by_name = SPECIAL_TEXT_FILENAMES.get(path.name.lower())
    return by_name if by_name else TEXT_CONTENT_TYPES.get(path.suffix.lower())
@staticmethod
def _video_content_type_for(path: Path) -> str | None:
    """Return the video MIME type for ``path``'s suffix, or ``None`` if unknown."""
    suffix = path.suffix.lower()
    return VIDEO_CONTENT_TYPES.get(suffix)
@staticmethod
def _thumbnail_content_type_for(path: Path) -> str | None:
    """Return the thumbnail MIME type for ``path``'s suffix, or ``None`` if unknown."""
    suffix = path.suffix.lower()
    return THUMBNAIL_CONTENT_TYPES.get(suffix)
@staticmethod
def _image_content_type_for(path: Path) -> str | None:
    """Return the image MIME type for ``path``'s suffix, or ``None`` if unknown."""
    suffix = path.suffix.lower()
    return IMAGE_CONTENT_TYPES.get(suffix)
@staticmethod
def _pdf_content_type_for(path: Path) -> str | None:
    """Return the PDF MIME type for ``path``'s suffix, or ``None`` if unknown."""
    suffix = path.suffix.lower()
    return PDF_CONTENT_TYPES.get(suffix)
def _record_history(
self,
*,
operation: str,
status: str,
source: str | None = None,
destination: str | None = None,
path: str | None = None,
error_code: str | None = None,
error_message: str | None = None,
finished_at: str | None = None,
) -> None:
if not self._history_repository:
return
self._history_repository.create_entry(
operation=operation,
status=status,
source=source,
destination=destination,
path=path,
error_code=error_code,
error_message=error_message,
finished_at=finished_at,
)
def _record_history_error(
self,
*,
operation: str,
error: AppError,
source: str | None = None,
destination: str | None = None,
path: str | None = None,
) -> None:
self._record_history(
operation=operation,
status="failed",
source=source,
destination=destination,
path=path,
error_code=error.code,
error_message=error.message,
finished_at=self._now_iso(),
)
@staticmethod
def _now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _prepare_single_file_download(self, resolved_target) -> dict:
_, _, lexical_source, _ = self._path_guard.resolve_lexical_path(resolved_target.relative)
if lexical_source.is_symlink():
raise AppError(
code="type_conflict",
message="Source must not be a symlink",
status_code=409,
details={"path": resolved_target.relative},
)
return {
"content": self._filesystem.stream_file(resolved_target.absolute),
"headers": {
"Content-Disposition": f'attachment; filename="{resolved_target.absolute.name}"',
},
"content_type": self._content_type_for(resolved_target.absolute) or "application/octet-stream",
}
def _prepare_zip_download(self, resolved_targets: list, download_name: str) -> dict:
archive_names: set[str] = set()
for resolved_target in resolved_targets:
archive_name = resolved_target.absolute.name
if archive_name in archive_names:
raise AppError(
code="invalid_request",
message="Selected items must have distinct top-level names",
status_code=400,
)
archive_names.add(archive_name)
self._run_zip_download_preflight(resolved_targets)
buffer = BytesIO()
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
for resolved_target in resolved_targets:
self._write_download_target_to_zip(archive, resolved_target)
payload = buffer.getvalue()
async def _stream_zip():
yield payload
return {
"content": _stream_zip(),
"headers": {
"Content-Disposition": f'attachment; filename="{download_name}"',
},
"content_type": "application/zip",
}
def _download_name_for_targets(self, resolved_targets: list) -> str:
if len(resolved_targets) == 1 and resolved_targets[0].absolute.is_file():
return resolved_targets[0].absolute.name
if len(resolved_targets) == 1 and resolved_targets[0].absolute.is_dir():
return f"{resolved_targets[0].absolute.name}.zip"
return f"kodidownload-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}.zip"
@staticmethod
def _download_mode_from_request_paths(paths: list[str]) -> str:
return "multi_zip" if len(paths) > 1 else "single_file"
@staticmethod
def _download_mode_from_resolved_targets(resolved_targets: list) -> str:
if len(resolved_targets) == 1 and resolved_targets[0].absolute.is_file():
return "single_file"
if len(resolved_targets) == 1 and resolved_targets[0].absolute.is_dir():
return "single_directory_zip"
return "multi_zip"
@staticmethod
def _summarize_download_targets(paths: list[str]) -> str:
if not paths:
return "-"
if len(paths) == 1:
return paths[0]
if len(paths) == 2:
return f"{paths[0]}, {paths[1]}"
return f"{paths[0]}, {paths[1]}, +{len(paths) - 2} more"
def _record_download_status(
self,
*,
status: str,
mode: str,
path_summary: str,
download_name: str | None,
history_entry_id: str | None = None,
) -> str | None:
if not self._history_repository:
return history_entry_id
if history_entry_id:
self._history_repository.update_entry(
entry_id=history_entry_id,
status=status,
error_code=None,
error_message=None,
finished_at=self._now_iso(),
)
return history_entry_id
created = self._history_repository.create_entry(
operation="download",
status=status,
source=mode,
destination=download_name,
path=path_summary,
finished_at=self._now_iso() if status != "requested" else None,
)
return created["id"]
def _record_download_failure(
self,
*,
mode: str,
path_summary: str,
download_name: str | None,
error: AppError,
history_entry_id: str | None,
) -> None:
if not self._history_repository:
return
failure_status = "preflight_failed" if error.code == "download_preflight_failed" else "failed"
if history_entry_id:
self._history_repository.update_entry(
entry_id=history_entry_id,
status=failure_status,
error_code=error.code,
error_message=error.message,
finished_at=self._now_iso(),
)
return
self._history_repository.create_entry(
operation="download",
status=failure_status,
source=mode,
destination=download_name,
path=path_summary,
error_code=error.code,
error_message=error.message,
finished_at=self._now_iso(),
)
def _run_zip_download_preflight(self, resolved_targets: list) -> None:
    """Scan every target before zipping and enforce the preflight limits.

    Files are recorded directly; directories are counted and then scanned.
    The limit and timeout helpers raise when a check fails.
    """
    scan_started = self._monotonic()
    totals = ZipDownloadPreflightState()
    for target in resolved_targets:
        self._ensure_zip_download_preflight_within_timeout(scan_started)
        self._validate_zip_download_root_target(target)
        if not target.absolute.is_file():
            # A directory counts as one item before its contents are scanned.
            self._increment_zip_download_item_count(
                state=totals,
                entry_relative=target.relative,
            )
            self._scan_zip_download_directory(
                state=totals,
                resolved_target=target,
                started_at=scan_started,
            )
        else:
            self._record_zip_download_file(
                state=totals,
                entry_path=target.absolute,
                entry_relative=target.relative,
            )
def _validate_zip_download_root_target(self, resolved_target) -> None:
_, _, lexical_source, _ = self._path_guard.resolve_lexical_path(resolved_target.relative)
if lexical_source.is_symlink():
self._raise_zip_download_preflight_error(
reason="symlink_detected",
details={"path": resolved_target.relative},
)
if resolved_target.absolute.is_file() or resolved_target.absolute.is_dir():
return
self._raise_zip_download_preflight_error(
reason="unsupported_path_type",
details={"path": resolved_target.relative},
)
def _scan_zip_download_directory(self, state: ZipDownloadPreflightState, resolved_target, started_at: float) -> None:
for root, dirnames, filenames in os.walk(resolved_target.absolute, followlinks=False):
root_path = Path(root)
dirnames.sort()
filenames.sort()
for name in [*dirnames, *filenames]:
self._ensure_zip_download_preflight_within_timeout(started_at)
entry_path = root_path / name
relative_suffix = entry_path.relative_to(resolved_target.absolute).as_posix()
entry_relative = self._join_relative(resolved_target.relative, relative_suffix)
if entry_path.is_symlink():
self._raise_zip_download_preflight_error(
reason="symlink_detected",
details={"path": entry_relative},
)
if entry_path.is_dir():
self._increment_zip_download_item_count(state=state, entry_relative=entry_relative)
continue
self._record_zip_download_file(
state=state,
entry_path=entry_path,
entry_relative=entry_relative,
)
def _record_zip_download_file(
self,
*,
state: ZipDownloadPreflightState,
entry_path: Path,
entry_relative: str,
) -> None:
self._increment_zip_download_item_count(state=state, entry_relative=entry_relative)
file_size = int(entry_path.stat().st_size)
if file_size > self._zip_download_preflight_limits.max_individual_file_bytes:
self._raise_zip_download_preflight_error(
reason="max_individual_file_size_exceeded",
details={
"path": entry_relative,
"limit_bytes": str(self._zip_download_preflight_limits.max_individual_file_bytes),
"actual_bytes": str(file_size),
},
)
state.total_input_bytes += file_size
if state.total_input_bytes > self._zip_download_preflight_limits.max_total_input_bytes:
self._raise_zip_download_preflight_error(
reason="max_total_input_bytes_exceeded",
details={
"limit_bytes": str(self._zip_download_preflight_limits.max_total_input_bytes),
"actual_bytes": str(state.total_input_bytes),
},
)
def _increment_zip_download_item_count(self, *, state: ZipDownloadPreflightState, entry_relative: str) -> None:
state.item_count += 1
if state.item_count > self._zip_download_preflight_limits.max_items:
self._raise_zip_download_preflight_error(
reason="max_items_exceeded",
details={
"path": entry_relative,
"limit": str(self._zip_download_preflight_limits.max_items),
"actual": str(state.item_count),
},
)
def _ensure_zip_download_preflight_within_timeout(self, started_at: float) -> None:
elapsed = self._monotonic() - started_at
if elapsed > self._zip_download_preflight_limits.scan_timeout_seconds:
self._raise_zip_download_preflight_error(
reason="preflight_timeout",
details={
"timeout_seconds": str(self._zip_download_preflight_limits.scan_timeout_seconds),
},
)
@staticmethod
def _raise_zip_download_preflight_error(reason: str, details: dict[str, str]) -> None:
    """Always raise the 409 ``download_preflight_failed`` error.

    ``reason`` is placed first in the error details, followed by ``details``.
    """
    merged_details = {"reason": reason}
    merged_details.update(details)
    raise AppError(
        code="download_preflight_failed",
        message="Zip download preflight failed",
        status_code=409,
        details=merged_details,
    )
def _write_download_target_to_zip(self, archive: zipfile.ZipFile, resolved_target) -> None:
root_name = resolved_target.absolute.name
if resolved_target.absolute.is_file():
archive.write(resolved_target.absolute, arcname=root_name)
return
archive.writestr(f"{root_name}/", b"")
for child in sorted(resolved_target.absolute.rglob("*")):
arcname = f"{root_name}/{child.relative_to(resolved_target.absolute).as_posix()}"
if child.is_dir():
archive.writestr(f"{arcname}/", b"")
else:
archive.write(child, arcname=arcname)
@staticmethod
def _parse_range_header(range_header: str, file_size: int) -> tuple[int, int]:
def invalid_range() -> AppError:
return AppError(
code="invalid_request",
message="Invalid Range header",
status_code=400,
)
if not range_header.startswith("bytes="):
raise invalid_range()
value = range_header[len("bytes="):].strip()
if "," in value or "-" not in value:
raise invalid_range()
start_text, end_text = value.split("-", 1)
if start_text == "" and end_text == "":
raise invalid_range()
try:
if start_text == "":
suffix_length = int(end_text)
if suffix_length <= 0:
raise invalid_range()
if suffix_length >= file_size:
return 0, max(file_size - 1, 0)
return file_size - suffix_length, file_size - 1
start = int(start_text)
if start < 0 or start >= file_size:
raise invalid_range()
if end_text == "":
return start, file_size - 1
end = int(end_text)
if end < start:
raise invalid_range()
return start, min(end, file_size - 1)
except ValueError as exc:
raise invalid_range() from exc