fix: 路径遍历防护 - safe_paper_id/safe_pdf_path 校验

This commit is contained in:
2026-06-03 09:37:41 +00:00
parent 3ff8e35751
commit bac03f8880

View File

@@ -26,6 +26,24 @@ API_KEY = os.environ.get("LLM_LIB_API_KEY", "change-me")
log = logging.getLogger("llm-library") log = logging.getLogger("llm-library")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# ─── Security helpers ──────────────────────────────────
import re
SAFE_ID = re.compile(r'^[a-zA-Z0-9_.\-]+$')


def safe_paper_id(raw: str) -> str:
    """Validate a paper ID and return it unchanged.

    An ID is accepted only when it is non-empty, at most 128 characters
    long, made up solely of ASCII letters, digits, ``_``, ``.`` and ``-``,
    and free of the ``..`` sequence.

    Args:
        raw: The untrusted paper ID, e.g. taken from a URL path parameter.

    Returns:
        ``raw`` itself when every check passes.

    Raises:
        HTTPException: status 400 ("Invalid paper ID") when any check fails.
    """
    # fullmatch(), not match(): with match() the pattern's trailing `$` also
    # matches just before a final "\n", so an ID like "1706.03762\n" would
    # slip through the whitelist.
    if not raw or len(raw) > 128 or not SAFE_ID.fullmatch(raw) or '..' in raw:
        raise HTTPException(status_code=400, detail="Invalid paper ID")
    return raw
def safe_pdf_path(base: Path, paper_id: str) -> Path:
    """Build the resolved path of ``<paper_id>.pdf`` inside ``base``.

    Args:
        base: Directory the PDF is expected to live in.
        paper_id: Untrusted paper ID; validated with ``safe_paper_id`` first.

    Returns:
        The fully resolved path ``base/<paper_id>.pdf``.

    Raises:
        HTTPException: status 400 when ``paper_id`` fails validation, or
            when the resolved path does not lie under the resolved ``base``.
    """
    pid = safe_paper_id(paper_id)
    root = base.resolve()
    fp = (base / f"{pid}.pdf").resolve()
    # Compare path components, not string prefixes: a prefix test would
    # accept "/data/arxiv_old/x.pdf" for base "/data/arxiv", which can be
    # reached when a symlink inside base resolves to a sibling directory.
    if root not in fp.parents:
        raise HTTPException(status_code=400, detail="Invalid path")
    return fp
