pdf2md/skills/pdf-to-markdown-mineru/scripts/convert_pdf_to_markdown.py
qz 22165a3c26 Import pdf-to-markdown converter and shorten hosted image suffixes.
Bring the local project into the remote repository and reduce generated image object suffixes to six characters for shorter URLs.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-19 14:37:42 +08:00

328 lines
11 KiB
Python

#!/usr/bin/env python3
import argparse
import datetime as dt
import mimetypes
import os
import posixpath
import re
import sys
import tempfile
import time
import uuid
import zipfile
from pathlib import Path
from urllib.parse import urlparse
import requests
from dotenv import load_dotenv
# Root URL of the MinerU service; the API endpoint paths used in this file are appended to it.
MINERU_BASE_URL = "https://mineru.net"
# Lower-case file suffixes treated as images when local Markdown image links are rewritten.
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".tif", ".tiff", ".svg"}
def parse_args() -> argparse.Namespace:
    """Build the command-line interface and parse ``sys.argv``.

    Returns:
        The parsed namespace with ``pdf``, ``output``, ``env_file``,
        ``language``, ``ocr``, ``disable_table``, ``disable_formula``,
        ``poll_interval`` and ``timeout`` attributes.

    The ``--poll-interval`` and ``--timeout`` defaults are read from the
    ``POLL_INTERVAL_SECONDS`` and ``TIMEOUT_SECONDS`` environment variables
    at the moment this function runs (falling back to 5 and 1800).
    """
    cli = argparse.ArgumentParser(
        description="Convert a PDF academic paper to pure Markdown with hosted images via MinerU."
    )

    cli.add_argument("pdf", help="Path to the input PDF file")
    cli.add_argument(
        "-o",
        "--output",
        help="Path to the output Markdown file. Defaults to <pdf_stem>.md next to the PDF.",
    )
    cli.add_argument(
        "--env-file",
        help="Optional path to a dotenv file. If omitted, the script loads .env from the current working directory and the skill directory when present.",
    )
    cli.add_argument(
        "--language",
        default="en",
        help="Document language hint for MinerU, default: en",
    )

    # The three boolean switches share the same shape, so register them in one pass.
    switches = (
        ("--ocr", "Enable OCR mode for scanned PDFs"),
        ("--disable-table", "Disable table extraction"),
        ("--disable-formula", "Disable formula extraction"),
    )
    for flag, flag_help in switches:
        cli.add_argument(flag, action="store_true", help=flag_help)

    poll_default = float(os.getenv("POLL_INTERVAL_SECONDS", "5"))
    cli.add_argument(
        "--poll-interval",
        type=float,
        default=poll_default,
        help="Polling interval in seconds",
    )

    timeout_default = int(os.getenv("TIMEOUT_SECONDS", "1800"))
    cli.add_argument(
        "--timeout",
        type=int,
        default=timeout_default,
        help="Overall timeout in seconds",
    )

    return cli.parse_args()
def load_environment(env_file: str | None) -> None:
    """Load dotenv files into the process environment.

    Args:
        env_file: Explicit dotenv path. When given, only that file (with
            ``~`` expanded) is loaded. When falsy, the default dotenv lookup
            runs first, followed by a ``.env`` file located one directory
            above this script's directory, if that file exists.

    Every load uses ``override=False``.
    """
    if not env_file:
        load_dotenv(override=False)
        skill_env = Path(__file__).resolve().parents[1] / ".env"
        if skill_env.exists():
            load_dotenv(skill_env, override=False)
        return

    explicit_env = Path(env_file).expanduser()
    load_dotenv(explicit_env, override=False)
def require_env(name: str) -> str:
    """Return the whitespace-stripped value of environment variable *name*.

    Raises:
        SystemExit: If the variable is unset or blank after stripping.
    """
    cleaned = os.environ.get(name, "").strip()
    if cleaned:
        return cleaned
    raise SystemExit(f"Missing required environment variable: {name}")
