289 lines
11 KiB
Python
289 lines
11 KiB
Python
from __future__ import annotations

import codecs
import grp
import mimetypes
import os
import pwd
import shutil
import struct
from collections.abc import Callable
from datetime import datetime, timezone
from pathlib import Path
|
|
|
|
|
|
class FilesystemAdapter:
    """Local-filesystem operations for listing, searching, copying and previewing files.

    Every method works directly on the paths it is given; nothing in this
    class checks that a path lies inside any particular root directory.
    """

    @staticmethod
    def _format_timestamp(timestamp: float) -> str:
        """Render a POSIX timestamp as an ISO-8601 UTC string ending in ``Z``."""
        return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat().replace("+00:00", "Z")

    @staticmethod
    def _discard_partial(path: Path) -> None:
        """Remove a partially written file, ignoring any failure to do so."""
        try:
            path.unlink()
        except OSError:
            # Deliberately best-effort: the caller is already propagating the
            # error that interrupted the write, which is the one that matters.
            pass

    def stat_info(self, path: Path) -> dict:
        """Return a metadata dictionary for ``path``.

        Keys: ``name``, ``size`` (``None`` unless ``path`` is a regular file),
        ``modified`` (ISO-8601 UTC), ``owner``, ``group`` (``None`` when the
        id cannot be resolved to a name), ``content_type`` (guessed from the
        file name), ``extension`` (lower-cased suffix or ``None``), and
        ``width`` / ``height`` (only for recognised image files).

        Raises:
            OSError: if ``path`` cannot be stat'ed.
        """
        stat = path.stat()
        is_file = path.is_file()

        try:
            owner = pwd.getpwuid(stat.st_uid).pw_name
        except (KeyError, ImportError, AttributeError):
            # KeyError: the uid has no passwd entry.
            owner = None
        try:
            group = grp.getgrgid(stat.st_gid).gr_name
        except (KeyError, ImportError, AttributeError):
            # KeyError: the gid has no group entry.
            group = None

        content_type, _ = mimetypes.guess_type(path.name)
        width, height = self._image_dimensions(path) if is_file else (None, None)
        return {
            "name": path.name,
            "size": int(stat.st_size) if is_file else None,
            "modified": self._format_timestamp(stat.st_mtime),
            "owner": owner,
            "group": group,
            "content_type": content_type,
            "extension": path.suffix.lower() or None,
            "width": width,
            "height": height,
        }

    def list_directory(self, directory: Path, show_hidden: bool) -> tuple[list[dict], list[dict]]:
        """List the immediate children of ``directory``.

        Returns ``(directories, files)``, each sorted case-insensitively by
        name. Directory entries carry ``name``, ``modified`` and ``absolute``
        (the entry's ``Path`` as produced by ``directory.iterdir()``); file
        entries additionally carry ``size``. Names starting with ``.`` are
        omitted unless ``show_hidden`` is true. Entries that are neither a
        directory nor a regular file are omitted, as are entries whose
        ``stat()`` fails (for example dangling symlinks or entries removed
        while the listing is in progress).

        Raises:
            OSError: if ``directory`` itself cannot be read.
        """
        directories: list[dict] = []
        files: list[dict] = []

        for entry in sorted(directory.iterdir(), key=lambda item: item.name.lower()):
            if not show_hidden and entry.name.startswith("."):
                continue
            try:
                stat = entry.stat()
            except OSError:
                # One unreadable entry must not fail the whole listing.
                continue
            modified = self._format_timestamp(stat.st_mtime)
            if entry.is_dir():
                directories.append({"name": entry.name, "modified": modified, "absolute": entry})
            elif entry.is_file():
                files.append(
                    {
                        "name": entry.name,
                        "size": int(stat.st_size),
                        "modified": modified,
                        "absolute": entry,
                    }
                )

        return directories, files

    def search_names(self, directory: Path, query: str, limit: int) -> tuple[list[dict], bool]:
        """Recursively find entries whose name contains ``query`` (case-insensitive).

        Hidden names (leading ``.``) are never reported and hidden directories
        are not descended into. Within each directory, matching
        sub-directories are reported before matching files, both in
        case-insensitive name order.

        Returns ``(results, limit_reached)``. Each result has ``name``,
        ``kind`` (``"directory"`` or ``"file"``) and ``absolute``. The search
        stops as soon as ``len(results) >= limit``; because that check runs
        after a match is appended, ``limit`` is expected to be at least 1.
        """
        normalized_query = query.lower()
        results: list[dict] = []

        for root, dirnames, filenames in os.walk(directory):
            # Slice assignment mutates the list os.walk recurses on, which both
            # prunes hidden directories and fixes the traversal order.
            dirnames[:] = sorted((name for name in dirnames if not name.startswith(".")), key=str.lower)
            visible_files = sorted((name for name in filenames if not name.startswith(".")), key=str.lower)

            root_path = Path(root)
            for kind, names in (("directory", dirnames), ("file", visible_files)):
                for name in names:
                    if normalized_query not in name.lower():
                        continue
                    results.append({"name": name, "kind": kind, "absolute": root_path / name})
                    if len(results) >= limit:
                        return results, True

        return results, False

    def make_directory(self, path: Path) -> None:
        """Create ``path``; the parent must exist and ``path`` must not."""
        path.mkdir(parents=False, exist_ok=False)

    def rename_path(self, source: Path, destination: Path) -> None:
        """Rename ``source`` to ``destination`` via ``Path.rename``."""
        source.rename(destination)

    def move_file(self, source: str, destination: str) -> None:
        """Move a file by renaming ``source`` to ``destination`` (``Path.rename``)."""
        Path(source).rename(Path(destination))

    def move_directory(self, source: str, destination: str) -> None:
        """Move a directory by renaming ``source`` to ``destination`` (``Path.rename``)."""
        Path(source).rename(Path(destination))

    def is_directory_empty(self, path: Path) -> bool:
        """Return ``True`` when ``path`` has no entries at all."""
        return not any(path.iterdir())

    def delete_file(self, path: Path) -> None:
        """Remove the file at ``path``."""
        path.unlink()

    def delete_empty_directory(self, path: Path) -> None:
        """Remove the directory at ``path``; it must be empty."""
        path.rmdir()

    def copy_file(self, source: str, destination: str, on_progress: Callable[[int], None] | None = None) -> None:
        """Copy ``source`` to a new file at ``destination`` in 1 MiB chunks.

        Args:
            source: Path of the file to read.
            destination: Path to create. It is opened in exclusive-create
                mode, so an existing file is never overwritten.
            on_progress: Optional callback invoked after every chunk with the
                number of bytes written so far.

        After the data is copied, ``shutil.copystat`` transfers the source's
        metadata to the destination.

        Raises:
            FileExistsError: if ``destination`` already exists (it is left
                untouched).
            OSError: on any other I/O failure.

        If the copy is interrupted after the destination was created
        (including by an exception from ``on_progress``), the partial file is
        removed before the exception propagates; otherwise the exclusive-create
        open would make every retry fail with ``FileExistsError``.
        """
        src = Path(source)
        dst = Path(destination)
        with src.open("rb") as in_f:
            # Opened outside the try block: if this raises, nothing was
            # created by us and there is nothing to clean up.
            out_f = dst.open("xb")
            try:
                with out_f:
                    while True:
                        chunk = in_f.read(1024 * 1024)
                        if not chunk:
                            break
                        out_f.write(chunk)
                        if on_progress:
                            on_progress(out_f.tell())
            except BaseException:
                # The inner ``with`` has already closed the handle.
                self._discard_partial(dst)
                raise
        shutil.copystat(src, dst, follow_symlinks=False)

    def read_text_preview(self, path: Path, max_bytes: int, encoding: str = "utf-8") -> dict:
        """Read at most ``max_bytes`` bytes of ``path`` and decode them as text.

        Returns a dict with ``size`` (file size in bytes), ``modified``
        (ISO-8601 UTC), ``truncated`` (whether the file is longer than the
        preview) and ``content``. Undecodable bytes become U+FFFD. When the
        preview is truncated, a multi-byte character cut in half at the
        boundary is dropped instead of being shown as a replacement character.
        """
        size = int(path.stat().st_size)
        with path.open("rb") as in_f:
            # One extra byte reveals whether there is more than max_bytes.
            raw = in_f.read(max_bytes + 1)
        modified = self.modified_iso(path)
        truncated = size > max_bytes or len(raw) > max_bytes
        if truncated:
            raw = raw[:max_bytes]
            # final=False makes the decoder hold back an incomplete trailing
            # sequence rather than reporting it as an error.
            decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
            content = decoder.decode(raw, final=False)
        else:
            content = raw.decode(encoding, errors="replace")
        return {
            "size": size,
            "modified": modified,
            "truncated": truncated,
            "content": content,
        }

    def write_text_file(self, path: Path, content: str, encoding: str = "utf-8") -> dict:
        """Write ``content`` to ``path`` and return its new ``size`` and ``modified``."""
        path.write_text(content, encoding=encoding)
        return {
            "size": int(path.stat().st_size),
            "modified": self.modified_iso(path),
        }

    def write_uploaded_file(self, path: Path, file_stream, chunk_size: int = 1024 * 1024) -> dict:
        """Store the contents of ``file_stream`` in a new file at ``path``.

        Args:
            path: File to create; opened in exclusive-create mode.
            file_stream: Object whose ``read(size)`` returns bytes and an
                empty value at end of stream.
            chunk_size: Number of bytes requested per ``read`` call.

        Returns:
            A dict with the stored file's ``size`` and ``modified``.

        Raises:
            FileExistsError: if ``path`` already exists (it is left untouched).

        If reading or writing fails part-way, the partial file is removed
        before the exception propagates so the upload can be retried.
        """
        # Opened outside the try block so a FileExistsError never triggers
        # removal of somebody else's file.
        handle = path.open("xb")
        try:
            with handle:
                while True:
                    chunk = file_stream.read(chunk_size)
                    if not chunk:
                        break
                    handle.write(chunk)
        except BaseException:
            self._discard_partial(path)
            raise
        return {
            "size": int(path.stat().st_size),
            "modified": self.modified_iso(path),
        }

    async def stream_file_range(self, path: Path, start: int, end: int, chunk_size: int = 1024 * 1024):
        """Yield the bytes of ``path`` from offset ``start`` through ``end`` inclusive.

        Chunks are at most ``chunk_size`` bytes; iteration stops early if the
        file ends first. The reads themselves are ordinary blocking calls.
        """
        with path.open("rb") as handle:
            handle.seek(start)
            remaining = (end - start) + 1
            while remaining > 0:
                chunk = handle.read(min(chunk_size, remaining))
                if not chunk:
                    break
                remaining -= len(chunk)
                yield chunk

    async def stream_file(self, path: Path, chunk_size: int = 1024 * 1024):
        """Yield the whole of ``path`` in chunks of at most ``chunk_size`` bytes.

        The reads themselves are ordinary blocking calls.
        """
        with path.open("rb") as handle:
            while True:
                chunk = handle.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    @staticmethod
    def modified_iso(path: Path) -> str:
        """Return the modification time of ``path`` as an ISO-8601 UTC string ending in ``Z``."""
        return FilesystemAdapter._format_timestamp(path.stat().st_mtime)

    def _image_dimensions(self, path: Path) -> tuple[int | None, int | None]:
        """Return ``(width, height)`` for a supported image, else ``(None, None)``.

        The parser is chosen by file extension alone. I/O and parse errors are
        reported as ``(None, None)`` rather than raised.
        """
        parsers = {
            ".png": self._png_dimensions,
            ".jpg": self._jpeg_dimensions,
            ".jpeg": self._jpeg_dimensions,
            ".gif": self._gif_dimensions,
            ".bmp": self._bmp_dimensions,
            ".webp": self._webp_dimensions,
            ".avif": self._avif_dimensions,
        }
        parser = parsers.get(path.suffix.lower())
        if parser is None:
            return None, None
        try:
            return parser(path)
        except (OSError, ValueError, struct.error):
            return None, None

    @staticmethod
    def _png_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read width and height from the IHDR chunk of a PNG file."""
        with path.open("rb") as handle:
            header = handle.read(24)
        if len(header) < 24 or header[:8] != b"\x89PNG\r\n\x1a\n":
            return None, None
        # Bytes 8-12 are the chunk length, 12-16 the chunk type.
        if header[12:16] != b"IHDR":
            return None, None
        width, height = struct.unpack(">II", header[16:24])
        return width, height

    @staticmethod
    def _jpeg_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Walk JPEG segments until a start-of-frame marker and return its size."""
        with path.open("rb") as handle:
            if handle.read(2) != b"\xff\xd8":
                return None, None
            while True:
                prefix = handle.read(1)
                if not prefix:
                    return None, None
                if prefix != b"\xff":
                    continue
                marker = handle.read(1)
                # Any number of 0xFF fill bytes may precede the marker code.
                while marker == b"\xff":
                    marker = handle.read(1)
                if not marker:
                    return None, None
                code = marker[0]
                # SOI/EOI are unexpected here, and once SOS is reached only
                # entropy-coded data follows, so no frame header will turn up.
                if code in (0xD8, 0xD9, 0xDA):
                    return None, None
                # TEM and RSTn are standalone markers without a length field.
                if code == 0x01 or 0xD0 <= code <= 0xD7:
                    continue
                length_bytes = handle.read(2)
                if len(length_bytes) != 2:
                    return None, None
                (segment_length,) = struct.unpack(">H", length_bytes)
                if segment_length < 2:
                    return None, None
                # SOF0-SOF15, excluding DHT (C4), JPG (C8) and DAC (CC).
                if 0xC0 <= code <= 0xCF and code not in (0xC4, 0xC8, 0xCC):
                    payload = handle.read(5)
                    if len(payload) != 5:
                        return None, None
                    # Payload: precision (1 byte), height (2), width (2).
                    height, width = struct.unpack(">HH", payload[1:5])
                    return width, height
                # The length field counts its own two bytes.
                handle.seek(segment_length - 2, 1)

    @staticmethod
    def _gif_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read the logical screen size from a GIF87a/GIF89a header."""
        with path.open("rb") as handle:
            header = handle.read(10)
        if len(header) < 10 or header[:6] not in {b"GIF87a", b"GIF89a"}:
            return None, None
        width, height = struct.unpack("<HH", header[6:10])
        return width, height

    @staticmethod
    def _bmp_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read width and height from the DIB header of a BMP file."""
        with path.open("rb") as handle:
            header = handle.read(26)
        if len(header) < 26 or header[:2] != b"BM":
            return None, None
        (dib_header_size,) = struct.unpack_from("<I", header, 14)
        if dib_header_size == 12:
            # BITMAPCOREHEADER stores unsigned 16-bit width and height.
            width, height = struct.unpack_from("<HH", header, 18)
            return width, height
        # Larger headers store signed 32-bit values; abs() because the sign
        # is not part of the image size.
        width, height = struct.unpack_from("<ii", header, 18)
        return abs(width), abs(height)

    @staticmethod
    def _webp_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read the image size from the first chunk of a WebP (RIFF) file."""
        with path.open("rb") as handle:
            header = handle.read(64)
        if len(header) < 30 or header[:4] != b"RIFF" or header[8:12] != b"WEBP":
            return None, None
        chunk = header[12:16]
        if chunk == b"VP8 ":
            # Lossy: two 16-bit fields whose low 14 bits are the size.
            width, height = struct.unpack("<HH", header[26:30])
            return width & 0x3FFF, height & 0x3FFF
        if chunk == b"VP8L":
            # Lossless: 14-bit (width - 1) and (height - 1) packed together.
            bits = struct.unpack("<I", header[21:25])[0]
            return (bits & 0x3FFF) + 1, ((bits >> 14) & 0x3FFF) + 1
        if chunk == b"VP8X":
            # Extended: 24-bit (canvas width - 1) and (canvas height - 1).
            width = 1 + int.from_bytes(header[24:27], "little")
            height = 1 + int.from_bytes(header[27:30], "little")
            return width, height
        return None, None

    @staticmethod
    def _avif_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read width and height from the first ``ispe`` box of an AVIF file.

        Only the first 256 KiB of the file are searched, and the first
        ``ispe`` occurrence found there is used.
        """
        with path.open("rb") as handle:
            data = handle.read(256 * 1024)
        if b"ftypavif" not in data and b"ftypavis" not in data:
            return None, None
        index = data.find(b"ispe")
        # After the 4-byte type tag come version/flags (4 bytes), then the
        # 32-bit width and the 32-bit height.
        if index == -1 or index + 16 > len(data):
            return None, None
        width, height = struct.unpack_from(">II", data, index + 8)
        if width <= 0 or height <= 0:
            return None, None
        return width, height
|