Compare commits

...

15 Commits

Author SHA1 Message Date
Lance Release
26dab93f2a Bump version: 0.21.3-beta.0 → 0.22.0-beta.0 2025-03-30 18:04:14 +00:00
LuQQiu
b9bdb8d937 fix: fix remote restore api to always checkout latest version (#2291)
Fix restore to always checkout latest version, following local restore
api implementation

a1d1833a40/rust/lancedb/src/table.rs (L1910)
Otherwise
table.create_table -> version 1
table.add_table -> version 2
table.checkout(1), table.restore() -> the version remains at 1 (should
checkout_latest inside restore method to update version to latest
version and allow write operation)
table.checkout_latest() -> version is 3
can do write operations
2025-03-29 22:46:57 -07:00
LuQQiu
a1d1833a40 feat: add analyze_plan api (#2280)
add analyze plan api to allow executing the queries and see runtime
metrics.
Which help identify the query IO overhead and help identify query
slowness
2025-03-28 14:28:52 -07:00
Will Jones
a547c523c2 feat!: change default read_consistency_interval=5s (#2281)
Previously, when we loaded the next version of the table, we would block
all reads with a write lock. Now, we only do that if
`read_consistency_interval=0`. Otherwise, we load the next version
asynchronously in the background. This should mean that
`read_consistency_interval > 0` won't have a meaningful impact on
latency.

Along with this change, I felt it was safe to change the default
consistency interval to 5 seconds. The current default is `None`, which
means we will **never** check for a new version by default. I think that
default is contrary to most users expectations.
2025-03-28 11:04:31 -07:00
Lance Release
dc8b75feab Updating package-lock.json 2025-03-28 17:15:17 +00:00
Lance Release
c1600cdc06 Updating package-lock.json 2025-03-28 16:04:01 +00:00
Lance Release
f5dee46970 Updating package-lock.json 2025-03-28 16:03:46 +00:00
Lance Release
346cbf8bf7 Bump version: 0.18.2-beta.0 → 0.18.3-beta.0 2025-03-28 16:03:31 +00:00
Lance Release
3c7dfe9f28 Bump version: 0.21.2-beta.0 → 0.21.3-beta.0 2025-03-28 16:03:17 +00:00
Lei Xu
f52d05d3fa feat: add columns using pyarrow schema (#2284) 2025-03-28 08:51:50 -07:00
vinoyang
c321cccc12 chore(java): make rust release to be a switch option (#2277) 2025-03-28 11:26:24 +08:00
LuQQiu
cba14a5743 feat: add restore remote api (#2282) 2025-03-27 16:33:52 -07:00
vinoyang
72057b743d chore(java): introduce spotless plugin (#2278) 2025-03-27 10:38:39 +08:00
LuQQiu
698f329598 feat: add explain plan remote api (#2263)
Add explain plan remote api
2025-03-26 11:22:40 -07:00
BubbleCal
79fa745130 feat: upgrade lance to v0.25.1-beta.3 (#2276)
Signed-off-by: BubbleCal <bubble-cal@outlook.com>
2025-03-26 23:14:27 +08:00
60 changed files with 1355 additions and 420 deletions

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.18.2-beta.0"
current_version = "0.18.3-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -43,7 +43,7 @@ jobs:
- uses: Swatinem/rust-cache@v2
- uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: "1.79.0"
toolchain: "1.81.0"
cache-workspaces: "./java/core/lancedb-jni"
# Disable full debug symbol generation to speed up CI build and keep memory down
# "1" means line tables only, which is useful for panic tracebacks.
@@ -97,7 +97,7 @@ jobs:
- name: Dry run
if: github.event_name == 'pull_request'
run: |
mvn --batch-mode -DskipTests package
mvn --batch-mode -DskipTests -Drust.release.build=true package
- name: Set github
run: |
git config --global user.email "LanceDB Github Runner"
@@ -108,7 +108,7 @@ jobs:
echo "use-agent" >> ~/.gnupg/gpg.conf
echo "pinentry-mode loopback" >> ~/.gnupg/gpg.conf
export GPG_TTY=$(tty)
mvn --batch-mode -DskipTests -DpushChanges=false -Dgpg.passphrase=${{ secrets.GPG_PASSPHRASE }} deploy -P deploy-to-ossrh
mvn --batch-mode -DskipTests -Drust.release.build=true -DpushChanges=false -Dgpg.passphrase=${{ secrets.GPG_PASSPHRASE }} deploy -P deploy-to-ossrh
env:
SONATYPE_USER: ${{ secrets.SONATYPE_USER }}
SONATYPE_TOKEN: ${{ secrets.SONATYPE_TOKEN }}

414
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -21,16 +21,16 @@ categories = ["database-implementations"]
rust-version = "1.78.0"
[workspace.dependencies]
lance = { "version" = "=0.25.0", "features" = [
lance = { "version" = "=0.25.1", "features" = [
"dynamodb",
], tag = "v0.25.0-beta.5", git = "https://github.com/lancedb/lance.git" }
lance-io = { version = "=0.25.0", tag = "v0.25.0-beta.5", git = "https://github.com/lancedb/lance.git" }
lance-index = { version = "=0.25.0", tag = "v0.25.0-beta.5", git = "https://github.com/lancedb/lance.git" }
lance-linalg = { version = "=0.25.0", tag = "v0.25.0-beta.5", git = "https://github.com/lancedb/lance.git" }
lance-table = { version = "=0.25.0", tag = "v0.25.0-beta.5", git = "https://github.com/lancedb/lance.git" }
lance-testing = { version = "=0.25.0", tag = "v0.25.0-beta.5", git = "https://github.com/lancedb/lance.git" }
lance-datafusion = { version = "=0.25.0", tag = "v0.25.0-beta.5", git = "https://github.com/lancedb/lance.git" }
lance-encoding = { version = "=0.25.0", tag = "v0.25.0-beta.5", git = "https://github.com/lancedb/lance.git" }
], tag = "v0.25.1-beta.3", git = "https://github.com/lancedb/lance.git" }
lance-io = { version = "=0.25.1", tag = "v0.25.1-beta.3", git = "https://github.com/lancedb/lance.git" }
lance-index = { version = "=0.25.1", tag = "v0.25.1-beta.3", git = "https://github.com/lancedb/lance.git" }
lance-linalg = { version = "=0.25.1", tag = "v0.25.1-beta.3", git = "https://github.com/lancedb/lance.git" }
lance-table = { version = "=0.25.1", tag = "v0.25.1-beta.3", git = "https://github.com/lancedb/lance.git" }
lance-testing = { version = "=0.25.1", tag = "v0.25.1-beta.3", git = "https://github.com/lancedb/lance.git" }
lance-datafusion = { version = "=0.25.1", tag = "v0.25.1-beta.3", git = "https://github.com/lancedb/lance.git" }
lance-encoding = { version = "=0.25.1", tag = "v0.25.1-beta.3", git = "https://github.com/lancedb/lance.git" }
# Note that this one does not include pyarrow
arrow = { version = "54.1", optional = false }
arrow-array = "54.1"

View File

@@ -1001,9 +1001,11 @@ In LanceDB OSS, users can set the `read_consistency_interval` parameter on conne
There are three possible settings for `read_consistency_interval`:
1. **Unset (default)**: The database does not check for updates to tables made by other processes. This provides the best query performance, but means that clients may not see the most up-to-date data. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS.
3. **Custom interval (Eventual consistency)**: The database checks for updates at a custom interval, such as every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
1. **Unset**: The database does not check for updates to tables made by other processes. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS. For best performance, combine this setting with the storage option `new_table_enable_v2_manifest_paths` set to `true`.
3. **Custom interval (Eventual consistency, the default)**: The database checks for updates at a custom interval. By default, this is every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
You can always force a synchronization by calling `checkout_latest()` / `checkoutLatest()` on a table.
!!! tip "Consistency in LanceDB Cloud"
@@ -1041,7 +1043,21 @@ There are three possible settings for `read_consistency_interval`:
--8<-- "python/python/tests/docs/test_guide_tables.py:table_async_eventual_consistency"
```
By default, a `Table` will never check for updates from other writers. To manually check for updates you can use `checkout_latest`:
For no consistency, use `None`:
=== "Sync API"
```python
--8<-- "python/python/tests/docs/test_guide_tables.py:table_no_consistency"
```
=== "Async API"
```python
--8<-- "python/python/tests/docs/test_guide_tables.py:table_async_no_consistency"
```
To manually check for updates you can use `checkout_latest`:
=== "Sync API"
@@ -1059,15 +1075,25 @@ There are three possible settings for `read_consistency_interval`:
To set strong consistency, use `0`:
```ts
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 0 });
const tbl = await db.openTable("my_table");
--8<-- "nodejs/examples/basic.test.ts:table_strong_consistency"
```
For eventual consistency, specify the update interval as seconds:
```ts
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 5 });
const tbl = await db.openTable("my_table");
--8<-- "nodejs/examples/basic.test.ts:table_eventual_consistency"
```
For no consistency, use `null`:
```ts
--8<-- "nodejs/examples/basic.test.ts:table_no_consistency"
```
To manually check for updates you can use `checkoutLatest`:
```ts
--8<-- "nodejs/examples/basic.test.ts:table_checkout_latest"
```
<!-- Node doesn't yet support the version time travel: https://github.com/lancedb/lancedb/issues/1007

View File

@@ -30,6 +30,53 @@ protected inner: Query | Promise<Query>;
## Methods
### analyzePlan()
```ts
analyzePlan(): Promise<string>
```
Executes the query and returns the physical query plan annotated with runtime metrics.
This is useful for debugging and performance analysis, as it shows how the query was executed
and includes metrics such as elapsed time, rows processed, and I/O statistics.
#### Returns
`Promise`&lt;`string`&gt;
A query execution plan with runtime metrics for each step.
#### Example
```ts
import * as lancedb from "@lancedb/lancedb"
const db = await lancedb.connect("./.lancedb");
const table = await db.createTable("my_table", [
{ vector: [1.1, 0.9], id: "1" },
]);
const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan();
Example output (with runtime metrics inlined):
AnalyzeExec verbose=true, metrics=[]
ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs]
Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1]
CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs]
GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns]
FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs]
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1]
KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1]
LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2]
```
#### Inherited from
[`QueryBase`](QueryBase.md).[`analyzePlan`](QueryBase.md#analyzeplan)
***
### execute()
```ts

