Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions nemo_retriever/src/nemo_retriever/recall/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,13 @@ def _hits_to_keys(raw_hits: List[List[Dict[str, Any]]]) -> List[List[str]]:
keys: List[str] = []
for h in hits:
page_number = h["page_number"]
source = h["source"]
raw_source = h["source"]
# source may be a bare path string or a JSON object {"source_id": "..."}.
source_map = _parse_mapping(raw_source)
source = source_map.get("source_id", raw_source) if source_map else raw_source
# Prefer explicit `pdf_page` column; fall back to derived form.
if page_number is not None and source:
filename = Path(source).stem
filename = Path(str(source)).stem
keys.append(f"{filename}_{str(page_number)}")
else:
logger.warning(
Expand Down
7 changes: 7 additions & 0 deletions nemo_retriever/src/nemo_retriever/vector_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@
# SPDX-License-Identifier: Apache-2.0

from .__main__ import app
from .lancedb_backend import LanceDBBackend
from .lancedb_store import (
LanceDBConfig,
create_lancedb_index,
write_embeddings_to_lancedb,
write_text_embeddings_dir_to_lancedb,
)
from .vdb import VectorStore
from .vdb_records import build_vdb_records, build_vdb_records_from_dicts

__all__ = [
"app",
"LanceDBBackend",
"LanceDBConfig",
"VectorStore",
"build_vdb_records",
"build_vdb_records_from_dicts",
"create_lancedb_index",
"write_embeddings_to_lancedb",
"write_text_embeddings_dir_to_lancedb",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""LanceDB implementation of the :class:`VectorStore` interface."""

from __future__ import annotations

import logging
from typing import Any, Sequence

from nemo_retriever.params.models import LanceDbParams
from nemo_retriever.vector_store.lancedb_utils import infer_vector_dim, lancedb_schema
from nemo_retriever.vector_store.vdb import VectorStore

logger = logging.getLogger(__name__)


class LanceDBBackend(VectorStore):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you recreating this backend? Remember part of the motivation here is to show that we can consume VDBOperators that were created for the legacy system. One of your tests should be can I consume and correctly use the LanceDB operator from legacy.

"""LanceDB vector store backend.

Lazily connects and creates the table on the first :meth:`write_rows`
call so that the embedding dimension can be inferred from the data.
"""

def __init__(self, params: LanceDbParams | None = None) -> None:
self._params = params or LanceDbParams()
self._db: Any = None
self._table: Any = None

def create_table(self, *, dim: int, **kwargs: Any) -> None:
import lancedb

self._db = lancedb.connect(uri=self._params.lancedb_uri)
schema = lancedb_schema(vector_dim=dim)
mode = "overwrite" if self._params.overwrite else "create"
self._table = self._db.create_table(
self._params.table_name,
schema=schema,
mode=mode,
)

def write_rows(self, rows: Sequence[dict[str, Any]], **kwargs: Any) -> None:
if not rows:
return
if self._table is None:
self.create_table(dim=infer_vector_dim(list(rows)))
self._table.add(list(rows))

def create_index(self, **kwargs: Any) -> None:
if self._table is None:
return
if not self._params.create_index:
return

from nemo_retriever.vector_store.lancedb_store import LanceDBConfig, create_lancedb_index

cfg = LanceDBConfig(
uri=self._params.lancedb_uri,
table_name=self._params.table_name,
hybrid=self._params.hybrid,
fts_language=self._params.fts_language,
index_type=self._params.index_type,
metric=self._params.metric,
num_partitions=self._params.num_partitions,
num_sub_vectors=self._params.num_sub_vectors,
)
try:
create_lancedb_index(self._table, cfg=cfg)
except RuntimeError:
# KMeans cannot train when the dataset is smaller than num_partitions.
# This is expected for dev/test datasets; log and continue.
logger.warning(
"Index creation failed (likely too few rows for %d partitions); skipping.",
self._params.num_partitions,
)
Comment on lines +69 to +76
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 RuntimeError swallowed without traceback

The bare except RuntimeError: catches any RuntimeError thrown by create_lancedb_index — not only the KMeans partition-size failure. A real infrastructure error (e.g. connection failure, schema mismatch) would log a misleading "too few rows" message and silently succeed, leaving callers with a table that has no index and no indication of what went wrong. Add exc_info=True and bind the exception so it can be inspected:

Suggested change
try:
create_lancedb_index(self._table, cfg=cfg)
except RuntimeError:
# KMeans cannot train when the dataset is smaller than num_partitions.
# This is expected for dev/test datasets; log and continue.
logger.warning(
"Index creation failed (likely too few rows for %d partitions); skipping.",
self._params.num_partitions,
)
try:
create_lancedb_index(self._table, cfg=cfg)
except RuntimeError as exc:
# KMeans cannot train when the dataset is smaller than num_partitions.
# This is expected for dev/test datasets; log and continue.
logger.warning(
"Index creation failed (likely too few rows for %d partitions); skipping. Error: %s",
self._params.num_partitions,
exc,
exc_info=True,
)
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/vector_store/lancedb_backend.py
Line: 68-76

Comment:
**`RuntimeError` swallowed without traceback**

The bare `except RuntimeError:` catches any `RuntimeError` thrown by `create_lancedb_index` — not only the KMeans partition-size failure. A real infrastructure error (e.g. connection failure, schema mismatch) would log a misleading "too few rows" message and silently succeed, leaving callers with a table that has no index and no indication of what went wrong. Add `exc_info=True` and bind the exception so it can be inspected:

```suggestion
        try:
            create_lancedb_index(self._table, cfg=cfg)
        except RuntimeError as exc:
            # KMeans cannot train when the dataset is smaller than num_partitions.
            # This is expected for dev/test datasets; log and continue.
            logger.warning(
                "Index creation failed (likely too few rows for %d partitions); skipping. Error: %s",
                self._params.num_partitions,
                exc,
                exc_info=True,
            )
```

How can I resolve this? If you propose a fix, please make it concise.

58 changes: 39 additions & 19 deletions nemo_retriever/src/nemo_retriever/vector_store/lancedb_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
from datetime import timedelta

from nv_ingest_client.util.vdb.lancedb import LanceDB
from nemo_retriever.params.models import LanceDbParams
from nemo_retriever.vector_store.lancedb_utils import lancedb_schema
from nemo_retriever.vector_store.vdb_records import build_vdb_records, build_vdb_records_from_dicts
import pandas as pd
import lancedb

Expand Down Expand Up @@ -246,12 +248,29 @@ def _write_rows_to_lancedb(rows: Sequence[Dict[str, Any]], *, cfg: LanceDBConfig

def write_embeddings_to_lancedb(df_with_embeddings: pd.DataFrame, *, cfg: LanceDBConfig) -> None:
"""
Write embeddings found in `df_with_embeddings.metadata.embedding` to LanceDB.
Write embeddings found in *df_with_embeddings* to LanceDB.

This is used programmatically by `nemo_retriever.text_embed.stage.embed_text_from_primitives_df(...)`.
This is used programmatically by ``nemo_retriever.text_embed.stage``.
"""
rows = _build_lancedb_rows_from_df(df_with_embeddings)
_write_rows_to_lancedb(rows, cfg=cfg)
from nemo_retriever.vector_store.lancedb_backend import LanceDBBackend

records = build_vdb_records(df_with_embeddings)
params = LanceDbParams(
lancedb_uri=cfg.uri,
table_name=cfg.table_name,
overwrite=cfg.overwrite,
create_index=cfg.create_index,
hybrid=cfg.hybrid,
fts_language=cfg.fts_language,
index_type=cfg.index_type,
metric=cfg.metric,
num_partitions=cfg.num_partitions,
num_sub_vectors=cfg.num_sub_vectors,
)
backend = LanceDBBackend(params)
backend.write_rows(records)
if cfg.create_index:
backend.create_index()
Comment on lines +101 to +103
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Redundant outer create_index guard

backend.create_index() already returns early when self._params.create_index is False (which was just set from cfg.create_index), so the outer if cfg.create_index: check is a no-op. The symmetry is confusing and can mask bugs if the two flags ever diverge.

Suggested change
backend.write_rows(records)
if cfg.create_index:
backend.create_index()
backend.write_rows(records)
backend.create_index()
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/vector_store/lancedb_store.py
Line: 271-273

Comment:
**Redundant outer `create_index` guard**

`backend.create_index()` already returns early when `self._params.create_index` is `False` (which was just set from `cfg.create_index`), so the outer `if cfg.create_index:` check is a no-op. The symmetry is confusing and can mask bugs if the two flags ever diverge.

```suggestion
    backend.write_rows(records)
    backend.create_index()
```

How can I resolve this? If you propose a fix, please make it concise.



def write_text_embeddings_dir_to_lancedb(
Expand Down Expand Up @@ -307,25 +326,26 @@ def write_text_embeddings_dir_to_lancedb(


def handle_lancedb(
rows: Path,
rows: Any,
uri: str,
table_name: str,
hybrid: bool = False,
mode: str = "overwrite",
) -> Dict[str, Any]:
"""
Handle LanceDB writing for a batch pipeline run.
) -> None:
"""Write pipeline results to LanceDB."""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Docstring regression on public function

The previous handle_lancedb docstring described the caller context, the input expectations, and the index-creation behaviour. The replacement is a single-line stub. Per the project's docstrings-public-interface rule, public functions must document parameters, return values, and exceptions raised. The new signature also accepts rows: Any — the docstring should at least describe the two accepted types (pd.DataFrame or list[dict]) and note that this function returns None (the old type was Dict[str, Any]).

Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/vector_store/lancedb_store.py
Line: 335

Comment:
**Docstring regression on public function**

The previous `handle_lancedb` docstring described the caller context, the input expectations, and the index-creation behaviour. The replacement is a single-line stub. Per the project's `docstrings-public-interface` rule, public functions must document parameters, return values, and exceptions raised. The new signature also accepts `rows: Any` — the docstring should at least describe the two accepted types (`pd.DataFrame` or `list[dict]`) and note that this function returns `None` (the old type was `Dict[str, Any]`).

How can I resolve this? If you propose a fix, please make it concise.

from nemo_retriever.vector_store.lancedb_backend import LanceDBBackend

This is used by `nemo_retriever.examples.batch_pipeline.run(...)` after the embedding stage.
if isinstance(rows, pd.DataFrame):
records = build_vdb_records(rows)
else:
records = build_vdb_records_from_dicts(rows)

Reads `*.text_embeddings.json` files from `input_dir`, extracts embeddings, and uploads to LanceDB.
params = LanceDbParams(
lancedb_uri=uri,
table_name=table_name,
hybrid=hybrid,
overwrite=(mode == "overwrite"),
)
"""
lancedb_config = LanceDBConfig(
uri=uri, table_name=table_name, hybrid=hybrid
) # Use the same LanceDB config for writing and recall.
db = lancedb.connect(uri=lancedb_config.uri)
cleaned_rows = _build_lancedb_rows_from_df(rows)
_write_rows_to_lancedb(cleaned_rows, cfg=lancedb_config)
table = db.open_table(lancedb_config.table_name) # Ensure table is open and metadata is updated before proceeding.
create_lancedb_index(table, cfg=lancedb_config)
backend = LanceDBBackend(params)
backend.write_rows(records)
backend.create_index()
31 changes: 31 additions & 0 deletions nemo_retriever/src/nemo_retriever/vector_store/vdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Abstract base class for vector store backends.

Backends receive rows in the canonical VDB record format produced by
:func:`nemo_retriever.vector_store.vdb_records.build_vdb_records`.
Write-path only; retrieval support will be added with the second backend.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Sequence


class VectorStore(ABC):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you created a new vectorstore ABC, why not use the one we already had?

"""Abstract base for vector store backends."""

@abstractmethod
def create_table(self, *, dim: int, **kwargs: Any) -> None:
"""Create or reset the storage table / index."""

@abstractmethod
def write_rows(self, rows: Sequence[dict[str, Any]], **kwargs: Any) -> None:
"""Write a batch of canonical VDB records."""

@abstractmethod
def create_index(self, **kwargs: Any) -> None:
"""Build search indices after all writes complete."""
79 changes: 79 additions & 0 deletions nemo_retriever/src/nemo_retriever/vector_store/vdb_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""Canonical VDB record builder.

Converts a pandas DataFrame (the graph pipeline's output format) into a list
of backend-neutral VDB record dicts. Every VDB backend in ``nemo_retriever``
consumes this record format — it is the single source of truth for the
DataFrame → VDB record contract.

Canonical record schema (matches ``retriever.py`` query expectations)::

vector : list[float] # embedding
text : str # content
metadata : str # JSON string (round-trips via json.loads)
source : str # JSON string {"source_id": "..."}
page_number : int
pdf_page : str # "basename_pagenum"
pdf_basename : str
filename : str
source_id : str
path : str
"""

from __future__ import annotations

from typing import Any, Dict, List

import pandas as pd

from nemo_retriever.vector_store.lancedb_utils import build_lancedb_row


def build_vdb_records(
df: pd.DataFrame,
*,
embedding_column: str = "text_embeddings_1b_v2",
embedding_key: str = "embedding",
text_column: str = "text",
include_text: bool = True,
) -> List[Dict[str, Any]]:
"""Convert a post-embed DataFrame into canonical VDB records.

Rows without a valid embedding are silently skipped.
"""
rows: List[Dict[str, Any]] = []
for row in df.itertuples(index=False):
row_out = build_lancedb_row(
row,
embedding_column=embedding_column,
embedding_key=embedding_key,
text_column=text_column,
include_text=include_text,
)
if row_out is not None:
rows.append(row_out)
return rows


def build_vdb_records_from_dicts(
records: List[Dict[str, Any]],
*,
embedding_column: str = "text_embeddings_1b_v2",
embedding_key: str = "embedding",
text_column: str = "text",
include_text: bool = True,
) -> List[Dict[str, Any]]:
"""Convert a list of dicts (e.g. from ``take_all()``) into canonical VDB records."""
if not records:
return []
df = pd.DataFrame(records)
return build_vdb_records(
df,
embedding_column=embedding_column,
embedding_key=embedding_key,
text_column=text_column,
include_text=include_text,
)
Loading
Loading