792 lines
30 KiB
Python
792 lines
30 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
from io import BytesIO
|
|
import zipfile
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from backend.app.api.errors import AppError
|
|
from backend.app.api.schemas import DeleteResponse, FileInfoResponse, MkdirResponse, RenameResponse, SaveResponse, UploadResponse, ViewResponse
|
|
from backend.app.db.history_repository import HistoryRepository
|
|
from backend.app.fs.filesystem_adapter import FilesystemAdapter
|
|
from backend.app.security.path_guard import PathGuard
|
|
|
|
TEXT_PREVIEW_MAX_BYTES = 256 * 1024
|
|
TEXT_EDIT_MAX_BYTES = 256 * 1024
|
|
TEXT_CONTENT_TYPES = {
|
|
".txt": "text/plain",
|
|
".log": "text/plain",
|
|
".md": "text/markdown",
|
|
".yml": "text/yaml",
|
|
".yaml": "text/yaml",
|
|
".json": "application/json",
|
|
".js": "text/javascript",
|
|
".py": "text/x-python",
|
|
".css": "text/css",
|
|
".html": "text/html",
|
|
}
|
|
SPECIAL_TEXT_FILENAMES = {
|
|
"dockerfile": "text/plain",
|
|
"containerfile": "text/plain",
|
|
}
|
|
THUMBNAIL_CONTENT_TYPES = {
|
|
".jpg": "image/jpeg",
|
|
".jpeg": "image/jpeg",
|
|
".png": "image/png",
|
|
".webp": "image/webp",
|
|
}
|
|
IMAGE_CONTENT_TYPES = {
|
|
".jpg": "image/jpeg",
|
|
".jpeg": "image/jpeg",
|
|
".png": "image/png",
|
|
".webp": "image/webp",
|
|
".gif": "image/gif",
|
|
".bmp": "image/bmp",
|
|
".avif": "image/avif",
|
|
}
|
|
VIDEO_CONTENT_TYPES = {
|
|
".mp4": "video/mp4",
|
|
".mkv": "video/x-matroska",
|
|
}
|
|
PDF_CONTENT_TYPES = {
|
|
".pdf": "application/pdf",
|
|
}
|
|
|
|
|
|
class FileOpsService:
|
|
def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter, history_repository: HistoryRepository | None = None):
|
|
self._path_guard = path_guard
|
|
self._filesystem = filesystem
|
|
self._history_repository = history_repository
|
|
|
|
def mkdir(self, parent_path: str, name: str) -> MkdirResponse:
|
|
try:
|
|
resolved_parent = self._path_guard.resolve_directory_path(parent_path)
|
|
safe_name = self._path_guard.validate_name(name, field="name")
|
|
target_relative = self._join_relative(resolved_parent.relative, safe_name)
|
|
resolved_target = self._path_guard.resolve_path(target_relative)
|
|
|
|
if resolved_target.absolute.exists():
|
|
raise AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
self._filesystem.make_directory(resolved_target.absolute)
|
|
self._record_history(operation="mkdir", status="completed", path=resolved_target.relative, finished_at=self._now_iso())
|
|
return MkdirResponse(path=resolved_target.relative)
|
|
except FileExistsError:
|
|
error = AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": self._join_relative(parent_path, name)},
|
|
)
|
|
self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=error)
|
|
raise error
|
|
except AppError as exc:
|
|
self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=exc)
|
|
raise
|
|
except OSError as exc:
|
|
error = AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
self._record_history_error(operation="mkdir", path=self._join_relative(parent_path, name), error=error)
|
|
raise error
|
|
|
|
def rename(self, path: str, new_name: str) -> RenameResponse:
|
|
try:
|
|
resolved_source = self._path_guard.resolve_existing_path(path)
|
|
safe_name = self._path_guard.validate_name(new_name, field="new_name")
|
|
|
|
parent_relative = self._path_guard.entry_relative_path(
|
|
resolved_source.alias,
|
|
resolved_source.absolute.parent,
|
|
display_style=resolved_source.display_style,
|
|
)
|
|
target_relative = self._join_relative(parent_relative, safe_name)
|
|
resolved_target = self._path_guard.resolve_path(target_relative)
|
|
|
|
if resolved_target.absolute.exists():
|
|
raise AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
self._filesystem.rename_path(resolved_source.absolute, resolved_target.absolute)
|
|
self._record_history(
|
|
operation="rename",
|
|
status="completed",
|
|
source=path,
|
|
destination=resolved_target.relative,
|
|
path=resolved_target.relative,
|
|
finished_at=self._now_iso(),
|
|
)
|
|
return RenameResponse(path=resolved_target.relative)
|
|
except FileNotFoundError:
|
|
error = AppError(
|
|
code="path_not_found",
|
|
message="Requested path was not found",
|
|
status_code=404,
|
|
details={"path": path},
|
|
)
|
|
self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error)
|
|
raise error
|
|
except FileExistsError:
|
|
error = AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": new_name},
|
|
)
|
|
self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error)
|
|
raise error
|
|
except AppError as exc:
|
|
self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=exc)
|
|
raise
|
|
except OSError as exc:
|
|
error = AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error)
|
|
raise error
|
|
|
|
def delete(self, path: str, recursive: bool = False) -> DeleteResponse:
|
|
try:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_file():
|
|
self._filesystem.delete_file(resolved_target.absolute)
|
|
elif resolved_target.absolute.is_dir():
|
|
if not self._filesystem.is_directory_empty(resolved_target.absolute):
|
|
if not recursive:
|
|
raise AppError(
|
|
code="directory_not_empty",
|
|
message="Directory is not empty",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
self._filesystem.delete_directory_recursive(resolved_target.absolute)
|
|
else:
|
|
self._filesystem.delete_empty_directory(resolved_target.absolute)
|
|
else:
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for delete",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
self._record_history(operation="delete", status="completed", path=resolved_target.relative, finished_at=self._now_iso())
|
|
return DeleteResponse(path=resolved_target.relative)
|
|
except AppError as exc:
|
|
self._record_history_error(operation="delete", path=path, error=exc)
|
|
raise
|
|
except FileNotFoundError:
|
|
error = AppError(
|
|
code="path_not_found",
|
|
message="Requested path was not found",
|
|
status_code=404,
|
|
details={"path": path},
|
|
)
|
|
self._record_history_error(operation="delete", path=path, error=error)
|
|
raise error
|
|
except OSError as exc:
|
|
error = AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
self._record_history_error(operation="delete", path=path, error=error)
|
|
raise error
|
|
|
|
def upload(self, target_path: str, upload_file, overwrite: bool = False) -> UploadResponse:
|
|
destination_relative = None
|
|
history_path = target_path
|
|
try:
|
|
resolved_target = self._path_guard.resolve_directory_path(target_path)
|
|
filename = Path(upload_file.filename or "").name
|
|
safe_name = self._path_guard.validate_name(filename, field="name")
|
|
destination_relative = self._join_relative(resolved_target.relative, safe_name)
|
|
history_path = destination_relative
|
|
resolved_destination = self._path_guard.resolve_path(destination_relative)
|
|
|
|
if resolved_destination.absolute.exists():
|
|
if not overwrite:
|
|
raise AppError(
|
|
code="already_exists",
|
|
message="Target path already exists",
|
|
status_code=409,
|
|
details={"path": resolved_destination.relative},
|
|
)
|
|
if resolved_destination.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Cannot overwrite an existing directory",
|
|
status_code=409,
|
|
details={"path": resolved_destination.relative},
|
|
)
|
|
|
|
saved = self._filesystem.write_uploaded_file(
|
|
resolved_destination.absolute,
|
|
upload_file.file,
|
|
overwrite=overwrite,
|
|
)
|
|
self._record_history(
|
|
operation="upload",
|
|
status="completed",
|
|
destination=resolved_destination.relative,
|
|
path=resolved_destination.relative,
|
|
finished_at=self._now_iso(),
|
|
)
|
|
return UploadResponse(
|
|
path=resolved_destination.relative,
|
|
size=saved["size"],
|
|
modified=saved["modified"],
|
|
)
|
|
except AppError as exc:
|
|
self._record_history_error(
|
|
operation="upload",
|
|
destination=destination_relative,
|
|
path=history_path,
|
|
error=exc,
|
|
)
|
|
raise
|
|
except OSError as exc:
|
|
error = AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
self._record_history_error(
|
|
operation="upload",
|
|
destination=destination_relative,
|
|
path=history_path,
|
|
error=error,
|
|
)
|
|
raise error
|
|
|
|
def view(self, path: str, for_edit: bool = False) -> ViewResponse:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must be a file",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if not resolved_target.absolute.is_file():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for view",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
content_type = self._content_type_for(resolved_target.absolute)
|
|
if content_type is None:
|
|
raise AppError(
|
|
code="unsupported_type",
|
|
message="File type is not supported for preview",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
if for_edit and resolved_target.absolute.stat().st_size > TEXT_EDIT_MAX_BYTES:
|
|
raise AppError(
|
|
code="file_too_large",
|
|
message="File is too large for edit",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
try:
|
|
preview = self._filesystem.read_text_preview(
|
|
resolved_target.absolute,
|
|
max_bytes=TEXT_PREVIEW_MAX_BYTES,
|
|
encoding="utf-8",
|
|
)
|
|
except OSError as exc:
|
|
raise AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
|
|
return ViewResponse(
|
|
path=resolved_target.relative,
|
|
name=resolved_target.absolute.name,
|
|
content_type=content_type,
|
|
encoding="utf-8",
|
|
truncated=preview["truncated"],
|
|
size=preview["size"],
|
|
modified=preview["modified"],
|
|
content=preview["content"],
|
|
)
|
|
|
|
def info(self, path: str) -> FileInfoResponse:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
metadata = self._filesystem.stat_info(resolved_target.absolute)
|
|
|
|
return FileInfoResponse(
|
|
name=metadata["name"],
|
|
path=resolved_target.relative,
|
|
type="directory" if resolved_target.absolute.is_dir() else "file",
|
|
size=metadata["size"],
|
|
modified=metadata["modified"],
|
|
root=resolved_target.alias,
|
|
extension=metadata["extension"],
|
|
content_type=metadata["content_type"],
|
|
owner=metadata["owner"],
|
|
group=metadata["group"],
|
|
width=metadata["width"],
|
|
height=metadata["height"],
|
|
)
|
|
|
|
def prepare_download(self, paths: list[str]) -> dict:
|
|
if not paths:
|
|
raise AppError(
|
|
code="invalid_request",
|
|
message="At least one path is required",
|
|
status_code=400,
|
|
)
|
|
|
|
resolved_targets = [self._path_guard.resolve_existing_path(path) for path in paths]
|
|
if len(resolved_targets) == 1 and resolved_targets[0].absolute.is_file():
|
|
return self._prepare_single_file_download(resolved_targets[0])
|
|
return self._prepare_zip_download(resolved_targets)
|
|
|
|
def save(self, path: str, content: str, expected_modified: str) -> SaveResponse:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must be a file",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if not resolved_target.absolute.is_file():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for save",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if self._content_type_for(resolved_target.absolute) is None:
|
|
raise AppError(
|
|
code="unsupported_type",
|
|
message="File type is not supported for edit",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if len(content.encode("utf-8")) > TEXT_EDIT_MAX_BYTES:
|
|
raise AppError(
|
|
code="file_too_large",
|
|
message="File is too large for edit",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
current_modified = self._filesystem.modified_iso(resolved_target.absolute)
|
|
if current_modified != expected_modified:
|
|
raise AppError(
|
|
code="conflict",
|
|
message="File changed since it was opened",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
try:
|
|
saved = self._filesystem.write_text_file(
|
|
resolved_target.absolute,
|
|
content=content,
|
|
encoding="utf-8",
|
|
)
|
|
except OSError as exc:
|
|
raise AppError(
|
|
code="io_error",
|
|
message="Filesystem operation failed",
|
|
status_code=500,
|
|
details={"reason": str(exc)},
|
|
)
|
|
|
|
return SaveResponse(
|
|
path=resolved_target.relative,
|
|
size=saved["size"],
|
|
modified=saved["modified"],
|
|
)
|
|
|
|
def prepare_video_stream(self, path: str, range_header: str | None = None) -> dict:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must be a file",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if not resolved_target.absolute.is_file():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for video",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
content_type = self._video_content_type_for(resolved_target.absolute)
|
|
if content_type is None:
|
|
raise AppError(
|
|
code="unsupported_type",
|
|
message="File type is not supported for video playback",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
file_size = int(resolved_target.absolute.stat().st_size)
|
|
start = 0
|
|
end = max(file_size - 1, 0)
|
|
status_code = 200
|
|
headers = {"Accept-Ranges": "bytes"}
|
|
|
|
if range_header:
|
|
start, end = self._parse_range_header(range_header, file_size)
|
|
status_code = 206
|
|
headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"
|
|
|
|
headers["Content-Length"] = str(max((end - start) + 1, 0))
|
|
|
|
return {
|
|
"status_code": status_code,
|
|
"headers": headers,
|
|
"content_type": content_type,
|
|
"content": self._filesystem.stream_file_range(resolved_target.absolute, start, end),
|
|
}
|
|
|
|
def prepare_thumbnail_stream(self, path: str) -> dict:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must be a file",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if not resolved_target.absolute.is_file():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for thumbnail",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
content_type = self._thumbnail_content_type_for(resolved_target.absolute)
|
|
if content_type is None:
|
|
raise AppError(
|
|
code="unsupported_type",
|
|
message="File type is not supported for thumbnail",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
return {
|
|
"headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))},
|
|
"content_type": content_type,
|
|
"content": self._filesystem.stream_file(resolved_target.absolute),
|
|
}
|
|
|
|
def prepare_image_stream(self, path: str) -> dict:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must be a file",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if not resolved_target.absolute.is_file():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for image",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
content_type = self._image_content_type_for(resolved_target.absolute)
|
|
if content_type is None:
|
|
raise AppError(
|
|
code="unsupported_type",
|
|
message="File type is not supported for image viewing",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
return {
|
|
"headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))},
|
|
"content_type": content_type,
|
|
"content": self._filesystem.stream_file(resolved_target.absolute),
|
|
}
|
|
|
|
def prepare_pdf_stream(self, path: str) -> dict:
|
|
resolved_target = self._path_guard.resolve_existing_path(path)
|
|
|
|
if resolved_target.absolute.is_dir():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must be a file",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if not resolved_target.absolute.is_file():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for pdf",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
content_type = self._pdf_content_type_for(resolved_target.absolute)
|
|
if content_type is None:
|
|
raise AppError(
|
|
code="unsupported_type",
|
|
message="File type is not supported for pdf viewing",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
return {
|
|
"headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))},
|
|
"content_type": content_type,
|
|
"content": self._filesystem.stream_file(resolved_target.absolute),
|
|
}
|
|
|
|
@staticmethod
|
|
def _join_relative(base: str, name: str) -> str:
|
|
return f"{base}/{name}" if base else name
|
|
|
|
@staticmethod
|
|
def _content_type_for(path: Path) -> str | None:
|
|
special_name = SPECIAL_TEXT_FILENAMES.get(path.name.lower())
|
|
if special_name:
|
|
return special_name
|
|
return TEXT_CONTENT_TYPES.get(path.suffix.lower())
|
|
|
|
@staticmethod
|
|
def _video_content_type_for(path: Path) -> str | None:
|
|
return VIDEO_CONTENT_TYPES.get(path.suffix.lower())
|
|
|
|
@staticmethod
|
|
def _thumbnail_content_type_for(path: Path) -> str | None:
|
|
return THUMBNAIL_CONTENT_TYPES.get(path.suffix.lower())
|
|
|
|
@staticmethod
|
|
def _image_content_type_for(path: Path) -> str | None:
|
|
return IMAGE_CONTENT_TYPES.get(path.suffix.lower())
|
|
|
|
@staticmethod
|
|
def _pdf_content_type_for(path: Path) -> str | None:
|
|
return PDF_CONTENT_TYPES.get(path.suffix.lower())
|
|
|
|
def _record_history(
|
|
self,
|
|
*,
|
|
operation: str,
|
|
status: str,
|
|
source: str | None = None,
|
|
destination: str | None = None,
|
|
path: str | None = None,
|
|
error_code: str | None = None,
|
|
error_message: str | None = None,
|
|
finished_at: str | None = None,
|
|
) -> None:
|
|
if not self._history_repository:
|
|
return
|
|
self._history_repository.create_entry(
|
|
operation=operation,
|
|
status=status,
|
|
source=source,
|
|
destination=destination,
|
|
path=path,
|
|
error_code=error_code,
|
|
error_message=error_message,
|
|
finished_at=finished_at,
|
|
)
|
|
|
|
def _record_history_error(
|
|
self,
|
|
*,
|
|
operation: str,
|
|
error: AppError,
|
|
source: str | None = None,
|
|
destination: str | None = None,
|
|
path: str | None = None,
|
|
) -> None:
|
|
self._record_history(
|
|
operation=operation,
|
|
status="failed",
|
|
source=source,
|
|
destination=destination,
|
|
path=path,
|
|
error_code=error.code,
|
|
error_message=error.message,
|
|
finished_at=self._now_iso(),
|
|
)
|
|
|
|
@staticmethod
|
|
def _now_iso() -> str:
|
|
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
|
|
|
|
def _prepare_single_file_download(self, resolved_target) -> dict:
|
|
_, _, lexical_source, _ = self._path_guard.resolve_lexical_path(resolved_target.relative)
|
|
if lexical_source.is_symlink():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must not be a symlink",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
return {
|
|
"content": self._filesystem.stream_file(resolved_target.absolute),
|
|
"headers": {
|
|
"Content-Disposition": f'attachment; filename="{resolved_target.absolute.name}"',
|
|
},
|
|
"content_type": self._content_type_for(resolved_target.absolute) or "application/octet-stream",
|
|
}
|
|
|
|
def _prepare_zip_download(self, resolved_targets: list) -> dict:
|
|
archive_names: set[str] = set()
|
|
for resolved_target in resolved_targets:
|
|
self._validate_download_target(resolved_target)
|
|
archive_name = resolved_target.absolute.name
|
|
if archive_name in archive_names:
|
|
raise AppError(
|
|
code="invalid_request",
|
|
message="Selected items must have distinct top-level names",
|
|
status_code=400,
|
|
)
|
|
archive_names.add(archive_name)
|
|
|
|
if len(resolved_targets) == 1 and resolved_targets[0].absolute.is_dir():
|
|
download_name = f"{resolved_targets[0].absolute.name}.zip"
|
|
else:
|
|
download_name = f"kodidownload-{datetime.now(timezone.utc).strftime('%Y%m%d-%H%M%S')}.zip"
|
|
|
|
buffer = BytesIO()
|
|
with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
|
|
for resolved_target in resolved_targets:
|
|
self._write_download_target_to_zip(archive, resolved_target)
|
|
payload = buffer.getvalue()
|
|
|
|
async def _stream_zip():
|
|
yield payload
|
|
|
|
return {
|
|
"content": _stream_zip(),
|
|
"headers": {
|
|
"Content-Disposition": f'attachment; filename="{download_name}"',
|
|
},
|
|
"content_type": "application/zip",
|
|
}
|
|
|
|
def _validate_download_target(self, resolved_target) -> None:
|
|
_, _, lexical_source, _ = self._path_guard.resolve_lexical_path(resolved_target.relative)
|
|
if lexical_source.is_symlink():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source must not be a symlink",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
if resolved_target.absolute.is_file():
|
|
return
|
|
if resolved_target.absolute.is_dir():
|
|
for root, dirnames, filenames in os.walk(resolved_target.absolute, followlinks=False):
|
|
root_path = Path(root)
|
|
for name in [*dirnames, *filenames]:
|
|
entry = root_path / name
|
|
if entry.is_symlink():
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Source directory must not contain symlinks",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
return
|
|
raise AppError(
|
|
code="type_conflict",
|
|
message="Unsupported path type for download",
|
|
status_code=409,
|
|
details={"path": resolved_target.relative},
|
|
)
|
|
|
|
def _write_download_target_to_zip(self, archive: zipfile.ZipFile, resolved_target) -> None:
|
|
root_name = resolved_target.absolute.name
|
|
if resolved_target.absolute.is_file():
|
|
archive.write(resolved_target.absolute, arcname=root_name)
|
|
return
|
|
|
|
archive.writestr(f"{root_name}/", b"")
|
|
for child in sorted(resolved_target.absolute.rglob("*")):
|
|
arcname = f"{root_name}/{child.relative_to(resolved_target.absolute).as_posix()}"
|
|
if child.is_dir():
|
|
archive.writestr(f"{arcname}/", b"")
|
|
else:
|
|
archive.write(child, arcname=arcname)
|
|
@staticmethod
|
|
def _parse_range_header(range_header: str, file_size: int) -> tuple[int, int]:
|
|
def invalid_range() -> AppError:
|
|
return AppError(
|
|
code="invalid_request",
|
|
message="Invalid Range header",
|
|
status_code=400,
|
|
)
|
|
|
|
if not range_header.startswith("bytes="):
|
|
raise invalid_range()
|
|
value = range_header[len("bytes="):].strip()
|
|
if "," in value or "-" not in value:
|
|
raise invalid_range()
|
|
start_text, end_text = value.split("-", 1)
|
|
if start_text == "" and end_text == "":
|
|
raise invalid_range()
|
|
|
|
try:
|
|
if start_text == "":
|
|
suffix_length = int(end_text)
|
|
if suffix_length <= 0:
|
|
raise invalid_range()
|
|
if suffix_length >= file_size:
|
|
return 0, max(file_size - 1, 0)
|
|
return file_size - suffix_length, file_size - 1
|
|
|
|
start = int(start_text)
|
|
if start < 0 or start >= file_size:
|
|
raise invalid_range()
|
|
|
|
if end_text == "":
|
|
return start, file_size - 1
|
|
|
|
end = int(end_text)
|
|
if end < start:
|
|
raise invalid_range()
|
|
return start, min(end, file_size - 1)
|
|
except ValueError as exc:
|
|
raise invalid_range() from exc
|