diff --git a/python/lancedb/table.py b/python/lancedb/table.py
index 39eb94b6..89510f90 100644
--- a/python/lancedb/table.py
+++ b/python/lancedb/table.py
@@ -56,11 +56,30 @@ def _sanitize_data(data, schema, on_bad_vectors, fill_value):
         metadata = {k: v for k, v in metadata.items() if k != b"pandas"}
         schema = data.schema.with_metadata(metadata)
         data = pa.Table.from_arrays(data.columns, schema=schema)
+    if isinstance(data, Iterable):
+        data = _to_record_batch_generator(data, schema, on_bad_vectors, fill_value)
     if not isinstance(data, (pa.Table, Iterable)):
         raise TypeError(f"Unsupported data type: {type(data)}")
     return data
 
 
+def _to_record_batch_generator(data: Iterable, schema, on_bad_vectors, fill_value):
+    """Lazily flatten an iterable of mixed inputs into Arrow record batches.
+
+    Items that already are ``pa.RecordBatch`` instances are yielded unchanged.
+    Every other item is passed to ``_sanitize_data`` (with the same schema,
+    on_bad_vectors and fill_value) and the batches of its result are yielded.
+    """
+    for item in data:
+        if isinstance(item, pa.RecordBatch):
+            yield item
+        else:
+            # Keep the two paths mutually exclusive: an unconditional yield
+            # after the conversion would emit the last converted batch twice.
+            table = _sanitize_data(item, schema, on_bad_vectors, fill_value)
+            yield from table.to_batches()
+
+
 class Table(ABC):
     """
     A [Table](Table) is a collection of Records in a LanceDB [Database](Database).
diff --git a/python/tests/test_db.py b/python/tests/test_db.py index fb16cf16..643b2e22 100644 --- a/python/tests/test_db.py +++ b/python/tests/test_db.py @@ -17,7 +17,7 @@ import pyarrow as pa import pytest import lancedb -from lancedb.pydantic import LanceModel +from lancedb.pydantic import LanceModel, vector def test_basic(tmp_path): @@ -77,35 +77,78 @@ def test_ingest_pd(tmp_path): assert db.open_table("test").name == db["test"].name -def test_ingest_record_batch_iterator(tmp_path): - def batch_reader(): - for i in range(5): - yield pa.RecordBatch.from_arrays( - [ - pa.array([[3.1, 4.1], [5.9, 26.5]]), - pa.array(["foo", "bar"]), - pa.array([10.0, 20.0]), - ], - ["vector", "item", "price"], - ) +def test_ingest_iterator(tmp_path): + class PydanticSchema(LanceModel): + vector: vector(2) + item: str + price: float - db = lancedb.connect(tmp_path) - tbl = db.create_table( - "test", - batch_reader(), - schema=pa.schema( - [ - pa.field("vector", pa.list_(pa.float32())), - pa.field("item", pa.utf8()), - pa.field("price", pa.float32()), - ] - ), + arrow_schema = pa.schema( + [ + pa.field("vector", pa.list_(pa.float32(), 2)), + pa.field("item", pa.utf8()), + pa.field("price", pa.float32()), + ] ) - tbl_len = len(tbl) - tbl.add(batch_reader()) - assert len(tbl) == tbl_len * 2 - assert len(tbl.list_versions()) == 2 + def make_batches(): + for _ in range(5): + yield from [ + # pandas + pd.DataFrame( + { + "vector": [[3.1, 4.1], [1, 1]], + "item": ["foo", "bar"], + "price": [10.0, 20.0], + } + ), + # pylist + [ + {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, + {"vector": [5.9, 26.5], "item": "bar", "price": 20.0}, + ], + # recordbatch + pa.RecordBatch.from_arrays( + [ + pa.array([[3.1, 4.1], [5.9, 26.5]], pa.list_(pa.float32(), 2)), + pa.array(["foo", "bar"]), + pa.array([10.0, 20.0]), + ], + ["vector", "item", "price"], + ), + # pa Table + pa.Table.from_arrays( + [ + pa.array([[3.1, 4.1], [5.9, 26.5]], pa.list_(pa.float32(), 2)), + pa.array(["foo", "bar"]), + 
pa.array([10.0, 20.0]), + ], + ["vector", "item", "price"], + ), + # pydantic list + [ + PydanticSchema(vector=[3.1, 4.1], item="foo", price=10.0), + PydanticSchema(vector=[5.9, 26.5], item="bar", price=20.0), + ] + # TODO: test pydict separately. it is unique column number and names constraint + ] + + def run_tests(schema): + db = lancedb.connect(tmp_path) + tbl = db.create_table("table2", make_batches(), schema=schema, mode="overwrite") + + tbl.to_pandas() + assert tbl.search([3.1, 4.1]).limit(1).to_df()["_distance"][0] == 0.0 + assert tbl.search([5.9, 26.5]).limit(1).to_df()["_distance"][0] == 0.0 + + tbl_len = len(tbl) + tbl.add(make_batches()) + assert len(tbl) == tbl_len * 2 + assert len(tbl.list_versions()) == 2 + db.drop_database() + + run_tests(arrow_schema) + run_tests(PydanticSchema) def test_create_mode(tmp_path):