Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f2be9cc
python: add explicit cuvs accelerator path
Xuanwo Mar 31, 2026
2f071f6
python: document cuvs installation requirements
Xuanwo Mar 31, 2026
1a6c44b
python: fix cuvs training on real datasets
Xuanwo Apr 1, 2026
76995c0
python: format cuvs helper
Xuanwo Apr 1, 2026
fbe0f50
python: clarify accelerator hardware requirements
Xuanwo Apr 1, 2026
f00d078
python: add cuvs one-pass ivfpq assignment
Xuanwo Apr 1, 2026
7f7e6e2
python: use cuvs transform for full ivf pq build
Xuanwo Apr 1, 2026
f9c5d03
python: route cuvs precomputed shuffle to v3 files
Xuanwo Apr 2, 2026
ec99cda
python: fix cuvs pq_dim semantics
Xuanwo Apr 2, 2026
1991638
python: fix cuvs pq_dim semantics
Xuanwo Apr 2, 2026
54c29d8
python: revert cuvs shuffle dataset integration
Xuanwo Apr 2, 2026
5ecefc4
Support precomputed encoded datasets for IVF_PQ build
Xuanwo Apr 3, 2026
e3f29f5
python: fix fragment scans in cuvs encoded dataset metadata
Xuanwo Apr 3, 2026
769a218
feat: add partition artifacts for cuvs builds
Xuanwo Apr 7, 2026
c0af491
refactor: decouple cuvs backend from main tree
Xuanwo Apr 7, 2026
578f789
refactor: remove in-tree cuvs integration
Xuanwo Apr 8, 2026
5ea2498
python: delegate cuvs acceleration to external backend
Xuanwo Apr 8, 2026
b38f393
Merge remote-tracking branch 'origin/main' into HEAD
Xuanwo Apr 8, 2026
d60e11f
fix: remove merge leftover import
Xuanwo Apr 8, 2026
f254850
refactor: drop transitional cuvs compatibility paths
Xuanwo Apr 8, 2026
51a141b
docs: document partition artifact internals
Xuanwo Apr 8, 2026
15e42fd
fix: stream partition artifact writes
Xuanwo Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions python/python/lance/cuvs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The Lance Authors

from __future__ import annotations

import os
import tempfile
from importlib import import_module
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from pathlib import Path


def is_cuvs_accelerator(accelerator: object) -> bool:
    """Return True when ``accelerator`` selects the cuVS backend.

    Only the (case-insensitive) string ``"cuvs"`` matches; any non-string
    value (e.g. a ``torch.device``) is rejected.
    """
    if not isinstance(accelerator, str):
        return False
    return accelerator.lower() == "cuvs"


def _require_lance_cuvs():
try:
return import_module("lance_cuvs")
except ModuleNotFoundError as exc:
raise ModuleNotFoundError(
"accelerator='cuvs' requires the external 'lance-cuvs' package "
"to be installed."
) from exc


