Compare commits

..

4 Commits

Author SHA1 Message Date
qzhu
c3be2e3962 small fix for the guides/table page 2024-02-01 14:41:00 -08:00
Weston Pace
d77e95a4f4 feat: upgrade to lance 0.9.11 and expose merge_insert (#906)
This adds the python bindings requested in #870 The javascript/rust
bindings will be added in a future PR.
2024-02-01 11:36:29 -08:00
Lei Xu
62f053ac92 ci: bump to new version of python action to use node 20 gIthub action runtime (#909)
Github action is deprecating old node-16 runtime.
2024-02-01 11:36:03 -08:00
JacobLinCool
34e10caad2 fix the repo link on npm, add links for homepage and bug report (#910)
- fix the repo link on npm
- add links for homepage and bug report
2024-01-31 21:07:11 -08:00
22 changed files with 253 additions and 232 deletions

View File

@@ -29,7 +29,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: "3.10"
cache: "pip"

View File

@@ -29,7 +29,7 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.11
cache: "pip"

View File

@@ -37,10 +37,10 @@ jobs:
run: |
git config user.name 'Lance Release'
git config user.email 'lance-dev@lancedb.com'
- name: Set up Python 3.10
uses: actions/setup-python@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: "3.10"
python-version: "3.11"
- name: Bump version, create tag and commit
run: |
pip install bump2version

View File

@@ -16,7 +16,7 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: "3.8"
- name: Build distribution

View File

@@ -37,10 +37,10 @@ jobs:
run: |
git config user.name 'Lance Release'
git config user.email 'lance-dev@lancedb.com'
- name: Set up Python 3.10
uses: actions/setup-python@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.10"
python-version: "3.11"
- name: Bump version, create tag and commit
working-directory: python
run: |

View File

@@ -30,7 +30,7 @@ jobs:
fetch-depth: 0
lfs: true
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.${{ matrix.python-minor-version }}
- name: Install lancedb
@@ -69,7 +69,7 @@ jobs:
fetch-depth: 0
lfs: true
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install lancedb
@@ -92,7 +92,7 @@ jobs:
fetch-depth: 0
lfs: true
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.9
- name: Install lancedb

View File

@@ -11,11 +11,10 @@ license = "Apache-2.0"
repository = "https://github.com/lancedb/lancedb"
[workspace.dependencies]
lance = { "version" = "=0.9.11", "features" = ["dynamodb"] }
lance-datafusion = { "version" = "=0.9.11" }
lance-index = { "version" = "=0.9.11" }
lance-linalg = { "version" = "=0.9.11" }
lance-testing = { "version" = "=0.9.11" }
lance = { "version" = "=0.9.10", "features" = ["dynamodb"] }
lance-index = { "version" = "=0.9.10" }
lance-linalg = { "version" = "=0.9.10" }
lance-testing = { "version" = "=0.9.10" }
# Note that this one does not include pyarrow
arrow = { version = "50.0", optional = false }
arrow-array = "50.0"

View File

@@ -84,7 +84,7 @@ This guide will show how to create tables, insert data into them, and update the
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
```
### From a Pandas DataFrame
### From a Pandas DataFrame
```python
import pandas as pd

View File

@@ -58,6 +58,8 @@ pip install lancedb
::: lancedb.schema.vector
::: lancedb.merge.LanceMergeInsertBuilder
## Integrations
### Pydantic

View File

@@ -17,7 +17,11 @@
},
"repository": {
"type": "git",
"url": "https://github.com/lancedb/lancedb/node"
"url": "https://github.com/lancedb/lancedb.git"
},
"homepage": "https://lancedb.github.io/lancedb/",
"bugs": {
"url": "https://github.com/lancedb/lancedb/issues"
},
"keywords": [
"data-format",

View File

@@ -451,11 +451,6 @@ export interface Table<T = number[]> {
indexStats: (indexUuid: string) => Promise<IndexStats>
filter(value: string): Query<T>
/**
* TODO comment
*/
mergeInsert: () => MergeInsertBuilder
schema: Promise<Schema>
}
@@ -905,15 +900,6 @@ export class LocalTable<T = number[]> implements Table<T> {
return false
}
}
mergeInsert: () => MergeInsertBuilder = () => {
return new MergeInsertBuilder(async (args: {
params: MergeInsertParams
data: Array<Record<string, unknown>> | ArrowTable
}) => {
throw new Error('Not implemented')
})
}
}
export interface CleanupStats {
@@ -1090,56 +1076,3 @@ export enum MetricType {
*/
Dot = 'dot',
}
export interface MergeInsertParams {
whenMatchedUpdateAll: boolean
whenNotMatchedInsertAll: boolean
whenNotMatchedBySourceDelete: boolean
whenNotMatchedBySourceCondition: boolean
}
type MergeInsertCallback = (args: {
params: MergeInsertParams
data: Array<Record<string, unknown>> | ArrowTable
}) => Promise<void>
export class MergeInsertBuilder {
readonly #callback: MergeInsertCallback
readonly #params: MergeInsertParams
constructor (callback: MergeInsertCallback) {
this.#callback = callback
this.#params = {
whenMatchedUpdateAll: false,
whenNotMatchedInsertAll: false,
whenNotMatchedBySourceDelete: false,
whenNotMatchedBySourceCondition: false
}
}
whenMatchedUpdateAll (): MergeInsertBuilder {
this.#params.whenMatchedUpdateAll = true
return this
}
whenNotMatchedInsertAll (): MergeInsertBuilder {
this.#params.whenNotMatchedInsertAll = true
return this
}
whenNotMatchedBySourceDelete (): MergeInsertBuilder {
this.#params.whenNotMatchedBySourceDelete = true
return this
}
whenNotMatchedBySourceCondition (): MergeInsertBuilder {
this.#params.whenNotMatchedBySourceCondition = true
return this
}
async execute ({ data }: {
data: Array<Record<string, unknown>> | ArrowTable
}): Promise<void> {
await this.#callback({ params: this.#params, data })
}
}

View File

@@ -120,7 +120,7 @@ export class HttpLancedbClient {
public async post (
path: string,
data?: any,
params?: Record<string, string | number | boolean>,
params?: Record<string, string | number>,
content?: string | undefined
): Promise<AxiosResponse> {
const response = await axios.post(

View File

@@ -24,8 +24,6 @@ import {
type IndexStats,
type UpdateArgs,
type UpdateSqlArgs,
type MergeInsertParams,
MergeInsertBuilder,
makeArrowTable
} from '../index'
import { Query } from '../query'
@@ -426,36 +424,4 @@ export class RemoteTable<T = number[]> implements Table<T> {
numUnindexedRows: results.data.num_unindexed_rows
}
}
mergeInsert: () => MergeInsertBuilder = () => {
return new MergeInsertBuilder(async ({ data, params }: {
params: MergeInsertParams
data: Array<Record<string, unknown>> | ArrowTable
}) => {
// TODO -- uncomment this this
// let tbl: ArrowTable
// if (data instanceof ArrowTable) {
// tbl = data
// } else {
// tbl = makeArrowTable(data, await this.schema)
// }
const tbl = data as ArrowTable
const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
console.log({ buffer })
await this._client.post(
`/v1/table/${this._name}/merge_insert/`,
buffer,
{
when_matched_update_all: params.whenMatchedUpdateAll,
when_not_matched_insert_all: params.whenNotMatchedInsertAll,
when_not_matched_by_source_delete: params.whenNotMatchedBySourceDelete,
when_not_matched_by_source_condition: params.whenNotMatchedBySourceCondition
},
'application/vnd.apache.arrow.stream'
)
})
}
}

86
python/lancedb/merge.py Normal file
View File

@@ -0,0 +1,86 @@
# Copyright 2023 LanceDB Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, Iterable, Optional
if TYPE_CHECKING:
from .common import DATA
class LanceMergeInsertBuilder(object):
"""Builder for a LanceDB merge insert operation
See [`merge_insert`][lancedb.table.Table.merge_insert] for
more context
"""
def __init__(self, table: "Table", on: Iterable[str]): # noqa: F821
# Do not put a docstring here. This method should be hidden
# from API docs. Users should use merge_insert to create
# this object.
self._table = table
self._on = on
self._when_matched_update_all = False
self._when_not_matched_insert_all = False
self._when_not_matched_by_source_delete = False
self._when_not_matched_by_source_condition = None
def when_matched_update_all(self) -> LanceMergeInsertBuilder:
"""
Rows that exist in both the source table (new data) and
the target table (old data) will be updated, replacing
the old row with the corresponding matching row.
If there are multiple matches then the behavior is undefined.
Currently this causes multiple copies of the row to be created
but that behavior is subject to change.
"""
self._when_matched_update_all = True
return self
def when_not_matched_insert_all(self) -> LanceMergeInsertBuilder:
"""
Rows that exist only in the source table (new data) should
be inserted into the target table.
"""
self._when_not_matched_insert_all = True
return self
def when_not_matched_by_source_delete(
self, condition: Optional[str] = None
) -> LanceMergeInsertBuilder:
"""
Rows that exist only in the target table (old data) will be
deleted. An optional condition can be provided to limit what
data is deleted.
Parameters
----------
condition: Optional[str], default None
If None then all such rows will be deleted. Otherwise the
condition will be used as an SQL filter to limit what rows
are deleted.
"""
self._when_not_matched_by_source_delete = True
if condition is not None:
self._when_not_matched_by_source_condition = condition
return self
def execute(self, new_data: DATA):
"""
Executes the merge insert operation
Nothing is returned but the [`Table`][lancedb.table.Table] is updated
"""
self._table._do_merge(self, new_data)

View File

@@ -244,6 +244,10 @@ class RemoteTable(Table):
result = self._conn._client.query(self._name, query)
return result.to_arrow()
def _do_merge(self, *_args):
"""_do_merge() is not supported on the LanceDB cloud yet"""
return NotImplementedError("_do_merge() is not supported on the LanceDB cloud")
def delete(self, predicate: str):
"""Delete rows from the table.

View File

@@ -28,6 +28,7 @@ from lance.vector import vec_to_table
from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
from .merge import LanceMergeInsertBuilder
from .pydantic import LanceModel, model_to_dict
from .query import LanceQueryBuilder, Query
from .util import (
@@ -334,6 +335,64 @@ class Table(ABC):
"""
raise NotImplementedError
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
that can be used to create a "merge insert" operation
This operation can add rows, update rows, and remove rows all in a single
transaction. It is a very generic tool that can be used to create
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
or even replace a portion of existing data with new data (e.g. replace
all data where month="january")
The merge insert operation works by combining new data from a
**source table** with existing data in a **target table** by using a
join. There are three categories of records.
"Matched" records are records that exist in both the source table and
the target table. "Not matched" records exist only in the source table
(e.g. these are new data) "Not matched by source" records exist only
in the target table (this is old data)
The builder returned by this method can be used to customize what
should happen for each category of data.
Please note that the data may appear to be reordered as part of this
operation. This is because updated rows will be deleted from the
dataset and then reinserted at the end with the new values.
Parameters
----------
on: Union[str, Iterable[str]]
A column (or columns) to join on. This is how records from the
source table and target table are matched. Typically this is some
kind of key or id column.
Examples
--------
>>> import lancedb
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> table.merge_insert("a") \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
a b
0 1 b
1 2 x
2 3 y
3 4 z
"""
return LanceMergeInsertBuilder(self, on)
@abstractmethod
def search(
self,
@@ -414,6 +473,16 @@ class Table(ABC):
def _execute_query(self, query: Query) -> pa.Table:
pass
@abstractmethod
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
*,
schema: Optional[pa.Schema] = None,
):
pass
@abstractmethod
def delete(self, where: str):
"""Delete rows from the table.
@@ -1196,6 +1265,18 @@ class LanceTable(Table):
with_row_id=query.with_row_id,
)
def _do_merge(self, merge: LanceMergeInsertBuilder, new_data: DATA, *, schema=None):
ds = self.to_lance()
builder = ds.merge_insert(merge._on)
if merge._when_matched_update_all:
builder.when_matched_update_all()
if merge._when_not_matched_insert_all:
builder.when_not_matched_insert_all()
if merge._when_not_matched_by_source_delete:
cond = merge._when_not_matched_by_source_condition
builder.when_not_matched_by_source_delete(cond)
builder.execute(new_data, schema=schema)
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,

View File

@@ -3,7 +3,7 @@ name = "lancedb"
version = "0.5.1"
dependencies = [
"deprecation",
"pylance==0.9.10",
"pylance==0.9.11",
"ratelimiter~=1.0",
"retry>=0.9.2",
"tqdm>=4.27.0",

View File

@@ -493,6 +493,62 @@ def test_update_types(db):
assert actual == expected
def test_merge_insert(db):
table = LanceTable.create(
db,
"my_table",
data=pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}),
)
assert len(table) == 3
version = table.version
new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
# upsert
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().execute(new_data)
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "x", "y", "z"]})
# These `sort_by` calls can be removed once lance#1892
# is merged (it fixes the ordering)
assert table.to_arrow().sort_by("a") == expected
table.restore(version)
# insert-if-not-exists
table.merge_insert("a").when_not_matched_insert_all().execute(new_data)
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "z"]})
assert table.to_arrow().sort_by("a") == expected
table.restore(version)
new_data = pa.table({"a": [2, 4], "b": ["x", "z"]})
# replace-range
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete(
"a > 2"
).execute(new_data)
expected = pa.table({"a": [1, 2, 4], "b": ["a", "x", "z"]})
assert table.to_arrow().sort_by("a") == expected
table.restore(version)
# replace-range no condition
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete().execute(
new_data
)
expected = pa.table({"a": [2, 4], "b": ["x", "z"]})
assert table.to_arrow().sort_by("a") == expected
def test_create_with_embedding_function(db):
class MyTable(LanceModel):
text: str