View File

@@ -36,6 +36,49 @@ protected inner: NativeQueryType | Promise<NativeQueryType>;
## Methods
### analyzePlan()
```ts
analyzePlan(): Promise<string>
```
Executes the query and returns the physical query plan annotated with runtime metrics.
This is useful for debugging and performance analysis, as it shows how the query was executed
and includes metrics such as elapsed time, rows processed, and I/O statistics.
#### Returns
`Promise`&lt;`string`&gt;
A query execution plan with runtime metrics for each step.
#### Example
```ts
import * as lancedb from "@lancedb/lancedb"
const db = await lancedb.connect("./.lancedb");
const table = await db.createTable("my_table", [
{ vector: [1.1, 0.9], id: "1" },
]);
const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan();
Example output (with runtime metrics inlined):
AnalyzeExec verbose=true, metrics=[]
ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs]
Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1]
CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs]
GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns]
FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs]
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1]
KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1]
LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2]
```
***
### execute()
```ts

View File

@@ -48,6 +48,53 @@ addQueryVector(vector): VectorQuery
***
### analyzePlan()
```ts
analyzePlan(): Promise<string>
```
Executes the query and returns the physical query plan annotated with runtime metrics.
This is useful for debugging and performance analysis, as it shows how the query was executed
and includes metrics such as elapsed time, rows processed, and I/O statistics.
#### Returns
`Promise`&lt;`string`&gt;
A query execution plan with runtime metrics for each step.
#### Example
```ts
import * as lancedb from "@lancedb/lancedb"
const db = await lancedb.connect("./.lancedb");
const table = await db.createTable("my_table", [
{ vector: [1.1, 0.9], id: "1" },
]);
const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan();
Example output (with runtime metrics inlined):
AnalyzeExec verbose=true, metrics=[]
ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs]
Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1]
CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs]
GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns]
FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs]
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1]
KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1]
LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2]
```
#### Inherited from
[`QueryBase`](QueryBase.md).[`analyzePlan`](QueryBase.md#analyzeplan)
***
### bypassVectorIndex()
```ts

View File

@@ -44,7 +44,7 @@ for testing purposes.
### readConsistencyInterval?
```ts
optional readConsistencyInterval: number;
optional readConsistencyInterval: null | number;
```
(For LanceDB OSS only): The interval, in seconds, at which to check for

View File

