docs: user guide for merge insert (#2083)

Closes #2062
This commit is contained in:
Will Jones
2025-01-31 10:03:21 -08:00
committed by GitHub
parent 555fa26147
commit dba85f4d6f
12 changed files with 474 additions and 32 deletions

View File

@@ -518,7 +518,7 @@ After a table has been created, you can always add more data to it using the `ad
--8<-- "python/python/tests/docs/test_guide_tables.py:add_table_from_polars"
```
=== "Async API"
```python
--8<-- "python/python/tests/docs/test_guide_tables.py:add_table_async_from_polars"
```
@@ -601,6 +601,38 @@ After a table has been created, you can always add more data to it using the `ad
)
```
## Upserting into a table
Upserting lets you insert new rows or update existing rows in a table. To upsert
in LanceDB, use the merge insert API.
=== "Python"
=== "Sync API"
```python
--8<-- "python/python/tests/docs/test_merge_insert.py:upsert_basic"
```
**API Reference**: [lancedb.table.Table.merge_insert][]
=== "Async API"
```python
--8<-- "python/python/tests/docs/test_merge_insert.py:upsert_basic_async"
```
**API Reference**: [lancedb.table.AsyncTable.merge_insert][]
=== "Typescript[^1]"
=== "@lancedb/lancedb"
```typescript
--8<-- "nodejs/examples/merge_insert.test.ts:upsert_basic"
```
**API Reference**: [lancedb.Table.mergeInsert](../js/classes/Table.md/#mergeInsert)
Read more in the guide on [merge insert](tables/merge_insert.md).
## Deleting from a table
Use the `delete()` method to remove rows from a table. To choose which rows to delete, provide a filter that matches on the metadata columns. Any number of rows matching the filter will be deleted.
@@ -630,7 +662,7 @@ Use the `delete()` method on tables to delete rows from a table. To choose which
```python
--8<-- "python/python/tests/docs/test_guide_tables.py:delete_specific_row_async"
```
### Delete from a list of values
=== "Sync API"
@@ -838,7 +870,7 @@ a table:
You can add new columns to the table with the `add_columns` method. New columns
are filled with values based on a SQL expression. For example, you can add a new
column `y` to the table, fill it with the value of `x * 2` and set the expected
data type for it.
=== "Python"

View File

@@ -0,0 +1,135 @@
The merge insert command is a flexible API that can be used to perform:
1. Upsert
2. Insert-if-not-exists
3. Replace range
It works by joining the input data with the target table on a key you provide.
Often this key is a unique row id. You then specify what to do when a row has a
match in the target table and what to do when it doesn't. For example, for
upsert you update matched rows and insert unmatched rows, whereas for
insert-if-not-exists you only insert unmatched rows.
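As a rough mental model, the three modes differ only in which action runs for matched versus unmatched rows. Here is a toy sketch in plain Python (not the LanceDB API) of the first two modes, joining on an `id` key:

```python
# Toy model of merge-insert semantics (plain Python, not the LanceDB API).
# Input rows are joined with the target on a key ("id" here), then an
# action runs depending on whether the row matched.

target = {0: "Alice", 1: "Bob"}          # existing table, keyed by id
source = [(1, "Bobby"), (2, "Charlie")]  # new input data

# Upsert: update on match, insert on no match.
upserted = dict(target)
for key, name in source:
    upserted[key] = name                 # match -> update; no match -> insert
assert upserted == {0: "Alice", 1: "Bobby", 2: "Charlie"}

# Insert-if-not-exists: insert only on no match; matched rows are untouched.
inserted = dict(target)
for key, name in source:
    if key not in inserted:
        inserted[key] = name
assert inserted == {0: "Alice", 1: "Bob", 2: "Charlie"}
```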
You can also read more in the API reference:
* Python
* Sync: [lancedb.table.Table.merge_insert][]
* Async: [lancedb.table.AsyncTable.merge_insert][]
* Typescript: [lancedb.Table.mergeInsert](../../js/classes/Table.md/#mergeinsert)
!!! tip "Use scalar indices to speed up merge insert"
The merge insert command needs to perform a join between the input data and the
target table on the `on` key you provide. This requires scanning that entire
column, which can be expensive for large tables. To speed up this operation,
you can create a scalar index on the `on` column, which allows LanceDB to
find matches without scanning the whole table.
Read more in the [Building a Scalar Index](../scalar_index.md)
guide.
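To see why this helps, here is a toy illustration in plain Python (not LanceDB internals): without an index, finding the match for each input row forces a scan of the key column, while an index makes each match a direct lookup.

```python
# Toy illustration of why a scalar index speeds up the merge-insert join.
table = [{"id": i, "name": f"user-{i}"} for i in range(10_000)]

# Without an index, matching one input row scans the key column:
def find_match_scan(key):
    return next((row for row in table if row["id"] == key), None)

# A scalar index is conceptually a lookup structure over the "on" column,
# so each match is found without scanning:
index = {row["id"]: row for row in table}

# Both find the same row; the index just skips the scan.
assert find_match_scan(9_999) is index[9_999]
```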
!!! info "Embedding Functions"
Like the create table and add APIs, the merge insert API will automatically
compute embeddings if the table has an embedding definition in its schema.
If the input data doesn't contain the source column, or the vector column
is already filled, then the embeddings won't be computed. See the
[Embedding Functions](../../embeddings/embedding_functions.md) guide for more
information.
## Upsert
Upsert updates rows if they exist and inserts them if they don't. To do this
with merge insert, enable both `when_matched_update_all()` and
`when_not_matched_insert_all()`.
=== "Python"
=== "Sync API"
```python
--8<-- "python/python/tests/docs/test_merge_insert.py:upsert_basic"
```
=== "Async API"
```python
--8<-- "python/python/tests/docs/test_merge_insert.py:upsert_basic_async"
```
=== "Typescript"
=== "@lancedb/lancedb"
```typescript
--8<-- "nodejs/examples/merge_insert.test.ts:upsert_basic"
```
!!! note "Providing subsets of columns"
If a column is nullable, it can be omitted from the input data, in which case
it will be treated as `null`. Columns can also be provided in any order.
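For example, this behavior can be sketched in plain Python (not the LanceDB API) as filling any missing nullable columns with `None`, regardless of the order the keys were given in:

```python
# Sketch of the column-subset behavior (plain Python, not the LanceDB API):
# input rows may omit nullable columns, which are then filled with null/None.
schema = ["id", "name", "email"]  # suppose "email" is nullable

def normalize(row):
    # Pull each schema column from the row; missing columns become None.
    return {col: row.get(col) for col in schema}

# Columns given out of order, "email" omitted entirely:
row = normalize({"name": "Charlie", "id": 2})
assert row == {"id": 2, "name": "Charlie", "email": None}
```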
## Insert-if-not-exists
To avoid inserting duplicate rows, you can use the insert-if-not-exists command.
This will only insert rows that do not have a match in the target table. To do
this with merge insert, enable just `when_not_matched_insert_all()`.
=== "Python"
=== "Sync API"
```python
--8<-- "python/python/tests/docs/test_merge_insert.py:insert_if_not_exists"
```
=== "Async API"
```python
--8<-- "python/python/tests/docs/test_merge_insert.py:insert_if_not_exists_async"
```
=== "Typescript"
=== "@lancedb/lancedb"
```typescript
--8<-- "nodejs/examples/merge_insert.test.ts:insert_if_not_exists"
```
## Replace range
You can also replace a range of rows in the target table with the input data.
For example, if you have a table of document chunks, where each chunk has
both a `doc_id` and a `chunk_id`, you can replace all chunks for a given
`doc_id` with updated chunks. This is tricky with a plain upsert: if the new
data has fewer chunks than the old, the leftover old chunks are never removed.
To avoid this, add another clause that deletes any chunks for the document
that are not in the new data:
`when_not_matched_by_source_delete`.
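A toy sketch of the combined clauses in plain Python (not the LanceDB API), joining on the composite key `(doc_id, chunk_id)` and limiting the delete to `doc_id = 1`:

```python
# Toy model of replace range (plain Python, not the LanceDB API).
target = {
    (0, 0): "Hello", (0, 1): "World",
    (1, 0): "Foo",   (1, 1): "Bar",
}
source = {(1, 0): "Baz"}  # doc 1 now has a single chunk

merged = dict(target)
# when_matched_update_all + when_not_matched_insert_all:
merged.update(source)
# when_not_matched_by_source_delete("doc_id = 1"): drop doc-1 rows
# that have no match in the source.
merged = {k: v for k, v in merged.items() if k in source or k[0] != 1}

# Doc 0 is untouched; doc 1 is exactly the new data.
assert merged == {(0, 0): "Hello", (0, 1): "World", (1, 0): "Baz"}
```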
=== "Python"
=== "Sync API"
```python
--8<-- "python/python/tests/docs/test_merge_insert.py:replace_range"
```
=== "Async API"
```python
--8<-- "python/python/tests/docs/test_merge_insert.py:replace_range_async"
```
=== "Typescript"
=== "@lancedb/lancedb"
```typescript
--8<-- "nodejs/examples/merge_insert.test.ts:replace_range"
```

View File

@@ -11,6 +11,7 @@ excluded_globs = [
"../src/examples/*.md",
"../src/integrations/*.md",
"../src/guides/tables.md",
"../src/guides/tables/merge_insert.md",
"../src/python/duckdb.md",
"../src/python/pandas_and_pyarrow.md",
"../src/python/polars_arrow.md",

View File

@@ -3,7 +3,7 @@
import { expect, test } from "@jest/globals";
// --8<-- [start:import]
import * as lancedb from "@lancedb/lancedb";
import { VectorQuery } from "@lancedb/lancedb";
import type { VectorQuery } from "@lancedb/lancedb";
// --8<-- [end:import]
import { withTempDirectory } from "./util.ts";

View File

@@ -117,26 +117,24 @@ test("basic table examples", async () => {
// --8<-- [end:add_data]
}
{
// --8<-- [start:add_columns]
await tbl.addColumns([
{ name: "double_price", valueSql: "cast((price * 2) as Float)" },
]);
// --8<-- [end:add_columns]
// --8<-- [start:alter_columns]
await tbl.alterColumns([
{
path: "double_price",
rename: "dbl_price",
dataType: "float",
nullable: true,
},
]);
// --8<-- [end:alter_columns]
// --8<-- [start:drop_columns]
await tbl.dropColumns(["dbl_price"]);
// --8<-- [end:drop_columns]
}
// --8<-- [start:add_columns]
await tbl.addColumns([
{ name: "double_price", valueSql: "cast((price * 2) as Float)" },
]);
// --8<-- [end:add_columns]
// --8<-- [start:alter_columns]
await tbl.alterColumns([
{
path: "double_price",
rename: "dbl_price",
dataType: "float",
nullable: true,
},
]);
// --8<-- [end:alter_columns]
// --8<-- [start:drop_columns]
await tbl.dropColumns(["dbl_price"]);
// --8<-- [end:drop_columns]
{
// --8<-- [start:vector_search]

View File

@@ -0,0 +1,52 @@
{
"$schema": "https://biomejs.dev/schemas/1.9.4/schema.json",
"vcs": {
"enabled": false,
"clientKind": "git",
"useIgnoreFile": false
},
"files": {
"ignoreUnknown": false,
"ignore": []
},
"formatter": {
"enabled": true,
"indentStyle": "space"
},
"organizeImports": {
"enabled": true
},
"linter": {
"enabled": true,
"rules": {
"recommended": true
}
},
"javascript": {
"formatter": {
"quoteStyle": "double"
}
},
"overrides": [
{
"include": ["*"],
"linter": {
"rules": {
"style": {
"noNonNullAssertion": "off"
}
}
}
},
{
"include": ["merge_insert.test.ts"],
"linter": {
"rules": {
"style": {
"useNamingConvention": "off"
}
}
}
}
]
}

View File

@@ -1,4 +1,7 @@
import { FeatureExtractionPipeline, pipeline } from "@huggingface/transformers";
import {
type FeatureExtractionPipeline,
pipeline,
} from "@huggingface/transformers";
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import { expect, test } from "@jest/globals";

View File

@@ -0,0 +1,68 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import { expect, test } from "@jest/globals";
import * as lancedb from "@lancedb/lancedb";
test("basic upsert", async () => {
const db = await lancedb.connect("memory://");
// --8<-- [start:upsert_basic]
const table = await db.createTable("users", [
{ id: 0, name: "Alice" },
{ id: 1, name: "Bob" },
]);
const newUsers = [
{ id: 1, name: "Bobby" },
{ id: 2, name: "Charlie" },
];
await table
.mergeInsert("id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute(newUsers);
await table.countRows(); // 3
// --8<-- [end:upsert_basic]
expect(await table.countRows()).toBe(3);
// --8<-- [start:insert_if_not_exists]
const table2 = await db.createTable("domains", [
{ domain: "google.com", name: "Google" },
{ domain: "github.com", name: "GitHub" },
]);
const newDomains = [
{ domain: "google.com", name: "Google" },
{ domain: "facebook.com", name: "Facebook" },
];
await table2
.mergeInsert("domain")
.whenNotMatchedInsertAll()
.execute(newDomains);
await table2.countRows(); // 3
// --8<-- [end:insert_if_not_exists]
expect(await table2.countRows()).toBe(3);
// --8<-- [start:replace_range]
const table3 = await db.createTable("chunks", [
{ doc_id: 0, chunk_id: 0, text: "Hello" },
{ doc_id: 0, chunk_id: 1, text: "World" },
{ doc_id: 1, chunk_id: 0, text: "Foo" },
{ doc_id: 1, chunk_id: 1, text: "Bar" },
]);
const newChunks = [{ doc_id: 1, chunk_id: 0, text: "Baz" }];
await table3
.mergeInsert(["doc_id", "chunk_id"])
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete({ where: "doc_id = 1" })
.execute(newChunks);
await table3.countRows("doc_id = 1"); // 1
// --8<-- [end:replace_range]
expect(await table3.countRows("doc_id = 1")).toBe(1);
});

View File

@@ -6,7 +6,7 @@ import { withTempDirectory } from "./util.ts";
import * as lancedb from "@lancedb/lancedb";
import "@lancedb/lancedb/embedding/transformers";
import { LanceSchema, getRegistry } from "@lancedb/lancedb/embedding";
import { EmbeddingFunction } from "@lancedb/lancedb/embedding";
import type { EmbeddingFunction } from "@lancedb/lancedb/embedding";
import { Utf8 } from "apache-arrow";
test("full text search", async () => {
@@ -58,6 +58,6 @@ test("full text search", async () => {
const query = "How many bones are in the human body?";
const actual = await tbl.search(query).limit(1).toArray();
expect(actual[0]["text"]).toBe("The human body has 206 bones.");
expect(actual[0].text).toBe("The human body has 206 bones.");
});
}, 100_000);

View File

@@ -1,8 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
import * as fs from "fs";
import { tmpdir } from "os";
import * as path from "path";
import * as fs from "node:fs";
import { tmpdir } from "node:os";
import * as path from "node:path";
export async function withTempDirectory(
fn: (tempDir: string) => Promise<void>,

View File

@@ -830,7 +830,7 @@ class Table(ABC):
2 3 y
3 4 z
"""
on = [on] if isinstance(on, str) else list(on.iter())
on = [on] if isinstance(on, str) else list(iter(on))
return LanceMergeInsertBuilder(self, on)
@@ -2863,7 +2863,7 @@ class AsyncTable:
2 3 y
3 4 z
"""
on = [on] if isinstance(on, str) else list(on.iter())
on = [on] if isinstance(on, str) else list(iter(on))
return LanceMergeInsertBuilder(self, on)
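The fix in this hunk normalizes `on` so that a single column name becomes a one-element list and any iterable of names is materialized into a list. A minimal standalone sketch of that behavior:

```python
# Sketch of the `on` normalization: a single string becomes a one-element
# list; any iterable of column names is materialized into a list.
def normalize_on(on):
    return [on] if isinstance(on, str) else list(iter(on))

assert normalize_on("id") == ["id"]
assert normalize_on(("doc_id", "chunk_id")) == ["doc_id", "chunk_id"]
# The previous `on.iter()` raised AttributeError for lists and tuples,
# since Python iterables expose `__iter__`, not an `iter()` method.
```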

View File

@@ -0,0 +1,153 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright The LanceDB Authors
import pytest
def test_upsert(mem_db):
db = mem_db
# --8<-- [start:upsert_basic]
table = db.create_table(
"users",
[
{"id": 0, "name": "Alice"},
{"id": 1, "name": "Bob"},
],
)
new_users = [
{"id": 1, "name": "Bobby"},
{"id": 2, "name": "Charlie"},
]
(
table.merge_insert("id")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(new_users)
)
table.count_rows() # 3
# --8<-- [end:upsert_basic]
assert table.count_rows() == 3
@pytest.mark.asyncio
async def test_upsert_async(mem_db_async):
db = mem_db_async
# --8<-- [start:upsert_basic_async]
table = await db.create_table(
"users",
[
{"id": 0, "name": "Alice"},
{"id": 1, "name": "Bob"},
],
)
new_users = [
{"id": 1, "name": "Bobby"},
{"id": 2, "name": "Charlie"},
]
await (
table.merge_insert("id")
.when_matched_update_all()
.when_not_matched_insert_all()
.execute(new_users)
)
await table.count_rows() # 3
# --8<-- [end:upsert_basic_async]
assert await table.count_rows() == 3
def test_insert_if_not_exists(mem_db):
db = mem_db
# --8<-- [start:insert_if_not_exists]
table = db.create_table(
"domains",
[
{"domain": "google.com", "name": "Google"},
{"domain": "github.com", "name": "GitHub"},
],
)
new_domains = [
{"domain": "google.com", "name": "Google"},
{"domain": "facebook.com", "name": "Facebook"},
]
(table.merge_insert("domain").when_not_matched_insert_all().execute(new_domains))
table.count_rows() # 3
# --8<-- [end:insert_if_not_exists]
assert table.count_rows() == 3
@pytest.mark.asyncio
async def test_insert_if_not_exists_async(mem_db_async):
db = mem_db_async
# --8<-- [start:insert_if_not_exists_async]
table = await db.create_table(
"domains",
[
{"domain": "google.com", "name": "Google"},
{"domain": "github.com", "name": "GitHub"},
],
)
new_domains = [
{"domain": "google.com", "name": "Google"},
{"domain": "facebook.com", "name": "Facebook"},
]
await (
table.merge_insert("domain").when_not_matched_insert_all().execute(new_domains)
)
await table.count_rows() # 3
# --8<-- [end:insert_if_not_exists_async]
assert await table.count_rows() == 3
def test_replace_range(mem_db):
db = mem_db
# --8<-- [start:replace_range]
table = db.create_table(
"chunks",
[
{"doc_id": 0, "chunk_id": 0, "text": "Hello"},
{"doc_id": 0, "chunk_id": 1, "text": "World"},
{"doc_id": 1, "chunk_id": 0, "text": "Foo"},
{"doc_id": 1, "chunk_id": 1, "text": "Bar"},
],
)
new_chunks = [
{"doc_id": 1, "chunk_id": 0, "text": "Baz"},
]
(
table.merge_insert(["doc_id", "chunk_id"])
.when_matched_update_all()
.when_not_matched_insert_all()
.when_not_matched_by_source_delete("doc_id = 1")
.execute(new_chunks)
)
table.count_rows("doc_id = 1") # 1
# --8<-- [end:replace_range]
assert table.count_rows("doc_id = 1") == 1
@pytest.mark.asyncio
async def test_replace_range_async(mem_db_async):
db = mem_db_async
# --8<-- [start:replace_range_async]
table = await db.create_table(
"chunks",
[
{"doc_id": 0, "chunk_id": 0, "text": "Hello"},
{"doc_id": 0, "chunk_id": 1, "text": "World"},
{"doc_id": 1, "chunk_id": 0, "text": "Foo"},
{"doc_id": 1, "chunk_id": 1, "text": "Bar"},
],
)
new_chunks = [
{"doc_id": 1, "chunk_id": 0, "text": "Baz"},
]
await (
table.merge_insert(["doc_id", "chunk_id"])
.when_matched_update_all()
.when_not_matched_insert_all()
.when_not_matched_by_source_delete("doc_id = 1")
.execute(new_chunks)
)
await table.count_rows("doc_id = 1") # 1
# --8<-- [end:replace_range_async]
assert await table.count_rows("doc_id = 1") == 1