Files
podman-mvp/control/app_files.py
T
kodi bacab3b20a fix (security): sluit path traversal in legacy /workloads/ endpoints
Drie endpoints gebruikten os.path.join zonder validatie, waardoor een
aanvaller buiten WORKLOADS_DIR kon lezen/schrijven. Vervangen door de
bestaande _files_safe_join() helper die al door alle /files/ endpoints
werd gebruikt.

Endpoints: /workloads/read/, /workloads/save-file, /workloads/deploy/

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-22 09:52:27 +01:00

169 lines
6.0 KiB
Python

import os
from pathlib import Path
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
# Request-body schema: the handler that accepts this model writes ``content``
# to the target file verbatim.
class FileContent(BaseModel):
    # Full text to store in the file.
    content: str
def safe_join(base, path):
    """Join *path* onto *base* and return the absolute result, refusing escapes.

    Args:
        base: Directory the result must stay inside.
        path: Caller-supplied path, normally relative to *base*.

    Returns:
        The normalised absolute path (*base* itself when *path* is empty).

    Raises:
        HTTPException: 400 when *path* is not a string/path-like value,
            403 when the normalised result lies outside *base*.
    """
    # Callers may hand over None (e.g. an absent request field); report that
    # as a client error instead of letting os.path.join raise TypeError.
    try:
        relative = os.fspath(path)
    except TypeError:
        relative = None
    if not isinstance(relative, str):
        raise HTTPException(status_code=400, detail="Invalid path")

    root = os.path.abspath(base)
    final = os.path.abspath(os.path.join(root, relative))

    # Compare on whole path components: a plain startswith(root) would also
    # accept a sibling directory such as "<root>_evil" or "<root>-backup".
    prefix = root if root.endswith(os.sep) else root + os.sep
    if final != root and not final.startswith(prefix):
        raise HTTPException(status_code=403, detail="Forbidden path")

    # NOTE: abspath() normalises ".." but does not resolve symlinks, so a
    # symlink stored inside *base* can still point outside of it.
    return final