def build_vector_index_on_cuvs(
    dataset,
    column: str,
    metric_type: str,
    accelerator: str,
    num_partitions: int,
    num_sub_vectors: int,
    dst_dataset_uri: str | Path | None = None,
    storage_options: Optional[dict[str, str]] = None,
    *,
    sample_rate: int = 256,
    max_iters: int = 50,
    num_bits: int = 8,
    batch_size: int = 1024 * 128,
    filter_nan: bool = True,
):
    """Train and materialize an IVF_PQ partition artifact via ``lance-cuvs``.

    Runs a two-phase build against the external backend: first IVF/PQ
    training (``train_ivf_pq``), then artifact materialization
    (``build_ivf_pq_artifact``) rooted at ``dst_dataset_uri`` or, when that
    is ``None``, a freshly created temporary directory (which the caller is
    responsible for cleaning up).

    Returns
    -------
    tuple
        ``(artifact_uri, artifact_files, ivf_centroids, pq_codebook)``.

    Raises
    ------
    ValueError
        If ``accelerator`` is not ``"cuvs"``.
    ModuleNotFoundError
        If the external ``lance-cuvs`` package is not installed.
    """
    if not is_cuvs_accelerator(accelerator):
        raise ValueError("build_vector_index_on_cuvs requires accelerator='cuvs'")

    backend = _require_lance_cuvs()

    # Root the artifact at the caller's location if given, else a temp dir.
    if dst_dataset_uri is None:
        artifact_root = tempfile.mkdtemp(prefix="lance-cuvs-artifact-")
    else:
        artifact_root = os.fspath(dst_dataset_uri)

    # Phase 1: train IVF centroids and the PQ codebook on the backend.
    training = backend.train_ivf_pq(
        dataset.uri,
        column,
        metric_type=metric_type,
        num_partitions=num_partitions,
        num_sub_vectors=num_sub_vectors,
        sample_rate=sample_rate,
        max_iters=max_iters,
        num_bits=num_bits,
        filter_nan=filter_nan,
        storage_options=storage_options,
    )

    # Phase 2: assign/encode the full dataset into a partition artifact.
    artifact = backend.build_ivf_pq_artifact(
        dataset.uri,
        column,
        training=training,
        artifact_uri=artifact_root,
        batch_size=batch_size,
        filter_nan=filter_nan,
        storage_options=storage_options,
    )

    return (
        artifact.artifact_uri,
        artifact.files,
        training.ivf_centroids(),
        training.pq_codebook(),
    )


def prepare_global_ivf_pq_on_cuvs(
    dataset,
    column: str,
    num_partitions: int,
    num_sub_vectors: int,
    *,
    distance_type: str = "l2",
    accelerator: str = "cuvs",
    sample_rate: int = 256,
    max_iters: int = 50,
    num_bits: int = 8,
    filter_nan: bool = True,
    storage_options: Optional[dict[str, str]] = None,
):
    """Train global IVF centroids and a PQ codebook via ``lance-cuvs``.

    Unlike :func:`build_vector_index_on_cuvs`, this only performs the
    training phase; no partition artifact is materialized.

    Parameters
    ----------
    storage_options : dict, optional
        Backend storage options (e.g. cloud credentials) forwarded to
        ``train_ivf_pq``, matching the behavior of
        ``build_vector_index_on_cuvs``.

    Returns
    -------
    dict
        ``{"ivf_centroids": ..., "pq_codebook": ...}`` as produced by the
        backend's training result.

    Raises
    ------
    ValueError
        If ``accelerator`` is not ``"cuvs"``.
    ModuleNotFoundError
        If the external ``lance-cuvs`` package is not installed.
    """
    if not is_cuvs_accelerator(accelerator):
        raise ValueError("prepare_global_ivf_pq_on_cuvs requires accelerator='cuvs'")

    backend = _require_lance_cuvs()
    training = backend.train_ivf_pq(
        dataset.uri,
        column,
        metric_type=distance_type,
        num_partitions=num_partitions,
        num_sub_vectors=num_sub_vectors,
        sample_rate=sample_rate,
        max_iters=max_iters,
        num_bits=num_bits,
        filter_nan=filter_nan,
        # Forward storage options so datasets on object storage work here
        # the same way they do in build_vector_index_on_cuvs.
        storage_options=storage_options,
    )
    return {
        "ivf_centroids": training.ivf_centroids(),
        "pq_codebook": training.pq_codebook(),
    }
145 changes: 97 additions & 48 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from lance.log import LOGGER

