mirror of
https://github.com/lancedb/lancedb.git
synced 2026-01-04 19:02:58 +00:00
Compare commits
4 Commits
merge_inse
...
small-doc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c3be2e3962 | ||
|
|
d77e95a4f4 | ||
|
|
62f053ac92 | ||
|
|
34e10caad2 |
2
.github/workflows/docs.yml
vendored
2
.github/workflows/docs.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
|||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: "3.10"
|
python-version: "3.10"
|
||||||
cache: "pip"
|
cache: "pip"
|
||||||
|
|||||||
2
.github/workflows/docs_test.yml
vendored
2
.github/workflows/docs_test.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
|||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: 3.11
|
python-version: 3.11
|
||||||
cache: "pip"
|
cache: "pip"
|
||||||
|
|||||||
6
.github/workflows/make-release-commit.yml
vendored
6
.github/workflows/make-release-commit.yml
vendored
@@ -37,10 +37,10 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
git config user.name 'Lance Release'
|
git config user.name 'Lance Release'
|
||||||
git config user.email 'lance-dev@lancedb.com'
|
git config user.email 'lance-dev@lancedb.com'
|
||||||
- name: Set up Python 3.10
|
- name: Set up Python 3.11
|
||||||
uses: actions/setup-python@v4
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: "3.10"
|
python-version: "3.11"
|
||||||
- name: Bump version, create tag and commit
|
- name: Bump version, create tag and commit
|
||||||
run: |
|
run: |
|
||||||
pip install bump2version
|
pip install bump2version
|
||||||
|
|||||||
2
.github/workflows/pypi-publish.yml
vendored
2
.github/workflows/pypi-publish.yml
vendored
@@ -16,7 +16,7 @@ jobs:
|
|||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: "3.8"
|
python-version: "3.8"
|
||||||
- name: Build distribution
|
- name: Build distribution
|
||||||
|
|||||||
@@ -37,10 +37,10 @@ jobs:
|
|||||||
run: |
|
run: |
|
||||||
git config user.name 'Lance Release'
|
git config user.name 'Lance Release'
|
||||||
git config user.email 'lance-dev@lancedb.com'
|
git config user.email 'lance-dev@lancedb.com'
|
||||||
- name: Set up Python 3.10
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: "3.10"
|
python-version: "3.11"
|
||||||
- name: Bump version, create tag and commit
|
- name: Bump version, create tag and commit
|
||||||
working-directory: python
|
working-directory: python
|
||||||
run: |
|
run: |
|
||||||
|
|||||||
6
.github/workflows/python.yml
vendored
6
.github/workflows/python.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
|||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
lfs: true
|
lfs: true
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: 3.${{ matrix.python-minor-version }}
|
python-version: 3.${{ matrix.python-minor-version }}
|
||||||
- name: Install lancedb
|
- name: Install lancedb
|
||||||
@@ -69,7 +69,7 @@ jobs:
|
|||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
lfs: true
|
lfs: true
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: "3.11"
|
python-version: "3.11"
|
||||||
- name: Install lancedb
|
- name: Install lancedb
|
||||||
@@ -92,7 +92,7 @@ jobs:
|
|||||||
fetch-depth: 0
|
fetch-depth: 0
|
||||||
lfs: true
|
lfs: true
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v4
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
python-version: 3.9
|
python-version: 3.9
|
||||||
- name: Install lancedb
|
- name: Install lancedb
|
||||||
|
|||||||
@@ -11,11 +11,10 @@ license = "Apache-2.0"
|
|||||||
repository = "https://github.com/lancedb/lancedb"
|
repository = "https://github.com/lancedb/lancedb"
|
||||||
|
|
||||||
[workspace.dependencies]
|
[workspace.dependencies]
|
||||||
lance = { "version" = "=0.9.11", "features" = ["dynamodb"] }
|
lance = { "version" = "=0.9.10", "features" = ["dynamodb"] }
|
||||||
lance-datafusion = { "version" = "=0.9.11" }
|
lance-index = { "version" = "=0.9.10" }
|
||||||
lance-index = { "version" = "=0.9.11" }
|
lance-linalg = { "version" = "=0.9.10" }
|
||||||
lance-linalg = { "version" = "=0.9.11" }
|
lance-testing = { "version" = "=0.9.10" }
|
||||||
lance-testing = { "version" = "=0.9.11" }
|
|
||||||
# Note that this one does not include pyarrow
|
# Note that this one does not include pyarrow
|
||||||
arrow = { version = "50.0", optional = false }
|
arrow = { version = "50.0", optional = false }
|
||||||
arrow-array = "50.0"
|
arrow-array = "50.0"
|
||||||
|
|||||||
@@ -84,7 +84,7 @@ This guide will show how to create tables, insert data into them, and update the
|
|||||||
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
|
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
|
||||||
```
|
```
|
||||||
|
|
||||||
### From a Pandas DataFrame
|
### From a Pandas DataFrame
|
||||||
|
|
||||||
```python
|
```python
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|||||||
@@ -58,6 +58,8 @@ pip install lancedb
|
|||||||
|
|
||||||
::: lancedb.schema.vector
|
::: lancedb.schema.vector
|
||||||
|
|
||||||
|
::: lancedb.merge.LanceMergeInsertBuilder
|
||||||
|
|
||||||
## Integrations
|
## Integrations
|
||||||
|
|
||||||
### Pydantic
|
### Pydantic
|
||||||
|
|||||||
@@ -17,7 +17,11 @@
|
|||||||
},
|
},
|
||||||
"repository": {
|
"repository": {
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "https://github.com/lancedb/lancedb/node"
|
"url": "https://github.com/lancedb/lancedb.git"
|
||||||
|
},
|
||||||
|
"homepage": "https://lancedb.github.io/lancedb/",
|
||||||
|
"bugs": {
|
||||||
|
"url": "https://github.com/lancedb/lancedb/issues"
|
||||||
},
|
},
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"data-format",
|
"data-format",
|
||||||
|
|||||||
@@ -451,11 +451,6 @@ export interface Table<T = number[]> {
|
|||||||
indexStats: (indexUuid: string) => Promise<IndexStats>
|
indexStats: (indexUuid: string) => Promise<IndexStats>
|
||||||
|
|
||||||
filter(value: string): Query<T>
|
filter(value: string): Query<T>
|
||||||
|
|
||||||
/**
|
|
||||||
* TODO comment
|
|
||||||
*/
|
|
||||||
mergeInsert: () => MergeInsertBuilder
|
|
||||||
|
|
||||||
schema: Promise<Schema>
|
schema: Promise<Schema>
|
||||||
}
|
}
|
||||||
@@ -905,15 +900,6 @@ export class LocalTable<T = number[]> implements Table<T> {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mergeInsert: () => MergeInsertBuilder = () => {
|
|
||||||
return new MergeInsertBuilder(async (args: {
|
|
||||||
params: MergeInsertParams
|
|
||||||
data: Array<Record<string, unknown>> | ArrowTable
|
|
||||||
}) => {
|
|
||||||
throw new Error('Not implemented')
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface CleanupStats {
|
export interface CleanupStats {
|
||||||
@@ -1090,56 +1076,3 @@ export enum MetricType {
|
|||||||
*/
|
*/
|
||||||
Dot = 'dot',
|
Dot = 'dot',
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface MergeInsertParams {
|
|
||||||
whenMatchedUpdateAll: boolean
|
|
||||||
whenNotMatchedInsertAll: boolean
|
|
||||||
whenNotMatchedBySourceDelete: boolean
|
|
||||||
whenNotMatchedBySourceCondition: boolean
|
|
||||||
}
|
|
||||||
|
|
||||||
type MergeInsertCallback = (args: {
|
|
||||||
params: MergeInsertParams
|
|
||||||
data: Array<Record<string, unknown>> | ArrowTable
|
|
||||||
}) => Promise<void>
|
|
||||||
|
|
||||||
export class MergeInsertBuilder {
|
|
||||||
readonly #callback: MergeInsertCallback
|
|
||||||
readonly #params: MergeInsertParams
|
|
||||||
|
|
||||||
constructor (callback: MergeInsertCallback) {
|
|
||||||
this.#callback = callback
|
|
||||||
this.#params = {
|
|
||||||
whenMatchedUpdateAll: false,
|
|
||||||
whenNotMatchedInsertAll: false,
|
|
||||||
whenNotMatchedBySourceDelete: false,
|
|
||||||
whenNotMatchedBySourceCondition: false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
whenMatchedUpdateAll (): MergeInsertBuilder {
|
|
||||||
this.#params.whenMatchedUpdateAll = true
|
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
whenNotMatchedInsertAll (): MergeInsertBuilder {
|
|
||||||
this.#params.whenNotMatchedInsertAll = true
|
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
whenNotMatchedBySourceDelete (): MergeInsertBuilder {
|
|
||||||
this.#params.whenNotMatchedBySourceDelete = true
|
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
whenNotMatchedBySourceCondition (): MergeInsertBuilder {
|
|
||||||
this.#params.whenNotMatchedBySourceCondition = true
|
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
async execute ({ data }: {
|
|
||||||
data: Array<Record<string, unknown>> | ArrowTable
|
|
||||||
}): Promise<void> {
|
|
||||||
await this.#callback({ params: this.#params, data })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ export class HttpLancedbClient {
|
|||||||
public async post (
|
public async post (
|
||||||
path: string,
|
path: string,
|
||||||
data?: any,
|
data?: any,
|
||||||
params?: Record<string, string | number | boolean>,
|
params?: Record<string, string | number>,
|
||||||
content?: string | undefined
|
content?: string | undefined
|
||||||
): Promise<AxiosResponse> {
|
): Promise<AxiosResponse> {
|
||||||
const response = await axios.post(
|
const response = await axios.post(
|
||||||
|
|||||||
@@ -24,8 +24,6 @@ import {
|
|||||||
type IndexStats,
|
type IndexStats,
|
||||||
type UpdateArgs,
|
type UpdateArgs,
|
||||||
type UpdateSqlArgs,
|
type UpdateSqlArgs,
|
||||||
type MergeInsertParams,
|
|
||||||
MergeInsertBuilder,
|
|
||||||
makeArrowTable
|
makeArrowTable
|
||||||
} from '../index'
|
} from '../index'
|
||||||
import { Query } from '../query'
|
import { Query } from '../query'
|
||||||
@@ -426,36 +424,4 @@ export class RemoteTable<T = number[]> implements Table<T> {
|
|||||||
numUnindexedRows: results.data.num_unindexed_rows
|
numUnindexedRows: results.data.num_unindexed_rows
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mergeInsert: () => MergeInsertBuilder = () => {
|
|
||||||
return new MergeInsertBuilder(async ({ data, params }: {
|
|
||||||
params: MergeInsertParams
|
|
||||||
data: Array<Record<string, unknown>> | ArrowTable
|
|
||||||
}) => {
|
|
||||||
// TODO -- uncomment this this
|
|
||||||
// let tbl: ArrowTable
|
|
||||||
// if (data instanceof ArrowTable) {
|
|
||||||
// tbl = data
|
|
||||||
// } else {
|
|
||||||
// tbl = makeArrowTable(data, await this.schema)
|
|
||||||
// }
|
|
||||||
const tbl = data as ArrowTable
|
|
||||||
|
|
||||||
const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
|
|
||||||
|
|
||||||
console.log({ buffer })
|
|
||||||
|
|
||||||
await this._client.post(
|
|
||||||
`/v1/table/${this._name}/merge_insert/`,
|
|
||||||
buffer,
|
|
||||||
{
|
|
||||||
when_matched_update_all: params.whenMatchedUpdateAll,
|
|
||||||
when_not_matched_insert_all: params.whenNotMatchedInsertAll,
|
|
||||||
when_not_matched_by_source_delete: params.whenNotMatchedBySourceDelete,
|
|
||||||
when_not_matched_by_source_condition: params.whenNotMatchedBySourceCondition
|
|
||||||
},
|
|
||||||
'application/vnd.apache.arrow.stream'
|
|
||||||
)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
86
python/lancedb/merge.py
Normal file
86
python/lancedb/merge.py
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
# Copyright 2023 LanceDB Developers
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import TYPE_CHECKING, Iterable, Optional
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from .common import DATA
|
||||||
|
|
||||||
|
|
||||||
|
class LanceMergeInsertBuilder(object):
|
||||||
|
"""Builder for a LanceDB merge insert operation
|
||||||
|
|
||||||
|
See [`merge_insert`][lancedb.table.Table.merge_insert] for
|
||||||
|
more context
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, table: "Table", on: Iterable[str]): # noqa: F821
|
||||||
|
# Do not put a docstring here. This method should be hidden
|
||||||
|
# from API docs. Users should use merge_insert to create
|
||||||
|
# this object.
|
||||||
|
self._table = table
|
||||||
|
self._on = on
|
||||||
|
self._when_matched_update_all = False
|
||||||
|
self._when_not_matched_insert_all = False
|
||||||
|
self._when_not_matched_by_source_delete = False
|
||||||
|
self._when_not_matched_by_source_condition = None
|
||||||
|
|
||||||
|
def when_matched_update_all(self) -> LanceMergeInsertBuilder:
|
||||||
|
"""
|
||||||
|
Rows that exist in both the source table (new data) and
|
||||||
|
the target table (old data) will be updated, replacing
|
||||||
|
the old row with the corresponding matching row.
|
||||||
|
|
||||||
|
If there are multiple matches then the behavior is undefined.
|
||||||
|
Currently this causes multiple copies of the row to be created
|
||||||
|
but that behavior is subject to change.
|
||||||
|
"""
|
||||||
|
self._when_matched_update_all = True
|
||||||
|
return self
|
||||||
|
|
||||||
|
def when_not_matched_insert_all(self) -> LanceMergeInsertBuilder:
|
||||||
|
"""
|
||||||
|
Rows that exist only in the source table (new data) should
|
||||||
|
be inserted into the target table.
|
||||||
|
"""
|
||||||
|
self._when_not_matched_insert_all = True
|
||||||
|
return self
|
||||||
|
|
||||||
|
def when_not_matched_by_source_delete(
|
||||||
|
self, condition: Optional[str] = None
|
||||||
|
) -> LanceMergeInsertBuilder:
|
||||||
|
"""
|
||||||
|
Rows that exist only in the target table (old data) will be
|
||||||
|
deleted. An optional condition can be provided to limit what
|
||||||
|
data is deleted.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
condition: Optional[str], default None
|
||||||
|
If None then all such rows will be deleted. Otherwise the
|
||||||
|
condition will be used as an SQL filter to limit what rows
|
||||||
|
are deleted.
|
||||||
|
"""
|
||||||
|
self._when_not_matched_by_source_delete = True
|
||||||
|
if condition is not None:
|
||||||
|
self._when_not_matched_by_source_condition = condition
|
||||||
|
return self
|
||||||
|
|
||||||
|
def execute(self, new_data: DATA):
|
||||||
|
"""
|
||||||
|
Executes the merge insert operation
|
||||||
|
|
||||||
|
Nothing is returned but the [`Table`][lancedb.table.Table] is updated
|
||||||
|
"""
|
||||||
|
self._table._do_merge(self, new_data)
|
||||||
@@ -244,6 +244,10 @@ class RemoteTable(Table):
|
|||||||
result = self._conn._client.query(self._name, query)
|
result = self._conn._client.query(self._name, query)
|
||||||
return result.to_arrow()
|
return result.to_arrow()
|
||||||
|
|
||||||
|
def _do_merge(self, *_args):
|
||||||
|
"""_do_merge() is not supported on the LanceDB cloud yet"""
|
||||||
|
return NotImplementedError("_do_merge() is not supported on the LanceDB cloud")
|
||||||
|
|
||||||
def delete(self, predicate: str):
|
def delete(self, predicate: str):
|
||||||
"""Delete rows from the table.
|
"""Delete rows from the table.
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ from lance.vector import vec_to_table
|
|||||||
|
|
||||||
from .common import DATA, VEC, VECTOR_COLUMN_NAME
|
from .common import DATA, VEC, VECTOR_COLUMN_NAME
|
||||||
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
|
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
|
||||||
|
from .merge import LanceMergeInsertBuilder
|
||||||
from .pydantic import LanceModel, model_to_dict
|
from .pydantic import LanceModel, model_to_dict
|
||||||
from .query import LanceQueryBuilder, Query
|
from .query import LanceQueryBuilder, Query
|
||||||
from .util import (
|
from .util import (
|
||||||
@@ -334,6 +335,64 @@ class Table(ABC):
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
|
||||||
|
"""
|
||||||
|
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
|
||||||
|
that can be used to create a "merge insert" operation
|
||||||
|
|
||||||
|
This operation can add rows, update rows, and remove rows all in a single
|
||||||
|
transaction. It is a very generic tool that can be used to create
|
||||||
|
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
|
||||||
|
or even replace a portion of existing data with new data (e.g. replace
|
||||||
|
all data where month="january")
|
||||||
|
|
||||||
|
The merge insert operation works by combining new data from a
|
||||||
|
**source table** with existing data in a **target table** by using a
|
||||||
|
join. There are three categories of records.
|
||||||
|
|
||||||
|
"Matched" records are records that exist in both the source table and
|
||||||
|
the target table. "Not matched" records exist only in the source table
|
||||||
|
(e.g. these are new data) "Not matched by source" records exist only
|
||||||
|
in the target table (this is old data)
|
||||||
|
|
||||||
|
The builder returned by this method can be used to customize what
|
||||||
|
should happen for each category of data.
|
||||||
|
|
||||||
|
Please note that the data may appear to be reordered as part of this
|
||||||
|
operation. This is because updated rows will be deleted from the
|
||||||
|
dataset and then reinserted at the end with the new values.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
|
||||||
|
on: Union[str, Iterable[str]]
|
||||||
|
A column (or columns) to join on. This is how records from the
|
||||||
|
source table and target table are matched. Typically this is some
|
||||||
|
kind of key or id column.
|
||||||
|
|
||||||
|
Examples
|
||||||
|
--------
|
||||||
|
>>> import lancedb
|
||||||
|
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
|
||||||
|
>>> db = lancedb.connect("./.lancedb")
|
||||||
|
>>> table = db.create_table("my_table", data)
|
||||||
|
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
|
||||||
|
>>> # Perform a "upsert" operation
|
||||||
|
>>> table.merge_insert("a") \\
|
||||||
|
... .when_matched_update_all() \\
|
||||||
|
... .when_not_matched_insert_all() \\
|
||||||
|
... .execute(new_data)
|
||||||
|
>>> # The order of new rows is non-deterministic since we use
|
||||||
|
>>> # a hash-join as part of this operation and so we sort here
|
||||||
|
>>> table.to_arrow().sort_by("a").to_pandas()
|
||||||
|
a b
|
||||||
|
0 1 b
|
||||||
|
1 2 x
|
||||||
|
2 3 y
|
||||||
|
3 4 z
|
||||||
|
"""
|
||||||
|
return LanceMergeInsertBuilder(self, on)
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def search(
|
def search(
|
||||||
self,
|
self,
|
||||||
@@ -414,6 +473,16 @@ class Table(ABC):
|
|||||||
def _execute_query(self, query: Query) -> pa.Table:
|
def _execute_query(self, query: Query) -> pa.Table:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _do_merge(
|
||||||
|
self,
|
||||||
|
merge: LanceMergeInsertBuilder,
|
||||||
|
new_data: DATA,
|
||||||
|
*,
|
||||||
|
schema: Optional[pa.Schema] = None,
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def delete(self, where: str):
|
def delete(self, where: str):
|
||||||
"""Delete rows from the table.
|
"""Delete rows from the table.
|
||||||
@@ -1196,6 +1265,18 @@ class LanceTable(Table):
|
|||||||
with_row_id=query.with_row_id,
|
with_row_id=query.with_row_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _do_merge(self, merge: LanceMergeInsertBuilder, new_data: DATA, *, schema=None):
|
||||||
|
ds = self.to_lance()
|
||||||
|
builder = ds.merge_insert(merge._on)
|
||||||
|
if merge._when_matched_update_all:
|
||||||
|
builder.when_matched_update_all()
|
||||||
|
if merge._when_not_matched_insert_all:
|
||||||
|
builder.when_not_matched_insert_all()
|
||||||
|
if merge._when_not_matched_by_source_delete:
|
||||||
|
cond = merge._when_not_matched_by_source_condition
|
||||||
|
builder.when_not_matched_by_source_delete(cond)
|
||||||
|
builder.execute(new_data, schema=schema)
|
||||||
|
|
||||||
def cleanup_old_versions(
|
def cleanup_old_versions(
|
||||||
self,
|
self,
|
||||||
older_than: Optional[timedelta] = None,
|
older_than: Optional[timedelta] = None,
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ name = "lancedb"
|
|||||||
version = "0.5.1"
|
version = "0.5.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"deprecation",
|
"deprecation",
|
||||||
"pylance==0.9.10",
|
"pylance==0.9.11",
|
||||||
"ratelimiter~=1.0",
|
"ratelimiter~=1.0",
|
||||||
"retry>=0.9.2",
|
"retry>=0.9.2",
|
||||||
"tqdm>=4.27.0",
|
"tqdm>=4.27.0",
|
||||||
|
|||||||
@@ -493,6 +493,62 @@ def test_update_types(db):
|
|||||||
assert actual == expected
|
assert actual == expected
|
||||||
|
|
||||||
|
|
||||||
|
def test_merge_insert(db):
|
||||||
|
table = LanceTable.create(
|
||||||
|
db,
|
||||||
|
"my_table",
|
||||||
|
data=pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}),
|
||||||
|
)
|
||||||
|
assert len(table) == 3
|
||||||
|
version = table.version
|
||||||
|
|
||||||
|
new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
|
||||||
|
|
||||||
|
# upsert
|
||||||
|
table.merge_insert(
|
||||||
|
"a"
|
||||||
|
).when_matched_update_all().when_not_matched_insert_all().execute(new_data)
|
||||||
|
|
||||||
|
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "x", "y", "z"]})
|
||||||
|
# These `sort_by` calls can be removed once lance#1892
|
||||||
|
# is merged (it fixes the ordering)
|
||||||
|
assert table.to_arrow().sort_by("a") == expected
|
||||||
|
|
||||||
|
table.restore(version)
|
||||||
|
|
||||||
|
# insert-if-not-exists
|
||||||
|
table.merge_insert("a").when_not_matched_insert_all().execute(new_data)
|
||||||
|
|
||||||
|
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "z"]})
|
||||||
|
assert table.to_arrow().sort_by("a") == expected
|
||||||
|
|
||||||
|
table.restore(version)
|
||||||
|
|
||||||
|
new_data = pa.table({"a": [2, 4], "b": ["x", "z"]})
|
||||||
|
|
||||||
|
# replace-range
|
||||||
|
table.merge_insert(
|
||||||
|
"a"
|
||||||
|
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete(
|
||||||
|
"a > 2"
|
||||||
|
).execute(new_data)
|
||||||
|
|
||||||
|
expected = pa.table({"a": [1, 2, 4], "b": ["a", "x", "z"]})
|
||||||
|
assert table.to_arrow().sort_by("a") == expected
|
||||||
|
|
||||||
|
table.restore(version)
|
||||||
|
|
||||||
|
# replace-range no condition
|
||||||
|
table.merge_insert(
|
||||||
|
"a"
|
||||||
|
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete().execute(
|
||||||
|
new_data
|
||||||
|
)
|
||||||
|
|
||||||
|
expected = pa.table({"a": [2, 4], "b": ["x", "z"]})
|
||||||
|
assert table.to_arrow().sort_by("a") == expected
|
||||||
|
|
||||||
|
|
||||||
def test_create_with_embedding_function(db):
|
def test_create_with_embedding_function(db):
|
||||||
class MyTable(LanceModel):
|
class MyTable(LanceModel):
|
||||||
text: str
|
text: str
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ object_store = { workspace = true }
|
|||||||
snafu = { workspace = true }
|
snafu = { workspace = true }
|
||||||
half = { workspace = true }
|
half = { workspace = true }
|
||||||
lance = { workspace = true }
|
lance = { workspace = true }
|
||||||
lance-datafusion = { workspace = true }
|
|
||||||
lance-index = { workspace = true }
|
lance-index = { workspace = true }
|
||||||
lance-linalg = { workspace = true }
|
lance-linalg = { workspace = true }
|
||||||
lance-testing = { workspace = true }
|
lance-testing = { workspace = true }
|
||||||
|
|||||||
@@ -175,7 +175,6 @@ pub mod error;
|
|||||||
pub mod index;
|
pub mod index;
|
||||||
pub mod io;
|
pub mod io;
|
||||||
pub mod ipc;
|
pub mod ipc;
|
||||||
pub mod merge_insert;
|
|
||||||
pub mod query;
|
pub mod query;
|
||||||
pub mod table;
|
pub mod table;
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
|
|||||||
@@ -1,101 +0,0 @@
|
|||||||
// Copyright 2024 Lance Developers.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use arrow_array::RecordBatchReader;
|
|
||||||
use lance::dataset::{self, WhenMatched, WhenNotMatched, WhenNotMatchedBySource};
|
|
||||||
use lance_datafusion::utils::reader_to_stream;
|
|
||||||
|
|
||||||
use crate::TableRef;
|
|
||||||
use crate::error::{Error, Result};
|
|
||||||
|
|
||||||
pub struct MergeInsertBuilder {
|
|
||||||
table: TableRef,
|
|
||||||
when_matched_update_all: bool,
|
|
||||||
when_not_matched_insert_all: bool,
|
|
||||||
when_not_matched_by_source_delete: bool,
|
|
||||||
when_not_matched_by_source_condition: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MergeInsertBuilder {
|
|
||||||
pub(crate) fn new(table: TableRef) -> Self {
|
|
||||||
Self {
|
|
||||||
table,
|
|
||||||
when_matched_update_all: false,
|
|
||||||
when_not_matched_insert_all: false,
|
|
||||||
when_not_matched_by_source_delete: false,
|
|
||||||
when_not_matched_by_source_condition: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn when_matched_update_all(mut self) -> Self {
|
|
||||||
self.when_matched_update_all = true;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn when_not_matched_insert_all(mut self) -> Self {
|
|
||||||
self.when_not_matched_insert_all = true;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn when_not_matched_by_source_delete(mut self) -> Self {
|
|
||||||
self.when_not_matched_by_source_delete = true;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn when_not_matched_by_source_condition(mut self) -> Self {
|
|
||||||
self.when_not_matched_by_source_condition = true;
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(
|
|
||||||
mut self,
|
|
||||||
batches: Box<dyn RecordBatchReader + Send>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let native_table = self.table.as_native().unwrap(); // TODO no unwrap
|
|
||||||
let ds = native_table.clone_inner_dataset();
|
|
||||||
let mut builder = dataset::MergeInsertBuilder::try_new(
|
|
||||||
Arc::new(ds),
|
|
||||||
vec!["vectors".to_string()],
|
|
||||||
)
|
|
||||||
.unwrap(); // TODO no unwrap
|
|
||||||
|
|
||||||
if self.when_matched_update_all {
|
|
||||||
builder.when_matched(WhenMatched::UpdateAll);
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.when_not_matched_insert_all {
|
|
||||||
builder.when_not_matched(WhenNotMatched::InsertAll);
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.when_not_matched_by_source_delete {
|
|
||||||
builder.when_not_matched_by_source(WhenNotMatchedBySource::Delete);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
// if self.when_not_matched_by_source_condition {
|
|
||||||
// builder.when_not_matched_by_source(WhenNotMatchedBySource::DeleteIf(()));
|
|
||||||
// }
|
|
||||||
|
|
||||||
let job = builder.try_build().unwrap(); // TODO no unwrap
|
|
||||||
let batches = reader_to_stream(batches).await.unwrap().0; // TODO no unwrap
|
|
||||||
let ds2 = job.execute(batches).await.unwrap(); // TODO no unwrap
|
|
||||||
|
|
||||||
native_table.reset_dataset(ds2.as_ref().clone());
|
|
||||||
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -34,7 +34,6 @@ use log::info;
|
|||||||
use crate::error::{Error, Result};
|
use crate::error::{Error, Result};
|
||||||
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
|
use crate::index::vector::{VectorIndex, VectorIndexStatistics};
|
||||||
use crate::index::IndexBuilder;
|
use crate::index::IndexBuilder;
|
||||||
use crate::merge_insert::MergeInsertBuilder;
|
|
||||||
use crate::query::Query;
|
use crate::query::Query;
|
||||||
use crate::utils::{PatchReadParam, PatchWriteParam};
|
use crate::utils::{PatchReadParam, PatchWriteParam};
|
||||||
use crate::WriteMode;
|
use crate::WriteMode;
|
||||||
@@ -242,8 +241,6 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
|||||||
/// Modeled after ``VACCUM`` in PostgreSQL.
|
/// Modeled after ``VACCUM`` in PostgreSQL.
|
||||||
/// Not all implementations support explicit optimization.
|
/// Not all implementations support explicit optimization.
|
||||||
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
|
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
|
||||||
|
|
||||||
fn merge_insert(&self) -> MergeInsertBuilder;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reference to a Table pointer.
|
/// Reference to a Table pointer.
|
||||||
@@ -701,10 +698,6 @@ impl Table for NativeTable {
|
|||||||
}
|
}
|
||||||
Ok(stats)
|
Ok(stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge_insert(&self) -> MergeInsertBuilder {
|
|
||||||
MergeInsertBuilder::new(Arc::new(self.clone()))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user