mirror of
https://github.com/lancedb/lancedb.git
synced 2026-01-07 12:22:59 +00:00
Compare commits
3 Commits
small-doc-
...
merge_inse
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2bbc56b9f9 | ||
|
|
65c9c0ba9b | ||
|
|
e2e45dd5a6 |
2
.github/workflows/docs.yml
vendored
2
.github/workflows/docs.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
|||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: "3.10"
|
python-version: "3.10"
|
||||||
cache: "pip"
|
cache: "pip"
|
||||||
|
|||||||
2
.github/workflows/docs_test.yml
vendored
2
.github/workflows/docs_test.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
|||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: 3.11
|
python-version: 3.11
|
||||||
cache: "pip"
|
cache: "pip"
|
||||||
|
|||||||
6
.github/workflows/make-release-commit.yml
vendored
6
.github/workflows/make-release-commit.yml
vendored
@@ -37,10 +37,10 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
git config user.name 'Lance Release'
|
git config user.name 'Lance Release'
|
||||||
git config user.email 'lance-dev@lancedb.com'
|
git config user.email 'lance-dev@lancedb.com'
|
||||||
- name: Set up Python 3.11
|
- name: Set up Python 3.10
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: "3.11"
|
python-version: "3.10"
|
||||||
- name: Bump version, create tag and commit
|
- name: Bump version, create tag and commit
|
||||||
run: |
|
run: |
|
||||||
pip install bump2version
|
pip install bump2version
|
||||||
|
|||||||
2
.github/workflows/pypi-publish.yml
vendored
2
.github/workflows/pypi-publish.yml
vendored
@@ -16,7 +16,7 @@ jobs:
|
|||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: "3.8"
|
python-version: "3.8"
|
||||||
- name: Build distribution
|
- name: Build distribution
|
||||||
|
|||||||
@@ -37,10 +37,10 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
git config user.name 'Lance Release'
|
git config user.name 'Lance Release'
|
||||||
git config user.email 'lance-dev@lancedb.com'
|
git config user.email 'lance-dev@lancedb.com'
|
||||||
- name: Set up Python
|
- name: Set up Python 3.10
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: "3.11"
|
python-version: "3.10"
|
||||||
- name: Bump version, create tag and commit
|
- name: Bump version, create tag and commit
|
||||||
working-directory: python
|
working-directory: python
|
||||||
run: |
|
run: |
|
||||||
|
|||||||
6
.github/workflows/python.yml
vendored
6
.github/workflows/python.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
|||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
lfs: true
|
lfs: true
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: 3.${{ matrix.python-minor-version }}
|
python-version: 3.${{ matrix.python-minor-version }}
|
||||||
- name: Install lancedb
|
- name: Install lancedb
|
||||||
@@ -69,7 +69,7 @@ jobs:
|
|||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
lfs: true
|
lfs: true
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: "3.11"
|
python-version: "3.11"
|
||||||
- name: Install lancedb
|
- name: Install lancedb
|
||||||
@@ -92,7 +92,7 @@ jobs:
|
|||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
lfs: true
|
lfs: true
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: 3.9
|
python-version: 3.9
|
||||||
- name: Install lancedb
|
- name: Install lancedb
|
||||||
|
|||||||
@@ -11,10 +11,11 @@ license = "Apache-2.0"
|
|||||||
repository = "https://github.com/lancedb/lancedb"
|
repository = "https://github.com/lancedb/lancedb"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
lance = { "version" = "=0.9.10", "features" = ["dynamodb"] }
|
lance = { "version" = "=0.9.11", "features" = ["dynamodb"] }
|
||||||
lance-index = { "version" = "=0.9.10" }
|
lance-datafusion = { "version" = "=0.9.11" }
|
||||||
lance-linalg = { "version" = "=0.9.10" }
|
lance-index = { "version" = "=0.9.11" }
|
||||||
lance-testing = { "version" = "=0.9.10" }
|
lance-linalg = { "version" = "=0.9.11" }
|
||||||
|
lance-testing = { "version" = "=0.9.11" }
|
||||||
# Note that this one does not include pyarrow
|
# Note that this one does not include pyarrow
|
||||||
arrow = { version = "50.0", optional = false }
|
arrow = { version = "50.0", optional = false }
|
||||||
arrow-array = "50.0"
|
arrow-array = "50.0"
|
||||||
|
|||||||
@@ -84,7 +84,7 @@ This guide will show how to create tables, insert data into them, and update the
|
|||||||
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
|
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
|
||||||
```
|
```
|
||||||
|
|
||||||
### From a Pandas DataFrame
|
### From a Pandas DataFrame
|
||||||
|
|
||||||
```python
|
```python
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|||||||
@@ -58,8 +58,6 @@ pip install lancedb
|
|||||||
|
|
||||||
::: lancedb.schema.vector
|
::: lancedb.schema.vector
|
||||||
|
|
||||||
::: lancedb.merge.LanceMergeInsertBuilder
|
|
||||||
|
|
||||||
## Integrations
|
## Integrations
|
||||||
|
|
||||||
### Pydantic
|
### Pydantic
|
||||||
|
|||||||
@@ -17,11 +17,7 @@
|
|||||||
},
|
},
|
||||||
"repository": {
|
"repository": {
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "https://github.com/lancedb/lancedb.git"
|
"url": "https://github.com/lancedb/lancedb/node"
|
||||||
},
|
|
||||||
"homepage": "https://lancedb.github.io/lancedb/",
|
|
||||||
"bugs": {
|
|
||||||
"url": "https://github.com/lancedb/lancedb/issues"
|
|
||||||
},
|
},
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"data-format",
|
"data-format",
|
||||||
|
|||||||
@@ -451,6 +451,11 @@ export interface Table<T = number[]> {
|
|||||||
indexStats: (indexUuid: string) => Promise<IndexStats>
|
indexStats: (indexUuid: string) => Promise<IndexStats>
|
||||||
|
|
||||||
filter(value: string): Query<T>
|
filter(value: string): Query<T>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO comment
|
||||||
|
*/
|
||||||
|
mergeInsert: () => MergeInsertBuilder
|
||||||
|
|
||||||
schema: Promise<Schema>
|
schema: Promise<Schema>
|
||||||
}
|
}
|
||||||
@@ -900,6 +905,15 @@ export class LocalTable<T = number[]> implements Table<T> {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mergeInsert: () => MergeInsertBuilder = () => {
|
||||||
|
return new MergeInsertBuilder(async (args: {
|
||||||
|
params: MergeInsertParams
|
||||||
|
data: Array<Record<string, unknown>> | ArrowTable
|
||||||
|
}) => {
|
||||||
|
throw new Error('Not implemented')
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface CleanupStats {
|
export interface CleanupStats {
|
||||||
@@ -1076,3 +1090,56 @@ export enum MetricType {
|
|||||||
*/
|
*/
|
||||||
Dot = 'dot',
|
Dot = 'dot',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface MergeInsertParams {
|
||||||
|
whenMatchedUpdateAll: boolean
|
||||||
|
whenNotMatchedInsertAll: boolean
|
||||||
|
whenNotMatchedBySourceDelete: boolean
|
||||||
|
whenNotMatchedBySourceCondition: boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
type MergeInsertCallback = (args: {
|
||||||
|
params: MergeInsertParams
|
||||||
|
data: Array<Record<string, unknown>> | ArrowTable
|
||||||
|
}) => Promise<void>
|
||||||
|
|
||||||
|
export class MergeInsertBuilder {
|
||||||
|
readonly #callback: MergeInsertCallback
|
||||||
|
readonly #params: MergeInsertParams
|
||||||
|
|
||||||
|
constructor (callback: MergeInsertCallback) {
|
||||||
|
this.#callback = callback
|
||||||
|
this.#params = {
|
||||||
|
whenMatchedUpdateAll: false,
|
||||||
|
whenNotMatchedInsertAll: false,
|
||||||
|
whenNotMatchedBySourceDelete: false,
|
||||||
|
whenNotMatchedBySourceCondition: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
whenMatchedUpdateAll (): MergeInsertBuilder {
|
||||||
|
this.#params.whenMatchedUpdateAll = true
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
whenNotMatchedInsertAll (): MergeInsertBuilder {
|
||||||
|
this.#params.whenNotMatchedInsertAll = true
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
whenNotMatchedBySourceDelete (): MergeInsertBuilder {
|
||||||
|
this.#params.whenNotMatchedBySourceDelete = true
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
whenNotMatchedBySourceCondition (): MergeInsertBuilder {
|
||||||
|
this.#params.whenNotMatchedBySourceCondition = true
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
async execute ({ data }: {
|
||||||
|
data: Array<Record<string, unknown>> | ArrowTable
|
||||||
|
}): Promise<void> {
|
||||||
|
await this.#callback({ params: this.#params, data })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ export class HttpLancedbClient {
|
|||||||
public async post (
|
public async post (
|
||||||
path: string,
|
path: string,
|
||||||
data?: any,
|
data?: any,
|
||||||
params?: Record<string, string | number>,
|
params?: Record<string, string | number | boolean>,
|
||||||
content?: string | undefined
|
content?: string | undefined
|
||||||
): Promise<AxiosResponse> {
|
): Promise<AxiosResponse> {
|
||||||
const response = await axios.post(
|
const response = await axios.post(
|
||||||
|
|||||||
@@ -24,6 +24,8 @@ import {
|
|||||||
type IndexStats,
|
type IndexStats,
|
||||||
type UpdateArgs,
|
type UpdateArgs,
|
||||||
type UpdateSqlArgs,
|
type UpdateSqlArgs,
|
||||||
|
type MergeInsertParams,
|
||||||
|
MergeInsertBuilder,
|
||||||
makeArrowTable
|
makeArrowTable
|
||||||
} from '../index'
|
} from '../index'
|
||||||
import { Query } from '../query'
|
import { Query } from '../query'
|
||||||
@@ -424,4 +426,36 @@ export class RemoteTable<T = number[]> implements Table<T> {
|
|||||||
numUnindexedRows: results.data.num_unindexed_rows
|
numUnindexedRows: results.data.num_unindexed_rows
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mergeInsert: () => MergeInsertBuilder = () => {
|
||||||
|
return new MergeInsertBuilder(async ({ data, params }: {
|
||||||
|
params: MergeInsertParams
|
||||||
|
data: Array<Record<string, unknown>> | ArrowTable
|
||||||
|
}) => {
|
||||||
|
// TODO -- uncomment this this
|
||||||
|
// let tbl: ArrowTable
|
||||||
|
// if (data instanceof ArrowTable) {
|
||||||
|
// tbl = data
|
||||||
|
// } else {
|
||||||
|
// tbl = makeArrowTable(data, await this.schema)
|
||||||
|
// }
|
||||||
|
const tbl = data as ArrowTable
|
||||||
|
|
||||||
|
const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
|
||||||
|
|
||||||
|
console.log({ buffer })
|
||||||
|
|
||||||
|
await this._client.post(
|
||||||
|
`/v1/table/${this._name}/merge_insert/`,
|
||||||
|
buffer,
|
||||||
|
{
|
||||||
|
when_matched_update_all: params.whenMatchedUpdateAll,
|
||||||
|
when_not_matched_insert_all: params.whenNotMatchedInsertAll,
|
||||||
|
when_not_matched_by_source_delete: params.whenNotMatchedBySourceDelete,
|
||||||
|
when_not_matched_by_source_condition: params.whenNotMatchedBySourceCondition
|
||||||
|
},
|
||||||
|
'application/vnd.apache.arrow.stream'
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,86 +0,0 @@
|
|||||||
# Copyright 2023 LanceDB Developers
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from typing import TYPE_CHECKING, Iterable, Optional
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .common import DATA
|
|
||||||
|
|
||||||
|
|
||||||
class LanceMergeInsertBuilder(object):
|
|
||||||
"""Builder for a LanceDB merge insert operation
|
|
||||||
|
|
||||||
See [`merge_insert`][lancedb.table.Table.merge_insert] for
|
|
||||||
more context
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, table: "Table", on: Iterable[str]): # noqa: F821
|
|
||||||
# Do not put a docstring here. This method should be hidden
|
|
||||||
# from API docs. Users should use merge_insert to create
|
|
||||||
# this object.
|
|
||||||
self._table = table
|
|
||||||
self._on = on
|
|
||||||
self._when_matched_update_all = False
|
|
||||||
self._when_not_matched_insert_all = False
|
|
||||||
self._when_not_matched_by_source_delete = False
|
|
||||||
self._when_not_matched_by_source_condition = None
|
|
||||||
|
|
||||||
def when_matched_update_all(self) -> LanceMergeInsertBuilder:
|
|
||||||
"""
|
|
||||||
Rows that exist in both the source table (new data) and
|
|
||||||
the target table (old data) will be updated, replacing
|
|
||||||
the old row with the corresponding matching row.
|
|
||||||
|
|
||||||
If there are multiple matches then the behavior is undefined.
|
|
||||||
Currently this causes multiple copies of the row to be created
|
|
||||||
but that behavior is subject to change.
|
|
||||||
"""
|
|
||||||
self._when_matched_update_all = True
|
|
||||||
return self
|
|
||||||
|
|
||||||
def when_not_matched_insert_all(self) -> LanceMergeInsertBuilder:
|
|
||||||
"""
|
|
||||||
Rows that exist only in the source table (new data) should
|
|
||||||
be inserted into the target table.
|
|
||||||
"""
|
|
||||||
self._when_not_matched_insert_all = True
|
|
||||||
return self
|
|
||||||
|
|
||||||
def when_not_matched_by_source_delete(
|
|
||||||
self, condition: Optional[str] = None
|
|
||||||
) -> LanceMergeInsertBuilder:
|
|
||||||
"""
|
|
||||||
Rows that exist only in the target table (old data) will be
|
|
||||||
deleted. An optional condition can be provided to limit what
|
|
||||||
data is deleted.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
condition: Optional[str], default None
|
|
||||||
If None then all such rows will be deleted. Otherwise the
|
|
||||||
condition will be used as an SQL filter to limit what rows
|
|
||||||
are deleted.
|
|
||||||
"""
|
|
||||||
self._when_not_matched_by_source_delete = True
|
|
||||||
if condition is not None:
|
|
||||||
self._when_not_matched_by_source_condition = condition
|
|
||||||
return self
|
|
||||||
|
|
||||||
def execute(self, new_data: DATA):
|
|
||||||
"""
|
|
||||||
Executes the merge insert operation
|
|
||||||
|
|
||||||
Nothing is returned but the [`Table`][lancedb.table.Table] is updated
|
|
||||||
"""
|
|
||||||
self._table._do_merge(self, new_data)
|
|
||||||
@@ -244,10 +244,6 @@ class RemoteTable(Table):
|
|||||||
result = self._conn._client.query(self._name, query)
|
result = self._conn._client.query(self._name, query)
|
||||||
return result.to_arrow()
|
return result.to_arrow()
|
||||||
|
|
||||||
def _do_merge(self, *_args):
|
|
||||||
"""_do_merge() is not supported on the LanceDB cloud yet"""
|
|
||||||
return NotImplementedError("_do_merge() is not supported on the LanceDB cloud")
|
|
||||||
|
|
||||||
def delete(self, predicate: str):
|
def delete(self, predicate: str):
|
||||||
"""Delete rows from the table.
|
"""Delete rows from the table.
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,6 @@ from lance.vector import vec_to_table
|
|||||||
|
|
||||||
from .common import DATA, VEC, VECTOR_COLUMN_NAME
|
from .common import DATA, VEC, VECTOR_COLUMN_NAME
|
||||||
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
|
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
|
||||||
from .merge import LanceMergeInsertBuilder
|
|
||||||
from .pydantic import LanceModel, model_to_dict
|
from .pydantic import LanceModel, model_to_dict
|
||||||
from .query import LanceQueryBuilder, Query
|
from .query import LanceQueryBuilder, Query
|
||||||
from .util import (
|
from .util import (
|
||||||
@@ -335,64 +334,6 @@ class Table(ABC):
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
|
|
||||||
"""
|
|
||||||
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
|
|
||||||
that can be used to create a "merge insert" operation
|
|
||||||
|
|
||||||
This operation can add rows, update rows, and remove rows all in a single
|
|
||||||
transaction. It is a very generic tool that can be used to create
|
|
||||||
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
|
|
||||||
or even replace a portion of existing data with new data (e.g. replace
|
|
||||||
all data where month="january")
|
|
||||||
|
|
||||||
The merge insert operation works by combining new data from a
|
|
||||||
**source table** with existing data in a **target table** by using a
|
|
||||||
join. There are three categories of records.
|
|
||||||
|
|
||||||
"Matched" records are records that exist in both the source table and
|
|
||||||
the target table. "Not matched" records exist only in the source table
|
|
||||||
(e.g. these are new data) "Not matched by source" records exist only
|
|
||||||
in the target table (this is old data)
|
|
||||||
|
|
||||||
The builder returned by this method can be used to customize what
|
|
||||||
should happen for each category of data.
|
|
||||||
|
|
||||||
Please note that the data may appear to be reordered as part of this
|
|
||||||
operation. This is because updated rows will be deleted from the
|
|
||||||
dataset and then reinserted at the end with the new values.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
|
|
||||||
on: Union[str, Iterable[str]]
|
|
||||||
A column (or columns) to join on. This is how records from the
|
|
||||||
source table and target table are matched. Typically this is some
|
|
||||||
kind of key or id column.
|
|
||||||
|
|
||||||
Examples
|
|
||||||
--------
|
|
||||||
>>> import lancedb
|
|
||||||
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
|
|
||||||
>>> db = lancedb.connect("./.lancedb")
|
|
||||||
>>> table = db.create_table("my_table", data)
|
|
||||||
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
|
|
||||||
>>> # Perform a "upsert" operation
|
|
||||||
>>> table.merge_insert("a") \\
|
|
||||||
... .when_matched_update_all() \\
|
|
||||||
... .when_not_matched_insert_all() \\
|
|
||||||
... .execute(new_data)
|
|
||||||
>>> # The order of new rows is non-deterministic since we use
|
|
||||||
>>> # a hash-join as part of this operation and so we sort here
|
|
||||||
>>> table.to_arrow().sort_by("a").to_pandas()
|
|
||||||
a b
|
|
||||||
0 1 b
|
|
||||||
1 2 x
|
|
||||||
2 3 y
|
|
||||||
3 4 z
|
|
||||||
"""
|
|
||||||
return LanceMergeInsertBuilder(self, on)
|
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def search(
|
def search(
|
||||||
self,
|
self,
|
||||||
@@ -473,16 +414,6 @@ class Table(ABC):
|
|||||||
def _execute_query(self, query: Query) -> pa.Table:
|
def _execute_query(self, query: Query) -> pa.Table:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def _do_merge(
|
|
||||||
self,
|
|
||||||
merge: LanceMergeInsertBuilder,
|
|
||||||
new_data: DATA,
|
|
||||||
*,
|
|
||||||
schema: Optional[pa.Schema] = None,
|
|
||||||
):
|
|
||||||
pass
|
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def delete(self, where: str):
|
def delete(self, where: str):
|
||||||
"""Delete rows from the table.
|
"""Delete rows from the table.
|
||||||
@@ -1265,18 +1196,6 @@ class LanceTable(Table):
|
|||||||
with_row_id=query.with_row_id,
|
with_row_id=query.with_row_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _do_merge(self, merge: LanceMergeInsertBuilder, new_data: DATA, *, schema=None):
|
|
||||||
ds = self.to_lance()
|
|
||||||
builder = ds.merge_insert(merge._on)
|
|
||||||
if merge._when_matched_update_all:
|
|
||||||
builder.when_matched_update_all()
|
|
||||||
if merge._when_not_matched_insert_all:
|
|
||||||
builder.when_not_matched_insert_all()
|
|
||||||
if merge._when_not_matched_by_source_delete:
|
|
||||||
cond = merge._when_not_matched_by_source_condition
|
|
||||||
builder.when_not_matched_by_source_delete(cond)
|
|
||||||
builder.execute(new_data, schema=schema)
|
|
||||||
|
|
||||||
def cleanup_old_versions(
|
def cleanup_old_versions(
|
||||||
self,
|
self,
|
||||||
older_than: Optional[timedelta] = None,
|
older_than: Optional[timedelta] = None,
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ name = "lancedb"
|
|||||||
version = "0.5.1"
|
version = "0.5.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"deprecation",
|
"deprecation",
|
||||||
"pylance==0.9.11",
|
"pylance==0.9.10",
|
||||||
"ratelimiter~=1.0",
|
"ratelimiter~=1.0",
|
||||||
"retry>=0.9.2",
|
"retry>=0.9.2",
|
||||||
"tqdm>=4.27.0",
|
"tqdm>=4.27.0",
|
||||||
|
|||||||
@@ -493,62 +493,6 @@ def test_update_types(db):
|
|||||||
assert actual == expected
|
assert actual == expected
|
||||||
|
|
||||||
|
|
||||||
def test_merge_insert(db):
|
|
||||||
table = LanceTable.create(
|
|
||||||
db,
|
|
||||||
"my_table",
|
|
||||||
data=pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}),
|
|
||||||
)
|
|
||||||
assert len(table) == 3
|
|
||||||
version = table.version
|
|
||||||
|
|
||||||
new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
|
|
||||||
|
|
||||||
# upsert
|
|
||||||
table.merge_insert(
|
|
||||||
"a"
|
|
||||||
).when_matched_update_all().when_not_matched_insert_all().execute(new_data)
|
|
||||||
|
|
||||||
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "x", "y", "z"]})
|
|
||||||
# These `sort_by` calls can be removed once lance#1892
|
|
||||||
# is merged (it fixes the ordering)
|
|
||||||
assert table.to_arrow().sort_by("a") == expected
|
|
||||||
|
|
||||||
table.restore(version)
|
|
||||||
|
|
||||||
# insert-if-not-exists
|
|
||||||
table.merge_insert("a").when_not_matched_insert_all().execute(new_data)
|
|
||||||
|
|
||||||
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "z"]})
|
|
||||||
assert table.to_arrow().sort_by("a") == expected
|
|
||||||
|
|
||||||
table.restore(version)
|
|
||||||
|
|
||||||
new_data = pa.table({"a": [2, 4], "b": ["x", "z"]})
|
|
||||||
|
|
||||||
# replace-range
|
|
||||||
table.merge_insert(
|
|
||||||
"a"
|
|
||||||
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete(
|
|
||||||
"a > 2"
|
|
||||||
).execute(new_data)
|
|
||||||
|
|
||||||
expected = pa.table({"a": [1, 2, 4], "b": ["a", "x", "z"]})
|
|
||||||
assert table.to_arrow().sort_by("a") == expected
|
|
||||||
|
|
||||||
table.restore(version)
|
|
||||||
|
|
||||||
# replace-range no condition
|
|
||||||
table.merge_insert(
|
|
||||||
"a"
|
|
||||||
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete().execute(
|
|
||||||
new_data
|
|
||||||
)
|
|
||||||
|
|
||||||
expected = pa.table({"a": [2, 4], "b": ["x", "z"]})
|
|
||||||
assert table.to_arrow().sort_by("a") == expected
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_with_embedding_function(db):
|
def test_create_with_embedding_function(db):
|
||||||
class MyTable(LanceModel):
|
class MyTable(LanceModel):
|
||||||
text: str
|
text: str
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ object_store = { workspace = true }
|
|||||||
snafu = { workspace = true }
|
snafu = { workspace = true }
|
||||||
half = { workspace = true }
|
half = { workspace = true }
|
||||||
lance = { workspace = true }
|
lance = { workspace = true }
|
||||||
|
lance-datafusion = { workspace = true }
|
||||||
lance-index = { workspace = true }
|
lance-index = { workspace = true }
|
||||||
lance-linalg = { workspace = true }
|
lance-linalg = { workspace = true }
|
||||||
lance-testing = { workspace = true }
|
lance-testing = { workspace = true }
|
||||||
|
|||||||
@@ -175,6 +175,7 @@ pub mod error;
|
|||||||
pub mod index;
|
pub mod index;
|
||||||
pub mod io;
|
pub mod io;
|
||||||
pub mod ipc;
|
pub mod ipc;
|
||||||
|
pub mod merge_insert;
|
||||||
pub mod query;
|
pub mod query;
|
||||||
pub mod table;
|
pub mod table;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
|
|||||||
101
rust/vectordb/src/merge_insert.rs
Normal file
101
rust/vectordb/src/merge_insert.rs
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
// Copyright 2024 Lance Developers.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use arrow_array::RecordBatchReader;
|
||||||
|
use lance::dataset::{self, WhenMatched, WhenNotMatched, WhenNotMatchedBySource};
|
||||||
|
use lance_datafusion::utils::reader_to_stream;
|
||||||
|
|
||||||
|
use crate::TableRef;
|
||||||
|
use crate::error::{Error, Result};
|
||||||
|
|
||||||
|
pub struct MergeInsertBuilder {
|
||||||
|
table: TableRef,
|
||||||
|
when_matched_update_all: bool,
|
||||||
|
when_not_matched_insert_all: bool,
|
||||||
|
when_not_matched_by_source_delete: bool,
|
||||||
|
when_not_matched_by_source_condition: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MergeInsertBuilder {
|
||||||
|
pub(crate) fn new(table: TableRef) -> Self {
|
||||||
|
Self {
|
||||||
|
table,
|
||||||
|
when_matched_update_all: false,
|
||||||
|
when_not_matched_insert_all: false,
|
||||||
|
when_not_matched_by_source_delete: false,
|
||||||
|
when_not_matched_by_source_condition: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn when_matched_update_all(mut self) -> Self {
|
||||||
|
self.when_matched_update_all = true;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn when_not_matched_insert_all(mut self) -> Self {
|
||||||
|
self.when_not_matched_insert_all = true;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn when_not_matched_by_source_delete(mut self) -> Self {
|
||||||
|
self.when_not_matched_by_source_delete = true;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn when_not_matched_by_source_condition(mut self) -> Self {
|
||||||
|
self.when_not_matched_by_source_condition = true;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn execute(
|
||||||
|
mut self,
|
||||||
|
batches: Box<dyn RecordBatchReader + Send>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let native_table = self.table.as_native().unwrap(); // TODO no unwrap
|
||||||
|
let ds = native_table.clone_inner_dataset();
|
||||||
|
let mut builder = dataset::MergeInsertBuilder::try_new(
|
||||||
|
Arc::new(ds),
|
||||||
|
vec!["vectors".to_string()],
|
||||||
|
)
|
||||||
|
.unwrap(); // TODO no unwrap
|
||||||
|
|
||||||
|
if self.when_matched_update_all {
|
||||||
|
builder.when_matched(WhenMatched::UpdateAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.when_not_matched_insert_all {
|
||||||
|
builder.when_not_matched(WhenNotMatched::InsertAll);
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.when_not_matched_by_source_delete {
|
||||||
|
builder.when_not_matched_by_source(WhenNotMatchedBySource::Delete);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
// if self.when_not_matched_by_source_condition {
|
||||||
|
// builder.when_not_matched_by_source(WhenNotMatchedBySource::DeleteIf(()));
|
||||||
|
// }
|
||||||
|
|
||||||
|
let job = builder.try_build().unwrap(); // TODO no unwrap
|
||||||
|
let batches = reader_to_stream(batches).await.unwrap().0; // TODO no unwrap
|
||||||
|
let ds2 = job.execute(batches).await.unwrap(); // TODO no unwrap
|
||||||
|
|
||||||
|
native_table.reset_dataset(ds2.as_ref().clone());
|
||||||
|
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -34,6 +34,7 @@ use log::info;
|
|||||||
use crate::error::{Error, Result};
|
use crate::error::{Error, Result};
|
||||||
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
|
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
|
||||||
use crate::index::IndexBuilder;
|
use crate::index::IndexBuilder;
|
||||||
|
use crate::merge_insert::MergeInsertBuilder;
|
||||||
use crate::query::Query;
|
use crate::query::Query;
|
||||||
use crate::utils::{PatchReadParam, PatchWriteParam};
|
use crate::utils::{PatchReadParam, PatchWriteParam};
|
||||||
use crate::WriteMode;
|
use crate::WriteMode;
|
||||||
@@ -241,6 +242,8 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
/// Modeled after ``VACCUM`` in PostgreSQL.
|
/// Modeled after ``VACCUM`` in PostgreSQL.
|
||||||
/// Not all implementations support explicit optimization.
|
/// Not all implementations support explicit optimization.
|
||||||
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
|
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
|
||||||
|
|
||||||
|
fn merge_insert(&self) -> MergeInsertBuilder;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reference to a Table pointer.
|
/// Reference to a Table pointer.
|
||||||
@@ -698,6 +701,10 @@ impl Table for NativeTable {
|
|||||||
}
|
}
|
||||||
Ok(stats)
|
Ok(stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn merge_insert(&self) -> MergeInsertBuilder {
|
||||||
|
MergeInsertBuilder::new(Arc::new(self.clone()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user