from .blob import BlobFile
from .cuvs import is_cuvs_accelerator
from .dependencies import (
_check_for_numpy,
_check_for_torch,
Expand Down Expand Up @@ -2918,12 +2919,14 @@ def _create_index_impl(

# Handle timing for various parts of accelerated builds
timers = {}
use_cuvs = is_cuvs_accelerator(accelerator)
if accelerator is not None and index_type != "IVF_PQ":
LOGGER.warning(
"Index type %s does not support GPU acceleration; falling back to CPU",
index_type,
)
accelerator = None
use_cuvs = False

# IMPORTANT: Distributed indexing is CPU-only. Enforce single-node when
# accelerator or torch-related paths are detected.
Expand Down Expand Up @@ -2967,57 +2970,85 @@ def _create_index_impl(
index_uuid = None

if accelerator is not None:
from .vector import (
one_pass_assign_ivf_pq_on_accelerator,
one_pass_train_ivf_pq_on_accelerator,
)

LOGGER.info("Doing one-pass ivfpq accelerated computations")
if num_partitions is None:
num_rows = self.count_rows()
num_partitions = _target_partition_size_to_num_partitions(
num_rows, target_partition_size
)
timers["ivf+pq_train:start"] = time.time()
(
ivf_centroids,
ivf_kmeans,
pq_codebook,
pq_kmeans_list,
) = one_pass_train_ivf_pq_on_accelerator(
self,
column[0],
num_partitions,
metric,
accelerator,
num_sub_vectors=num_sub_vectors,
batch_size=20480,
filter_nan=filter_nan,
)
timers["ivf+pq_train:end"] = time.time()
ivfpq_train_time = timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"]
LOGGER.info("ivf+pq training time: %ss", ivfpq_train_time)
timers["ivf+pq_assign:start"] = time.time()
shuffle_output_dir, shuffle_buffers = one_pass_assign_ivf_pq_on_accelerator(
self,
column[0],
metric,
accelerator,
ivf_kmeans,
pq_kmeans_list,
batch_size=20480,
filter_nan=filter_nan,
)
timers["ivf+pq_assign:end"] = time.time()
ivfpq_assign_time = (
timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"]
)
LOGGER.info("ivf+pq transform time: %ss", ivfpq_assign_time)
if use_cuvs:
from .cuvs import build_vector_index_on_cuvs

LOGGER.info("Doing cuVS vector backend build")
timers["ivf+pq_build:start"] = time.time()
artifact_root, _, ivf_centroids, pq_codebook = build_vector_index_on_cuvs(
self,
column[0],
metric,
accelerator,
num_partitions,
num_sub_vectors,
storage_options=storage_options,
sample_rate=kwargs.get("sample_rate", 256),
max_iters=kwargs.get("max_iters", 50),
num_bits=kwargs.get("num_bits", 8),
batch_size=1024 * 128,
filter_nan=filter_nan,
)
kwargs["precomputed_partition_artifact_uri"] = artifact_root
timers["ivf+pq_build:end"] = time.time()
ivfpq_build_time = (
timers["ivf+pq_build:end"] - timers["ivf+pq_build:start"]
)
LOGGER.info("cuVS ivf+pq build time: %ss", ivfpq_build_time)
else:
from .vector import (
one_pass_assign_ivf_pq_on_accelerator,
one_pass_train_ivf_pq_on_accelerator,
)

kwargs["precomputed_shuffle_buffers"] = shuffle_buffers
kwargs["precomputed_shuffle_buffers_path"] = os.path.join(
shuffle_output_dir, "data"
)
LOGGER.info("Doing one-pass ivfpq accelerated computations")
timers["ivf+pq_train:start"] = time.time()
(
ivf_centroids,
ivf_kmeans,
pq_codebook,
pq_kmeans_list,
) = one_pass_train_ivf_pq_on_accelerator(
self,
column[0],
num_partitions,
metric,
accelerator,
num_sub_vectors=num_sub_vectors,
batch_size=20480,
filter_nan=filter_nan,
)
timers["ivf+pq_train:end"] = time.time()
ivfpq_train_time = (
timers["ivf+pq_train:end"] - timers["ivf+pq_train:start"]
)
LOGGER.info("ivf+pq training time: %ss", ivfpq_train_time)
timers["ivf+pq_assign:start"] = time.time()
shuffle_output_dir, shuffle_buffers = one_pass_assign_ivf_pq_on_accelerator(
self,
column[0],
metric,
accelerator,
ivf_kmeans,
pq_kmeans_list,
batch_size=20480,
filter_nan=filter_nan,
)
timers["ivf+pq_assign:end"] = time.time()
ivfpq_assign_time = (
timers["ivf+pq_assign:end"] - timers["ivf+pq_assign:start"]
)
LOGGER.info("ivf+pq transform time: %ss", ivfpq_assign_time)

kwargs["precomputed_shuffle_buffers"] = shuffle_buffers
kwargs["precomputed_shuffle_buffers_path"] = os.path.join(
shuffle_output_dir, "data"
)
if index_type.startswith("IVF"):
if (ivf_centroids is not None) and (ivf_centroids_file is not None):
raise ValueError(
Expand Down Expand Up @@ -3173,6 +3204,13 @@ def _create_index_impl(
"Temporary shuffle buffers stored at %s, you may want to delete it.",
kwargs["precomputed_shuffle_buffers_path"],
)
if "precomputed_partition_artifact_uri" in kwargs.keys() and os.path.exists(
kwargs["precomputed_partition_artifact_uri"]
):
LOGGER.info(
"Temporary precomputed partition artifact stored at %s, you may want to delete it.",
kwargs["precomputed_partition_artifact_uri"],
)
return index

def create_index(
Expand Down Expand Up @@ -3249,7 +3287,12 @@ def create_index(
The number of sub-vectors for PQ (Product Quantization).
accelerator : str or ``torch.Device``, optional
If set, use an accelerator to speed up the training process.
Accepted accelerator: "cuda" (Nvidia GPU) and "mps" (Apple Silicon GPU).
Accepted accelerator:

- "cuda" (Nvidia GPU)
- "mps" (Apple Silicon GPU)
- "cuvs" for the external `lance-cuvs` backend

If not set, use the CPU.
index_cache_size : int, optional
The size of the index cache in number of entries. Default value is 256.
Expand Down Expand Up @@ -3318,6 +3361,11 @@ def create_index(
Only 4, 8 are supported.
- index_file_version
The version of the index file. Default is "V3".
- precomputed_partition_artifact_uri
An advanced input produced by an external backend such as
`lance-cuvs`. When set, Lance skips its own partition assignment
and consumes the precomputed partition-local artifact during
finalization. Requires `ivf_centroids` and `pq_codebook`.

Optional parameters for `IVF_RQ`:

Expand Down Expand Up @@ -3361,8 +3409,9 @@ def create_index(
Experimental Accelerator (GPU) support:

- *accelerate*: use GPU to train IVF partitions.
Only supports CUDA (Nvidia) or MPS (Apple) currently.
Requires PyTorch being installed.
Supports CUDA (Nvidia) and MPS (Apple) via the built-in torch path.
`accelerator="cuvs"` delegates IVF_PQ build preparation to the
external `lance-cuvs` package.

.. code-block:: python

Expand Down
25 changes: 25 additions & 0 deletions python/python/lance/indices/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import numpy as np
import pyarrow as pa

from lance.cuvs import is_cuvs_accelerator, prepare_global_ivf_pq_on_cuvs
from lance.indices.ivf import IvfModel
from lance.indices.pq import PqModel

Expand Down Expand Up @@ -115,6 +116,11 @@ def train_ivf(
self._verify_ivf_sample_rate(sample_rate, num_partitions, num_rows)
distance_type = self._normalize_distance_type(distance_type)
self._verify_ivf_params(num_partitions)
if is_cuvs_accelerator(accelerator):
raise NotImplementedError(
"IndicesBuilder.train_ivf does not support accelerator='cuvs'; "
"use prepare_global_ivf_pq instead"
)

if accelerator is None:
from lance.lance import indices
Expand Down Expand Up @@ -250,6 +256,25 @@ def prepare_global_ivf_pq(
`IndicesBuilder.train_pq` (indices.train_pq_model). No public method
names elsewhere are changed.
"""
if is_cuvs_accelerator(accelerator):
if fragment_ids is not None:
raise NotImplementedError(
"fragment_ids is not supported with accelerator='cuvs'"
)
num_rows = self._count_rows()
num_partitions = self._determine_num_partitions(num_partitions, num_rows)
num_subvectors = self._normalize_pq_params(num_subvectors, self.dimension)
return prepare_global_ivf_pq_on_cuvs(
self.dataset,
self.column[0],
num_partitions,
num_subvectors,
distance_type=distance_type,
accelerator=accelerator,
sample_rate=sample_rate,
max_iters=max_iters,
)

# Global IVF training
ivf_model = self.train_ivf(
num_partitions,
Expand Down
Loading
Loading