@@ -11,6 +11,7 @@ likely that someone who knows the answer will see your question.
## Common issues
* Multiprocessing with `fork` is not supported. You should use `spawn` instead.
* Data returned by queries may not reflect the most recent writes, depending on configuration. LanceDB uses eventual consistency by default. See [consistency](/docs/src/guides/tables.md#consistency) for more information.
## Enabling logging
@@ -35,3 +36,9 @@ print the resolved query plan. You can use the `explain_plan` method to do this:
* Python Sync: [LanceQueryBuilder.explain_plan][lancedb.query.LanceQueryBuilder.explain_plan]
* Python Async: [AsyncQueryBase.explain_plan][lancedb.query.AsyncQueryBase.explain_plan]
* Node @lancedb/lancedb: [LanceQueryBuilder.explainPlan](/lancedb/js/classes/QueryBase/#explainplan)
To understand how a query was actually executed—including metrics like execution time, number of rows processed, I/O stats, and more—use the analyze_plan method. This executes the query and returns a physical execution plan annotated with runtime metrics, making it especially helpful for performance tuning and debugging.
* Python Sync: [LanceQueryBuilder.analyze_plan][lancedb.query.LanceQueryBuilder.analyze_plan]
* Python Async: [AsyncQueryBase.analyze_plan][lancedb.query.AsyncQueryBase.analyze_plan]
* Node @lancedb/lancedb: [LanceQueryBuilder.analyzePlan](/lancedb/js/classes/QueryBase/#analyzePlan)

View File

@@ -8,13 +8,16 @@
<parent>
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.18.2-beta.0</version>
<version>0.18.3-beta.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>lancedb-core</artifactId>
<name>LanceDB Core</name>
<packaging>jar</packaging>
<properties>
<rust.release.build>false</rust.release.build>
</properties>
<dependencies>
<dependency>
@@ -68,7 +71,7 @@
</goals>
<configuration>
<path>lancedb-jni</path>
<release>true</release>
<release>${rust.release.build}</release>
<!-- Copy native libraries to target/classes for runtime access -->
<copyTo>${project.build.directory}/classes/nativelib</copyTo>
<copyWithPlatformDir>true</copyWithPlatformDir>

View File

@@ -1,16 +1,25 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lancedb.lancedb;
import io.questdb.jar.jni.JarJniLoader;
import java.io.Closeable;
import java.util.List;
import java.util.Optional;
/**
* Represents LanceDB database.
*/
/** Represents LanceDB database. */
public class Connection implements Closeable {
static {
JarJniLoader.loadLib(Connection.class, "/nativelib", "lancedb_jni");
@@ -18,14 +27,11 @@ public class Connection implements Closeable {
private long nativeConnectionHandle;
/**
* Connect to a LanceDB instance.
*/
/** Connect to a LanceDB instance. */
public static native Connection connect(String uri);
/**
* Get the names of all tables in the database. The names are sorted in
* ascending order.
* Get the names of all tables in the database. The names are sorted in ascending order.
*
* @return the table names
*/
@@ -34,8 +40,7 @@ public class Connection implements Closeable {
}
/**
* Get the names of filtered tables in the database. The names are sorted in
* ascending order.
* Get the names of filtered tables in the database. The names are sorted in ascending order.
*
* @param limit The number of results to return.
* @return the table names
@@ -45,12 +50,11 @@ public class Connection implements Closeable {
}
/**
* Get the names of filtered tables in the database. The names are sorted in
* ascending order.
* Get the names of filtered tables in the database. The names are sorted in ascending order.
*
* @param startAfter If present, only return names that come lexicographically after the supplied
* value. This can be combined with limit to implement pagination
* by setting this to the last table name from the previous page.
* value. This can be combined with limit to implement pagination by setting this to the last
* table name from the previous page.
* @return the table names
*/
public List<String> tableNames(String startAfter) {
@@ -58,12 +62,11 @@ public class Connection implements Closeable {
}
/**
* Get the names of filtered tables in the database. The names are sorted in
* ascending order.
* Get the names of filtered tables in the database. The names are sorted in ascending order.
*
* @param startAfter If present, only return names that come lexicographically after the supplied
* value. This can be combined with limit to implement pagination
* by setting this to the last table name from the previous page.
* value. This can be combined with limit to implement pagination by setting this to the last
* table name from the previous page.
* @param limit The number of results to return.
* @return the table names
*/
@@ -72,22 +75,19 @@ public class Connection implements Closeable {
}
/**
* Get the names of filtered tables in the database. The names are sorted in
* ascending order.
* Get the names of filtered tables in the database. The names are sorted in ascending order.
*
* @param startAfter If present, only return names that come lexicographically after the supplied
* value. This can be combined with limit to implement pagination
* by setting this to the last table name from the previous page.
* value. This can be combined with limit to implement pagination by setting this to the last
* table name from the previous page.
* @param limit The number of results to return.
* @return the table names
*/
public native List<String> tableNames(
Optional<String> startAfter, Optional<Integer> limit);
public native List<String> tableNames(Optional<String> startAfter, Optional<Integer> limit);
/**
* Closes this connection and releases any system resources associated with it. If
* the connection is
* already closed, then invoking this method has no effect.
* Closes this connection and releases any system resources associated with it. If the connection
* is already closed, then invoking this method has no effect.
*/
@Override
public void close() {
@@ -98,8 +98,7 @@ public class Connection implements Closeable {
}
/**
* Native method to release the Lance connection resources associated with the
* given handle.
* Native method to release the Lance connection resources associated with the given handle.
*
* @param handle The native handle to the connection resource.
*/

View File

@@ -1,27 +1,35 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.lancedb.lancedb;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.List;
import java.net.URL;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.net.URL;
import java.nio.file.Path;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ConnectionTest {
private static final String[] TABLE_NAMES = {
"dataset_version",
"new_empty_dataset",
"test",
"write_stream"
"dataset_version", "new_empty_dataset", "test", "write_stream"
};
@TempDir
static Path tempDir; // Temporary directory for the tests
@TempDir static Path tempDir; // Temporary directory for the tests
private static URL lanceDbURL;
@BeforeAll
@@ -53,18 +61,21 @@ public class ConnectionTest {
@Test
void tableNamesStartAfter() {
try (Connection conn = Connection.connect(lanceDbURL.toString())) {
assertTableNamesStartAfter(conn, TABLE_NAMES[0], 3, TABLE_NAMES[1], TABLE_NAMES[2], TABLE_NAMES[3]);
assertTableNamesStartAfter(
conn, TABLE_NAMES[0], 3, TABLE_NAMES[1], TABLE_NAMES[2], TABLE_NAMES[3]);
assertTableNamesStartAfter(conn, TABLE_NAMES[1], 2, TABLE_NAMES[2], TABLE_NAMES[3]);
assertTableNamesStartAfter(conn, TABLE_NAMES[2], 1, TABLE_NAMES[3]);
assertTableNamesStartAfter(conn, TABLE_NAMES[3], 0);
assertTableNamesStartAfter(conn, "a_dataset", 4, TABLE_NAMES[0], TABLE_NAMES[1], TABLE_NAMES[2], TABLE_NAMES[3]);
assertTableNamesStartAfter(
conn, "a_dataset", 4, TABLE_NAMES[0], TABLE_NAMES[1], TABLE_NAMES[2], TABLE_NAMES[3]);
assertTableNamesStartAfter(conn, "o_dataset", 2, TABLE_NAMES[2], TABLE_NAMES[3]);
assertTableNamesStartAfter(conn, "v_dataset", 1, TABLE_NAMES[3]);
assertTableNamesStartAfter(conn, "z_dataset", 0);
}
}
private void assertTableNamesStartAfter(Connection conn, String startAfter, int expectedSize, String... expectedNames) {
private void assertTableNamesStartAfter(
Connection conn, String startAfter, int expectedSize, String... expectedNames) {
List<String> tableNames = conn.tableNames(startAfter);
assertEquals(expectedSize, tableNames.size());
for (int i = 0; i < expectedNames.length; i++) {
@@ -74,7 +85,7 @@ public class ConnectionTest {
@Test
void tableNamesLimit() {
try (Connection conn = Connection.connect(lanceDbURL.toString())) {
try (Connection conn = Connection.connect(lanceDbURL.toString())) {
for (int i = 0; i <= TABLE_NAMES.length; i++) {
List<String> tableNames = conn.tableNames(i);
assertEquals(i, tableNames.size());

View File

@@ -6,7 +6,7 @@
<groupId>com.lancedb</groupId>
<artifactId>lancedb-parent</artifactId>
<version>0.18.2-beta.0</version>
<version>0.18.3-beta.0</version>
<packaging>pom</packaging>
<name>LanceDB Parent</name>
@@ -29,6 +29,25 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<arrow.version>15.0.0</arrow.version>
<spotless.skip>false</spotless.skip>
<spotless.version>2.30.0</spotless.version>
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
<spotless.delimiter>package</spotless.delimiter>
<spotless.license.header>
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
</spotless.license.header>
</properties>
<modules>
@@ -127,7 +146,8 @@
<configuration>
<configLocation>google_checks.xml</configLocation>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<failsOnError>false</failsOnError>
<failOnViolation>false</failOnViolation>
<violationSeverity>warning</violationSeverity>
<linkXRef>false</linkXRef>
</configuration>
@@ -141,6 +161,10 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
</plugin>
</plugins>
<pluginManagement>
<plugins>
@@ -179,6 +203,54 @@
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotless.version}</version>
<configuration>
<skip>${spotless.skip}</skip>
<upToDateChecking>
<enabled>true</enabled>
</upToDateChecking>
<java>
<includes>
<include>src/main/java/**/*.java</include>
<include>src/test/java/**/*.java</include>
</includes>
<googleJavaFormat>
<version>${spotless.java.googlejavaformat.version}</version>
<style>GOOGLE</style>
</googleJavaFormat>
<importOrder>
<order>com.lancedb.lance,,javax,java,\#</order>
</importOrder>
<removeUnusedImports />
</java>
<scala>
<includes>
<include>src/main/scala/**/*.scala</include>
<include>src/main/scala-*/**/*.scala</include>
<include>src/test/scala/**/*.scala</include>
<include>src/test/scala-*/**/*.scala</include>
</includes>
</scala>
<licenseHeader>
<content>${spotless.license.header}</content>
<delimiter>${spotless.delimiter}</delimiter>
</licenseHeader>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<phase>validate</phase>
<goals>
<goal>apply</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</pluginManagement>
</build>

44
node/package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "vectordb",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vectordb",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"cpu": [
"x64",
"arm64"
@@ -52,11 +52,11 @@
"uuid": "^9.0.0"
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-arm64": "0.18.2-beta.0",
"@lancedb/vectordb-darwin-x64": "0.18.2-beta.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.18.2-beta.0",
"@lancedb/vectordb-linux-x64-gnu": "0.18.2-beta.0",
"@lancedb/vectordb-win32-x64-msvc": "0.18.2-beta.0"
"@lancedb/vectordb-darwin-arm64": "0.18.3-beta.0",
"@lancedb/vectordb-darwin-x64": "0.18.3-beta.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.18.3-beta.0",
"@lancedb/vectordb-linux-x64-gnu": "0.18.3-beta.0",
"@lancedb/vectordb-win32-x64-msvc": "0.18.3-beta.0"
},
"peerDependencies": {
"@apache-arrow/ts": "^14.0.2",
@@ -327,9 +327,9 @@
}
},
"node_modules/@lancedb/vectordb-darwin-arm64": {
"version": "0.18.2-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.18.2-beta.0.tgz",
"integrity": "sha512-FzIcElkS6R5I5kU1S5m7yLVTB1Duv1XcmZQtVmYl/JjNlfxS1WTtMzdzMqSBFohDcgU2Tkc5+1FpK1B94dUUbg==",
"version": "0.18.3-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.18.3-beta.0.tgz",
"integrity": "sha512-dhJ5VlXV2N/L67mIpTSePhb8krX0FyQgpuz3I+4T4vYuU5JEF3cmedQ5TF5+3cGJhZim4PHRYLkfgCyTlxcqUg==",
"cpu": [
"arm64"
],
@@ -340,9 +340,9 @@
]
},
"node_modules/@lancedb/vectordb-darwin-x64": {
"version": "0.18.2-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.18.2-beta.0.tgz",
"integrity": "sha512-jv+XludfLNBDm1DjdqyghwDMtd4E+ygwycQpkpK72wyZSh6Qytrgq+4dNi/zCZ3UChFLbKbIxrVxv9yENQn2Pg==",
"version": "0.18.3-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.18.3-beta.0.tgz",
"integrity": "sha512-SHqPkuyfe87d5skf9GERzdeu6AKvVIbXMUwl5N+dVrE7HH6qiuP2HvOmiyHS2lJFgo0Ph8jSBVzPDxxtjF36Dg==",
"cpu": [
"x64"
],
@@ -353,9 +353,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
"version": "0.18.2-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.18.2-beta.0.tgz",
"integrity": "sha512-8/fBpbNYhhpetf/pZv0DyPnQkeAbsiICMyCoRiNu5auvQK4AsGF1XvLWrDi68u9F0GysBKvuatYuGqa/yh+Anw==",
"version": "0.18.3-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.18.3-beta.0.tgz",
"integrity": "sha512-ohnWsV1n9cxL5ik/GGL4FdQ04Ff9REELcNb1zgmJYyEfwyc6TH9m5HdySO/1ACPZJiLbML4gSvZ10J0Zyb+2SA==",
"cpu": [
"arm64"
],
@@ -366,9 +366,9 @@
]
},
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
"version": "0.18.2-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.18.2-beta.0.tgz",
"integrity": "sha512-7a1Kc/2V2ff4HlLzXyXVdK0Z0VIFUt50v2SBRdlcycJ0NLW9ZqV+9UjB/NAOwMXVgYd7d3rKjACGkQzkpvcyeg==",
"version": "0.18.3-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.18.3-beta.0.tgz",
"integrity": "sha512-nhbW2CKaBSUesiYCPBd9fAsDYIJLadlGsrb2gfjODlFy+2Lpnbz6T9SuV7dNqj6KBw+KHhaRhLqta7tyMZm/EA==",
"cpu": [
"x64"
],
@@ -379,9 +379,9 @@
]
},
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
"version": "0.18.2-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.18.2-beta.0.tgz",
"integrity": "sha512-EeCiSf2RtJMESnkIca28GI6rAStYj2q9sVIyNCXpmIZSkJVpfQ3iswHGAbHrEfaPl0J1Re9cnRHLLuqkumwiIQ==",
"version": "0.18.3-beta.0",
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.18.3-beta.0.tgz",
"integrity": "sha512-VE4TvMdZ7DIrTC8VYylGxEcH4h2UEejSwGX4PxRzrN9QsCQ4m4pOh3L/UguSO3g+Y1QEaGE20iWQoX6wgSEUhA==",
"cpu": [
"x64"
],

View File

@@ -1,6 +1,6 @@
{
"name": "vectordb",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"description": " Serverless, low-latency vector database for AI applications",
"private": false,
"main": "dist/index.js",
@@ -89,10 +89,10 @@
}
},
"optionalDependencies": {
"@lancedb/vectordb-darwin-x64": "0.18.2-beta.0",
"@lancedb/vectordb-darwin-arm64": "0.18.2-beta.0",
"@lancedb/vectordb-linux-x64-gnu": "0.18.2-beta.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.18.2-beta.0",
"@lancedb/vectordb-win32-x64-msvc": "0.18.2-beta.0"
"@lancedb/vectordb-darwin-x64": "0.18.3-beta.0",
"@lancedb/vectordb-darwin-arm64": "0.18.3-beta.0",
"@lancedb/vectordb-linux-x64-gnu": "0.18.3-beta.0",
"@lancedb/vectordb-linux-arm64-gnu": "0.18.3-beta.0",
"@lancedb/vectordb-win32-x64-msvc": "0.18.3-beta.0"
}
}

View File

@@ -110,7 +110,7 @@ describe('LanceDB Mirrored Store Integration test', function () {
fs.readdir(path.join(mirroredPath, 'data'), { withFileTypes: true }, (err, files) => {
if (err != null) throw err
assert.equal(files.length, 1)
assert.equal(files.length, 1, `Found files: ${files.map(f => f.name)}`)
assert.isTrue(files[0].name.endsWith('.lance'))
})

View File

@@ -1,7 +1,7 @@
[package]
name = "lancedb-nodejs"
edition.workspace = true
version = "0.18.2-beta.0"
version = "0.18.3-beta.0"
license.workspace = true
description.workspace = true
repository.workspace = true

View File

@@ -17,7 +17,7 @@ describe("when connecting", () => {
it("should connect", async () => {
const db = await connect(tmpDir.name);
expect(db.display()).toBe(
`ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=None)`,
`ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=5s)`,
);
});

View File

@@ -58,7 +58,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
it("be displayable", async () => {
expect(table.display()).toMatch(
/NativeTable\(some_table, uri=.*, read_consistency_interval=None\)/,
/NativeTable\(some_table, uri=.*, read_consistency_interval=5s\)/,
);
table.close();
expect(table.display()).toBe("ClosedTable(some_table)");
@@ -633,6 +633,23 @@ describe("When creating an index", () => {
expect(plan2).not.toMatch("LanceScan");
});
it("should be able to run analyze plan", async () => {
await tbl.createIndex("vec");
await tbl.add([
{
id: 300,
vec: Array(32)
.fill(1)
.map(() => Math.random()),
tags: [],
},
]);
const plan = await tbl.query().nearestTo(queryVec).analyzePlan();
expect(plan).toMatch("AnalyzeExec");
expect(plan).toMatch("metrics=");
});
it("should be able to query with row id", async () => {
const results = await tbl
.query()
@@ -1346,6 +1363,30 @@ describe("when calling explainPlan", () => {
});
});
describe("when calling analyzePlan", () => {
let tmpDir: tmp.DirResult;
let table: Table;
let queryVec: number[];
beforeEach(async () => {
tmpDir = tmp.dirSync({ unsafeCleanup: true });
const con = await connect(tmpDir.name);
table = await con.createTable("vectors", [{ id: 1, vector: [1.1, 0.9] }]);
});
afterEach(() => {
tmpDir.removeCallback();
});
it("retrieves runtime metrics", async () => {
queryVec = Array(2)
.fill(1)
.map(() => Math.random());
const plan = await table.query().nearestTo(queryVec).analyzePlan();
console.log("Query Plan:\n", plan); // <--- Print the plan
expect(plan).toMatch("AnalyzeExec");
});
});
describe("column name options", () => {
let tmpDir: tmp.DirResult;
let table: Table;

View File

@@ -202,5 +202,35 @@ test("basic table examples", async () => {
// --8<-- [end:create_f16_table]
await db.dropTable("f16_tbl");
}
const uri = databaseDir;
await db.createTable("my_table", [{ id: 1 }, { id: 2 }]);
{
// --8<-- [start:table_strong_consistency]
const db = await lancedb.connect({ uri, readConsistencyInterval: 0 });
const tbl = await db.openTable("my_table");
// --8<-- [end:table_strong_consistency]
}
{
// --8<-- [start:table_eventual_consistency]
const db = await lancedb.connect({ uri, readConsistencyInterval: 5 });
const tbl = await db.openTable("my_table");
// --8<-- [end:table_eventual_consistency]
}
{
// --8<-- [start:table_no_consistency]
const db = await lancedb.connect({ uri, readConsistencyInterval: null });
const tbl = await db.openTable("my_table");
// --8<-- [end:table_no_consistency]
}
{
// --8<-- [start:table_checkout_latest]
const tbl = await db.openTable("my_table");
// (Other writes happen to test_table_async from another process)
// Check for updates
tbl.checkoutLatest();
// --8<-- [end:table_checkout_latest]
}
});
});

View File

@@ -348,6 +348,43 @@ export class QueryBase<NativeQueryType extends NativeQuery | NativeVectorQuery>
return this.inner.explainPlan(verbose);
}
}
/**
* Executes the query and returns the physical query plan annotated with runtime metrics.
*
* This is useful for debugging and performance analysis, as it shows how the query was executed
* and includes metrics such as elapsed time, rows processed, and I/O statistics.
*
* @example
* import * as lancedb from "@lancedb/lancedb"
*
* const db = await lancedb.connect("./.lancedb");
* const table = await db.createTable("my_table", [
* { vector: [1.1, 0.9], id: "1" },
* ]);
*
* const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan();
*
* Example output (with runtime metrics inlined):
* AnalyzeExec verbose=true, metrics=[]
* ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs]
* Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1]
* CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs]
* GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns]
* FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs]
* SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1]
* KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1]
* LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2]
*
* @returns A query execution plan with runtime metrics for each step.
*/
async analyzePlan(): Promise<string> {
if (this.inner instanceof Promise) {
return this.inner.then((inner) => inner.analyzePlan());
} else {
return this.inner.analyzePlan();
}
}
}
/**

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-arm64",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"os": ["darwin"],
"cpu": ["arm64"],
"main": "lancedb.darwin-arm64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-darwin-x64",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"os": ["darwin"],
"cpu": ["x64"],
"main": "lancedb.darwin-x64.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-gnu",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-arm64-musl",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"os": ["linux"],
"cpu": ["arm64"],
"main": "lancedb.linux-arm64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-gnu",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-gnu.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-linux-x64-musl",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"os": ["linux"],
"cpu": ["x64"],
"main": "lancedb.linux-x64-musl.node",

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-arm64-msvc",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"os": [
"win32"
],

View File

@@ -1,6 +1,6 @@
{
"name": "@lancedb/lancedb-win32-x64-msvc",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"os": ["win32"],
"cpu": ["x64"],
"main": "lancedb.win32-x64-msvc.node",

View File

@@ -1,12 +1,12 @@
{
"name": "@lancedb/lancedb",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@lancedb/lancedb",
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"cpu": [
"x64",
"arm64"

View File

@@ -11,7 +11,7 @@
"ann"
],
"private": false,
"version": "0.18.2-beta.0",
"version": "0.18.3-beta.0",
"main": "dist/index.js",
"exports": {
".": "./dist/index.js",

View File

@@ -48,8 +48,16 @@ impl Connection {
pub async fn new(uri: String, options: ConnectionOptions) -> napi::Result<Self> {
let mut builder = ConnectBuilder::new(&uri);
if let Some(interval) = options.read_consistency_interval {
builder =
builder.read_consistency_interval(std::time::Duration::from_secs_f64(interval));
match interval {
Either::A(seconds) => {
builder = builder.read_consistency_interval(Some(
std::time::Duration::from_secs_f64(seconds),
));
}
Either::B(_) => {
builder = builder.read_consistency_interval(None);
}
}
}
if let Some(storage_options) = options.storage_options {
for (key, value) in storage_options {

View File

@@ -4,6 +4,7 @@
use std::collections::HashMap;
use env_logger::Env;
use napi::{bindgen_prelude::Null, Either};
use napi_derive::*;
mod connection;
@@ -18,7 +19,6 @@ mod table;
mod util;
#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
@@ -29,7 +29,7 @@ pub struct ConnectionOptions {
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// always consistent.
pub read_consistency_interval: Option<f64>,
pub read_consistency_interval: Option<Either<f64, Null>>,
/// (For LanceDB OSS only): configuration for object storage.
///
/// The available options are described at https://lancedb.github.io/lancedb/guides/storage/

View File

@@ -114,6 +114,16 @@ impl Query {
))
})
}
#[napi(catch_unwind)]
pub async fn analyze_plan(&self) -> napi::Result<String> {
self.inner.analyze_plan().await.map_err(|e| {
napi::Error::from_reason(format!(
"Failed to execute analyze plan: {}",
convert_error(&e)
))
})
}
}
#[napi]
@@ -259,4 +269,14 @@ impl VectorQuery {
))
})
}
#[napi(catch_unwind)]
pub async fn analyze_plan(&self) -> napi::Result<String> {
self.inner.analyze_plan().await.map_err(|e| {
napi::Error::from_reason(format!(
"Failed to execute analyze plan: {}",
convert_error(&e)
))
})
}
}

View File

@@ -1,5 +1,5 @@
[tool.bumpversion]
current_version = "0.21.2-beta.0"
current_version = "0.22.0-beta.0"
parse = """(?x)
(?P<major>0|[1-9]\\d*)\\.
(?P<minor>0|[1-9]\\d*)\\.

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-python"
version = "0.21.2-beta.0"
version = "0.22.0-beta.0"
edition.workspace = true
description = "Python bindings for LanceDB"
license.workspace = true

View File

@@ -26,7 +26,7 @@ def connect(
api_key: Optional[str] = None,
region: str = "us-east-1",
host_override: Optional[str] = None,
read_consistency_interval: Optional[timedelta] = None,
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
request_thread_pool: Optional[Union[int, ThreadPoolExecutor]] = None,
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
storage_options: Optional[Dict[str, str]] = None,
@@ -49,9 +49,8 @@ def connect(
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
zero seconds. Then every read will check for updates from other
processes. If None, then consistency is not checked. For strong consistency,
set this to zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero timedelta
for eventual consistency. If more than that interval has passed since
the last check, then the table will be checked for updates. Note: this
@@ -122,7 +121,7 @@ async def connect_async(
api_key: Optional[str] = None,
region: str = "us-east-1",
host_override: Optional[str] = None,
read_consistency_interval: Optional[timedelta] = None,
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
storage_options: Optional[Dict[str, str]] = None,
) -> AsyncConnection:
@@ -143,9 +142,8 @@ async def connect_async(
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
zero seconds. Then every read will check for updates from other
processes. If None, then consistency is not checked. For strong consistency,
set this to zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero timedelta
for eventual consistency. If more than that interval has passed since
the last check, then the table will be checked for updates. Note: this

View File

@@ -48,10 +48,11 @@ class Table:
async def version(self) -> int: ...
async def checkout(self, version: int): ...
async def checkout_latest(self): ...
async def restore(self): ...
async def restore(self, version: Optional[int] = None): ...
async def list_indices(self) -> list[IndexConfig]: ...
async def delete(self, filter: str): ...
async def add_columns(self, columns: list[tuple[str, str]]) -> None: ...
async def add_columns_with_schema(self, schema: pa.Schema) -> None: ...
async def alter_columns(self, columns: list[dict[str, Any]]) -> None: ...
async def optimize(
self,
@@ -94,6 +95,8 @@ class Query:
def nearest_to(self, query_vec: pa.Array) -> VectorQuery: ...
def nearest_to_text(self, query: dict) -> FTSQuery: ...
async def execute(self, max_batch_length: Optional[int]) -> RecordBatchStream: ...
async def explain_plan(self, verbose: Optional[bool]) -> str: ...
async def analyze_plan(self) -> str: ...
def to_query_request(self) -> PyQueryRequest: ...
class FTSQuery:
@@ -108,7 +111,6 @@ class FTSQuery:
def add_query_vector(self, query_vec: pa.Array) -> None: ...
def nearest_to(self, query_vec: pa.Array) -> HybridQuery: ...
async def execute(self, max_batch_length: Optional[int]) -> RecordBatchStream: ...
async def explain_plan(self) -> str: ...
def to_query_request(self) -> PyQueryRequest: ...
class VectorQuery:

View File

@@ -6,6 +6,7 @@ from __future__ import annotations
from abc import abstractmethod
from pathlib import Path
from datetime import timedelta
from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union
from lancedb.embeddings.registry import EmbeddingFunctionRegistry
@@ -32,7 +33,6 @@ import deprecation
if TYPE_CHECKING:
import pyarrow as pa
from .pydantic import LanceModel
from datetime import timedelta
from ._lancedb import Connection as LanceDbConnection
from .common import DATA, URI
@@ -318,9 +318,8 @@ class LanceDBConnection(DBConnection):
The root uri of the database.
read_consistency_interval: timedelta, default None
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
zero seconds. Then every read will check for updates from other
processes. If None, then consistency is not checked. For strong consistency,
set this to zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero timedelta
for eventual consistency. If more than that interval has passed since
the last check, then the table will be checked for updates. Note: this
@@ -352,7 +351,7 @@ class LanceDBConnection(DBConnection):
self,
uri: URI,
*,
read_consistency_interval: Optional[timedelta] = None,
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
storage_options: Optional[Dict[str, str]] = None,
):
if not isinstance(uri, Path):

View File

@@ -657,7 +657,45 @@ class LanceQueryBuilder(ABC):
-------
plan : str
""" # noqa: E501
return self._table._explain_plan(self.to_query_object())
return self._table._explain_plan(self.to_query_object(), verbose=verbose)
def analyze_plan(self) -> str:
"""
Run the query and return its execution plan with runtime metrics.
This returns detailed metrics for each step, such as elapsed time,
rows processed, bytes read, and I/O stats. It is useful for debugging
and performance tuning.
Examples
--------
>>> import lancedb
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", [{"vector": [99.0, 99]}])
>>> query = [100, 100]
>>> plan = table.search(query).analyze_plan()
>>> print(plan) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
AnalyzeExec verbose=true, metrics=[]
ProjectionExec: expr=[...], metrics=[...]
GlobalLimitExec: skip=0, fetch=10, metrics=[...]
FilterExec: _distance@2 IS NOT NULL,
metrics=[output_rows=..., elapsed_compute=...]
SortExec: TopK(fetch=10), expr=[...],
preserve_partitioning=[...],
metrics=[output_rows=..., elapsed_compute=..., row_replacements=...]
KNNVectorDistance: metric=l2,
metrics=[output_rows=..., elapsed_compute=..., output_batches=...]
LanceScan: uri=..., projection=[vector], row_id=true,
row_addr=false, ordered=false,
metrics=[output_rows=..., elapsed_compute=...,
bytes_read=..., iops=..., requests=...]
Returns
-------
plan : str
The physical query execution plan with runtime metrics.
"""
return self._table._analyze_plan(self.to_query_object())
def vector(self, vector: Union[np.ndarray, list]) -> Self:
"""Set the vector to search for.
@@ -1941,6 +1979,15 @@ class AsyncQueryBase(object):
""" # noqa: E501
return await self._inner.explain_plan(verbose)
async def analyze_plan(self):
"""Execute the query and display with runtime metrics.
Returns
-------
plan : str
"""
return await self._inner.analyze_plan()
class AsyncQuery(AsyncQueryBase):
def __init__(self, inner: LanceQuery):
@@ -2510,7 +2557,7 @@ class AsyncHybridQuery(AsyncQueryBase, AsyncVectorQueryBase):
Returns
-------
plan
plan : str
""" # noqa: E501
results = ["Vector Search Plan:"]
@@ -2519,3 +2566,23 @@ class AsyncHybridQuery(AsyncQueryBase, AsyncVectorQueryBase):
results.append(await self._inner.to_fts_query().explain_plan(verbose))
return "\n".join(results)
async def analyze_plan(self):
"""
Execute the query and return the physical execution plan with runtime metrics.
This runs both the vector and FTS (full-text search) queries and returns
detailed metrics for each step of execution—such as rows processed,
elapsed time, I/O stats, and more. Its useful for debugging and
performance analysis.
Returns
-------
plan : str
"""
results = ["Vector Search Query:"]
results.append(await self._inner.to_vector_query().analyze_plan())
results.append("FTS Search Query:")
results.append(await self._inner.to_fts_query().analyze_plan())
return "\n".join(results)

View File

@@ -87,6 +87,9 @@ class RemoteTable(Table):
def checkout_latest(self):
return LOOP.run(self._table.checkout_latest())
def restore(self, version: Optional[int] = None):
return LOOP.run(self._table.restore(version))
def list_indices(self) -> Iterable[IndexConfig]:
"""List all the indices on the table"""
return LOOP.run(self._table.list_indices())
@@ -365,6 +368,12 @@ class RemoteTable(Table):
return pa.RecordBatchReader.from_batches(async_iter.schema, iter_sync())
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str:
return LOOP.run(self._table._explain_plan(query, verbose))
def _analyze_plan(self, query: Query) -> str:
return LOOP.run(self._table._analyze_plan(query))
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
that can be used to create a "merge insert" operation.

View File

@@ -1007,6 +1007,12 @@ class Table(ABC):
self, query: Query, batch_size: Optional[int] = None
) -> pa.RecordBatchReader: ...
@abstractmethod
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str: ...
@abstractmethod
def _analyze_plan(self, query: Query) -> str: ...
@abstractmethod
def _do_merge(
self,
@@ -1262,16 +1268,21 @@ class Table(ABC):
"""
@abstractmethod
def add_columns(self, transforms: Dict[str, str]):
def add_columns(
self, transforms: Dict[str, str] | pa.Field | List[pa.Field] | pa.Schema
):
"""
Add new columns with defined values.
Parameters
----------
transforms: Dict[str, str]
transforms: Dict[str, str], pa.Field, List[pa.Field], pa.Schema
A map of column name to a SQL expression to use to calculate the
value of the new column. These expressions will be evaluated for
each row in the table, and can reference existing columns.
Alternatively, a pyarrow Field or Schema can be provided to add
new columns with the specified data types. The new columns will
be initialized with null values.
"""
@abstractmethod
@@ -1339,6 +1350,21 @@ class Table(ABC):
It can also be used to undo a `[Self::checkout]` operation
"""
@abstractmethod
def restore(self, version: Optional[int] = None):
"""Restore a version of the table. This is an in-place operation.
This creates a new version where the data is equivalent to the
specified previous version. Data is not copied (as of python-v0.2.1).
Parameters
----------
version : int, default None
The version to restore. If unspecified then restores the currently
checked out version. If the currently checked out version is the
latest version then this is a no-op.
"""
@abstractmethod
def list_versions(self) -> List[Dict[str, Any]]:
"""List all versions of the table"""
@@ -2292,8 +2318,11 @@ class LanceTable(Table):
return pa.RecordBatchReader.from_batches(async_iter.schema, iter_sync())
def _explain_plan(self, query: Query) -> str:
return LOOP.run(self._table._explain_plan(query))
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str:
return LOOP.run(self._table._explain_plan(query, verbose))
def _analyze_plan(self, query: Query) -> str:
return LOOP.run(self._table._analyze_plan(query))
def _do_merge(
self,
@@ -2442,7 +2471,9 @@ class LanceTable(Table):
"""
return LOOP.run(self._table.index_stats(index_name))
def add_columns(self, transforms: Dict[str, str]):
def add_columns(
self, transforms: Dict[str, str] | pa.field | List[pa.field] | pa.Schema
):
LOOP.run(self._table.add_columns(transforms))
def alter_columns(self, *alterations: Iterable[Dict[str, str]]):
@@ -3358,10 +3389,15 @@ class AsyncTable:
return await async_query.to_batches(max_batch_length=batch_size)
async def _explain_plan(self, query: Query) -> str:
async def _explain_plan(self, query: Query, verbose: Optional[bool]) -> str:
# This method is used by the sync table
async_query = self._sync_query_to_async(query)
return await async_query.explain_plan()
return await async_query.explain_plan(verbose)
async def _analyze_plan(self, query: Query) -> str:
# This method is used by the sync table
async_query = self._sync_query_to_async(query)
return await async_query.analyze_plan()
async def _do_merge(
self,
@@ -3501,7 +3537,9 @@ class AsyncTable:
return await self._inner.update(updates_sql, where)
async def add_columns(self, transforms: dict[str, str]):
async def add_columns(
self, transforms: dict[str, str] | pa.field | List[pa.field] | pa.Schema
):
"""
Add new columns with defined values.
@@ -3511,8 +3549,19 @@ class AsyncTable:
A map of column name to a SQL expression to use to calculate the
value of the new column. These expressions will be evaluated for
each row in the table, and can reference existing columns.
Alternatively, you can pass a pyarrow field or schema to add
new columns with NULLs.
"""
await self._inner.add_columns(list(transforms.items()))
if isinstance(transforms, pa.Field):
transforms = [transforms]
if isinstance(transforms, list) and all(
{isinstance(f, pa.Field) for f in transforms}
):
transforms = pa.schema(transforms)
if isinstance(transforms, pa.Schema):
await self._inner.add_columns_with_schema(transforms)
else:
await self._inner.add_columns(list(transforms.items()))
async def alter_columns(self, *alterations: Iterable[dict[str, Any]]):
"""
@@ -3610,7 +3659,7 @@ class AsyncTable:
"""
await self._inner.checkout_latest()
async def restore(self):
async def restore(self, version: Optional[int] = None):
"""
Restore the table to the currently checked out version
@@ -3623,7 +3672,7 @@ class AsyncTable:
Once the operation concludes the table will no longer be in a checked
out state and the read_consistency_interval, if any, will apply.
"""
await self._inner.restore()
await self._inner.restore(version)
async def optimize(
self,

View File

@@ -315,6 +315,11 @@ def test_table():
db = lancedb.connect(uri, read_consistency_interval=timedelta(seconds=5))
tbl = db.open_table("test_table")
# --8<-- [end:table_eventual_consistency]
# --8<-- [start:table_no_consistency]
uri = "data/sample-lancedb"
db = lancedb.connect(uri, read_consistency_interval=None)
tbl = db.open_table("test_table")
# --8<-- [end:table_no_consistency]
# --8<-- [start:table_checkout_latest]
tbl = db.open_table("test_table")
@@ -562,13 +567,19 @@ async def test_table_async():
async_db = await lancedb.connect_async(uri, read_consistency_interval=timedelta(0))
async_tbl = await async_db.open_table("test_table_async")
# --8<-- [end:table_async_strong_consistency]
# --8<-- [start:table_async_ventual_consistency]
# --8<-- [start:table_async_eventual_consistency]
uri = "data/sample-lancedb"
async_db = await lancedb.connect_async(
uri, read_consistency_interval=timedelta(seconds=5)
)
async_tbl = await async_db.open_table("test_table_async")
# --8<-- [end:table_async_eventual_consistency]
# --8<-- [start:table_async_no_consistency]
uri = "data/sample-lancedb"
async_db = await lancedb.connect_async(uri, read_consistency_interval=None)
async_tbl = await async_db.open_table("test_table_async")
# --8<-- [end:table_async_no_consistency]
# --8<-- [start:table_async_checkout_latest]
async_tbl = await async_db.open_table("test_table_async")

View File

@@ -3,7 +3,6 @@
import re
from datetime import timedelta
import os
import lancedb
@@ -299,13 +298,11 @@ def test_create_exist_ok(tmp_db: lancedb.DBConnection):
@pytest.mark.asyncio
async def test_connect(tmp_path):
db = await lancedb.connect_async(tmp_path)
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=None)"
db = await lancedb.connect_async(
tmp_path, read_consistency_interval=timedelta(seconds=5)
)
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=5s)"
db = await lancedb.connect_async(tmp_path, read_consistency_interval=None)
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=None)"
@pytest.mark.asyncio
async def test_close(mem_db_async: lancedb.AsyncConnection):
@@ -453,7 +450,7 @@ async def test_open_table(tmp_path):
assert tbl.name == "test"
assert (
re.search(
r"NativeTable\(test, uri=.*test\.lance, read_consistency_interval=None\)",
r"NativeTable\(test, uri=.*test\.lance, read_consistency_interval=5s\)",
str(tbl),
)
is not None

View File

@@ -114,6 +114,16 @@ async def test_explain_plan(table: AsyncTable):
assert "LanceScan" in plan
@pytest.mark.asyncio
async def test_analyze_plan(table: AsyncTable):
res = await (
table.query().nearest_to_text("dog").nearest_to([0.1, 0.1]).analyze_plan()
)
assert "AnalyzeExec" in res
assert "metrics=" in res
def test_normalize_scores():
cases = [
(pa.array([0.1, 0.4]), pa.array([0.0, 1.0])),

View File

@@ -702,6 +702,20 @@ async def test_fast_search_async(tmp_path):
assert "LanceScan" not in plan
def test_analyze_plan(table):
q = LanceVectorQueryBuilder(table, [0, 0], "vector")
res = q.analyze_plan()
assert "AnalyzeExec" in res
assert "metrics=" in res
@pytest.mark.asyncio
async def test_analyze_plan_async(table_async: AsyncTable):
res = await table_async.query().nearest_to(pa.array([1, 2])).analyze_plan()
assert "AnalyzeExec" in res
assert "metrics=" in res
def test_explain_plan(table):
q = LanceVectorQueryBuilder(table, [0, 0], "vector")
plan = q.explain_plan(verbose=True)

View File

@@ -32,7 +32,11 @@ def test_basic(mem_db: DBConnection):
table = mem_db.create_table("test", data=data)
assert table.name == "test"
assert "LanceTable(name='test', version=1, _conn=LanceDBConnection(" in repr(table)
assert (
"LanceTable(name='test', version=1, "
"read_consistency_interval=datetime.timedelta(seconds=5), "
"_conn=LanceDBConnection("
) in repr(table)
expected_schema = pa.schema(
{
"vector": pa.list_(pa.float32(), 2),
@@ -1384,6 +1388,37 @@ async def test_add_columns_async(mem_db_async: AsyncConnection):
assert data["new_col"].to_pylist() == [2, 3]
@pytest.mark.asyncio
async def test_add_columns_with_schema(mem_db_async: AsyncConnection):
data = pa.table({"id": [0, 1]})
table = await mem_db_async.create_table("my_table", data=data)
await table.add_columns(
[pa.field("x", pa.int64()), pa.field("vector", pa.list_(pa.float32(), 8))]
)
assert await table.schema() == pa.schema(
[
pa.field("id", pa.int64()),
pa.field("x", pa.int64()),
pa.field("vector", pa.list_(pa.float32(), 8)),
]
)
table = await mem_db_async.create_table("table2", data=data)
await table.add_columns(
pa.schema(
[pa.field("y", pa.int64()), pa.field("emb", pa.list_(pa.float32(), 8))]
)
)
assert await table.schema() == pa.schema(
[
pa.field("id", pa.int64()),
pa.field("y", pa.int64()),
pa.field("emb", pa.list_(pa.float32(), 8)),
]
)
def test_alter_columns(mem_db: DBConnection):
data = pa.table({"id": [0, 1]})
table = mem_db.create_table("my_table", data=data)

View File

@@ -204,7 +204,9 @@ pub fn connect(
}
if let Some(read_consistency_interval) = read_consistency_interval {
let read_consistency_interval = Duration::from_secs_f64(read_consistency_interval);
builder = builder.read_consistency_interval(read_consistency_interval);
builder = builder.read_consistency_interval(Some(read_consistency_interval));
} else {
builder = builder.read_consistency_interval(None);
}
if let Some(storage_options) = storage_options {
builder = builder.storage_options(storage_options);

View File

@@ -271,7 +271,7 @@ impl Query {
})
}
fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<Bound<'_, PyAny>> {
pub fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
inner
@@ -281,6 +281,16 @@ impl Query {
})
}
pub fn analyze_plan(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
inner
.analyze_plan()
.await
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
})
}
pub fn to_query_request(&self) -> PyQueryRequest {
PyQueryRequest::from(AnyQuery::Query(self.inner.clone().into_request()))
}
@@ -365,6 +375,16 @@ impl FTSQuery {
})
}
pub fn analyze_plan(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
inner
.analyze_plan()
.await
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
})
}
pub fn get_query(&self) -> String {
self.fts_query.query.clone()
}
@@ -470,7 +490,7 @@ impl VectorQuery {
})
}
fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<Bound<'_, PyAny>> {
pub fn explain_plan(self_: PyRef<'_, Self>, verbose: bool) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
inner
@@ -480,6 +500,16 @@ impl VectorQuery {
})
}
pub fn analyze_plan(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner.clone();
future_into_py(self_.py(), async move {
inner
.analyze_plan()
.await
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
})
}
pub fn nearest_to_text(&mut self, query: Bound<'_, PyDict>) -> PyResult<HybridQuery> {
let base_query = self.inner.clone().into_plain();
let fts_query = Query::new(base_query).nearest_to_text(query)?;

View File

@@ -1,9 +1,11 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
use std::{collections::HashMap, sync::Arc};
use arrow::{
datatypes::DataType,
datatypes::{DataType, Schema},
ffi_stream::ArrowArrayStreamReader,
pyarrow::{FromPyArrow, ToPyArrow},
pyarrow::{FromPyArrow, PyArrowType, ToPyArrow},
};
use lancedb::table::{
AddDataMode, ColumnAlteration, Duration, NewColumnTransform, OptimizeAction, OptimizeOptions,
@@ -16,7 +18,6 @@ use pyo3::{
Bound, FromPyObject, PyAny, PyRef, PyResult, Python,
};
use pyo3_async_runtimes::tokio::future_into_py;
use std::collections::HashMap;
use crate::{
error::PythonErrorExt,
@@ -303,12 +304,16 @@ impl Table {
})
}
pub fn restore(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
#[pyo3(signature = (version=None))]
pub fn restore(self_: PyRef<'_, Self>, version: Option<u64>) -> PyResult<Bound<'_, PyAny>> {
let inner = self_.inner_ref()?.clone();
future_into_py(
self_.py(),
async move { inner.restore().await.infer_error() },
)
future_into_py(self_.py(), async move {
if let Some(version) = version {
inner.checkout(version).await.infer_error()?;
}
inner.restore().await.infer_error()
})
}
pub fn query(&self) -> Query {
@@ -440,6 +445,20 @@ impl Table {
})
}
pub fn add_columns_with_schema(
self_: PyRef<'_, Self>,
schema: PyArrowType<Schema>,
) -> PyResult<Bound<'_, PyAny>> {
let arrow_schema = &schema.0;
let transform = NewColumnTransform::AllNulls(Arc::new(arrow_schema.clone()));
let inner = self_.inner_ref()?.clone();
future_into_py(self_.py(), async move {
inner.add_columns(transform, None).await.infer_error()?;
Ok(())
})
}
pub fn alter_columns<'a>(
self_: PyRef<'a, Self>,
alterations: Vec<Bound<PyDict>>,

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb-node"
version = "0.18.2-beta.0"
version = "0.18.3-beta.0"
description = "Serverless, low-latency vector database for AI applications"
license.workspace = true
edition.workspace = true

View File

@@ -60,7 +60,7 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
let mut conn_builder = connect(&path).storage_options(storage_options);
if let Some(interval) = read_consistency_interval {
conn_builder = conn_builder.read_consistency_interval(interval);
conn_builder = conn_builder.read_consistency_interval(Some(interval));
}
rt.spawn(async move {
let database = conn_builder.execute().await;

View File

@@ -1,6 +1,6 @@
[package]
name = "lancedb"
version = "0.18.2-beta.0"
version = "0.18.3-beta.0"
edition.workspace = true
description = "LanceDB: A serverless, low-latency vector database for AI applications"
license.workspace = true

View File

@@ -12,7 +12,7 @@ use super::{
Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest,
OpenDatabaseRequest,
};
use crate::connection::ConnectRequest;
use crate::connection::{ConnectRequest, DEFAULT_READ_CONSISTENCY_INTERVAL};
use crate::database::listing::{ListingDatabase, ListingDatabaseOptions};
use crate::database::{Database, DatabaseOptions};
use crate::error::{CreateDirSnafu, Error, Result};
@@ -214,7 +214,7 @@ impl Catalog for ListingCatalog {
uri: db_uri,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
options: Default::default(),
};
@@ -241,7 +241,7 @@ impl Catalog for ListingCatalog {
uri: db_path.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
options: Default::default(),
};
@@ -311,7 +311,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();

View File

@@ -36,6 +36,9 @@ pub use lance_encoding::version::LanceFileVersion;
#[cfg(feature = "remote")]
use lance_io::object_store::StorageOptions;
pub(crate) const DEFAULT_READ_CONSISTENCY_INTERVAL: Option<std::time::Duration> =
Some(std::time::Duration::from_secs(5));
/// A builder for configuring a [`Connection::table_names`] operation
pub struct TableNamesBuilder {
parent: Arc<dyn Database>,
@@ -618,14 +621,15 @@ pub struct ConnectRequest {
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For performance
/// reasons, this is the default. For strong consistency, set this to
/// If None, then consistency is not checked. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero timedelta
/// for eventual consistency. If more than that interval has passed since
/// the last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// always consistent.
///
/// The default is 5 seconds.
pub read_consistency_interval: Option<std::time::Duration>,
}
@@ -643,7 +647,7 @@ impl ConnectBuilder {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
options: HashMap::new(),
},
embedding_registry: None,
@@ -782,8 +786,7 @@ impl ConnectBuilder {
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
/// If left unset, consistency is not checked. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other processes.
/// As a compromise, set this to a non-zero duration for eventual consistency.
/// If more than that duration has passed since the last read, the read will
@@ -792,13 +795,15 @@ impl ConnectBuilder {
/// This only affects read operations. Write operations are always
/// consistent.
///
/// The default is 5 seconds.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
read_consistency_interval: Option<std::time::Duration>,
) -> Self {
self.request.read_consistency_interval = Some(read_consistency_interval);
self.request.read_consistency_interval = read_consistency_interval;
self
}
@@ -882,7 +887,7 @@ impl CatalogConnectBuilder {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
options: HashMap::new(),
},
}

View File

@@ -579,6 +579,15 @@ pub trait ExecutableQuery {
) -> impl Future<Output = Result<SendableRecordBatchStream>> + Send;
fn explain_plan(&self, verbose: bool) -> impl Future<Output = Result<String>> + Send;
fn analyze_plan(&self) -> impl Future<Output = Result<String>> + Send {
self.analyze_plan_with_options(QueryExecutionOptions::default())
}
fn analyze_plan_with_options(
&self,
options: QueryExecutionOptions,
) -> impl Future<Output = Result<String>> + Send;
}
/// A query filter that can be applied to a query
@@ -765,6 +774,11 @@ impl ExecutableQuery for Query {
let query = AnyQuery::Query(self.request.clone());
self.parent.explain_plan(&query, verbose).await
}
async fn analyze_plan_with_options(&self, options: QueryExecutionOptions) -> Result<String> {
let query = AnyQuery::Query(self.request.clone());
self.parent.analyze_plan(&query, options).await
}
}
/// A request for a nearest-neighbors search into a table
@@ -1089,6 +1103,11 @@ impl ExecutableQuery for VectorQuery {
let query = AnyQuery::VectorQuery(self.request.clone());
self.parent.explain_plan(&query, verbose).await
}
async fn analyze_plan_with_options(&self, options: QueryExecutionOptions) -> Result<String> {
let query = AnyQuery::VectorQuery(self.request.clone());
self.parent.analyze_plan(&query, options).await
}
}
impl HasQuery for VectorQuery {
@@ -1370,6 +1389,31 @@ mod tests {
}
}
#[tokio::test]
async fn test_analyze_plan() {
let tmp_dir = tempdir().unwrap();
let table = make_test_table(&tmp_dir).await;
let result = table.query().analyze_plan().await.unwrap();
assert!(result.contains("metrics="));
}
#[tokio::test]
async fn test_analyze_plan_with_options() {
let tmp_dir = tempdir().unwrap();
let table = make_test_table(&tmp_dir).await;
let result = table
.query()
.analyze_plan_with_options(QueryExecutionOptions {
max_batch_length: 10,
..Default::default()
})
.await
.unwrap();
assert!(result.contains("metrics="));
}
fn assert_plan_exists(plan: &Arc<dyn ExecutionPlan>, name: &str) -> bool {
if plan.name() == name {
return true;

View File

@@ -325,24 +325,11 @@ impl<S: HttpSend> RemoteTable<S> {
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
let request = self.client.post(&format!("/v1/table/{}/query/", self.name));
let version = self.current_version().await;
let mut body = serde_json::json!({ "version": version });
let requests = match query {
AnyQuery::Query(query) => {
Self::apply_query_params(&mut body, query)?;
// Empty vector can be passed if no vector search is performed.
body["vector"] = serde_json::Value::Array(Vec::new());
vec![request.json(&body)]
}
AnyQuery::VectorQuery(query) => {
let bodies = self.apply_vector_query_params(body, query)?;
bodies
.into_iter()
.map(|body| request.try_clone().unwrap().json(&body))
.collect()
}
};
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
.into_iter()
.map(|body| request.try_clone().unwrap().json(&body))
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
@@ -351,6 +338,22 @@ impl<S: HttpSend> RemoteTable<S> {
let streams = futures::future::try_join_all(futures).await?;
Ok(streams)
}
async fn prepare_query_bodies(&self, query: &AnyQuery) -> Result<Vec<serde_json::Value>> {
let version = self.current_version().await;
let base_body = serde_json::json!({ "version": version });
match query {
AnyQuery::Query(query) => {
let mut body = base_body.clone();
Self::apply_query_params(&mut body, query)?;
// Empty vector can be passed if no vector search is performed.
body["vector"] = serde_json::Value::Array(Vec::new());
Ok(vec![body])
}
AnyQuery::VectorQuery(query) => self.apply_vector_query_params(base_body, query),
}
}
}
#[derive(Deserialize)]
@@ -422,10 +425,17 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(())
}
async fn restore(&self) -> Result<()> {
self.check_mutable().await?;
Err(Error::NotSupported {
message: "restore is not supported on LanceDB cloud.".into(),
})
let mut request = self
.client
.post(&format!("/v1/table/{}/restore/", self.name));
let version = self.current_version().await;
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let (request_id, response) = self.client.send(request, true).await?;
self.check_table_response(&request_id, response).await?;
self.checkout_latest().await?;
Ok(())
}
async fn list_versions(&self) -> Result<Vec<Version>> {
@@ -559,6 +569,94 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
)?))
}
}
async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
let base_request = self
.client
.post(&format!("/v1/table/{}/explain_plan/", self.name));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
.into_iter()
.map(|query_body| {
let explain_request = serde_json::json!({
"verbose": verbose,
"query": query_body
});
base_request.try_clone().unwrap().json(&explain_request)
})
.collect::<Vec<_>>();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse explain plan: {}", e).into(),
request_id,
status_code: None,
})
});
let plan_texts = futures::future::try_join_all(futures).await?;
let final_plan = if plan_texts.len() > 1 {
plan_texts
.into_iter()
.enumerate()
.map(|(i, plan)| format!("--- Plan #{} ---\n{}", i + 1, plan))
.collect::<Vec<_>>()
.join("\n\n")
} else {
plan_texts.into_iter().next().unwrap_or_default()
};
Ok(final_plan)
}
async fn analyze_plan(
&self,
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<String> {
let request = self
.client
.post(&format!("/v1/table/{}/analyze_plan/", self.name));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
.into_iter()
.map(|body| request.try_clone().unwrap().json(&body))
.collect();
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.client.send(req, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to execute analyze plan: {}", e).into(),
request_id,
status_code: None,
})
});
let analyze_result_texts = futures::future::try_join_all(futures).await?;
let final_analyze = if analyze_result_texts.len() > 1 {
analyze_result_texts
.into_iter()
.enumerate()
.map(|(i, plan)| format!("--- Query #{} ---\n{}", i + 1, plan))
.collect::<Vec<_>>()
.join("\n\n")
} else {
analyze_result_texts.into_iter().next().unwrap_or_default()
};
Ok(final_analyze)
}
async fn update(&self, update: UpdateBuilder) -> Result<u64> {
self.check_mutable().await?;
let request = self
@@ -581,6 +679,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(0) // TODO: support returning number of modified rows once supported in SaaS.
}
async fn delete(&self, predicate: &str) -> Result<()> {
self.check_mutable().await?;
let body = serde_json::json!({ "predicate": predicate });

View File

@@ -33,7 +33,7 @@ use lance::dataset::{
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
use lance::index::vector::utils::infer_vector_dim;
use lance::io::WrappingObjectStore;
use lance_datafusion::exec::execute_plan;
use lance_datafusion::exec::{analyze_plan as lance_analyze_plan, execute_plan};
use lance_datafusion::utils::StreamingWriteSource;
use lance_index::vector::hnsw::builder::HnswBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
@@ -433,6 +433,12 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
Ok(format!("{}", display.indent(verbose)))
}
async fn analyze_plan(
&self,
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<String>;
/// Add new records to the table.
async fn add(
&self,
@@ -2192,6 +2198,15 @@ impl BaseTable for NativeTable {
self.generic_query(query, options).await
}
async fn analyze_plan(
&self,
query: &AnyQuery,
options: QueryExecutionOptions,
) -> Result<String> {
let plan = self.create_plan(query, options).await?;
Ok(lance_analyze_plan(plan, Default::default()).await?)
}
async fn merge_insert(
&self,
params: MergeInsertBuilder,
@@ -2611,7 +2626,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -2694,7 +2709,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -2891,7 +2906,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -3462,7 +3477,8 @@ mod tests {
let mut conn2 = ConnectBuilder::new(uri);
if let Some(interval) = interval {
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
conn2 = conn2
.read_consistency_interval(Some(std::time::Duration::from_millis(interval)));
}
let conn2 = conn2.execute().await.unwrap();
let table2 = conn2.open_table("my_table").execute().await.unwrap();
@@ -3498,7 +3514,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -3519,7 +3535,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -3594,7 +3610,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -3656,7 +3672,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();

View File

@@ -7,6 +7,7 @@ use std::{
time::{self, Duration, Instant},
};
use futures::FutureExt;
use lance::Dataset;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -22,13 +23,16 @@ pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
///
/// The dataset is lazily loaded, and starts off as None. On the first access,
/// the dataset is loaded.
#[derive(Debug, Clone)]
#[derive(Debug)]
enum DatasetRef {
/// In this mode, the dataset is always the latest version.
Latest {
dataset: Dataset,
read_consistency_interval: Option<Duration>,
last_consistency_check: Option<time::Instant>,
/// A background task loading the next version of the dataset. This happens
/// in the background so as not to block the current thread.
refresh_task: Option<tokio::task::JoinHandle<Result<Dataset>>>,
},
/// In this mode, the dataset is a specific version. It cannot be mutated.
TimeTravel { dataset: Dataset, version: u64 },
@@ -41,9 +45,19 @@ impl DatasetRef {
Self::Latest {
dataset,
last_consistency_check,
refresh_task,
..
} => {
dataset.checkout_latest().await?;
// Replace the refresh task
if let Some(refresh_task) = refresh_task {
refresh_task.abort();
}
let mut new_dataset = dataset.clone();
refresh_task.replace(tokio::spawn(async move {
new_dataset.checkout_latest().await?;
Ok(new_dataset)
}));
last_consistency_check.replace(Instant::now());
}
Self::TimeTravel { dataset, version } => {
@@ -57,26 +71,24 @@ impl DatasetRef {
matches!(self, Self::Latest { .. })
}
async fn need_reload(&self) -> Result<bool> {
Ok(match self {
Self::Latest { dataset, .. } => {
dataset.latest_version_id().await? != dataset.version().version
}
Self::TimeTravel { dataset, version } => dataset.version().version != *version,
})
fn strong_consistency(&self) -> bool {
matches!(
self,
Self::Latest { read_consistency_interval: Some(interval), .. }
if interval.as_nanos() == 0
)
}
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
match self {
Self::Latest { .. } => Ok(()),
Self::TimeTravel { dataset, .. } => {
dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
dataset.checkout_latest().await?;
*self = Self::Latest {
dataset: dataset.clone(),
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
Ok(())
}
@@ -114,13 +126,74 @@ impl DatasetRef {
match self {
Self::Latest {
dataset: ref mut ds,
refresh_task,
last_consistency_check,
..
} => {
*ds = dataset;
if let Some(refresh_task) = refresh_task {
refresh_task.abort();
}
*refresh_task = None;
*last_consistency_check = Some(Instant::now());
}
_ => unreachable!("Dataset should be in latest mode at this point"),
}
}
/// Wait for the background refresh task to complete.
async fn await_refresh(&mut self) -> Result<()> {
if let Self::Latest {
refresh_task: Some(refresh_task),
read_consistency_interval,
..
} = self
{
let dataset = refresh_task.await.expect("Refresh task panicked")?;
*self = Self::Latest {
dataset,
read_consistency_interval: *read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
}
Ok(())
}
/// Check if background refresh task is done, and if so, update the dataset.
fn check_refresh(&mut self) -> Result<()> {
if let Self::Latest {
refresh_task: Some(refresh_task),
read_consistency_interval,
..
} = self
{
if refresh_task.is_finished() {
let dataset = refresh_task
.now_or_never()
.unwrap()
.expect("Refresh task panicked")?;
*self = Self::Latest {
dataset,
read_consistency_interval: *read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
}
}
Ok(())
}
fn refresh_is_ready(&self) -> bool {
matches!(
self,
Self::Latest {
refresh_task: Some(refresh_task),
..
}
if refresh_task.is_finished()
)
}
}
impl DatasetConsistencyWrapper {
@@ -130,6 +203,7 @@ impl DatasetConsistencyWrapper {
dataset,
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
})))
}
@@ -188,18 +262,9 @@ impl DatasetConsistencyWrapper {
}
pub async fn reload(&self) -> Result<()> {
if !self.0.read().await.need_reload().await? {
return Ok(());
}
let mut write_guard = self.0.write().await;
// on lock escalation -- check if someone else has already reloaded
if !write_guard.need_reload().await? {
return Ok(());
}
// actually need reloading
write_guard.reload().await
write_guard.reload().await?;
write_guard.await_refresh().await
}
/// Returns the version, if in time travel mode, or None otherwise
@@ -245,9 +310,26 @@ impl DatasetConsistencyWrapper {
/// Ensures that the dataset is loaded and up-to-date with consistency and
/// version parameters.
async fn ensure_up_to_date(&self) -> Result<()> {
// We may have previously created a background task to fetch the new
// version of the dataset. If that task is done, we should update the
// dataset.
{
let read_guard = self.0.read().await;
if read_guard.refresh_is_ready() {
drop(read_guard);
self.0.write().await.check_refresh()?;
}
}
if !self.is_up_to_date().await? {
self.reload().await?;
}
// If we are in strong consistency mode, we should await the refresh task.
if self.0.read().await.strong_consistency() {
self.0.write().await.await_refresh().await?;
}
Ok(())
}
}