feat: hook up new writer for insert (#3029)

This hooks up a new writer implementation for the `add()` method. The
main immediate benefit is that it allows streaming requests to remote
tables while still allowing retries for most inputs.

In NodeJS, we always convert the data to `Vec<RecordBatch>`, so it's
always retry-able.

For Python, all inputs are retry-able except `Iterator` and
`pa.RecordBatchReader`, which can only be consumed once. Some, like
`pa.dataset.Dataset`, are retry-able *and* streaming.

A lot of the changes here are to make the new DataFusion write pipeline
maintain the same behavior as the existing Python-based preprocessing,
such as:

* casting input data to target schema
* rejecting NaN values if `on_bad_vectors="error"`
* applying embedding functions.

In future PRs, we'll enhance these by moving the embedding calls into
DataFusion and making sure we parallelize them. See:
https://github.com/lancedb/lancedb/issues/3048

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Will Jones
2026-02-23 14:43:31 -08:00
committed by GitHub
parent 367262662d
commit 0e486511fa
20 changed files with 2446 additions and 359 deletions

View File

@@ -1,8 +1,10 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from functools import singledispatch
from typing import List, Optional, Tuple, Union
from lancedb.pydantic import LanceModel, model_to_dict
import pyarrow as pa
from ._lancedb import RecordBatchStream
@@ -80,3 +82,32 @@ def peek_reader(
yield from reader
return batch, pa.RecordBatchReader.from_batches(batch.schema, all_batches())
@singledispatch
def to_arrow(data) -> pa.Table:
    """Fallback converter, reached when no handler is registered for the input type.

    Type-specific conversions are attached through ``to_arrow.register``.

    Raises
    ------
    NotImplementedError
        Always; the message names the unsupported type.
    """
    unsupported = type(data)
    raise NotImplementedError(f"to_arrow not implemented for type {unsupported}")
@to_arrow.register(pa.RecordBatch)
def _arrow_from_batch(data: pa.RecordBatch) -> pa.Table:
    """Wrap a single record batch in a one-batch table."""
    batches = [data]
    return pa.Table.from_batches(batches)
@to_arrow.register(pa.Table)
def _arrow_from_table(data: pa.Table) -> pa.Table:
    """Return the table unchanged; it is already a ``pa.Table``."""
    return data
@to_arrow.register(list)
def _arrow_from_list(data: list) -> pa.Table:
    """Build a table from a list of rows.

    The first element decides how the list is interpreted: when it is a
    ``LanceModel``, the schema comes from that model's class and every element
    is converted with ``model_to_dict``; otherwise the list is handed to
    ``pa.Table.from_pylist`` as-is.

    Raises
    ------
    ValueError
        If ``data`` is empty, since there are no rows to take a schema from.
    """
    if not data:
        raise ValueError("Cannot create table from empty list without a schema")

    head = data[0]
    if not isinstance(head, LanceModel):
        return pa.Table.from_pylist(data)

    model_schema = head.__class__.to_arrow_schema()
    rows = list(map(model_to_dict, data))
    return pa.Table.from_pylist(rows, schema=model_schema)

View File

@@ -0,0 +1,214 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
from dataclasses import dataclass
from functools import singledispatch
import sys
from typing import Callable, Iterator, Optional
from lancedb.arrow import to_arrow
import pyarrow as pa
import pyarrow.dataset as ds
from .pydantic import LanceModel
@dataclass
class Scannable:
    """A data source described by its schema plus a factory for Arrow readers."""

    # Arrow schema of the batches produced by ``reader``.
    schema: pa.Schema
    # Total number of rows when known up front; None when it is not
    # (e.g. for one-shot readers and scanners).
    num_rows: Optional[int]
    # Factory function to create a new reader each time (supports re-scanning)
    reader: Callable[[], pa.RecordBatchReader]
    # Whether reader can be called more than once. For example, an iterator can
    # only be consumed once, while a DataFrame can be converted to a new reader
    # each time.
    rescannable: bool = True
@singledispatch
def to_scannable(data) -> Scannable:
    """Convert ``data`` to a ``Scannable``; this is the untyped fallback.

    Inputs without a registered handler are accepted only if they expose
    ``__iter__``, in which case they are consumed as an iterator of items.

    Raises
    ------
    NotImplementedError
        If ``data`` has no registered handler and is not iterable.
    """
    if not hasattr(data, "__iter__"):
        raise NotImplementedError(
            f"to_scannable not implemented for type {type(data)}"
        )
    # Fallback: treat the object through the iterable protocol.
    return _from_iterable(iter(data))
@to_scannable.register(pa.RecordBatchReader)
def _from_reader(data: pa.RecordBatchReader) -> Scannable:
    """Wrap an existing reader; the factory hands back that same reader."""

    def same_reader() -> pa.RecordBatchReader:
        return data

    # A RecordBatchReader is single-use, so it is flagged as not rescannable
    # and its row count is left unknown.
    return Scannable(
        schema=data.schema,
        num_rows=None,
        reader=same_reader,
        rescannable=False,
    )
@to_scannable.register(pa.RecordBatch)
def _from_batch(data: pa.RecordBatch) -> Scannable:
    """Expose a single record batch as a source with a known row count."""

    def single_batch_reader() -> pa.RecordBatchReader:
        # A new reader over the same batch is built on every call.
        return pa.RecordBatchReader.from_batches(data.schema, [data])

    return Scannable(
        schema=data.schema,
        num_rows=data.num_rows,
        reader=single_batch_reader,
    )
@to_scannable.register(pa.Table)
def _from_table(data: pa.Table) -> Scannable:
    """Expose a table as a source; ``Table.to_reader`` serves as the factory."""
    table_schema = data.schema
    row_count = data.num_rows
    return Scannable(schema=table_schema, num_rows=row_count, reader=data.to_reader)
@to_scannable.register(ds.Dataset)
def _from_dataset(data: ds.Dataset) -> Scannable:
    """Expose a pyarrow dataset; every scan creates a new scanner."""

    def scan() -> pa.RecordBatchReader:
        return data.scanner().to_reader()

    return Scannable(
        schema=data.schema,
        num_rows=data.count_rows(),
        reader=scan,
    )
@to_scannable.register(ds.Scanner)
def _from_scanner(data: ds.Scanner) -> Scannable:
    """Expose a pyarrow scanner using its projected schema."""
    projected = data.projected_schema
    # Scanner can only be consumed once - not rescannable, and the row count
    # is left unknown.
    return Scannable(
        schema=projected,
        num_rows=None,
        reader=data.to_reader,
        rescannable=False,
    )
@to_scannable.register(list)
def _from_list(data: list) -> Scannable:
    """Convert a non-empty list of rows to an Arrow table and expose it.

    Raises
    ------
    ValueError
        If ``data`` is empty, since no schema can be derived from it.
    """
    if not data:
        raise ValueError("Cannot create table from empty list without a schema")

    as_table = to_arrow(data)
    return Scannable(
        schema=as_table.schema,
        num_rows=as_table.num_rows,
        reader=as_table.to_reader,
    )
@to_scannable.register(dict)
def _from_dict(data: dict) -> Scannable:
    """Reject a bare dictionary; rows must be supplied as a list.

    Raises
    ------
    ValueError
        Always.
    """
    message = "Cannot add a single dictionary to a table. Use a list."
    raise ValueError(message)
@to_scannable.register(LanceModel)
def _from_lance_model(data: LanceModel) -> Scannable:
    """Reject a bare ``LanceModel`` instance; rows must be supplied as a list.

    Raises
    ------
    ValueError
        Always.
    """
    message = "Cannot add a single LanceModel to a table. Use a list."
    raise ValueError(message)
def _from_iterable(data: Iterator) -> Scannable:
    """Build a one-shot ``Scannable`` from an iterator of convertible items.

    The first item is pulled eagerly to establish the schema. The remaining
    items are converted lazily, and any item whose schema differs is cast to
    the schema of the first one.

    Parameters
    ----------
    data : Iterator
        Iterator whose items are each accepted by ``to_arrow``.

    Returns
    -------
    Scannable
        The result of ``to_scannable`` on a ``pa.RecordBatchReader`` wrapping
        the lazily converted batches.

    Raises
    ------
    ValueError
        If the iterator yields nothing (or ``None`` first), or, while the
        reader is being consumed, if an item cannot be cast to the schema of
        the first item.
    """
    first_item = next(data, None)
    if first_item is None:
        raise ValueError("Cannot create table from empty iterator")
    first = to_arrow(first_item)
    schema = first.schema

    # Named so that it does not shadow the ``iter`` builtin.
    def batches() -> Iterator[pa.RecordBatch]:
        yield from first.to_batches()
        for item in data:
            table = to_arrow(item)
            if table.schema != schema:
                try:
                    table = table.cast(schema)
                # ArrowInvalid derives from ValueError, and Table.cast reports
                # mismatched field names as a plain ValueError; catching the
                # base class gives both failures the same message.
                except ValueError as err:
                    raise ValueError(
                        f"Input iterator yielded a batch with schema that "
                        f"does not match the schema of other batches.\n"
                        f"Expected:\n{schema}\nGot:\n{table.schema}"
                    ) from err
            yield from table.to_batches()

    reader = pa.RecordBatchReader.from_batches(schema, batches())
    return to_scannable(reader)
# Names of the optional libraries whose converters have already been registered.
_registered_modules: set[str] = set()


def _register_optional_converters():
    """Register converters for optional dependencies that are already imported.

    A library is only considered when it is already present in ``sys.modules``,
    and each library is handled at most once. Calling this again later picks up
    libraries that were imported since the previous call.
    """
    if "pandas" in sys.modules and "pandas" not in _registered_modules:
        _registered_modules.add("pandas")
        import pandas as pd

        @to_arrow.register(pd.DataFrame)
        def _arrow_from_pandas(data: pd.DataFrame) -> pa.Table:
            table = pa.Table.from_pandas(data, preserve_index=False)
            # Strip schema-level metadata from the converted table.
            return table.replace_schema_metadata(None)

        @to_scannable.register(pd.DataFrame)
        def _from_pandas(data: pd.DataFrame) -> Scannable:
            return to_scannable(_arrow_from_pandas(data))

    if "polars" in sys.modules and "polars" not in _registered_modules:
        _registered_modules.add("polars")
        import polars as pl

        @to_arrow.register(pl.DataFrame)
        def _arrow_from_polars(data: pl.DataFrame) -> pa.Table:
            return data.to_arrow()

        @to_scannable.register(pl.DataFrame)
        def _from_polars(data: pl.DataFrame) -> Scannable:
            arrow = data.to_arrow()
            return Scannable(
                schema=arrow.schema, num_rows=len(data), reader=arrow.to_reader
            )

        @to_scannable.register(pl.LazyFrame)
        def _from_polars_lazy(data: pl.LazyFrame) -> Scannable:
            # The lazy frame is collected once, up front.
            arrow = data.collect().to_arrow()
            return Scannable(
                schema=arrow.schema, num_rows=arrow.num_rows, reader=arrow.to_reader
            )

    if "datasets" in sys.modules and "datasets" not in _registered_modules:
        _registered_modules.add("datasets")
        from datasets import Dataset as HFDataset
        from datasets import DatasetDict as HFDatasetDict

        @to_scannable.register(HFDataset)
        def _from_hf_dataset(data: HFDataset) -> Scannable:
            table = data.data.table  # Access underlying Arrow table
            return Scannable(
                schema=table.schema, num_rows=len(data), reader=table.to_reader
            )

        @to_scannable.register(HFDatasetDict)
        def _from_hf_dataset_dict(data: HFDatasetDict) -> Scannable:
            # HuggingFace DatasetDict: combine all splits with a 'split' column
            if not data:
                raise ValueError("Cannot create table from empty DatasetDict")
            schema = next(iter(data.values())).features.arrow_schema
            # Only synthesize the column when the data does not already carry
            # one; otherwise the arrays would outnumber the schema fields.
            add_split_column = "split" not in schema.names
            if add_split_column:
                schema = schema.append(pa.field("split", pa.string()))

            def gen():
                for split_name, dataset in data.items():
                    for batch in dataset.data.to_batches():
                        columns = list(batch.columns)
                        if add_split_column:
                            columns.append(
                                pa.array([split_name] * len(batch), type=pa.string())
                            )
                        yield pa.RecordBatch.from_arrays(columns, schema=schema)

            total_rows = sum(len(dataset) for dataset in data.values())
            return Scannable(
                schema=schema,
                num_rows=total_rows,
                reader=lambda: pa.RecordBatchReader.from_batches(schema, gen()),
            )

    if "lance" in sys.modules and "lance" not in _registered_modules:
        _registered_modules.add("lance")
        import lance

        @to_scannable.register(lance.LanceDataset)
        def _from_lance(data: lance.LanceDataset) -> Scannable:
            return Scannable(
                schema=data.schema,
                num_rows=data.count_rows(),
                reader=lambda: data.scanner().to_reader(),
            )
# Register on module load: covers optional libraries that were already
# imported by the time this module is first loaded.
_register_optional_converters()

View File

@@ -25,6 +25,8 @@ from typing import (
)
from urllib.parse import urlparse
from lancedb.scannable import _register_optional_converters, to_scannable
from . import __version__
from lancedb.arrow import peek_reader
from lancedb.background_loop import LOOP
@@ -3727,18 +3729,31 @@ class AsyncTable:
on_bad_vectors = "error"
if fill_value is None:
fill_value = 0.0
data = _sanitize_data(
data,
schema,
metadata=schema.metadata,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
allow_subschema=True,
)
if isinstance(data, pa.Table):
data = data.to_reader()
return await self._inner.add(data, mode or "append")
# _sanitize_data is an old code path, but we will use it until the
# new code path is ready.
if on_bad_vectors != "error" or (
schema.metadata is not None and b"embedding_functions" in schema.metadata
):
data = _sanitize_data(
data,
schema,
metadata=schema.metadata,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
allow_subschema=True,
)
_register_optional_converters()
data = to_scannable(data)
try:
return await self._inner.add(data, mode or "append")
except RuntimeError as e:
if "Cast error" in str(e):
raise ValueError(e)
elif "Vector column contains NaN" in str(e):
raise ValueError(e)
else:
raise
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""

View File

@@ -810,7 +810,7 @@ def test_create_index_name_and_train_parameters(
)
def test_add_with_nans(mem_db: DBConnection):
def test_create_with_nans(mem_db: DBConnection):
# by default we raise an error on bad input vectors
bad_data = [
{"vector": [np.nan], "item": "bar", "price": 20.0},
@@ -854,6 +854,57 @@ def test_add_with_nans(mem_db: DBConnection):
assert np.allclose(v, np.array([0.0, 0.0]))
def test_add_with_nans(mem_db: DBConnection):
    """Adding to an existing table honours each ``on_bad_vectors`` policy."""
    fields = [
        pa.field("vector", pa.list_(pa.float32(), 2), nullable=True),
        pa.field("item", pa.string(), nullable=True),
        pa.field("price", pa.float64(), nullable=False),
    ]
    table = mem_db.create_table("test", schema=pa.schema(fields))

    # Default policy: every malformed vector is rejected with a ValueError.
    invalid_rows = [
        {"vector": [np.nan], "item": "bar", "price": 20.0},
        {"vector": [5], "item": "bar", "price": 20.0},
        {"vector": [np.nan, np.nan], "item": "bar", "price": 20.0},
        {"vector": [np.nan, 5.0], "item": "bar", "price": 20.0},
    ]
    for invalid_row in invalid_rows:
        with pytest.raises(ValueError):
            table.add(data=[invalid_row])

    # "drop": only the two well-formed rows are kept.
    mixed_rows = [
        {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
        {"vector": [2.1, 4.1], "item": "foo", "price": 9.0},
        {"vector": [np.nan], "item": "bar", "price": 20.0},
        {"vector": [5], "item": "bar", "price": 20.0},
        {"vector": [np.nan, np.nan], "item": "bar", "price": 20.0},
    ]
    table.add(mixed_rows, on_bad_vectors="drop")
    assert len(table) == 2
    table.delete("true")

    # "fill": bad vectors are replaced by the fill value, so all rows survive.
    rows_to_fill = [
        {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
        {"vector": [np.nan], "item": "bar", "price": 20.0},
        {"vector": [np.nan, np.nan], "item": "bar", "price": 20.0},
    ]
    table.add(data=rows_to_fill, on_bad_vectors="fill", fill_value=0.0)
    assert len(table) == 3

    bar_rows = table.search().where("item == 'bar'").to_arrow()
    first_vector = bar_rows["vector"].to_pylist()[0]
    assert np.allclose(first_vector, np.array([0.0, 0.0]))
def test_restore(mem_db: DBConnection):
table = mem_db.create_table(
"my_table",

View File

@@ -7,6 +7,7 @@ use crate::{
error::PythonErrorExt,
index::{extract_index_params, IndexConfig},
query::{Query, TakeQuery},
table::scannable::PyScannable,
};
use arrow::{
datatypes::{DataType, Schema},
@@ -25,6 +26,8 @@ use pyo3::{
};
use pyo3_async_runtimes::tokio::future_into_py;
mod scannable;
/// Statistics about a compaction operation.
#[pyclass(get_all)]
#[derive(Clone, Debug)]
@@ -293,12 +296,10 @@ impl Table {
pub fn add<'a>(
self_: PyRef<'a, Self>,
data: Bound<'_, PyAny>,
data: PyScannable,
mode: String,
) -> PyResult<Bound<'a, PyAny>> {
let batches: Box<dyn arrow::array::RecordBatchReader + Send> =
Box::new(ArrowArrayStreamReader::from_pyarrow_bound(&data)?);
let mut op = self_.inner_ref()?.add(batches);
let mut op = self_.inner_ref()?.add(data);
if mode == "append" {
op = op.mode(AddDataMode::Append);
} else if mode == "overwrite" {

View File

@@ -0,0 +1,145 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::sync::Arc;
use arrow::{
datatypes::{Schema, SchemaRef},
ffi_stream::ArrowArrayStreamReader,
pyarrow::{FromPyArrow, PyArrowType},
};
use futures::StreamExt;
use lancedb::{
arrow::{SendableRecordBatchStream, SimpleRecordBatchStream},
data::scannable::Scannable,
Error,
};
use pyo3::{types::PyAnyMethods, FromPyObject, Py, PyAny, Python};
/// Adapter that implements Scannable for a Python reader factory callable.
///
/// This holds a Python callable that returns a RecordBatchReader when called.
/// For rescannable sources, the callable can be invoked multiple times to
/// get fresh readers.
///
/// Instances are built from a Python object by reading its `schema`,
/// `num_rows`, `rescannable` and `reader` attributes.
pub struct PyScannable {
    /// Python callable that returns a RecordBatchReader
    reader_factory: Py<PyAny>,
    /// Schema taken from the Python object; returned by `schema()` and attached
    /// to the stream produced by `scan_as_stream()`.
    schema: SchemaRef,
    /// Row count reported by the Python object, or `None` when it gave none.
    num_rows: Option<usize>,
    /// Value of the Python object's `rescannable` flag, reported as-is.
    rescannable: bool,
}
impl std::fmt::Debug for PyScannable {
    /// Formats the schema, row count and rescannable flag; the Python callable
    /// is left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            reader_factory: _,
            schema,
            num_rows,
            rescannable,
        } = self;

        let mut out = f.debug_struct("PyScannable");
        out.field("schema", schema);
        out.field("num_rows", num_rows);
        out.field("rescannable", rescannable);
        out.finish()
    }
}
impl Scannable for PyScannable {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn scan_as_stream(&mut self) -> SendableRecordBatchStream {
let reader: Result<ArrowArrayStreamReader, Error> = {
Python::attach(|py| {
let result =
self.reader_factory
.call0(py)
.map_err(|e| lancedb::Error::Runtime {
message: format!("Python reader factory failed: {}", e),
})?;
ArrowArrayStreamReader::from_pyarrow_bound(result.bind(py)).map_err(|e| {
lancedb::Error::Runtime {
message: format!("Failed to create Arrow reader from Python: {}", e),
}
})
})
};
// Reader is blocking but stream is non-blocking, so we need to spawn a task to pull.
let (tx, rx) = tokio::sync::mpsc::channel(1);
let join_handle = tokio::task::spawn_blocking(move || {
let reader = match reader {
Ok(reader) => reader,
Err(e) => {
let _ = tx.blocking_send(Err(e));
return;
}
};
for batch in reader {
match batch {
Ok(batch) => {
if tx.blocking_send(Ok(batch)).is_err() {
// Receiver dropped, stop processing
break;
}
}
Err(source) => {
let _ = tx.blocking_send(Err(Error::Arrow { source }));
break;
}
}
}
});
let schema = self.schema.clone();
let stream = futures::stream::unfold(
(rx, Some(join_handle)),
|(mut rx, join_handle)| async move {
match rx.recv().await {
Some(Ok(batch)) => Some((Ok(batch), (rx, join_handle))),
Some(Err(e)) => Some((Err(e), (rx, join_handle))),
None => {
// Channel closed. Check if the task panicked — a panic
// drops the sender without sending an error, so without
// this check we'd silently return a truncated stream.
if let Some(handle) = join_handle {
if let Err(join_err) = handle.await {
return Some((
Err(Error::Runtime {
message: format!("Reader task panicked: {}", join_err),
}),
(rx, None),
));
}
}
None
}
}
},
);
Box::pin(SimpleRecordBatchStream::new(stream.fuse(), schema))
}
fn num_rows(&self) -> Option<usize> {
self.num_rows
}
fn rescannable(&self) -> bool {
self.rescannable
}
}
impl<'py> FromPyObject<'py> for PyScannable {
fn extract_bound(ob: &pyo3::Bound<'py, PyAny>) -> pyo3::PyResult<Self> {
// Convert from Scannable dataclass.
let schema: PyArrowType<Schema> = ob.getattr("schema")?.extract()?;
let schema = Arc::new(schema.0);
let num_rows: Option<usize> = ob.getattr("num_rows")?.extract()?;
let rescannable: bool = ob.getattr("rescannable")?.extract()?;
let reader_factory: Py<PyAny> = ob.getattr("reader")?.unbind();
Ok(Self {
schema,
reader_factory,
num_rows,
rescannable,
})
}
}