From 0b9924b43271ca149b050a470cb93eabc5f4b80d Mon Sep 17 00:00:00 2001 From: Ayush Chaurasia Date: Fri, 18 Aug 2023 09:56:30 +0530 Subject: [PATCH] Make creating (and adding to) tables via Iterators more flexible & intuitive (#430) It improves the UX as iterators can be of any type supported by the table (plus recordbatch) & there is no separate requirement. Also expands the test cases for pydantic & arrow schema. If this is looks good I'll update the docs. Example usage: ``` class Content(LanceModel): vector: vector(2) item: str price: float def make_batches(): for _ in range(5): yield from [ # pandas pd.DataFrame({ "vector": [[3.1, 4.1], [1, 1]], "item": ["foo", "bar"], "price": [10.0, 20.0], }), # pylist [ {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, {"vector": [5.9, 26.5], "item": "bar", "price": 20.0}, ], # recordbatch pa.RecordBatch.from_arrays( [ pa.array([[3.1, 4.1], [5.9, 26.5]], pa.list_(pa.float32(), 2)), pa.array(["foo", "bar"]), pa.array([10.0, 20.0]), ], ["vector", "item", "price"], ), # pydantic list [ Content(vector=[3.1, 4.1], item="foo", price=10.0), Content(vector=[5.9, 26.5], item="bar", price=20.0), ]] db = lancedb.connect("db") tbl = db.create_table("tabley", make_batches(), schema=Content, mode="overwrite") tbl.add(make_batches()) ``` Same should with arrow schema. --------- Co-authored-by: Weston Pace --- python/lancedb/table.py | 11 +++++ python/tests/test_db.py | 97 +++++++++++++++++++++++++++++------------ 2 files changed, 81 insertions(+), 27 deletions(-) diff --git a/python/lancedb/table.py b/python/lancedb/table.py index 39eb94b6..89510f90 100644 --- a/python/lancedb/table.py +++ b/python/lancedb/table.py @@ -56,11 +56,22 @@ def _sanitize_data(data, schema, on_bad_vectors, fill_value): metadata = {k: v for k, v in metadata.items() if k != b"pandas"} schema = data.schema.with_metadata(metadata) data = pa.Table.from_arrays(data.columns, schema=schema) + if isinstance(data, Iterable): + data = _to_record_batch_generator(data, schema, on_bad_vectors, fill_value) if not isinstance(data, (pa.Table, Iterable)): raise TypeError(f"Unsupported data type: {type(data)}") return data +def _to_record_batch_generator(data: Iterable, schema, on_bad_vectors, fill_value): + for batch in data: + if not isinstance(batch, pa.RecordBatch): + table = _sanitize_data(batch, schema, on_bad_vectors, fill_value) + for batch in table.to_batches(): + yield batch + yield batch + + class Table(ABC): """ A [Table](Table) is a collection of Records in a LanceDB [Database](Database). diff --git a/python/tests/test_db.py b/python/tests/test_db.py index fb16cf16..643b2e22 100644 --- a/python/tests/test_db.py +++ b/python/tests/test_db.py @@ -17,7 +17,7 @@ import pyarrow as pa import pytest import lancedb -from lancedb.pydantic import LanceModel +from lancedb.pydantic import LanceModel, vector def test_basic(tmp_path): @@ -77,35 +77,78 @@ def test_ingest_pd(tmp_path): assert db.open_table("test").name == db["test"].name -def test_ingest_record_batch_iterator(tmp_path): - def batch_reader(): - for i in range(5): - yield pa.RecordBatch.from_arrays( - [ - pa.array([[3.1, 4.1], [5.9, 26.5]]), - pa.array(["foo", "bar"]), - pa.array([10.0, 20.0]), - ], - ["vector", "item", "price"], - ) +def test_ingest_iterator(tmp_path): + class PydanticSchema(LanceModel): + vector: vector(2) + item: str + price: float - db = lancedb.connect(tmp_path) - tbl = db.create_table( - "test", - batch_reader(), - schema=pa.schema( - [ - pa.field("vector", pa.list_(pa.float32())), - pa.field("item", pa.utf8()), - pa.field("price", pa.float32()), - ] - ), + arrow_schema = pa.schema( + [ + pa.field("vector", pa.list_(pa.float32(), 2)), + pa.field("item", pa.utf8()), + pa.field("price", pa.float32()), + ] ) - tbl_len = len(tbl) - tbl.add(batch_reader()) - assert len(tbl) == tbl_len * 2 - assert len(tbl.list_versions()) == 2 + def make_batches(): + for _ in range(5): + yield from [ + # pandas + pd.DataFrame( + { + "vector": [[3.1, 4.1], [1, 1]], + "item": ["foo", "bar"], + "price": [10.0, 20.0], + } + ), + # pylist + [ + {"vector": [3.1, 4.1], "item": "foo", "price": 10.0}, + {"vector": [5.9, 26.5], "item": "bar", "price": 20.0}, + ], + # recordbatch + pa.RecordBatch.from_arrays( + [ + pa.array([[3.1, 4.1], [5.9, 26.5]], pa.list_(pa.float32(), 2)), + pa.array(["foo", "bar"]), + pa.array([10.0, 20.0]), + ], + ["vector", "item", "price"], + ), + # pa Table + pa.Table.from_arrays( + [ + pa.array([[3.1, 4.1], [5.9, 26.5]], pa.list_(pa.float32(), 2)), + pa.array(["foo", "bar"]), + pa.array([10.0, 20.0]), + ], + ["vector", "item", "price"], + ), + # pydantic list + [ + PydanticSchema(vector=[3.1, 4.1], item="foo", price=10.0), + PydanticSchema(vector=[5.9, 26.5], item="bar", price=20.0), + ] + # TODO: test pydict separately. it is unique column number and names contraint + ] + + def run_tests(schema): + db = lancedb.connect(tmp_path) + tbl = db.create_table("table2", make_batches(), schema=schema, mode="overwrite") + + tbl.to_pandas() + assert tbl.search([3.1, 4.1]).limit(1).to_df()["_distance"][0] == 0.0 + assert tbl.search([5.9, 26.5]).limit(1).to_df()["_distance"][0] == 0.0 + + tbl_len = len(tbl) + tbl.add(make_batches()) + assert len(tbl) == tbl_len * 2 + assert len(tbl.list_versions()) == 2 + db.drop_database() + + run_tests(arrow_schema) + run_tests(PydanticSchema) def test_create_mode(tmp_path):