# ─── App ─────────────────────────────────────────────── # ─── App ───────────────────────────────────────────────
app = FastAPI( app = FastAPI(
title="LLM 论文图书馆", title="LLM 论文图书馆",
@@ -280,7 +298,7 @@ def delete_paper(
@app.get("/papers/arxiv/{arxiv_id}") @app.get("/papers/arxiv/{arxiv_id}")
def serve_arxiv_pdf(arxiv_id: str): def serve_arxiv_pdf(arxiv_id: str):
"""从本地缓存提供 arXiv PDF无 .pdf 后缀路由防 IDM 拦截)""" """从本地缓存提供 arXiv PDF无 .pdf 后缀路由防 IDM 拦截)"""
pdf_path = PAPERS_DIR / "arxiv" / f"{arxiv_id}.pdf" pdf_path = safe_pdf_path(PAPERS_DIR / "arxiv", arxiv_id)
if not pdf_path.exists(): if not pdf_path.exists():
raise HTTPException(status_code=404, detail=f"PDF not in local cache: {arxiv_id}") raise HTTPException(status_code=404, detail=f"PDF not in local cache: {arxiv_id}")
return FileResponse( return FileResponse(
@@ -448,8 +466,8 @@ def translate_status(arxiv_id: str):
@app.post("/api/download/{arxiv_id}") @app.post("/api/download/{arxiv_id}")
def download_single_pdf(arxiv_id: str, _=Depends(verify_api_key)): def download_single_pdf(arxiv_id: str, _=Depends(verify_api_key)):
"""按需下载单篇 arXiv PDF""" """按需下载单篇 arXiv PDF"""
import subprocess, sys safe_paper_id(arxiv_id)
pdf_path = PAPERS_DIR / "arxiv" / f"{arxiv_id}.pdf" pdf_path = safe_pdf_path(PAPERS_DIR / "arxiv", arxiv_id)
if pdf_path.exists(): if pdf_path.exists():
return {"ok": True, "arxiv_id": arxiv_id, "status": "cached"} return {"ok": True, "arxiv_id": arxiv_id, "status": "cached"}
@@ -470,13 +488,16 @@ TRANSLATED_DIR = PAPERS_DIR / "translated"
@app.get("/api/translated/{arxiv_id}") @app.get("/api/translated/{arxiv_id}")
def check_translation(arxiv_id: str): def check_translation(arxiv_id: str):
"""Check if translation exists for a paper""" """Check if translation exists for a paper"""
fn = f"{arxiv_id}.pdf" try:
return {"arxiv_id": arxiv_id, "exists": (TRANSLATED_DIR / fn).exists()} fp = safe_pdf_path(TRANSLATED_DIR, arxiv_id)
return {"arxiv_id": arxiv_id, "exists": fp.exists()}
except HTTPException:
return {"arxiv_id": arxiv_id, "exists": False}
@app.get("/papers/translated/{arxiv_id}.pdf") @app.get("/papers/translated/{arxiv_id}.pdf")
def serve_translated(arxiv_id: str): def serve_translated(arxiv_id: str):
"""Serve translated PDF from cache""" """Serve translated PDF from cache"""
fp = TRANSLATED_DIR / f"{arxiv_id}.pdf" fp = safe_pdf_path(TRANSLATED_DIR, arxiv_id)
if not fp.exists(): if not fp.exists():
raise HTTPException(status_code=404, detail="Translation not found") raise HTTPException(status_code=404, detail="Translation not found")
return FileResponse(fp, media_type="application/pdf", return FileResponse(fp, media_type="application/pdf",
@@ -497,25 +518,26 @@ def translation_status():
async def trigger_translation(paper_id: str, _=Depends(verify_api_key)): async def trigger_translation(paper_id: str, _=Depends(verify_api_key)):
"""Trigger pdf2zh translation for a paper (DeepSeek V4 Flash). """Trigger pdf2zh translation for a paper (DeepSeek V4 Flash).
paper_id can be arxiv ID (e.g. 1706.03762) or a HF filename.""" paper_id can be arxiv ID (e.g. 1706.03762) or a HF filename."""
safe_paper_id(paper_id) # validate
# Find the PDF # Find the PDF
pdf_path = None pdf_path = None
# Try arxiv # Try arxiv
candidate = PAPERS_DIR / "arxiv" / f"{paper_id}.pdf" candidate = safe_pdf_path(PAPERS_DIR / "arxiv", paper_id)
if candidate.exists(): if candidate.exists():
pdf_path = candidate pdf_path = candidate
else: else:
# Try HF papers directory # Try HF papers directory
hf_dir = PAPERS_DIR / "hf" hf_dir = (PAPERS_DIR / "hf").resolve()
if hf_dir.exists(): if hf_dir.exists():
for f in hf_dir.glob("*.pdf"): for f in hf_dir.glob("*.pdf"):
if paper_id in f.stem: if paper_id in f.stem and str(f.resolve()).startswith(str(hf_dir)):
pdf_path = f pdf_path = f
break break
if not pdf_path: if not pdf_path:
raise HTTPException(status_code=404, detail=f"PDF not found for {paper_id}") raise HTTPException(status_code=404, detail=f"PDF not found for {paper_id}")
out_path = TRANSLATED_DIR / f"{paper_id}.pdf" out_path = safe_pdf_path(TRANSLATED_DIR, paper_id)
if out_path.exists(): if out_path.exists():
return {"paper_id": paper_id, "status": "already_translated"} return {"paper_id": paper_id, "status": "already_translated"}