Skip to content

Commit a8d2b93

Browse files
committed
Normalize OCR targets and expose repair packing
1 parent 3a3a401 commit a8d2b93

8 files changed

Lines changed: 198 additions & 4 deletions

src/glossapi/_naming.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
from pathlib import Path
6+
import re
67
from typing import Union
78

89
_KNOWN_SUFFIXES = (
@@ -19,6 +20,8 @@
1920
".htm",
2021
)
2122

23+
_PAGE_CHUNK_SUFFIX_RE = re.compile(r"__p\d{4,5}-\d{4,5}$")
24+
2225

2326
def canonical_stem(value: Union[str, Path]) -> str:
2427
"""Return a normalised stem for any pipeline artefact."""
@@ -33,6 +36,7 @@ def canonical_stem(value: Union[str, Path]) -> str:
3336
working = working[: -len(suffix)]
3437
stripped = True
3538
break
39+
working = _PAGE_CHUNK_SUFFIX_RE.sub("", working)
3640
if working:
3741
return working
3842
fallback = Path(name).stem

src/glossapi/corpus/phase_ocr_math.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,14 @@ def _apply_ocr_success_updates(
8484
if column not in df_meta.columns:
8585
df_meta[column] = None
8686

87+
filename_series = df_meta["filename"].astype(str)
88+
stem_series = filename_series.map(canonical_stem)
89+
8790
for fname in filenames:
88-
mask = df_meta["filename"].astype(str) == str(fname)
91+
stem = canonical_stem(fname)
92+
mask = stem_series == stem
8993
if not bool(mask.any()):
9094
continue
91-
stem = canonical_stem(fname)
9295
artifact_update = _build_ocr_stage_artifact_update(
9396
markdown_dir=markdown_dir,
9497
metrics_dir=metrics_dir,
@@ -107,6 +110,27 @@ def _apply_ocr_success_updates(
107110
return df_meta
108111

109112

113+
def _normalize_ocr_target_filenames(*, filenames: List[str], input_dir: Path) -> List[str]:
114+
"""Collapse chunk-like metadata rows back to real OCR source files when possible."""
115+
116+
source_by_stem: Dict[str, str] = {}
117+
try:
118+
for path in sorted(Path(input_dir).glob("*.pdf")):
119+
source_by_stem.setdefault(canonical_stem(path.name), path.name)
120+
except Exception:
121+
source_by_stem = {}
122+
123+
normalized: List[str] = []
124+
seen: Set[str] = set()
125+
for fname in filenames:
126+
resolved = source_by_stem.get(canonical_stem(fname), str(fname))
127+
if resolved in seen:
128+
continue
129+
normalized.append(resolved)
130+
seen.add(resolved)
131+
return normalized
132+
133+
110134
class OcrMathPhaseMixin:
111135
def ocr(
112136
self,
@@ -137,6 +161,8 @@ def ocr(
137161
gpu_memory_utilization: Optional[float] = None,
138162
disable_fp8_kv: bool = False,
139163
repair_mode: str = "auto",
164+
repair_exec_batch_target_pages: Optional[int] = None,
165+
repair_exec_batch_target_items: Optional[int] = None,
140166
scheduler: str = "auto",
141167
target_batch_pages: int = 160,
142168
shard_pages: int = 0,
@@ -196,8 +222,11 @@ def ocr(
196222
- vllm_batch_size/gpu_memory_utilization/disable_fp8_kv/repair_mode:
197223
Optional vLLM controls. ``repair_mode='auto'`` enables the markdown-first
198224
repair pipeline (plain fallback for garbage pages, tiled fallback for
199-
short coverage failures). These are ignored by the transformers runtime
200-
except for ``prompt_override``.
225+
short coverage failures). ``repair_exec_batch_target_pages`` and
226+
``repair_exec_batch_target_items`` control how many pending repair rows
227+
a worker tries to execute together once the global repair phase begins.
228+
These are ignored by the transformers runtime except for
229+
``prompt_override``.
201230
- force: [DEPRECATED] alias for fix_bad retained for backward compatibility.
202231
- reprocess_completed: when False, skip documents already flagged as successfully
203232
OCRed or math-enriched in metadata. Set True to force reprocessing. Defaults to False
@@ -357,6 +386,17 @@ def ocr(
357386
removed,
358387
)
359388
try:
389+
normalized_bad_files = _normalize_ocr_target_filenames(
390+
filenames=bad_files,
391+
input_dir=Path(self.input_dir),
392+
)
393+
if len(normalized_bad_files) != len(bad_files):
394+
self.logger.info(
395+
"OCR: collapsed %d metadata-selected row(s) onto %d real source PDF(s) by canonical stem.",
396+
len(bad_files),
397+
len(normalized_bad_files),
398+
)
399+
bad_files = normalized_bad_files
360400
self.logger.info(
361401
"OCR targets: total=%d kept=%d skipped_completed=%d skipped_skiplist=%d",
362402
ocr_candidates_initial,
@@ -727,6 +767,8 @@ def _run_math(stems: List[str]) -> None:
727767
gpu_memory_utilization=gpu_memory_utilization,
728768
disable_fp8_kv=disable_fp8_kv,
729769
repair_mode=repair_mode,
770+
repair_exec_batch_target_pages=repair_exec_batch_target_pages,
771+
repair_exec_batch_target_items=repair_exec_batch_target_items,
730772
scheduler=scheduler,
731773
target_batch_pages=int(max(1, target_batch_pages)),
732774
shard_pages=int(max(0, shard_pages)),

src/glossapi/scripts/full_pipeline_checkpoint.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def _parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
5757
p.add_argument("--ocr-devices", nargs="*", type=int, default=None)
5858
p.add_argument("--ocr-workers-per-gpu", type=int, default=1)
5959
p.add_argument("--ocr-vllm-batch-size", type=int, default=None)
60+
p.add_argument("--ocr-repair-exec-batch-target-pages", type=int, default=None)
61+
p.add_argument("--ocr-repair-exec-batch-target-items", type=int, default=None)
6062
p.add_argument("--ocr-target-batch-pages", type=int, default=160)
6163
p.add_argument("--ocr-render-dpi", type=int, default=None)
6264
p.add_argument("--ocr-scheduler", default="auto")
@@ -165,6 +167,8 @@ def main(argv: Optional[List[str]] = None) -> int:
165167
devices=_parse_int_list(args.ocr_devices),
166168
workers_per_gpu=int(args.ocr_workers_per_gpu),
167169
vllm_batch_size=args.ocr_vllm_batch_size,
170+
repair_exec_batch_target_pages=args.ocr_repair_exec_batch_target_pages,
171+
repair_exec_batch_target_items=args.ocr_repair_exec_batch_target_items,
168172
target_batch_pages=int(args.ocr_target_batch_pages),
169173
render_dpi=args.ocr_render_dpi,
170174
scheduler=str(args.ocr_scheduler),

src/glossapi/scripts/openarchives_ocr_run_node.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ def _parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
5353
p.add_argument("--max-new-tokens", type=int, default=2048)
5454
p.add_argument("--render-dpi", type=int, default=144)
5555
p.add_argument("--repair-mode", default="auto")
56+
p.add_argument("--repair-exec-batch-target-pages", type=int, default=None)
57+
p.add_argument("--repair-exec-batch-target-items", type=int, default=None)
5658
p.add_argument("--gpu-memory-utilization", type=float, default=0.9)
5759
return p.parse_args(argv)
5860

@@ -348,6 +350,8 @@ def main(argv: Optional[List[str]] = None) -> int:
348350
render_dpi=int(args.render_dpi),
349351
max_new_tokens=int(args.max_new_tokens),
350352
repair_mode=str(args.repair_mode),
353+
repair_exec_batch_target_pages=args.repair_exec_batch_target_pages,
354+
repair_exec_batch_target_items=args.repair_exec_batch_target_items,
351355
scheduler=str(args.scheduler),
352356
target_batch_pages=int(args.target_batch_pages),
353357
shard_pages=int(args.shard_pages),

tests/test_full_pipeline_checkpoint.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,3 +148,63 @@ def jsonl(self, output_path, **kwargs):
148148
assert report["post_extract_counts"]["needs_ocr_true"] == 1
149149
assert report["post_ocr_counts"]["ocr_success_true"] == 1
150150
assert report["export_records"] == 1
151+
152+
153+
def test_full_pipeline_checkpoint_forwards_repair_exec_batch_controls(tmp_path, monkeypatch):
154+
captured = {}
155+
156+
class DummyCorpus:
157+
def __init__(self, input_dir, output_dir):
158+
self.input_dir = input_dir
159+
self.output_dir = output_dir
160+
161+
def _metadata_path(self):
162+
path = self.output_dir / "download_results" / "download_results.parquet"
163+
path.parent.mkdir(parents=True, exist_ok=True)
164+
return path
165+
166+
def extract(self, **kwargs):
167+
pd.DataFrame(
168+
[{"filename": "doc.pdf", "needs_ocr": True, "ocr_success": False, "text": ""}]
169+
).to_parquet(self._metadata_path(), index=False)
170+
171+
def clean(self, **kwargs):
172+
return None
173+
174+
def ocr(self, **kwargs):
175+
captured.update(kwargs)
176+
pd.DataFrame(
177+
[{"filename": "doc.pdf", "needs_ocr": False, "ocr_success": True, "text": "fixed text"}]
178+
).to_parquet(self._metadata_path(), index=False)
179+
180+
def jsonl(self, output_path, **kwargs):
181+
output_path.write_text(json.dumps({"text": "fixed text"}) + "\n", encoding="utf-8")
182+
183+
monkeypatch.setattr(checkpoint, "Corpus", DummyCorpus)
184+
185+
input_dir = tmp_path / "in"
186+
input_dir.mkdir()
187+
output_dir = tmp_path / "out"
188+
export_path = tmp_path / "export.jsonl"
189+
report_path = tmp_path / "report.json"
190+
191+
rc = checkpoint.main(
192+
[
193+
"--input-dir",
194+
str(input_dir),
195+
"--output-dir",
196+
str(output_dir),
197+
"--export-path",
198+
str(export_path),
199+
"--report-path",
200+
str(report_path),
201+
"--ocr-repair-exec-batch-target-pages",
202+
"64",
203+
"--ocr-repair-exec-batch-target-items",
204+
"24",
205+
]
206+
)
207+
208+
assert rc == 0
209+
assert captured["repair_exec_batch_target_pages"] == 64
210+
assert captured["repair_exec_batch_target_items"] == 24

tests/test_metadata_fallback.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ def test_canonical_stem_variants():
210210
"beta.metrics.json": "beta",
211211
"gamma.per_page.metrics.json": "gamma",
212212
"delta.with.dots.pdf": "delta.with.dots",
213+
"needs__p0001-0002.pdf": "needs",
214+
"needs__p00001-00096.md": "needs",
213215
}
214216
for source, expected in cases.items():
215217
assert canonical_stem(source) == expected

tests/test_ocr_backends_smoke.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,44 @@ def fake_enrich(files=None, **kwargs):
7373
== hashlib.sha256(b"ds md\n").hexdigest()
7474
)
7575
assert captured.get("files") == ["clean"], "Math-only should run for non-OCR stem only"
76+
77+
78+
def test_deepseek_ocr_normalizes_chunk_rows_to_real_source_pdf(tmp_path, monkeypatch):
79+
corpus = _mk_corpus(tmp_path)
80+
81+
(corpus.input_dir / "needs.pdf").write_bytes(b"%PDF-1.4\n%stub\n")
82+
83+
dl_dir = corpus.output_dir / "download_results"
84+
dl_dir.mkdir(parents=True, exist_ok=True)
85+
parquet_path = dl_dir / "download_results.parquet"
86+
pd.DataFrame(
87+
[
88+
{"filename": "needs.pdf", corpus.url_column: "", "needs_ocr": True, "ocr_success": False},
89+
{"filename": "needs__p0001-0002.pdf", corpus.url_column: "", "needs_ocr": True, "ocr_success": False},
90+
]
91+
).to_parquet(parquet_path, index=False)
92+
93+
from glossapi.ocr.deepseek import runner
94+
95+
captured = {}
96+
97+
def fake_run_for_files(self_ref, files, **kwargs):
98+
captured["files"] = list(files)
99+
markdown_dir = corpus.output_dir / "markdown"
100+
metrics_dir = corpus.output_dir / "json" / "metrics"
101+
markdown_dir.mkdir(parents=True, exist_ok=True)
102+
metrics_dir.mkdir(parents=True, exist_ok=True)
103+
(markdown_dir / "needs.md").write_text("normalized md\n", encoding="utf-8")
104+
(metrics_dir / "needs.metrics.json").write_text("{\n \"page_count\": 1\n}\n", encoding="utf-8")
105+
return {"needs": {"page_count": 1}}
106+
107+
monkeypatch.setattr(runner, "run_for_files", fake_run_for_files)
108+
109+
corpus.ocr(backend="deepseek", fix_bad=True, math_enhance=False, mode="ocr_bad")
110+
111+
assert captured["files"] == ["needs.pdf"]
112+
updated = pd.read_parquet(parquet_path).set_index("filename")
113+
assert bool(updated.loc["needs.pdf", "ocr_success"]) is True
114+
assert bool(updated.loc["needs__p0001-0002.pdf", "ocr_success"]) is True
115+
assert updated.loc["needs.pdf", "text"] == "normalized md\n"
116+
assert updated.loc["needs__p0001-0002.pdf", "text"] == "normalized md\n"

tests/test_ocr_dispatch_backends.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,43 @@ def fail_math(*args, **kwargs):
5151
assert calls.get("files") == [fname]
5252

5353

54+
def test_deepseek_backend_forwards_repair_exec_batch_controls(tmp_path, monkeypatch):
55+
corpus = _mk_corpus(tmp_path)
56+
57+
dl_dir = corpus.output_dir / "download_results"
58+
dl_dir.mkdir(parents=True, exist_ok=True)
59+
fname = "doc.pdf"
60+
df = pd.DataFrame([
61+
{"filename": fname, corpus.url_column: "", "needs_ocr": True, "ocr_success": False}
62+
])
63+
df.to_parquet(dl_dir / "download_results.parquet", index=False)
64+
(corpus.input_dir / fname).write_bytes(b"%PDF-1.4\n%stub\n")
65+
66+
from glossapi.ocr.deepseek import runner
67+
68+
calls = {}
69+
70+
def fake_run_for_files(self_ref, files, **kwargs):
71+
calls["files"] = list(files)
72+
calls["kwargs"] = dict(kwargs)
73+
return {"doc": {"page_count": 1}}
74+
75+
monkeypatch.setattr(runner, "run_for_files", fake_run_for_files)
76+
77+
corpus.ocr(
78+
backend="deepseek",
79+
fix_bad=True,
80+
math_enhance=False,
81+
mode="ocr_bad",
82+
repair_exec_batch_target_pages=64,
83+
repair_exec_batch_target_items=24,
84+
)
85+
86+
assert calls.get("files") == [fname]
87+
assert calls["kwargs"]["repair_exec_batch_target_pages"] == 64
88+
assert calls["kwargs"]["repair_exec_batch_target_items"] == 24
89+
90+
5491
def test_invalid_backend_is_rejected(tmp_path):
5592
corpus = _mk_corpus(tmp_path)
5693
with pytest.raises(ValueError, match="backend must be 'deepseek'"):

0 commit comments

Comments (0)