Make creating (and adding to) tables via Iterators more flexible & intuitive (#430)

This improves the UX: iterators can now yield any type supported by the
table (plus RecordBatch), with no separate requirement for iterator inputs.
Also expands the test cases for pydantic & arrow schema.
If this looks good, I'll update the docs.

Example usage:
```
class Content(LanceModel):
    vector: vector(2)
    item: str
    price: float

def make_batches():
    for _ in range(5):
        yield from [ 
        # pandas
        pd.DataFrame({
            "vector": [[3.1, 4.1], [1, 1]],
            "item": ["foo", "bar"],
            "price": [10.0, 20.0],
        }),
        
        # pylist
        [
            {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
            {"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
        ],

        # recordbatch
        pa.RecordBatch.from_arrays(
            [
                pa.array([[3.1, 4.1], [5.9, 26.5]], pa.list_(pa.float32(), 2)),
                pa.array(["foo", "bar"]),
                pa.array([10.0, 20.0]),
            ], 
            ["vector", "item", "price"],
        ),

        # pydantic list
        [
            Content(vector=[3.1, 4.1], item="foo", price=10.0),
            Content(vector=[5.9, 26.5], item="bar", price=20.0),
        ]]

db = lancedb.connect("db")
tbl = db.create_table("tabley", make_batches(), schema=Content, mode="overwrite")

tbl.add(make_batches())
```
The same should work with an arrow schema.

---------

Co-authored-by: Weston Pace <weston.pace@gmail.com>
This commit is contained in:
Ayush Chaurasia
2023-08-18 09:56:30 +05:30
committed by GitHub
parent ba416a571d
commit 0b9924b432
2 changed files with 81 additions and 27 deletions

View File

@@ -56,11 +56,22 @@ def _sanitize_data(data, schema, on_bad_vectors, fill_value):
metadata = {k: v for k, v in metadata.items() if k != b"pandas"}
schema = data.schema.with_metadata(metadata)
data = pa.Table.from_arrays(data.columns, schema=schema)
if isinstance(data, Iterable):
data = _to_record_batch_generator(data, schema, on_bad_vectors, fill_value)
if not isinstance(data, (pa.Table, Iterable)):
raise TypeError(f"Unsupported data type: {type(data)}")
return data
def _to_record_batch_generator(data: Iterable, schema, on_bad_vectors, fill_value):
    """Lazily turn an iterable of heterogeneous items into record batches.

    Items that are already ``pa.RecordBatch`` instances are yielded unchanged.
    Every other item is passed to ``_sanitize_data`` together with ``schema``,
    ``on_bad_vectors`` and ``fill_value``, and each batch returned by the
    result's ``to_batches()`` is yielded in order.

    Parameters
    ----------
    data : Iterable
        The items to convert; consumed one at a time.
    schema, on_bad_vectors, fill_value
        Forwarded verbatim to ``_sanitize_data`` for non-RecordBatch items.

    Yields
    ------
    The record batches, in the order the source items were produced.
    """
    for item in data:
        if isinstance(item, pa.RecordBatch):
            # Already a record batch: pass it through untouched.
            yield item
        else:
            table = _sanitize_data(item, schema, on_bad_vectors, fill_value)
            # Emit each converted batch exactly once. A trailing unconditional
            # yield here would duplicate the last batch (or leak the raw,
            # unsanitized item when the table has no batches).
            yield from table.to_batches()
class Table(ABC):
"""
A [Table](Table) is a collection of Records in a LanceDB [Database](Database).