feat: upgrade to lance 0.9.11 and expose merge_insert (#906)

This adds the python bindings requested in #870 The javascript/rust
bindings will be added in a future PR.
This commit is contained in:
Weston Pace
2024-02-01 11:36:29 -08:00
committed by GitHub
parent 62f053ac92
commit d77e95a4f4
6 changed files with 230 additions and 1 deletions

View File

@@ -28,6 +28,7 @@ from lance.vector import vec_to_table
from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
from .merge import LanceMergeInsertBuilder
from .pydantic import LanceModel, model_to_dict
from .query import LanceQueryBuilder, Query
from .util import (
@@ -334,6 +335,64 @@ class Table(ABC):
"""
raise NotImplementedError
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
that can be used to create a "merge insert" operation
This operation can add rows, update rows, and remove rows all in a single
transaction. It is a very generic tool that can be used to create
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
or even replace a portion of existing data with new data (e.g. replace
all data where month="january")
The merge insert operation works by combining new data from a
**source table** with existing data in a **target table** by using a
join. There are three categories of records.
"Matched" records are records that exist in both the source table and
the target table. "Not matched" records exist only in the source table
(e.g. these are new data) "Not matched by source" records exist only
in the target table (this is old data)
The builder returned by this method can be used to customize what
should happen for each category of data.
Please note that the data may appear to be reordered as part of this
operation. This is because updated rows will be deleted from the
dataset and then reinserted at the end with the new values.
Parameters
----------
on: Union[str, Iterable[str]]
A column (or columns) to join on. This is how records from the
source table and target table are matched. Typically this is some
kind of key or id column.
Examples
--------
>>> import lancedb
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> table.merge_insert("a") \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
a b
0 1 b
1 2 x
2 3 y
3 4 z
"""
return LanceMergeInsertBuilder(self, on)
@abstractmethod
def search(
self,
@@ -414,6 +473,16 @@ class Table(ABC):
def _execute_query(self, query: Query) -> pa.Table:
pass
@abstractmethod
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
*,
schema: Optional[pa.Schema] = None,
):
pass
@abstractmethod
def delete(self, where: str):
"""Delete rows from the table.
@@ -1196,6 +1265,18 @@ class LanceTable(Table):
with_row_id=query.with_row_id,
)
def _do_merge(self, merge: LanceMergeInsertBuilder, new_data: DATA, *, schema=None):
ds = self.to_lance()
builder = ds.merge_insert(merge._on)
if merge._when_matched_update_all:
builder.when_matched_update_all()
if merge._when_not_matched_insert_all:
builder.when_not_matched_insert_all()
if merge._when_not_matched_by_source_delete:
cond = merge._when_not_matched_by_source_condition
builder.when_not_matched_by_source_delete(cond)
builder.execute(new_data, schema=schema)
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,