vector_store: stream VDB uploads in graph pipeline via client VDB wrapper #1847

jioffe502 wants to merge 10 commits into NVIDIA:main
Conversation
…pper

- VDBUploadOperator wraps client VDB classes (LanceDB, Milvus) as a streaming graph stage with concurrency=1 and batch_size=64
- Preprocess extracts canonical records; process writes per-backend
- Finalization delegates to client LanceDB.write_to_index for indexing
- jp20 recall@5=0.8783 (parity), PPS=21.79 (parity with baseline 21.50)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
…atch

- Fix _write_via_client to use get_connection_params/get_write_params, matching the client Milvus.run() dispatch pattern
- Add integration_test_milvus_vdb.py (writes + search verification)
- Add integration_test_milvus_recall.py (full jp20 pipeline + recall)
- Milvus recall: @1=0.6435 @5=0.8783 @10=0.9217 (matches LanceDB)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
- Remove monkeypatches for _ensure_lancedb_table and handle_lancedb
- Add vdb_upload/store/caption/dedup stubs to _FakeIngestor
- Add count() to _FakeDataset for the no-collect code path
- Delete dead handle_lancedb function and its tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
- black reformatting on test files
- remove unused imports flagged by flake8
- end-of-file-fixer on lancedb_store.py

Co-Authored-By: Claude Opus 4 <noreply@anthropic.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
Lets callers pass a ready-made LanceDB/Milvus instance from nv_ingest_client.util.vdb directly into GraphIngestor.vdb_upload(vdb_op=...) so the graph wraps it instead of rebuilding one from VdbUploadParams. The operator captures vdb_op in get_constructor_kwargs() so it round-trips to Ray workers unchanged.

- VDBUploadOperator: new vdb_op kwarg, backend derived from its class name
- GraphIngestor.vdb_upload + build_graph + _append_ordered_transform_stages thread vdb_op through to the operator
- _finalize_vdb reuses the passed LanceDB instance for post-run indexing
- Extracted build_client_lancedb helper; removed dead build_lancedb_rows / build_vdb_records_from_dicts; fixed latent missing logger in graph_ingestor
- Added 3 unit tests + end-to-end integration test covering both backends (bo20 ingested 831 rows to both LanceDB and Milvus via passthrough)

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
…ring

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>

# Conflicts:
#   nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
#   nemo_retriever/src/nemo_retriever/graph_ingestor.py
#   nemo_retriever/src/nemo_retriever/vector_store/lancedb_store.py
Preparatory checkpoint for the Milvus streaming-write fix. Adds a no-op `VDBUploadOperator.finalize()` that the driver calls once after `executor.ingest()` returns (via a new `vdb_upload_ops_out` handle threaded through `build_graph`), so one-shot flush/wait-for-index work can live outside the per-batch lifecycle that `AbstractOperator.run()` fires. LanceDB and the bulk-fallback write path remain no-ops.

Also reverts the Milvus CLI additions from `graph_pipeline.py` (backend selection moves back to the caller passing `vdb_op=`) and sanitizes the bo20 dataset path in `test_configs.yaml`.

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
Signed-off-by: jioffe502 <jioffe@nvidia.com> Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>
c0d7b56 to d85d8e2
Greptile Summary

This PR introduces …
| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/graph/vdb_upload_operator.py | New streaming VDB upload operator — core of the PR; wraps client VDB classes for in-graph writes with LanceDB and Milvus streaming paths, finalize(), and deferred index creation. |
| nemo_retriever/src/nemo_retriever/graph_ingestor.py | Adds vdb_upload() fluent method and _finalize_vdb() post-pipeline index creation; vdb_upload_ops_out pattern wires driver-side operator references for finalize() after Ray execution. |
| nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py | Replaces collect-then-write LanceDB pattern with streaming vdb_upload(); adds metrics_output_file CLI option, lazy result collection, and improved page counting vs. row counting. |
| nemo_retriever/src/nemo_retriever/vector_store/vdb_records.py | New canonical VDB record builder extracted from lancedb_utils; single source of truth for DataFrame to VDB record conversion across all backends. |
| nemo_retriever/src/nemo_retriever/vector_store/lancedb_utils.py | Adds _ensure_dict() helper to handle Arrow serialization of dict columns to strings; removes build_lancedb_rows (moved to vdb_records.py); adds build_client_lancedb factory. |
| nemo_retriever/src/nemo_retriever/vector_store/__init__.py | Removes LanceDBConfig from public exports and adds build_vdb_records — breaking removal of a previously public symbol without a deprecation cycle. |
| nemo_retriever/tests/integration_test_milvus_recall.py | New end-to-end Milvus recall integration test; MILVUS_URI and QUERY_CSV are hardcoded developer paths not read from environment variables. |
| nemo_retriever/tests/integration_test_milvus_vdb.py | New Milvus integration test; correctly reads MILVUS_URI from environment variable with fallback default. |
| nemo_retriever/tests/integration_test_vdb_op_passthrough.py | New passthrough integration test; MILVUS_URI hardcoded without env var fallback, inconsistent with integration_test_milvus_vdb.py. |
| nemo_retriever/tests/test_vdb_upload_operator.py | 452-line unit test suite for VDBUploadOperator covering LanceDB streaming, Milvus streaming, finalize, and passthrough behaviour. |
| nemo_retriever/tests/test_vdb_record_contract.py | 248-line contract tests for vdb_records and _canonical_to_nvingest; covers edge cases including empty dataframes and JSON serialization round-trips. |
| nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py | Wires VDBUploadOperator into build_graph() and _append_ordered_transform_stages(); adds vdb_upload_ops_out output parameter for driver-side finalize hooks. |
| nemo_retriever/src/nemo_retriever/params/models.py | Adds backend and client_vdb_kwargs fields to VdbUploadParams for multi-backend support. |
Flowchart
```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[GraphIngestor.vdb_upload] --> B[build_graph / ingestor_runtime]
    B --> C[VDBUploadOperator added to graph]
    C --> D[Per batch: preprocess]
    D --> E[build_vdb_records from DataFrame]
    E --> F[process]
    F --> G{backend}
    G -->|lancedb| H[table.add records]
    G -->|milvus| I[milvus_client.insert]
    G -->|other| J[client_vdb.write_to_index]
    H --> K[DataFrame passed through unchanged]
    I --> K
    J --> K
    K --> L[executor.ingest returns]
    L --> M[op.finalize for each op in vdb_upload_ops]
    M --> N{backend}
    N -->|lancedb| O[no-op: client_vdb is None on driver]
    N -->|milvus| P[flush + wait_for_index on Milvus server]
    L --> Q[GraphIngestor._finalize_vdb]
    Q --> R{backend = lancedb?}
    R -->|yes| S[open_table then write_to_index: ANN + FTS index]
    R -->|no| T[return]
```
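To make the lifecycle concrete, here is a minimal sketch of the shape of the operator the flowchart describes. It is not the actual `VDBUploadOperator`; the method bodies, record shape, and backend stubs are assumptions — only the preprocess/process/finalize split and the backend-from-class-name rule come from the PR.

```python
# Hypothetical sketch of the streaming-upload lifecycle shown above; the real
# operator lives in nemo_retriever/graph/vdb_upload_operator.py and differs
# in detail.
from typing import Any, Dict, List

import pandas as pd


class StreamingVDBUploadSketch:
    def __init__(self, client_vdb: Any = None) -> None:
        self.client_vdb = client_vdb
        # Backend is derived from the wrapped client's class name, per the PR.
        self.backend = type(client_vdb).__name__.lower() if client_vdb else "lancedb"

    def preprocess(self, batch: pd.DataFrame) -> List[Dict[str, Any]]:
        # Extract backend-agnostic canonical records from the DataFrame batch.
        return batch.to_dict(orient="records")

    def process(self, batch: pd.DataFrame) -> pd.DataFrame:
        records = self.preprocess(batch)
        if self.backend == "lancedb":
            self._lancedb_table_add(records)   # streaming table.add()
        elif self.backend == "milvus":
            self._milvus_insert(records)       # streaming client insert
        else:
            self.client_vdb.write_to_index(records)
        return batch  # DataFrame passes through unchanged

    def finalize(self) -> None:
        # Called once on the driver after executor.ingest() returns.
        if self.backend == "milvus":
            self._flush_and_wait_for_index()
        # LanceDB: no-op here; indexing happens in _finalize_vdb on the driver.

    # --- stubs standing in for the real backend calls ---
    def _lancedb_table_add(self, records: List[Dict[str, Any]]) -> None: ...
    def _milvus_insert(self, records: List[Dict[str, Any]]) -> None: ...
    def _flush_and_wait_for_index(self) -> None: ...
```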
Prompt To Fix All With AI
Path: nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
Line: 183-192
Comment:
**Silent exception swallow in `_count_processed_pages_from_dataset`**
The first `except Exception` block returns a fallback value with no logging at all. If `dataset.columns()` raises (e.g., due to a serialisation bug or disconnected Ray cluster), the caller silently gets an incorrect page count and no signal that anything went wrong. The `no-bare-except` rule requires broad catches to at minimum log with `exc_info=True`.
```suggestion
try:
    columns = set(dataset.columns())
except Exception:
    logger.warning("Could not read Ray Dataset columns; falling back to output row count.", exc_info=True)
    return int(fallback_rows)
```
**Rule Used:** Never use bare 'except:' that silently swallows er... ([source](https://app.greptile.com/review/custom-context?memory=no-bare-except))
---
Path: nemo_retriever/tests/integration_test_milvus_recall.py
Line: 23
Comment:
**Hardcoded developer IP address for `MILVUS_URI`**
`MILVUS_URI` is a hardcoded IP that won't be reachable on any other machine. `integration_test_milvus_vdb.py` already sets the right pattern with `os.environ.get(...)` — this file and `integration_test_vdb_op_passthrough.py` (line 31) should follow the same convention.
```suggestion
MILVUS_URI = os.environ.get("NEMO_RETRIEVER_MILVUS_URI", "http://172.20.0.4:19530")
```
Note: you'll also need `import os` at the top of this file.
---
Path: nemo_retriever/tests/integration_test_vdb_op_passthrough.py
Line: 31
Comment:
**Hardcoded developer IP address for `MILVUS_URI`**
Same issue as `integration_test_milvus_recall.py` — `MILVUS_URI` is a developer-specific IP without an environment-variable fallback, inconsistent with `integration_test_milvus_vdb.py`.
```suggestion
MILVUS_URI = os.environ.get("NEMO_RETRIEVER_MILVUS_URI", "http://172.20.0.4:19530")
```
---
Path: nemo_retriever/src/nemo_retriever/vector_store/__init__.py
Line: 1-16
Comment:
**`LanceDBConfig` removed from public exports without a deprecation cycle**
`LanceDBConfig` was listed in `__all__` of this module and is referenced in the API reference docs. Removing it is a breaking change for callers doing `from nemo_retriever.vector_store import LanceDBConfig`. The `api-backward-compatibility` rule requires a deprecation warning and migration note before removal. Consider re-exporting it with a deprecation shim until the next minor version bump:
```python
# Deprecated: import LanceDbParams from nemo_retriever.params instead.
from nemo_retriever.vector_store.lancedb_store import LanceDBConfig # noqa: F401
```
**Rule Used:** Changes to public API surfaces (FastAPI endpoints,... ([source](https://app.greptile.com/review/custom-context?memory=api-backward-compatibility))
```python
        table = db.open_table(table_name)
    except Exception:
        return
```

Path: nemo_retriever/src/nemo_retriever/graph_ingestor.py
Line: 529-532

**Silent exception swallow in `_finalize_vdb()`**

The bare `except Exception: return` silently discards any error that occurs when connecting to LanceDB or opening the table. If the pipeline failed to write any rows (e.g., all batches encountered errors), the table would not exist and this block would silently suppress the fact that no index was created. The user gets no signal that index creation was skipped. This violates the `no-bare-except` rule which requires that broad exception catches at least log with full context.

```python
try:
    db = lancedb.connect(uri=uri)
    table = db.open_table(table_name)
except Exception:
    logger.warning(
        "Could not open LanceDB table %r at %r for index creation; skipping.",
        table_name,
        uri,
        exc_info=True,
    )
    return
```

```python
            source_metadata = json.loads(source_str) if isinstance(source_str, str) else source_str
        except (json.JSONDecodeError, TypeError):
            source_metadata = {}

        elements.append(
            {
                "document_type": "text",
                "metadata": {
                    "content": row.get("text", ""),
                    "embedding": row.get("vector"),
                    "content_metadata": content_metadata,
                    "source_metadata": source_metadata,
                },
            }
        )
    return [elements] if elements else []
```
Path: nemo_retriever/src/nemo_retriever/graph/vdb_upload_operator.py
Line: 40-56

**`document_type` hardcoded to `"text"` drops chart/table content in Milvus**

`_canonical_to_nvingest` sets `"document_type": "text"` for every record regardless of the original content type. The client Milvus `cleanup_records` function filters records by `document_type` when `enable_charts`, `enable_tables`, etc. are set. Because every record arrives as `"text"`, type-specific enable flags become no-ops — chart and table embeddings are misclassified and may be silently dropped, breaking multi-modal recall.

The canonical records built by `build_vdb_records` lose the original `document_type`; it should be carried through and used here instead of the hardcoded `"text"`.
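A minimal sketch of the fix this comment asks for, assuming `build_vdb_records` is extended so each canonical record carries a `document_type` key (the key name and the `"text"` default are assumptions, not the current implementation; the metadata sub-dict is abbreviated):

```python
# Hypothetical revision of _canonical_to_nvingest: assumes each canonical
# record now carries the original document_type from build_vdb_records.
from typing import Any, Dict, List


def _canonical_to_nvingest(rows: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
    elements = []
    for row in rows:
        elements.append(
            {
                # Carry the original type through (e.g. a structured type for
                # charts/tables) instead of hardcoding "text".
                "document_type": row.get("document_type", "text"),
                "metadata": {
                    "content": row.get("text", ""),
                    "embedding": row.get("vector"),
                },
            }
        )
    return [elements] if elements else []
```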
```python
QUERY_CSV = "/raid/jioffe/NeMo-Retriever/data/jp20_query_gt.csv"
EMBED_MODEL = "nvidia/llama-nemotron-embed-1b-v2"
EMBED_DIM = 2048
TOP_K = 10
```
Path: nemo_retriever/tests/integration_test_milvus_recall.py
Line: 29

**Hardcoded personal filesystem path committed to the repository**

`QUERY_CSV` is set to a developer-specific local path. No other team member or CI system can use this script without editing the source. This should be read from an environment variable.

```python
QUERY_CSV = os.environ.get("NEMO_RETRIEVER_QUERY_CSV", "/datasets/nv-ingest/jp20/jp20_query_gt.csv")
```

```python
    """

    def __init__(
        self,
        *,
        params: Any = None,
        vdb_op: Any = None,
    ) -> None:
        super().__init__()
        from nemo_retriever.params.models import LanceDbParams, VdbUploadParams

        # Store as self.<name> so get_constructor_kwargs() captures both for
        # deferred reconstruction on Ray workers.
        self.params = params
        self.vdb_op = vdb_op

        if isinstance(params, VdbUploadParams):
            self._vdb_params = params
            self._lance_params = params.lancedb
        elif isinstance(params, LanceDbParams):
            self._vdb_params = None
            self._lance_params = params
        else:
            self._vdb_params = VdbUploadParams()
            self._lance_params = self._vdb_params.lancedb

        if vdb_op is not None:
            self._backend_name = type(vdb_op).__name__.lower()
        else:
            self._backend_name = getattr(self._vdb_params, "backend", "lancedb") if self._vdb_params else "lancedb"

        self._client_vdb: Any = vdb_op
        self._table: Any = None
        self._milvus_client: Any = None
        self._index_created: bool = False
        self._pending_records: List[Dict[str, Any]] = []

    # ------------------------------------------------------------------
    # Client VDB construction
    # ------------------------------------------------------------------
```
Path: nemo_retriever/src/nemo_retriever/graph/vdb_upload_operator.py
Line: 90-130

**`purge_results_after_upload` field from `VdbUploadParams` is never referenced**

`VdbUploadParams.purge_results_after_upload` is never read inside `VDBUploadOperator` or `_finalize_vdb()`. If this was a meaningful knob for callers relying on the existing API contract, it should either be honoured or formally deprecated. Leaving a public config field silently ignored is a contract violation.
```python
            )
        except RuntimeError:
            logger.warning(
                "Index creation failed (likely too few rows for %d partitions); skipping.",
                lance_params.num_partitions,
                exc_info=True,
            )

    def _record_stage(self, name: str) -> None:
```
Path: nemo_retriever/src/nemo_retriever/graph_ingestor.py
Line: 544-552

**`except RuntimeError` too broad in `_finalize_vdb()`**

The catch on `RuntimeError` (intended for the IVF partitions constraint) is too wide — any `RuntimeError` from `write_to_index` will be silently swallowed with only a warning. A failed network call or lancedb API change would surface identically, obscuring root causes. Consider matching the error message or narrowing the exception type.
…ring

Signed-off-by: Jacob Ioffe <jioffe@nvidia.com>

# Conflicts:
#   nemo_retriever/src/nemo_retriever/graph/__init__.py
#   nemo_retriever/src/nemo_retriever/graph/ingestor_runtime.py
#   nemo_retriever/src/nemo_retriever/graph_ingestor.py
#   nemo_retriever/tests/test_harness_run.py
```python
        columns = set(dataset.columns())
    except Exception:
        return int(fallback_rows)

    if not columns.intersection({"source_id", "path", "source_path", "page_number"}):
        return int(fallback_rows)

    try:
        key_ds = dataset.map_batches(_extract_page_key_batch, batch_format="pandas")
        if int(key_ds.count()) == 0:
```
Path: nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py
Line: 183-192

**Silent exception swallow in `_count_processed_pages_from_dataset`**

The first `except Exception` block returns a fallback value with no logging at all. If `dataset.columns()` raises (e.g., due to a serialisation bug or disconnected Ray cluster), the caller silently gets an incorrect page count and no signal that anything went wrong. The `no-bare-except` rule requires broad catches to at minimum log with `exc_info=True`.

```suggestion
try:
    columns = set(dataset.columns())
except Exception:
    logger.warning("Could not read Ray Dataset columns; falling back to output row count.", exc_info=True)
    return int(fallback_rows)
```

**Rule Used:** Never use bare 'except:' that silently swallows er... ([source](https://app.greptile.com/review/custom-context?memory=no-bare-except))
TLDR
VDBUploadOperator wraps existing client VDB classes (LanceDB, Milvus) as an in-graph streaming stage, eliminating the collect-then-write pattern. The same graph operator can target different VDB backends; recall and throughput are validated per backend and are not expected to be identical across VDB implementations.
What changed
- VDB ABC: `preprocess` extracts canonical records (backend-agnostic); `process` writes per-backend (LanceDB streaming via `table.add()`, Milvus streaming via the Milvus client insert path with final index wait). Runs with `concurrency=1` + `batch_size=64`.
- `_finalize_vdb()` delegates post-pipeline index creation to the client `LanceDB.write_to_index(table=table)` instead of a custom backend.
- `graph_pipeline.py` replaces `take_all() + handle_lancedb()` with `.vdb_upload()` in the fluent chain (see the sketch after this list). VDB writes stream during the pipeline — the driver never collects the full dataset for VDB purposes. (`take_all` remains only for optional driver-side summary/export paths.)
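A before/after sketch of the fluent chain described above; `GraphIngestor.vdb_upload(vdb_op=...)` and the client `LanceDB` class are named in this PR, while the constructor arguments, the `run()` entry point, and the rest of the chain are placeholders:

```python
# Placeholder pipeline: constructor arguments are illustrative only.
from nv_ingest_client.util.vdb import LanceDB          # client VDB class (per PR)
from nemo_retriever.graph_ingestor import GraphIngestor

# Before: collect-then-write — the driver materialises all rows, then writes.
#     rows = ingestor.run().take_all()
#     handle_lancedb(rows, ...)    # removed as dead code in this PR

# After: streaming — VDBUploadOperator writes inside the graph, per batch.
vdb = LanceDB(uri="/tmp/lancedb", table_name="docs")   # assumed parameters
ingestor = GraphIngestor()                             # assumed construction
results = ingestor.vdb_upload(vdb_op=vdb).run()        # run() stands in for the
                                                       # actual execution entry point
# _finalize_vdb() then reuses `vdb` for post-run ANN + FTS index creation.
```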
Quantitative evidence

- LanceDB (harness validated, jp20): recall@5 = 0.8783 (parity), PPS = 21.79 (parity with baseline 21.50)
- Milvus (standalone integration test, jp20): recall@1 = 0.6435, recall@5 = 0.8783, recall@10 = 0.9217

These numbers are backend validation results, not an equality target. Milvus and LanceDB can differ because their indexing/search implementations and filtering semantics differ.
Known follow-ups
- `take_all()` is still used for the detection summary by default — decoupling that is a follow-up
- `write_embeddings_to_lancedb()` in `text_embed/processor.py` is a pre-graph entry point that bypasses the operator — candidate for removal when the stage registry is retired

Test plan
- Unit tests (`test_vdb_upload_operator.py`, `test_vdb_record_contract.py`) cover the `create_index` + `write_to_index` contract
- `retriever harness run --dataset bo20` — smoke test PASS
- `retriever harness run --dataset jp20 --override embed_batch_size=32` — LanceDB streaming recall + PPS parity
- Milvus writes + search verification (`integration_test_milvus_vdb.py`)
- Milvus full jp20 pipeline + recall (`integration_test_milvus_recall.py`)