mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-23 05:19:58 +00:00
Compare commits
12 Commits
v0.4.7
...
python-v0.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce2242e06d | ||
|
|
778339388a | ||
|
|
7f8637a0b4 | ||
|
|
09cd08222d | ||
|
|
a248d7feec | ||
|
|
cc9473a94a | ||
|
|
d77e95a4f4 | ||
|
|
62f053ac92 | ||
|
|
34e10caad2 | ||
|
|
f5726e2d0c | ||
|
|
12b4fb42fc | ||
|
|
1328cd46f1 |
2
.github/workflows/docs.yml
vendored
2
.github/workflows/docs.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
cache: "pip"
|
||||
|
||||
2
.github/workflows/docs_test.yml
vendored
2
.github/workflows/docs_test.yml
vendored
@@ -29,7 +29,7 @@ jobs:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: 3.11
|
||||
cache: "pip"
|
||||
|
||||
6
.github/workflows/make-release-commit.yml
vendored
6
.github/workflows/make-release-commit.yml
vendored
@@ -37,10 +37,10 @@ jobs:
|
||||
run: |
|
||||
git config user.name 'Lance Release'
|
||||
git config user.email 'lance-dev@lancedb.com'
|
||||
- name: Set up Python 3.10
|
||||
uses: actions/setup-python@v4
|
||||
- name: Set up Python 3.11
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
python-version: "3.11"
|
||||
- name: Bump version, create tag and commit
|
||||
run: |
|
||||
pip install bump2version
|
||||
|
||||
2
.github/workflows/pypi-publish.yml
vendored
2
.github/workflows/pypi-publish.yml
vendored
@@ -16,7 +16,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.8"
|
||||
- name: Build distribution
|
||||
|
||||
@@ -37,10 +37,10 @@ jobs:
|
||||
run: |
|
||||
git config user.name 'Lance Release'
|
||||
git config user.email 'lance-dev@lancedb.com'
|
||||
- name: Set up Python 3.10
|
||||
uses: actions/setup-python@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.10"
|
||||
python-version: "3.11"
|
||||
- name: Bump version, create tag and commit
|
||||
working-directory: python
|
||||
run: |
|
||||
|
||||
6
.github/workflows/python.yml
vendored
6
.github/workflows/python.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: 3.${{ matrix.python-minor-version }}
|
||||
- name: Install lancedb
|
||||
@@ -69,7 +69,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.11"
|
||||
- name: Install lancedb
|
||||
@@ -92,7 +92,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
lfs: true
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v4
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: 3.9
|
||||
- name: Install lancedb
|
||||
|
||||
@@ -11,10 +11,10 @@ license = "Apache-2.0"
|
||||
repository = "https://github.com/lancedb/lancedb"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=0.9.10", "features" = ["dynamodb"] }
|
||||
lance-index = { "version" = "=0.9.10" }
|
||||
lance-linalg = { "version" = "=0.9.10" }
|
||||
lance-testing = { "version" = "=0.9.10" }
|
||||
lance = { "version" = "=0.9.12", "features" = ["dynamodb"] }
|
||||
lance-index = { "version" = "=0.9.12" }
|
||||
lance-linalg = { "version" = "=0.9.12" }
|
||||
lance-testing = { "version" = "=0.9.12" }
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "50.0", optional = false }
|
||||
arrow-array = "50.0"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
// --8<-- [start:import]
|
||||
import * as lancedb from "vectordb";
|
||||
import { Schema, Field, Float32, FixedSizeList, Int32 } from "apache-arrow";
|
||||
import { Schema, Field, Float32, FixedSizeList, Int32, Float16 } from "apache-arrow";
|
||||
// --8<-- [end:import]
|
||||
import * as fs from "fs";
|
||||
import { Table as ArrowTable, Utf8 } from "apache-arrow";
|
||||
@@ -8,6 +8,7 @@ import { Table as ArrowTable, Utf8 } from "apache-arrow";
|
||||
const example = async () => {
|
||||
fs.rmSync("data/sample-lancedb", { recursive: true, force: true });
|
||||
// --8<-- [start:open_db]
|
||||
const lancedb = require("vectordb");
|
||||
const uri = "data/sample-lancedb";
|
||||
const db = await lancedb.connect(uri);
|
||||
// --8<-- [end:open_db]
|
||||
@@ -48,6 +49,27 @@ const example = async () => {
|
||||
const empty_tbl = await db.createTable({ name: "empty_table", schema });
|
||||
// --8<-- [end:create_empty_table]
|
||||
|
||||
// --8<-- [start:create_f16_table]
|
||||
const dim = 16
|
||||
const total = 10
|
||||
const f16_schema = new Schema([
|
||||
new Field('id', new Int32()),
|
||||
new Field(
|
||||
'vector',
|
||||
new FixedSizeList(dim, new Field('item', new Float16(), true)),
|
||||
false
|
||||
)
|
||||
])
|
||||
const data = lancedb.makeArrowTable(
|
||||
Array.from(Array(total), (_, i) => ({
|
||||
id: i,
|
||||
vector: Array.from(Array(dim), Math.random)
|
||||
})),
|
||||
{ f16_schema }
|
||||
)
|
||||
const table = await db.createTable('f16_tbl', data)
|
||||
// --8<-- [end:create_f16_table]
|
||||
|
||||
// --8<-- [start:search]
|
||||
const query = await tbl.search([100, 100]).limit(2).execute();
|
||||
// --8<-- [end:search]
|
||||
|
||||
@@ -16,9 +16,22 @@ This guide will show how to create tables, insert data into them, and update the
|
||||
db = lancedb.connect("./.lancedb")
|
||||
```
|
||||
|
||||
=== "Javascript"
|
||||
|
||||
Initialize a VectorDB connection and create a table using one of the many methods listed below.
|
||||
|
||||
```javascript
|
||||
const lancedb = require("vectordb");
|
||||
|
||||
const uri = "data/sample-lancedb";
|
||||
const db = await lancedb.connect(uri);
|
||||
```
|
||||
|
||||
LanceDB allows ingesting data from various sources - `dict`, `list[dict]`, `pd.DataFrame`, `pa.Table` or a `Iterator[pa.RecordBatch]`. Let's take a look at some of the these.
|
||||
|
||||
### From list of tuples or dictionaries
|
||||
### From list of tuples or dictionaries
|
||||
|
||||
=== "Python"
|
||||
|
||||
```python
|
||||
import lancedb
|
||||
@@ -32,7 +45,6 @@ This guide will show how to create tables, insert data into them, and update the
|
||||
|
||||
db["my_table"].head()
|
||||
```
|
||||
|
||||
!!! info "Note"
|
||||
If the table already exists, LanceDB will raise an error by default.
|
||||
|
||||
@@ -51,6 +63,27 @@ This guide will show how to create tables, insert data into them, and update the
|
||||
db.create_table("name", data, mode="overwrite")
|
||||
```
|
||||
|
||||
=== "Javascript"
|
||||
You can create a LanceDB table in JavaScript using an array of JSON records as follows.
|
||||
|
||||
```javascript
|
||||
const tb = await db.createTable("my_table", [{
|
||||
"vector": [3.1, 4.1],
|
||||
"item": "foo",
|
||||
"price": 10.0
|
||||
}, {
|
||||
"vector": [5.9, 26.5],
|
||||
"item": "bar",
|
||||
"price": 20.0
|
||||
}]);
|
||||
```
|
||||
!!! info "Note"
|
||||
If the table already exists, LanceDB will raise an error by default. If you want to overwrite the table, you need to specify the `WriteMode` in the createTable function.
|
||||
|
||||
```javascript
|
||||
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
|
||||
```
|
||||
|
||||
### From a Pandas DataFrame
|
||||
|
||||
```python
|
||||
@@ -67,7 +100,9 @@ This guide will show how to create tables, insert data into them, and update the
|
||||
db["my_table"].head()
|
||||
```
|
||||
!!! info "Note"
|
||||
Data is converted to Arrow before being written to disk. For maximum control over how data is saved, either provide the PyArrow schema to convert to or else provide a PyArrow Table directly.
|
||||
Data is converted to Arrow before being written to disk. For maximum control over how data is saved, either provide the PyArrow schema to convert to or else provide a PyArrow Table directly.
|
||||
|
||||
The **`vector`** column needs to be a [Vector](../python/pydantic.md#vector-field) (defined as [pyarrow.FixedSizeList](https://arrow.apache.org/docs/python/generated/pyarrow.list_.html)) type.
|
||||
|
||||
```python
|
||||
custom_schema = pa.schema([
|
||||
@@ -79,7 +114,7 @@ This guide will show how to create tables, insert data into them, and update the
|
||||
table = db.create_table("my_table", data, schema=custom_schema)
|
||||
```
|
||||
|
||||
### From a Polars DataFrame
|
||||
### From a Polars DataFrame
|
||||
|
||||
LanceDB supports [Polars](https://pola.rs/), a modern, fast DataFrame library
|
||||
written in Rust. Just like in Pandas, the Polars integration is enabled by PyArrow
|
||||
@@ -97,26 +132,44 @@ This guide will show how to create tables, insert data into them, and update the
|
||||
table = db.create_table("pl_table", data=data)
|
||||
```
|
||||
|
||||
### From PyArrow Tables
|
||||
You can also create LanceDB tables directly from PyArrow tables
|
||||
### From an Arrow Table
|
||||
=== "Python"
|
||||
You can also create LanceDB tables directly from Arrow tables.
|
||||
LanceDB supports float16 data type!
|
||||
|
||||
```python
|
||||
table = pa.Table.from_arrays(
|
||||
[
|
||||
pa.array([[3.1, 4.1, 5.1, 6.1], [5.9, 26.5, 4.7, 32.8]],
|
||||
pa.list_(pa.float32(), 4)),
|
||||
pa.array(["foo", "bar"]),
|
||||
pa.array([10.0, 20.0]),
|
||||
],
|
||||
["vector", "item", "price"],
|
||||
)
|
||||
import pyarrows as pa
|
||||
import numpy as np
|
||||
|
||||
dim = 16
|
||||
total = 2
|
||||
schema = pa.schema(
|
||||
[
|
||||
pa.field("vector", pa.list_(pa.float16(), dim)),
|
||||
pa.field("text", pa.string())
|
||||
]
|
||||
)
|
||||
data = pa.Table.from_arrays(
|
||||
[
|
||||
pa.array([np.random.randn(dim).astype(np.float16) for _ in range(total)],
|
||||
pa.list_(pa.float16(), dim)),
|
||||
pa.array(["foo", "bar"])
|
||||
],
|
||||
["vector", "text"],
|
||||
)
|
||||
tbl = db.create_table("f16_tbl", data, schema=schema)
|
||||
```
|
||||
|
||||
db = lancedb.connect("db")
|
||||
=== "Javascript"
|
||||
You can also create LanceDB tables directly from Arrow tables.
|
||||
LanceDB supports Float16 data type!
|
||||
|
||||
tbl = db.create_table("my_table", table)
|
||||
```javascript
|
||||
--8<-- "docs/src/basic_legacy.ts:create_f16_table"
|
||||
```
|
||||
|
||||
### From Pydantic Models
|
||||
|
||||
When you create an empty table without data, you must specify the table schema.
|
||||
LanceDB supports creating tables by specifying a PyArrow schema or a specialized
|
||||
Pydantic model called `LanceModel`.
|
||||
@@ -261,37 +314,6 @@ This guide will show how to create tables, insert data into them, and update the
|
||||
|
||||
You can also use iterators of other types like Pandas DataFrame or Pylists directly in the above example.
|
||||
|
||||
=== "JavaScript"
|
||||
Initialize a VectorDB connection and create a table using one of the many methods listed below.
|
||||
|
||||
```javascript
|
||||
const lancedb = require("vectordb");
|
||||
|
||||
const uri = "data/sample-lancedb";
|
||||
const db = await lancedb.connect(uri);
|
||||
```
|
||||
|
||||
You can create a LanceDB table in JavaScript using an array of JSON records as follows.
|
||||
|
||||
```javascript
|
||||
const tb = await db.createTable("my_table", [{
|
||||
"vector": [3.1, 4.1],
|
||||
"item": "foo",
|
||||
"price": 10.0
|
||||
}, {
|
||||
"vector": [5.9, 26.5],
|
||||
"item": "bar",
|
||||
"price": 20.0
|
||||
}]);
|
||||
```
|
||||
|
||||
!!! info "Note"
|
||||
If the table already exists, LanceDB will raise an error by default. If you want to overwrite the table, you need to specify the `WriteMode` in the createTable function.
|
||||
|
||||
```javascript
|
||||
const table = await con.createTable(tableName, data, { writeMode: WriteMode.Overwrite })
|
||||
```
|
||||
|
||||
## Open existing tables
|
||||
|
||||
=== "Python"
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 50,
|
||||
"execution_count": 2,
|
||||
"id": "c1b4e34b-a49c-471d-a343-a5940bb5138a",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
@@ -23,7 +23,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"execution_count": 3,
|
||||
"id": "4e5a8d07-d9a1-48c1-913a-8e0629289579",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
@@ -44,7 +44,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 2,
|
||||
"execution_count": 4,
|
||||
"id": "5df12f66-8d99-43ad-8d0b-22189ec0a6b9",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -62,7 +62,7 @@
|
||||
"long: [[-122.7,-74.1]]"
|
||||
]
|
||||
},
|
||||
"execution_count": 2,
|
||||
"execution_count": 4,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -90,7 +90,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 3,
|
||||
"execution_count": 5,
|
||||
"id": "f4d87ae9-0ccb-48eb-b31d-bb8f2370e47e",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -108,7 +108,7 @@
|
||||
"long: [[-122.7,-74.1]]"
|
||||
]
|
||||
},
|
||||
"execution_count": 3,
|
||||
"execution_count": 5,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -135,10 +135,17 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 8,
|
||||
"execution_count": 6,
|
||||
"id": "25f34bcf-fca0-4431-8601-eac95d1bd347",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"name": "stderr",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"[2024-01-31T18:59:33Z WARN lance::dataset] No existing dataset at /Users/qian/Work/LanceDB/lancedb/docs/src/notebooks/.lancedb/table3.lance, it will be created\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
@@ -148,7 +155,7 @@
|
||||
"long: float"
|
||||
]
|
||||
},
|
||||
"execution_count": 8,
|
||||
"execution_count": 6,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -171,45 +178,51 @@
|
||||
"id": "4df51925-7ca2-4005-9c72-38b3d26240c6",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"### From PyArrow Tables\n",
|
||||
"### From an Arrow Table\n",
|
||||
"\n",
|
||||
"You can also create LanceDB tables directly from pyarrow tables"
|
||||
"You can also create LanceDB tables directly from pyarrow tables. LanceDB supports float16 type."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 12,
|
||||
"execution_count": 7,
|
||||
"id": "90a880f6-be43-4c9d-ba65-0b05197c0f6f",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"vector: fixed_size_list<item: float>[2]\n",
|
||||
" child 0, item: float\n",
|
||||
"item: string\n",
|
||||
"price: double"
|
||||
"vector: fixed_size_list<item: halffloat>[16]\n",
|
||||
" child 0, item: halffloat\n",
|
||||
"text: string"
|
||||
]
|
||||
},
|
||||
"execution_count": 12,
|
||||
"execution_count": 7,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
],
|
||||
"source": [
|
||||
"table = pa.Table.from_arrays(\n",
|
||||
" [\n",
|
||||
" pa.array([[3.1, 4.1], [5.9, 26.5]],\n",
|
||||
" pa.list_(pa.float32(), 2)),\n",
|
||||
" pa.array([\"foo\", \"bar\"]),\n",
|
||||
" pa.array([10.0, 20.0]),\n",
|
||||
" ],\n",
|
||||
" [\"vector\", \"item\", \"price\"],\n",
|
||||
" )\n",
|
||||
"import numpy as np\n",
|
||||
"\n",
|
||||
"db = lancedb.connect(\"db\")\n",
|
||||
"dim = 16\n",
|
||||
"total = 2\n",
|
||||
"schema = pa.schema(\n",
|
||||
" [\n",
|
||||
" pa.field(\"vector\", pa.list_(pa.float16(), dim)),\n",
|
||||
" pa.field(\"text\", pa.string())\n",
|
||||
" ]\n",
|
||||
")\n",
|
||||
"data = pa.Table.from_arrays(\n",
|
||||
" [\n",
|
||||
" pa.array([np.random.randn(dim).astype(np.float16) for _ in range(total)],\n",
|
||||
" pa.list_(pa.float16(), dim)),\n",
|
||||
" pa.array([\"foo\", \"bar\"])\n",
|
||||
" ],\n",
|
||||
" [\"vector\", \"text\"],\n",
|
||||
")\n",
|
||||
"\n",
|
||||
"tbl = db.create_table(\"test1\", table, mode=\"overwrite\")\n",
|
||||
"tbl = db.create_table(\"f16_tbl\", data, schema=schema)\n",
|
||||
"tbl.schema"
|
||||
]
|
||||
},
|
||||
@@ -225,7 +238,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 13,
|
||||
"execution_count": 8,
|
||||
"id": "d81121d7-e4b7-447c-a48c-974b6ebb464a",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -240,7 +253,7 @@
|
||||
"imdb_id: int64 not null"
|
||||
]
|
||||
},
|
||||
"execution_count": 13,
|
||||
"execution_count": 8,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -282,7 +295,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 14,
|
||||
"execution_count": 9,
|
||||
"id": "bc247142-4e3c-41a2-b94c-8e00d2c2a508",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -292,7 +305,7 @@
|
||||
"LanceTable(table4)"
|
||||
]
|
||||
},
|
||||
"execution_count": 14,
|
||||
"execution_count": 9,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -333,7 +346,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 16,
|
||||
"execution_count": 10,
|
||||
"id": "25ad3523-e0c9-4c28-b3df-38189c4e0e5f",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -346,7 +359,7 @@
|
||||
"price: double not null"
|
||||
]
|
||||
},
|
||||
"execution_count": 16,
|
||||
"execution_count": 10,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -385,7 +398,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 17,
|
||||
"execution_count": 11,
|
||||
"id": "2814173a-eacc-4dd8-a64d-6312b44582cc",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
@@ -411,7 +424,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 18,
|
||||
"execution_count": 12,
|
||||
"id": "df9e13c0-41f6-437f-9dfa-2fd71d3d9c45",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -421,7 +434,7 @@
|
||||
"['table6', 'table4', 'table5', 'movielens_small']"
|
||||
]
|
||||
},
|
||||
"execution_count": 18,
|
||||
"execution_count": 12,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -432,7 +445,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 20,
|
||||
"execution_count": 13,
|
||||
"id": "9343f5ad-6024-42ee-ac2f-6c1471df8679",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -541,7 +554,7 @@
|
||||
"9 [5.9, 26.5] bar 20.0"
|
||||
]
|
||||
},
|
||||
"execution_count": 20,
|
||||
"execution_count": 13,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -564,7 +577,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 21,
|
||||
"execution_count": 14,
|
||||
"id": "8a56250f-73a1-4c26-a6ad-5c7a0ce3a9ab",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
@@ -590,7 +603,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 22,
|
||||
"execution_count": 15,
|
||||
"id": "030c7057-b98e-4e2f-be14-b8c1f927f83c",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
@@ -621,7 +634,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 24,
|
||||
"execution_count": 16,
|
||||
"id": "e7a17de2-08d2-41b7-bd05-f63d1045ab1f",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -629,16 +642,16 @@
|
||||
"name": "stdout",
|
||||
"output_type": "stream",
|
||||
"text": [
|
||||
"32\n"
|
||||
"22\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"data": {
|
||||
"text/plain": [
|
||||
"17"
|
||||
"12"
|
||||
]
|
||||
},
|
||||
"execution_count": 24,
|
||||
"execution_count": 16,
|
||||
"metadata": {},
|
||||
"output_type": "execute_result"
|
||||
}
|
||||
@@ -661,7 +674,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 30,
|
||||
"execution_count": 17,
|
||||
"id": "fe3310bd-08f4-4a22-a63b-b3127d22f9f7",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -681,25 +694,20 @@
|
||||
"8 [3.1, 4.1] foo 10.0\n",
|
||||
"9 [3.1, 4.1] foo 10.0\n",
|
||||
"10 [3.1, 4.1] foo 10.0\n",
|
||||
"11 [3.1, 4.1] foo 10.0\n",
|
||||
"12 [3.1, 4.1] foo 10.0\n",
|
||||
"13 [3.1, 4.1] foo 10.0\n",
|
||||
"14 [3.1, 4.1] foo 10.0\n",
|
||||
"15 [3.1, 4.1] foo 10.0\n",
|
||||
"16 [3.1, 4.1] foo 10.0\n"
|
||||
"11 [3.1, 4.1] foo 10.0\n"
|
||||
]
|
||||
},
|
||||
{
|
||||
"ename": "OSError",
|
||||
"evalue": "LanceError(IO): Error during planning: column foo does not exist",
|
||||
"evalue": "LanceError(IO): Error during planning: column foo does not exist, /Users/runner/work/lance/lance/rust/lance-core/src/error.rs:212:23",
|
||||
"output_type": "error",
|
||||
"traceback": [
|
||||
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
|
||||
"\u001b[0;31mOSError\u001b[0m Traceback (most recent call last)",
|
||||
"Cell \u001b[0;32mIn[30], line 4\u001b[0m\n\u001b[1;32m 2\u001b[0m to_remove \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m, \u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39mjoin(\u001b[38;5;28mstr\u001b[39m(v) \u001b[38;5;28;01mfor\u001b[39;00m v \u001b[38;5;129;01min\u001b[39;00m to_remove)\n\u001b[1;32m 3\u001b[0m \u001b[38;5;28mprint\u001b[39m(tbl\u001b[38;5;241m.\u001b[39mto_pandas())\n\u001b[0;32m----> 4\u001b[0m \u001b[43mtbl\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43mf\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mitem IN (\u001b[39;49m\u001b[38;5;132;43;01m{\u001b[39;49;00m\u001b[43mto_remove\u001b[49m\u001b[38;5;132;43;01m}\u001b[39;49;00m\u001b[38;5;124;43m)\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n\u001b[1;32m 5\u001b[0m tbl\u001b[38;5;241m.\u001b[39mto_pandas()\n",
|
||||
"File \u001b[0;32m~/Documents/lancedb/lancedb/python/lancedb/table.py:610\u001b[0m, in \u001b[0;36mLanceTable.delete\u001b[0;34m(self, where)\u001b[0m\n\u001b[1;32m 609\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdelete\u001b[39m(\u001b[38;5;28mself\u001b[39m, where: \u001b[38;5;28mstr\u001b[39m):\n\u001b[0;32m--> 610\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_dataset\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[43mwhere\u001b[49m\u001b[43m)\u001b[49m\n",
|
||||
"File \u001b[0;32m~/Documents/lancedb/lancedb/env/lib/python3.11/site-packages/lance/dataset.py:489\u001b[0m, in \u001b[0;36mLanceDataset.delete\u001b[0;34m(self, predicate)\u001b[0m\n\u001b[1;32m 487\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(predicate, pa\u001b[38;5;241m.\u001b[39mcompute\u001b[38;5;241m.\u001b[39mExpression):\n\u001b[1;32m 488\u001b[0m predicate \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mstr\u001b[39m(predicate)\n\u001b[0;32m--> 489\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_ds\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpredicate\u001b[49m\u001b[43m)\u001b[49m\n",
|
||||
"\u001b[0;31mOSError\u001b[0m: LanceError(IO): Error during planning: column foo does not exist"
|
||||
"Cell \u001b[0;32mIn[17], line 4\u001b[0m\n\u001b[1;32m 2\u001b[0m to_remove \u001b[38;5;241m=\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m, \u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39mjoin(\u001b[38;5;28mstr\u001b[39m(v) \u001b[38;5;28;01mfor\u001b[39;00m v \u001b[38;5;129;01min\u001b[39;00m to_remove)\n\u001b[1;32m 3\u001b[0m \u001b[38;5;28mprint\u001b[39m(tbl\u001b[38;5;241m.\u001b[39mto_pandas())\n\u001b[0;32m----> 4\u001b[0m \u001b[43mtbl\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43mf\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mitem IN (\u001b[39;49m\u001b[38;5;132;43;01m{\u001b[39;49;00m\u001b[43mto_remove\u001b[49m\u001b[38;5;132;43;01m}\u001b[39;49;00m\u001b[38;5;124;43m)\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n",
|
||||
"File \u001b[0;32m~/Work/LanceDB/lancedb/docs/doc-venv/lib/python3.11/site-packages/lancedb/table.py:872\u001b[0m, in \u001b[0;36mLanceTable.delete\u001b[0;34m(self, where)\u001b[0m\n\u001b[1;32m 871\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdelete\u001b[39m(\u001b[38;5;28mself\u001b[39m, where: \u001b[38;5;28mstr\u001b[39m):\n\u001b[0;32m--> 872\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_dataset\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[43mwhere\u001b[49m\u001b[43m)\u001b[49m\n",
|
||||
"File \u001b[0;32m~/Work/LanceDB/lancedb/docs/doc-venv/lib/python3.11/site-packages/lance/dataset.py:596\u001b[0m, in \u001b[0;36mLanceDataset.delete\u001b[0;34m(self, predicate)\u001b[0m\n\u001b[1;32m 594\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(predicate, pa\u001b[38;5;241m.\u001b[39mcompute\u001b[38;5;241m.\u001b[39mExpression):\n\u001b[1;32m 595\u001b[0m predicate \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mstr\u001b[39m(predicate)\n\u001b[0;32m--> 596\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_ds\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdelete\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpredicate\u001b[49m\u001b[43m)\u001b[49m\n",
|
||||
"\u001b[0;31mOSError\u001b[0m: LanceError(IO): Error during planning: column foo does not exist, /Users/runner/work/lance/lance/rust/lance-core/src/error.rs:212:23"
|
||||
]
|
||||
}
|
||||
],
|
||||
@@ -712,7 +720,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 43,
|
||||
"execution_count": null,
|
||||
"id": "87d5bc21-847f-4c81-b56e-f6dbe5d05aac",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
@@ -729,7 +737,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 44,
|
||||
"execution_count": null,
|
||||
"id": "9cba4519-eb3a-4941-ab7e-873d762e750f",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
@@ -742,7 +750,7 @@
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 46,
|
||||
"execution_count": null,
|
||||
"id": "5bdc9801-d5ed-4871-92d0-88b27108e788",
|
||||
"metadata": {},
|
||||
"outputs": [
|
||||
@@ -817,7 +825,7 @@
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.11.4"
|
||||
"version": "3.11.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
|
||||
@@ -58,6 +58,8 @@ pip install lancedb
|
||||
|
||||
::: lancedb.schema.vector
|
||||
|
||||
::: lancedb.merge.LanceMergeInsertBuilder
|
||||
|
||||
## Integrations
|
||||
|
||||
### Pydantic
|
||||
|
||||
44
node/package-lock.json
generated
44
node/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "vectordb",
|
||||
"version": "0.4.6",
|
||||
"version": "0.4.7",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "vectordb",
|
||||
"version": "0.4.6",
|
||||
"version": "0.4.7",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
@@ -53,11 +53,11 @@
|
||||
"uuid": "^9.0.0"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"@lancedb/vectordb-darwin-arm64": "0.4.6",
|
||||
"@lancedb/vectordb-darwin-x64": "0.4.6",
|
||||
"@lancedb/vectordb-linux-arm64-gnu": "0.4.6",
|
||||
"@lancedb/vectordb-linux-x64-gnu": "0.4.6",
|
||||
"@lancedb/vectordb-win32-x64-msvc": "0.4.6"
|
||||
"@lancedb/vectordb-darwin-arm64": "0.4.7",
|
||||
"@lancedb/vectordb-darwin-x64": "0.4.7",
|
||||
"@lancedb/vectordb-linux-arm64-gnu": "0.4.7",
|
||||
"@lancedb/vectordb-linux-x64-gnu": "0.4.7",
|
||||
"@lancedb/vectordb-win32-x64-msvc": "0.4.7"
|
||||
}
|
||||
},
|
||||
"node_modules/@75lb/deep-merge": {
|
||||
@@ -329,9 +329,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-darwin-arm64": {
|
||||
"version": "0.4.6",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.6.tgz",
|
||||
"integrity": "sha512-p6w/BXBxgFHR87phxvfBPPbvz4wDGmG2guRSQPEriwrc8h/gQ3wuexHhyzi7SWcV2E25vyUO9QcFL3vYKhIJRg==",
|
||||
"version": "0.4.7",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.4.7.tgz",
|
||||
"integrity": "sha512-kACOIytgjBfX8NRwjPKe311XRN3lbSN13B7avT5htMd3kYm3AnnMag9tZhlwoO7lIuvGaXhy7mApygJrjhfJ4g==",
|
||||
"cpu": [
|
||||
"arm64"
|
||||
],
|
||||
@@ -341,9 +341,9 @@
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-darwin-x64": {
|
||||
"version": "0.4.6",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.6.tgz",
|
||||
"integrity": "sha512-7Fmg63Ky783ROpaQEL6I1uTrO//YDi4MgG0pjWAkDKsdHQ8QisFF8kd+JvjPh4PhMScC/rtB0SXpY/Y4zZvLfw==",
|
||||
"version": "0.4.7",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.4.7.tgz",
|
||||
"integrity": "sha512-vb74iK5uPWCwz5E60r3yWp/R/HSg54/Z9AZWYckYXqsPv4w/nfbkM5iZhfRqqR/9uE6JClWJKOtjbk7b8CFRFg==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
@@ -353,9 +353,9 @@
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
|
||||
"version": "0.4.6",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.6.tgz",
|
||||
"integrity": "sha512-2wM+BKnjtZyKhiQPvldpfORH2JdKy6AuLFJ7AQtuyly57mkvgZRJeqK0DsRi/hyyZPRUOvWaDp/LfAxZvhLWgA==",
|
||||
"version": "0.4.7",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.4.7.tgz",
|
||||
"integrity": "sha512-jHp7THm6S9sB8RaCxGoZXLAwGAUHnawUUilB1K3mvQsRdfB2bBs0f7wDehW+PDhr+Iog4LshaWbcnoQEUJWR+Q==",
|
||||
"cpu": [
|
||||
"arm64"
|
||||
],
|
||||
@@ -365,9 +365,9 @@
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
|
||||
"version": "0.4.6",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.6.tgz",
|
||||
"integrity": "sha512-1BK9i3DnnFHyBVLxOfsIW2i800o9exDEHm5onikvfoa5Ot5tXwIwAw86+0HGsBm5YbJnKKxZmbAM6Pr9qfMKiQ==",
|
||||
"version": "0.4.7",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.4.7.tgz",
|
||||
"integrity": "sha512-LKbVe6Wrp/AGqCCjKliNDmYoeTNgY/wfb2DTLjrx41Jko/04ywLrJ6xSEAn3XD5RDCO5u3fyUdXHHHv5a3VAAQ==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
@@ -377,9 +377,9 @@
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
|
||||
"version": "0.4.6",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.6.tgz",
|
||||
"integrity": "sha512-Fh/fw+HRf/LDZKCDQTvpWoacFfmLXGwQpcqxxlwIZ0vy45eCNYvnZrpjQBjej0uh3tEVC6OHh6Jhn7Pr9k8r2w==",
|
||||
"version": "0.4.7",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.4.7.tgz",
|
||||
"integrity": "sha512-C5ln4+wafeY1Sm4PeV0Ios9lUaQVVip5Mjl9XU7ngioSEMEuXI/XMVfIdVfDPppVNXPeQxg33wLA272uw88D1Q==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
|
||||
@@ -17,7 +17,11 @@
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/lancedb/lancedb/node"
|
||||
"url": "https://github.com/lancedb/lancedb.git"
|
||||
},
|
||||
"homepage": "https://lancedb.github.io/lancedb/",
|
||||
"bugs": {
|
||||
"url": "https://github.com/lancedb/lancedb/issues"
|
||||
},
|
||||
"keywords": [
|
||||
"data-format",
|
||||
|
||||
@@ -37,6 +37,7 @@ const {
|
||||
tableCountRows,
|
||||
tableDelete,
|
||||
tableUpdate,
|
||||
tableMergeInsert,
|
||||
tableCleanupOldVersions,
|
||||
tableCompactFiles,
|
||||
tableListIndices,
|
||||
@@ -440,6 +441,38 @@ export interface Table<T = number[]> {
|
||||
*/
|
||||
update: (args: UpdateArgs | UpdateSqlArgs) => Promise<void>
|
||||
|
||||
/**
|
||||
* Runs a "merge insert" operation on the table
|
||||
*
|
||||
* This operation can add rows, update rows, and remove rows all in a single
|
||||
* transaction. It is a very generic tool that can be used to create
|
||||
* behaviors like "insert if not exists", "update or insert (i.e. upsert)",
|
||||
* or even replace a portion of existing data with new data (e.g. replace
|
||||
* all data where month="january")
|
||||
*
|
||||
* The merge insert operation works by combining new data from a
|
||||
* **source table** with existing data in a **target table** by using a
|
||||
* join. There are three categories of records.
|
||||
*
|
||||
* "Matched" records are records that exist in both the source table and
|
||||
* the target table. "Not matched" records exist only in the source table
|
||||
* (e.g. these are new data) "Not matched by source" records exist only
|
||||
* in the target table (this is old data)
|
||||
*
|
||||
* The MergeInsertArgs can be used to customize what should happen for
|
||||
* each category of data.
|
||||
*
|
||||
* Please note that the data may appear to be reordered as part of this
|
||||
* operation. This is because updated rows will be deleted from the
|
||||
* dataset and then reinserted at the end with the new values.
|
||||
*
|
||||
* @param on a column to join on. This is how records from the source
|
||||
* table and target table are matched.
|
||||
* @param data the new data to insert
|
||||
* @param args parameters controlling how the operation should behave
|
||||
*/
|
||||
mergeInsert: (on: string, data: Array<Record<string, unknown>> | ArrowTable, args: MergeInsertArgs) => Promise<void>
|
||||
|
||||
/**
|
||||
* List the indicies on this table.
|
||||
*/
|
||||
@@ -483,6 +516,36 @@ export interface UpdateSqlArgs {
|
||||
valuesSql: Record<string, string>
|
||||
}
|
||||
|
||||
export interface MergeInsertArgs {
|
||||
/**
|
||||
* If true then rows that exist in both the source table (new data) and
|
||||
* the target table (old data) will be updated, replacing the old row
|
||||
* with the corresponding matching row.
|
||||
*
|
||||
* If there are multiple matches then the behavior is undefined.
|
||||
* Currently this causes multiple copies of the row to be created
|
||||
* but that behavior is subject to change.
|
||||
*/
|
||||
whenMatchedUpdateAll?: boolean
|
||||
/**
|
||||
* If true then rows that exist only in the source table (new data)
|
||||
* will be inserted into the target table.
|
||||
*/
|
||||
whenNotMatchedInsertAll?: boolean
|
||||
/**
|
||||
* If true then rows that exist only in the target table (old data)
|
||||
* will be deleted.
|
||||
*
|
||||
* If this is a string then it will be treated as an SQL filter and
|
||||
* only rows that both do not match any row in the source table and
|
||||
* match the given filter will be deleted.
|
||||
*
|
||||
* This can be used to replace a selection of existing data with
|
||||
* new data.
|
||||
*/
|
||||
whenNotMatchedBySourceDelete?: string | boolean
|
||||
}
|
||||
|
||||
export interface VectorIndex {
|
||||
columns: string[]
|
||||
name: string
|
||||
@@ -821,6 +884,38 @@ export class LocalTable<T = number[]> implements Table<T> {
|
||||
})
|
||||
}
|
||||
|
||||
async mergeInsert (on: string, data: Array<Record<string, unknown>> | ArrowTable, args: MergeInsertArgs): Promise<void> {
|
||||
const whenMatchedUpdateAll = args.whenMatchedUpdateAll ?? false
|
||||
const whenNotMatchedInsertAll = args.whenNotMatchedInsertAll ?? false
|
||||
let whenNotMatchedBySourceDelete = false
|
||||
let whenNotMatchedBySourceDeleteFilt = null
|
||||
if (args.whenNotMatchedBySourceDelete !== undefined && args.whenNotMatchedBySourceDelete !== null) {
|
||||
whenNotMatchedBySourceDelete = true
|
||||
if (args.whenNotMatchedBySourceDelete !== true) {
|
||||
whenNotMatchedBySourceDeleteFilt = args.whenNotMatchedBySourceDelete
|
||||
}
|
||||
}
|
||||
|
||||
const schema = await this.schema
|
||||
let tbl: ArrowTable
|
||||
if (data instanceof ArrowTable) {
|
||||
tbl = data
|
||||
} else {
|
||||
tbl = makeArrowTable(data, { schema })
|
||||
}
|
||||
const buffer = await fromTableToBuffer(tbl, this._embeddings, schema)
|
||||
|
||||
this._tbl = await tableMergeInsert.call(
|
||||
this._tbl,
|
||||
on,
|
||||
whenMatchedUpdateAll,
|
||||
whenNotMatchedInsertAll,
|
||||
whenNotMatchedBySourceDelete,
|
||||
whenNotMatchedBySourceDeleteFilt,
|
||||
buffer
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up old versions of the table, freeing disk space.
|
||||
*
|
||||
|
||||
@@ -24,7 +24,8 @@ import {
|
||||
type IndexStats,
|
||||
type UpdateArgs,
|
||||
type UpdateSqlArgs,
|
||||
makeArrowTable
|
||||
makeArrowTable,
|
||||
type MergeInsertArgs
|
||||
} from '../index'
|
||||
import { Query } from '../query'
|
||||
|
||||
@@ -274,6 +275,52 @@ export class RemoteTable<T = number[]> implements Table<T> {
|
||||
throw new Error('Not implemented')
|
||||
}
|
||||
|
||||
async mergeInsert (on: string, data: Array<Record<string, unknown>> | ArrowTable, args: MergeInsertArgs): Promise<void> {
|
||||
let tbl: ArrowTable
|
||||
if (data instanceof ArrowTable) {
|
||||
tbl = data
|
||||
} else {
|
||||
tbl = makeArrowTable(data, await this.schema)
|
||||
}
|
||||
|
||||
const queryParams: any = {
|
||||
on
|
||||
}
|
||||
if (args.whenMatchedUpdateAll ?? false) {
|
||||
queryParams.when_matched_update_all = 'true'
|
||||
} else {
|
||||
queryParams.when_matched_update_all = 'false'
|
||||
}
|
||||
if (args.whenNotMatchedInsertAll ?? false) {
|
||||
queryParams.when_not_matched_insert_all = 'true'
|
||||
} else {
|
||||
queryParams.when_not_matched_insert_all = 'false'
|
||||
}
|
||||
if (args.whenNotMatchedBySourceDelete !== false && args.whenNotMatchedBySourceDelete !== null && args.whenNotMatchedBySourceDelete !== undefined) {
|
||||
queryParams.when_not_matched_by_source_delete = 'true'
|
||||
if (typeof args.whenNotMatchedBySourceDelete === 'string') {
|
||||
queryParams.when_not_matched_by_source_delete_filt = args.whenNotMatchedBySourceDelete
|
||||
}
|
||||
} else {
|
||||
queryParams.when_not_matched_by_source_delete = 'false'
|
||||
}
|
||||
|
||||
const buffer = await fromTableToStreamBuffer(tbl, this._embeddings)
|
||||
const res = await this._client.post(
|
||||
`/v1/table/${this._name}/merge_insert/`,
|
||||
buffer,
|
||||
queryParams,
|
||||
'application/vnd.apache.arrow.stream'
|
||||
)
|
||||
if (res.status !== 200) {
|
||||
throw new Error(
|
||||
`Server Error, status: ${res.status}, ` +
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
|
||||
`message: ${res.statusText}: ${res.data}`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async add (data: Array<Record<string, unknown>> | ArrowTable): Promise<number> {
|
||||
let tbl: ArrowTable
|
||||
if (data instanceof ArrowTable) {
|
||||
|
||||
@@ -531,6 +531,44 @@ describe('LanceDB client', function () {
|
||||
assert.equal(await table.countRows(), 2)
|
||||
})
|
||||
|
||||
it('can merge insert records into the table', async function () {
|
||||
const dir = await track().mkdir('lancejs')
|
||||
const con = await lancedb.connect(dir)
|
||||
|
||||
const data = [{ id: 1, age: 1 }, { id: 2, age: 1 }]
|
||||
const table = await con.createTable('my_table', data)
|
||||
|
||||
let newData = [{ id: 2, age: 2 }, { id: 3, age: 2 }]
|
||||
await table.mergeInsert('id', newData, {
|
||||
whenNotMatchedInsertAll: true
|
||||
})
|
||||
assert.equal(await table.countRows(), 3)
|
||||
assert.equal((await table.filter('age = 2').execute()).length, 1)
|
||||
|
||||
newData = [{ id: 3, age: 3 }, { id: 4, age: 3 }]
|
||||
await table.mergeInsert('id', newData, {
|
||||
whenNotMatchedInsertAll: true,
|
||||
whenMatchedUpdateAll: true
|
||||
})
|
||||
assert.equal(await table.countRows(), 4)
|
||||
assert.equal((await table.filter('age = 3').execute()).length, 2)
|
||||
|
||||
newData = [{ id: 5, age: 4 }]
|
||||
await table.mergeInsert('id', newData, {
|
||||
whenNotMatchedInsertAll: true,
|
||||
whenMatchedUpdateAll: true,
|
||||
whenNotMatchedBySourceDelete: 'age < 3'
|
||||
})
|
||||
assert.equal(await table.countRows(), 3)
|
||||
|
||||
await table.mergeInsert('id', newData, {
|
||||
whenNotMatchedInsertAll: true,
|
||||
whenMatchedUpdateAll: true,
|
||||
whenNotMatchedBySourceDelete: true
|
||||
})
|
||||
assert.equal(await table.countRows(), 1)
|
||||
})
|
||||
|
||||
it('can update records in the table', async function () {
|
||||
const uri = await createTestDB()
|
||||
const con = await lancedb.connect(uri)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[bumpversion]
|
||||
current_version = 0.5.1
|
||||
current_version = 0.5.2
|
||||
commit = True
|
||||
message = [python] Bump version: {current_version} → {new_version}
|
||||
tag = True
|
||||
|
||||
103
python/lancedb/merge.py
Normal file
103
python/lancedb/merge.py
Normal file
@@ -0,0 +1,103 @@
|
||||
# Copyright 2023 LanceDB Developers
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, List, Optional
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .common import DATA
|
||||
|
||||
|
||||
class LanceMergeInsertBuilder(object):
|
||||
"""Builder for a LanceDB merge insert operation
|
||||
|
||||
See [`merge_insert`][lancedb.table.Table.merge_insert] for
|
||||
more context
|
||||
"""
|
||||
|
||||
def __init__(self, table: "Table", on: List[str]): # noqa: F821
|
||||
# Do not put a docstring here. This method should be hidden
|
||||
# from API docs. Users should use merge_insert to create
|
||||
# this object.
|
||||
self._table = table
|
||||
self._on = on
|
||||
self._when_matched_update_all = False
|
||||
self._when_not_matched_insert_all = False
|
||||
self._when_not_matched_by_source_delete = False
|
||||
self._when_not_matched_by_source_condition = None
|
||||
|
||||
def when_matched_update_all(self) -> LanceMergeInsertBuilder:
|
||||
"""
|
||||
Rows that exist in both the source table (new data) and
|
||||
the target table (old data) will be updated, replacing
|
||||
the old row with the corresponding matching row.
|
||||
|
||||
If there are multiple matches then the behavior is undefined.
|
||||
Currently this causes multiple copies of the row to be created
|
||||
but that behavior is subject to change.
|
||||
"""
|
||||
self._when_matched_update_all = True
|
||||
return self
|
||||
|
||||
def when_not_matched_insert_all(self) -> LanceMergeInsertBuilder:
|
||||
"""
|
||||
Rows that exist only in the source table (new data) should
|
||||
be inserted into the target table.
|
||||
"""
|
||||
self._when_not_matched_insert_all = True
|
||||
return self
|
||||
|
||||
def when_not_matched_by_source_delete(
|
||||
self, condition: Optional[str] = None
|
||||
) -> LanceMergeInsertBuilder:
|
||||
"""
|
||||
Rows that exist only in the target table (old data) will be
|
||||
deleted. An optional condition can be provided to limit what
|
||||
data is deleted.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
condition: Optional[str], default None
|
||||
If None then all such rows will be deleted. Otherwise the
|
||||
condition will be used as an SQL filter to limit what rows
|
||||
are deleted.
|
||||
"""
|
||||
self._when_not_matched_by_source_delete = True
|
||||
if condition is not None:
|
||||
self._when_not_matched_by_source_condition = condition
|
||||
return self
|
||||
|
||||
def execute(
|
||||
self,
|
||||
new_data: DATA,
|
||||
on_bad_vectors: str = "error",
|
||||
fill_value: float = 0.0,
|
||||
):
|
||||
"""
|
||||
Executes the merge insert operation
|
||||
|
||||
Nothing is returned but the [`Table`][lancedb.table.Table] is updated
|
||||
|
||||
Parameters
|
||||
----------
|
||||
new_data: DATA
|
||||
New records which will be matched against the existing records
|
||||
to potentially insert or update into the table. This parameter
|
||||
can be anything you use for [`add`][lancedb.table.Table.add]
|
||||
on_bad_vectors: str, default "error"
|
||||
What to do if any of the vectors are not the same size or contains NaNs.
|
||||
One of "error", "drop", "fill".
|
||||
fill_value: float, default 0.
|
||||
The value to use when filling vectors. Only used if on_bad_vectors="fill".
|
||||
"""
|
||||
self._table._do_merge(self, new_data, on_bad_vectors, fill_value)
|
||||
@@ -13,6 +13,8 @@
|
||||
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
from typing import Any, Callable, Dict, List, Optional, Union
|
||||
from urllib.parse import urljoin
|
||||
|
||||
@@ -20,6 +22,8 @@ import attrs
|
||||
import pyarrow as pa
|
||||
import requests
|
||||
from pydantic import BaseModel
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3 import Retry
|
||||
|
||||
from lancedb.common import Credential
|
||||
from lancedb.remote import VectorQuery, VectorQueryResult
|
||||
@@ -57,6 +61,10 @@ class RestfulLanceDBClient:
|
||||
@functools.cached_property
|
||||
def session(self) -> requests.Session:
|
||||
sess = requests.Session()
|
||||
|
||||
retry_adapter_instance = retry_adapter(retry_adapter_options())
|
||||
sess.mount(urljoin(self.url, "/v1/table/"), retry_adapter_instance)
|
||||
|
||||
adapter_class = LanceDBClientHTTPAdapterFactory()
|
||||
sess.mount("https://", adapter_class())
|
||||
return sess
|
||||
@@ -170,3 +178,72 @@ class RestfulLanceDBClient:
|
||||
"""Query a table."""
|
||||
tbl = self.post(f"/v1/table/{table_name}/query/", query, deserialize=_read_ipc)
|
||||
return VectorQueryResult(tbl)
|
||||
|
||||
def mount_retry_adapter_for_table(self, table_name: str) -> None:
|
||||
"""
|
||||
Adds an http adapter to session that will retry retryable requests to the table.
|
||||
"""
|
||||
retry_options = retry_adapter_options(methods=["GET", "POST"])
|
||||
retry_adapter_instance = retry_adapter(retry_options)
|
||||
session = self.session
|
||||
|
||||
session.mount(
|
||||
urljoin(self.url, f"/v1/table/{table_name}/query/"), retry_adapter_instance
|
||||
)
|
||||
session.mount(
|
||||
urljoin(self.url, f"/v1/table/{table_name}/describe/"),
|
||||
retry_adapter_instance,
|
||||
)
|
||||
session.mount(
|
||||
urljoin(self.url, f"/v1/table/{table_name}/index/list/"),
|
||||
retry_adapter_instance,
|
||||
)
|
||||
|
||||
|
||||
def retry_adapter_options(methods=["GET"]) -> Dict[str, Any]:
|
||||
return {
|
||||
"retries": int(os.environ.get("LANCE_CLIENT_MAX_RETRIES", "3")),
|
||||
"connect_retries": int(os.environ.get("LANCE_CLIENT_CONNECT_RETRIES", "3")),
|
||||
"read_retries": int(os.environ.get("LANCE_CLIENT_READ_RETRIES", "3")),
|
||||
"backoff_factor": float(
|
||||
os.environ.get("LANCE_CLIENT_RETRY_BACKOFF_FACTOR", "0.25")
|
||||
),
|
||||
"backoff_jitter": float(
|
||||
os.environ.get("LANCE_CLIENT_RETRY_BACKOFF_JITTER", "0.25")
|
||||
),
|
||||
"statuses": [
|
||||
int(i.strip())
|
||||
for i in os.environ.get(
|
||||
"LANCE_CLIENT_RETRY_STATUSES", "429, 500, 502, 503"
|
||||
).split(",")
|
||||
],
|
||||
"methods": methods,
|
||||
}
|
||||
|
||||
|
||||
def retry_adapter(options: Dict[str, Any]) -> HTTPAdapter:
|
||||
total_retries = options["retries"]
|
||||
connect_retries = options["connect_retries"]
|
||||
read_retries = options["read_retries"]
|
||||
backoff_factor = options["backoff_factor"]
|
||||
backoff_jitter = options["backoff_jitter"]
|
||||
statuses = options["statuses"]
|
||||
methods = frozenset(options["methods"])
|
||||
logging.debug(
|
||||
f"Setting up retry adapter with {total_retries} retries," # noqa G003
|
||||
+ f"connect retries {connect_retries}, read retries {read_retries},"
|
||||
+ f"backoff factor {backoff_factor}, statuses {statuses}, "
|
||||
+ f"methods {methods}"
|
||||
)
|
||||
|
||||
return HTTPAdapter(
|
||||
max_retries=Retry(
|
||||
total=total_retries,
|
||||
connect=connect_retries,
|
||||
read=read_retries,
|
||||
backoff_factor=backoff_factor,
|
||||
backoff_jitter=backoff_jitter,
|
||||
status_forcelist=statuses,
|
||||
allowed_methods=methods,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -95,6 +95,8 @@ class RemoteDBConnection(DBConnection):
|
||||
"""
|
||||
from .table import RemoteTable
|
||||
|
||||
self._client.mount_retry_adapter_for_table(name)
|
||||
|
||||
# check if table exists
|
||||
try:
|
||||
self._client.post(f"/v1/table/{name}/describe/")
|
||||
|
||||
@@ -19,6 +19,7 @@ import pyarrow as pa
|
||||
from lance import json_to_schema
|
||||
|
||||
from lancedb.common import DATA, VEC, VECTOR_COLUMN_NAME
|
||||
from lancedb.merge import LanceMergeInsertBuilder
|
||||
|
||||
from ..query import LanceVectorQueryBuilder
|
||||
from ..table import Query, Table, _sanitize_data
|
||||
@@ -244,6 +245,47 @@ class RemoteTable(Table):
|
||||
result = self._conn._client.query(self._name, query)
|
||||
return result.to_arrow()
|
||||
|
||||
def _do_merge(
|
||||
self,
|
||||
merge: LanceMergeInsertBuilder,
|
||||
new_data: DATA,
|
||||
on_bad_vectors: str,
|
||||
fill_value: float,
|
||||
):
|
||||
data = _sanitize_data(
|
||||
new_data,
|
||||
self.schema,
|
||||
metadata=None,
|
||||
on_bad_vectors=on_bad_vectors,
|
||||
fill_value=fill_value,
|
||||
)
|
||||
payload = to_ipc_binary(data)
|
||||
|
||||
params = {}
|
||||
if len(merge._on) != 1:
|
||||
raise ValueError(
|
||||
"RemoteTable only supports a single on key in merge_insert"
|
||||
)
|
||||
params["on"] = merge._on[0]
|
||||
params["when_matched_update_all"] = str(merge._when_matched_update_all).lower()
|
||||
params["when_not_matched_insert_all"] = str(
|
||||
merge._when_not_matched_insert_all
|
||||
).lower()
|
||||
params["when_not_matched_by_source_delete"] = str(
|
||||
merge._when_not_matched_by_source_delete
|
||||
).lower()
|
||||
if merge._when_not_matched_by_source_condition is not None:
|
||||
params[
|
||||
"when_not_matched_by_source_delete_filt"
|
||||
] = merge._when_not_matched_by_source_condition
|
||||
|
||||
self._conn._client.post(
|
||||
f"/v1/table/{self._name}/merge_insert/",
|
||||
data=payload,
|
||||
params=params,
|
||||
content_type=ARROW_STREAM_CONTENT_TYPE,
|
||||
)
|
||||
|
||||
def delete(self, predicate: str):
|
||||
"""Delete rows from the table.
|
||||
|
||||
@@ -355,6 +397,18 @@ class RemoteTable(Table):
|
||||
payload = {"predicate": where, "updates": updates}
|
||||
self._conn._client.post(f"/v1/table/{self._name}/update/", data=payload)
|
||||
|
||||
def cleanup_old_versions(self, *_):
|
||||
"""cleanup_old_versions() is not supported on the LanceDB cloud"""
|
||||
raise NotImplementedError(
|
||||
"cleanup_old_versions() is not supported on the LanceDB cloud"
|
||||
)
|
||||
|
||||
def compact_files(self, *_):
|
||||
"""compact_files() is not supported on the LanceDB cloud"""
|
||||
raise NotImplementedError(
|
||||
"compact_files() is not supported on the LanceDB cloud"
|
||||
)
|
||||
|
||||
|
||||
def add_index(tbl: pa.Table, i: int) -> pa.Table:
|
||||
return tbl.add_column(
|
||||
|
||||
@@ -28,6 +28,7 @@ from lance.vector import vec_to_table
|
||||
|
||||
from .common import DATA, VEC, VECTOR_COLUMN_NAME
|
||||
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
|
||||
from .merge import LanceMergeInsertBuilder
|
||||
from .pydantic import LanceModel, model_to_dict
|
||||
from .query import LanceQueryBuilder, Query
|
||||
from .util import (
|
||||
@@ -334,6 +335,66 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
|
||||
"""
|
||||
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
|
||||
that can be used to create a "merge insert" operation
|
||||
|
||||
This operation can add rows, update rows, and remove rows all in a single
|
||||
transaction. It is a very generic tool that can be used to create
|
||||
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
|
||||
or even replace a portion of existing data with new data (e.g. replace
|
||||
all data where month="january")
|
||||
|
||||
The merge insert operation works by combining new data from a
|
||||
**source table** with existing data in a **target table** by using a
|
||||
join. There are three categories of records.
|
||||
|
||||
"Matched" records are records that exist in both the source table and
|
||||
the target table. "Not matched" records exist only in the source table
|
||||
(e.g. these are new data) "Not matched by source" records exist only
|
||||
in the target table (this is old data)
|
||||
|
||||
The builder returned by this method can be used to customize what
|
||||
should happen for each category of data.
|
||||
|
||||
Please note that the data may appear to be reordered as part of this
|
||||
operation. This is because updated rows will be deleted from the
|
||||
dataset and then reinserted at the end with the new values.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
on: Union[str, Iterable[str]]
|
||||
A column (or columns) to join on. This is how records from the
|
||||
source table and target table are matched. Typically this is some
|
||||
kind of key or id column.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> import lancedb
|
||||
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
|
||||
>>> db = lancedb.connect("./.lancedb")
|
||||
>>> table = db.create_table("my_table", data)
|
||||
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
|
||||
>>> # Perform a "upsert" operation
|
||||
>>> table.merge_insert("a") \\
|
||||
... .when_matched_update_all() \\
|
||||
... .when_not_matched_insert_all() \\
|
||||
... .execute(new_data)
|
||||
>>> # The order of new rows is non-deterministic since we use
|
||||
>>> # a hash-join as part of this operation and so we sort here
|
||||
>>> table.to_arrow().sort_by("a").to_pandas()
|
||||
a b
|
||||
0 1 b
|
||||
1 2 x
|
||||
2 3 y
|
||||
3 4 z
|
||||
"""
|
||||
on = [on] if isinstance(on, str) else list(on.iter())
|
||||
|
||||
return LanceMergeInsertBuilder(self, on)
|
||||
|
||||
@abstractmethod
|
||||
def search(
|
||||
self,
|
||||
@@ -379,6 +440,8 @@ class Table(ABC):
|
||||
the table
|
||||
vector_column_name: str
|
||||
The name of the vector column to search.
|
||||
|
||||
The vector column needs to be a pyarrow fixed size list type
|
||||
*default "vector"*
|
||||
query_type: str
|
||||
*default "auto"*.
|
||||
@@ -414,6 +477,16 @@ class Table(ABC):
|
||||
def _execute_query(self, query: Query) -> pa.Table:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _do_merge(
|
||||
self,
|
||||
merge: LanceMergeInsertBuilder,
|
||||
new_data: DATA,
|
||||
on_bad_vectors: str,
|
||||
fill_value: float,
|
||||
):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete(self, where: str):
|
||||
"""Delete rows from the table.
|
||||
@@ -521,6 +594,52 @@ class Table(ABC):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def cleanup_old_versions(
|
||||
self,
|
||||
older_than: Optional[timedelta] = None,
|
||||
*,
|
||||
delete_unverified: bool = False,
|
||||
) -> CleanupStats:
|
||||
"""
|
||||
Clean up old versions of the table, freeing disk space.
|
||||
|
||||
Note: This function is not available in LanceDb Cloud (since LanceDb
|
||||
Cloud manages cleanup for you automatically)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
older_than: timedelta, default None
|
||||
The minimum age of the version to delete. If None, then this defaults
|
||||
to two weeks.
|
||||
delete_unverified: bool, default False
|
||||
Because they may be part of an in-progress transaction, files newer
|
||||
than 7 days old are not deleted by default. If you are sure that
|
||||
there are no in-progress transactions, then you can set this to True
|
||||
to delete all files older than `older_than`.
|
||||
|
||||
Returns
|
||||
-------
|
||||
CleanupStats
|
||||
The stats of the cleanup operation, including how many bytes were
|
||||
freed.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def compact_files(self, *args, **kwargs):
|
||||
"""
|
||||
Run the compaction process on the table.
|
||||
|
||||
Note: This function is not available in LanceDb Cloud (since LanceDb
|
||||
Cloud manages compaction for you automatically)
|
||||
|
||||
This can be run after making several small appends to optimize the table
|
||||
for faster reads.
|
||||
|
||||
Arguments are passed onto :meth:`lance.dataset.DatasetOptimizer.compact_files`.
|
||||
For most cases, the default should be fine.
|
||||
"""
|
||||
|
||||
|
||||
class LanceTable(Table):
|
||||
"""
|
||||
@@ -1196,6 +1315,31 @@ class LanceTable(Table):
|
||||
with_row_id=query.with_row_id,
|
||||
)
|
||||
|
||||
def _do_merge(
|
||||
self,
|
||||
merge: LanceMergeInsertBuilder,
|
||||
new_data: DATA,
|
||||
on_bad_vectors: str,
|
||||
fill_value: float,
|
||||
):
|
||||
new_data = _sanitize_data(
|
||||
new_data,
|
||||
self.schema,
|
||||
metadata=self.schema.metadata,
|
||||
on_bad_vectors=on_bad_vectors,
|
||||
fill_value=fill_value,
|
||||
)
|
||||
ds = self.to_lance()
|
||||
builder = ds.merge_insert(merge._on)
|
||||
if merge._when_matched_update_all:
|
||||
builder.when_matched_update_all()
|
||||
if merge._when_not_matched_insert_all:
|
||||
builder.when_not_matched_insert_all()
|
||||
if merge._when_not_matched_by_source_delete:
|
||||
cond = merge._when_not_matched_by_source_condition
|
||||
builder.when_not_matched_by_source_delete(cond)
|
||||
builder.execute(new_data)
|
||||
|
||||
def cleanup_old_versions(
|
||||
self,
|
||||
older_than: Optional[timedelta] = None,
|
||||
@@ -1233,8 +1377,9 @@ class LanceTable(Table):
|
||||
This can be run after making several small appends to optimize the table
|
||||
for faster reads.
|
||||
|
||||
Arguments are passed onto :meth:`lance.dataset.DatasetOptimizer.compact_files`.
|
||||
For most cases, the default should be fine.
|
||||
Arguments are passed onto `lance.dataset.DatasetOptimizer.compact_files`.
|
||||
(see Lance documentation for more details) For most cases, the default
|
||||
should be fine.
|
||||
"""
|
||||
return self.to_lance().optimize.compact_files(*args, **kwargs)
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
[project]
|
||||
name = "lancedb"
|
||||
version = "0.5.1"
|
||||
version = "0.5.2"
|
||||
dependencies = [
|
||||
"deprecation",
|
||||
"pylance==0.9.10",
|
||||
"pylance==0.9.12",
|
||||
"ratelimiter~=1.0",
|
||||
"retry>=0.9.2",
|
||||
"tqdm>=4.27.0",
|
||||
|
||||
@@ -29,6 +29,9 @@ class FakeLanceDBClient:
|
||||
def post(self, path: str):
|
||||
pass
|
||||
|
||||
def mount_retry_adapter_for_table(self, table_name: str):
|
||||
pass
|
||||
|
||||
|
||||
def test_remote_db():
|
||||
conn = lancedb.connect("db://client-will-be-injected", api_key="fake")
|
||||
|
||||
@@ -493,6 +493,62 @@ def test_update_types(db):
|
||||
assert actual == expected
|
||||
|
||||
|
||||
def test_merge_insert(db):
|
||||
table = LanceTable.create(
|
||||
db,
|
||||
"my_table",
|
||||
data=pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}),
|
||||
)
|
||||
assert len(table) == 3
|
||||
version = table.version
|
||||
|
||||
new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
|
||||
|
||||
# upsert
|
||||
table.merge_insert(
|
||||
"a"
|
||||
).when_matched_update_all().when_not_matched_insert_all().execute(new_data)
|
||||
|
||||
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "x", "y", "z"]})
|
||||
# These `sort_by` calls can be removed once lance#1892
|
||||
# is merged (it fixes the ordering)
|
||||
assert table.to_arrow().sort_by("a") == expected
|
||||
|
||||
table.restore(version)
|
||||
|
||||
# insert-if-not-exists
|
||||
table.merge_insert("a").when_not_matched_insert_all().execute(new_data)
|
||||
|
||||
expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "z"]})
|
||||
assert table.to_arrow().sort_by("a") == expected
|
||||
|
||||
table.restore(version)
|
||||
|
||||
new_data = pa.table({"a": [2, 4], "b": ["x", "z"]})
|
||||
|
||||
# replace-range
|
||||
table.merge_insert(
|
||||
"a"
|
||||
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete(
|
||||
"a > 2"
|
||||
).execute(new_data)
|
||||
|
||||
expected = pa.table({"a": [1, 2, 4], "b": ["a", "x", "z"]})
|
||||
assert table.to_arrow().sort_by("a") == expected
|
||||
|
||||
table.restore(version)
|
||||
|
||||
# replace-range no condition
|
||||
table.merge_insert(
|
||||
"a"
|
||||
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete().execute(
|
||||
new_data
|
||||
)
|
||||
|
||||
expected = pa.table({"a": [2, 4], "b": ["x", "z"]})
|
||||
assert table.to_arrow().sort_by("a") == expected
|
||||
|
||||
|
||||
def test_create_with_embedding_function(db):
|
||||
class MyTable(LanceModel):
|
||||
text: str
|
||||
|
||||
@@ -260,6 +260,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
|
||||
cx.export_function("tableCountRows", JsTable::js_count_rows)?;
|
||||
cx.export_function("tableDelete", JsTable::js_delete)?;
|
||||
cx.export_function("tableUpdate", JsTable::js_update)?;
|
||||
cx.export_function("tableMergeInsert", JsTable::js_merge_insert)?;
|
||||
cx.export_function("tableCleanupOldVersions", JsTable::js_cleanup)?;
|
||||
cx.export_function("tableCompactFiles", JsTable::js_compact)?;
|
||||
cx.export_function("tableListIndices", JsTable::js_list_indices)?;
|
||||
|
||||
@@ -12,6 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::ops::Deref;
|
||||
|
||||
use arrow_array::{RecordBatch, RecordBatchIterator};
|
||||
use lance::dataset::optimize::CompactionOptions;
|
||||
use lance::dataset::{WriteMode, WriteParams};
|
||||
@@ -166,6 +168,53 @@ impl JsTable {
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_merge_insert(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
|
||||
let rt = runtime(&mut cx)?;
|
||||
let (deferred, promise) = cx.promise();
|
||||
let channel = cx.channel();
|
||||
let table = js_table.table.clone();
|
||||
|
||||
let key = cx.argument::<JsString>(0)?.value(&mut cx);
|
||||
let mut builder = table.merge_insert(&[&key]);
|
||||
if cx.argument::<JsBoolean>(1)?.value(&mut cx) {
|
||||
builder.when_matched_update_all();
|
||||
}
|
||||
if cx.argument::<JsBoolean>(2)?.value(&mut cx) {
|
||||
builder.when_not_matched_insert_all();
|
||||
}
|
||||
if cx.argument::<JsBoolean>(3)?.value(&mut cx) {
|
||||
if let Some(filter) = cx.argument_opt(4) {
|
||||
if filter.is_a::<JsNull, _>(&mut cx) {
|
||||
builder.when_not_matched_by_source_delete(None);
|
||||
} else {
|
||||
let filter = filter
|
||||
.downcast_or_throw::<JsString, _>(&mut cx)?
|
||||
.deref()
|
||||
.value(&mut cx);
|
||||
builder.when_not_matched_by_source_delete(Some(filter));
|
||||
}
|
||||
} else {
|
||||
builder.when_not_matched_by_source_delete(None);
|
||||
}
|
||||
}
|
||||
|
||||
let buffer = cx.argument::<JsBuffer>(5)?;
|
||||
let (batches, schema) =
|
||||
arrow_buffer_to_record_batch(buffer.as_slice(&cx)).or_throw(&mut cx)?;
|
||||
|
||||
rt.spawn(async move {
|
||||
let new_data = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
|
||||
let merge_insert_result = builder.execute(Box::new(new_data)).await;
|
||||
|
||||
deferred.settle_with(&channel, move |mut cx| {
|
||||
merge_insert_result.or_throw(&mut cx)?;
|
||||
Ok(cx.boxed(JsTable::from(table)))
|
||||
})
|
||||
});
|
||||
Ok(promise)
|
||||
}
|
||||
|
||||
pub(crate) fn js_update(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let js_table = cx.this().downcast_or_throw::<JsBox<JsTable>, _>(&mut cx)?;
|
||||
let table = js_table.table.clone();
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::sync::{Arc, Mutex};
|
||||
|
||||
use arrow_array::RecordBatchReader;
|
||||
use arrow_schema::{Schema, SchemaRef};
|
||||
use async_trait::async_trait;
|
||||
use chrono::Duration;
|
||||
use lance::dataset::builder::DatasetBuilder;
|
||||
use lance::dataset::cleanup::RemovalStats;
|
||||
@@ -27,6 +28,7 @@ use lance::dataset::optimize::{
|
||||
};
|
||||
pub use lance::dataset::ReadParams;
|
||||
use lance::dataset::{Dataset, UpdateBuilder, WriteParams};
|
||||
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
|
||||
use lance::io::WrappingObjectStore;
|
||||
use lance_index::{optimize::OptimizeOptions, DatasetIndexExt};
|
||||
use log::info;
|
||||
@@ -38,6 +40,10 @@ use crate::query::Query;
|
||||
use crate::utils::{PatchReadParam, PatchWriteParam};
|
||||
use crate::WriteMode;
|
||||
|
||||
use self::merge::{MergeInsert, MergeInsertBuilder};
|
||||
|
||||
pub mod merge;
|
||||
|
||||
/// Optimize the dataset.
|
||||
///
|
||||
/// Similar to `VACUUM` in PostgreSQL, it offers different options to
|
||||
@@ -170,6 +176,71 @@ pub trait Table: std::fmt::Display + Send + Sync {
|
||||
/// ```
|
||||
fn create_index(&self, column: &[&str]) -> IndexBuilder;
|
||||
|
||||
/// Create a builder for a merge insert operation
|
||||
///
|
||||
/// This operation can add rows, update rows, and remove rows all in a single
|
||||
/// transaction. It is a very generic tool that can be used to create
|
||||
/// behaviors like "insert if not exists", "update or insert (i.e. upsert)",
|
||||
/// or even replace a portion of existing data with new data (e.g. replace
|
||||
/// all data where month="january")
|
||||
///
|
||||
/// The merge insert operation works by combining new data from a
|
||||
/// **source table** with existing data in a **target table** by using a
|
||||
/// join. There are three categories of records.
|
||||
///
|
||||
/// "Matched" records are records that exist in both the source table and
|
||||
/// the target table. "Not matched" records exist only in the source table
|
||||
/// (e.g. these are new data) "Not matched by source" records exist only
|
||||
/// in the target table (this is old data)
|
||||
///
|
||||
/// The builder returned by this method can be used to customize what
|
||||
/// should happen for each category of data.
|
||||
///
|
||||
/// Please note that the data may appear to be reordered as part of this
|
||||
/// operation. This is because updated rows will be deleted from the
|
||||
/// dataset and then reinserted at the end with the new values.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `on` One or more columns to join on. This is how records from the
|
||||
/// source table and target table are matched. Typically this is some
|
||||
/// kind of key or id column.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # use std::sync::Arc;
|
||||
/// # use vectordb::connection::{Database, Connection};
|
||||
/// # use arrow_array::{FixedSizeListArray, types::Float32Type, RecordBatch,
|
||||
/// # RecordBatchIterator, Int32Array};
|
||||
/// # use arrow_schema::{Schema, Field, DataType};
|
||||
/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
|
||||
/// let tmpdir = tempfile::tempdir().unwrap();
|
||||
/// let db = Database::connect(tmpdir.path().to_str().unwrap()).await.unwrap();
|
||||
/// # let tbl = db.open_table("idx_test").await.unwrap();
|
||||
/// # let schema = Arc::new(Schema::new(vec![
|
||||
/// # Field::new("id", DataType::Int32, false),
|
||||
/// # Field::new("vector", DataType::FixedSizeList(
|
||||
/// # Arc::new(Field::new("item", DataType::Float32, true)), 128), true),
|
||||
/// # ]));
|
||||
/// let new_data = RecordBatchIterator::new(vec![
|
||||
/// RecordBatch::try_new(schema.clone(),
|
||||
/// vec![
|
||||
/// Arc::new(Int32Array::from_iter_values(0..10)),
|
||||
/// Arc::new(FixedSizeListArray::from_iter_primitive::<Float32Type, _, _>(
|
||||
/// (0..10).map(|_| Some(vec![Some(1.0); 128])), 128)),
|
||||
/// ]).unwrap()
|
||||
/// ].into_iter().map(Ok),
|
||||
/// schema.clone());
|
||||
/// // Perform an upsert operation
|
||||
/// let mut merge_insert = tbl.merge_insert(&["id"]);
|
||||
/// merge_insert.when_matched_update_all()
|
||||
/// .when_not_matched_insert_all();
|
||||
/// merge_insert.execute(Box::new(new_data)).await.unwrap();
|
||||
/// # });
|
||||
/// ```
|
||||
fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder;
|
||||
|
||||
/// Search the table with a given query vector.
|
||||
///
|
||||
/// This is a convenience method for preparing an ANN query.
|
||||
@@ -593,6 +664,42 @@ impl NativeTable {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl MergeInsert for NativeTable {
|
||||
async fn do_merge_insert(
|
||||
&self,
|
||||
params: MergeInsertBuilder,
|
||||
new_data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<()> {
|
||||
let dataset = Arc::new(self.clone_inner_dataset());
|
||||
let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
|
||||
if params.when_matched_update_all {
|
||||
builder.when_matched(lance::dataset::WhenMatched::UpdateAll);
|
||||
} else {
|
||||
builder.when_matched(lance::dataset::WhenMatched::DoNothing);
|
||||
}
|
||||
if params.when_not_matched_insert_all {
|
||||
builder.when_not_matched(lance::dataset::WhenNotMatched::InsertAll);
|
||||
} else {
|
||||
builder.when_not_matched(lance::dataset::WhenNotMatched::DoNothing);
|
||||
}
|
||||
if params.when_not_matched_by_source_delete {
|
||||
let behavior = if let Some(filter) = params.when_not_matched_by_source_delete_filt {
|
||||
WhenNotMatchedBySource::delete_if(dataset.as_ref(), &filter)?
|
||||
} else {
|
||||
WhenNotMatchedBySource::Delete
|
||||
};
|
||||
builder.when_not_matched_by_source(behavior);
|
||||
} else {
|
||||
builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
|
||||
}
|
||||
let job = builder.try_build()?;
|
||||
let new_dataset = job.execute_reader(new_data).await?;
|
||||
self.reset_dataset((*new_dataset).clone());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Table for NativeTable {
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
@@ -637,6 +744,11 @@ impl Table for NativeTable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn merge_insert(&self, on: &[&str]) -> MergeInsertBuilder {
|
||||
let on = Vec::from_iter(on.iter().map(|key| key.to_string()));
|
||||
MergeInsertBuilder::new(Arc::new(self.clone()), on)
|
||||
}
|
||||
|
||||
fn create_index(&self, columns: &[&str]) -> IndexBuilder {
|
||||
IndexBuilder::new(Arc::new(self.clone()), columns)
|
||||
}
|
||||
@@ -802,6 +914,38 @@ mod tests {
|
||||
assert_eq!(table.name, "test");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_merge_insert() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
// Create a dataset with i=0..10
|
||||
let batches = make_test_batches_with_offset(0);
|
||||
let table = NativeTable::create(&uri, "test", batches, None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(table.count_rows().await.unwrap(), 10);
|
||||
|
||||
// Create new data with i=5..15
|
||||
let new_batches = Box::new(make_test_batches_with_offset(5));
|
||||
|
||||
// Perform a "insert if not exists"
|
||||
let mut merge_insert_builder = table.merge_insert(&["i"]);
|
||||
merge_insert_builder.when_not_matched_insert_all();
|
||||
merge_insert_builder.execute(new_batches).await.unwrap();
|
||||
// Only 5 rows should actually be inserted
|
||||
assert_eq!(table.count_rows().await.unwrap(), 15);
|
||||
|
||||
// Create new data with i=15..25 (no id matches)
|
||||
let new_batches = Box::new(make_test_batches_with_offset(15));
|
||||
// Perform a "bulk update" (should not affect anything)
|
||||
let mut merge_insert_builder = table.merge_insert(&["i"]);
|
||||
merge_insert_builder.when_matched_update_all();
|
||||
merge_insert_builder.execute(new_batches).await.unwrap();
|
||||
// No new rows should have been inserted
|
||||
assert_eq!(table.count_rows().await.unwrap(), 15);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_overwrite() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
@@ -1148,17 +1292,25 @@ mod tests {
|
||||
assert!(wrapper.called());
|
||||
}
|
||||
|
||||
fn make_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
|
||||
fn make_test_batches_with_offset(
|
||||
offset: i32,
|
||||
) -> impl RecordBatchReader + Send + Sync + 'static {
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
|
||||
RecordBatchIterator::new(
|
||||
vec![RecordBatch::try_new(
|
||||
schema.clone(),
|
||||
vec![Arc::new(Int32Array::from_iter_values(0..10))],
|
||||
vec![Arc::new(Int32Array::from_iter_values(
|
||||
offset..(offset + 10),
|
||||
))],
|
||||
)],
|
||||
schema,
|
||||
)
|
||||
}
|
||||
|
||||
fn make_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
|
||||
make_test_batches_with_offset(0)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_create_index() {
|
||||
use arrow_array::RecordBatch;
|
||||
|
||||
95
rust/vectordb/src/table/merge.rs
Normal file
95
rust/vectordb/src/table/merge.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
// Copyright 2024 Lance Developers.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_array::RecordBatchReader;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::Result;
|
||||
|
||||
#[async_trait]
|
||||
pub(super) trait MergeInsert: Send + Sync {
|
||||
async fn do_merge_insert(
|
||||
&self,
|
||||
params: MergeInsertBuilder,
|
||||
new_data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<()>;
|
||||
}
|
||||
|
||||
/// A builder used to create and run a merge insert operation
|
||||
///
|
||||
/// See [`super::Table::merge_insert`] for more context
|
||||
pub struct MergeInsertBuilder {
|
||||
table: Arc<dyn MergeInsert>,
|
||||
pub(super) on: Vec<String>,
|
||||
pub(super) when_matched_update_all: bool,
|
||||
pub(super) when_not_matched_insert_all: bool,
|
||||
pub(super) when_not_matched_by_source_delete: bool,
|
||||
pub(super) when_not_matched_by_source_delete_filt: Option<String>,
|
||||
}
|
||||
|
||||
impl MergeInsertBuilder {
|
||||
pub(super) fn new(table: Arc<dyn MergeInsert>, on: Vec<String>) -> Self {
|
||||
Self {
|
||||
table,
|
||||
on,
|
||||
when_matched_update_all: false,
|
||||
when_not_matched_insert_all: false,
|
||||
when_not_matched_by_source_delete: false,
|
||||
when_not_matched_by_source_delete_filt: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Rows that exist in both the source table (new data) and
|
||||
/// the target table (old data) will be updated, replacing
|
||||
/// the old row with the corresponding matching row.
|
||||
///
|
||||
/// If there are multiple matches then the behavior is undefined.
|
||||
/// Currently this causes multiple copies of the row to be created
|
||||
/// but that behavior is subject to change.
|
||||
pub fn when_matched_update_all(&mut self) -> &mut Self {
|
||||
self.when_matched_update_all = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Rows that exist only in the source table (new data) should
|
||||
/// be inserted into the target table.
|
||||
pub fn when_not_matched_insert_all(&mut self) -> &mut Self {
|
||||
self.when_not_matched_insert_all = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Rows that exist only in the target table (old data) will be
|
||||
/// deleted. An optional condition can be provided to limit what
|
||||
/// data is deleted.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `condition` - If None then all such rows will be deleted.
|
||||
/// Otherwise the condition will be used as an SQL filter to
|
||||
/// limit what rows are deleted.
|
||||
pub fn when_not_matched_by_source_delete(&mut self, filter: Option<String>) -> &mut Self {
|
||||
self.when_not_matched_by_source_delete = true;
|
||||
self.when_not_matched_by_source_delete_filt = filter;
|
||||
self
|
||||
}
|
||||
|
||||
/// Executes the merge insert operation
|
||||
///
|
||||
/// Nothing is returned but the [`super::Table`] is updated
|
||||
pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<()> {
|
||||
self.table.clone().do_merge_insert(self, new_data).await
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user