from __future__ import annotations

import mimetypes
import os
import shutil
import struct
from collections.abc import Callable
from datetime import datetime, timezone
from pathlib import Path

try:
    # The account databases only exist on POSIX platforms; without this guard
    # the whole module would fail to import elsewhere.
    import grp
    import pwd
except ImportError:
    grp = None
    pwd = None


class FilesystemAdapter:
    """Local-filesystem operations: metadata, listing, search, copy/move, I/O.

    Metadata helpers return plain dictionaries; timestamps are ISO-8601 UTC
    strings with a trailing ``Z``.
    """

    # JPEG start-of-frame markers (0xC4, 0xC8 and 0xCC are not frame headers).
    _JPEG_SOF_MARKERS = frozenset(
        bytes([code])
        for code in (0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF)
    )

    @staticmethod
    def _iso_utc(timestamp: float) -> str:
        """Format a POSIX timestamp as an ISO-8601 UTC string ending in ``Z``."""
        return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat().replace("+00:00", "Z")

    @staticmethod
    def _owner_name(uid: int) -> str | None:
        """Return the user name for *uid*, or ``None`` when it cannot be resolved."""
        if pwd is None:
            return None
        try:
            return pwd.getpwuid(uid).pw_name
        except (KeyError, AttributeError):
            return None

    @staticmethod
    def _group_name(gid: int) -> str | None:
        """Return the group name for *gid*, or ``None`` when it cannot be resolved."""
        if grp is None:
            return None
        try:
            return grp.getgrgid(gid).gr_name
        except (KeyError, AttributeError):
            return None

    def stat_info(self, path: Path) -> dict:
        """Return a metadata dictionary for *path*.

        ``size``, ``width`` and ``height`` are only populated for regular
        files; ``owner``/``group`` are ``None`` when the id has no name.

        Raises:
            OSError: if *path* cannot be stat'ed.
        """
        info = path.stat()
        is_file = path.is_file()
        content_type, _ = mimetypes.guess_type(path.name)
        width, height = self._image_dimensions(path) if is_file else (None, None)
        return {
            "name": path.name,
            "size": int(info.st_size) if is_file else None,
            "modified": self._iso_utc(info.st_mtime),
            "owner": self._owner_name(info.st_uid),
            "group": self._group_name(info.st_gid),
            "content_type": content_type,
            "extension": path.suffix.lower() or None,
            "width": width,
            "height": height,
        }

    def list_directory(self, directory: Path, show_hidden: bool) -> tuple[list[dict], list[dict]]:
        """List *directory*, returning ``(directories, files)`` sorted by name.

        Sorting is case-insensitive. Dot-prefixed entries are omitted unless
        *show_hidden* is true. Entries that are neither a directory nor a
        regular file, or that cannot be stat'ed, are omitted.
        """
        directories: list[dict] = []
        files: list[dict] = []
        for entry in sorted(directory.iterdir(), key=lambda item: item.name.lower()):
            if not show_hidden and entry.name.startswith("."):
                continue
            try:
                info = entry.stat()
            except OSError:
                # A dangling symlink or an entry that vanished mid-listing must
                # not abort the whole listing.
                continue
            modified = self._iso_utc(info.st_mtime)
            if entry.is_dir():
                directories.append({"name": entry.name, "modified": modified, "absolute": entry})
            elif entry.is_file():
                files.append(
                    {
                        "name": entry.name,
                        "size": int(info.st_size),
                        "modified": modified,
                        "absolute": entry,
                    }
                )
        return directories, files

    def search_names(self, directory: Path, query: str, limit: int) -> tuple[list[dict], bool]:
        """Recursively find entries whose name contains *query* (case-insensitive).

        Hidden (dot-prefixed) files and directories are skipped, and hidden
        directories are not descended into.

        Returns:
            ``(results, limit_reached)``; the flag is true as soon as
            ``len(results)`` reaches *limit*, at which point the walk stops.
        """
        needle = query.lower()
        results: list[dict] = []
        for root, dirnames, filenames in os.walk(directory):
            # Assigning in place prunes hidden directories from the walk and
            # fixes the order in which subdirectories are visited.
            dirnames[:] = sorted((name for name in dirnames if not name.startswith(".")), key=str.lower)
            visible_files = sorted((name for name in filenames if not name.startswith(".")), key=str.lower)
            root_path = Path(root)
            for kind, names in (("directory", dirnames), ("file", visible_files)):
                for name in names:
                    if needle not in name.lower():
                        continue
                    results.append({"name": name, "kind": kind, "absolute": root_path / name})
                    if len(results) >= limit:
                        return results, True
        return results, False

    def make_directory(self, path: Path) -> None:
        """Create a single directory; the parent must exist and *path* must not."""
        path.mkdir(parents=False, exist_ok=False)

    def rename_path(self, source: Path, destination: Path) -> None:
        """Rename *source* to *destination* via ``Path.rename``."""
        source.rename(destination)

    def move_file(self, source: str, destination: str) -> None:
        """Move a file by renaming *source* to *destination*."""
        Path(source).rename(Path(destination))

    def move_directory(self, source: str, destination: str) -> None:
        """Move a directory by renaming *source* to *destination*."""
        Path(source).rename(Path(destination))

    def is_directory_empty(self, path: Path) -> bool:
        """Return ``True`` when *path* contains no entries."""
        return not any(path.iterdir())

    def delete_file(self, path: Path) -> None:
        """Remove the file at *path*."""
        path.unlink()

    def delete_empty_directory(self, path: Path) -> None:
        """Remove the directory at *path*; it must be empty."""
        path.rmdir()

    def delete_directory_recursive(self, path: Path) -> None:
        """Remove *path* and everything beneath it."""
        shutil.rmtree(path)

    def copy_file(
        self,
        source: str,
        destination: str,
        on_progress: Callable[[int], None] | None = None,
    ) -> None:
        """Copy *source* to *destination* in 1 MiB chunks, then copy its metadata.

        Args:
            on_progress: optional callback invoked after each chunk with the
                number of bytes written so far.

        Raises:
            FileExistsError: if *destination* already exists (opened with ``"xb"``).
        """
        src = Path(source)
        dst = Path(destination)
        with src.open("rb") as in_f, dst.open("xb") as out_f:
            while True:
                chunk = in_f.read(1024 * 1024)
                if not chunk:
                    break
                out_f.write(chunk)
                if on_progress:
                    on_progress(out_f.tell())
        shutil.copystat(src, dst, follow_symlinks=False)

    def copy_directory(self, source: str, destination: str) -> None:
        """Recursively copy a directory tree, keeping symlinks as symlinks."""
        shutil.copytree(source, destination, symlinks=True, copy_function=shutil.copy2)

    def read_text_preview(self, path: Path, max_bytes: int, encoding: str = "utf-8") -> dict:
        """Read at most *max_bytes* of *path* and decode them leniently.

        Returns:
            A dictionary with ``size``, ``modified``, ``truncated`` and
            ``content``; undecodable bytes are replaced rather than raising.
        """
        size = int(path.stat().st_size)
        with path.open("rb") as in_f:
            # One extra byte reveals truncation even if the file grew after stat().
            raw = in_f.read(max_bytes + 1)
        modified = self.modified_iso(path)
        truncated = size > max_bytes or len(raw) > max_bytes
        if truncated:
            raw = raw[:max_bytes]
        return {
            "size": size,
            "modified": modified,
            "truncated": truncated,
            "content": raw.decode(encoding, errors="replace"),
        }

    def write_text_file(self, path: Path, content: str, encoding: str = "utf-8") -> dict:
        """Write *content* to *path* and return its new ``size`` and ``modified``."""
        path.write_text(content, encoding=encoding)
        return {
            "size": int(path.stat().st_size),
            "modified": self.modified_iso(path),
        }

    def write_uploaded_file(
        self,
        path: Path,
        file_stream,
        chunk_size: int = 1024 * 1024,
        overwrite: bool = False,
    ) -> dict:
        """Drain *file_stream* into *path* and return its ``size`` and ``modified``.

        Raises:
            FileExistsError: if *path* exists and *overwrite* is false.
        """
        mode = "wb" if overwrite else "xb"
        with path.open(mode) as handle:
            while True:
                chunk = file_stream.read(chunk_size)
                if not chunk:
                    break
                handle.write(chunk)
        return {
            "size": int(path.stat().st_size),
            "modified": self.modified_iso(path),
        }

    async def stream_file_range(self, path: Path, start: int, end: int, chunk_size: int = 1024 * 1024):
        """Yield the bytes of *path* from offset *start* through *end* inclusive.

        The reads themselves are ordinary synchronous file reads.
        """
        with path.open("rb") as handle:
            handle.seek(start)
            remaining = (end - start) + 1
            while remaining > 0:
                chunk = handle.read(min(chunk_size, remaining))
                if not chunk:
                    break
                remaining -= len(chunk)
                yield chunk

    async def stream_file(self, path: Path, chunk_size: int = 1024 * 1024):
        """Yield the whole of *path* in chunks of up to *chunk_size* bytes.

        The reads themselves are ordinary synchronous file reads.
        """
        with path.open("rb") as handle:
            while True:
                chunk = handle.read(chunk_size)
                if not chunk:
                    break
                yield chunk

    @staticmethod
    def modified_iso(path: Path) -> str:
        """Return the modification time of *path* as an ISO-8601 UTC string."""
        return FilesystemAdapter._iso_utc(path.stat().st_mtime)

    def _image_dimensions(self, path: Path) -> tuple[int | None, int | None]:
        """Return ``(width, height)`` based on the file suffix, else ``(None, None)``.

        Unknown suffixes, unreadable files and malformed headers all yield
        ``(None, None)`` instead of raising.
        """
        parsers = {
            ".png": self._png_dimensions,
            ".jpg": self._jpeg_dimensions,
            ".jpeg": self._jpeg_dimensions,
            ".gif": self._gif_dimensions,
            ".bmp": self._bmp_dimensions,
            ".webp": self._webp_dimensions,
            ".avif": self._avif_dimensions,
        }
        parser = parsers.get(path.suffix.lower())
        if parser is None:
            return None, None
        try:
            return parser(path)
        except (OSError, ValueError, struct.error):
            return None, None

    @staticmethod
    def _png_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read width/height from the IHDR chunk that opens a PNG file."""
        with path.open("rb") as handle:
            header = handle.read(24)
        if len(header) < 24 or header[:8] != b"\x89PNG\r\n\x1a\n":
            return None, None
        if header[12:16] != b"IHDR":
            # Bytes 16..24 are only the dimensions when the first chunk is IHDR.
            return None, None
        return struct.unpack(">II", header[16:24])

    @staticmethod
    def _jpeg_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Scan JPEG segments until a start-of-frame header gives the size."""
        with path.open("rb") as handle:
            if handle.read(2) != b"\xff\xd8":
                return None, None
            while True:
                marker_prefix = handle.read(1)
                if not marker_prefix:
                    return None, None
                if marker_prefix != b"\xff":
                    continue
                marker = handle.read(1)
                # Any number of 0xFF fill bytes may precede the marker code.
                while marker == b"\xff":
                    marker = handle.read(1)
                if not marker or marker in {b"\xd8", b"\xd9"}:
                    return None, None
                segment_length_bytes = handle.read(2)
                if len(segment_length_bytes) != 2:
                    return None, None
                segment_length = struct.unpack(">H", segment_length_bytes)[0]
                if segment_length < 2:
                    return None, None
                if marker in FilesystemAdapter._JPEG_SOF_MARKERS:
                    # Frame header: 1 byte precision, then height, then width.
                    payload = handle.read(5)
                    if len(payload) != 5:
                        return None, None
                    height, width = struct.unpack(">HH", payload[1:5])
                    return width, height
                # The length field counts its own two bytes.
                handle.seek(segment_length - 2, 1)

    @staticmethod
    def _gif_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read the logical screen size that follows the GIF signature."""
        with path.open("rb") as handle:
            header = handle.read(10)
        if len(header) < 10 or header[:6] not in {b"GIF87a", b"GIF89a"}:
            return None, None
        width, height = struct.unpack("<HH", header[6:10])
        return width, height

    @staticmethod
    def _bmp_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read width/height from the info header of a BMP file."""
        with path.open("rb") as handle:
            header = handle.read(26)
        if len(header) < 26 or header[:2] != b"BM":
            return None, None
        width, height = struct.unpack("<ii", header[18:26])
        # A negative height marks a top-down bitmap; report the magnitude.
        return width, abs(height)

    @staticmethod
    def _webp_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read the canvas size from a WebP file (VP8, VP8L or VP8X chunk)."""
        with path.open("rb") as handle:
            header = handle.read(64)
        if len(header) < 30 or header[:4] != b"RIFF" or header[8:12] != b"WEBP":
            return None, None
        chunk = header[12:16]
        if chunk == b"VP8 " and len(header) >= 30:
            # Lossy: 16-bit fields whose upper two bits are a scaling hint.
            width, height = struct.unpack("<HH", header[26:30])
            return width & 0x3FFF, height & 0x3FFF
        if chunk == b"VP8L" and len(header) >= 25:
            # Lossless: two packed 14-bit fields storing (dimension - 1).
            bits = struct.unpack("<I", header[21:25])[0]
            width = (bits & 0x3FFF) + 1
            height = ((bits >> 14) & 0x3FFF) + 1
            return width, height
        if chunk == b"VP8X" and len(header) >= 30:
            # Extended: 24-bit little-endian fields storing (dimension - 1).
            width = 1 + int.from_bytes(header[24:27], "little")
            height = 1 + int.from_bytes(header[27:30], "little")
            return width, height
        return None, None

    @staticmethod
    def _avif_dimensions(path: Path) -> tuple[int | None, int | None]:
        """Read width/height from the first ``ispe`` box of an AVIF file.

        Only the first 256 KiB of the file are searched.
        """
        with path.open("rb") as handle:
            data = handle.read(256 * 1024)
        if b"ftypavif" not in data and b"ftypavis" not in data:
            return None, None
        index = data.find(b"ispe")
        # ``index`` is the 4-byte type tag; a 4-byte version/flags field
        # follows it, then the 32-bit width and 32-bit height.
        if index == -1 or index + 16 > len(data):
            return None, None
        width = int.from_bytes(data[index + 8:index + 12], "big")
        height = int.from_bytes(data[index + 12:index + 16], "big")
        if width <= 0 or height <= 0:
            return None, None
        return width, height