def mineru_headers(token: str) -> dict[str, str]:
    """Return the bearer-auth and JSON content-type headers for a MinerU request."""
    headers: dict[str, str] = {}
    headers["Authorization"] = f"Bearer {token}"
    headers["Content-Type"] = "application/json"
    return headers
def submit_pdf(pdf_path: Path, token: str, language: str, is_ocr: bool, enable_table: bool, enable_formula: bool) -> tuple[str, str]:
    """Register one PDF with MinerU's batch file-urls endpoint.

    Args:
        pdf_path: PDF whose file name is sent in the request.
        token: MinerU API token used for the bearer header.
        language: Language value forwarded in the request body.
        is_ocr: Per-file ``is_ocr`` flag.
        enable_table: Forwarded as ``enable_table``.
        enable_formula: Forwarded as ``enable_formula``.

    Returns:
        ``(batch_id, file_url)`` taken from the response's ``data`` object,
        where ``file_url`` is the single entry of ``file_urls``.

    Raises:
        requests.HTTPError: On a non-success HTTP status.
        RuntimeError: If the response ``code`` is not 0 or ``file_urls`` does
            not contain exactly one entry.
    """
    file_entry = {
        "name": pdf_path.name,
        "data_id": uuid.uuid4().hex,  # fresh random identifier for this submission
        "is_ocr": is_ocr,
    }
    request_body = {
        "files": [file_entry],
        "model_version": "vlm",
        "enable_table": enable_table,
        "enable_formula": enable_formula,
        "language": language,
    }

    endpoint = f"{MINERU_BASE_URL}/api/v4/file-urls/batch"
    reply = requests.post(endpoint, headers=mineru_headers(token), json=request_body, timeout=60)
    reply.raise_for_status()

    parsed = reply.json()
    if parsed.get("code") != 0:
        raise RuntimeError(f"MinerU submit failed: {parsed}")

    data = parsed["data"]
    signed_urls = data.get("file_urls") or []
    if len(signed_urls) != 1:
        raise RuntimeError(f"Unexpected MinerU file_urls response: {parsed}")

    return data["batch_id"], signed_urls[0]
def upload_pdf_to_signed_url(pdf_path: Path, signed_url: str) -> None:
    """PUT the contents of *pdf_path* to *signed_url*.

    The file is passed to ``requests.put`` as an open binary handle.

    Raises:
        requests.HTTPError: If the upload returns a non-success status.
    """
    with open(pdf_path, "rb") as pdf_stream:
        put_result = requests.put(signed_url, data=pdf_stream, timeout=300)
    put_result.raise_for_status()
def wait_for_result(batch_id: str, token: str, timeout_seconds: int, poll_interval: float) -> str:
    """Poll MinerU until the batch reports a final state.

    Args:
        batch_id: Batch identifier to query.
        token: MinerU API token used for the bearer header.
        timeout_seconds: Overall time budget for polling, in seconds.
        poll_interval: Pause between consecutive polls, in seconds.

    Returns:
        The ``full_zip_url`` of the single extract result once its state is
        ``"done"``.

    Raises:
        requests.HTTPError: On a non-success HTTP status.
        RuntimeError: If the response ``code`` is not 0, the response does
            not carry exactly one extract result, the state is ``"failed"``,
            or ``"done"`` arrives without a ``full_zip_url``.
        TimeoutError: If no final state is seen within *timeout_seconds*.
    """
    status_url = f"{MINERU_BASE_URL}/api/v4/extract-results/batch/{batch_id}"
    auth_headers = {"Authorization": f"Bearer {token}"}
    deadline = time.monotonic() + timeout_seconds
    last_state = None

    while time.monotonic() < deadline:
        response = requests.get(status_url, headers=auth_headers, timeout=60)
        response.raise_for_status()
        body = response.json()
        if body.get("code") != 0:
            raise RuntimeError(f"MinerU polling failed: {body}")

        # ``data`` may be present but null; treat it like a missing payload so
        # the caller gets the descriptive RuntimeError below, not AttributeError.
        results = (body.get("data") or {}).get("extract_result") or []
        if len(results) != 1:
            raise RuntimeError(f"Unexpected MinerU extract_result response: {body}")

        result = results[0]
        state = result.get("state")
        if state != last_state:
            # Report only state transitions to keep stderr quiet while waiting.
            print(f"MinerU state: {state}", file=sys.stderr)
            last_state = state

        if state == "done":
            full_zip_url = result.get("full_zip_url")
            if not full_zip_url:
                raise RuntimeError(f"MinerU returned done without full_zip_url: {body}")
            return full_zip_url
        if state == "failed":
            raise RuntimeError(f"MinerU extraction failed: {result.get('err_msg') or body}")

        # Never sleep past the deadline: a long poll interval should not
        # stretch the overall timeout.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval, remaining))

    raise TimeoutError(f"Timed out waiting for MinerU batch {batch_id}")
