mirror of https://github.com/lancedb/lancedb.git
synced 2025-12-23 21:39:57 +00:00

Compare commits: python-v0. ... docs/quick (24 commits)
| SHA1 |
|---|
| 9e278fc5a6 |
| 09fed1f286 |
| cee2b5ea42 |
| f315f9665a |
| 5deb26bc8b |
| 3cc670ac38 |
| 4ade3e31e2 |
| a222d2cd91 |
| 508e621f3d |
| a1a0472f3f |
| 3425a6d339 |
| af54e0ce06 |
| 089905fe8f |
| 554939e5d2 |
| 7a13814922 |
| e9f25f6a12 |
| 419a433244 |
| a9311c4dc0 |
| 178bcf9c90 |
| b9be092cb1 |
| e8c0c52315 |
| a60fa0d3b7 |
| 726d629b9b |
| b493f56dee |
@@ -1,5 +1,5 @@
 [tool.bumpversion]
-current_version = "0.19.0-beta.11"
+current_version = "0.19.1-beta.1"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
    (?P<minor>0|[1-9]\\d*)\\.
.github/workflows/python.yml (vendored, 1 change)

@@ -228,6 +228,7 @@ jobs:
       - name: Install lancedb
        run: |
          pip install "pydantic<2"
+         pip install pyarrow==16
          pip install --extra-index-url https://pypi.fury.io/lancedb/ -e .[tests]
          pip install tantivy
      - name: Run tests
Cargo.lock (generated, 330 changes)

File diff suppressed because it is too large.
Cargo.toml (16 changes)

@@ -21,14 +21,14 @@ categories = ["database-implementations"]
 rust-version = "1.78.0"

 [workspace.dependencies]
-lance = { "version" = "=0.26.0", "features" = ["dynamodb"] }
-lance-io = "=0.26.0"
-lance-index = "=0.26.0"
-lance-linalg = "=0.26.0"
-lance-table = "=0.26.0"
-lance-testing = "=0.26.0"
-lance-datafusion = "=0.26.0"
-lance-encoding = "=0.26.0"
+lance = { "version" = "=0.27.0", "features" = ["dynamodb"], tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
+lance-io = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
+lance-index = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
+lance-linalg = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
+lance-table = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
+lance-testing = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
+lance-datafusion = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
+lance-encoding = { version = "=0.27.0", tag = "v0.27.0-beta.2", git="https://github.com/lancedb/lance.git" }
 # Note that this one does not include pyarrow
 arrow = { version = "54.1", optional = false }
 arrow-array = "54.1"
@@ -105,7 +105,8 @@ markdown_extensions:
 nav:
   - Home:
     - LanceDB: index.md
-    - 🏃🏼‍♂️ Quick start: basic.md
+    - 👉 Quickstart: quickstart.md
+    - 🏃🏼‍♂️ Basic Usage: basic.md
   - 📚 Concepts:
     - Vector search: concepts/vector_search.md
     - Indexing:
@@ -237,7 +238,9 @@ nav:
   - 👾 JavaScript (lancedb): js/globals.md
   - 🦀 Rust: https://docs.rs/lancedb/latest/lancedb/

-  - Quick start: basic.md
+  - Getting Started:
+    - Quickstart: quickstart.md
+    - Basic Usage: basic.md
   - Concepts:
     - Vector search: concepts/vector_search.md
     - Indexing:
@@ -1,4 +1,4 @@
-# Quick start
+# Basic Usage

 !!! info "LanceDB can be run in a number of ways:"
@@ -33,20 +33,20 @@ Construct a MergeInsertBuilder. __Internal use only.__
 ### execute()

 ```ts
-execute(data): Promise<void>
+execute(data): Promise<MergeStats>
 ```

 Executes the merge insert operation

-Nothing is returned but the `Table` is updated
-
 #### Parameters

 * **data**: [`Data`](../type-aliases/Data.md)

 #### Returns

-`Promise`<`void`>
+`Promise`<[`MergeStats`](../interfaces/MergeStats.md)>
+
+Statistics about the merge operation: counts of inserted, updated, and deleted rows

 ***
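Taken together with the `MergeStats` interface added later in this change set, the new return type can be used like this; a minimal sketch, where `./data`, `my_table`, and the `id` key column are placeholder names:

```typescript
import * as lancedb from "@lancedb/lancedb";

const db = await lancedb.connect("./data"); // placeholder path
const table = await db.openTable("my_table"); // placeholder table

// execute() now resolves to MergeStats instead of void.
const stats = await table
  .mergeInsert("id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute([{ id: 1, value: "x" }]);

// The counts are bigint values, per the MergeStats interface.
console.log(stats.numInsertedRows, stats.numUpdatedRows, stats.numDeletedRows);
```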
@@ -117,8 +117,8 @@ wish to return to standard mode, call `checkoutLatest`.

 #### Parameters

-* **version**: `number`
-    The version to checkout
+* **version**: `string` \| `number`
+    The version to checkout, could be version number or tag

 #### Returns
@@ -615,6 +615,50 @@ of the given query

 ***

+### stats()
+
+```ts
+abstract stats(): Promise<TableStatistics>
+```
+
+Returns table and fragment statistics
+
+#### Returns
+
+`Promise`<[`TableStatistics`](../interfaces/TableStatistics.md)>
+
+The table and fragment statistics
+
+***
+
+### tags()
+
+```ts
+abstract tags(): Promise<Tags>
+```
+
+Get a tags manager for this table.
+
+Tags allow you to label specific versions of a table with a human-readable name.
+The returned tags manager can be used to list, create, update, or delete tags.
+
+#### Returns
+
+`Promise`<[`Tags`](Tags.md)>
+
+A tags manager for this table
+
+#### Example
+
+```typescript
+const tagsManager = await table.tags();
+await tagsManager.create("v1", 1);
+const tags = await tagsManager.list();
+console.log(tags); // { "v1": { version: 1, manifestSize: ... } }
+```
+
+***
+
 ### toArrow()

 ```ts
docs/src/js/classes/TagContents.md (new file, 35 lines)

@@ -0,0 +1,35 @@
+[**@lancedb/lancedb**](../README.md) • **Docs**
+
+***
+
+[@lancedb/lancedb](../globals.md) / TagContents
+
+# Class: TagContents
+
+## Constructors
+
+### new TagContents()
+
+```ts
+new TagContents(): TagContents
+```
+
+#### Returns
+
+[`TagContents`](TagContents.md)
+
+## Properties
+
+### manifestSize
+
+```ts
+manifestSize: number;
+```
+
+***
+
+### version
+
+```ts
+version: number;
+```
docs/src/js/classes/Tags.md (new file, 99 lines)

@@ -0,0 +1,99 @@
+[**@lancedb/lancedb**](../README.md) • **Docs**
+
+***
+
+[@lancedb/lancedb](../globals.md) / Tags
+
+# Class: Tags
+
+## Constructors
+
+### new Tags()
+
+```ts
+new Tags(): Tags
+```
+
+#### Returns
+
+[`Tags`](Tags.md)
+
+## Methods
+
+### create()
+
+```ts
+create(tag, version): Promise<void>
+```
+
+#### Parameters
+
+* **tag**: `string`
+
+* **version**: `number`
+
+#### Returns
+
+`Promise`<`void`>
+
+***
+
+### delete()
+
+```ts
+delete(tag): Promise<void>
+```
+
+#### Parameters
+
+* **tag**: `string`
+
+#### Returns
+
+`Promise`<`void`>
+
+***
+
+### getVersion()
+
+```ts
+getVersion(tag): Promise<number>
+```
+
+#### Parameters
+
+* **tag**: `string`
+
+#### Returns
+
+`Promise`<`number`>
+
+***
+
+### list()
+
+```ts
+list(): Promise<Record<string, TagContents>>
+```
+
+#### Returns
+
+`Promise`<`Record`<`string`, [`TagContents`](TagContents.md)>>
+
+***
+
+### update()
+
+```ts
+update(tag, version): Promise<void>
+```
+
+#### Parameters
+
+* **tag**: `string`
+
+* **version**: `number`
+
+#### Returns
+
+`Promise`<`void`>
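A sketch of a full tag lifecycle with this class, assuming `table` is an open `Table` that already has at least two versions:

```typescript
const tags = await table.tags();

await tags.create("v1", 1); // label version 1
console.log(await tags.getVersion("v1")); // 1
console.log(await tags.list()); // { v1: { version: 1, manifestSize: ... } }

await tags.update("v1", 2); // repoint the tag at version 2
await tags.delete("v1"); // and remove it
```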
@@ -27,6 +27,8 @@
 - [QueryBase](classes/QueryBase.md)
 - [RecordBatchIterator](classes/RecordBatchIterator.md)
 - [Table](classes/Table.md)
+- [TagContents](classes/TagContents.md)
+- [Tags](classes/Tags.md)
 - [VectorColumnOptions](classes/VectorColumnOptions.md)
 - [VectorQuery](classes/VectorQuery.md)

@@ -40,6 +42,8 @@
 - [ConnectionOptions](interfaces/ConnectionOptions.md)
 - [CreateTableOptions](interfaces/CreateTableOptions.md)
 - [ExecutableQuery](interfaces/ExecutableQuery.md)
+- [FragmentStatistics](interfaces/FragmentStatistics.md)
+- [FragmentSummaryStats](interfaces/FragmentSummaryStats.md)
 - [FtsOptions](interfaces/FtsOptions.md)
 - [FullTextQuery](interfaces/FullTextQuery.md)
 - [FullTextSearchOptions](interfaces/FullTextSearchOptions.md)
@@ -50,6 +54,7 @@
 - [IndexStatistics](interfaces/IndexStatistics.md)
 - [IvfFlatOptions](interfaces/IvfFlatOptions.md)
 - [IvfPqOptions](interfaces/IvfPqOptions.md)
+- [MergeStats](interfaces/MergeStats.md)
 - [OpenTableOptions](interfaces/OpenTableOptions.md)
 - [OptimizeOptions](interfaces/OptimizeOptions.md)
 - [OptimizeStats](interfaces/OptimizeStats.md)
@@ -57,6 +62,7 @@
 - [RemovalStats](interfaces/RemovalStats.md)
 - [RetryConfig](interfaces/RetryConfig.md)
 - [TableNamesOptions](interfaces/TableNamesOptions.md)
+- [TableStatistics](interfaces/TableStatistics.md)
 - [TimeoutConfig](interfaces/TimeoutConfig.md)
 - [UpdateOptions](interfaces/UpdateOptions.md)
 - [Version](interfaces/Version.md)
docs/src/js/interfaces/FragmentStatistics.md (new file, 37 lines)

@@ -0,0 +1,37 @@
+[**@lancedb/lancedb**](../README.md) • **Docs**
+
+***
+
+[@lancedb/lancedb](../globals.md) / FragmentStatistics
+
+# Interface: FragmentStatistics
+
+## Properties
+
+### lengths
+
+```ts
+lengths: FragmentSummaryStats;
+```
+
+Statistics on the number of rows in the table fragments
+
+***
+
+### numFragments
+
+```ts
+numFragments: number;
+```
+
+The number of fragments in the table
+
+***
+
+### numSmallFragments
+
+```ts
+numSmallFragments: number;
+```
+
+The number of uncompacted fragments in the table
docs/src/js/interfaces/FragmentSummaryStats.md (new file, 77 lines)

@@ -0,0 +1,77 @@
+[**@lancedb/lancedb**](../README.md) • **Docs**
+
+***
+
+[@lancedb/lancedb](../globals.md) / FragmentSummaryStats
+
+# Interface: FragmentSummaryStats
+
+## Properties
+
+### max
+
+```ts
+max: number;
+```
+
+The number of rows in the fragment with the most rows
+
+***
+
+### mean
+
+```ts
+mean: number;
+```
+
+The mean number of rows in the fragments
+
+***
+
+### min
+
+```ts
+min: number;
+```
+
+The number of rows in the fragment with the fewest rows
+
+***
+
+### p25
+
+```ts
+p25: number;
+```
+
+The 25th percentile of number of rows in the fragments
+
+***
+
+### p50
+
+```ts
+p50: number;
+```
+
+The 50th percentile of number of rows in the fragments
+
+***
+
+### p75
+
+```ts
+p75: number;
+```
+
+The 75th percentile of number of rows in the fragments
+
+***
+
+### p99
+
+```ts
+p99: number;
+```
+
+The 99th percentile of number of rows in the fragments
docs/src/js/interfaces/MergeStats.md (new file, 31 lines)

@@ -0,0 +1,31 @@
+[**@lancedb/lancedb**](../README.md) • **Docs**
+
+***
+
+[@lancedb/lancedb](../globals.md) / MergeStats
+
+# Interface: MergeStats
+
+## Properties
+
+### numDeletedRows
+
+```ts
+numDeletedRows: bigint;
+```
+
+***
+
+### numInsertedRows
+
+```ts
+numInsertedRows: bigint;
+```
+
+***
+
+### numUpdatedRows
+
+```ts
+numUpdatedRows: bigint;
+```
docs/src/js/interfaces/TableStatistics.md (new file, 47 lines)

@@ -0,0 +1,47 @@
+[**@lancedb/lancedb**](../README.md) • **Docs**
+
+***
+
+[@lancedb/lancedb](../globals.md) / TableStatistics
+
+# Interface: TableStatistics
+
+## Properties
+
+### fragmentStats
+
+```ts
+fragmentStats: FragmentStatistics;
+```
+
+Statistics on table fragments
+
+***
+
+### numIndices
+
+```ts
+numIndices: number;
+```
+
+The number of indices in the table
+
+***
+
+### numRows
+
+```ts
+numRows: number;
+```
+
+The number of rows in the table
+
+***
+
+### totalBytes
+
+```ts
+totalBytes: number;
+```
+
+The total number of bytes in the table
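Reading the statistics interfaces together, a sketch of typical use (the field values in the comments are illustrative, and `table` is assumed to be an open `Table`):

```typescript
const stats = await table.stats();

console.log(stats.numRows); // total rows in the table
console.log(stats.totalBytes); // on-disk size in bytes
console.log(stats.numIndices); // number of indices

// Fragment-level detail, per FragmentStatistics / FragmentSummaryStats.
const { numFragments, numSmallFragments, lengths } = stats.fragmentStats;
console.log(numFragments, numSmallFragments);
console.log(lengths.min, lengths.p50, lengths.max); // rows per fragment
```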
docs/src/quickstart.md (new file, 101 lines)

@@ -0,0 +1,101 @@
+# Getting Started with LanceDB: A Minimal Vector Search Tutorial
+
+Let's set up a LanceDB database, insert vector data, and perform a simple vector search. We'll use simple character classes like "knight" and "rogue" to illustrate semantic relevance.
+
+## 1. Install Dependencies
+
+Before starting, make sure you have the necessary packages:
+
+```bash
+pip install lancedb pandas numpy
+```
+
+## 2. Import Required Libraries
+
+```python
+import lancedb
+import pandas as pd
+import numpy as np
+```
+
+## 3. Connect to LanceDB
+
+You can use a local directory to store your database:
+
+```python
+db = lancedb.connect("./lancedb")
+```
+
+## 4. Create Sample Data
+
+Add sample text data and corresponding 4D vectors:
+
+```python
+data = pd.DataFrame([
+    {"id": "1", "vector": [1.0, 0.0, 0.0, 0.0], "text": "knight"},
+    {"id": "2", "vector": [0.9, 0.1, 0.0, 0.0], "text": "warrior"},
+    {"id": "3", "vector": [0.0, 1.0, 0.0, 0.0], "text": "rogue"},
+    {"id": "4", "vector": [0.0, 0.9, 0.1, 0.0], "text": "thief"},
+    {"id": "5", "vector": [0.5, 0.5, 0.0, 0.0], "text": "ranger"},
+])
+```
+
+## 5. Create a Table in LanceDB
+
+```python
+table = db.create_table("rpg_classes", data=data, mode="overwrite")
+```
+
+Let's see how the table looks:
+
+```python
+print(data)
+```
+
+| id | vector | text |
+|----|--------|------|
+| 1 | [1.0, 0.0, 0.0, 0.0] | knight |
+| 2 | [0.9, 0.1, 0.0, 0.0] | warrior |
+| 3 | [0.0, 1.0, 0.0, 0.0] | rogue |
+| 4 | [0.0, 0.9, 0.1, 0.0] | thief |
+| 5 | [0.5, 0.5, 0.0, 0.0] | ranger |
+
+## 6. Perform a Vector Search
+
+Search for the character classes most similar to a query vector:
+
+```python
+# Query with the "rogue" direction of the vector space
+results = table.search([0.0, 1.0, 0.0, 0.0]).limit(3).to_pandas()
+print(results)
+```
+
+This returns the top 3 classes closest to the query vector, showing how LanceDB can be used for semantic search.
+
+| id | vector | text | _distance |
+|------|------------------------|----------|-----------|
+| 3 | [0.0, 1.0, 0.0, 0.0] | rogue | 0.00 |
+| 4 | [0.0, 0.9, 0.1, 0.0] | thief | 0.02 |
+| 5 | [0.5, 0.5, 0.0, 0.0] | ranger | 0.50 |
+
+Let's try searching for "knight":
+
+```python
+query_vector = [1.0, 0.0, 0.0, 0.0]
+results = table.search(query_vector).limit(3).to_pandas()
+print(results)
+```
+
+| id | vector | text | _distance |
+|------|------------------------|----------|-----------|
+| 1 | [1.0, 0.0, 0.0, 0.0] | knight | 0.00 |
+| 2 | [0.9, 0.1, 0.0, 0.0] | warrior | 0.02 |
+| 5 | [0.5, 0.5, 0.0, 0.0] | ranger | 0.50 |
+
+## Next Steps
+
+That's it - you just ran your first vector search!
+
+For more beginner tips, check out the [Basic Usage](basic.md) guide.
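The same flow works through the Node SDK updated elsewhere in this change set; a sketch for comparison (not part of the tutorial, names reused from it):

```typescript
import * as lancedb from "@lancedb/lancedb";

const db = await lancedb.connect("./lancedb");
const table = await db.createTable(
  "rpg_classes",
  [
    { id: "1", vector: [1.0, 0.0, 0.0, 0.0], text: "knight" },
    { id: "3", vector: [0.0, 1.0, 0.0, 0.0], text: "rogue" },
  ],
  { mode: "overwrite" },
);

// Nearest neighbors to the "rogue" direction.
const results = await table.search([0.0, 1.0, 0.0, 0.0]).limit(3).toArray();
console.log(results.map((r) => r.text));
```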
@@ -8,7 +8,7 @@
   <parent>
     <groupId>com.lancedb</groupId>
     <artifactId>lancedb-parent</artifactId>
-    <version>0.19.0-beta.11</version>
+    <version>0.19.1-beta.1</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
@@ -6,7 +6,7 @@

   <groupId>com.lancedb</groupId>
   <artifactId>lancedb-parent</artifactId>
-  <version>0.19.0-beta.11</version>
+  <version>0.19.1-beta.1</version>
   <packaging>pom</packaging>

   <name>LanceDB Parent</name>
node/package-lock.json (generated, 44 changes)

@@ -1,12 +1,12 @@
 {
   "name": "vectordb",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "vectordb",
-      "version": "0.19.0-beta.11",
+      "version": "0.19.1-beta.1",
       "cpu": [
         "x64",
         "arm64"
@@ -52,11 +52,11 @@
         "uuid": "^9.0.0"
       },
       "optionalDependencies": {
-        "@lancedb/vectordb-darwin-arm64": "0.19.0-beta.11",
-        "@lancedb/vectordb-darwin-x64": "0.19.0-beta.11",
-        "@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.11",
-        "@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.11",
-        "@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.11"
+        "@lancedb/vectordb-darwin-arm64": "0.19.1-beta.1",
+        "@lancedb/vectordb-darwin-x64": "0.19.1-beta.1",
+        "@lancedb/vectordb-linux-arm64-gnu": "0.19.1-beta.1",
+        "@lancedb/vectordb-linux-x64-gnu": "0.19.1-beta.1",
+        "@lancedb/vectordb-win32-x64-msvc": "0.19.1-beta.1"
       },
       "peerDependencies": {
         "@apache-arrow/ts": "^14.0.2",
@@ -327,9 +327,9 @@
       }
     },
     "node_modules/@lancedb/vectordb-darwin-arm64": {
-      "version": "0.19.0-beta.11",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.0-beta.11.tgz",
-      "integrity": "sha512-fLefGJYdlIRIjrJj8MU1r5Zix5LpKktpCYilA7tZrfvBWkubGceJ+U6RPsWz7VGBfWcETo3g5CBooUPhbtSMlQ==",
+      "version": "0.19.1-beta.1",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.19.1-beta.1.tgz",
+      "integrity": "sha512-Epvel0pF5TM6MtIWQ2KhqezqSSHTL3Wr7a2rGAwz6X/XY23i6DbMPpPs0HyeIDzDrhxNfE3cz3S+SiCA6xpR0g==",
       "cpu": [
         "arm64"
       ],
@@ -340,9 +340,9 @@
       ]
     },
     "node_modules/@lancedb/vectordb-darwin-x64": {
-      "version": "0.19.0-beta.11",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.0-beta.11.tgz",
-      "integrity": "sha512-FkCa1TbPLDXAGhlRI4tafcltzApCsyvgi+I+kX07u5DKPNQVALpQ3R6X6GLlbiFsAFBdyv9t2fqQ9DlgjJIZpA==",
+      "version": "0.19.1-beta.1",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.19.1-beta.1.tgz",
+      "integrity": "sha512-hOiUSlIoISbiXytp46hToi/r6sF5pImAsfbzCsIq8ExDV4TPa8fjbhcIT80vxxOwc2mpSSK4HsVJYod95RSbEQ==",
       "cpu": [
         "x64"
       ],
@@ -353,9 +353,9 @@
       ]
     },
     "node_modules/@lancedb/vectordb-linux-arm64-gnu": {
-      "version": "0.19.0-beta.11",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.0-beta.11.tgz",
-      "integrity": "sha512-iZkL/01HNUNQ8pGK0+hoNyrM7P1YtShsyIQVzJMfo41SAofCBf9qvi9YaYyd49sDb+dQXeRn1+cfaJ9siz1OHw==",
+      "version": "0.19.1-beta.1",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.19.1-beta.1.tgz",
+      "integrity": "sha512-/1JhGVDEngwrlM8o2TNW8G6nJ9U/VgHKAORmj/cTA7O30helJIoo9jfvUAUy+vZ4VoEwRXQbMI+gaYTg0l3MTg==",
       "cpu": [
         "arm64"
       ],
@@ -366,9 +366,9 @@
       ]
     },
     "node_modules/@lancedb/vectordb-linux-x64-gnu": {
-      "version": "0.19.0-beta.11",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.0-beta.11.tgz",
-      "integrity": "sha512-MdKRHxe2tRQqmExNLv3f6Wvx1mEi98eFtD0ysm4tNrQdaS1MJbTp+DUehrRKkfDWsooalHkIi9d02BVw5qseUQ==",
+      "version": "0.19.1-beta.1",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.19.1-beta.1.tgz",
+      "integrity": "sha512-zNRGSSUt8nTJMmll4NdxhQjwxR8Rezq3T4dsRoiDts5ienMam5HFjYiZ3FkDZQo16rgq2BcbFuH1G8u1chywlg==",
       "cpu": [
         "x64"
       ],
@@ -379,9 +379,9 @@
       ]
     },
     "node_modules/@lancedb/vectordb-win32-x64-msvc": {
-      "version": "0.19.0-beta.11",
-      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.0-beta.11.tgz",
-      "integrity": "sha512-KWy+t9jr0feJAW9NkmM/w9kfdpp78+7mkeh9lb0g3xI3OdYU1yizNqFjbIQqJf7/L4sou4wmOjAC+FcP8qCtzg==",
+      "version": "0.19.1-beta.1",
+      "resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.19.1-beta.1.tgz",
+      "integrity": "sha512-yV550AJGlsIFdm1KoHQPJ1TZx121ZXCIdebBtBZj3wOObIhyB/i0kZAtGvwjkmr7EYyfzt1EHZzbjSGVdehIAA==",
      "cpu": [
        "x64"
      ],
@@ -1,6 +1,6 @@
 {
   "name": "vectordb",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "description": " Serverless, low-latency vector database for AI applications",
   "private": false,
   "main": "dist/index.js",
@@ -89,10 +89,10 @@
     }
   },
   "optionalDependencies": {
-    "@lancedb/vectordb-darwin-x64": "0.19.0-beta.11",
-    "@lancedb/vectordb-darwin-arm64": "0.19.0-beta.11",
-    "@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.11",
-    "@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.11",
-    "@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.11"
+    "@lancedb/vectordb-darwin-x64": "0.19.1-beta.1",
+    "@lancedb/vectordb-darwin-arm64": "0.19.1-beta.1",
+    "@lancedb/vectordb-linux-x64-gnu": "0.19.1-beta.1",
+    "@lancedb/vectordb-linux-arm64-gnu": "0.19.1-beta.1",
+    "@lancedb/vectordb-win32-x64-msvc": "0.19.1-beta.1"
   }
 }
@@ -1,7 +1,7 @@
 [package]
 name = "lancedb-nodejs"
 edition.workspace = true
-version = "0.19.0-beta.11"
+version = "0.19.1-beta.1"
 license.workspace = true
 description.workspace = true
 repository.workspace = true
@@ -374,6 +374,71 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
       expect(table2.numRows).toBe(4);
       expect(table2.schema).toEqual(schema);
     });

+    it("should correctly retain values in nested struct fields", async function () {
+      // Define test data with nested struct
+      const testData = [
+        {
+          id: "doc1",
+          vector: [1, 2, 3],
+          metadata: {
+            filePath: "/path/to/file1.ts",
+            startLine: 10,
+            endLine: 20,
+            text: "function test() { return true; }",
+          },
+        },
+        {
+          id: "doc2",
+          vector: [4, 5, 6],
+          metadata: {
+            filePath: "/path/to/file2.ts",
+            startLine: 30,
+            endLine: 40,
+            text: "function test2() { return false; }",
+          },
+        },
+      ];
+
+      // Create Arrow table from the data
+      const table = makeArrowTable(testData);
+
+      // Verify schema has the nested struct fields
+      const metadataField = table.schema.fields.find(
+        (f) => f.name === "metadata",
+      );
+      expect(metadataField).toBeDefined();
+      // biome-ignore lint/suspicious/noExplicitAny: accessing fields in different Arrow versions
+      const childNames = metadataField?.type.children.map((c: any) => c.name);
+      expect(childNames).toEqual([
+        "filePath",
+        "startLine",
+        "endLine",
+        "text",
+      ]);
+
+      // Convert to buffer and back (simulating storage and retrieval)
+      const buf = await fromTableToBuffer(table);
+      const retrievedTable = tableFromIPC(buf);
+
+      // Verify the retrieved table has the same structure
+      const rows = [];
+      for (let i = 0; i < retrievedTable.numRows; i++) {
+        rows.push(retrievedTable.get(i));
+      }
+
+      // Check values in the first row
+      const firstRow = rows[0];
+      expect(firstRow.id).toBe("doc1");
+      expect(firstRow.vector.toJSON()).toEqual([1, 2, 3]);
+
+      // Verify metadata values are preserved (this is where the bug is)
+      expect(firstRow.metadata).toBeDefined();
+      expect(firstRow.metadata.filePath).toBe("/path/to/file1.ts");
+      expect(firstRow.metadata.startLine).toBe(10);
+      expect(firstRow.metadata.endLine).toBe(20);
+      expect(firstRow.metadata.text).toBe("function test() { return true; }");
+    });
   });

 class DummyEmbedding extends EmbeddingFunction<string> {
@@ -71,6 +71,29 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
     await expect(table.countRows()).resolves.toBe(3);
   });

+  it("should show table stats", async () => {
+    await table.add([{ id: 1 }, { id: 2 }]);
+    await table.add([{ id: 1 }]);
+    await expect(table.stats()).resolves.toEqual({
+      fragmentStats: {
+        lengths: {
+          max: 2,
+          mean: 1,
+          min: 1,
+          p25: 1,
+          p50: 2,
+          p75: 2,
+          p99: 2,
+        },
+        numFragments: 2,
+        numSmallFragments: 2,
+      },
+      numIndices: 0,
+      numRows: 3,
+      totalBytes: 24,
+    });
+  });
+
   it("should overwrite data if asked", async () => {
     await table.add([{ id: 1 }, { id: 2 }]);
     await table.add([{ id: 1 }], { mode: "overwrite" });
@@ -315,11 +338,16 @@ describe("merge insert", () => {
       { a: 3, b: "y" },
       { a: 4, b: "z" },
     ];
-    await table
+    const stats = await table
       .mergeInsert("a")
       .whenMatchedUpdateAll()
       .whenNotMatchedInsertAll()
       .execute(newData);

+    expect(stats.numInsertedRows).toBe(1n);
+    expect(stats.numUpdatedRows).toBe(2n);
+    expect(stats.numDeletedRows).toBe(0n);
+
     const expected = [
       { a: 1, b: "a" },
       { a: 2, b: "x" },
@@ -1178,6 +1206,73 @@ describe("when dealing with versioning", () => {
   });
 });

+describe("when dealing with tags", () => {
+  let tmpDir: tmp.DirResult;
+  beforeEach(() => {
+    tmpDir = tmp.dirSync({ unsafeCleanup: true });
+  });
+  afterEach(() => {
+    tmpDir.removeCallback();
+  });
+
+  it("can manage tags", async () => {
+    const conn = await connect(tmpDir.name, {
+      readConsistencyInterval: 0,
+    });
+
+    const table = await conn.createTable("my_table", [
+      { id: 1n, vector: [0.1, 0.2] },
+    ]);
+    expect(await table.version()).toBe(1);
+
+    await table.add([{ id: 2n, vector: [0.3, 0.4] }]);
+    expect(await table.version()).toBe(2);
+
+    const tagsManager = await table.tags();
+
+    const initialTags = await tagsManager.list();
+    expect(Object.keys(initialTags).length).toBe(0);
+
+    const tag1 = "tag1";
+    await tagsManager.create(tag1, 1);
+    expect(await tagsManager.getVersion(tag1)).toBe(1);
+
+    const tagsAfterFirst = await tagsManager.list();
+    expect(Object.keys(tagsAfterFirst).length).toBe(1);
+    expect(tagsAfterFirst).toHaveProperty(tag1);
+    expect(tagsAfterFirst[tag1].version).toBe(1);
+
+    await tagsManager.create("tag2", 2);
+    expect(await tagsManager.getVersion("tag2")).toBe(2);
+
+    const tagsAfterSecond = await tagsManager.list();
+    expect(Object.keys(tagsAfterSecond).length).toBe(2);
+    expect(tagsAfterSecond).toHaveProperty(tag1);
+    expect(tagsAfterSecond[tag1].version).toBe(1);
+    expect(tagsAfterSecond).toHaveProperty("tag2");
+    expect(tagsAfterSecond["tag2"].version).toBe(2);
+
+    await table.add([{ id: 3n, vector: [0.5, 0.6] }]);
+    await tagsManager.update(tag1, 3);
+    expect(await tagsManager.getVersion(tag1)).toBe(3);
+
+    await tagsManager.delete("tag2");
+    const tagsAfterDelete = await tagsManager.list();
+    expect(Object.keys(tagsAfterDelete).length).toBe(1);
+    expect(tagsAfterDelete).toHaveProperty(tag1);
+    expect(tagsAfterDelete[tag1].version).toBe(3);
+
+    await table.add([{ id: 4n, vector: [0.7, 0.8] }]);
+    expect(await table.version()).toBe(4);
+
+    await table.checkout(tag1);
+    expect(await table.version()).toBe(3);
+
+    await table.checkoutLatest();
+    expect(await table.version()).toBe(4);
+  });
+});
+
 describe("when optimizing a dataset", () => {
   let tmpDir: tmp.DirResult;
   let table: Table;
@@ -639,8 +639,9 @@ function transposeData(
 ): Vector {
   if (field.type instanceof Struct) {
     const childFields = field.type.children;
+    const fullPath = [...path, field.name];
     const childVectors = childFields.map((child) => {
-      return transposeData(data, child, [...path, child.name]);
+      return transposeData(data, child, fullPath);
     });
     const structData = makeData({
       type: field.type,
@@ -652,7 +653,14 @@
   const values = data.map((datum) => {
     let current: unknown = datum;
     for (const key of valuesPath) {
-      if (isObject(current) && Object.hasOwn(current, key)) {
+      if (current == null) {
+        return null;
+      }
+
+      if (
+        isObject(current) &&
+        (Object.hasOwn(current, key) || key in current)
+      ) {
         current = current[key];
       } else {
         return null;
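Both changes matter for data like the test case above: `fullPath` stops child lookups from resolving against sibling names, and the `current == null` guard turns an absent nested struct into a null value instead of a failed lookup. A sketch of input that exercises the guard (behavior inferred from the diff; `makeArrowTable` imported as in the test file):

```typescript
import { makeArrowTable } from "@lancedb/lancedb";

// The second row omits the nested struct's contents entirely; with the
// null guard, its metadata children transpose to null rather than failing.
const table = makeArrowTable([
  { id: "a", metadata: { filePath: "/tmp/a.ts", startLine: 1 } },
  { id: "b", metadata: null },
]);
console.log(table.numRows); // 2
```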
@@ -23,6 +23,12 @@ export {
   OptimizeStats,
   CompactionStats,
   RemovalStats,
+  TableStatistics,
+  FragmentStatistics,
+  FragmentSummaryStats,
+  Tags,
+  TagContents,
+  MergeStats,
 } from "./native.js";

 export {
@@ -1,7 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The LanceDB Authors
 import { Data, Schema, fromDataToBuffer } from "./arrow";
-import { NativeMergeInsertBuilder } from "./native";
+import { MergeStats, NativeMergeInsertBuilder } from "./native";

 /** A builder used to create and run a merge insert operation */
 export class MergeInsertBuilder {
@@ -73,9 +73,9 @@ export class MergeInsertBuilder {
   /**
    * Executes the merge insert operation
    *
-   * Nothing is returned but the `Table` is updated
+   * @returns Statistics about the merge operation: counts of inserted, updated, and deleted rows
    */
-  async execute(data: Data): Promise<void> {
+  async execute(data: Data): Promise<MergeStats> {
     let schema: Schema;
     if (this.#schema instanceof Promise) {
       schema = await this.#schema;
@@ -84,6 +84,6 @@ export class MergeInsertBuilder {
       schema = this.#schema;
     }
     const buffer = await fromDataToBuffer(data, undefined, schema);
-    await this.#native.execute(buffer);
+    return await this.#native.execute(buffer);
   }
 }
@@ -20,6 +20,8 @@ import {
   IndexConfig,
   IndexStatistics,
   OptimizeStats,
+  TableStatistics,
+  Tags,
   Table as _NativeTable,
 } from "./native";
 import {
@@ -374,7 +376,7 @@ export abstract class Table {
    *
    * Calling this method will set the table into time-travel mode. If you
    * wish to return to standard mode, call `checkoutLatest`.
-   * @param {number} version The version to checkout
+   * @param {number | string} version The version to checkout, could be version number or tag
    * @example
    * ```typescript
    * import * as lancedb from "@lancedb/lancedb"
@@ -390,7 +392,8 @@ export abstract class Table {
    * console.log(await table.version()); // 2
    * ```
    */
-  abstract checkout(version: number): Promise<void>;
+  abstract checkout(version: number | string): Promise<void>;

   /**
    * Checkout the latest version of the table. _This is an in-place operation._
    *
@@ -404,6 +407,23 @@ export abstract class Table {
    */
  abstract listVersions(): Promise<Version[]>;

+  /**
+   * Get a tags manager for this table.
+   *
+   * Tags allow you to label specific versions of a table with a human-readable name.
+   * The returned tags manager can be used to list, create, update, or delete tags.
+   *
+   * @returns {Tags} A tags manager for this table
+   * @example
+   * ```typescript
+   * const tagsManager = await table.tags();
+   * await tagsManager.create("v1", 1);
+   * const tags = await tagsManager.list();
+   * console.log(tags); // { "v1": { version: 1, manifestSize: ... } }
+   * ```
+   */
+  abstract tags(): Promise<Tags>;
+
   /**
    * Restore the table to the currently checked out version
    *
@@ -463,6 +483,13 @@ export abstract class Table {
    * Use {@link Table.listIndices} to find the names of the indices.
    */
   abstract indexStats(name: string): Promise<IndexStatistics | undefined>;
+
+  /** Returns table and fragment statistics
+   *
+   * @returns {TableStatistics} The table and fragment statistics
+   *
+   */
+  abstract stats(): Promise<TableStatistics>;
 }

 export class LocalTable extends Table {
@@ -699,8 +726,11 @@ export class LocalTable extends Table {
     return await this.inner.version();
   }

-  async checkout(version: number): Promise<void> {
-    await this.inner.checkout(version);
+  async checkout(version: number | string): Promise<void> {
+    if (typeof version === "string") {
+      return this.inner.checkoutTag(version);
+    }
+    return this.inner.checkout(version);
   }

   async checkoutLatest(): Promise<void> {
@@ -719,6 +749,10 @@ export class LocalTable extends Table {
     await this.inner.restore();
   }

+  async tags(): Promise<Tags> {
+    return await this.inner.tags();
+  }
+
   async optimize(options?: Partial<OptimizeOptions>): Promise<OptimizeStats> {
     let cleanupOlderThanMs;
     if (
@@ -749,6 +783,11 @@ export class LocalTable extends Table {
     }
     return stats;
   }

+  async stats(): Promise<TableStatistics> {
+    return await this.inner.stats();
+  }
+
   mergeInsert(on: string | string[]): MergeInsertBuilder {
     on = Array.isArray(on) ? on : [on];
     return new MergeInsertBuilder(this.inner.mergeInsert(on), this.schema());
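With the `LocalTable` change above, string arguments route to the native `checkoutTag`; a sketch of both call forms, assuming `table` is an open `Table` and a tag named `v1` already exists:

```typescript
await table.checkout(1); // time-travel by version number
await table.checkout("v1"); // or by tag name (new)
console.log(await table.version());

await table.checkoutLatest(); // return to standard mode
```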
@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-darwin-arm64",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "os": ["darwin"],
   "cpu": ["arm64"],
   "main": "lancedb.darwin-arm64.node",
@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-darwin-x64",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "os": ["darwin"],
   "cpu": ["x64"],
   "main": "lancedb.darwin-x64.node",
@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-arm64-gnu",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "os": ["linux"],
   "cpu": ["arm64"],
   "main": "lancedb.linux-arm64-gnu.node",
@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-arm64-musl",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "os": ["linux"],
   "cpu": ["arm64"],
   "main": "lancedb.linux-arm64-musl.node",
@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-x64-gnu",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "os": ["linux"],
   "cpu": ["x64"],
   "main": "lancedb.linux-x64-gnu.node",
@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-linux-x64-musl",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "os": ["linux"],
   "cpu": ["x64"],
   "main": "lancedb.linux-x64-musl.node",
@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-win32-arm64-msvc",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "os": [
     "win32"
   ],
@@ -1,6 +1,6 @@
 {
   "name": "@lancedb/lancedb-win32-x64-msvc",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "os": ["win32"],
   "cpu": ["x64"],
   "main": "lancedb.win32-x64-msvc.node",
nodejs/package-lock.json (generated, 4 changes)

@@ -1,12 +1,12 @@
 {
   "name": "@lancedb/lancedb",
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "@lancedb/lancedb",
-      "version": "0.19.0-beta.11",
+      "version": "0.19.1-beta.1",
       "cpu": [
         "x64",
         "arm64"
@@ -11,7 +11,7 @@
     "ann"
   ],
   "private": false,
-  "version": "0.19.0-beta.11",
+  "version": "0.19.1-beta.1",
   "main": "dist/index.js",
   "exports": {
     ".": "./dist/index.js",
@@ -37,7 +37,7 @@ impl NativeMergeInsertBuilder {
     }

     #[napi(catch_unwind)]
-    pub async fn execute(&self, buf: Buffer) -> napi::Result<()> {
+    pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeStats> {
         let data = ipc_file_to_batches(buf.to_vec())
             .and_then(IntoArrow::into_arrow)
             .map_err(|e| {
@@ -46,12 +46,14 @@ impl NativeMergeInsertBuilder {

         let this = self.clone();

-        this.inner.execute(data).await.map_err(|e| {
+        let stats = this.inner.execute(data).await.map_err(|e| {
             napi::Error::from_reason(format!(
                 "Failed to execute merge insert: {}",
                 convert_error(&e)
             ))
-        })
+        })?;
+
+        Ok(stats.into())
     }
 }

@@ -60,3 +62,20 @@ impl From<MergeInsertBuilder> for NativeMergeInsertBuilder {
         Self { inner }
     }
 }
+
+#[napi(object)]
+pub struct MergeStats {
+    pub num_inserted_rows: BigInt,
+    pub num_updated_rows: BigInt,
+    pub num_deleted_rows: BigInt,
+}
+
+impl From<lancedb::table::MergeStats> for MergeStats {
+    fn from(stats: lancedb::table::MergeStats) -> Self {
+        Self {
+            num_inserted_rows: stats.num_inserted_rows.into(),
+            num_updated_rows: stats.num_updated_rows.into(),
+            num_deleted_rows: stats.num_deleted_rows.into(),
+        }
+    }
+}
@@ -157,6 +157,12 @@ impl Table {
             .default_error()
     }

+    #[napi(catch_unwind)]
+    pub async fn stats(&self) -> Result<TableStatistics> {
+        let stats = self.inner_ref()?.stats().await.default_error()?;
+        Ok(stats.into())
+    }
+
     #[napi(catch_unwind)]
     pub async fn update(
         &self,
@@ -249,6 +255,14 @@ impl Table {
             .default_error()
     }

+    #[napi(catch_unwind)]
+    pub async fn checkout_tag(&self, tag: String) -> napi::Result<()> {
+        self.inner_ref()?
+            .checkout_tag(tag.as_str())
+            .await
+            .default_error()
+    }
+
     #[napi(catch_unwind)]
     pub async fn checkout_latest(&self) -> napi::Result<()> {
         self.inner_ref()?.checkout_latest().await.default_error()
@@ -281,6 +295,13 @@ impl Table {
         self.inner_ref()?.restore().await.default_error()
     }

+    #[napi(catch_unwind)]
+    pub async fn tags(&self) -> napi::Result<Tags> {
+        Ok(Tags {
+            inner: self.inner_ref()?.clone(),
+        })
+    }
+
     #[napi(catch_unwind)]
     pub async fn optimize(
         &self,
@@ -540,9 +561,158 @@ impl From<lancedb::index::IndexStatistics> for IndexStatistics {
     }
 }

+#[napi(object)]
+pub struct TableStatistics {
+    /// The total number of bytes in the table
+    pub total_bytes: i64,
+
+    /// The number of rows in the table
+    pub num_rows: i64,
+
+    /// The number of indices in the table
+    pub num_indices: i64,
+
+    /// Statistics on table fragments
+    pub fragment_stats: FragmentStatistics,
+}
+
+#[napi(object)]
+pub struct FragmentStatistics {
+    /// The number of fragments in the table
+    pub num_fragments: i64,
+
+    /// The number of uncompacted fragments in the table
+    pub num_small_fragments: i64,
+
+    /// Statistics on the number of rows in the table fragments
+    pub lengths: FragmentSummaryStats,
+}
+
+#[napi(object)]
+pub struct FragmentSummaryStats {
+    /// The number of rows in the fragment with the fewest rows
+    pub min: i64,
+
+    /// The number of rows in the fragment with the most rows
+    pub max: i64,
+
+    /// The mean number of rows in the fragments
+    pub mean: i64,
+
+    /// The 25th percentile of number of rows in the fragments
+    pub p25: i64,
+
+    /// The 50th percentile of number of rows in the fragments
+    pub p50: i64,
+
+    /// The 75th percentile of number of rows in the fragments
+    pub p75: i64,
+
+    /// The 99th percentile of number of rows in the fragments
+    pub p99: i64,
+}
+
+impl From<lancedb::table::TableStatistics> for TableStatistics {
+    fn from(v: lancedb::table::TableStatistics) -> Self {
+        Self {
+            total_bytes: v.total_bytes as i64,
+            num_rows: v.num_rows as i64,
+            num_indices: v.num_indices as i64,
+            fragment_stats: FragmentStatistics {
+                num_fragments: v.fragment_stats.num_fragments as i64,
+                num_small_fragments: v.fragment_stats.num_small_fragments as i64,
+                lengths: FragmentSummaryStats {
+                    min: v.fragment_stats.lengths.min as i64,
+                    max: v.fragment_stats.lengths.max as i64,
+                    mean: v.fragment_stats.lengths.mean as i64,
+                    p25: v.fragment_stats.lengths.p25 as i64,
+                    p50: v.fragment_stats.lengths.p50 as i64,
+                    p75: v.fragment_stats.lengths.p75 as i64,
+                    p99: v.fragment_stats.lengths.p99 as i64,
+                },
+            },
+        }
+    }
+}
+
 #[napi(object)]
 pub struct Version {
     pub version: i64,
     pub timestamp: i64,
     pub metadata: HashMap<String, String>,
 }

+#[napi]
+pub struct TagContents {
+    pub version: i64,
+    pub manifest_size: i64,
+}
+
+#[napi]
+pub struct Tags {
+    inner: LanceDbTable,
+}
+
+#[napi]
+impl Tags {
+    #[napi]
+    pub async fn list(&self) -> napi::Result<HashMap<String, TagContents>> {
+        let rust_tags = self.inner.tags().await.default_error()?;
+        let tag_list = rust_tags.as_ref().list().await.default_error()?;
+        let tag_contents = tag_list
+            .into_iter()
+            .map(|(k, v)| {
+                (
+                    k,
+                    TagContents {
+                        version: v.version as i64,
+                        manifest_size: v.manifest_size as i64,
+                    },
+                )
+            })
+            .collect();
+
+        Ok(tag_contents)
+    }
+
+    #[napi]
+    pub async fn get_version(&self, tag: String) -> napi::Result<i64> {
+        let rust_tags = self.inner.tags().await.default_error()?;
+        rust_tags
+            .as_ref()
+            .get_version(tag.as_str())
+            .await
+            .map(|v| v as i64)
+            .default_error()
+    }
+
+    #[napi]
+    pub async unsafe fn create(&mut self, tag: String, version: i64) -> napi::Result<()> {
+        let mut rust_tags = self.inner.tags().await.default_error()?;
+        rust_tags
+            .as_mut()
+            .create(tag.as_str(), version as u64)
+            .await
+            .default_error()
+    }
+
+    #[napi]
+    pub async unsafe fn delete(&mut self, tag: String) -> napi::Result<()> {
+        let mut rust_tags = self.inner.tags().await.default_error()?;
+        rust_tags
+            .as_mut()
+            .delete(tag.as_str())
+            .await
+            .default_error()
+    }
+
+    #[napi]
+    pub async unsafe fn update(&mut self, tag: String, version: i64) -> napi::Result<()> {
+        let mut rust_tags = self.inner.tags().await.default_error()?;
+        rust_tags
+            .as_mut()
+            .update(tag.as_str(), version as u64)
+            .await
+            .default_error()
+    }
+}
@@ -1,5 +1,5 @@
 [tool.bumpversion]
-current_version = "0.22.0"
+current_version = "0.22.1-beta.1"
 parse = """(?x)
     (?P<major>0|[1-9]\\d*)\\.
    (?P<minor>0|[1-9]\\d*)\\.
@@ -1,6 +1,6 @@
 [package]
 name = "lancedb-python"
-version = "0.22.0"
+version = "0.22.1-beta.1"
 edition.workspace = true
 description = "Python bindings for LanceDB"
 license.workspace = true
@@ -7,7 +7,7 @@ dependencies = [
     "numpy",
     "overrides>=0.7",
     "packaging",
-    "pyarrow>=14",
+    "pyarrow>=16",
     "pydantic>=1.10",
     "tqdm>=4.27.0",
 ]
@@ -1,5 +1,5 @@
 from datetime import timedelta
-from typing import Dict, List, Optional, Tuple, Any, Union, Literal
+from typing import Dict, List, Optional, Tuple, Any, TypedDict, Union, Literal

 import pyarrow as pa

@@ -47,7 +47,7 @@ class Table:
     ): ...
     async def list_versions(self) -> List[Dict[str, Any]]: ...
     async def version(self) -> int: ...
-    async def checkout(self, version: int): ...
+    async def checkout(self, version: Union[int, str]): ...
     async def checkout_latest(self): ...
     async def restore(self, version: Optional[int] = None): ...
     async def list_indices(self) -> list[IndexConfig]: ...
@@ -61,9 +61,18 @@ class Table:
         cleanup_since_ms: Optional[int] = None,
         delete_unverified: Optional[bool] = None,
     ) -> OptimizeStats: ...
+    @property
+    def tags(self) -> Tags: ...
     def query(self) -> Query: ...
     def vector_search(self) -> VectorQuery: ...

+class Tags:
+    async def list(self) -> Dict[str, Tag]: ...
+    async def get_version(self, tag: str) -> int: ...
+    async def create(self, tag: str, version: int): ...
+    async def delete(self, tag: str): ...
+    async def update(self, tag: str, version: int): ...
+
 class IndexConfig:
     index_type: str
     columns: List[str]
@@ -195,3 +204,7 @@ class RemovalStats:
 class OptimizeStats:
     compaction: CompactionStats
     prune: RemovalStats
+
+class Tag(TypedDict):
+    version: int
+    manifest_size: int
@@ -1636,51 +1636,7 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
         raise NotImplementedError("to_query_object not yet supported on a hybrid query")

     def to_arrow(self, *, timeout: Optional[timedelta] = None) -> pa.Table:
-        vector_query, fts_query = self._validate_query(
-            self._query, self._vector, self._text
-        )
-        self._fts_query = LanceFtsQueryBuilder(
-            self._table, fts_query, fts_columns=self._fts_columns
-        )
-        vector_query = self._query_to_vector(
-            self._table, vector_query, self._vector_column
-        )
-        self._vector_query = LanceVectorQueryBuilder(
-            self._table, vector_query, self._vector_column
-        )
-
-        if self._limit:
-            self._vector_query.limit(self._limit)
-            self._fts_query.limit(self._limit)
-        if self._columns:
-            self._vector_query.select(self._columns)
-            self._fts_query.select(self._columns)
-        if self._where:
-            self._vector_query.where(self._where, self._postfilter)
-            self._fts_query.where(self._where, self._postfilter)
-        if self._with_row_id:
-            self._vector_query.with_row_id(True)
-            self._fts_query.with_row_id(True)
-        if self._phrase_query:
-            self._fts_query.phrase_query(True)
-        if self._distance_type:
-            self._vector_query.metric(self._distance_type)
-        if self._nprobes:
-            self._vector_query.nprobes(self._nprobes)
-        if self._refine_factor:
-            self._vector_query.refine_factor(self._refine_factor)
-        if self._ef:
-            self._vector_query.ef(self._ef)
-        if self._bypass_vector_index:
-            self._vector_query.bypass_vector_index()
-        if self._lower_bound or self._upper_bound:
-            self._vector_query.distance_range(
-                lower_bound=self._lower_bound, upper_bound=self._upper_bound
-            )
-
-        if self._reranker is None:
-            self._reranker = RRFReranker()
-
+        self._create_query_builders()
         with ThreadPoolExecutor() as executor:
             fts_future = executor.submit(
                 self._fts_query.with_row_id(True).to_arrow, timeout=timeout
@@ -2003,6 +1959,112 @@ class LanceHybridQueryBuilder(LanceQueryBuilder):
         self._bypass_vector_index = True
         return self

+    def explain_plan(self, verbose: Optional[bool] = False) -> str:
+        """Return the execution plan for this query.
+
+        Examples
+        --------
+        >>> import lancedb
+        >>> db = lancedb.connect("./.lancedb")
+        >>> table = db.create_table("my_table", [{"vector": [99.0, 99]}])
+        >>> query = [100, 100]
+        >>> plan = table.search(query).explain_plan(True)
+        >>> print(plan)  # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
+        ProjectionExec: expr=[vector@0 as vector, _distance@2 as _distance]
+          GlobalLimitExec: skip=0, fetch=10
+            FilterExec: _distance@2 IS NOT NULL
+              SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], preserve_partitioning=[false]
+                KNNVectorDistance: metric=l2
+                  LanceScan: uri=..., projection=[vector], row_id=true, row_addr=false, ordered=false
+
+        Parameters
+        ----------
+        verbose : bool, default False
+            Use a verbose output format.
+
+        Returns
+        -------
+        plan : str
+        """  # noqa: E501
+        self._create_query_builders()
+
+        results = ["Vector Search Plan:"]
+        results.append(
+            self._table._explain_plan(
+                self._vector_query.to_query_object(), verbose=verbose
+            )
+        )
+        results.append("FTS Search Plan:")
+        results.append(
+            self._table._explain_plan(
+                self._fts_query.to_query_object(), verbose=verbose
+            )
+        )
+        return "\n".join(results)
+
+    def analyze_plan(self):
+        """Execute the query and display with runtime metrics.
+
+        Returns
+        -------
+        plan : str
+        """
+        self._create_query_builders()
+
+        results = ["Vector Search Plan:"]
+        results.append(self._table._analyze_plan(self._vector_query.to_query_object()))
+        results.append("FTS Search Plan:")
+        results.append(self._table._analyze_plan(self._fts_query.to_query_object()))
+        return "\n".join(results)
+
+    def _create_query_builders(self):
+        """Set up and configure the vector and FTS query builders."""
+        vector_query, fts_query = self._validate_query(
+            self._query, self._vector, self._text
+        )
+        self._fts_query = LanceFtsQueryBuilder(
+            self._table, fts_query, fts_columns=self._fts_columns
+        )
+        vector_query = self._query_to_vector(
+            self._table, vector_query, self._vector_column
+        )
+        self._vector_query = LanceVectorQueryBuilder(
+            self._table, vector_query, self._vector_column
+        )
+
+        # Apply common configurations
+        if self._limit:
+            self._vector_query.limit(self._limit)
+            self._fts_query.limit(self._limit)
+        if self._columns:
+            self._vector_query.select(self._columns)
+            self._fts_query.select(self._columns)
+        if self._where:
+            self._vector_query.where(self._where, self._postfilter)
+            self._fts_query.where(self._where, self._postfilter)
+        if self._with_row_id:
+            self._vector_query.with_row_id(True)
+            self._fts_query.with_row_id(True)
+        if self._phrase_query:
+            self._fts_query.phrase_query(True)
+        if self._distance_type:
+            self._vector_query.metric(self._distance_type)
+        if self._nprobes:
+            self._vector_query.nprobes(self._nprobes)
+        if self._refine_factor:
+            self._vector_query.refine_factor(self._refine_factor)
+        if self._ef:
+            self._vector_query.ef(self._ef)
+        if self._bypass_vector_index:
+            self._vector_query.bypass_vector_index()
+        if self._lower_bound or self._upper_bound:
+            self._vector_query.distance_range(
+                lower_bound=self._lower_bound, upper_bound=self._upper_bound
+            )
+
+        if self._reranker is None:
+            self._reranker = RRFReranker()
+

 class AsyncQueryBase(object):
     def __init__(self, inner: Union[LanceQuery, LanceVectorQuery]):
@@ -18,7 +18,7 @@ from lancedb.merge import LanceMergeInsertBuilder
 from lancedb.embeddings import EmbeddingFunctionRegistry

 from ..query import LanceVectorQueryBuilder, LanceQueryBuilder
-from ..table import AsyncTable, IndexStatistics, Query, Table
+from ..table import AsyncTable, IndexStatistics, Query, Table, Tags


 class RemoteTable(Table):
@@ -54,6 +54,10 @@ class RemoteTable(Table):
         """Get the current version of the table"""
         return LOOP.run(self._table.version())

+    @property
+    def tags(self) -> Tags:
+        return Tags(self._table)
+
     @cached_property
     def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]:
         """
@@ -81,7 +85,7 @@ class RemoteTable(Table):
         """to_pandas() is not yet supported on LanceDB cloud."""
         return NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")

-    def checkout(self, version: int):
+    def checkout(self, version: Union[int, str]):
         return LOOP.run(self._table.checkout(version))

     def checkout_latest(self):
@@ -574,6 +578,9 @@ class RemoteTable(Table):
     ):
         return LOOP.run(self._table.wait_for_index(index_names, timeout))

+    def stats(self):
+        return LOOP.run(self._table.stats())
+
     def uses_v2_manifest_paths(self) -> bool:
         raise NotImplementedError(
             "uses_v2_manifest_paths() is not supported on the LanceDB Cloud"
@@ -77,6 +77,7 @@ if TYPE_CHECKING:
        OptimizeStats,
        CleanupStats,
        CompactionStats,
        Tag,
    )
    from .db import LanceDBConnection
    from .index import IndexConfig
@@ -582,6 +583,35 @@ class Table(ABC):
        """
        raise NotImplementedError

    @property
    @abstractmethod
    def tags(self) -> Tags:
        """Tag management for the table.

        Similar to Git, tags are a way to add metadata to a specific version of the
        table.

        .. warning::

            Tagged versions are exempted from the :py:meth:`cleanup_old_versions()`
            process.

            To remove a version that has been tagged, you must first
            :py:meth:`~Tags.delete` the associated tag.

        Examples
        --------

        .. code-block:: python

            table = db.open_table("my_table")
            table.tags.create("v2-prod-20250203", 10)

            tags = table.tags.list()

        """
        raise NotImplementedError

    @property
    @abstractmethod
    def embedding_functions(self) -> Dict[str, EmbeddingFunctionConfig]:
@@ -709,6 +739,13 @@ class Table(ABC):
        """
        raise NotImplementedError

    @abstractmethod
    def stats(self) -> TableStatistics:
        """
        Retrieve table and fragment statistics.
        """
        raise NotImplementedError

    @abstractmethod
    def create_scalar_index(
        self,
@@ -925,10 +962,12 @@ class Table(ABC):
        >>> table = db.create_table("my_table", data)
        >>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
        >>> # Perform an "upsert" operation
        >>> table.merge_insert("a") \\
        >>> stats = table.merge_insert("a") \\
        ...     .when_matched_update_all() \\
        ...     .when_not_matched_insert_all() \\
        ...     .execute(new_data)
        >>> stats
        {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
        >>> # The order of new rows is non-deterministic since we use
        >>> # a hash-join as part of this operation and so we sort here
        >>> table.to_arrow().sort_by("a").to_pandas()
@@ -1354,7 +1393,7 @@ class Table(ABC):
        """

    @abstractmethod
    def checkout(self, version: int):
    def checkout(self, version: Union[int, str]):
        """
        Checks out a specific version of the Table

@@ -1369,6 +1408,12 @@ class Table(ABC):
        Any operation that modifies the table will fail while the table is in a checked
        out state.

        Parameters
        ----------
        version: int | str,
            The version to check out. A version number (`int`) or a tag
            (`str`) can be provided.

        To return the table to a normal state use `[Self::checkout_latest]`
        """

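Since checkout now takes int | str, a version number and a tag resolve through the same call site. A minimal sketch (table and tag names are invented):

import lancedb

db = lancedb.connect("./.lancedb")
table = db.create_table("events", [{"vector": [1.0, 2.0]}])
table.tags.create("baseline", table.version)
table.add([{"vector": [3.0, 4.0]}])

table.checkout(1)           # by version number
table.checkout("baseline")  # by tag, resolved to the same version
table.checkout_latest()     # leave the checked-out state before writing again
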
@@ -1538,7 +1583,45 @@ class LanceTable(Table):
        """Get the current version of the table"""
        return LOOP.run(self._table.version())

    def checkout(self, version: int):
    @property
    def tags(self) -> Tags:
        """Tag management for the table.

        Similar to Git, tags are a way to add metadata to a specific version of the
        table.

        .. warning::

            Tagged versions are exempted from the :py:meth:`cleanup_old_versions()`
            process.

            To remove a version that has been tagged, you must first
            :py:meth:`~Tags.delete` the associated tag.

        Returns
        -------
        Tags
            The tag manager for managing tags for the table.

        Examples
        --------
        >>> import lancedb
        >>> db = lancedb.connect("./.lancedb")
        >>> table = db.create_table("my_table",
        ...     [{"vector": [1.1, 0.9], "type": "vector"}])
        >>> table.tags.create("v1", table.version)
        >>> table.add([{"vector": [0.5, 0.2], "type": "vector"}])
        >>> tags = table.tags.list()
        >>> print(tags["v1"]["version"])
        1
        >>> table.checkout("v1")
        >>> table.to_pandas()
               vector    type
        0  [1.1, 0.9]  vector
        """
        return Tags(self._table)

    def checkout(self, version: Union[int, str]):
        """Checkout a version of the table. This is an in-place operation.

        This allows viewing previous versions of the table. If you wish to
@@ -1550,8 +1633,9 @@ class LanceTable(Table):

        Parameters
        ----------
        version : int
            The version to checkout.
        version: int | str,
            The version to check out. A version number (`int`) or a tag
            (`str`) can be provided.

        Examples
        --------
@@ -1801,6 +1885,9 @@ class LanceTable(Table):
    ) -> None:
        return LOOP.run(self._table.wait_for_index(index_names, timeout))

    def stats(self) -> TableStatistics:
        return LOOP.run(self._table.stats())

    def create_scalar_index(
        self,
        column: str,
@@ -2404,7 +2491,9 @@ class LanceTable(Table):
        on_bad_vectors: OnBadVectorsType,
        fill_value: float,
    ):
        LOOP.run(self._table._do_merge(merge, new_data, on_bad_vectors, fill_value))
        return LOOP.run(
            self._table._do_merge(merge, new_data, on_bad_vectors, fill_value)
        )

    @deprecation.deprecated(
        deprecated_in="0.21.0",
@@ -3095,6 +3184,12 @@ class AsyncTable:
        """
        await self._inner.wait_for_index(index_names, timeout)

    async def stats(self) -> TableStatistics:
        """
        Retrieve table and fragment statistics.
        """
        return await self._inner.stats()

    async def add(
        self,
        data: DATA,
@@ -3186,10 +3281,12 @@ class AsyncTable:
        >>> table = db.create_table("my_table", data)
        >>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
        >>> # Perform an "upsert" operation
        >>> table.merge_insert("a") \\
        >>> stats = table.merge_insert("a") \\
        ...     .when_matched_update_all() \\
        ...     .when_not_matched_insert_all() \\
        ...     .execute(new_data)
        >>> stats
        {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
        >>> # The order of new rows is non-deterministic since we use
        >>> # a hash-join as part of this operation and so we sort here
        >>> table.to_arrow().sort_by("a").to_pandas()
@@ -3545,7 +3642,7 @@ class AsyncTable:
        )
        if isinstance(data, pa.Table):
            data = pa.RecordBatchReader.from_batches(data.schema, data.to_batches())
        await self._inner.execute_merge_insert(
        return await self._inner.execute_merge_insert(
            data,
            dict(
                on=merge._on,
@@ -3746,7 +3843,7 @@ class AsyncTable:

        return versions

    async def checkout(self, version: int):
    async def checkout(self, version: int | str):
        """
        Checks out a specific version of the Table

@@ -3761,6 +3858,12 @@ class AsyncTable:
        Any operation that modifies the table will fail while the table is in a checked
        out state.

        Parameters
        ----------
        version: int | str,
            The version to check out. A version number (`int`) or a tag
            (`str`) can be provided.

        To return the table to a normal state use `[Self::checkout_latest]`
        """
        try:
@@ -3798,6 +3901,24 @@ class AsyncTable:
        """
        await self._inner.restore(version)

    @property
    def tags(self) -> AsyncTags:
        """Tag management for the dataset.

        Similar to Git, tags are a way to add metadata to a specific version of the
        dataset.

        .. warning::

            Tagged versions are exempted from the
            :py:meth:`optimize(cleanup_older_than)` process.

            To remove a version that has been tagged, you must first
            :py:meth:`~Tags.delete` the associated tag.

        """
        return AsyncTags(self._inner)

    async def optimize(
        self,
        *,
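The async table mirrors the sync tag manager one-for-one. A short sketch with connect_async (table and tag names are invented):

import asyncio
import lancedb

async def main():
    db = await lancedb.connect_async("./.lancedb")
    table = await db.create_table("docs", [{"vector": [1.0, 2.0]}])
    await table.tags.create("v1", await table.version())
    tags = await table.tags.list()
    print(tags["v1"]["version"])
    await table.checkout("v1")  # async checkout accepts tags too

asyncio.run(main())
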
@@ -3967,3 +4088,217 @@ class IndexStatistics:
    # a dictionary instead of a class.
    def __getitem__(self, key):
        return getattr(self, key)


@dataclass
class TableStatistics:
    """
    Statistics about a table and fragments.

    Attributes
    ----------
    total_bytes: int
        The total number of bytes in the table.
    num_rows: int
        The total number of rows in the table.
    num_indices: int
        The total number of indices in the table.
    fragment_stats: FragmentStatistics
        Statistics about fragments in the table.
    """

    total_bytes: int
    num_rows: int
    num_indices: int
    fragment_stats: FragmentStatistics


@dataclass
class FragmentStatistics:
    """
    Statistics about fragments.

    Attributes
    ----------
    num_fragments: int
        The total number of fragments in the table.
    num_small_fragments: int
        The total number of small fragments in the table.
        Small fragments have low row counts and may need to be compacted.
    lengths: FragmentSummaryStats
        Statistics about the number of rows in the table fragments.
    """

    num_fragments: int
    num_small_fragments: int
    lengths: FragmentSummaryStats


@dataclass
class FragmentSummaryStats:
    """
    Statistics about fragment sizes.

    Attributes
    ----------
    min: int
        The number of rows in the fragment with the fewest rows.
    max: int
        The number of rows in the fragment with the most rows.
    mean: int
        The mean number of rows in the fragments.
    p25: int
        The 25th percentile of number of rows in the fragments.
    p50: int
        The 50th percentile of number of rows in the fragments.
    p75: int
        The 75th percentile of number of rows in the fragments.
    p99: int
        The 99th percentile of number of rows in the fragments.
    """

    min: int
    max: int
    mean: int
    p25: int
    p50: int
    p75: int
    p99: int


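These dataclasses describe the payload that Table.stats() ultimately returns (in practice the bindings hand back a plain dict with the same keys, as the tests further down show). A small sketch that uses the counts to decide when to compact; the table name and threshold are illustrative:

import lancedb

db = lancedb.connect("./.lancedb")
table = db.open_table("docs")  # hypothetical existing table

stats = table.stats()
frag = stats["fragment_stats"]
print(f"{stats['num_rows']} rows across {frag['num_fragments']} fragments")
if frag["num_small_fragments"] > 8:  # made-up threshold
    table.optimize()  # compaction folds small fragments together
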
class Tags:
    """
    Table tag manager.
    """

    def __init__(self, table):
        self._table = table

    def list(self) -> Dict[str, Tag]:
        """
        List all table tags.

        Returns
        -------
        dict[str, Tag]
            A dictionary mapping tag names to tag contents (version and
            manifest size).
        """
        return LOOP.run(self._table.tags.list())

    def get_version(self, tag: str) -> int:
        """
        Get the version of a tag.

        Parameters
        ----------
        tag: str,
            The name of the tag to get the version for.
        """
        return LOOP.run(self._table.tags.get_version(tag))

    def create(self, tag: str, version: int) -> None:
        """
        Create a tag for a given table version.

        Parameters
        ----------
        tag: str,
            The name of the tag to create. This name must be unique among all tag
            names for the table.
        version: int,
            The table version to tag.
        """
        LOOP.run(self._table.tags.create(tag, version))

    def delete(self, tag: str) -> None:
        """
        Delete a tag from the table.

        Parameters
        ----------
        tag: str,
            The name of the tag to delete.
        """
        LOOP.run(self._table.tags.delete(tag))

    def update(self, tag: str, version: int) -> None:
        """
        Update a tag to a new version.

        Parameters
        ----------
        tag: str,
            The name of the tag to update.
        version: int,
            The new table version to tag.
        """
        LOOP.run(self._table.tags.update(tag, version))


class AsyncTags:
    """
    Async table tag manager.
    """

    def __init__(self, table):
        self._table = table

    async def list(self) -> Dict[str, Tag]:
        """
        List all table tags.

        Returns
        -------
        dict[str, Tag]
            A dictionary mapping tag names to tag contents (version and
            manifest size).
        """
        return await self._table.tags.list()

    async def get_version(self, tag: str) -> int:
        """
        Get the version of a tag.

        Parameters
        ----------
        tag: str,
            The name of the tag to get the version for.
        """
        return await self._table.tags.get_version(tag)

    async def create(self, tag: str, version: int) -> None:
        """
        Create a tag for a given table version.

        Parameters
        ----------
        tag: str,
            The name of the tag to create. This name must be unique among all tag
            names for the table.
        version: int,
            The table version to tag.
        """
        await self._table.tags.create(tag, version)

    async def delete(self, tag: str) -> None:
        """
        Delete a tag from the table.

        Parameters
        ----------
        tag: str,
            The name of the tag to delete.
        """
        await self._table.tags.delete(tag)

    async def update(self, tag: str, version: int) -> None:
        """
        Update a tag to a new version.

        Parameters
        ----------
        tag: str,
            The name of the tag to update.
        version: int,
            The new table version to tag.
        """
        await self._table.tags.update(tag, version)

@@ -18,15 +18,19 @@ def test_upsert(mem_db):
        {"id": 1, "name": "Bobby"},
        {"id": 2, "name": "Charlie"},
    ]
    (
    stats = (
        table.merge_insert("id")
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute(new_users)
    )
    table.count_rows()  # 3
    stats  # {'num_inserted_rows': 1, 'num_updated_rows': 1, 'num_deleted_rows': 0}
    # --8<-- [end:upsert_basic]
    assert table.count_rows() == 3
    assert stats["num_inserted_rows"] == 1
    assert stats["num_updated_rows"] == 1
    assert stats["num_deleted_rows"] == 0


@pytest.mark.asyncio
@@ -44,15 +48,19 @@ async def test_upsert_async(mem_db_async):
        {"id": 1, "name": "Bobby"},
        {"id": 2, "name": "Charlie"},
    ]
    await (
    stats = await (
        table.merge_insert("id")
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute(new_users)
    )
    await table.count_rows()  # 3
    stats  # {'num_inserted_rows': 1, 'num_updated_rows': 1, 'num_deleted_rows': 0}
    # --8<-- [end:upsert_basic_async]
    assert await table.count_rows() == 3
    assert stats["num_inserted_rows"] == 1
    assert stats["num_updated_rows"] == 1
    assert stats["num_deleted_rows"] == 0


def test_insert_if_not_exists(mem_db):
@@ -69,10 +77,16 @@ def test_insert_if_not_exists(mem_db):
        {"domain": "google.com", "name": "Google"},
        {"domain": "facebook.com", "name": "Facebook"},
    ]
    (table.merge_insert("domain").when_not_matched_insert_all().execute(new_domains))
    stats = (
        table.merge_insert("domain").when_not_matched_insert_all().execute(new_domains)
    )
    table.count_rows()  # 3
    stats  # {'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
    # --8<-- [end:insert_if_not_exists]
    assert table.count_rows() == 3
    assert stats["num_inserted_rows"] == 1
    assert stats["num_updated_rows"] == 0
    assert stats["num_deleted_rows"] == 0


@pytest.mark.asyncio
@@ -90,12 +104,16 @@ async def test_insert_if_not_exists_async(mem_db_async):
        {"domain": "google.com", "name": "Google"},
        {"domain": "facebook.com", "name": "Facebook"},
    ]
    await (
    stats = await (
        table.merge_insert("domain").when_not_matched_insert_all().execute(new_domains)
    )
    await table.count_rows()  # 3
    stats  # {'num_inserted_rows': 1, 'num_updated_rows': 0, 'num_deleted_rows': 0}
    # --8<-- [end:insert_if_not_exists_async]
    assert await table.count_rows() == 3
    assert stats["num_inserted_rows"] == 1
    assert stats["num_updated_rows"] == 0
    assert stats["num_deleted_rows"] == 0


def test_replace_range(mem_db):
@@ -113,7 +131,7 @@ def test_replace_range(mem_db):
    new_chunks = [
        {"doc_id": 1, "chunk_id": 0, "text": "Baz"},
    ]
    (
    stats = (
        table.merge_insert(["doc_id", "chunk_id"])
        .when_matched_update_all()
        .when_not_matched_insert_all()
@@ -121,8 +139,12 @@ def test_replace_range(mem_db):
        .execute(new_chunks)
    )
    table.count_rows("doc_id = 1")  # 1
    stats  # {'num_inserted_rows': 0, 'num_updated_rows': 1, 'num_deleted_rows': 1}
    # --8<-- [end:replace_range]
    assert table.count_rows("doc_id = 1") == 1
    assert stats["num_inserted_rows"] == 0
    assert stats["num_updated_rows"] == 1
    assert stats["num_deleted_rows"] == 1


@pytest.mark.asyncio
@@ -141,7 +163,7 @@ async def test_replace_range_async(mem_db_async):
    new_chunks = [
        {"doc_id": 1, "chunk_id": 0, "text": "Baz"},
    ]
    await (
    stats = await (
        table.merge_insert(["doc_id", "chunk_id"])
        .when_matched_update_all()
        .when_not_matched_insert_all()
@@ -149,5 +171,9 @@ async def test_replace_range_async(mem_db_async):
        .execute(new_chunks)
    )
    await table.count_rows("doc_id = 1")  # 1
    stats  # {'num_inserted_rows': 0, 'num_updated_rows': 1, 'num_deleted_rows': 1}
    # --8<-- [end:replace_range_async]
    assert await table.count_rows("doc_id = 1") == 1
    assert stats["num_inserted_rows"] == 0
    assert stats["num_updated_rows"] == 1
    assert stats["num_deleted_rows"] == 1

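These doc tests double as documentation: the returned counts let a caller confirm that a merge did exactly what was intended. A defensive sketch along the same lines (the table, data, and threshold logic are illustrative):

stats = (
    table.merge_insert(["doc_id", "chunk_id"])
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .when_not_matched_by_source_delete("doc_id = 1")
    .execute(new_chunks)
)
if stats["num_deleted_rows"] > len(new_chunks):
    raise RuntimeError("replace-range deleted more chunks than it wrote")
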
@@ -389,6 +389,50 @@ def test_table_wait_for_index_timeout():
        table.wait_for_index(["id_idx"], timedelta(seconds=1))


def test_stats():
    stats = {
        "total_bytes": 38,
        "num_rows": 2,
        "num_indices": 0,
        "fragment_stats": {
            "num_fragments": 1,
            "num_small_fragments": 1,
            "lengths": {
                "min": 2,
                "max": 2,
                "mean": 2,
                "p25": 2,
                "p50": 2,
                "p75": 2,
                "p99": 2,
            },
        },
    }

    def handler(request):
        if request.path == "/v1/table/test/create/?mode=create":
            request.send_response(200)
            request.send_header("Content-Type", "application/json")
            request.end_headers()
            request.wfile.write(b"{}")
        elif request.path == "/v1/table/test/stats/":
            request.send_response(200)
            request.send_header("Content-Type", "application/json")
            request.end_headers()
            payload = json.dumps(stats)
            request.wfile.write(payload.encode())
        else:
            print(request.path)
            request.send_response(404)
            request.end_headers()

    with mock_lancedb_connection(handler) as db:
        table = db.create_table("test", [{"id": 1}])
        res = table.stats()
        print(f"{res=}")
        assert res == stats


@contextlib.contextmanager
def query_test_table(query_handler, *, server_version=Version("0.1.0")):
    def handler(request):

@@ -529,6 +529,113 @@ def test_versioning(mem_db: DBConnection):
    assert len(table) == 2


def test_tags(mem_db: DBConnection):
    table = mem_db.create_table(
        "test",
        data=[
            {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
            {"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
        ],
    )

    table.tags.create("tag1", 1)
    tags = table.tags.list()
    assert "tag1" in tags
    assert tags["tag1"]["version"] == 1

    table.add(
        data=[
            {"vector": [10.0, 11.0], "item": "baz", "price": 30.0},
        ],
    )

    table.tags.create("tag2", 2)
    tags = table.tags.list()
    assert "tag1" in tags
    assert "tag2" in tags
    assert tags["tag1"]["version"] == 1
    assert tags["tag2"]["version"] == 2

    table.tags.delete("tag2")
    table.tags.update("tag1", 2)
    tags = table.tags.list()
    assert "tag1" in tags
    assert tags["tag1"]["version"] == 2

    table.tags.update("tag1", 1)
    tags = table.tags.list()
    assert "tag1" in tags
    assert tags["tag1"]["version"] == 1

    table.checkout("tag1")
    assert table.version == 1
    assert table.count_rows() == 2
    table.tags.create("tag2", 2)
    table.checkout("tag2")
    assert table.version == 2
    assert table.count_rows() == 3
    table.checkout_latest()
    table.add(
        data=[
            {"vector": [12.0, 13.0], "item": "baz", "price": 40.0},
        ],
    )


@pytest.mark.asyncio
async def test_async_tags(mem_db_async: AsyncConnection):
    table = await mem_db_async.create_table(
        "test",
        data=[
            {"vector": [3.1, 4.1], "item": "foo", "price": 10.0},
            {"vector": [5.9, 26.5], "item": "bar", "price": 20.0},
        ],
    )

    await table.tags.create("tag1", 1)
    tags = await table.tags.list()
    assert "tag1" in tags
    assert tags["tag1"]["version"] == 1

    await table.add(
        data=[
            {"vector": [10.0, 11.0], "item": "baz", "price": 30.0},
        ],
    )

    await table.tags.create("tag2", 2)
    tags = await table.tags.list()
    assert "tag1" in tags
    assert "tag2" in tags
    assert tags["tag1"]["version"] == 1
    assert tags["tag2"]["version"] == 2

    await table.tags.delete("tag2")
    await table.tags.update("tag1", 2)
    tags = await table.tags.list()
    assert "tag1" in tags
    assert tags["tag1"]["version"] == 2

    await table.tags.update("tag1", 1)
    tags = await table.tags.list()
    assert "tag1" in tags
    assert tags["tag1"]["version"] == 1

    await table.checkout("tag1")
    assert await table.version() == 1
    assert await table.count_rows() == 2
    await table.tags.create("tag2", 2)
    await table.checkout("tag2")
    assert await table.version() == 2
    assert await table.count_rows() == 3
    await table.checkout_latest()
    await table.add(
        data=[
            {"vector": [12.0, 13.0], "item": "baz", "price": 40.0},
        ],
    )


@patch("lancedb.table.AsyncTable.create_index")
def test_create_index_method(mock_create_index, mem_db: DBConnection):
    table = mem_db.create_table(
@@ -1588,3 +1695,31 @@ def test_replace_field_metadata(tmp_path):
    schema = table.schema
    field = schema[0].metadata
    assert field == {b"foo": b"bar"}


def test_stats(mem_db: DBConnection):
    table = mem_db.create_table(
        "my_table",
        data=[{"text": "foo", "id": 0}, {"text": "bar", "id": 1}],
    )
    assert len(table) == 2
    stats = table.stats()
    print(f"{stats=}")
    assert stats == {
        "total_bytes": 38,
        "num_rows": 2,
        "num_indices": 0,
        "fragment_stats": {
            "num_fragments": 1,
            "num_small_fragments": 1,
            "lengths": {
                "min": 2,
                "max": 2,
                "mean": 2,
                "p25": 2,
                "p50": 2,
                "p75": 2,
                "p99": 2,
            },
        },
    }

@@ -2,6 +2,11 @@
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, sync::Arc};

use crate::{
    error::PythonErrorExt,
    index::{extract_index_params, IndexConfig},
    query::Query,
};
use arrow::{
    datatypes::{DataType, Schema},
    ffi_stream::ArrowArrayStreamReader,
@@ -12,19 +17,13 @@ use lancedb::table::{
    Table as LanceDbTable,
};
use pyo3::{
    exceptions::{PyKeyError, PyRuntimeError, PyValueError},
    exceptions::{PyIOError, PyKeyError, PyRuntimeError, PyValueError},
    pyclass, pymethods,
    types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods},
    Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
    types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods, PyInt, PyString},
    Bound, FromPyObject, PyAny, PyObject, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;

use crate::{
    error::PythonErrorExt,
    index::{extract_index_params, IndexConfig},
    query::Query,
};

/// Statistics about a compaction operation.
#[pyclass(get_all)]
#[derive(Clone, Debug)]
@@ -280,6 +279,40 @@ impl Table {
        })
    }

    pub fn stats(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.inner_ref()?.clone();
        future_into_py(self_.py(), async move {
            let stats = inner.stats().await.infer_error()?;
            Python::with_gil(|py| {
                let dict = PyDict::new(py);
                dict.set_item("total_bytes", stats.total_bytes)?;
                dict.set_item("num_rows", stats.num_rows)?;
                dict.set_item("num_indices", stats.num_indices)?;

                let fragment_stats = PyDict::new(py);
                fragment_stats.set_item("num_fragments", stats.fragment_stats.num_fragments)?;
                fragment_stats.set_item(
                    "num_small_fragments",
                    stats.fragment_stats.num_small_fragments,
                )?;

                let fragment_lengths = PyDict::new(py);
                fragment_lengths.set_item("min", stats.fragment_stats.lengths.min)?;
                fragment_lengths.set_item("max", stats.fragment_stats.lengths.max)?;
                fragment_lengths.set_item("mean", stats.fragment_stats.lengths.mean)?;
                fragment_lengths.set_item("p25", stats.fragment_stats.lengths.p25)?;
                fragment_lengths.set_item("p50", stats.fragment_stats.lengths.p50)?;
                fragment_lengths.set_item("p75", stats.fragment_stats.lengths.p75)?;
                fragment_lengths.set_item("p99", stats.fragment_stats.lengths.p99)?;

                fragment_stats.set_item("lengths", fragment_lengths)?;
                dict.set_item("fragment_stats", fragment_stats)?;

                Ok(Some(dict.unbind()))
            })
        })
    }

    pub fn __repr__(&self) -> String {
        match &self.inner {
            None => format!("ClosedTable({})", self.name),
@@ -322,10 +355,26 @@ impl Table {
        })
    }

    pub fn checkout(self_: PyRef<'_, Self>, version: u64) -> PyResult<Bound<'_, PyAny>> {
    pub fn checkout(self_: PyRef<'_, Self>, version: PyObject) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.inner_ref()?.clone();
        future_into_py(self_.py(), async move {
            inner.checkout(version).await.infer_error()
        let py = self_.py();
        let (is_int, int_value, string_value) = if let Ok(i) = version.downcast_bound::<PyInt>(py) {
            let num: u64 = i.extract()?;
            (true, num, String::new())
        } else if let Ok(s) = version.downcast_bound::<PyString>(py) {
            let str_value = s.to_string();
            (false, 0, str_value)
        } else {
            return Err(PyIOError::new_err(
                "version must be an integer or a string.",
            ));
        };
        future_into_py(py, async move {
            if is_int {
                inner.checkout(int_value).await.infer_error()
            } else {
                inner.checkout_tag(&string_value).await.infer_error()
            }
        })
    }

@@ -352,6 +401,11 @@ impl Table {
        Query::new(self.inner_ref().unwrap().query())
    }

    #[getter]
    pub fn tags(&self) -> PyResult<Tags> {
        Ok(Tags::new(self.inner_ref()?.clone()))
    }

    /// Optimize the on-disk data by compacting and pruning old data, for better performance.
    #[pyo3(signature = (cleanup_since_ms=None, delete_unverified=None, retrain=None))]
    pub fn optimize(
@@ -435,8 +489,14 @@ impl Table {
        }

        future_into_py(self_.py(), async move {
            builder.execute(Box::new(batches)).await.infer_error()?;
            Ok(())
            let stats = builder.execute(Box::new(batches)).await.infer_error()?;
            Python::with_gil(|py| {
                let dict = PyDict::new(py);
                dict.set_item("num_inserted_rows", stats.num_inserted_rows)?;
                dict.set_item("num_updated_rows", stats.num_updated_rows)?;
                dict.set_item("num_deleted_rows", stats.num_deleted_rows)?;
                Ok(dict.unbind())
            })
        })
    }

@@ -586,3 +646,72 @@ pub struct MergeInsertParams {
    when_not_matched_by_source_delete: bool,
    when_not_matched_by_source_condition: Option<String>,
}

#[pyclass]
pub struct Tags {
    inner: LanceDbTable,
}

impl Tags {
    pub fn new(table: LanceDbTable) -> Self {
        Self { inner: table }
    }
}

#[pymethods]
impl Tags {
    pub fn list(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.inner.clone();
        future_into_py(self_.py(), async move {
            let tags = inner.tags().await.infer_error()?;
            let res = tags.list().await.infer_error()?;

            Python::with_gil(|py| {
                let py_dict = PyDict::new(py);
                for (key, contents) in res {
                    let value_dict = PyDict::new(py);
                    value_dict.set_item("version", contents.version)?;
                    value_dict.set_item("manifest_size", contents.manifest_size)?;
                    py_dict.set_item(key, value_dict)?;
                }
                Ok(py_dict.unbind())
            })
        })
    }

    pub fn get_version(self_: PyRef<'_, Self>, tag: String) -> PyResult<Bound<'_, PyAny>> {
        let inner = self_.inner.clone();
        future_into_py(self_.py(), async move {
            let tags = inner.tags().await.infer_error()?;
            let res = tags.get_version(tag.as_str()).await.infer_error()?;
            Ok(res)
        })
    }

    pub fn create(self_: PyRef<Self>, tag: String, version: u64) -> PyResult<Bound<PyAny>> {
        let inner = self_.inner.clone();
        future_into_py(self_.py(), async move {
            let mut tags = inner.tags().await.infer_error()?;
            tags.create(tag.as_str(), version).await.infer_error()?;
            Ok(())
        })
    }

    pub fn delete(self_: PyRef<Self>, tag: String) -> PyResult<Bound<PyAny>> {
        let inner = self_.inner.clone();
        future_into_py(self_.py(), async move {
            let mut tags = inner.tags().await.infer_error()?;
            tags.delete(tag.as_str()).await.infer_error()?;
            Ok(())
        })
    }

    pub fn update(self_: PyRef<Self>, tag: String, version: u64) -> PyResult<Bound<PyAny>> {
        let inner = self_.inner.clone();
        future_into_py(self_.py(), async move {
            let mut tags = inner.tags().await.infer_error()?;
            tags.update(tag.as_str(), version).await.infer_error()?;
            Ok(())
        })
    }
}

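On the Python side the new checkout binding dispatches on the runtime type of version, so anything that is neither int nor str is rejected before any async work starts. A sketch of the observable behavior (assuming, as pyo3 documents, that PyIOError surfaces as Python's built-in OSError):

table.checkout(3)        # routed to the u64 checkout path
table.checkout("v1")     # routed to checkout_tag
try:
    table.checkout(3.0)  # neither int nor str
except OSError as err:
    print(err)           # "version must be an integer or a string."
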
@@ -1,6 +1,6 @@
[package]
name = "lancedb-node"
version = "0.19.0-beta.11"
version = "0.19.1-beta.1"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.19.0-beta.11"
version = "0.19.1-beta.1"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

@@ -4,7 +4,8 @@
use crate::index::Index;
use crate::index::IndexStatistics;
use crate::query::{QueryFilter, QueryRequest, Select, VectorQueryRequest};
use crate::table::{AddDataMode, AnyQuery, Filter};
use crate::table::Tags;
use crate::table::{AddDataMode, AnyQuery, Filter, TableStatistics};
use crate::utils::{supported_btree_data_type, supported_vector_data_type};
use crate::{DistanceType, Error, Table};
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
@@ -18,11 +19,13 @@ use futures::TryStreamExt;
use http::header::CONTENT_TYPE;
use http::{HeaderName, StatusCode};
use lance::arrow::json::{JsonDataType, JsonSchema};
use lance::dataset::refs::TagContents;
use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform, Version};
use lance_datafusion::exec::{execute_plan, OneShotExec};
use reqwest::{RequestBuilder, Response};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
@@ -44,9 +47,141 @@ use crate::{
        TableDefinition, UpdateBuilder,
    },
};
use lance::dataset::MergeStats;

const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");

pub struct RemoteTags<'a, S: HttpSend = Sender> {
    inner: &'a RemoteTable<S>,
}

#[async_trait]
impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
    async fn list(&self) -> Result<HashMap<String, TagContents>> {
        let request = self
            .inner
            .client
            .post(&format!("/v1/table/{}/tags/list/", self.inner.name));
        let (request_id, response) = self.inner.send(request, true).await?;
        let response = self
            .inner
            .check_table_response(&request_id, response)
            .await?;

        match response.text().await {
            Ok(body) => {
                // Explicitly tell serde_json what type we want to deserialize into
                let tags_map: HashMap<String, TagContents> =
                    serde_json::from_str(&body).map_err(|e| Error::Http {
                        source: format!("Failed to parse tags list: {}", e).into(),
                        request_id,
                        status_code: None,
                    })?;

                Ok(tags_map)
            }
            Err(err) => {
                let status_code = err.status();
                Err(Error::Http {
                    source: Box::new(err),
                    request_id,
                    status_code,
                })
            }
        }
    }

    async fn get_version(&self, tag: &str) -> Result<u64> {
        let request = self
            .inner
            .client
            .post(&format!("/v1/table/{}/tags/version/", self.inner.name))
            .json(&serde_json::json!({ "tag": tag }));

        let (request_id, response) = self.inner.send(request, true).await?;
        let response = self
            .inner
            .check_table_response(&request_id, response)
            .await?;

        match response.text().await {
            Ok(body) => {
                let value: serde_json::Value =
                    serde_json::from_str(&body).map_err(|e| Error::Http {
                        source: format!("Failed to parse tag version: {}", e).into(),
                        request_id: request_id.clone(),
                        status_code: None,
                    })?;

                value
                    .get("version")
                    .and_then(|v| v.as_u64())
                    .ok_or_else(|| Error::Http {
                        source: format!("Invalid tag version response: {}", body).into(),
                        request_id,
                        status_code: None,
                    })
            }
            Err(err) => {
                let status_code = err.status();
                Err(Error::Http {
                    source: Box::new(err),
                    request_id,
                    status_code,
                })
            }
        }
    }

    async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
        let request = self
            .inner
            .client
            .post(&format!("/v1/table/{}/tags/create/", self.inner.name))
            .json(&serde_json::json!({
                "tag": tag,
                "version": version
            }));

        let (request_id, response) = self.inner.send(request, true).await?;
        self.inner
            .check_table_response(&request_id, response)
            .await?;
        Ok(())
    }

    async fn delete(&mut self, tag: &str) -> Result<()> {
        let request = self
            .inner
            .client
            .post(&format!("/v1/table/{}/tags/delete/", self.inner.name))
            .json(&serde_json::json!({ "tag": tag }));

        let (request_id, response) = self.inner.send(request, true).await?;
        self.inner
            .check_table_response(&request_id, response)
            .await?;
        Ok(())
    }

    async fn update(&mut self, tag: &str, version: u64) -> Result<()> {
        let request = self
            .inner
            .client
            .post(&format!("/v1/table/{}/tags/update/", self.inner.name))
            .json(&serde_json::json!({
                "tag": tag,
                "version": version
            }));

        let (request_id, response) = self.inner.send(request, true).await?;
        self.inner
            .check_table_response(&request_id, response)
            .await?;
        Ok(())
    }
}

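For reference, the remote tag manager maps each operation onto a POST endpoint under /v1/table/{name}/tags/. The sketch below spells out the wire format implied by the code above; the paths and payload shapes are inferred from this diff rather than from published API docs, and the host is a placeholder:

import requests  # illustrative only; the SDK issues these calls for you

BASE = "https://example.lancedb.cloud"
requests.post(f"{BASE}/v1/table/docs/tags/create/", json={"tag": "v1", "version": 3})
requests.post(f"{BASE}/v1/table/docs/tags/update/", json={"tag": "v1", "version": 4})
ver = requests.post(f"{BASE}/v1/table/docs/tags/version/", json={"tag": "v1"}).json()["version"]
tags = requests.post(f"{BASE}/v1/table/docs/tags/list/").json()
# e.g. {"v1": {"version": 4, "manifest_size": 1234}}
requests.post(f"{BASE}/v1/table/docs/tags/delete/", json={"tag": "v1"})
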
#[derive(Debug)]
pub struct RemoteTable<S: HttpSend = Sender> {
    #[allow(dead_code)]
@@ -888,7 +1023,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<()> {
    ) -> Result<MergeStats> {
        self.check_mutable().await?;

        let query = MergeInsertRequest::try_from(params)?;
@@ -900,11 +1035,23 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {

        let (request_id, response) = self.send_streaming(request, new_data, true).await?;

        // TODO: the server can respond with these stats in the response body.
        // We should test that we can handle both an empty response from an old
        // server and a response with stats from a new server.
        self.check_table_response(&request_id, response).await?;

        Ok(())
        Ok(MergeStats::default())
    }

    async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
        Ok(Box::new(RemoteTags { inner: self }))
    }
    async fn checkout_tag(&self, tag: &str) -> Result<()> {
        let tags = self.tags().await?;
        let version = tags.get_version(tag).await?;
        let mut write_guard = self.version.write().await;
        *write_guard = Some(version);
        Ok(())
    }
    async fn optimize(&self, _action: OptimizeAction) -> Result<OptimizeStats> {
        self.check_mutable().await?;
        Err(Error::NotSupported {
@@ -1098,6 +1245,20 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
    fn dataset_uri(&self) -> &str {
        "NOT_SUPPORTED"
    }

    async fn stats(&self) -> Result<TableStatistics> {
        let request = self.client.post(&format!("/v1/table/{}/stats/", self.name));
        let (request_id, response) = self.send(request, true).await?;
        let response = self.check_table_response(&request_id, response).await?;
        let body = response.text().await.err_to_http(request_id.clone())?;

        let stats = serde_json::from_str(&body).map_err(|e| Error::Http {
            source: format!("Failed to parse table statistics: {}", e).into(),
            request_id,
            status_code: None,
        })?;
        Ok(stats)
    }
}

#[derive(Serialize)]
@@ -1190,7 +1351,12 @@ mod tests {
            Box::pin(table.count_rows(None).map_ok(|_| ())),
            Box::pin(table.update().column("a", "a + 1").execute().map_ok(|_| ())),
            Box::pin(table.add(example_data()).execute().map_ok(|_| ())),
            Box::pin(table.merge_insert(&["test"]).execute(example_data())),
            Box::pin(
                table
                    .merge_insert(&["test"])
                    .execute(example_data())
                    .map_ok(|_| ()),
            ),
            Box::pin(table.delete("false")),
            Box::pin(table.add_columns(
                NewColumnTransform::SqlExpressions(vec![("x".into(), "y".into())]),

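As the TODO above notes, the remote path does not yet parse real counts out of the response; it returns MergeStats::default(), i.e. all zeros. Until server-side stats land, client code should not read zero counts from LanceDB Cloud as "nothing changed". A defensive sketch:

stats = table.merge_insert("id").when_not_matched_insert_all().execute(rows)
if sum(stats.values()) == 0:
    # Against LanceDB Cloud the counts may be placeholder zeros today,
    # so fall back to an explicit count when the numbers matter.
    print("no counts reported; row count is now", table.count_rows())
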
@@ -20,6 +20,7 @@ use lance::dataset::cleanup::RemovalStats;
use lance::dataset::optimize::{compact_files, CompactionMetrics, IndexRemapperOptions};
use lance::dataset::scanner::Scanner;
pub use lance::dataset::ColumnAlteration;
pub use lance::dataset::MergeStats;
pub use lance::dataset::NewColumnTransform;
pub use lance::dataset::ReadParams;
pub use lance::dataset::Version;
@@ -80,9 +81,13 @@ pub mod merge;

use crate::index::waiter::wait_for_index;
pub use chrono::Duration;
use futures::future::join_all;
pub use lance::dataset::optimize::CompactionOptions;
pub use lance::dataset::refs::{TagContents, Tags as LanceTags};
pub use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::statistics::DatasetStatisticsExt;
pub use lance_index::optimize::OptimizeOptions;
use serde_with::skip_serializing_none;

/// Defines the type of column
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -401,6 +406,24 @@ pub enum AnyQuery {
    VectorQuery(VectorQueryRequest),
}

#[async_trait]
pub trait Tags: Send + Sync {
    /// List the tags of the table.
    async fn list(&self) -> Result<HashMap<String, TagContents>>;

    /// Get the version of the table referenced by a tag.
    async fn get_version(&self, tag: &str) -> Result<u64>;

    /// Create a new tag for the given version of the table.
    async fn create(&mut self, tag: &str, version: u64) -> Result<()>;

    /// Delete a tag from the table.
    async fn delete(&mut self, tag: &str) -> Result<()>;

    /// Update an existing tag to point to a new version of the table.
    async fn update(&mut self, tag: &str, version: u64) -> Result<()>;
}

|
||||
/// A trait for anything "table-like". This is used for both native tables (which target
|
||||
/// Lance datasets) and remote tables (which target LanceDB cloud)
|
||||
///
|
||||
@@ -465,7 +488,9 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
&self,
|
||||
params: MergeInsertBuilder,
|
||||
new_data: Box<dyn RecordBatchReader + Send>,
|
||||
) -> Result<()>;
|
||||
) -> Result<MergeStats>;
|
||||
/// Gets the table tag manager.
|
||||
async fn tags(&self) -> Result<Box<dyn Tags + '_>>;
|
||||
/// Optimize the dataset.
|
||||
async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats>;
|
||||
/// Add columns to the table.
|
||||
@@ -482,6 +507,9 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
async fn version(&self) -> Result<u64>;
|
||||
/// Checkout a specific version of the table.
|
||||
async fn checkout(&self, version: u64) -> Result<()>;
|
||||
/// Checkout a table version referenced by a tag.
|
||||
/// Tags provide a human-readable way to reference specific versions of the table.
|
||||
async fn checkout_tag(&self, tag: &str) -> Result<()>;
|
||||
/// Checkout the latest version of the table.
|
||||
async fn checkout_latest(&self) -> Result<()>;
|
||||
/// Restore the table to the currently checked out version.
|
||||
@@ -499,6 +527,8 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
index_names: &[&str],
|
||||
timeout: std::time::Duration,
|
||||
) -> Result<()>;
|
||||
/// Get statistics on the table
|
||||
async fn stats(&self) -> Result<TableStatistics>;
|
||||
}
|
||||
|
||||
/// A Table is a collection of strong typed Rows.
|
||||
@@ -1058,6 +1088,24 @@ impl Table {
|
||||
self.inner.checkout(version).await
|
||||
}
|
||||
|
||||
/// Checks out a specific version of the Table by tag
|
||||
///
|
||||
/// Any read operation on the table will now access the data at the version referenced by the tag.
|
||||
/// As a consequence, calling this method will disable any read consistency interval
|
||||
/// that was previously set.
|
||||
///
|
||||
/// This is a read-only operation that turns the table into a sort of "view"
|
||||
/// or "detached head". Other table instances will not be affected. To make the change
|
||||
/// permanent you can use the `[Self::restore]` method.
|
||||
///
|
||||
/// Any operation that modifies the table will fail while the table is in a checked
|
||||
/// out state.
|
||||
///
|
||||
/// To return the table to a normal state use `[Self::checkout_latest]`
|
||||
pub async fn checkout_tag(&self, tag: &str) -> Result<()> {
|
||||
self.inner.checkout_tag(tag).await
|
||||
}
|
||||
|
||||
/// Ensures the table is pointing at the latest version
|
||||
///
|
||||
/// This can be used to manually update a table when the read_consistency_interval is None
|
||||
@@ -1144,6 +1192,11 @@ impl Table {
|
||||
self.inner.wait_for_index(index_names, timeout).await
|
||||
}
|
||||
|
||||
/// Get the tags manager.
|
||||
pub async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
|
||||
self.inner.tags().await
|
||||
}
|
||||
|
||||
// Take many execution plans and map them into a single plan that adds
|
||||
// a query_index column and unions them.
|
||||
pub(crate) fn multi_vector_plan(
|
||||
@@ -1194,6 +1247,40 @@ impl Table {
|
||||
.unwrap();
|
||||
Ok(Arc::new(repartitioned))
|
||||
}
|
||||
|
||||
/// Retrieve statistics on the table
|
||||
pub async fn stats(&self) -> Result<TableStatistics> {
|
||||
self.inner.stats().await
|
||||
}
|
||||
}
|
||||
|
||||
pub struct NativeTags {
|
||||
inner: LanceTags,
|
||||
}
|
||||
#[async_trait]
|
||||
impl Tags for NativeTags {
|
||||
async fn list(&self) -> Result<HashMap<String, TagContents>> {
|
||||
Ok(self.inner.list().await?)
|
||||
}
|
||||
|
||||
async fn get_version(&self, tag: &str) -> Result<u64> {
|
||||
Ok(self.inner.get_version(tag).await?)
|
||||
}
|
||||
|
||||
async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
|
||||
self.inner.create(tag, version).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete(&mut self, tag: &str) -> Result<()> {
|
||||
self.inner.delete(tag).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update(&mut self, tag: &str, version: u64) -> Result<()> {
|
||||
self.inner.update(tag, version).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NativeTable> for Table {
@@ -1940,6 +2027,10 @@ impl BaseTable for NativeTable {
        self.dataset.as_time_travel(version).await
    }

    async fn checkout_tag(&self, tag: &str) -> Result<()> {
        self.dataset.as_time_travel(tag).await
    }

    async fn checkout_latest(&self) -> Result<()> {
        self.dataset
            .as_latest(self.read_consistency_interval)
@@ -2277,7 +2368,7 @@ impl BaseTable for NativeTable {
        &self,
        params: MergeInsertBuilder,
        new_data: Box<dyn RecordBatchReader + Send>,
    ) -> Result<()> {
    ) -> Result<MergeStats> {
        let dataset = Arc::new(self.dataset.get().await?.clone());
        let mut builder = LanceMergeInsertBuilder::try_new(dataset.clone(), params.on)?;
        match (
@@ -2304,9 +2395,9 @@ impl BaseTable for NativeTable {
            builder.when_not_matched_by_source(WhenNotMatchedBySource::Keep);
        }
        let job = builder.try_build()?;
        let (new_dataset, _stats) = job.execute_reader(new_data).await?;
        let (new_dataset, stats) = job.execute_reader(new_data).await?;
        self.dataset.set_latest(new_dataset.as_ref().clone()).await;
        Ok(())
        Ok(stats)
    }

    /// Delete rows from the table
@@ -2315,6 +2406,14 @@ impl BaseTable for NativeTable {
        Ok(())
    }

    async fn tags(&self) -> Result<Box<dyn Tags + '_>> {
        let dataset = self.dataset.get().await?;

        Ok(Box::new(NativeTags {
            inner: dataset.tags.clone(),
        }))
    }

    async fn optimize(&self, action: OptimizeAction) -> Result<OptimizeStats> {
        let mut stats = OptimizeStats {
            compaction: None,
@@ -2480,6 +2579,108 @@ impl BaseTable for NativeTable {
    ) -> Result<()> {
        wait_for_index(self, index_names, timeout).await
    }

    async fn stats(&self) -> Result<TableStatistics> {
        let num_rows = self.count_rows(None).await?;
        let num_indices = self.list_indices().await?.len();
        let ds = self.dataset.get().await?;
        let ds_clone = (*ds).clone();
        let ds_stats = Arc::new(ds_clone).calculate_data_stats().await?;
        let total_bytes = ds_stats.fields.iter().map(|f| f.bytes_on_disk).sum::<u64>() as usize;

        let frags = ds.get_fragments();
        let mut sorted_sizes = join_all(
            frags
                .iter()
                .map(|frag| async move { frag.physical_rows().await.unwrap_or(0) }),
        )
        .await;
        sorted_sizes.sort();

        let small_frag_threshold = 100000;
        let num_fragments = sorted_sizes.len();
        let num_small_fragments = sorted_sizes
            .iter()
            .filter(|&&size| size < small_frag_threshold)
            .count();

        let p25 = *sorted_sizes.get(num_fragments / 4).unwrap_or(&0);
        let p50 = *sorted_sizes.get(num_fragments / 2).unwrap_or(&0);
        let p75 = *sorted_sizes.get(num_fragments * 3 / 4).unwrap_or(&0);
        let p99 = *sorted_sizes.get(num_fragments * 99 / 100).unwrap_or(&0);
        let min = sorted_sizes.first().copied().unwrap_or(0);
        let max = sorted_sizes.last().copied().unwrap_or(0);
        let mean = if num_fragments == 0 {
            0
        } else {
            sorted_sizes.iter().copied().sum::<usize>() / num_fragments
        };

        let frag_stats = FragmentStatistics {
            num_fragments,
            num_small_fragments,
            lengths: FragmentSummaryStats {
                min,
                max,
                mean,
                p25,
                p50,
                p75,
                p99,
            },
        };
        let stats = TableStatistics {
            total_bytes,
            num_rows,
            num_indices,
            fragment_stats: frag_stats,
        };
        Ok(stats)
    }
}

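The percentiles here come from a simple nearest-rank lookup into the sorted list of per-fragment row counts, not interpolation. A Python mirror of the same arithmetic, purely for illustration:

def fragment_length_stats(sizes: list[int]) -> dict:
    # Sort, then index at n/4, n/2, 3n/4, and 99n/100, exactly as the Rust does.
    s = sorted(sizes)
    n = len(s)
    at = lambda i: s[i] if i < n else 0
    return {
        "min": s[0] if s else 0,
        "max": s[-1] if s else 0,
        "mean": sum(s) // n if n else 0,
        "p25": at(n // 4),
        "p50": at(n // 2),
        "p75": at(n * 3 // 4),
        "p99": at(n * 99 // 100),
    }

# Matches the test below: ten fragments of 15 rows plus one of 100.
print(fragment_length_stats([15] * 10 + [100]))  # p50=15, p99=100, mean=22
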
#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct TableStatistics {
    /// The total number of bytes in the table
    pub total_bytes: usize,

    /// The number of rows in the table
    pub num_rows: usize,

    /// The number of indices in the table
    pub num_indices: usize,

    /// Statistics on table fragments
    pub fragment_stats: FragmentStatistics,
}

#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentStatistics {
    /// The number of fragments in the table
    pub num_fragments: usize,

    /// The number of uncompacted fragments in the table
    pub num_small_fragments: usize,

    /// Statistics on the number of rows in the table fragments
    pub lengths: FragmentSummaryStats,
    // todo: add size statistics
    // /// Statistics on the number of bytes in the table fragments
    // sizes: FragmentStats,
}

#[skip_serializing_none]
#[derive(Debug, Deserialize, PartialEq)]
pub struct FragmentSummaryStats {
    pub min: usize,
    pub max: usize,
    pub mean: usize,
    pub p25: usize,
    pub p50: usize,
    pub p75: usize,
    pub p99: usize,
}

#[cfg(test)]
@@ -3081,6 +3282,60 @@ mod tests {
        )
    }

    #[tokio::test]
    async fn test_tags() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri)
            .read_consistency_interval(Duration::from_secs(0))
            .execute()
            .await
            .unwrap();
        let table = conn
            .create_table("my_table", some_sample_data())
            .execute()
            .await
            .unwrap();
        assert_eq!(table.version().await.unwrap(), 1);
        table.add(some_sample_data()).execute().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 2);
        let mut tags_manager = table.tags().await.unwrap();
        let tags = tags_manager.list().await.unwrap();
        assert!(tags.is_empty(), "Tags should be empty initially");
        let tag1 = "tag1";
        tags_manager.create(tag1, 1).await.unwrap();
        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 1);
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 1);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 1);
        tags_manager.create("tag2", 2).await.unwrap();
        assert_eq!(tags_manager.get_version("tag2").await.unwrap(), 2);
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 2);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 1);
        assert!(tags.contains_key("tag2"));
        assert_eq!(tags.get("tag2").unwrap().version, 2);
        // Test update and delete
        table.add(some_sample_data()).execute().await.unwrap();
        tags_manager.update(tag1, 3).await.unwrap();
        assert_eq!(tags_manager.get_version(tag1).await.unwrap(), 3);
        tags_manager.delete("tag2").await.unwrap();
        let tags = tags_manager.list().await.unwrap();
        assert_eq!(tags.len(), 1);
        assert!(tags.contains_key(tag1));
        assert_eq!(tags.get(tag1).unwrap().version, 3);
        // Test checkout tag
        table.add(some_sample_data()).execute().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 4);
        table.checkout_tag(tag1).await.unwrap();
        assert_eq!(table.version().await.unwrap(), 3);
        table.checkout_latest().await.unwrap();
        assert_eq!(table.version().await.unwrap(), 4);
    }

    #[tokio::test]
    async fn test_create_index() {
        use arrow_array::RecordBatch;
@@ -3803,4 +4058,108 @@ mod tests {
            Some(&"test_field_val1".to_string())
        );
    }

    #[tokio::test]
    pub async fn test_stats() {
        let tmp_dir = tempdir().unwrap();
        let uri = tmp_dir.path().to_str().unwrap();

        let conn = ConnectBuilder::new(uri).execute().await.unwrap();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("foo", DataType::Int32, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..100)),
                Arc::new(Int32Array::from_iter_values(0..100)),
            ],
        )
        .unwrap();

        let table = conn
            .create_table(
                "test_stats",
                RecordBatchIterator::new(vec![Ok(batch.clone())], batch.schema()),
            )
            .execute()
            .await
            .unwrap();
        for _ in 0..10 {
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from_iter_values(0..15)),
                    Arc::new(Int32Array::from_iter_values(0..15)),
                ],
            )
            .unwrap();
            table
                .add(RecordBatchIterator::new(
                    vec![Ok(batch.clone())],
                    batch.schema(),
                ))
                .execute()
                .await
                .unwrap();
        }

        let empty_table = conn
            .create_table(
                "test_stats_empty",
                RecordBatchIterator::new(vec![], batch.schema()),
            )
            .execute()
            .await
            .unwrap();

        let res = table.stats().await.unwrap();
        println!("{:#?}", res);
        assert_eq!(
            res,
            TableStatistics {
                num_rows: 250,
                num_indices: 0,
                total_bytes: 2000,
                fragment_stats: FragmentStatistics {
                    num_fragments: 11,
                    num_small_fragments: 11,
                    lengths: FragmentSummaryStats {
                        min: 15,
                        max: 100,
                        mean: 22,
                        p25: 15,
                        p50: 15,
                        p75: 15,
                        p99: 100,
                    },
                },
            }
        );
        let res = empty_table.stats().await.unwrap();
        println!("{:#?}", res);
        assert_eq!(
            res,
            TableStatistics {
                num_rows: 0,
                num_indices: 0,
                total_bytes: 0,
                fragment_stats: FragmentStatistics {
                    num_fragments: 0,
                    num_small_fragments: 0,
                    lengths: FragmentSummaryStats {
                        min: 0,
                        max: 0,
                        mean: 0,
                        p25: 0,
                        p50: 0,
                        p75: 0,
                        p99: 0,
                    },
                },
            }
        )
    }
}

@@ -7,7 +7,7 @@ use std::{
    time::{self, Duration, Instant},
};

use lance::Dataset;
use lance::{dataset::refs, Dataset};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

use crate::error::Result;
@@ -83,19 +83,32 @@ impl DatasetRef {
        }
    }

    async fn as_time_travel(&mut self, target_version: u64) -> Result<()> {
    async fn as_time_travel(&mut self, target_version: impl Into<refs::Ref>) -> Result<()> {
        let target_ref = target_version.into();

        match self {
            Self::Latest { dataset, .. } => {
                let new_dataset = dataset.checkout_version(target_ref.clone()).await?;
                let version_value = new_dataset.version().version;

                *self = Self::TimeTravel {
                    dataset: dataset.checkout_version(target_version).await?,
                    version: target_version,
                    dataset: new_dataset,
                    version: version_value,
                };
            }
            Self::TimeTravel { dataset, version } => {
                if *version != target_version {
                let should_checkout = match &target_ref {
                    refs::Ref::Version(target_ver) => version != target_ver,
                    refs::Ref::Tag(_) => true, // Always checkout for tags
                };

                if should_checkout {
                    let new_dataset = dataset.checkout_version(target_ref).await?;
                    let version_value = new_dataset.version().version;

                    *self = Self::TimeTravel {
                        dataset: dataset.checkout_version(target_version).await?,
                        version: target_version,
                        dataset: new_dataset,
                        version: version_value,
                    };
                }
            }
@@ -175,7 +188,7 @@ impl DatasetConsistencyWrapper {
        write_guard.as_latest(read_consistency_interval).await
    }

    pub async fn as_time_travel(&self, target_version: u64) -> Result<()> {
    pub async fn as_time_travel(&self, target_version: impl Into<refs::Ref>) -> Result<()> {
        self.0.write().await.as_time_travel(target_version).await
    }

@@ -4,6 +4,7 @@
use std::sync::Arc;

use arrow_array::RecordBatchReader;
use lance::dataset::MergeStats;

use crate::Result;

@@ -86,8 +87,9 @@ impl MergeInsertBuilder {

    /// Executes the merge insert operation
    ///
    /// Nothing is returned but the [`super::Table`] is updated
    pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<()> {
    /// Returns statistics about the merge operation including the number of rows
    /// inserted, updated, and deleted.
    pub async fn execute(self, new_data: Box<dyn RecordBatchReader + Send>) -> Result<MergeStats> {
        self.table.clone().merge_insert(self, new_data).await
    }
}