mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-25 14:29:56 +00:00
Compare commits
4 Commits
merge_inse
...
small-doc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c3be2e3962 | ||
|
|
d77e95a4f4 | ||
|
|
62f053ac92 | ||
|
|
34e10caad2 |
2
.github/workflows/docs.yml
vendored
2
.github/workflows/docs.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
cache: "pip"
|
||||
|
||||
2
.github/workflows/docs_test.yml
vendored
2
.github/workflows/docs_test.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: 3.11
|
||||
cache: "pip"
|
||||
|
||||
6
.github/workflows/make-release-commit.yml
vendored
6
.github/workflows/make-release-commit.yml
vendored
@@ -37,10 +37,10 @@ jobs:
|
||||
run: |
|
||||
git config user.name 'Lance Release'
|
||||
git config user.email 'lance-dev@lancedb.com'
|
||||
- name: Set up Python 3.10
|
||||
uses: actions/setup-python@v4
|
||||
- name: Set up Python 3.11
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
python-version: "3.11"
|
||||
- name: Bump version, create tag and commit
|
||||
run: |
|
||||
pip install bump2version
|
||||
|
||||
2
.github/workflows/pypi-publish.yml
vendored
2
.github/workflows/pypi-publish.yml
vendored
@@ -16,7 +16,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.8"
|
||||
- name: Build distribution
|
||||
|
||||
@@ -37,10 +37,10 @@ jobs:
|
||||
run: |
|
||||
git config user.name 'Lance Release'
|
||||
git config user.email 'lance-dev@lancedb.com'
|
||||
- name: Set up Python 3.10
|
||||
uses: actions/setup-python@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
python-version: "3.11"
|
||||
- name: Bump version, create tag and commit
|
||||
working-directory: python
|
||||
run: |
|
||||
|
||||
6
.github/workflows/python.yml
vendored
6
.github/workflows/python.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: 3.${{ matrix.python-minor-version }}
|
||||
- name: Install lancedb
|
||||
@@ -69,7 +69,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.11"
|
||||
- name: Install lancedb
|
||||
@@ -92,7 +92,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: 3.9
|
||||
- name: Install lancedb
|
||||
|
||||
@@ -11,11 +11,10 @@ license = "Apache-2.0"
|
||||
repository = "https://github.com/lancedb/lancedb"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=0.9.11", "features" = ["dynamodb"] }
|
||||
lance-datafusion = { "version" = "=0.9.11" }
|
||||
lance-index = { "version" = "=0.9.11" }
|
||||
lance-linalg = { "version" = "=0.9.11" }
|
||||
lance-testing = { "version" = "=0.9.11" }
|
||||
lance = { "version" = "=0.9.10", "features" = ["dynamodb"] }
|
||||
lance-index = { "version" = "=0.9.10" }
|
||||
lance-linalg = { "version" = "=0.9.10" }
|
||||
lance-testing = { "version" = "=0.9.10" }
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "50.0", optional = false }
|
||||
arrow-array = "50.0"
|
||||
|
||||
@@ -84,7 +84,7 @@ This guide will show how to create tables, insert data into them, and update the
|
||||
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
|
||||
```
|
||||
|
||||
### From a Pandas DataFrame
|
||||
### From a Pandas DataFrame
|
||||
|
||||
```python
|
||||
import pandas as pd
|
||||
|
||||
@@ -58,6 +58,8 @@ pip install lancedb
|
||||
|
||||
::: lancedb.schema.vector
|
||||
|
||||
::: lancedb.merge.LanceMergeInsertBuilder
|
||||
|
||||
## Integrations
|
||||
|
||||
### Pydantic
|
||||
|
||||
@@ -17,7 +17,11 @@
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/lancedb/lancedb/node"
|
||||
"url": "https://github.com/lancedb/lancedb.git"
|
||||
},
|
||||
"homepage": "https://lancedb.github.io/lancedb/",
|
||||
"bugs": {
|
||||
"url": "https://github.com/lancedb/lancedb/issues"
|
||||
},
|
||||
"keywords": [
|
||||
"data-format",
|
||||
|
||||
@@ -451,11 +451,6 @@ export interface Table<T = number[]> {
|
||||
indexStats: (indexUuid: string) => Promise<IndexStats>
|
||||
|
||||
filter(value: string): Query<T>
|
||||
|
||||
/**
|
||||
* TODO comment
|
||||
*/
|
||||
mergeInsert: () => MergeInsertBuilder
|
||||
|
||||
schema: Promise<Schema>
|
||||
}
|
||||
@@ -905,15 +900,6 @@ export class LocalTable<T = number[]> implements Table<T> {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Creates a merge-insert builder for this local table.
 *
 * Executing the returned builder always rejects with `Not implemented`.
 */
mergeInsert: () => MergeInsertBuilder = () => {
  const notImplemented = async (_args: {
    params: MergeInsertParams
    data: Array<Record<string, unknown>> | ArrowTable
  }) => {
    throw new Error('Not implemented')
  }
  return new MergeInsertBuilder(notImplemented)
}
|
||||
}
|
||||
|
||||
export interface CleanupStats {
|
||||
@@ -1090,56 +1076,3 @@ export enum MetricType {
|
||||
*/
|
||||
Dot = 'dot',
|
||||
}
|
||||
|
||||
/**
 * The set of switches collected by a `MergeInsertBuilder` and handed to its
 * callback when the merge insert is executed.
 */
export interface MergeInsertParams {
  /** True once `whenMatchedUpdateAll()` has been called on the builder. */
  whenMatchedUpdateAll: boolean
  /** True once `whenNotMatchedInsertAll()` has been called on the builder. */
  whenNotMatchedInsertAll: boolean
  /** True once `whenNotMatchedBySourceDelete()` has been called on the builder. */
  whenNotMatchedBySourceDelete: boolean
  /**
   * True once `whenNotMatchedBySourceCondition()` has been called on the builder.
   * NOTE(review): this is a plain flag, not a filter expression -- confirm the
   * intended semantics with the server API.
   */
  whenNotMatchedBySourceCondition: boolean
}
|
||||
|
||||
/**
 * Function that actually performs a merge insert. It receives the collected
 * parameters together with the new data, either as plain records or as an
 * Arrow table, and resolves once the operation has finished.
 */
type MergeInsertCallback = (args: {
  params: MergeInsertParams
  data: Array<Record<string, unknown>> | ArrowTable
}) => Promise<void>
|
||||
|
||||
/**
 * Fluent builder that records which merge-insert behaviours were requested
 * and forwards them, together with the data, to a callback on `execute`.
 */
export class MergeInsertBuilder {
  readonly #callback: MergeInsertCallback
  readonly #params: MergeInsertParams

  /**
   * @param callback invoked by `execute` with the collected parameters and data
   */
  constructor (callback: MergeInsertCallback) {
    // Every behaviour starts disabled; the fluent methods switch them on.
    const defaults: MergeInsertParams = {
      whenMatchedUpdateAll: false,
      whenNotMatchedInsertAll: false,
      whenNotMatchedBySourceDelete: false,
      whenNotMatchedBySourceCondition: false
    }
    this.#callback = callback
    this.#params = defaults
  }

  /** Enables the `whenMatchedUpdateAll` flag and returns this builder. */
  whenMatchedUpdateAll (): MergeInsertBuilder {
    const params = this.#params
    params.whenMatchedUpdateAll = true
    return this
  }

  /** Enables the `whenNotMatchedInsertAll` flag and returns this builder. */
  whenNotMatchedInsertAll (): MergeInsertBuilder {
    const params = this.#params
    params.whenNotMatchedInsertAll = true
    return this
  }

  /** Enables the `whenNotMatchedBySourceDelete` flag and returns this builder. */
  whenNotMatchedBySourceDelete (): MergeInsertBuilder {
    const params = this.#params
    params.whenNotMatchedBySourceDelete = true
    return this
  }

  /** Enables the `whenNotMatchedBySourceCondition` flag and returns this builder. */
  whenNotMatchedBySourceCondition (): MergeInsertBuilder {
    const params = this.#params
    params.whenNotMatchedBySourceCondition = true
    return this
  }

  /**
   * Runs the merge insert by awaiting the callback with the collected
   * parameters and the supplied data.
   */
  async execute (args: {
    data: Array<Record<string, unknown>> | ArrowTable
  }): Promise<void> {
    const { data } = args
    const run = this.#callback
    await run({ params: this.#params, data })
  }
}
|
||||
|
||||
@@ -120,7 +120,7 @@ export class HttpLancedbClient {
|
||||
public async post (
|
||||
path: string,
|
||||
data?: any,
|
||||
params?: Record<string, string | number | boolean>,
|
||||
params?: Record<string, string | number>,
|
||||
content?: string | undefined
|
||||
): Promise<AxiosResponse> {
|
||||
const response = await axios.post(
|
||||
|
||||
@@ -24,8 +24,6 @@ import {
|
||||
type IndexStats,
|
||||
type UpdateArgs,
|
||||
type UpdateSqlArgs,
|
||||
type MergeInsertParams,
|
||||
MergeInsertBuilder,
|
||||
makeArrowTable
|
||||
} from '../index'
|
||||
import { Query } from '../query'
|
||||
@@ -426,36 +424,4 @@ export class RemoteTable<T = number[]> implements Table<T> {
|
||||
numUnindexedRows: results.data.num_unindexed_rows
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Creates a merge-insert builder for this remote table.
 *
 * Executing the builder serializes the data to an Arrow stream buffer and
 * POSTs it to `/v1/table/<name>/merge_insert/`, passing the builder flags as
 * snake_case request parameters.
 */
mergeInsert: () => MergeInsertBuilder = () => {
  return new MergeInsertBuilder(async ({ data, params }: {
    params: MergeInsertParams
    data: Array<Record<string, unknown>> | ArrowTable
  }) => {
    // TODO: convert plain records with makeArrowTable(data, await this.schema)
    // before serializing. Until then the input is assumed to already be an
    // Arrow table.
    const tbl = data as ArrowTable

    const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)

    await this._client.post(
      `/v1/table/${this._name}/merge_insert/`,
      buffer,
      {
        when_matched_update_all: params.whenMatchedUpdateAll,
        when_not_matched_insert_all: params.whenNotMatchedInsertAll,
        when_not_matched_by_source_delete: params.whenNotMatchedBySourceDelete,
        when_not_matched_by_source_condition: params.whenNotMatchedBySourceCondition
      },
      'application/vnd.apache.arrow.stream'
    )
  })
}
|
||||
}
|
||||
|
||||
86
python/lancedb/merge.py
Normal file
86
python/lancedb/merge.py
Normal file
@@ -0,0 +1,86 @@
|
||||
# Copyright 2023 LanceDB Developers
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Iterable, Optional
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .common import DATA
|
||||
|
||||
|
||||
class LanceMergeInsertBuilder(object):
    """Builder for a LanceDB merge insert operation

    See [`merge_insert`][lancedb.table.Table.merge_insert] for
    more context
    """

    def __init__(self, table: "Table", on: Iterable[str]):  # noqa: F821
        # Do not put a docstring here.  This method should be hidden
        # from API docs.  Users should use merge_insert to create
        # this object.
        self._table = table
        # `merge_insert` also accepts a single column name.  A bare string is
        # itself an iterable of characters, so wrap it explicitly; any other
        # iterable is materialized once so it can be read more than once.
        self._on = [on] if isinstance(on, str) else list(on)
        self._when_matched_update_all = False
        self._when_not_matched_insert_all = False
        self._when_not_matched_by_source_delete = False
        self._when_not_matched_by_source_condition = None

    def when_matched_update_all(self) -> "LanceMergeInsertBuilder":
        """
        Rows that exist in both the source table (new data) and
        the target table (old data) will be updated, replacing
        the old row with the corresponding matching row.

        If there are multiple matches then the behavior is undefined.
        Currently this causes multiple copies of the row to be created
        but that behavior is subject to change.
        """
        self._when_matched_update_all = True
        return self

    def when_not_matched_insert_all(self) -> "LanceMergeInsertBuilder":
        """
        Rows that exist only in the source table (new data) should
        be inserted into the target table.
        """
        self._when_not_matched_insert_all = True
        return self

    def when_not_matched_by_source_delete(
        self, condition: Optional[str] = None
    ) -> "LanceMergeInsertBuilder":
        """
        Rows that exist only in the target table (old data) will be
        deleted.  An optional condition can be provided to limit what
        data is deleted.

        Parameters
        ----------
        condition: Optional[str], default None
            If None then all such rows will be deleted.  Otherwise the
            condition will be used as an SQL filter to limit what rows
            are deleted.
        """
        self._when_not_matched_by_source_delete = True
        # Always overwrite: a later call without a condition must mean
        # "delete all such rows", not silently reuse an earlier filter.
        self._when_not_matched_by_source_condition = condition
        return self

    def execute(self, new_data: "DATA"):
        """
        Executes the merge insert operation

        Nothing is returned but the [`Table`][lancedb.table.Table] is updated
        """
        self._table._do_merge(self, new_data)
|
||||
@@ -244,6 +244,10 @@ class RemoteTable(Table):
|
||||
result = self._conn._client.query(self._name, query)
|
||||
return result.to_arrow()
|
||||
|
||||
def _do_merge(self, *_args):
|
||||
"""_do_merge() is not supported on the LanceDB cloud yet"""
|
||||
return NotImplementedError("_do_merge() is not supported on the LanceDB cloud")
|
||||
|
||||
def delete(self, predicate: str):
|
||||
"""Delete rows from the table.
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ from lance.vector import vec_to_table
|
||||
|
||||
from .common import DATA, VEC, VECTOR_COLUMN_NAME
|
||||
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
|
||||
from .merge import LanceMergeInsertBuilder
|
||||
from .pydantic import LanceModel, model_to_dict
|
||||
from .query import LanceQueryBuilder, Query
|
||||
from .util import (
|
||||
@@ -334,6 +335,64 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
    """
    Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
    that can be used to create a "merge insert" operation.

    This operation can add rows, update rows, and remove rows all in a single
    transaction. It is a very generic tool that can be used to create
    behaviors like "insert if not exists", "update or insert (i.e. upsert)",
    or even replace a portion of existing data with new data (e.g. replace
    all data where month="january").

    The merge insert operation works by combining new data from a
    **source table** with existing data in a **target table** by using a
    join.  There are three categories of records.

    "Matched" records are records that exist in both the source table and
    the target table. "Not matched" records exist only in the source table
    (i.e. these are new data). "Not matched by source" records exist only
    in the target table (this is old data).

    The builder returned by this method can be used to customize what
    should happen for each category of data.

    Please note that the data may appear to be reordered as part of this
    operation.  This is because updated rows will be deleted from the
    dataset and then reinserted at the end with the new values.

    Parameters
    ----------

    on: Union[str, Iterable[str]]
        A column (or columns) to join on.  This is how records from the
        source table and target table are matched.  Typically this is some
        kind of key or id column.

    Examples
    --------
    >>> import lancedb
    >>> import pyarrow as pa
    >>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
    >>> db = lancedb.connect("./.lancedb")
    >>> table = db.create_table("my_table", data)
    >>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
    >>> # Perform an "upsert" operation
    >>> table.merge_insert("a")     \\
    ...      .when_matched_update_all()     \\
    ...      .when_not_matched_insert_all() \\
    ...      .execute(new_data)
    >>> # The order of new rows is non-deterministic since we use
    >>> # a hash-join as part of this operation and so we sort here
    >>> table.to_arrow().sort_by("a").to_pandas()
       a  b
    0  1  b
    1  2  x
    2  3  y
    3  4  z
    """
    return LanceMergeInsertBuilder(self, on)
|
||||
|
||||
@abstractmethod
|
||||
def search(
|
||||
self,
|
||||
@@ -414,6 +473,16 @@ class Table(ABC):
|
||||
def _execute_query(self, query: Query) -> pa.Table:
|
||||
pass
|
||||
|
||||
@abstractmethod
def _do_merge(
    self,
    merge: LanceMergeInsertBuilder,
    new_data: DATA,
    *,
    schema: Optional[pa.Schema] = None,
):
    """Carry out the merge insert described by ``merge`` using ``new_data``.

    This is the hook that `LanceMergeInsertBuilder.execute` calls on the
    table it was created for; concrete table classes must implement it.

    Parameters
    ----------
    merge: LanceMergeInsertBuilder
        The builder holding the join column(s) and the requested actions.
    new_data: DATA
        The source data to merge into this table.
    schema: Optional[pa.Schema], default None
        Optional schema for ``new_data``.
    """
    pass
|
||||
|
||||
@abstractmethod
|
||||
def delete(self, where: str):
|
||||
"""Delete rows from the table.
|
||||
@@ -1196,6 +1265,18 @@ class LanceTable(Table):
|
||||
with_row_id=query.with_row_id,
|
||||
)
|
||||
|
||||
def _do_merge(self, merge: LanceMergeInsertBuilder, new_data: DATA, *, schema=None):
|
||||
ds = self.to_lance()
|
||||
builder = ds.merge_insert(merge._on)
|
||||
if merge._when_matched_update_all:
|
||||
builder.when_matched_update_all()
|
||||
if merge._when_not_matched_insert_all:
|
||||
builder.when_not_matched_insert_all()
|
||||
if merge._when_not_matched_by_source_delete:
|
||||
cond = merge._when_not_matched_by_source_condition
|
||||
builder.when_not_matched_by_source_delete(cond)
|
||||
builder.execute(new_data, schema=schema)
|
||||
|
||||
def cleanup_old_versions(
|
||||
self,
|
||||
older_than: Optional[timedelta] = None,
|
||||
|
||||
@@ -3,7 +3,7 @@ name = "lancedb"
|
||||
version = "0.5.1"
|
||||
dependencies = [
|
||||
"deprecation",
|
||||
"pylance==0.9.10",
|
||||
"pylance==0.9.11",
|
||||
"ratelimiter~=1.0",
|
||||
"retry>=0.9.2",
|
||||
"tqdm>=4.27.0",
|
||||
|
||||
@@ -493,6 +493,62 @@ def test_update_types(db):
|
||||
assert actual == expected
|
||||
|
||||
|
||||
def test_merge_insert(db):
    """Exercise upsert, insert-if-not-exists and both replace-range variants."""
    initial = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
    table = LanceTable.create(db, "my_table", data=initial)
    assert len(table) == 3
    # Remember the starting version so each scenario begins from the same data.
    version = table.version

    new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})

    # upsert
    upsert = table.merge_insert("a")
    upsert = upsert.when_matched_update_all()
    upsert = upsert.when_not_matched_insert_all()
    upsert.execute(new_data)

    expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "x", "y", "z"]})
    # These `sort_by` calls can be removed once lance#1892
    # is merged (it fixes the ordering)
    assert table.to_arrow().sort_by("a") == expected

    table.restore(version)

    # insert-if-not-exists
    insert_only = table.merge_insert("a").when_not_matched_insert_all()
    insert_only.execute(new_data)

    expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "z"]})
    assert table.to_arrow().sort_by("a") == expected

    table.restore(version)

    new_data = pa.table({"a": [2, 4], "b": ["x", "z"]})

    # replace-range
    replace_range = table.merge_insert("a")
    replace_range = replace_range.when_matched_update_all()
    replace_range = replace_range.when_not_matched_insert_all()
    replace_range = replace_range.when_not_matched_by_source_delete("a > 2")
    replace_range.execute(new_data)

    expected = pa.table({"a": [1, 2, 4], "b": ["a", "x", "z"]})
    assert table.to_arrow().sort_by("a") == expected

    table.restore(version)

    # replace-range no condition
    replace_all = table.merge_insert("a")
    replace_all = replace_all.when_matched_update_all()
    replace_all = replace_all.when_not_matched_insert_all()
    replace_all = replace_all.when_not_matched_by_source_delete()
    replace_all.execute(new_data)

    expected = pa.table({"a": [2, 4], "b": ["x", "z"]})
    assert table.to_arrow().sort_by("a") == expected
