# Listing metadata: Python source, 521 lines, 19 KiB.
from __future__ import annotations

from datetime import datetime, timezone
from pathlib import Path

from backend.app.api.errors import AppError
from backend.app.api.schemas import DeleteResponse, FileInfoResponse, MkdirResponse, RenameResponse, SaveResponse, ViewResponse
from backend.app.db.history_repository import HistoryRepository
from backend.app.fs.filesystem_adapter import FilesystemAdapter
from backend.app.security.path_guard import PathGuard
# Upper bound, in bytes, handed to the filesystem adapter when reading a text preview.
TEXT_PREVIEW_MAX_BYTES = 256 * 1024

# Upper bound, in bytes, for a file opened for editing and for content submitted on save.
TEXT_EDIT_MAX_BYTES = 256 * 1024

# Lower-cased file suffix -> content type for files that can be previewed/edited as text.
TEXT_CONTENT_TYPES = {
    ".txt": "text/plain",
    ".log": "text/plain",
    ".md": "text/markdown",
    ".yml": "text/yaml",
    ".yaml": "text/yaml",
    ".json": "application/json",
    ".js": "text/javascript",
    ".css": "text/css",
    ".html": "text/html",
}

# Lower-cased whole file name -> content type, for text files that carry no suffix.
SPECIAL_TEXT_FILENAMES = {
    "dockerfile": "text/plain",
    "containerfile": "text/plain",
}

# Lower-cased file suffix -> content type for images served by the thumbnail endpoint.
THUMBNAIL_CONTENT_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".webp": "image/webp",
}