def download_zip(full_zip_url: str, target_dir: Path) -> Path:
    """Stream the archive at *full_zip_url* into ``target_dir / "result.zip"``.

    Returns:
        The path of the written zip file.

    Raises:
        requests.HTTPError: If the download returns a non-success status.
    """
    archive_path = target_dir / "result.zip"
    one_mebibyte = 1024 * 1024

    with requests.get(full_zip_url, stream=True, timeout=300) as download:
        download.raise_for_status()
        with archive_path.open("wb") as sink:
            for block in download.iter_content(chunk_size=one_mebibyte):
                if not block:
                    # Skip empty chunks.
                    continue
                sink.write(block)

    return archive_path
def extract_zip(zip_path: Path, target_dir: Path) -> Path:
    """Unpack every member of *zip_path* into *target_dir* and return *target_dir*."""
    with zipfile.ZipFile(zip_path, mode="r") as archive:
        archive.extractall(path=target_dir)
    return target_dir
def find_full_markdown(extracted_dir: Path) -> Path:
    """Locate the ``full.md`` file inside an extracted MinerU archive.

    The tree below *extracted_dir* is searched recursively. Only regular
    files are considered. When several candidates exist, the choice is
    deterministic: the one with the fewest path components wins, with ties
    broken by the lexicographically smallest POSIX path. This avoids
    depending on filesystem traversal order.

    Args:
        extracted_dir: Root directory of the extracted archive.

    Returns:
        Path to the selected ``full.md``.

    Raises:
        FileNotFoundError: If no ``full.md`` file exists under *extracted_dir*.
    """
    candidates = [path for path in extracted_dir.rglob("full.md") if path.is_file()]
    if not candidates:
        raise FileNotFoundError("full.md was not found in the MinerU zip output")
    return min(candidates, key=lambda path: (len(path.parts), path.as_posix()))
def infer_content_type(path: Path) -> str:
    """Guess the MIME type from the file name of *path*.

    Returns ``"application/octet-stream"`` when no type can be guessed.
    """
    guessed = mimetypes.guess_type(path.name)[0]
    if not guessed:
        return "application/octet-stream"
    return guessed
def build_r2_url(base_url: str, prefix: str, folder_name: str, object_name: str) -> str:
    """Compose the object URL ``<base>/<prefix>/<folder>/<object>``.

    Trailing slashes are removed from *base_url*. Surrounding slashes are
    removed from *prefix* and *folder_name*. Segments that end up empty are
    left out of the path.
    """
    root = base_url.rstrip("/")

    segments = []
    for candidate in (prefix.strip("/"), folder_name.strip("/"), object_name):
        if candidate:
            segments.append(candidate)

    object_path = posixpath.join(*segments)
    return f"{root}/{object_path}"