View File

@@ -22,7 +22,6 @@ object_store = { workspace = true }
snafu = { workspace = true }
half = { workspace = true }
lance = { workspace = true }
lance-datafusion = { workspace = true }
lance-index = { workspace = true }
lance-linalg = { workspace = true }
lance-testing = { workspace = true }

View File

@@ -175,7 +175,6 @@ pub mod error;
pub mod index;
pub mod io;
pub mod ipc;
pub mod merge_insert;
pub mod query;
pub mod table;
pub mod utils;

View File

@@ -1,101 +0,0 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::sync::Arc;
use arrow_array::RecordBatchReader;
use lance::dataset::{self, WhenMatched, WhenNotMatched, WhenNotMatchedBySource};
use lance_datafusion::utils::reader_to_stream;
use crate::TableRef;
use crate::error::{Error, Result};
pub struct MergeInsertBuilder {
table: TableRef,
when_matched_update_all: bool,
when_not_matched_insert_all: bool,
when_not_matched_by_source_delete: bool,
when_not_matched_by_source_condition: bool,
}
impl MergeInsertBuilder {
pub(crate) fn new(table: TableRef) -> Self {
Self {
table,
when_matched_update_all: false,
when_not_matched_insert_all: false,
when_not_matched_by_source_delete: false,
when_not_matched_by_source_condition: false,
}
}
pub fn when_matched_update_all(mut self) -> Self {
self.when_matched_update_all = true;
self
}
pub fn when_not_matched_insert_all(mut self) -> Self {
self.when_not_matched_insert_all = true;
self
}
pub fn when_not_matched_by_source_delete(mut self) -> Self {
self.when_not_matched_by_source_delete = true;
self
}
pub fn when_not_matched_by_source_condition(mut self) -> Self {
self.when_not_matched_by_source_condition = true;
self
}
pub async fn execute(
mut self,
batches: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
let native_table = self.table.as_native().unwrap(); // TODO no unwrap
let ds = native_table.clone_inner_dataset();
let mut builder = dataset::MergeInsertBuilder::try_new(
Arc::new(ds),
vec!["vectors".to_string()],
)
.unwrap(); // TODO no unwrap
if self.when_matched_update_all {
builder.when_matched(WhenMatched::UpdateAll);
}
if self.when_not_matched_insert_all {
builder.when_not_matched(WhenNotMatched::InsertAll);
}
if self.when_not_matched_by_source_delete {
builder.when_not_matched_by_source(WhenNotMatchedBySource::Delete);
}
// TODO
// if self.when_not_matched_by_source_condition {
// builder.when_not_matched_by_source(WhenNotMatchedBySource::DeleteIf(()));
// }
let job = builder.try_build().unwrap(); // TODO no unwrap
let batches = reader_to_stream(batches).await.unwrap().0; // TODO no unwrap
let ds2 = job.execute(batches).await.unwrap(); // TODO no unwrap
native_table.reset_dataset(ds2.as_ref().clone());
Ok(())
}
}

View File

@@ -34,7 +34,6 @@ use log::info;
use crate::error::{Error, Result};
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
use crate::index::IndexBuilder;
use crate::merge_insert::MergeInsertBuilder;
use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam};
use crate::WriteMode;
@@ -242,8 +241,6 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// Modeled after ``VACCUM`` in PostgreSQL.
/// Not all implementations support explicit optimization.
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
fn merge_insert(&self) -> MergeInsertBuilder;
}
/// Reference to a Table pointer.
@@ -701,10 +698,6 @@ impl Table for NativeTable {
}
Ok(stats)
}
fn merge_insert(&self) -> MergeInsertBuilder {
MergeInsertBuilder::new(Arc::new(self.clone()))
}
}
#[cfg(test)]