# Lower-cased file suffix -> content type for videos served by the streaming endpoint.
VIDEO_CONTENT_TYPES = {
    ".mp4": "video/mp4",
    ".mkv": "video/x-matroska",
}
class FileOpsService:
    """File operations (mkdir, rename, delete, view, info, save, streaming) on guarded paths.

    Every public method resolves the caller-supplied path through the path guard
    before touching the filesystem adapter. ``mkdir``, ``rename`` and ``delete``
    also record their outcome (success or failure) in the optional history
    repository; the read/stream/save methods do not write history.
    """

    def __init__(self, path_guard: PathGuard, filesystem: FilesystemAdapter, history_repository: HistoryRepository | None = None):
        """Store the collaborators; ``history_repository`` may be ``None`` to disable history."""
        self._path_guard = path_guard
        self._filesystem = filesystem
        self._history_repository = history_repository

    # ------------------------------------------------------------------ #
    # Mutating operations (recorded in history)
    # ------------------------------------------------------------------ #

    def mkdir(self, parent_path: str, name: str) -> MkdirResponse:
        """Create directory ``name`` inside ``parent_path``.

        Returns:
            MkdirResponse carrying the relative path of the new directory.

        Raises:
            AppError: ``already_exists`` (409) when the target exists, ``io_error``
                (500) for any other ``OSError``, or any ``AppError`` raised by the
                path guard. Every failure is written to history before re-raising.
        """
        # Path reported in failure records; the resolved path may not exist yet when we fail.
        attempted_path = self._join_relative(parent_path, name)
        try:
            resolved_parent = self._path_guard.resolve_directory_path(parent_path)
            safe_name = self._path_guard.validate_name(name, field="name")
            resolved_target = self._path_guard.resolve_path(
                self._join_relative(resolved_parent.relative, safe_name)
            )

            if resolved_target.absolute.exists():
                raise self._conflict_error("already_exists", "Target path already exists", resolved_target.relative)

            self._filesystem.make_directory(resolved_target.absolute)
            self._record_history(
                operation="mkdir",
                status="completed",
                path=resolved_target.relative,
                finished_at=self._now_iso(),
            )
            return MkdirResponse(path=resolved_target.relative)
        except FileExistsError as exc:
            # Lost a race with another creator between the exists() check and the mkdir.
            error = self._conflict_error("already_exists", "Target path already exists", attempted_path)
            self._record_history_error(operation="mkdir", path=attempted_path, error=error)
            raise error from exc
        except AppError as exc:
            self._record_history_error(operation="mkdir", path=attempted_path, error=exc)
            raise
        except OSError as exc:
            error = self._io_error(exc)
            self._record_history_error(operation="mkdir", path=attempted_path, error=error)
            raise error from exc

    def rename(self, path: str, new_name: str) -> RenameResponse:
        """Rename the entry at ``path`` to ``new_name`` within the same parent directory.

        Returns:
            RenameResponse carrying the relative path of the renamed entry.

        Raises:
            AppError: ``path_not_found`` (404), ``already_exists`` (409), ``io_error``
                (500), or any ``AppError`` raised by the path guard. Every failure is
                written to history before re-raising.
        """

        def record_failure(error: AppError) -> None:
            self._record_history_error(operation="rename", source=path, destination=new_name, path=path, error=error)

        try:
            resolved_source = self._path_guard.resolve_existing_path(path)
            safe_name = self._path_guard.validate_name(new_name, field="new_name")

            parent_relative = self._path_guard.entry_relative_path(
                resolved_source.alias,
                resolved_source.absolute.parent,
                display_style=resolved_source.display_style,
            )
            resolved_target = self._path_guard.resolve_path(self._join_relative(parent_relative, safe_name))

            if resolved_target.absolute.exists():
                raise self._conflict_error("already_exists", "Target path already exists", resolved_target.relative)

            self._filesystem.rename_path(resolved_source.absolute, resolved_target.absolute)
            self._record_history(
                operation="rename",
                status="completed",
                source=path,
                destination=resolved_target.relative,
                path=resolved_target.relative,
                finished_at=self._now_iso(),
            )
            return RenameResponse(path=resolved_target.relative)
        except FileNotFoundError as exc:
            error = self._not_found_error(path)
            record_failure(error)
            raise error from exc
        except FileExistsError as exc:
            error = self._conflict_error("already_exists", "Target path already exists", new_name)
            record_failure(error)
            raise error from exc
        except AppError as exc:
            record_failure(exc)
            raise
        except OSError as exc:
            error = self._io_error(exc)
            record_failure(error)
            raise error from exc

    def delete(self, path: str) -> DeleteResponse:
        """Delete a file or an *empty* directory at ``path``.

        Returns:
            DeleteResponse carrying the relative path that was removed.

        Raises:
            AppError: ``directory_not_empty`` (409), ``type_conflict`` (409) for
                entries that are neither file nor directory, ``path_not_found``
                (404), ``io_error`` (500), or any ``AppError`` from the path guard.
                Every failure is written to history before re-raising.
        """
        try:
            resolved_target = self._path_guard.resolve_existing_path(path)
            target = resolved_target.absolute

            if target.is_file():
                self._filesystem.delete_file(target)
            elif target.is_dir():
                if not self._filesystem.is_directory_empty(target):
                    raise self._conflict_error("directory_not_empty", "Directory is not empty", resolved_target.relative)
                self._filesystem.delete_empty_directory(target)
            else:
                raise self._conflict_error("type_conflict", "Unsupported path type for delete", resolved_target.relative)

            self._record_history(
                operation="delete",
                status="completed",
                path=resolved_target.relative,
                finished_at=self._now_iso(),
            )
            return DeleteResponse(path=resolved_target.relative)
        except AppError as exc:
            self._record_history_error(operation="delete", path=path, error=exc)
            raise
        except FileNotFoundError as exc:
            error = self._not_found_error(path)
            self._record_history_error(operation="delete", path=path, error=error)
            raise error from exc
        except OSError as exc:
            error = self._io_error(exc)
            self._record_history_error(operation="delete", path=path, error=error)
            raise error from exc

    # ------------------------------------------------------------------ #
    # Read / edit operations (not recorded in history)
    # ------------------------------------------------------------------ #

    def view(self, path: str, for_edit: bool = False) -> ViewResponse:
        """Return a UTF-8 text preview of the file at ``path``.

        Args:
            path: Path to resolve through the path guard.
            for_edit: When true, reject files larger than ``TEXT_EDIT_MAX_BYTES``
                instead of returning a preview of them.

        Raises:
            AppError: ``type_conflict`` (409) for non-files, ``unsupported_type``
                (409) for unknown text types, ``file_too_large`` (409) in edit
                mode, ``io_error`` (500) when reading fails.
        """
        resolved_target = self._path_guard.resolve_existing_path(path)
        self._require_regular_file(resolved_target, "view")

        content_type = self._content_type_for(resolved_target.absolute)
        if content_type is None:
            raise self._conflict_error("unsupported_type", "File type is not supported for preview", resolved_target.relative)

        if for_edit and resolved_target.absolute.stat().st_size > TEXT_EDIT_MAX_BYTES:
            raise self._conflict_error("file_too_large", "File is too large for edit", resolved_target.relative)

        try:
            preview = self._filesystem.read_text_preview(
                resolved_target.absolute,
                max_bytes=TEXT_PREVIEW_MAX_BYTES,
                encoding="utf-8",
            )
        except OSError as exc:
            raise self._io_error(exc) from exc

        return ViewResponse(
            path=resolved_target.relative,
            name=resolved_target.absolute.name,
            content_type=content_type,
            encoding="utf-8",
            truncated=preview["truncated"],
            size=preview["size"],
            modified=preview["modified"],
            content=preview["content"],
        )

    def info(self, path: str) -> FileInfoResponse:
        """Return metadata for the entry at ``path`` as reported by the filesystem adapter."""
        resolved_target = self._path_guard.resolve_existing_path(path)
        metadata = self._filesystem.stat_info(resolved_target.absolute)
        entry_type = "directory" if resolved_target.absolute.is_dir() else "file"

        return FileInfoResponse(
            name=metadata["name"],
            path=resolved_target.relative,
            type=entry_type,
            size=metadata["size"],
            modified=metadata["modified"],
            root=resolved_target.alias,
            extension=metadata["extension"],
            content_type=metadata["content_type"],
            owner=metadata["owner"],
            group=metadata["group"],
        )

    def save(self, path: str, content: str, expected_modified: str) -> SaveResponse:
        """Overwrite the text file at ``path`` with ``content``.

        Args:
            path: Path to resolve through the path guard; must be an existing file.
            content: New text; its UTF-8 encoding may not exceed ``TEXT_EDIT_MAX_BYTES``.
            expected_modified: Modification stamp the caller saw when opening the
                file; the save is refused when it no longer matches the value the
                filesystem adapter reports.

        Raises:
            AppError: ``type_conflict``, ``unsupported_type``, ``file_too_large`` or
                ``conflict`` (all 409), or ``io_error`` (500) when writing fails.
        """
        resolved_target = self._path_guard.resolve_existing_path(path)
        self._require_regular_file(resolved_target, "save")

        if self._content_type_for(resolved_target.absolute) is None:
            raise self._conflict_error("unsupported_type", "File type is not supported for edit", resolved_target.relative)
        if len(content.encode("utf-8")) > TEXT_EDIT_MAX_BYTES:
            raise self._conflict_error("file_too_large", "File is too large for edit", resolved_target.relative)

        # Optimistic concurrency check against the stamp the client loaded.
        if self._filesystem.modified_iso(resolved_target.absolute) != expected_modified:
            raise self._conflict_error("conflict", "File changed since it was opened", resolved_target.relative)

        try:
            saved = self._filesystem.write_text_file(
                resolved_target.absolute,
                content=content,
                encoding="utf-8",
            )
        except OSError as exc:
            raise self._io_error(exc) from exc

        return SaveResponse(
            path=resolved_target.relative,
            size=saved["size"],
            modified=saved["modified"],
        )

    # ------------------------------------------------------------------ #
    # Streaming
    # ------------------------------------------------------------------ #

    def prepare_video_stream(self, path: str, range_header: str | None = None) -> dict:
        """Build status, headers and body iterator for serving a video file.

        Args:
            path: Path to resolve through the path guard; must be a supported video file.
            range_header: Raw ``Range`` request header, or ``None``/empty for the whole file.

        Returns:
            dict with ``status_code`` (200 or 206), ``headers``, ``content_type`` and
            ``content`` (whatever ``stream_file_range`` returns for the chosen span).

        Raises:
            AppError: ``type_conflict`` / ``unsupported_type`` (409), or
                ``invalid_request`` (400) for a Range header that cannot be honoured.
        """
        resolved_target = self._path_guard.resolve_existing_path(path)
        self._require_regular_file(resolved_target, "video")

        content_type = self._video_content_type_for(resolved_target.absolute)
        if content_type is None:
            raise self._conflict_error("unsupported_type", "File type is not supported for video playback", resolved_target.relative)

        file_size = int(resolved_target.absolute.stat().st_size)
        start, end = 0, max(file_size - 1, 0)
        status_code = 200
        headers = {"Accept-Ranges": "bytes"}

        if range_header:
            start, end = self._parse_range_header(range_header, file_size)
            # An empty file has no byte 0, so "bytes 0-0/0" would be an invalid
            # Content-Range; serve the (empty) full representation with 200 instead.
            if file_size > 0:
                status_code = 206
                headers["Content-Range"] = f"bytes {start}-{end}/{file_size}"

        # start/end are inclusive; for an empty file the span 0-0 must not be counted as one byte.
        content_length = (end - start) + 1 if file_size > 0 else 0
        headers["Content-Length"] = str(content_length)

        return {
            "status_code": status_code,
            "headers": headers,
            "content_type": content_type,
            "content": self._filesystem.stream_file_range(resolved_target.absolute, start, end),
        }

    def prepare_thumbnail_stream(self, path: str) -> dict:
        """Build headers and body iterator for serving an image file as a thumbnail.

        Returns:
            dict with ``headers`` (``Content-Length``), ``content_type`` and
            ``content`` (whatever ``stream_file`` returns for the file).

        Raises:
            AppError: ``type_conflict`` / ``unsupported_type`` (409).
        """
        resolved_target = self._path_guard.resolve_existing_path(path)
        self._require_regular_file(resolved_target, "thumbnail")

        content_type = self._thumbnail_content_type_for(resolved_target.absolute)
        if content_type is None:
            raise self._conflict_error("unsupported_type", "File type is not supported for thumbnail", resolved_target.relative)

        return {
            "headers": {"Content-Length": str(int(resolved_target.absolute.stat().st_size))},
            "content_type": content_type,
            "content": self._filesystem.stream_file(resolved_target.absolute),
        }

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _conflict_error(code: str, message: str, path: str) -> AppError:
        """Build a 409 ``AppError`` whose details carry the offending path."""
        return AppError(code=code, message=message, status_code=409, details={"path": path})

    @staticmethod
    def _not_found_error(path: str) -> AppError:
        """Build the 404 ``path_not_found`` error for ``path``."""
        return AppError(
            code="path_not_found",
            message="Requested path was not found",
            status_code=404,
            details={"path": path},
        )

    @staticmethod
    def _io_error(exc: OSError) -> AppError:
        """Build the 500 ``io_error`` error, carrying ``str(exc)`` as the reason."""
        return AppError(
            code="io_error",
            message="Filesystem operation failed",
            status_code=500,
            details={"reason": str(exc)},
        )

    @staticmethod
    def _require_regular_file(resolved_target, action: str) -> None:
        """Raise ``type_conflict`` unless the resolved path is a regular file.

        ``action`` (``view``, ``save``, ``video``, ``thumbnail``) only customises the
        message used for entries that are neither a directory nor a file.
        """
        if resolved_target.absolute.is_dir():
            raise FileOpsService._conflict_error("type_conflict", "Source must be a file", resolved_target.relative)
        if not resolved_target.absolute.is_file():
            raise FileOpsService._conflict_error(
                "type_conflict", f"Unsupported path type for {action}", resolved_target.relative
            )

    @staticmethod
    def _join_relative(base: str, name: str) -> str:
        """Join ``name`` onto ``base`` with ``/``; an empty ``base`` yields just ``name``."""
        return f"{base}/{name}" if base else name

    @staticmethod
    def _content_type_for(path: Path) -> str | None:
        """Return the text content type for ``path``, or ``None`` when it is not a known text file.

        Whole-name matches (e.g. ``Dockerfile``) take precedence over suffix matches;
        both lookups are case-insensitive.
        """
        special_name = SPECIAL_TEXT_FILENAMES.get(path.name.lower())
        if special_name:
            return special_name
        return TEXT_CONTENT_TYPES.get(path.suffix.lower())

    @staticmethod
    def _video_content_type_for(path: Path) -> str | None:
        """Return the video content type for ``path``'s suffix, or ``None`` when unsupported."""
        return VIDEO_CONTENT_TYPES.get(path.suffix.lower())

    @staticmethod
    def _thumbnail_content_type_for(path: Path) -> str | None:
        """Return the image content type for ``path``'s suffix, or ``None`` when unsupported."""
        return THUMBNAIL_CONTENT_TYPES.get(path.suffix.lower())

    def _record_history(
        self,
        *,
        operation: str,
        status: str,
        source: str | None = None,
        destination: str | None = None,
        path: str | None = None,
        error_code: str | None = None,
        error_message: str | None = None,
        finished_at: str | None = None,
    ) -> None:
        """Forward one history entry to the repository; a no-op when none was configured."""
        # Compare against None explicitly: a repository object that happens to be
        # falsy (e.g. defines __len__ and is empty) must still receive entries.
        if self._history_repository is None:
            return
        self._history_repository.create_entry(
            operation=operation,
            status=status,
            source=source,
            destination=destination,
            path=path,
            error_code=error_code,
            error_message=error_message,
            finished_at=finished_at,
        )

    def _record_history_error(
        self,
        *,
        operation: str,
        error: AppError,
        source: str | None = None,
        destination: str | None = None,
        path: str | None = None,
    ) -> None:
        """Record a ``failed`` history entry using ``error``'s code and message."""
        self._record_history(
            operation=operation,
            status="failed",
            source=source,
            destination=destination,
            path=path,
            error_code=error.code,
            error_message=error.message,
            finished_at=self._now_iso(),
        )

    @staticmethod
    def _now_iso() -> str:
        """Return the current UTC time, second precision, as ISO 8601 with a ``Z`` suffix."""
        return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")

    @staticmethod
    def _parse_range_header(range_header: str, file_size: int) -> tuple[int, int]:
        """Parse a single-range ``bytes=`` header into an inclusive ``(start, end)`` pair.

        Supported forms are ``bytes=A-B``, ``bytes=A-`` and the suffix form
        ``bytes=-N`` (last N bytes). ``end`` is clamped to the last byte of the file.

        Raises:
            AppError: ``invalid_request`` (400) for other units, multiple ranges,
                non-numeric positions, ``start`` at or beyond ``file_size``,
                ``end < start``, or a zero-length suffix.
        """

        def invalid_range() -> AppError:
            return AppError(
                code="invalid_request",
                message="Invalid Range header",
                status_code=400,
            )

        def parse_position(text: str) -> int:
            # Byte positions are plain decimal digits. Bare int() would also accept
            # signs ("+5"), underscores ("1_0") and non-ASCII digits, so validate first.
            if not (text.isascii() and text.isdigit()):
                raise invalid_range()
            try:
                return int(text)
            except ValueError as exc:
                # Still possible for absurdly long digit strings (int conversion limit).
                raise invalid_range() from exc

        if not range_header.startswith("bytes="):
            raise invalid_range()
        value = range_header[len("bytes="):].strip()
        if "," in value or "-" not in value:
            raise invalid_range()

        # Whitespace around either position is tolerated, as before.
        start_text, end_text = (part.strip() for part in value.split("-", 1))
        if not start_text and not end_text:
            raise invalid_range()

        if not start_text:
            # Suffix form: the last `suffix_length` bytes of the file.
            suffix_length = parse_position(end_text)
            if suffix_length <= 0:
                raise invalid_range()
            if suffix_length >= file_size:
                return 0, max(file_size - 1, 0)
            return file_size - suffix_length, file_size - 1

        start = parse_position(start_text)
        if start >= file_size:
            raise invalid_range()
        if not end_text:
            return start, file_size - 1

        end = parse_position(end_text)
        if end < start:
            raise invalid_range()
        return start, min(end, file_size - 1)