def slugify_paper_name(pdf_stem: str, limit: int = 48) -> str:
    """Turn a PDF file stem into a lowercase, hyphen-separated slug.

    Runs of characters outside ``a-z0-9`` become a single hyphen, and leading
    and trailing hyphens are dropped. The slug is cut to *limit* characters,
    then any trailing hyphen left by the cut is removed. ``"paper"`` is
    returned whenever the result would otherwise be empty.
    """
    lowered = pdf_stem.lower()
    slug = re.sub(r"[^a-z0-9]+", "-", lowered).strip("-") or "paper"
    trimmed = slug[:limit].rstrip("-")
    return trimmed if trimmed else "paper"
def unique_object_name(local_path: Path) -> str:
    """Build an object name of the form ``<stem>-<6 hex chars><suffix>``.

    The stem has runs of characters outside ``a-zA-Z0-9._-`` replaced by a
    hyphen and leading/trailing ``-``, ``.`` and ``_`` removed; it falls back
    to ``"image"`` when empty. The suffix is lower-cased and falls back to
    ``".bin"``. The six hex characters come from a fresh random UUID.
    """
    safe_stem = re.sub(r"[^a-zA-Z0-9._-]+", "-", local_path.stem).strip("-._")
    if not safe_stem:
        safe_stem = "image"

    extension = local_path.suffix.lower()
    if not extension:
        extension = ".bin"

    random_tag = uuid.uuid4().hex[:6]
    return f"{safe_stem}-{random_tag}{extension}"
def upload_image_to_r2(local_path: Path, target_url: str, bearer_token: str) -> str:
    """PUT the file at *local_path* to *target_url* with bearer authentication.

    The ``Content-Type`` header is derived from the file name.

    Returns:
        *target_url*, unchanged, once the upload has succeeded.

    Raises:
        requests.HTTPError: If the upload returns a non-success status.
    """
    request_headers = {
        "Authorization": f"Bearer {bearer_token}",
        "Content-Type": infer_content_type(local_path),
    }
    with open(local_path, "rb") as image_stream:
        put_result = requests.put(
            target_url,
            headers=request_headers,
            data=image_stream,
            timeout=300,
        )
    put_result.raise_for_status()
    return target_url
def normalize_public_url(uploaded_url: str, public_base_url: str | None) -> str:
if not public_base_url:
return uploaded_url
path = urlparse(uploaded_url).path.lstrip("/")
return f"{public_base_url.rstrip('/')}/{path}"
def upload_images_and_rewrite(markdown_text: str, asset_root: Path, pdf_stem: str) -> str:
    """Upload locally referenced images and point the Markdown at hosted URLs.

    Every ``![alt](target)`` occurrence is inspected. Targets starting with
    ``http://``, ``https://`` or ``data:`` are left alone. Other targets are
    resolved relative to *asset_root*. A target is uploaded only if it
    resolves to an existing file inside *asset_root* and its suffix is in
    ``IMAGE_EXTENSIONS``. Everything else is left unchanged in the output.
    Each distinct target is uploaded at most once per call.

    Objects are stored under ``<R2_PREFIX>/<YYYY/MM/DD>/<paper slug>/``, using
    today's local date.

    Args:
        markdown_text: Markdown source to rewrite.
        asset_root: Directory that relative image targets are resolved against.
        pdf_stem: PDF file stem used to derive the per-paper folder name.

    Returns:
        The Markdown text with uploaded image targets replaced by their URLs.

    Raises:
        SystemExit: If ``R2_BASE_URL`` or ``R2_BEARER_TOKEN`` is not set.
        requests.HTTPError: If an image upload returns a non-success status.
    """
    r2_base_url = require_env("R2_BASE_URL")
    r2_bearer_token = require_env("R2_BEARER_TOKEN")
    r2_prefix = os.getenv("R2_PREFIX", "").strip()
    r2_public_base_url = os.getenv("R2_PUBLIC_BASE_URL", "").strip() or None

    date_prefix = dt.date.today().strftime("%Y/%m/%d")
    folder_name = posixpath.join(date_prefix, slugify_paper_name(pdf_stem))

    # Resolve the root once so the containment check below compares like with
    # like, even when the root itself sits behind a symlink.
    resolved_root = asset_root.resolve()
    uploaded: dict[str, str] = {}

    def replace(match: re.Match[str]) -> str:
        alt_text = match.group(1)
        original_path = match.group(2).strip()
        if original_path.startswith(("http://", "https://", "data:")):
            return match.group(0)

        hosted_url = uploaded.get(original_path)
        if hosted_url is None:
            local_path = (resolved_root / original_path).resolve()
            # The Markdown comes from a downloaded archive: refuse targets that
            # escape the asset directory ("../" or absolute paths) so unrelated
            # local files are never uploaded.
            if not local_path.is_relative_to(resolved_root):
                return match.group(0)
            if not local_path.is_file():
                return match.group(0)
            if local_path.suffix.lower() not in IMAGE_EXTENSIONS:
                return match.group(0)

            object_name = unique_object_name(local_path)
            upload_url = build_r2_url(r2_base_url, r2_prefix, folder_name, object_name)
            uploaded_url = upload_image_to_r2(local_path, upload_url, r2_bearer_token)
            hosted_url = normalize_public_url(uploaded_url, r2_public_base_url)
            uploaded[original_path] = hosted_url

        return f"![{alt_text}]({hosted_url})"

    return re.sub(r"!\[([^\]]*)\]\(([^)]+)\)", replace, markdown_text)
