Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions nemo_retriever/src/nemo_retriever/recall/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,13 @@ def _hits_to_keys(raw_hits: List[List[Dict[str, Any]]]) -> List[List[str]]:
keys: List[str] = []
for h in hits:
page_number = h["page_number"]
source = h["source"]
raw_source = h["source"]
# source may be a bare path string or a JSON object {"source_id": "..."}.
source_map = _parse_mapping(raw_source)
source = source_map.get("source_id", raw_source) if source_map else raw_source
# Prefer explicit `pdf_page` column; fall back to derived form.
if page_number is not None and source:
filename = Path(source).stem
filename = Path(str(source)).stem
keys.append(f"{filename}_{str(page_number)}")
else:
logger.warning(
Expand Down
5 changes: 3 additions & 2 deletions nemo_retriever/src/nemo_retriever/text_embed/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from nv_ingest_api.internal.transform.embed_text import transform_create_text_embeddings_internal

from nemo_retriever.io.dataframe import validate_primitives_dataframe
from nemo_retriever.vector_store.lancedb_store import LanceDBConfig, write_embeddings_to_lancedb
from nemo_retriever.params.models import LanceDbParams
from nemo_retriever.vector_store.lancedb_store import write_embeddings_to_lancedb

logger = logging.getLogger(__name__)

Expand All @@ -25,7 +26,7 @@ def embed_text_from_primitives_df(
*,
transform_config: TextEmbeddingSchema,
task_config: Optional[Dict[str, Any]] = None,
lancedb: Optional[LanceDBConfig] = None,
lancedb: Optional[LanceDbParams] = None,
trace_info: Optional[Dict[str, Any]] = None,
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
"""Generate embeddings for supported content types and write to metadata."""
Expand Down
9 changes: 7 additions & 2 deletions nemo_retriever/src/nemo_retriever/vector_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@
# SPDX-License-Identifier: Apache-2.0

from .__main__ import app
from .lancedb_backend import LanceDBBackend
from .lancedb_store import (
LanceDBConfig,
create_lancedb_index,
write_embeddings_to_lancedb,
write_text_embeddings_dir_to_lancedb,
)
from .vdb import VectorStore
from .vdb_records import build_vdb_records, build_vdb_records_from_dicts

__all__ = [
"app",
"LanceDBConfig",
"LanceDBBackend",
"VectorStore",
"build_vdb_records",
"build_vdb_records_from_dicts",
"create_lancedb_index",
"write_embeddings_to_lancedb",
"write_text_embeddings_dir_to_lancedb",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""LanceDB implementation of the :class:`VectorStore` interface."""

from __future__ import annotations

import logging
from typing import Any, Sequence

from nemo_retriever.params.models import LanceDbParams
from nemo_retriever.vector_store.lancedb_utils import infer_vector_dim, lancedb_schema
from nemo_retriever.vector_store.vdb import VectorStore

logger = logging.getLogger(__name__)


class LanceDBBackend(VectorStore):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you recreating this backend? Remember part of the motivation here is to show that we can consume VDBOperators that were created for the legacy system. One of your tests should be can I consume and correctly use the LanceDB operator from legacy.

"""LanceDB vector store backend.

Lazily connects and creates the table on the first :meth:`write_rows`
call so that the embedding dimension can be inferred from the data.
"""

def __init__(self, params: LanceDbParams | None = None) -> None:
self._params = params or LanceDbParams()
self._db: Any = None
self._table: Any = None

def open_table(self) -> None:
"""Open an existing LanceDB table without creating it.

Used by the driver to run post-pipeline finalization (e.g. index
creation) after distributed workers have written all rows.
"""
import lancedb

self._db = lancedb.connect(uri=self._params.lancedb_uri)
self._table = self._db.open_table(self._params.table_name)

def create_table(self, *, dim: int, **kwargs: Any) -> None:
import lancedb

self._db = lancedb.connect(uri=self._params.lancedb_uri)
schema = lancedb_schema(vector_dim=dim)
mode = "overwrite" if self._params.overwrite else "create"
self._table = self._db.create_table(
self._params.table_name,
schema=schema,
mode=mode,
)

def write_rows(self, rows: Sequence[dict[str, Any]], **kwargs: Any) -> None:
if not rows:
return
if self._table is None:
self.create_table(dim=infer_vector_dim(list(rows)))
self._table.add(list(rows))

def create_index(self, **kwargs: Any) -> None:
if self._table is None:
return
if not self._params.create_index:
return

from nemo_retriever.vector_store.lancedb_store import create_lancedb_index

try:
create_lancedb_index(self._table, cfg=self._params)
except RuntimeError:
logger.warning(
"Index creation failed (likely too few rows for %d partitions); skipping.",
self._params.num_partitions,
exc_info=True,
)
Comment on lines +69 to +76
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 RuntimeError swallowed without traceback

The bare except RuntimeError: catches any RuntimeError thrown by create_lancedb_index — not only the KMeans partition-size failure. A real infrastructure error (e.g. connection failure, schema mismatch) would log a misleading "too few rows" message and silently succeed, leaving callers with a table that has no index and no indication of what went wrong. Add exc_info=True and bind the exception so it can be inspected:

Suggested change
try:
create_lancedb_index(self._table, cfg=cfg)
except RuntimeError:
# KMeans cannot train when the dataset is smaller than num_partitions.
# This is expected for dev/test datasets; log and continue.
logger.warning(
"Index creation failed (likely too few rows for %d partitions); skipping.",
self._params.num_partitions,
)
try:
create_lancedb_index(self._table, cfg=cfg)
except RuntimeError as exc:
# KMeans cannot train when the dataset is smaller than num_partitions.
# This is expected for dev/test datasets; log and continue.
logger.warning(
"Index creation failed (likely too few rows for %d partitions); skipping. Error: %s",
self._params.num_partitions,
exc,
exc_info=True,
)
Prompt To Fix With AI
This is a comment left during a code review.
Path: nemo_retriever/src/nemo_retriever/vector_store/lancedb_backend.py
Line: 68-76

Comment:
**`RuntimeError` swallowed without traceback**

The bare `except RuntimeError:` catches any `RuntimeError` thrown by `create_lancedb_index` — not only the KMeans partition-size failure. A real infrastructure error (e.g. connection failure, schema mismatch) would log a misleading "too few rows" message and silently succeed, leaving callers with a table that has no index and no indication of what went wrong. Add `exc_info=True` and bind the exception so it can be inspected:

```suggestion
        try:
            create_lancedb_index(self._table, cfg=cfg)
        except RuntimeError as exc:
            # KMeans cannot train when the dataset is smaller than num_partitions.
            # This is expected for dev/test datasets; log and continue.
            logger.warning(
                "Index creation failed (likely too few rows for %d partitions); skipping. Error: %s",
                self._params.num_partitions,
                exc,
                exc_info=True,
            )
```

How can I resolve this? If you propose a fix, please make it concise.

Loading
Loading