Skip to content
113 changes: 85 additions & 28 deletions deepdoc/parser/docling_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class RAGFlowPdfParser:
from deepdoc.parser.utils import extract_pdf_outlines



class DoclingContentType(str, Enum):
IMAGE = "image"
TABLE = "table"
Expand Down Expand Up @@ -350,6 +351,13 @@ def _parse_pdf_remote(
docling_server_url: Optional[str] = None,
request_timeout: Optional[int] = None,
):
"""
Parses a PDF document using a remote Docling server.

Prioritizes native chunking endpoints (/v1/chunk/source, /v1alpha/chunk/source)
to prevent token overflow, with a graceful fallback to standard conversion
endpoints if chunking is unavailable.
"""
server_url = self._effective_server_url(docling_server_url)
if not server_url:
raise RuntimeError("[Docling] DOCLING_SERVER_URL is not configured.")
Expand All @@ -372,36 +380,48 @@ def _parse_pdf_remote(

filename = Path(filepath).name or "input.pdf"
b64 = base64.b64encode(pdf_bytes).decode("ascii")
v1_payload = {
"options": {
"from_formats": ["pdf"],
"to_formats": ["json", "md", "text"],
},
"sources": [
{
"kind": "file",
"filename": filename,
"base64_string": b64,
}
],

# Standard payloads
# Standard fallback payloads (no chunking)
v1_payload_standard = {
"options": {"from_formats": ["pdf"], "to_formats": ["json", "md", "text"]},
"sources": [{"kind": "file", "filename": filename, "base64_string": b64}],
}
v1alpha_payload_standard = {
"options": {"from_formats": ["pdf"], "to_formats": ["json", "md", "text"]},
"file_sources": [{"filename": filename, "base64_string": b64}],
}

# --- NEW: Correct API Contract for Chunking ---
chunking_opts = {
"from_formats": ["pdf"],
"to_formats": ["json", "md", "text"],
"do_chunking": True,
"chunking_options": {
"max_tokens": 512,
"overlap": 50,
"tokenizer": "sentencepiece" # Required by Docling contract
}
}
v1_payload_chunked = {
"options": chunking_opts,
"sources": [{"kind": "file", "filename": filename, "base64_string": b64}],
}
v1alpha_payload = {
"options": {
"from_formats": ["pdf"],
"to_formats": ["json", "md", "text"],
},
"file_sources": [
{
"filename": filename,
"base64_string": b64,
}
],
v1alpha_payload_chunked = {
"options": chunking_opts,
"file_sources": [{"filename": filename, "base64_string": b64}],
}

errors = []
response_json = None
for endpoint, payload in (
("/v1/convert/source", v1_payload),
("/v1alpha/convert/source", v1alpha_payload),
is_chunked_response = False

# Try chunked endpoints first, then fall back to standard if the server is older
for endpoint, payload, chunk_flag in (
("/v1/convert/source", v1_payload_chunked, True),
("/v1alpha/convert/source", v1alpha_payload_chunked, True),
("/v1/convert/source", v1_payload_standard, False),
("/v1alpha/convert/source", v1alpha_payload_standard, False),
):
try:
resp = requests.post(
Expand All @@ -411,20 +431,57 @@ def _parse_pdf_remote(
)
if resp.status_code < 300:
response_json = resp.json()
is_chunked_response = chunk_flag

if chunk_flag:
self.logger.info(f"[Docling] Successfully used native chunking on: {endpoint}")
else:
self.logger.info(f"[Docling] Chunking unavailable, fell back to standard: {endpoint}")
break

# If chunking request is rejected (e.g., 422 Unprocessable Entity on older servers),
# log it and let the loop naturally fall back to the standard payload.
if chunk_flag:
self.logger.warning(f"[Docling] Server rejected chunking parameters: HTTP {resp.status_code}")
continue

errors.append(f"{endpoint}: HTTP {resp.status_code} {resp.text[:300]}")

except Exception as exc:
self.logger.error(f"[Docling] Request error on {endpoint}: {exc}")
errors.append(f"{endpoint}: {exc}")

if response_json is None:
raise RuntimeError("[Docling] remote convert failed: " + " | ".join(errors))

sections: list[tuple[str, ...]] = []
tables = []

# --- NEW: Handle Native Chunked Response ---
if is_chunked_response:
# The chunking endpoint returns an array of chunk items
chunks = response_json if isinstance(response_json, list) else response_json.get("results", [])
for chunk_data in chunks:
if not isinstance(chunk_data, dict):
continue
# Depending on the exact docling-serve spec, the text might be nested
chunk_text = chunk_data.get("text", "")
if not chunk_text and isinstance(chunk_data.get("chunk"), dict):
chunk_text = chunk_data["chunk"].get("text", "")
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if isinstance(chunk_text, str) and chunk_text.strip():
# Feed the pre-sliced chunks directly into RAGFlow's expected format
sections.extend(self._sections_from_remote_text(chunk_text, parse_method=parse_method))
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if callback:
callback(0.95, f"[Docling] Native chunks received: {len(sections)}")
return sections, tables
Comment thread
ParasSondhi marked this conversation as resolved.

# --- FALLBACK: Standard RAGFlow parsing for older docling servers ---
docs = self._extract_remote_document_entries(response_json)
if not docs:
raise RuntimeError("[Docling] remote response does not contain parsed documents.")

sections: list[tuple[str, ...]] = []
tables = []
for doc in docs:
md = doc.get("md_content")
txt = doc.get("text_content")
Expand Down
Loading