|
||||
|
||||
|
||||
def test_create_with_embedding_function(db):
|
||||
class MyTable(LanceModel):
|
||||
text: str
|
||||
|
||||
@@ -22,7 +22,6 @@ object_store = { workspace = true }
|
||||
snafu = { workspace = true }
|
||||
half = { workspace = true }
|
||||
lance = { workspace = true }
|
||||
lance-datafusion = { workspace = true }
|
||||
lance-index = { workspace = true }
|
||||
lance-linalg = { workspace = true }
|
||||
lance-testing = { workspace = true }
|
||||
|
||||
@@ -175,7 +175,6 @@ pub mod error;
|
||||
pub mod index;
|
||||
pub mod io;
|
||||
pub mod ipc;
|
||||
pub mod merge_insert;
|
||||
pub mod query;
|
||||
pub mod table;
|
||||
pub mod utils;
|
||||
|
||||
@@ -1,101 +0,0 @@
|
||||
// Copyright 2024 Lance Developers.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use arrow_array::RecordBatchReader;
|
||||
use lance::dataset::{self, WhenMatched, WhenNotMatched, WhenNotMatchedBySource};
|
||||
use lance_datafusion::utils::reader_to_stream;
|
||||
|
||||
use crate::TableRef;
|
||||
use crate::error::{Error, Result};
|
||||
|
||||
/// Builder that collects the options of a merge-insert operation on a table
/// before it is executed.
pub struct MergeInsertBuilder {
    /// The table the merge insert will be applied to.
    table: TableRef,
    /// Set by `when_matched_update_all`.
    when_matched_update_all: bool,
    /// Set by `when_not_matched_insert_all`.
    when_not_matched_insert_all: bool,
    /// Set by `when_not_matched_by_source_delete`.
    when_not_matched_by_source_delete: bool,
    /// Set by `when_not_matched_by_source_condition`.
    /// NOTE(review): stored as a plain flag and not yet acted upon by
    /// `execute` -- confirm the intended semantics.
    when_not_matched_by_source_condition: bool,
}
|
||||
|
||||
impl MergeInsertBuilder {
|
||||
pub(crate) fn new(table: TableRef) -> Self {
|
||||
Self {
|
||||
table,
|
||||
when_matched_update_all: false,
|
||||
when_not_matched_insert_all: false,
|
||||
when_not_matched_by_source_delete: false,
|
||||
when_not_matched_by_source_condition: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn when_matched_update_all(mut self) -> Self {
|
||||
self.when_matched_update_all = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn when_not_matched_insert_all(mut self) -> Self {
|
||||
self.when_not_matched_insert_all = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn when_not_matched_by_source_delete(mut self) -> Self {
|
||||
self.when_not_matched_by_source_delete = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn when_not_matched_by_source_condition(mut self) -> Self {
|
||||
self.when_not_matched_by_source_condition = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub async fn execute(
|
||||
mut self,
|
||||
batches: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<()> {
|
||||
let native_table = self.table.as_native().unwrap(); // TODO no unwrap
|
||||
let ds = native_table.clone_inner_dataset();
|
||||
let mut builder = dataset::MergeInsertBuilder::try_new(
|
||||
Arc::new(ds),
|
||||
vec!["vectors".to_string()],
|
||||
)
|
||||
.unwrap(); // TODO no unwrap
|
||||
|
||||
if self.when_matched_update_all {
|
||||
builder.when_matched(WhenMatched::UpdateAll);
|
||||
}
|
||||
|
||||
if self.when_not_matched_insert_all {
|
||||
builder.when_not_matched(WhenNotMatched::InsertAll);
|
||||
}
|
||||
|
||||
if self.when_not_matched_by_source_delete {
|
||||
builder.when_not_matched_by_source(WhenNotMatchedBySource::Delete);
|
||||
}
|
||||
|
||||
// TODO
|
||||
// if self.when_not_matched_by_source_condition {
|
||||
// builder.when_not_matched_by_source(WhenNotMatchedBySource::DeleteIf(()));
|
||||
// }
|
||||
|
||||
let job = builder.try_build().unwrap(); // TODO no unwrap
|
||||
let batches = reader_to_stream(batches).await.unwrap().0; // TODO no unwrap
|
||||
let ds2 = job.execute(batches).await.unwrap(); // TODO no unwrap
|
||||
|
||||
native_table.reset_dataset(ds2.as_ref().clone());
|
||||
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -34,7 +34,6 @@ use log::info;
|
||||
use crate::error::{Error, Result};
|
||||
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
|
||||
use crate::index::IndexBuilder;
|
||||
use crate::merge_insert::MergeInsertBuilder;
|
||||
use crate::query::Query;
|
||||
use crate::utils::{PatchReadParam, PatchWriteParam};
|
||||
use crate::WriteMode;
|
||||
@@ -242,8 +241,6 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
||||
/// Modeled after ``VACUUM`` in PostgreSQL.
|
||||
/// Not all implementations support explicit optimization.
|
||||
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
|
||||
|
||||
fn merge_insert(&self) -> MergeInsertBuilder;
|
||||
}
|
||||
|
||||
/// Reference to a Table pointer.
|
||||
@@ -701,10 +698,6 @@ impl Table for NativeTable {
|
||||
}
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
fn merge_insert(&self) -> MergeInsertBuilder {
|
||||
MergeInsertBuilder::new(Arc::new(self.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user