Compare commits

...

12 Commits

Author SHA1 Message Date
Lance Release
ce2242e06d [python] Bump version: 0.5.1 → 0.5.2 2024-02-02 21:33:02 +00:00
Weston Pace
778339388a chore: bump pylance version to latest in pyproject.toml (#918) 2024-02-02 13:32:12 -08:00
Weston Pace
7f8637a0b4 feat: add merge_insert to the node and rust APIs (#915) 2024-02-02 13:16:51 -08:00
QianZhu
09cd08222d make it explicit about the vector column data type (#916)
<img width="837" alt="Screenshot 2024-02-01 at 4 23 34 PM"
src="https://github.com/lancedb/lancedb/assets/1305083/4f0f5c5a-2a24-4b00-aad1-ef80a593d964">
[
<img width="838" alt="Screenshot 2024-02-01 at 4 26 03 PM"
src="https://github.com/lancedb/lancedb/assets/1305083/ca073bc8-b518-4be3-811d-8a7184416f07">
](url)

---------

Co-authored-by: Weston Pace <weston.pace@gmail.com>
2024-02-02 09:02:02 -08:00
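In practice, "making the vector column data type explicit" amounts to declaring the column as a PyArrow fixed-size list in the table schema rather than letting it be inferred. A minimal illustrative sketch (field names and dimension are placeholders, not from this commit):

```python
import pyarrow as pa

# The vector column is a fixed-size list (pyarrow.FixedSizeList) of
# floats with a known dimension, not a variable-length list.
dim = 4  # illustrative embedding dimension
schema = pa.schema([
    pa.field("id", pa.int32()),
    pa.field("vector", pa.list_(pa.float32(), dim)),
])
```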
Bert
a248d7feec fix: add request retry to python client (#917)
Adds capability to the remote python SDK to retry requests (fixes #911)

This can be configured through environment variables:
- `LANCE_CLIENT_MAX_RETRIES` = total number of retries. Set to 0 to
disable retries. Default = 3
- `LANCE_CLIENT_CONNECT_RETRIES` = number of times to retry a request in
case of TCP connect failure. Default = 3
- `LANCE_CLIENT_READ_RETRIES` = number of times to retry a request in case
of HTTP request failure. Default = 3
- `LANCE_CLIENT_RETRY_STATUSES` = HTTP statuses for which the request
will be retried, passed as a comma-separated list of ints. Default = `429,
500, 502, 503`
- `LANCE_CLIENT_RETRY_BACKOFF_FACTOR` = controls the time between retry
requests; see [urllib3's Retry documentation](https://github.com/urllib3/urllib3/blob/23f2287eb5/src/urllib3/util/retry.py#L141-L146).
Default = 0.25
- `LANCE_CLIENT_RETRY_BACKOFF_JITTER` = random jitter added to the backoff
time. Default = 0.25

Only read requests will be retried:
- list table names
- query
- describe table
- list table indices

This does not add retry capabilities for writes, since retrying a write
that isn't idempotent could cause issues. For example, if the load
balancer times out the request but the server completes it anyway,
blindly retrying an insert could write the data twice.
2024-02-02 11:27:29 -05:00
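A rough sketch of how these settings might be used from the Python client; the URI, API key, and values are placeholders:

```python
import os

import lancedb

# The retry settings are read when the HTTP session is built, so set
# them before connecting. Values here are illustrative.
os.environ["LANCE_CLIENT_MAX_RETRIES"] = "5"  # "0" disables retries
os.environ["LANCE_CLIENT_RETRY_STATUSES"] = "429, 500, 502, 503"
os.environ["LANCE_CLIENT_RETRY_BACKOFF_FACTOR"] = "0.25"

db = lancedb.connect("db://my-database", api_key="...")  # placeholder remote DB
tbl = db.open_table("my_table")

# Read requests such as this query are retried; writes are not.
results = tbl.search([0.1, 0.2]).limit(2).to_arrow()
```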
Weston Pace
cc9473a94a docs: add cleanup_old_versions and compact_files to Table for documentation purposes (#900)
Closes #819
2024-02-01 15:06:00 -08:00
Weston Pace
d77e95a4f4 feat: upgrade to lance 0.9.11 and expose merge_insert (#906)
This adds the Python bindings requested in #870. The JavaScript/Rust
bindings will be added in a future PR.
2024-02-01 11:36:29 -08:00
Lei Xu
62f053ac92 ci: bump to new version of python action to use node 20 GitHub Actions runtime (#909)
GitHub Actions is deprecating the old node-16 runtime.
2024-02-01 11:36:03 -08:00
JacobLinCool
34e10caad2 fix the repo link on npm, add links for homepage and bug report (#910)
- fix the repo link on npm
- add links for homepage and bug report
2024-01-31 21:07:11 -08:00
QianZhu
f5726e2d0c arrow table/f16 example (#907) 2024-01-31 14:41:28 -08:00
Lance Release
12b4fb42fc Updating package-lock.json 2024-01-31 21:18:24 +00:00
Lance Release
1328cd46f1 Updating package-lock.json 2024-01-31 20:29:38 +00:00
29 changed files with 1131 additions and 156 deletions

View File

@@ -29,7 +29,7 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v4
       - name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: "3.10"
           cache: "pip"

View File

@@ -29,7 +29,7 @@ jobs:
       - name: Checkout
         uses: actions/checkout@v4
       - name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: 3.11
           cache: "pip"

View File

@@ -37,10 +37,10 @@ jobs:
         run: |
           git config user.name 'Lance Release'
           git config user.email 'lance-dev@lancedb.com'
-      - name: Set up Python 3.10
-        uses: actions/setup-python@v4
+      - name: Set up Python 3.11
+        uses: actions/setup-python@v5
         with:
-          python-version: "3.10"
+          python-version: "3.11"
       - name: Bump version, create tag and commit
         run: |
           pip install bump2version

View File

@@ -16,7 +16,7 @@ jobs:
     steps:
       - uses: actions/checkout@v4
       - name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: "3.8"
       - name: Build distribution

View File

@@ -37,10 +37,10 @@ jobs:
         run: |
           git config user.name 'Lance Release'
           git config user.email 'lance-dev@lancedb.com'
-      - name: Set up Python 3.10
-        uses: actions/setup-python@v4
+      - name: Set up Python
+        uses: actions/setup-python@v5
         with:
-          python-version: "3.10"
+          python-version: "3.11"
       - name: Bump version, create tag and commit
         working-directory: python
         run: |

View File

@@ -30,7 +30,7 @@ jobs:
           fetch-depth: 0
           lfs: true
       - name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: 3.${{ matrix.python-minor-version }}
       - name: Install lancedb
@@ -69,7 +69,7 @@ jobs:
           fetch-depth: 0
           lfs: true
       - name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: "3.11"
       - name: Install lancedb
@@ -92,7 +92,7 @@ jobs:
           fetch-depth: 0
           lfs: true
       - name: Set up Python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: 3.9
       - name: Install lancedb

View File

@@ -11,10 +11,10 @@ license = "Apache-2.0"
 repository = "https://github.com/lancedb/lancedb"

 [workspace.dependencies]
-lance = { "version" = "=0.9.10", "features" = ["dynamodb"] }
-lance-index = { "version" = "=0.9.10" }
-lance-linalg = { "version" = "=0.9.10" }
-lance-testing = { "version" = "=0.9.10" }
+lance = { "version" = "=0.9.12", "features" = ["dynamodb"] }
+lance-index = { "version" = "=0.9.12" }
+lance-linalg = { "version" = "=0.9.12" }
+lance-testing = { "version" = "=0.9.12" }
 # Note that this one does not include pyarrow
 arrow = { version = "50.0", optional = false }
 arrow-array = "50.0"

View File

@@ -1,6 +1,6 @@
// --8<-- [start:import]
import * as lancedb from "vectordb";
import { Schema, Field, Float32, FixedSizeList, Int32 } from "apache-arrow";
import { Schema, Field, Float32, FixedSizeList, Int32, Float16 } from "apache-arrow";
// --8<-- [end:import]
import * as fs from "fs";
import { Table as ArrowTable, Utf8 } from "apache-arrow";
@@ -8,6 +8,7 @@ import { Table as ArrowTable, Utf8 } from "apache-arrow";
const example = async () => {
fs.rmSync("data/sample-lancedb", { recursive: true, force: true });
// --8<-- [start:open_db]
const lancedb = require("vectordb");
const uri = "data/sample-lancedb";
const db = await lancedb.connect(uri);
// --8<-- [end:open_db]
@@ -48,6 +49,27 @@ const example = async () => {
const empty_tbl = await db.createTable({ name: "empty_table", schema });
// --8<-- [end:create_empty_table]
// --8<-- [start:create_f16_table]
const dim = 16
const total = 10
const f16_schema = new Schema([
new Field('id', new Int32()),
new Field(
'vector',
new FixedSizeList(dim, new Field('item', new Float16(), true)),
false
)
])
const data = lancedb.makeArrowTable(
Array.from(Array(total), (_, i) => ({
id: i,
vector: Array.from(Array(dim), Math.random)
})),
{ schema: f16_schema }
)
const table = await db.createTable('f16_tbl', data)
// --8<-- [end:create_f16_table]
// --8<-- [start:search]
const query = await tbl.search([100, 100]).limit(2).execute();
// --8<-- [end:search]

View File

@@ -16,9 +16,22 @@ This guide will show how to create tables, insert data into them, and update the
db = lancedb.connect("./.lancedb")
```
=== "Javascript"
Initialize a VectorDB connection and create a table using one of the many methods listed below.
```javascript
const lancedb = require("vectordb");
const uri = "data/sample-lancedb";
const db = await lancedb.connect(uri);
```
LanceDB allows ingesting data from various sources - `dict`, `list[dict]`, `pd.DataFrame`, `pa.Table`, or an `Iterator[pa.RecordBatch]`. Let's take a look at some of these.
### From list of tuples or dictionaries
=== "Python"
```python
import lancedb
@@ -32,7 +45,6 @@ This guide will show how to create tables, insert data into them, and update the
db["my_table"].head()
```
!!! info "Note"
If the table already exists, LanceDB will raise an error by default.
@@ -51,6 +63,27 @@ This guide will show how to create tables, insert data into them, and update the
db.create_table("name", data, mode="overwrite")
```
=== "Javascript"
You can create a LanceDB table in JavaScript using an array of JSON records as follows.
```javascript
const tb = await db.createTable("my_table", [{
"vector": [3.1, 4.1],
"item": "foo",
"price": 10.0
}, {
"vector": [5.9, 26.5],
"item": "bar",
"price": 20.0
}]);
```
!!! info "Note"
If the table already exists, LanceDB will raise an error by default. If you want to overwrite the table, you need to specify the `WriteMode` in the createTable function.
```javascript
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
```
### From a Pandas DataFrame
```python
@@ -67,7 +100,9 @@ This guide will show how to create tables, insert data into them, and update the
db["my_table"].head()
```
!!! info "Note"
Data is converted to Arrow before being written to disk. For maximum control over how data is saved, either provide the PyArrow schema to convert to or else provide a PyArrow Table directly.
The **`vector`** column needs to be a [Vector](../python/pydantic.md#vector-field) (defined as [pyarrow.FixedSizeList](https://arrow.apache.org/docs/python/generated/pyarrow.list_.html)) type.
```python
custom_schema = pa.schema([
@@ -79,7 +114,7 @@ This guide will show how to create tables, insert data into them, and update the
table = db.create_table("my_table", data, schema=custom_schema)
```
### From a Polars DataFrame
LanceDB supports [Polars](https://pola.rs/), a modern, fast DataFrame library
written in Rust. Just like in Pandas, the Polars integration is enabled by PyArrow
@@ -97,26 +132,44 @@ This guide will show how to create tables, insert data into them, and update the
table = db.create_table("pl_table", data=data)
```
-### From PyArrow Tables
-You can also create LanceDB tables directly from PyArrow tables
-```python
-table = pa.Table.from_arrays(
-    [
-        pa.array([[3.1, 4.1, 5.1, 6.1], [5.9, 26.5, 4.7, 32.8]],
-                 pa.list_(pa.float32(), 4)),
-        pa.array(["foo", "bar"]),
-        pa.array([10.0, 20.0]),
-    ],
-    ["vector", "item", "price"],
-)
-db = lancedb.connect("db")
-tbl = db.create_table("my_table", table)
-```
+### From an Arrow Table
+=== "Python"
+    You can also create LanceDB tables directly from Arrow tables.
+    LanceDB supports float16 data type!
+    ```python
+    import pyarrow as pa
+    import numpy as np
+    dim = 16
+    total = 2
+    schema = pa.schema(
+        [
+            pa.field("vector", pa.list_(pa.float16(), dim)),
+            pa.field("text", pa.string())
+        ]
+    )
+    data = pa.Table.from_arrays(
+        [
+            pa.array([np.random.randn(dim).astype(np.float16) for _ in range(total)],
+                     pa.list_(pa.float16(), dim)),
+            pa.array(["foo", "bar"])
+        ],
+        ["vector", "text"],
+    )
+    tbl = db.create_table("f16_tbl", data, schema=schema)
+    ```
+=== "Javascript"
+    You can also create LanceDB tables directly from Arrow tables.
+    LanceDB supports Float16 data type!
+    ```javascript
+    --8<-- "docs/src/basic_legacy.ts:create_f16_table"
+    ```
### From Pydantic Models
When you create an empty table without data, you must specify the table schema.
LanceDB supports creating tables by specifying a PyArrow schema or a specialized
Pydantic model called `LanceModel`.
@@ -261,37 +314,6 @@ This guide will show how to create tables, insert data into them, and update the
You can also use iterators of other types like Pandas DataFrame or Pylists directly in the above example.
=== "JavaScript"
Initialize a VectorDB connection and create a table using one of the many methods listed below.
```javascript
const lancedb = require("vectordb");
const uri = "data/sample-lancedb";
const db = await lancedb.connect(uri);
```
You can create a LanceDB table in JavaScript using an array of JSON records as follows.
```javascript
const tb = await db.createTable("my_table", [{
"vector": [3.1, 4.1],
"item": "foo",
"price": 10.0
}, {
"vector": [5.9, 26.5],
"item": "bar",
"price": 20.0
}]);
```
!!! info "Note"
If the table already exists, LanceDB will raise an error by default. If you want to overwrite the table, you need to specify the `WriteMode` in the createTable function.
```javascript
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
```
## Open existing tables
=== "Python"

View File

@@ -13,7 +13,7 @@
},
{
"cell_type": "code",
"execution_count": 50,
"execution_count": 2,
"id": "c1b4e34b-a49c-471d-a343-a5940bb5138a",
"metadata": {},
"outputs": [],
@@ -23,7 +23,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 3,
"id": "4e5a8d07-d9a1-48c1-913a-8e0629289579",
"metadata": {},
"outputs": [],
@@ -44,7 +44,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 4,
"id": "5df12f66-8d99-43ad-8d0b-22189ec0a6b9",
"metadata": {},
"outputs": [
@@ -62,7 +62,7 @@
"long: [[-122.7,-74.1]]"
]
},
"execution_count": 2,
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
@@ -90,7 +90,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 5,
"id": "f4d87ae9-0ccb-48eb-b31d-bb8f2370e47e",
"metadata": {},
"outputs": [
@@ -108,7 +108,7 @@
"long: [[-122.7,-74.1]]"
]
},
"execution_count": 3,
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
@@ -135,10 +135,17 @@
},
{
"cell_type": "code",
"execution_count": 8,
"execution_count": 6,
"id": "25f34bcf-fca0-4431-8601-eac95d1bd347",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[2024-01-31T18:59:33Z WARN lance::dataset] No existing dataset at /Users/qian/Work/LanceDB/lancedb/docs/src/notebooks/.lancedb/table3.lance, it will be created\n"
]
},
{
"data": {
"text/plain": [
@@ -148,7 +155,7 @@
"long: float"
]
},
"execution_count": 8,
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
@@ -171,45 +178,51 @@
"id": "4df51925-7ca2-4005-9c72-38b3d26240c6",
"metadata": {},
"source": [
"### From PyArrow Tables\n",
"### From an Arrow Table\n",
"\n",
"You can also create LanceDB tables directly from pyarrow tables"
"You can also create LanceDB tables directly from pyarrow tables. LanceDB supports float16 type."
]
},
{
"cell_type": "code",
"execution_count": 12,
"execution_count": 7,
"id": "90a880f6-be43-4c9d-ba65-0b05197c0f6f",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"vector: fixed_size_list<item: float>[2]\n",
" child 0, item: float\n",
"item: string\n",
"price: double"
"vector: fixed_size_list<item: halffloat>[16]\n",
" child 0, item: halffloat\n",
"text: string"
]
},
"execution_count": 12,
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"table = pa.Table.from_arrays(\n",
" [\n",
" pa.array([[3.1, 4.1], [5.9, 26.5]],\n",
" pa.list_(pa.float32(), 2)),\n",
" pa.array([\"foo\", \"bar\"]),\n",
" pa.array([10.0, 20.0]),\n",
" ],\n",
" [\"vector\", \"item\", \"price\"],\n",
" )\n",
"import numpy as np\n",
"\n",
"db = lancedb.connect(\"db\")\n",
"dim = 16\n",
"total = 2\n",
"schema = pa.schema(\n",
" [\n",
" pa.field(\"vector\", pa.list_(pa.float16(), dim)),\n",
" pa.field(\"text\", pa.string())\n",
" ]\n",
")\n",
"data = pa.Table.from_arrays(\n",
" [\n",
" pa.array([np.random.randn(dim).astype(np.float16) for _ in range(total)],\n",
" pa.list_(pa.float16(), dim)),\n",
" pa.array([\"foo\", \"bar\"])\n",
" ],\n",
" [\"vector\", \"text\"],\n",
")\n",
"\n",
"tbl = db.create_table(\"test1\", table, mode=\"overwrite\")\n",
"tbl = db.create_table(\"f16_tbl\", data, schema=schema)\n",
"tbl.schema"
]
},
@@ -225,7 +238,7 @@
},
{
"cell_type": "code",
"execution_count": 13,
"execution_count": 8,
"id": "d81121d7-e4b7-447c-a48c-974b6ebb464a",
"metadata": {},
"outputs": [
@@ -240,7 +253,7 @@
"imdb_id: int64 not null"
]
},
"execution_count": 13,
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
@@ -282,7 +295,7 @@
},
{
"cell_type": "code",
"execution_count": 14,
"execution_count": 9,
"id": "bc247142-4e3c-41a2-b94c-8e00d2c2a508",
"metadata": {},
"outputs": [
@@ -292,7 +305,7 @@
"LanceTable(table4)"
]
},
"execution_count": 14,
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
@@ -333,7 +346,7 @@
},
{
"cell_type": "code",
"execution_count": 16,
"execution_count": 10,
"id": "25ad3523-e0c9-4c28-b3df-38189c4e0e5f",
"metadata": {},
"outputs": [
@@ -346,7 +359,7 @@
"price: double not null"
]
},
"execution_count": 16,
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
@@ -385,7 +398,7 @@
},
{
"cell_type": "code",
"execution_count": 17,
"execution_count": 11,
"id": "2814173a-eacc-4dd8-a64d-6312b44582cc",
"metadata": {},
"outputs": [],
@@ -411,7 +424,7 @@
},
{
"cell_type": "code",
"execution_count": 18,
"execution_count": 12,
"id": "df9e13c0-41f6-437f-9dfa-2fd71d3d9c45",
"metadata": {},
"outputs": [
@@ -421,7 +434,7 @@
"['table6', 'table4', 'table5', 'movielens_small']"
]
},
"execution_count": 18,
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
@@ -432,7 +445,7 @@
},
{
"cell_type": "code",
"execution_count": 20,
"execution_count": 13,
"id": "9343f5ad-6024-42ee-ac2f-6c1471df8679",
"metadata": {},
"outputs": [
@@ -541,7 +554,7 @@
"9 [5.9, 26.5] bar 20.0"
]
},
"execution_count": 20,
"execution_count": 13,
"metadata": {},
"output_type": "execute_result"
}
@@ -564,7 +577,7 @@
},
{
"cell_type": "code",
"execution_count": 21,
"execution_count": 14,
"id": "8a56250f-73a1-4c26-a6ad-5c7a0ce3a9ab",
"metadata": {},
"outputs": [],
@@ -590,7 +603,7 @@
},
{
"cell_type": "code",
"execution_count": 22,
"execution_count": 15,
"id": "030c7057-b98e-4e2f-be14-b8c1f927f83c",
"metadata": {},
"outputs": [],
@@ -621,7 +634,7 @@
},
{
"cell_type": "code",
"execution_count": 24,
"execution_count": 16,
"id": "e7a17de2-08d2-41b7-bd05-f63d1045ab1f",
"metadata": {},
"outputs": [
@@ -629,16 +642,16 @@
"name": "stdout",
"output_type": "stream",
"text": [
"32\n"
"22\n"
]
},
{
"data": {
"text/plain": [
"17"
"12"
]
},
"execution_count": 24,
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
@@ -661,7 +674,7 @@
},
{
"cell_type": "code",
"execution_count": 30,
"execution_count": 17,
"id": "fe3310bd-08f4-4a22-a63b-b3127d22f9f7",
"metadata": {},
"outputs": [
@@ -681,25 +694,20 @@
"8 [3.1, 4.1] foo 10.0\n",
"9 [3.1, 4.1] foo 10.0\n",
"10 [3.1, 4.1] foo 10.0\n",
"11 [3.1, 4.1] foo 10.0\n",
"12 [3.1, 4.1] foo 10.0\n",
"13 [3.1, 4.1] foo 10.0\n",
"14 [3.1, 4.1] foo 10.0\n",
"15 [3.1, 4.1] foo 10.0\n",
"16 [3.1, 4.1] foo 10.0\n"
"11 [3.1, 4.1] foo 10.0\n"
]
},
{
"ename": "OSError",
"evalue": "LanceError(IO): Error during planning: column foo does not exist",
"evalue": "LanceError(IO): Error during planning: column foo does not exist, /Users/runner/work/lance/lance/rust/lance-core/src/error.rs:212:23",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mOSError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[30], line 4\u001b[0m\n\u001b[1;32m 2\u001b[0m to_remove \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m, \u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39mjoin(\u001b[38;5;28mstr\u001b[39m(v) \u001b[38;5;28;01mfor\u001b[39;00m v \u001b[38;5;129;01min\u001b[39;00m to_remove)\n\u001b[1;32m 3\u001b[0m \u001b[38;5;28mprint\u001b[39m(tbl\u001b[38;5;241m.\u001b[39mto_pandas())\n\u001b[0;32m----> 4\u001b[0m \u001b[43mtbl\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43mf\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mitem IN (\u001b[39;49m\u001b[38;5;132;43;01m{\u001b[39;49;00m\u001b[43mto_remove\u001b[49m\u001b[38;5;132;43;01m}\u001b[39;49;00m\u001b[38;5;124;43m)\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 5\u001b[0m tbl\u001b[38;5;241m.\u001b[39mto_pandas()\n",
"File \u001b[0;32m~/Documents/lancedb/lancedb/python/lancedb/table.py:610\u001b[0m, in \u001b[0;36mLanceTable.delete\u001b[0;34m(self, where)\u001b[0m\n\u001b[1;32m 609\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdelete\u001b[39m(\u001b[38;5;28mself\u001b[39m, where: \u001b[38;5;28mstr\u001b[39m):\n\u001b[0;32m--> 610\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_dataset\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[43mwhere\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m~/Documents/lancedb/lancedb/env/lib/python3.11/site-packages/lance/dataset.py:489\u001b[0m, in \u001b[0;36mLanceDataset.delete\u001b[0;34m(self, predicate)\u001b[0m\n\u001b[1;32m 487\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(predicate, pa\u001b[38;5;241m.\u001b[39mcompute\u001b[38;5;241m.\u001b[39mExpression):\n\u001b[1;32m 488\u001b[0m predicate \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mstr\u001b[39m(predicate)\n\u001b[0;32m--> 489\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_ds\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpredicate\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[0;31mOSError\u001b[0m: LanceError(IO): Error during planning: column foo does not exist"
"Cell \u001b[0;32mIn[17], line 4\u001b[0m\n\u001b[1;32m 2\u001b[0m to_remove \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m, \u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39mjoin(\u001b[38;5;28mstr\u001b[39m(v) \u001b[38;5;28;01mfor\u001b[39;00m v \u001b[38;5;129;01min\u001b[39;00m to_remove)\n\u001b[1;32m 3\u001b[0m \u001b[38;5;28mprint\u001b[39m(tbl\u001b[38;5;241m.\u001b[39mto_pandas())\n\u001b[0;32m----> 4\u001b[0m \u001b[43mtbl\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43mf\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mitem IN (\u001b[39;49m\u001b[38;5;132;43;01m{\u001b[39;49;00m\u001b[43mto_remove\u001b[49m\u001b[38;5;132;43;01m}\u001b[39;49;00m\u001b[38;5;124;43m)\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m~/Work/LanceDB/lancedb/docs/doc-venv/lib/python3.11/site-packages/lancedb/table.py:872\u001b[0m, in \u001b[0;36mLanceTable.delete\u001b[0;34m(self, where)\u001b[0m\n\u001b[1;32m 871\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdelete\u001b[39m(\u001b[38;5;28mself\u001b[39m, where: \u001b[38;5;28mstr\u001b[39m):\n\u001b[0;32m--> 872\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_dataset\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[43mwhere\u001b[49m\u001b[43m)\u001b[49m\n",
"File \u001b[0;32m~/Work/LanceDB/lancedb/docs/doc-venv/lib/python3.11/site-packages/lance/dataset.py:596\u001b[0m, in \u001b[0;36mLanceDataset.delete\u001b[0;34m(self, predicate)\u001b[0m\n\u001b[1;32m 594\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(predicate, pa\u001b[38;5;241m.\u001b[39mcompute\u001b[38;5;241m.\u001b[39mExpression):\n\u001b[1;32m 595\u001b[0m predicate \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mstr\u001b[39m(predicate)\n\u001b[0;32m--> 596\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_ds\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpredicate\u001b[49m\u001b[43m)\u001b[49m\n",
"\u001b[0;31mOSError\u001b[0m: LanceError(IO): Error during planning: column foo does not exist, /Users/runner/work/lance/lance/rust/lance-core/src/error.rs:212:23"
]
}
],
@@ -712,7 +720,7 @@
},
{
"cell_type": "code",
"execution_count": 43,
"execution_count": null,
"id": "87d5bc21-847f-4c81-b56e-f6dbe5d05aac",
"metadata": {},
"outputs": [],
@@ -729,7 +737,7 @@
},
{
"cell_type": "code",
"execution_count": 44,
"execution_count": null,
"id": "9cba4519-eb3a-4941-ab7e-873d762e750f",
"metadata": {},
"outputs": [],
@@ -742,7 +750,7 @@
},
{
"cell_type": "code",
"execution_count": 46,
"execution_count": null,
"id": "5bdc9801-d5ed-4871-92d0-88b27108e788",
"metadata": {},
"outputs": [
@@ -817,7 +825,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
"version": "3.11.7"
}
},
"nbformat": 4,

View File

@@ -58,6 +58,8 @@ pip install lancedb
::: lancedb.schema.vector
::: lancedb.merge.LanceMergeInsertBuilder
## Integrations
### Pydantic

node/package-lock.json (generated, 44 lines changed)
View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.4.6",
"version": "0.4.7",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vectordb",
"version": "0.4.6",
"version": "0.4.7",
"cpu": [
"x64",
"arm64"
@@ -53,11 +53,11 @@
"uuid": "^9.0.0"
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.4.6",
"@lancedb/vectordb-darwin-x64": "0.4.6",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.6",
"@lancedb/vectordb-linux-x64-gnu": "0.4.6",
"@lancedb/vectordb-win32-x64-msvc": "0.4.6"
"@lancedb/vectordb-darwin-arm64": "0.4.7",
"@lancedb/vectordb-darwin-x64": "0.4.7",
"@lancedb/vectordb-linux-arm64-gnu": "0.4.7",
"@lancedb/vectordb-linux-x64-gnu": "0.4.7",
"@lancedb/vectordb-win32-x64-msvc": "0.4.7"
}
},
"node_modules/@75lb/deep-merge": {
@@ -329,9 +329,9 @@
}
},
"node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.4.6",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.6.tgz",
"integrity": "sha512-p6w/BXBxgFHR87phxvfBPPbvz4wDGmG2guRSQPEriwrc8h/gQ3wuexHhyzi7SWcV2E25vyUO9QcFL3vYKhIJRg==",
"version": "0.4.7",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.7.tgz",
"integrity": "sha512-kACOIytgjBfX8NRwjPKe311XRN3lbSN13B7avT5htMd3kYm3AnnMag9tZhlwoO7lIuvGaXhy7mApygJrjhfJ4g==",
"cpu": [
"arm64"
],
@@ -341,9 +341,9 @@
]
},
"node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.4.6",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.6.tgz",
"integrity": "sha512-7Fmg63Ky783ROpaQEL6I1uTrO//YDi4MgG0pjWAkDKsdHQ8QisFF8kd+JvjPh4PhMScC/rtB0SXpY/Y4zZvLfw==",
"version": "0.4.7",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.7.tgz",
"integrity": "sha512-vb74iK5uPWCwz5E60r3yWp/R/HSg54/Z9AZWYckYXqsPv4w/nfbkM5iZhfRqqR/9uE6JClWJKOtjbk7b8CFRFg==",
"cpu": [
"x64"
],
@@ -353,9 +353,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.4.6",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.6.tgz",
"integrity": "sha512-2wM+BKnjtZyKhiQPvldpfORH2JdKy6AuLFJ7AQtuyly57mkvgZRJeqK0DsRi/hyyZPRUOvWaDp/LfAxZvhLWgA==",
"version": "0.4.7",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.7.tgz",
"integrity": "sha512-jHp7THm6S9sB8RaCxGoZXLAwGAUHnawUUilB1K3mvQsRdfB2bBs0f7wDehW+PDhr+Iog4LshaWbcnoQEUJWR+Q==",
"cpu": [
"arm64"
],
@@ -365,9 +365,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.4.6",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.6.tgz",
"integrity": "sha512-1BK9i3DnnFHyBVLxOfsIW2i800o9exDEHm5onikvfoa5Ot5tXwIwAw86+0HGsBm5YbJnKKxZmbAM6Pr9qfMKiQ==",
"version": "0.4.7",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.7.tgz",
"integrity": "sha512-LKbVe6Wrp/AGqCCjKliNDmYoeTNgY/wfb2DTLjrx41Jko/04ywLrJ6xSEAn3XD5RDCO5u3fyUdXHHHv5a3VAAQ==",
"cpu": [
"x64"
],
@@ -377,9 +377,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.4.6",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.6.tgz",
"integrity": "sha512-Fh/fw+HRf/LDZKCDQTvpWoacFfmLXGwQpcqxxlwIZ0vy45eCNYvnZrpjQBjej0uh3tEVC6OHh6Jhn7Pr9k8r2w==",
"version": "0.4.7",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.7.tgz",
"integrity": "sha512-C5ln4+wafeY1Sm4PeV0Ios9lUaQVVip5Mjl9XU7ngioSEMEuXI/XMVfIdVfDPppVNXPeQxg33wLA272uw88D1Q==",
"cpu": [
"x64"
],

View File

@@ -17,7 +17,11 @@
   },
   "repository": {
     "type": "git",
-    "url": "https://github.com/lancedb/lancedb/node"
+    "url": "https://github.com/lancedb/lancedb.git"
   },
+  "homepage": "https://lancedb.github.io/lancedb/",
+  "bugs": {
+    "url": "https://github.com/lancedb/lancedb/issues"
+  },
   "keywords": [
     "data-format",

View File

@@ -37,6 +37,7 @@ const {
tableCountRows,
tableDelete,
tableUpdate,
tableMergeInsert,
tableCleanupOldVersions,
tableCompactFiles,
tableListIndices,
@@ -440,6 +441,38 @@ export interface Table<T = number[]> {
*/
update: (args: UpdateArgs | UpdateSqlArgs) => Promise<void>
/**
* Runs a "merge insert" operation on the table
*
* This operation can add rows, update rows, and remove rows all in a single
* transaction. It is a very generic tool that can be used to create
* behaviors like "insert if not exists", "update or insert (i.e. upsert)",
* or even replace a portion of existing data with new data (e.g. replace
* all data where month="january")
*
* The merge insert operation works by combining new data from a
* **source table** with existing data in a **target table** by using a
* join. There are three categories of records.
*
* "Matched" records are records that exist in both the source table and
* the target table. "Not matched" records exist only in the source table
* (e.g. these are new data). "Not matched by source" records exist only
* in the target table (this is old data).
*
* The MergeInsertArgs can be used to customize what should happen for
* each category of data.
*
* Please note that the data may appear to be reordered as part of this
* operation. This is because updated rows will be deleted from the
* dataset and then reinserted at the end with the new values.
*
* @param on a column to join on. This is how records from the source
* table and target table are matched.
* @param data the new data to insert
* @param args parameters controlling how the operation should behave
*/
mergeInsert: (on: string, data: Array<Record<string, unknown>> | ArrowTable, args: MergeInsertArgs) => Promise<void>
/**
* List the indicies on this table.
*/
@@ -483,6 +516,36 @@ export interface UpdateSqlArgs {
valuesSql: Record<string, string>
}
export interface MergeInsertArgs {
/**
* If true then rows that exist in both the source table (new data) and
* the target table (old data) will be updated, replacing the old row
* with the corresponding matching row.
*
* If there are multiple matches then the behavior is undefined.
* Currently this causes multiple copies of the row to be created
* but that behavior is subject to change.
*/
whenMatchedUpdateAll?: boolean
/**
* If true then rows that exist only in the source table (new data)
* will be inserted into the target table.
*/
whenNotMatchedInsertAll?: boolean
/**
* If true then rows that exist only in the target table (old data)
* will be deleted.
*
* If this is a string then it will be treated as an SQL filter and
* only rows that both do not match any row in the source table and
* match the given filter will be deleted.
*
* This can be used to replace a selection of existing data with
* new data.
*/
whenNotMatchedBySourceDelete?: string | boolean
}
export interface VectorIndex {
columns: string[]
name: string
@@ -821,6 +884,38 @@ export class LocalTable<T = number[]> implements Table<T> {
})
}
async mergeInsert (on: string, data: Array<Record<string, unknown>> | ArrowTable, args: MergeInsertArgs): Promise<void> {
const whenMatchedUpdateAll = args.whenMatchedUpdateAll ?? false
const whenNotMatchedInsertAll = args.whenNotMatchedInsertAll ?? false
let whenNotMatchedBySourceDelete = false
let whenNotMatchedBySourceDeleteFilt = null
if (args.whenNotMatchedBySourceDelete !== undefined && args.whenNotMatchedBySourceDelete !== null) {
whenNotMatchedBySourceDelete = true
if (args.whenNotMatchedBySourceDelete !== true) {
whenNotMatchedBySourceDeleteFilt = args.whenNotMatchedBySourceDelete
}
}
const schema = await this.schema
let tbl: ArrowTable
if (data instanceof ArrowTable) {
tbl = data
} else {
tbl = makeArrowTable(data, { schema })
}
const buffer = await fromTableToBuffer(tbl, this._embeddings, schema)
this._tbl = await tableMergeInsert.call(
this._tbl,
on,
whenMatchedUpdateAll,
whenNotMatchedInsertAll,
whenNotMatchedBySourceDelete,
whenNotMatchedBySourceDeleteFilt,
buffer
)
}
/**
* Clean up old versions of the table, freeing disk space.
*

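The Python bindings added in the same release expose equivalent semantics through a builder; a small sketch with illustrative data showing the three record categories:

```python
import lancedb
import pyarrow as pa

db = lancedb.connect("./.lancedb")
table = db.create_table("users", pa.table({"id": [1, 2], "name": ["a", "b"]}))

new_data = pa.table({"id": [2, 3], "name": ["B", "c"]})

# id=2 is "matched" (replaced), id=3 is "not matched" (inserted), and
# id=1 is "not matched by source" (kept, since no delete behavior is set).
table.merge_insert("id") \
    .when_matched_update_all() \
    .when_not_matched_insert_all() \
    .execute(new_data)
```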
View File

@@ -24,7 +24,8 @@ import {
type IndexStats,
type UpdateArgs,
type UpdateSqlArgs,
makeArrowTable
makeArrowTable,
type MergeInsertArgs
} from '../index'
import { Query } from '../query'
@@ -274,6 +275,52 @@ export class RemoteTable<T = number[]> implements Table<T> {
throw new Error('Not implemented')
}
async mergeInsert (on: string, data: Array<Record<string, unknown>> | ArrowTable, args: MergeInsertArgs): Promise<void> {
let tbl: ArrowTable
if (data instanceof ArrowTable) {
tbl = data
} else {
tbl = makeArrowTable(data, await this.schema)
}
const queryParams: any = {
on
}
if (args.whenMatchedUpdateAll ?? false) {
queryParams.when_matched_update_all = 'true'
} else {
queryParams.when_matched_update_all = 'false'
}
if (args.whenNotMatchedInsertAll ?? false) {
queryParams.when_not_matched_insert_all = 'true'
} else {
queryParams.when_not_matched_insert_all = 'false'
}
if (args.whenNotMatchedBySourceDelete !== false && args.whenNotMatchedBySourceDelete !== null && args.whenNotMatchedBySourceDelete !== undefined) {
queryParams.when_not_matched_by_source_delete = 'true'
if (typeof args.whenNotMatchedBySourceDelete === 'string') {
queryParams.when_not_matched_by_source_delete_filt = args.whenNotMatchedBySourceDelete
}
} else {
queryParams.when_not_matched_by_source_delete = 'false'
}
const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
const res = await this._client.post(
`/v1/table/${this._name}/merge_insert/`,
buffer,
queryParams,
'application/vnd.apache.arrow.stream'
)
if (res.status !== 200) {
throw new Error(
`Server Error, status: ${res.status}, ` +
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`message: ${res.statusText}: ${res.data}`
)
}
}
async add (data: Array<Record<string, unknown>> | ArrowTable): Promise<number> {
let tbl: ArrowTable
if (data instanceof ArrowTable) {

View File

@@ -531,6 +531,44 @@ describe('LanceDB client', function () {
assert.equal(await table.countRows(), 2)
})
it('can merge insert records into the table', async function () {
const dir = await track().mkdir('lancejs')
const con = await lancedb.connect(dir)
const data = [{ id: 1, age: 1 }, { id: 2, age: 1 }]
const table = await con.createTable('my_table', data)
let newData = [{ id: 2, age: 2 }, { id: 3, age: 2 }]
await table.mergeInsert('id', newData, {
whenNotMatchedInsertAll: true
})
assert.equal(await table.countRows(), 3)
assert.equal((await table.filter('age = 2').execute()).length, 1)
newData = [{ id: 3, age: 3 }, { id: 4, age: 3 }]
await table.mergeInsert('id', newData, {
whenNotMatchedInsertAll: true,
whenMatchedUpdateAll: true
})
assert.equal(await table.countRows(), 4)
assert.equal((await table.filter('age = 3').execute()).length, 2)
newData = [{ id: 5, age: 4 }]
await table.mergeInsert('id', newData, {
whenNotMatchedInsertAll: true,
whenMatchedUpdateAll: true,
whenNotMatchedBySourceDelete: 'age < 3'
})
assert.equal(await table.countRows(), 3)
await table.mergeInsert('id', newData, {
whenNotMatchedInsertAll: true,
whenMatchedUpdateAll: true,
whenNotMatchedBySourceDelete: true
})
assert.equal(await table.countRows(), 1)
})
it('can update records in the table', async function () {
const uri = await createTestDB()
const con = await lancedb.connect(uri)

View File

@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.5.1
+current_version = 0.5.2
 commit = True
 message = [python] Bump version: {current_version} → {new_version}
 tag = True

python/lancedb/merge.py (new file, 103 lines)
View File

@@ -0,0 +1,103 @@
# Copyright 2023 LanceDB Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from typing import TYPE_CHECKING, List, Optional
if TYPE_CHECKING:
from .common import DATA
class LanceMergeInsertBuilder(object):
"""Builder for a LanceDB merge insert operation
See [`merge_insert`][lancedb.table.Table.merge_insert] for
more context
"""
def __init__(self, table: "Table", on: List[str]): # noqa: F821
# Do not put a docstring here. This method should be hidden
# from API docs. Users should use merge_insert to create
# this object.
self._table = table
self._on = on
self._when_matched_update_all = False
self._when_not_matched_insert_all = False
self._when_not_matched_by_source_delete = False
self._when_not_matched_by_source_condition = None
def when_matched_update_all(self) -> LanceMergeInsertBuilder:
"""
Rows that exist in both the source table (new data) and
the target table (old data) will be updated, replacing
the old row with the corresponding matching row.
If there are multiple matches then the behavior is undefined.
Currently this causes multiple copies of the row to be created
but that behavior is subject to change.
"""
self._when_matched_update_all = True
return self
def when_not_matched_insert_all(self) -> LanceMergeInsertBuilder:
"""
Rows that exist only in the source table (new data) should
be inserted into the target table.
"""
self._when_not_matched_insert_all = True
return self
def when_not_matched_by_source_delete(
self, condition: Optional[str] = None
) -> LanceMergeInsertBuilder:
"""
Rows that exist only in the target table (old data) will be
deleted. An optional condition can be provided to limit what
data is deleted.
Parameters
----------
condition: Optional[str], default None
If None then all such rows will be deleted. Otherwise the
condition will be used as an SQL filter to limit what rows
are deleted.
"""
self._when_not_matched_by_source_delete = True
if condition is not None:
self._when_not_matched_by_source_condition = condition
return self
def execute(
self,
new_data: DATA,
on_bad_vectors: str = "error",
fill_value: float = 0.0,
):
"""
Executes the merge insert operation
Nothing is returned but the [`Table`][lancedb.table.Table] is updated
Parameters
----------
new_data: DATA
New records which will be matched against the existing records
to potentially insert or update into the table. This parameter
can be anything you use for [`add`][lancedb.table.Table.add]
on_bad_vectors: str, default "error"
What to do if any of the vectors are not the same size or contains NaNs.
One of "error", "drop", "fill".
fill_value: float, default 0.
The value to use when filling vectors. Only used if on_bad_vectors="fill".
"""
self._table._do_merge(self, new_data, on_bad_vectors, fill_value)
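A sketch of the replace-range pattern this builder enables, mirroring the tests later in this diff (table name, data, and filter are illustrative):

```python
import lancedb
import pyarrow as pa

db = lancedb.connect("./.lancedb")
table = db.create_table("events", pa.table({"id": [1, 2, 11], "v": ["a", "b", "c"]}))

new_data = pa.table({"id": [2, 3], "v": ["B", "C"]})

# Upsert, and additionally delete old rows that match no new row AND
# satisfy the SQL filter. Here id=11 is deleted, while id=1 also has no
# match but survives because it fails the filter.
table.merge_insert("id") \
    .when_matched_update_all() \
    .when_not_matched_insert_all() \
    .when_not_matched_by_source_delete("id > 10") \
    .execute(new_data)
```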

View File

@@ -13,6 +13,8 @@
import functools
import logging
import os
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urljoin
@@ -20,6 +22,8 @@ import attrs
import pyarrow as pa
import requests
from pydantic import BaseModel
from requests.adapters import HTTPAdapter
from urllib3 import Retry
from lancedb.common import Credential
from lancedb.remote import VectorQuery, VectorQueryResult
@@ -57,6 +61,10 @@ class RestfulLanceDBClient:
@functools.cached_property
def session(self) -> requests.Session:
sess = requests.Session()
retry_adapter_instance = retry_adapter(retry_adapter_options())
sess.mount(urljoin(self.url, "/v1/table/"), retry_adapter_instance)
adapter_class = LanceDBClientHTTPAdapterFactory()
sess.mount("https://", adapter_class())
return sess
@@ -170,3 +178,72 @@ class RestfulLanceDBClient:
"""Query a table."""
tbl = self.post(f"/v1/table/{table_name}/query/", query, deserialize=_read_ipc)
return VectorQueryResult(tbl)
def mount_retry_adapter_for_table(self, table_name: str) -> None:
"""
Adds an HTTP adapter to the session that will retry retryable requests to the table.
"""
retry_options = retry_adapter_options(methods=["GET", "POST"])
retry_adapter_instance = retry_adapter(retry_options)
session = self.session
session.mount(
urljoin(self.url, f"/v1/table/{table_name}/query/"), retry_adapter_instance
)
session.mount(
urljoin(self.url, f"/v1/table/{table_name}/describe/"),
retry_adapter_instance,
)
session.mount(
urljoin(self.url, f"/v1/table/{table_name}/index/list/"),
retry_adapter_instance,
)
def retry_adapter_options(methods=["GET"]) -> Dict[str, Any]:
return {
"retries": int(os.environ.get("LANCE_CLIENT_MAX_RETRIES", "3")),
"connect_retries": int(os.environ.get("LANCE_CLIENT_CONNECT_RETRIES", "3")),
"read_retries": int(os.environ.get("LANCE_CLIENT_READ_RETRIES", "3")),
"backoff_factor": float(
os.environ.get("LANCE_CLIENT_RETRY_BACKOFF_FACTOR", "0.25")
),
"backoff_jitter": float(
os.environ.get("LANCE_CLIENT_RETRY_BACKOFF_JITTER", "0.25")
),
"statuses": [
int(i.strip())
for i in os.environ.get(
"LANCE_CLIENT_RETRY_STATUSES", "429, 500, 502, 503"
).split(",")
],
"methods": methods,
}
def retry_adapter(options: Dict[str, Any]) -> HTTPAdapter:
total_retries = options["retries"]
connect_retries = options["connect_retries"]
read_retries = options["read_retries"]
backoff_factor = options["backoff_factor"]
backoff_jitter = options["backoff_jitter"]
statuses = options["statuses"]
methods = frozenset(options["methods"])
logging.debug(
f"Setting up retry adapter with {total_retries} retries," # noqa G003
+ f"connect retries {connect_retries}, read retries {read_retries},"
+ f"backoff factor {backoff_factor}, statuses {statuses}, "
+ f"methods {methods}"
)
return HTTPAdapter(
max_retries=Retry(
total=total_retries,
connect=connect_retries,
read=read_retries,
backoff_factor=backoff_factor,
backoff_jitter=backoff_jitter,
status_forcelist=statuses,
allowed_methods=methods,
)
)

View File

@@ -95,6 +95,8 @@ class RemoteDBConnection(DBConnection):
"""
from .table import RemoteTable
self._client.mount_retry_adapter_for_table(name)
# check if table exists
try:
self._client.post(f"/v1/table/{name}/describe/")

View File

@@ -19,6 +19,7 @@ import pyarrow as pa
from lance import json_to_schema
from lancedb.common import DATA, VEC, VECTOR_COLUMN_NAME
from lancedb.merge import LanceMergeInsertBuilder
from ..query import LanceVectorQueryBuilder
from ..table import Query, Table, _sanitize_data
@@ -244,6 +245,47 @@ class RemoteTable(Table):
result = self._conn._client.query(self._name, query)
return result.to_arrow()
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: str,
fill_value: float,
):
data = _sanitize_data(
new_data,
self.schema,
metadata=None,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
)
payload = to_ipc_binary(data)
params = {}
if len(merge._on) != 1:
raise ValueError(
"RemoteTable only supports a single on key in merge_insert"
)
params["on"] = merge._on[0]
params["when_matched_update_all"] = str(merge._when_matched_update_all).lower()
params["when_not_matched_insert_all"] = str(
merge._when_not_matched_insert_all
).lower()
params["when_not_matched_by_source_delete"] = str(
merge._when_not_matched_by_source_delete
).lower()
if merge._when_not_matched_by_source_condition is not None:
params[
"when_not_matched_by_source_delete_filt"
] = merge._when_not_matched_by_source_condition
self._conn._client.post(
f"/v1/table/{self._name}/merge_insert/",
data=payload,
params=params,
content_type=ARROW_STREAM_CONTENT_TYPE,
)
def delete(self, predicate: str):
"""Delete rows from the table.
@@ -355,6 +397,18 @@ class RemoteTable(Table):
payload = {"predicate": where, "updates": updates}
self._conn._client.post(f"/v1/table/{self._name}/update/", data=payload)
def cleanup_old_versions(self, *_):
"""cleanup_old_versions() is not supported on the LanceDB cloud"""
raise NotImplementedError(
"cleanup_old_versions() is not supported on the LanceDB cloud"
)
def compact_files(self, *_):
"""compact_files() is not supported on the LanceDB cloud"""
raise NotImplementedError(
"compact_files() is not supported on the LanceDB cloud"
)
def add_index(tbl: pa.Table, i: int) -> pa.Table:
return tbl.add_column(

View File

@@ -28,6 +28,7 @@ from lance.vector import vec_to_table
from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
from .merge import LanceMergeInsertBuilder
from .pydantic import LanceModel, model_to_dict
from .query import LanceQueryBuilder, Query
from .util import (
@@ -334,6 +335,66 @@ class Table(ABC):
"""
raise NotImplementedError
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
that can be used to create a "merge insert" operation
This operation can add rows, update rows, and remove rows all in a single
transaction. It is a very generic tool that can be used to create
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
or even replace a portion of existing data with new data (e.g. replace
all data where month="january")
The merge insert operation works by combining new data from a
**source table** with existing data in a **target table** by using a
join. There are three categories of records.
"Matched" records are records that exist in both the source table and
the target table. "Not matched" records exist only in the source table
(e.g. these are new data). "Not matched by source" records exist only
in the target table (this is old data).
The builder returned by this method can be used to customize what
should happen for each category of data.
Please note that the data may appear to be reordered as part of this
operation. This is because updated rows will be deleted from the
dataset and then reinserted at the end with the new values.
Parameters
----------
on: Union[str, Iterable[str]]
A column (or columns) to join on. This is how records from the
source table and target table are matched. Typically this is some
kind of key or id column.
Examples
--------
>>> import lancedb
>>> import pyarrow as pa
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform an "upsert" operation
>>> table.merge_insert("a") \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
a b
0 1 b
1 2 x
2 3 y
3 4 z
"""
on = [on] if isinstance(on, str) else list(on)
return LanceMergeInsertBuilder(self, on)
@abstractmethod
def search(
self,
@@ -379,6 +440,8 @@ class Table(ABC):
the table
vector_column_name: str
The name of the vector column to search.
The vector column needs to be a pyarrow fixed size list type
*default "vector"*
query_type: str
*default "auto"*.
@@ -414,6 +477,16 @@ class Table(ABC):
def _execute_query(self, query: Query) -> pa.Table:
pass
@abstractmethod
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: str,
fill_value: float,
):
pass
@abstractmethod
def delete(self, where: str):
"""Delete rows from the table.
@@ -521,6 +594,52 @@ class Table(ABC):
"""
raise NotImplementedError
@abstractmethod
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
*,
delete_unverified: bool = False,
) -> CleanupStats:
"""
Clean up old versions of the table, freeing disk space.
Note: This function is not available in LanceDb Cloud (since LanceDb
Cloud manages cleanup for you automatically)
Parameters
----------
older_than: timedelta, default None
The minimum age of the version to delete. If None, then this defaults
to two weeks.
delete_unverified: bool, default False
Because they may be part of an in-progress transaction, files newer
than 7 days old are not deleted by default. If you are sure that
there are no in-progress transactions, then you can set this to True
to delete all files older than `older_than`.
Returns
-------
CleanupStats
The stats of the cleanup operation, including how many bytes were
freed.
"""
@abstractmethod
def compact_files(self, *args, **kwargs):
"""
Run the compaction process on the table.
Note: This function is not available in LanceDb Cloud (since LanceDb
Cloud manages compaction for you automatically)
This can be run after making several small appends to optimize the table
for faster reads.
Arguments are passed onto :meth:`lance.dataset.DatasetOptimizer.compact_files`.
For most cases, the default should be fine.
"""
class LanceTable(Table):
"""
@@ -1196,6 +1315,31 @@ class LanceTable(Table):
with_row_id=query.with_row_id,
)
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
on_bad_vectors: str,
fill_value: float,
):
new_data = _sanitize_data(
new_data,
self.schema,
metadata=self.schema.metadata,
on_bad_vectors=on_bad_vectors,
fill_value=fill_value,
)
ds = self.to_lance()
builder = ds.merge_insert(merge._on)
if merge._when_matched_update_all:
builder.when_matched_update_all()
if merge._when_not_matched_insert_all:
builder.when_not_matched_insert_all()
if merge._when_not_matched_by_source_delete:
cond = merge._when_not_matched_by_source_condition
builder.when_not_matched_by_source_delete(cond)
builder.execute(new_data)
def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
@@ -1233,8 +1377,9 @@ class LanceTable(Table):
This can be run after making several small appends to optimize the table
for faster reads.
Arguments are passed onto :meth:`lance.dataset.DatasetOptimizer.compact_files`.
For most cases, the default should be fine.
Arguments are passed onto `lance.dataset.DatasetOptimizer.compact_files`.
(see Lance documentation for more details) For most cases, the default
should be fine.
"""
return self.to_lance().optimize.compact_files(*args, **kwargs)
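A hedged sketch of the two maintenance calls documented above, on a local table (both raise NotImplementedError on LanceDB Cloud, which manages this automatically):

```python
from datetime import timedelta

# `table` is assumed to be an open local LanceTable.
# Remove versions older than 30 days; the default is two weeks. Leaving
# delete_unverified=False keeps files under 7 days old that might belong
# to an in-progress transaction.
stats = table.cleanup_old_versions(older_than=timedelta(days=30))
print(stats.bytes_removed)  # assumption: the stats object reports bytes freed

# Merge the many small data files left by repeated small appends.
table.compact_files()
```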

View File

@@ -1,9 +1,9 @@
 [project]
 name = "lancedb"
-version = "0.5.1"
+version = "0.5.2"
 dependencies = [
     "deprecation",
-    "pylance==0.9.10",
+    "pylance==0.9.12",
     "ratelimiter~=1.0",
     "retry>=0.9.2",
     "tqdm>=4.27.0",

View File

@@ -29,6 +29,9 @@ class FakeLanceDBClient:
def post(self, path: str):
pass
def mount_retry_adapter_for_table(self, table_name: str):
pass
def test_remote_db():
conn = lancedb.connect("db://client-will-be-injected", api_key="fake")

View File

@@ -493,6 +493,62 @@ def test_update_types(db):
assert actual == expected
def test_merge_insert(db):
table = LanceTable.create(
db,
"my_table",
data=pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}),
)
assert len(table) == 3
version = table.version
new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
# upsert
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().execute(new_data)
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "x", "y", "z"]})
# These `sort_by` calls can be removed once lance#1892
# is merged (it fixes the ordering)
assert table.to_arrow().sort_by("a") == expected
table.restore(version)
# insert-if-not-exists
table.merge_insert("a").when_not_matched_insert_all().execute(new_data)
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "z"]})
assert table.to_arrow().sort_by("a") == expected
table.restore(version)
new_data = pa.table({"a": [2, 4], "b": ["x", "z"]})
# replace-range
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete(
"a > 2"
).execute(new_data)
expected = pa.table({"a": [1, 2, 4], "b": ["a", "x", "z"]})
assert table.to_arrow().sort_by("a") == expected
table.restore(version)
# replace-range no condition
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete().execute(
new_data
)
expected = pa.table({"a": [2, 4], "b": ["x", "z"]})
assert table.to_arrow().sort_by("a") == expected
def test_create_with_embedding_function(db):
class MyTable(LanceModel):
text: str

View File

@@ -260,6 +260,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("tableCountRows", JsTable::js_count_rows)?;
cx.export_function("tableDelete", JsTable::js_delete)?;
cx.export_function("tableUpdate", JsTable::js_update)?;
cx.export_function("tableMergeInsert", JsTable::js_merge_insert)?;
cx.export_function("tableCleanupOldVersions", JsTable::js_cleanup)?;
cx.export_function("tableCompactFiles", JsTable::js_compact)?;
cx.export_function("tableListIndices", JsTable::js_list_indices)?;

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Deref;
use arrow_array::{RecordBatch, RecordBatchIterator};
use lance::dataset::optimize::CompactionOptions;
use lance::dataset::{WriteMode, WriteParams};
@@ -166,6 +168,53 @@ impl JsTable {
Ok(promise)
}
pub(crate) fn js_merge_insert(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let rt = runtime(&mut cx)?;
let (deferred, promise) = cx.promise();
let channel = cx.channel();
let table = js_table.table.clone();
let key = cx.argument::<JsString>(0)?.value(&mut cx);
let mut builder = table.merge_insert(&[&key]);
if cx.argument::<JsBoolean>(1)?.value(&mut cx) {
builder.when_matched_update_all();
}
if cx.argument::<JsBoolean>(2)?.value(&mut cx) {
builder.when_not_matched_insert_all();
}
if cx.argument::<JsBoolean>(3)?.value(&mut cx) {
if let Some(filter) = cx.argument_opt(4) {
if filter.is_a::<JsNull, _>(&mut cx) {
builder.when_not_matched_by_source_delete(None);
} else {
let filter = filter
.downcast_or_throw::<JsString, _>(&mut cx)?
.deref()
.value(&mut cx);
builder.when_not_matched_by_source_delete(Some(filter));
}
} else {
builder.when_not_matched_by_source_delete(None);
}
}
let buffer = cx.argument::<JsBuffer>(5)?;
let (batches, schema) =
arrow_buffer_to_record_batch(buffer.as_slice(&cx)).or_throw(&mut cx)?;
rt.spawn(async move {
let new_data = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
let merge_insert_result = builder.execute(Box::new(new_data)).await;
deferred.settle_with(&channel, move |mut cx| {
merge_insert_result.or_throw(&mut cx)?;
Ok(cx.boxed(JsTable::from(table)))
})
});
Ok(promise)
}
pub(crate) fn js_update(mut cx: FunctionContext) -> JsResult<JsPromise> {
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
let table = js_table.table.clone();

View File

@@ -19,6 +19,7 @@ use std::sync::{Arc, Mutex};
use arrow_array::RecordBatchReader;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use chrono::Duration;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::cleanup::RemovalStats;
@@ -27,6 +28,7 @@ use lance::dataset::optimize::{
};
pub use lance::dataset::ReadParams;
use lance::dataset::{Dataset, UpdateBuilder, WriteParams};
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
use lance::io::WrappingObjectStore;
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
use log::info;
@@ -38,6 +40,10 @@ use crate::query::Query;
use crate::utils::{PatchReadParam, PatchWriteParam};
use crate::WriteMode;
use self::merge::{MergeInsert, MergeInsertBuilder};
pub mod merge;
/// Optimize the dataset.
///
/// Similar to `VACUUM` in PostgreSQL, it offers different options to
@@ -170,6 +176,71 @@ pub trait Table: std::fmt::Display + Send + Sync {
/// ```
fn create_index(&self, column: &[&str]) -> IndexBuilder;
/// Create a builder for a merge insert operation
///
/// This operation can add rows, update rows, and remove rows all in a single
/// transaction. It is a very generic tool that can be used to create
/// behaviors like "insert if not exists", "update or insert (i.e. upsert)",
/// or even replace a portion of existing data with new data (e.g. replace
/// all data where month="january")
///
/// The merge insert operation works by combining new data from a
/// **source table** with existing data in a **target table** by using a
/// join. There are three categories of records.
///
/// "Matched" records are records that exist in both the source table and
/// the target table. "Not matched" records exist only in the source table
/// (e.g. these are new data). "Not matched by source" records exist only
/// in the target table (this is old data).
///
/// The builder returned by this method can be used to customize what
/// should happen for each category of data.
///
/// Please note that the data may appear to be reordered as part of this
/// operation. This is because updated rows will be deleted from the
/// dataset and then reinserted at the end with the new values.
///
/// # Arguments
///
/// * `on` - One or more columns to join on. This is how records from the
///   source table and target table are matched. Typically this is some
///   kind of key or ID column.
///
/// # Examples
///
/// ```no_run
/// # use std::sync::Arc;
/// # use vectordb::connection::{Database, Connection};
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
/// # RecordBatchIterator, Int32Array};
/// # use arrow_schema::{Schema, Field, DataType};
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
/// let tmpdir = tempfile::tempdir().unwrap();
/// let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
/// # let tbl = db.open_table("idx_test").await.unwrap();
/// # let schema = Arc::new(Schema::new(vec![
/// # Field::new("id", DataType::Int32, false),
/// # Field::new("vector", DataType::FixedSizeList(
/// # Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
/// # ]));
/// let new_data = RecordBatchIterator::new(vec![
/// RecordBatch::try_new(schema.clone(),
/// vec![
/// Arc::new(Int32Array::from_iter_values(0..10)),
/// Arc::new(FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
/// (0..10).map(|_| Some(vec![Some(1.0); 128])), 128)),
/// ]).unwrap()
/// ].into_iter().map(Ok),
/// schema.clone());
/// // Perform an upsert operation
/// let mut merge_insert = tbl.merge_insert(&["id"]);
/// merge_insert.when_matched_update_all()
/// .when_not_matched_insert_all();
/// merge_insert.execute(Box::new(new_data)).await.unwrap();
/// # });
/// ```
fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder;
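The doctest above configures an upsert. For the "insert if not exists" behavior mentioned earlier, only the one clause is needed; a minimal sketch (not part of the diff), assuming `tbl` and `new_data` are set up as in the example above:

let mut merge_insert = tbl.merge_insert(&["id"]);
// Matched rows are left untouched (the default when-matched behavior);
// only rows with new ids are added.
merge_insert.when_not_matched_insert_all();
merge_insert.execute(Box::new(new_data)).await.unwrap();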
/// Search the table with a given query vector.
///
/// This is a convenience method for preparing an ANN query.
@@ -593,6 +664,42 @@ impl NativeTable {
}
}
#[async_trait]
impl MergeInsert for NativeTable {
async fn do_merge_insert(
&self,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
let dataset = Arc::new(self.clone_inner_dataset());
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
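// Map each builder flag onto the corresponding Lance merge clause.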
if params.when_matched_update_all {
builder.when_matched(lance::dataset::WhenMatched::UpdateAll);
} else {
builder.when_matched(lance::dataset::WhenMatched::DoNothing);
}
if params.when_not_matched_insert_all {
builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
} else {
builder.when_not_matched(lance::dataset::WhenNotMatched::DoNothing);
}
if params.when_not_matched_by_source_delete {
let behavior = if let Some(filter) = params.when_not_matched_by_source_delete_filt {
WhenNotMatchedBySource::delete_if(dataset.as_ref(), &filter)?
} else {
WhenNotMatchedBySource::Delete
};
builder.when_not_matched_by_source(behavior);
} else {
builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
}
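// Run the merge job; Lance returns an updated dataset, which becomes
// this table's new inner dataset.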
let job = builder.try_build()?;
let new_dataset = job.execute_reader(new_data).await?;
self.reset_dataset((*new_dataset).clone());
Ok(())
}
}
#[async_trait::async_trait]
impl Table for NativeTable {
fn as_any(&self) -> &dyn std::any::Any {
@@ -637,6 +744,11 @@ impl Table for NativeTable {
Ok(())
}
fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder {
let on = Vec::from_iter(on.iter().map(|key| key.to_string()));
MergeInsertBuilder::new(Arc::new(self.clone()), on)
}
fn create_index(&self, columns: &[&str]) -> IndexBuilder {
IndexBuilder::new(Arc::new(self.clone()), columns)
}
@@ -802,6 +914,38 @@ mod tests {
assert_eq!(table.name, "test");
}
#[tokio::test]
async fn test_merge_insert() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
// Create a dataset with i=0..10
let batches = make_test_batches_with_offset(0);
let table = NativeTable::create(&uri, "test", batches, None, None)
.await
.unwrap();
assert_eq!(table.count_rows().await.unwrap(), 10);
// Create new data with i=5..15
let new_batches = Box::new(make_test_batches_with_offset(5));
// Perform a "insert if not exists"
let mut merge_insert_builder = table.merge_insert(&["i"]);
merge_insert_builder.when_not_matched_insert_all();
merge_insert_builder.execute(new_batches).await.unwrap();
// Only 5 rows should actually be inserted
assert_eq!(table.count_rows().await.unwrap(), 15);
// Create new data with i=15..25 (no id matches)
let new_batches = Box::new(make_test_batches_with_offset(15));
// Perform a "bulk update" (should not affect anything)
let mut merge_insert_builder = table.merge_insert(&["i"]);
merge_insert_builder.when_matched_update_all();
merge_insert_builder.execute(new_batches).await.unwrap();
// No new rows should have been inserted
assert_eq!(table.count_rows().await.unwrap(), 15);
}
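The test above covers the insert and update paths. A hypothetical companion test (not in the diff) for the when_not_matched_by_source_delete path could look like this:

#[tokio::test]
async fn test_merge_insert_delete_unmatched() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
// Target table has i=0..10, source data has i=5..15
let batches = make_test_batches_with_offset(0);
let table = NativeTable::create(&uri, "test", batches, None, None)
.await
.unwrap();
let new_batches = Box::new(make_test_batches_with_offset(5));
// Upsert, and delete the target rows (i=0..5) the source did not match
let mut builder = table.merge_insert(&["i"]);
builder.when_matched_update_all();
builder.when_not_matched_insert_all();
builder.when_not_matched_by_source_delete(None);
builder.execute(new_batches).await.unwrap();
// Only i=5..15 should remain
assert_eq!(table.count_rows().await.unwrap(), 10);
}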
#[tokio::test]
async fn test_add_overwrite() {
let tmp_dir = tempdir().unwrap();
@@ -1148,17 +1292,25 @@ mod tests {
assert!(wrapper.called());
}
fn make_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
fn make_test_batches_with_offset(
offset: i32,
) -> impl RecordBatchReader + Send + Sync + 'static {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
RecordBatchIterator::new(
vec![RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(0..10))],
vec![Arc::new(Int32Array::from_iter_values(
offset..(offset + 10),
))],
)],
schema,
)
}
fn make_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
make_test_batches_with_offset(0)
}
#[tokio::test]
async fn test_create_index() {
use arrow_array::RecordBatch;

View File

@@ -0,0 +1,95 @@
// Copyright 2024 Lance Developers.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_array::RecordBatchReader;
use async_trait::async_trait;
use crate::Result;
#[async_trait]
pub(super) trait MergeInsert: Send + Sync {
async fn do_merge_insert(
&self,
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()>;
}
/// A builder used to create and run a merge insert operation
///
/// See [`super::Table::merge_insert`] for more context
pub struct MergeInsertBuilder {
table: Arc<dyn MergeInsert>,
pub(super) on: Vec<String>,
pub(super) when_matched_update_all: bool,
pub(super) when_not_matched_insert_all: bool,
pub(super) when_not_matched_by_source_delete: bool,
pub(super) when_not_matched_by_source_delete_filt: Option<String>,
}
impl MergeInsertBuilder {
pub(super) fn new(table: Arc<dyn MergeInsert>, on: Vec<String>) -> Self {
Self {
table,
on,
when_matched_update_all: false,
when_not_matched_insert_all: false,
when_not_matched_by_source_delete: false,
when_not_matched_by_source_delete_filt: None,
}
}
/// Rows that exist in both the source table (new data) and
/// the target table (old data) will be updated, replacing
/// the old row with the corresponding row from the source table.
///
/// If there are multiple matches then the behavior is undefined.
/// Currently this causes multiple copies of the row to be created
/// but that behavior is subject to change.
pub fn when_matched_update_all(&mut self) -> &mut Self {
self.when_matched_update_all = true;
self
}
/// Rows that exist only in the source table (new data) should
/// be inserted into the target table.
pub fn when_not_matched_insert_all(&mut self) -> &mut Self {
self.when_not_matched_insert_all = true;
self
}
/// Rows that exist only in the target table (old data) will be
/// deleted. An optional condition can be provided to limit what
/// data is deleted.
///
/// # Arguments
///
/// * `filter` - If `None` then all such rows will be deleted.
///   Otherwise the filter will be used as an SQL predicate to
///   limit which rows are deleted.
pub fn when_not_matched_by_source_delete(&mut self, filter: Option<String>) -> &mut Self {
self.when_not_matched_by_source_delete = true;
self.when_not_matched_by_source_delete_filt = filter;
self
}
/// Executes the merge insert operation
///
/// Nothing is returned; on success the underlying [`super::Table`] is updated
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<()> {
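// Clone the Arc handle first, since `self` is moved into the call below.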
self.table.clone().do_merge_insert(self, new_data).await
}
}
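As a usage note for the conditional delete: passing a filter restricts the delete to the unmatched old rows that satisfy the SQL predicate. A hedged sketch of the "replace all data where month='january'" case from the `Table::merge_insert` docs, assuming `table` is a table handle, `new_data` is a boxed `RecordBatchReader`, and a `month` column exists (names are illustrative):

let mut builder = table.merge_insert(&["id"]);
builder.when_matched_update_all();
builder.when_not_matched_insert_all();
// Delete only the unmatched old rows matching the predicate;
// unmatched rows from other months are kept.
builder.when_not_matched_by_source_delete(Some("month = 'january'".to_string()));
builder.execute(new_data).await?;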