def init_files_router(session, podman_api_base: str, workloads_dir: str) -> APIRouter:
    """Build the router for the legacy ``/workloads`` and the ``/files`` endpoints.

    Every client-supplied path is resolved through ``_files_safe_join`` so it
    stays below *workloads_dir*. The mkdir/rmdir endpoints are additionally
    confined to the ``systemd`` subdirectory of *workloads_dir*.

    Args:
        session: Object exposing ``post(url, **kwargs)``; used for the call
            made by the deploy endpoint.
        podman_api_base: Base URL to which ``/libpod/kube/play`` is appended.
        workloads_dir: Root directory for all file operations.

    Returns:
        The ``APIRouter`` (tagged ``files``) with all routes registered.
    """
    router = APIRouter(tags=["files"])

    # Normalised root of the subtree that mkdir/rmdir may touch. Checks
    # against it are made on whole path components so that a sibling such as
    # "systemd-evil" or a "systemd/../x" detour is not mistaken for a child.
    systemd_root = os.path.join(os.path.abspath(workloads_dir), "systemd")

    def _podman_post(url: str, **kwargs):
        # Keep behavior identical to app.py wrapper used by old /workloads/deploy.
        return session.post(url, **kwargs)

    # STEP 4: Centralize WORKLOADS_DIR subtree enforcement via one helper.
    # MUST be behavior-identical to previous safe_join(WORKLOADS_DIR, ...) calls.
    def _files_safe_join(path: str) -> str:
        return safe_join(workloads_dir, path)

    def _under_systemd(full: str) -> bool:
        """Return True when *full* is a strict descendant of the systemd root."""
        return full.startswith(systemd_root + os.sep)

    # --- WORKLOADS ---
    @router.get("/workloads")
    def list_workloads():
        """List .yaml/.yml/.json files, as paths relative to the workloads dir."""
        workloads = []
        for root, _, files in os.walk(workloads_dir):
            for name in files:
                if name.endswith((".yaml", ".yml", ".json")):
                    full = os.path.join(root, name)
                    workloads.append(os.path.relpath(full, workloads_dir))
        return {"workloads": workloads}

    @router.get("/workloads/read/{filename:path}")
    def read_workload(filename: str):
        """Return the text content of one file below the workloads dir."""
        path = _files_safe_join(filename)
        if not os.path.exists(path):
            raise HTTPException(404)
        # open() on a directory would surface as an unhandled error.
        if os.path.isdir(path):
            raise HTTPException(status_code=403, detail="Is a directory")
        with open(path, 'r') as f:
            content = f.read()
        return {"filename": filename, "content": content}

    @router.post("/workloads/save-file")
    def save_workload_file(data: dict):
        """Write ``data["content"]`` to ``data["path"]`` below the workloads dir."""
        path = data.get("path")
        content = data.get("content")
        # Validate before touching the filesystem: open(..., "w") truncates
        # the target, so a bad payload must be rejected first.
        if not isinstance(path, str) or not path:
            raise HTTPException(status_code=400, detail="Missing or invalid 'path'")
        if not isinstance(content, str):
            raise HTTPException(status_code=400, detail="Missing or invalid 'content'")
        full_path = _files_safe_join(path)
        if os.path.isdir(full_path):
            raise HTTPException(status_code=403, detail="Is a directory")
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        with open(full_path, "w") as f:
            f.write(content)
        return {"status": "success"}

    @router.post("/workloads/deploy/{filename:path}")
    def deploy_workload(filename: str):
        """POST the file's content to ``<podman_api_base>/libpod/kube/play``."""
        path = _files_safe_join(filename)
        # Same 404/403 answers as the read endpoints instead of an
        # unhandled FileNotFoundError / IsADirectoryError.
        if not os.path.exists(path):
            raise HTTPException(404)
        if os.path.isdir(path):
            raise HTTPException(status_code=403, detail="Is a directory")
        with open(path, 'r') as f:
            yaml_content = f.read()
        url = f"{podman_api_base}/libpod/kube/play"
        return _podman_post(url, data=yaml_content).json()

    # --- FILES API ---
    @router.get("/files/tree")
    def file_tree():
        """Return one entry per directory with its sorted dirs and files."""
        result = []
        for dirpath, dirnames, filenames in os.walk(workloads_dir):
            rel = os.path.relpath(dirpath, workloads_dir)
            result.append({
                # The root itself is reported as "" rather than ".".
                "path": "" if rel == "." else rel,
                "dirs": sorted(dirnames),
                "files": sorted(filenames),
            })
        return result

    @router.get("/files/read")
    def file_read(path: str = Query(...)):
        """Return the text content of one file below the workloads dir."""
        full = _files_safe_join(path)
        if not os.path.exists(full):
            raise HTTPException(status_code=404, detail="Not found")
        if os.path.isdir(full):
            raise HTTPException(status_code=403, detail="Is a directory")
        with open(full, "r") as f:
            content = f.read()
        return {"content": content}

    @router.post("/files/save")
    def file_save(path: str = Query(...), data: FileContent = None):
        """Write ``data.content`` to *path* below the workloads dir."""
        # The body parameter defaults to None; reject that before open()
        # truncates the existing file.
        if data is None:
            raise HTTPException(status_code=400, detail="Missing request body")
        full = _files_safe_join(path)
        if os.path.isdir(full):
            raise HTTPException(status_code=403, detail="Is a directory")
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "w") as f:
            f.write(data.content)
        return {"status": "success", "path": path}

    @router.delete("/files/delete")
    def file_delete(path: str = Query(...)):
        """Delete a single file; directories are refused."""
        full = _files_safe_join(path)
        if not os.path.exists(full):
            raise HTTPException(status_code=404, detail="Not found")
        if os.path.isdir(full):
            raise HTTPException(status_code=400, detail="Kan niet verwijderen: is directory")
        try:
            os.remove(full)
        except OSError as e:
            raise HTTPException(status_code=400, detail=f"Kan niet verwijderen: {e}") from e
        return {"status": "deleted", "type": "file"}

    @router.post("/files/mkdir")
    def file_mkdir(path: str = Query(...)):
        """Create a directory (and parents) inside the systemd subtree."""
        # UI expects operations under systemd/; enforce prefix if absent.
        # Match a whole component: "systemd-extra" is NOT inside systemd/.
        if path != "systemd" and not path.startswith("systemd/"):
            path = os.path.join("systemd", path)
        full = _files_safe_join(path)
        # ".." segments can leave systemd/ again after the prefix check, so
        # verify the resolved location as well.
        if full != systemd_root and not _under_systemd(full):
            raise HTTPException(status_code=400, detail="Only systemd subtree is allowed")
        os.makedirs(full, exist_ok=True)
        return {"status": "directory created", "path": path}

    @router.delete("/files/rmdir")
    def file_rmdir(path: str = Query(..., description="Directory path under systemd/")):
        """Remove an empty directory strictly below the systemd root.

        Answers 409 with the remaining ``dirs``/``files`` when removal fails.
        """
        # Only allow deletion under systemd subtree
        if not path or path == "systemd" or path == "systemd/":
            raise HTTPException(status_code=400, detail="Refusing to delete systemd root")
        if not path.startswith("systemd/"):
            raise HTTPException(status_code=400, detail="Only systemd subtree is allowed")

        full = _files_safe_join(path)
        # Re-check after normalisation: "systemd/." or "systemd//" resolve to
        # the root itself and "systemd/../x" resolves outside the subtree.
        if full == systemd_root:
            raise HTTPException(status_code=400, detail="Refusing to delete systemd root")
        if not _under_systemd(full):
            raise HTTPException(status_code=400, detail="Only systemd subtree is allowed")

        if not os.path.exists(full):
            raise HTTPException(status_code=404, detail="Directory not found")
        if not os.path.isdir(full):
            raise HTTPException(status_code=400, detail="Path is not a directory")

        # directory must be empty
        try:
            Path(full).rmdir()
        except OSError:
            # Any OSError is reported as "not empty" (unchanged behaviour);
            # build a stable detail payload listing what is still there.
            dirs, files = [], []
            try:
                for entry in os.listdir(full):
                    if os.path.isdir(os.path.join(full, entry)):
                        dirs.append(entry)
                    else:
                        files.append(entry)
            except OSError:
                dirs, files = [], []
            raise HTTPException(status_code=409, detail={
                "error": "directory not empty",
                "dirs": sorted(dirs),
                "files": sorted(files),
            })
        return {"deleted": True, "path": path}

    return router