Bring the local project into the remote repository and reduce generated image object suffixes to six characters for shorter URLs. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
313 lines
10 KiB
Python
313 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import datetime as dt
|
|
import mimetypes
|
|
import os
|
|
import posixpath
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import uuid
|
|
import zipfile
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
|
|
# Root of the MinerU HTTP API; endpoint paths are appended to this value.
MINERU_BASE_URL = "https://mineru.net"

# Lower-case file suffixes that are accepted as images when rewriting Markdown.
IMAGE_EXTENSIONS = {
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".webp",
    ".bmp",
    ".tif",
    ".tiff",
    ".svg",
}
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Convert a PDF academic paper to pure Markdown with hosted images via MinerU."
|
|
)
|
|
parser.add_argument("pdf", help="Path to the input PDF file")
|
|
parser.add_argument(
|
|
"-o",
|
|
"--output",
|
|
help="Path to the output Markdown file. Defaults to <pdf_stem>.md next to the PDF.",
|
|
)
|
|
parser.add_argument(
|
|
"--language",
|
|
default="en",
|
|
help="Document language hint for MinerU, default: en",
|
|
)
|
|
parser.add_argument(
|
|
"--ocr",
|
|
action="store_true",
|
|
help="Enable OCR mode for scanned PDFs",
|
|
)
|
|
parser.add_argument(
|
|
"--disable-table",
|
|
action="store_true",
|
|
help="Disable table extraction",
|
|
)
|
|
parser.add_argument(
|
|
"--disable-formula",
|
|
action="store_true",
|
|
help="Disable formula extraction",
|
|
)
|
|
parser.add_argument(
|
|
"--poll-interval",
|
|
type=float,
|
|
default=float(os.getenv("POLL_INTERVAL_SECONDS", "5")),
|
|
help="Polling interval in seconds",
|
|
)
|
|
parser.add_argument(
|
|
"--timeout",
|
|
type=int,
|
|
default=int(os.getenv("TIMEOUT_SECONDS", "1800")),
|
|
help="Overall timeout in seconds",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def require_env(name: str) -> str:
    """Return the whitespace-stripped value of environment variable ``name``.

    Raises:
        SystemExit: If the variable is unset or empty after stripping.
    """
    cleaned = os.environ.get(name, "").strip()
    if cleaned:
        return cleaned
    raise SystemExit(f"Missing required environment variable: {name}")
|
|
|
|
|
|
def mineru_headers(token: str) -> dict[str, str]:
    """Return the headers for a JSON request authenticated with ``token``."""
    headers = {"Authorization": f"Bearer {token}"}
    headers["Content-Type"] = "application/json"
    return headers
|
|
|
|
|
|
def submit_pdf(pdf_path: Path, token: str, language: str, is_ocr: bool, enable_table: bool, enable_formula: bool) -> tuple[str, str]:
    """Register ``pdf_path`` with MinerU's batch file-URL endpoint.

    Only the file *name* is sent here; the file content is not read.

    Returns:
        ``(batch_id, file_url)`` where ``file_url`` is the single entry of the
        ``file_urls`` list in the response.

    Raises:
        requests.HTTPError: If the HTTP status signals an error.
        RuntimeError: If the response ``code`` is not 0 or ``file_urls`` does
            not contain exactly one entry.
    """
    file_entry = {
        "name": pdf_path.name,
        "data_id": uuid.uuid4().hex,
        "is_ocr": is_ocr,
    }
    request_body = {
        "files": [file_entry],
        "model_version": "vlm",
        "enable_table": enable_table,
        "enable_formula": enable_formula,
        "language": language,
    }
    endpoint = f"{MINERU_BASE_URL}/api/v4/file-urls/batch"

    reply = requests.post(endpoint, headers=mineru_headers(token), json=request_body, timeout=60)
    reply.raise_for_status()
    parsed = reply.json()
    if parsed.get("code") != 0:
        raise RuntimeError(f"MinerU submit failed: {parsed}")

    data = parsed["data"]
    urls = data.get("file_urls") or []
    if len(urls) != 1:
        raise RuntimeError(f"Unexpected MinerU file_urls response: {parsed}")
    return data["batch_id"], urls[0]
|
|
|
|
|
|
def upload_pdf_to_signed_url(pdf_path: Path, signed_url: str) -> None:
    """PUT the bytes of ``pdf_path`` to ``signed_url``.

    The open file object is passed as the request body.

    Raises:
        requests.HTTPError: If the HTTP status signals an error.
    """
    with open(pdf_path, "rb") as pdf_stream:
        put_response = requests.put(signed_url, data=pdf_stream, timeout=300)
        put_response.raise_for_status()
|
|
|
|
|
|
def wait_for_result(batch_id: str, token: str, timeout_seconds: int, poll_interval: float) -> str:
    """Poll MinerU until the batch finishes and return its result archive URL.

    Each change of the reported state is printed to stderr.

    Args:
        batch_id: Identifier inserted into the extract-results URL.
        token: Bearer token sent in the ``Authorization`` header.
        timeout_seconds: Overall budget, measured with ``time.monotonic``.
        poll_interval: Pause between two polls, in seconds.

    Returns:
        The ``full_zip_url`` of the single extract result once its state is
        ``"done"``.

    Raises:
        requests.HTTPError: If a poll returns an HTTP error status.
        RuntimeError: If the response ``code`` is not 0, the number of extract
            results is not exactly one, the state is ``"failed"``, or the
            state is ``"done"`` without a ``full_zip_url``.
        TimeoutError: If the budget runs out first.
    """
    deadline = time.monotonic() + timeout_seconds
    status_url = f"{MINERU_BASE_URL}/api/v4/extract-results/batch/{batch_id}"
    auth_headers = {"Authorization": f"Bearer {token}"}
    last_state = None

    while time.monotonic() < deadline:
        response = requests.get(status_url, headers=auth_headers, timeout=60)
        response.raise_for_status()
        body = response.json()
        if body.get("code") != 0:
            raise RuntimeError(f"MinerU polling failed: {body}")

        # `or {}` also covers an explicit JSON null for "data"; a plain
        # `.get("data", {})` would hand back None and crash on the next `.get`.
        results = (body.get("data") or {}).get("extract_result") or []
        if len(results) != 1:
            raise RuntimeError(f"Unexpected MinerU extract_result response: {body}")

        result = results[0]
        state = result.get("state")
        if state != last_state:
            print(f"MinerU state: {state}", file=sys.stderr)
            last_state = state

        if state == "done":
            full_zip_url = result.get("full_zip_url")
            if not full_zip_url:
                raise RuntimeError(f"MinerU returned done without full_zip_url: {body}")
            return full_zip_url
        if state == "failed":
            raise RuntimeError(f"MinerU extraction failed: {result.get('err_msg') or body}")

        # Never sleep past the deadline: with a long poll interval the
        # original loop could overshoot the timeout by a whole interval.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(max(0.0, min(poll_interval, remaining)))

    raise TimeoutError(f"Timed out waiting for MinerU batch {batch_id}")
|
|
|
|
|
|
def download_zip(full_zip_url: str, target_dir: Path) -> Path:
    """Stream ``full_zip_url`` into ``target_dir / "result.zip"``.

    The body is written in chunks of up to 1 MiB; empty chunks are skipped.

    Returns:
        Path of the written archive.

    Raises:
        requests.HTTPError: If the HTTP status signals an error.
    """
    destination = target_dir / "result.zip"
    one_mib = 1024 * 1024
    with requests.get(full_zip_url, stream=True, timeout=300) as reply:
        reply.raise_for_status()
        with destination.open("wb") as sink:
            sink.writelines(
                piece for piece in reply.iter_content(chunk_size=one_mib) if piece
            )
    return destination
|
|
|
|
|
|
def extract_zip(zip_path: Path, target_dir: Path) -> Path:
    """Unpack every member of ``zip_path`` into ``target_dir``.

    Returns:
        ``target_dir``, unchanged, for convenient chaining.
    """
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(path=target_dir)
    return target_dir
|
|
|
|
|
|
def find_full_markdown(extracted_dir: Path) -> Path:
    """Locate the ``full.md`` file inside an extracted result tree.

    When several files match, the one closest to ``extracted_dir`` wins and
    ties are broken by path. ``rglob`` yields entries in a
    filesystem-dependent order, so picking the first hit would make the
    result vary between runs and machines.

    Raises:
        FileNotFoundError: If no regular file named ``full.md`` exists below
            ``extracted_dir``.
    """
    # Skip directories that happen to be called "full.md".
    candidates = [path for path in extracted_dir.rglob("full.md") if path.is_file()]
    if not candidates:
        raise FileNotFoundError("full.md was not found in the MinerU zip output")

    def rank(path: Path) -> tuple[int, str]:
        relative = path.relative_to(extracted_dir)
        return len(relative.parts), relative.as_posix()

    return min(candidates, key=rank)
|
|
|
|
|
|
def infer_content_type(path: Path) -> str:
    """Guess a MIME type from the file name of ``path``.

    Falls back to ``"application/octet-stream"`` when ``mimetypes`` has no
    answer for the name.
    """
    guessed_type = mimetypes.guess_type(path.name)[0]
    return guessed_type if guessed_type else "application/octet-stream"
|
|
|
|
|
|
def build_r2_url(base_url: str, prefix: str, folder_name: str, object_name: str) -> str:
    """Compose the upload URL ``<base>/<prefix>/<folder>/<object>``.

    Trailing slashes on ``base_url``, surrounding slashes on ``prefix`` and
    ``folder_name`` and leading slashes on ``object_name`` are removed; empty
    segments are skipped.

    The segments are joined with a plain ``"/"`` instead of
    ``posixpath.join``. ``posixpath.join`` treats a segment with a leading
    slash as absolute and silently drops everything before it, and it raises
    ``TypeError`` when every segment is empty.
    """
    base = base_url.rstrip("/")
    segments = (prefix.strip("/"), folder_name.strip("/"), object_name.lstrip("/"))
    object_key = "/".join(segment for segment in segments if segment)
    return f"{base}/{object_key}"
|
|
|
|
|
|
def slugify_paper_name(pdf_stem: str, limit: int = 48) -> str:
    """Turn a PDF stem into a lower-case, hyphen-separated ASCII slug.

    Every run of characters outside ``a-z0-9`` becomes one hyphen, edge
    hyphens are dropped, and the result is cut to ``limit`` characters (any
    hyphen left at the cut is removed). ``"paper"`` is returned whenever
    nothing usable remains.
    """
    hyphenated = re.sub(r"[^a-z0-9]+", "-", pdf_stem.lower())
    candidate = hyphenated.strip("-") or "paper"
    truncated = candidate[:limit].rstrip("-")
    return truncated if truncated else "paper"
|
|
|
|
|
|
def unique_object_name(local_path: Path) -> str:
    """Build an object name ``<stem>-<6 hex chars><suffix>`` for ``local_path``.

    Runs of characters outside ``a-zA-Z0-9._-`` in the stem become a hyphen and
    leading/trailing ``-``, ``.`` and ``_`` are dropped; an empty stem becomes
    ``"image"``. The suffix is lower-cased and defaults to ``".bin"``. The six
    hex characters are taken from a fresh ``uuid4``.
    """
    cleaned = re.sub(r"[^a-zA-Z0-9._-]+", "-", local_path.stem).strip("-._")
    base = cleaned if cleaned else "image"
    extension = local_path.suffix.lower()
    if not extension:
        extension = ".bin"
    token = uuid.uuid4().hex[:6]
    return f"{base}-{token}{extension}"
|
|
|
|
|
|
def upload_image_to_r2(local_path: Path, target_url: str, bearer_token: str) -> str:
    """PUT ``local_path`` to ``target_url`` with bearer authentication.

    The ``Content-Type`` header is derived from the file name.

    Returns:
        ``target_url``, unchanged.

    Raises:
        requests.HTTPError: If the HTTP status signals an error.
    """
    request_headers = {
        "Authorization": f"Bearer {bearer_token}",
        "Content-Type": infer_content_type(local_path),
    }
    with local_path.open("rb") as image_stream:
        put_response = requests.put(
            target_url,
            headers=request_headers,
            data=image_stream,
            timeout=300,
        )
        put_response.raise_for_status()
    return target_url
|
|
|
|
|
|
def normalize_public_url(uploaded_url: str, public_base_url: str | None) -> str:
|
|
if not public_base_url:
|
|
return uploaded_url
|
|
path = urlparse(uploaded_url).path.lstrip("/")
|
|
return f"{public_base_url.rstrip('/')}/{path}"
|
|
|
|
|
|
def upload_images_and_rewrite(markdown_text: str, asset_root: Path, pdf_stem: str) -> str:
    """Upload locally referenced images and point the Markdown at the hosted copies.

    Every ``![alt](target)`` occurrence is inspected. Targets starting with
    ``http://``, ``https://`` or ``data:`` are left alone, as are targets that
    do not resolve to an existing file below ``asset_root`` or whose suffix is
    not in ``IMAGE_EXTENSIONS``. Each remaining target is uploaded once (the
    hosted URL is reused for repeated references) under
    ``<R2_PREFIX>/<YYYY/MM/DD>/<paper slug>/``.

    Environment:
        R2_BASE_URL, R2_BEARER_TOKEN: required.
        R2_PREFIX, R2_PUBLIC_BASE_URL: optional.

    Args:
        markdown_text: Markdown source to rewrite.
        asset_root: Directory that relative image targets are resolved against.
        pdf_stem: PDF file stem, used to derive the per-paper folder name.

    Returns:
        The Markdown text with uploaded image targets replaced by hosted URLs.

    Raises:
        SystemExit: If a required environment variable is missing.
        requests.HTTPError: If an upload returns an HTTP error status.
    """
    r2_base_url = require_env("R2_BASE_URL")
    r2_bearer_token = require_env("R2_BEARER_TOKEN")
    r2_prefix = os.getenv("R2_PREFIX", "").strip()
    r2_public_base_url = os.getenv("R2_PUBLIC_BASE_URL", "").strip() or None
    date_prefix = dt.date.today().strftime("%Y/%m/%d")
    folder_name = posixpath.join(date_prefix, slugify_paper_name(pdf_stem))
    resolved_root = asset_root.resolve()

    # Markdown target -> hosted URL, so a repeated reference is uploaded once.
    uploaded: dict[str, str] = {}

    def replace(match: re.Match[str]) -> str:
        alt_text = match.group(1)
        original_path = match.group(2).strip()
        if original_path.startswith(("http://", "https://", "data:")):
            return match.group(0)

        hosted_url = uploaded.get(original_path)
        if hosted_url is None:
            local_path = (asset_root / original_path).resolve()
            # The Markdown comes from a downloaded archive: refuse targets
            # that escape the extraction directory (e.g. "../../x.png") so no
            # unrelated local file is ever uploaded.
            if not local_path.is_relative_to(resolved_root):
                return match.group(0)
            if not local_path.is_file():
                return match.group(0)
            if local_path.suffix.lower() not in IMAGE_EXTENSIONS:
                return match.group(0)
            object_name = unique_object_name(local_path)
            upload_url = build_r2_url(r2_base_url, r2_prefix, folder_name, object_name)
            uploaded_url = upload_image_to_r2(local_path, upload_url, r2_bearer_token)
            hosted_url = normalize_public_url(uploaded_url, r2_public_base_url)
            uploaded[original_path] = hosted_url

        # Re-emit the image with its original alt text. Returning an empty
        # string here would silently delete every uploaded image from the
        # document.
        return f"![{alt_text}]({hosted_url})"

    return re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", replace, markdown_text)
|
|
|
|
|
|
def main() -> int:
    """Run the PDF-to-Markdown conversion end to end.

    Steps: load ``.env``, parse arguments, submit and upload the PDF to
    MinerU, wait for the extraction, download and unpack the result archive,
    upload the referenced images, write the rewritten Markdown, and print the
    output path to stdout. Progress messages go to stderr.

    Returns:
        0 on success.

    Raises:
        SystemExit: If a required environment variable is missing or the
            input PDF does not exist.
    """
    load_dotenv()
    args = parse_args()
    token = require_env("MINERU_API_TOKEN")
    # Fail fast: the image-upload step needs these too, and discovering that
    # they are missing only after the extraction has finished wastes the run.
    for required_name in ("R2_BASE_URL", "R2_BEARER_TOKEN"):
        require_env(required_name)

    pdf_path = Path(args.pdf).expanduser().resolve()
    if not pdf_path.is_file():
        raise SystemExit(f"PDF not found: {pdf_path}")

    if args.output:
        output_path = Path(args.output).expanduser().resolve()
    else:
        output_path = pdf_path.with_suffix(".md")

    print("Submitting PDF to MinerU...", file=sys.stderr)
    batch_id, signed_upload_url = submit_pdf(
        pdf_path=pdf_path,
        token=token,
        language=args.language,
        is_ocr=args.ocr,
        enable_table=not args.disable_table,
        enable_formula=not args.disable_formula,
    )

    print("Uploading PDF...", file=sys.stderr)
    upload_pdf_to_signed_url(pdf_path, signed_upload_url)

    print("Waiting for extraction result...", file=sys.stderr)
    full_zip_url = wait_for_result(
        batch_id=batch_id,
        token=token,
        timeout_seconds=args.timeout,
        poll_interval=args.poll_interval,
    )

    # The extracted files are only needed until the images are uploaded, so
    # everything lives in a temporary directory removed on exit.
    with tempfile.TemporaryDirectory(prefix="mineru-") as temp_dir:
        temp_path = Path(temp_dir)
        zip_path = download_zip(full_zip_url, temp_path)
        extracted_dir = extract_zip(zip_path, temp_path / "extracted")
        markdown_path = find_full_markdown(extracted_dir)
        markdown_text = markdown_path.read_text(encoding="utf-8")
        rewritten_markdown = upload_images_and_rewrite(
            markdown_text=markdown_text,
            asset_root=markdown_path.parent,
            pdf_stem=pdf_path.stem,
        )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(rewritten_markdown, encoding="utf-8", newline="\n")
    print(str(output_path))
    return 0
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|