mirror of
https://github.com/lancedb/lancedb.git
synced 2026-01-16 08:42:57 +00:00
[python] Support schema evolution in local LanceDB (#452)
Previously, if you needed to add a column to a table, you had to rewrite the whole table. Now the merge functionality of the Lance format is used to add columns incrementally from another table or DataFrame. --------- Co-authored-by: Chang She <chang@lancedb.com> Co-authored-by: Weston Pace <weston.pace@gmail.com>
This commit is contained in:
@@ -17,13 +17,14 @@ import inspect
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from functools import cached_property
|
||||
from typing import Iterable, List, Union
|
||||
from typing import Iterable, List, Optional, Union
|
||||
|
||||
import lance
|
||||
import numpy as np
|
||||
import pyarrow as pa
|
||||
import pyarrow.compute as pc
|
||||
from lance import LanceDataset
|
||||
from lance.dataset import ReaderLike
|
||||
from lance.vector import vec_to_table
|
||||
|
||||
from .common import DATA, VEC, VECTOR_COLUMN_NAME
|
||||
@@ -505,6 +506,69 @@ class LanceTable(Table):
|
||||
lance.write_dataset(data, self._dataset_uri, schema=self.schema, mode=mode)
|
||||
self._reset_dataset()
|
||||
|
||||
def merge(
    self,
    other_table: Union[LanceTable, ReaderLike],
    left_on: str,
    right_on: Optional[str] = None,
    schema: Optional[Union[pa.Schema, LanceModel]] = None,
):
    """Merge another table into this table.

    Performs a left join, where the dataset is the left side and other_table
    is the right side. Rows existing in the dataset that have no match in
    other_table will be filled with null values, unless Lance doesn't support
    null values for some types, in which case an error will be raised. The
    only overlapping column allowed is the join column. If other overlapping
    columns exist, an error will be raised.

    Parameters
    ----------
    other_table: LanceTable or Reader-like
        The data to be merged. Acceptable types are:
        - Pandas DataFrame, Pyarrow Table, Dataset, Scanner,
          Iterator[RecordBatch], or RecordBatchReader
        - LanceTable
    left_on: str
        The name of the column in the dataset to join on.
    right_on: str or None
        The name of the column in other_table to join on. If None, defaults to
        left_on.
    schema: pa.Schema or LanceModel, optional
        The schema of the other_table. A LanceModel may be given either as
        the model class or as an instance; it is converted to an Arrow
        schema before merging.
        If not provided, the schema is inferred from the data.

    Examples
    --------
    >>> import lancedb
    >>> import pyarrow as pa
    >>> df = pa.table({'x': [1, 2, 3], 'y': ['a', 'b', 'c']})
    >>> db = lancedb.connect("./.lancedb")
    >>> table = db.create_table("dataset", df)
    >>> table.to_pandas()
       x  y
    0  1  a
    1  2  b
    2  3  c
    >>> new_df = pa.table({'x': [1, 2, 3], 'z': ['d', 'e', 'f']})
    >>> table.merge(new_df, 'x')
    >>> table.to_pandas()
       x  y  z
    0  1  a  d
    1  2  b  e
    2  3  c  f
    """
    # A LanceModel schema is usually handed over as the model *class*, which
    # a plain isinstance() check would miss; accept both the class and an
    # instance so that either is converted to an Arrow schema.
    if isinstance(schema, LanceModel) or (
        isinstance(schema, type) and issubclass(schema, LanceModel)
    ):
        schema = schema.to_arrow_schema()

    # Unwrap a LanceTable to its underlying Lance dataset, then materialize
    # any Lance dataset as a table before handing it to the merge call.
    if isinstance(other_table, LanceTable):
        other_table = other_table.to_lance()
    if isinstance(other_table, LanceDataset):
        other_table = other_table.to_table()

    self._dataset.merge(
        other_table, left_on=left_on, right_on=right_on, schema=schema
    )
    # Drop the cached dataset handle so later reads see the merged version.
    self._reset_dataset()
|
||||
|
||||
def search(
|
||||
self, query: Union[VEC, str], vector_column_name=VECTOR_COLUMN_NAME
|
||||
) -> LanceQueryBuilder:
|
||||
|
||||
@@ -16,6 +16,7 @@ from pathlib import Path
|
||||
from typing import List
|
||||
from unittest.mock import PropertyMock, patch
|
||||
|
||||
import lance
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
@@ -295,3 +296,23 @@ def test_restore(db):
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
table.restore(0)
|
||||
|
||||
|
||||
def test_merge(db, tmp_path):
    """Merging by join key adds the new column and commits a new version."""
    rows = [
        {"vector": [1.1, 0.9], "id": 0},
        {"vector": [1.2, 1.9], "id": 1},
    ]
    table = LanceTable.create(db, "my_table", data=rows)

    # Right-hand side of the join: one extra column keyed by "id".
    extra_columns = pa.table({"document": ["foo", "bar"], "id": [0, 1]})
    table.merge(extra_columns, left_on="id")

    # The merge is recorded as a second version after the initial write.
    assert len(table.list_versions()) == 2

    merged_columns = {
        "vector": [[1.1, 0.9], [1.2, 1.9]],
        "id": [0, 1],
        "document": ["foo", "bar"],
    }
    expected = pa.table(merged_columns, schema=table.schema)
    assert table.to_arrow() == expected

    # Repeat the merge with a Lance dataset as the right-hand side, after
    # restoring version 1 (the state before the first merge).
    dataset_path = tmp_path / "other_table.lance"
    extra_dataset = lance.write_dataset(extra_columns, dataset_path)
    table.restore(1)
    table.merge(extra_dataset, left_on="id")
|
||||
|
||||
Reference in New Issue
Block a user