def main() -> int:
    """Run the full PDF-to-Markdown conversion and return the exit status.

    Steps: parse arguments, load dotenv configuration, submit and upload the
    PDF to MinerU, wait for the extraction result, download and unpack the
    result archive, upload referenced images, then write the rewritten
    Markdown and print its path on stdout. Progress messages go to stderr.

    Returns:
        0 on success.

    Raises:
        SystemExit: If ``MINERU_API_TOKEN`` is missing or the PDF path is not
            an existing file.

    NOTE(review): parse_args() evaluates the POLL_INTERVAL_SECONDS and
    TIMEOUT_SECONDS defaults before load_environment() runs here, so values
    supplied only through a dotenv file do not appear to influence those
    defaults. Confirm whether that is intended.
    """
    args = parse_args()
    load_environment(args.env_file)
    api_token = require_env("MINERU_API_TOKEN")

    source_pdf = Path(args.pdf).expanduser().resolve()
    if not (source_pdf.exists() and source_pdf.is_file()):
        raise SystemExit(f"PDF not found: {source_pdf}")

    if args.output:
        destination = Path(args.output).expanduser().resolve()
    else:
        destination = source_pdf.with_suffix(".md")

    print("Submitting PDF to MinerU...", file=sys.stderr)
    batch_id, signed_upload_url = submit_pdf(
        pdf_path=source_pdf,
        token=api_token,
        language=args.language,
        is_ocr=args.ocr,
        enable_table=not args.disable_table,
        enable_formula=not args.disable_formula,
    )

    print("Uploading PDF...", file=sys.stderr)
    upload_pdf_to_signed_url(source_pdf, signed_upload_url)

    print("Waiting for extraction result...", file=sys.stderr)
    archive_url = wait_for_result(
        batch_id=batch_id,
        token=api_token,
        timeout_seconds=args.timeout,
        poll_interval=args.poll_interval,
    )

    # The downloaded archive and its extracted files only need to live until
    # the Markdown has been read and its images uploaded.
    with tempfile.TemporaryDirectory(prefix="mineru-") as scratch:
        scratch_dir = Path(scratch)
        archive_path = download_zip(archive_url, scratch_dir)
        unpacked_dir = extract_zip(archive_path, scratch_dir / "extracted")
        full_md = find_full_markdown(unpacked_dir)
        final_markdown = upload_images_and_rewrite(
            markdown_text=full_md.read_text(encoding="utf-8"),
            asset_root=full_md.parent,
            pdf_stem=source_pdf.stem,
        )

    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(final_markdown, encoding="utf-8", newline="\n")
    print(str(destination))
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())