mirror of
https://github.com/lancedb/lancedb.git
synced 2025-12-25 14:29:56 +00:00
Compare commits
9 Commits
python-v0.
...
v0.19.0-be
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e67cd0baf9 | ||
|
|
26dab93f2a | ||
|
|
b9bdb8d937 | ||
|
|
a1d1833a40 | ||
|
|
a547c523c2 | ||
|
|
dc8b75feab | ||
|
|
c1600cdc06 | ||
|
|
f5dee46970 | ||
|
|
346cbf8bf7 |
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.18.2-beta.0"
|
||||
current_version = "0.19.0-beta.0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
101
Cargo.lock
generated
101
Cargo.lock
generated
@@ -390,9 +390,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "async-compression"
|
||||
version = "0.4.21"
|
||||
version = "0.4.22"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c0cf008e5e1a9e9e22a7d3c9a4992e21a350290069e36d8fb72304ed17e8f2d2"
|
||||
checksum = "59a194f9d963d8099596278594b3107448656ba73831c9d8c783e613ce86da64"
|
||||
dependencies = [
|
||||
"flate2",
|
||||
"futures-core",
|
||||
@@ -512,9 +512,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
|
||||
|
||||
[[package]]
|
||||
name = "aws-config"
|
||||
version = "1.6.0"
|
||||
version = "1.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a84fe2c5e9965fba0fbc2001db252f1d57527d82a905cca85127df227bca748"
|
||||
checksum = "8c39646d1a6b51240a1a23bb57ea4eebede7e16fbc237fdc876980233dcecb4f"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -603,9 +603,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-bedrockruntime"
|
||||
version = "1.77.0"
|
||||
version = "1.78.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4198493316dab97e1fed7716f3823462b73a34c518f4ee7b9799921645e232e5"
|
||||
checksum = "6457fd617f20075dd2d5c9f6c3cb09815e6b83ab61c55499055499da9e04478d"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -627,9 +627,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-dynamodb"
|
||||
version = "1.69.0"
|
||||
version = "1.70.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c42f454f50a050aaa3f3d200a3ac072e48c18c4bb5356c38be7eee1da1439a43"
|
||||
checksum = "4ac281113af7f8700394bf25eb272b842b7ca088810e96c928f812282f2e6f44"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -650,9 +650,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-kms"
|
||||
version = "1.63.0"
|
||||
version = "1.64.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a971bfe62ca4a228627a1b74a87a7a142979b20b168d2e2884f4893212ebb715"
|
||||
checksum = "c23289881f4071421bbef3688ca43501c4fede796e0cbca942a54f0eb6906fbc"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -664,6 +664,7 @@ dependencies = [
|
||||
"aws-smithy-types",
|
||||
"aws-types",
|
||||
"bytes",
|
||||
"fastrand",
|
||||
"http 0.2.12",
|
||||
"once_cell",
|
||||
"regex-lite",
|
||||
@@ -672,9 +673,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-s3"
|
||||
version = "1.79.0"
|
||||
version = "1.80.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a8f63ba8f5fca32061c7d62d866ef65470edde38d4c5f8a0ebb8ff40a0521e1c"
|
||||
checksum = "3a36b09e8273d89c4f35ea122b83b30e48f906f3b644460d72a7d3656d1be93d"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -707,9 +708,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sso"
|
||||
version = "1.62.0"
|
||||
version = "1.64.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1d5330ad4e8a1ff49e9f26b738611caa72b105c41d41733801d1a36e8f9de936"
|
||||
checksum = "02d4bdb0e5f80f0689e61c77ab678b2b9304af329616af38aef5b6b967b8e736"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -721,6 +722,7 @@ dependencies = [
|
||||
"aws-smithy-types",
|
||||
"aws-types",
|
||||
"bytes",
|
||||
"fastrand",
|
||||
"http 0.2.12",
|
||||
"once_cell",
|
||||
"regex-lite",
|
||||
@@ -729,9 +731,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-ssooidc"
|
||||
version = "1.63.0"
|
||||
version = "1.65.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7956b1a85d49082347a7d17daa2e32df191f3e23c03d47294b99f95413026a78"
|
||||
checksum = "acbbb3ce8da257aedbccdcb1aadafbbb6a5fe9adf445db0e1ea897bdc7e22d08"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -743,6 +745,7 @@ dependencies = [
|
||||
"aws-smithy-types",
|
||||
"aws-types",
|
||||
"bytes",
|
||||
"fastrand",
|
||||
"http 0.2.12",
|
||||
"once_cell",
|
||||
"regex-lite",
|
||||
@@ -751,9 +754,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-sdk-sts"
|
||||
version = "1.63.0"
|
||||
version = "1.65.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "065c533fbe6f84962af33fcf02b0350b7c1f79285baab5924615d2be3b232855"
|
||||
checksum = "96a78a8f50a1630db757b60f679c8226a8a70ee2ab5f5e6e51dc67f6c61c7cfd"
|
||||
dependencies = [
|
||||
"aws-credential-types",
|
||||
"aws-runtime",
|
||||
@@ -766,6 +769,7 @@ dependencies = [
|
||||
"aws-smithy-types",
|
||||
"aws-smithy-xml",
|
||||
"aws-types",
|
||||
"fastrand",
|
||||
"http 0.2.12",
|
||||
"once_cell",
|
||||
"regex-lite",
|
||||
@@ -869,9 +873,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-http-client"
|
||||
version = "1.0.0"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0497ef5d53065b7cd6a35e9c1654bd1fefeae5c52900d91d1b188b0af0f29324"
|
||||
checksum = "8aff1159006441d02e57204bf57a1b890ba68bedb6904ffd2873c1c4c11c546b"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-runtime-api",
|
||||
@@ -904,6 +908,16 @@ dependencies = [
|
||||
"aws-smithy-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-observability"
|
||||
version = "0.1.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "445d065e76bc1ef54963db400319f1dd3ebb3e0a74af20f7f7630625b0cc7cc0"
|
||||
dependencies = [
|
||||
"aws-smithy-runtime-api",
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-query"
|
||||
version = "0.60.7"
|
||||
@@ -916,13 +930,14 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "aws-smithy-runtime"
|
||||
version = "1.8.0"
|
||||
version = "1.8.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6328865e36c6fd970094ead6b05efd047d3a80ec5fc3be5e743910da9f2ebf8"
|
||||
checksum = "0152749e17ce4d1b47c7747bdfec09dac1ccafdcbc741ebf9daa2a373356730f"
|
||||
dependencies = [
|
||||
"aws-smithy-async",
|
||||
"aws-smithy-http",
|
||||
"aws-smithy-http-client",
|
||||
"aws-smithy-observability",
|
||||
"aws-smithy-runtime-api",
|
||||
"aws-smithy-types",
|
||||
"bytes",
|
||||
@@ -2537,9 +2552,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "event-listener-strategy"
|
||||
version = "0.5.3"
|
||||
version = "0.5.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2"
|
||||
checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93"
|
||||
dependencies = [
|
||||
"event-listener 5.4.0",
|
||||
"pin-project-lite",
|
||||
@@ -3367,9 +3382,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "icu_locid_transform_data"
|
||||
version = "1.5.0"
|
||||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e"
|
||||
checksum = "7515e6d781098bf9f7205ab3fc7e9709d34554ae0b21ddbcb5febfa4bc7df11d"
|
||||
|
||||
[[package]]
|
||||
name = "icu_normalizer"
|
||||
@@ -3391,9 +3406,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "icu_normalizer_data"
|
||||
version = "1.5.0"
|
||||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516"
|
||||
checksum = "c5e8338228bdc8ab83303f16b797e177953730f601a96c25d10cb3ab0daa0cb7"
|
||||
|
||||
[[package]]
|
||||
name = "icu_properties"
|
||||
@@ -3412,9 +3427,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "icu_properties_data"
|
||||
version = "1.5.0"
|
||||
version = "1.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569"
|
||||
checksum = "85fb8799753b75aee8d2a21d7c14d9f38921b54b3dbda10f5a3c7a7b82dba5e2"
|
||||
|
||||
[[package]]
|
||||
name = "icu_provider"
|
||||
@@ -4035,7 +4050,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb"
|
||||
version = "0.18.2-beta.0"
|
||||
version = "0.18.3-beta.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4122,7 +4137,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-node"
|
||||
version = "0.18.2-beta.0"
|
||||
version = "0.18.3-beta.0"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-ipc",
|
||||
@@ -4147,7 +4162,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-nodejs"
|
||||
version = "0.18.2-beta.0"
|
||||
version = "0.18.3-beta.0"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-ipc",
|
||||
@@ -4165,7 +4180,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lancedb-python"
|
||||
version = "0.21.2-beta.0"
|
||||
version = "0.21.3-beta.0"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"env_logger",
|
||||
@@ -4902,9 +4917,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.21.1"
|
||||
version = "1.21.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d75b0bedcc4fe52caa0e03d9f1151a323e4aa5e2d78ba3580400cd3c9e2bc4bc"
|
||||
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
|
||||
|
||||
[[package]]
|
||||
name = "oneshot"
|
||||
@@ -5727,9 +5742,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.37.2"
|
||||
version = "0.37.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "165859e9e55f79d67b96c5d96f4e88b6f2695a1972849c15a6a3f5c59fc2c003"
|
||||
checksum = "bf763ab1c7a3aa408be466efc86efe35ed1bd3dd74173ed39d6b0d0a6f0ba148"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"serde",
|
||||
@@ -5777,9 +5792,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "quinn-udp"
|
||||
version = "0.5.10"
|
||||
version = "0.5.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e46f3055866785f6b92bc6164b76be02ca8f2eb4b002c0354b28cf4c119e5944"
|
||||
checksum = "541d0f57c6ec747a90738a52741d3221f7960e8ac2f0ff4b1a63680e033b4ab5"
|
||||
dependencies = [
|
||||
"cfg_aliases",
|
||||
"libc",
|
||||
@@ -6269,7 +6284,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"ring",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki 0.103.0",
|
||||
"rustls-webpki 0.103.1",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
@@ -6337,9 +6352,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.103.0"
|
||||
version = "0.103.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0aa4eeac2588ffff23e9d7a7e9b3f971c5fb5b7ebc9452745e0c232c64f83b2f"
|
||||
checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03"
|
||||
dependencies = [
|
||||
"aws-lc-rs",
|
||||
"ring",
|
||||
|
||||
@@ -1001,9 +1001,11 @@ In LanceDB OSS, users can set the `read_consistency_interval` parameter on conne
|
||||
|
||||
There are three possible settings for `read_consistency_interval`:
|
||||
|
||||
1. **Unset (default)**: The database does not check for updates to tables made by other processes. This provides the best query performance, but means that clients may not see the most up-to-date data. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
|
||||
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS.
|
||||
3. **Custom interval (Eventual consistency)**: The database checks for updates at a custom interval, such as every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
|
||||
1. **Unset**: The database does not check for updates to tables made by other processes. This setting is suitable for applications where the data does not change during the lifetime of the table reference.
|
||||
2. **Zero seconds (Strong consistency)**: The database checks for updates on every read. This provides the strongest consistency guarantees, ensuring that all clients see the latest committed data. However, it has the most overhead. This setting is suitable when consistency matters more than having high QPS. For best performance, combine this setting with the storage option `new_table_enable_v2_manifest_paths` set to `true`.
|
||||
3. **Custom interval (Eventual consistency, the default)**: The database checks for updates at a custom interval. By default, this is every 5 seconds. This provides eventual consistency, allowing for some lag between write and read operations. Performance wise, this is a middle ground between strong consistency and no consistency check. This setting is suitable for applications where immediate consistency is not critical, but clients should see updated data eventually.
|
||||
|
||||
You can always force a synchronization by calling `checkout_latest()` / `checkoutLatest()` on a table.
|
||||
|
||||
!!! tip "Consistency in LanceDB Cloud"
|
||||
|
||||
@@ -1041,7 +1043,21 @@ There are three possible settings for `read_consistency_interval`:
|
||||
--8<-- "python/python/tests/docs/test_guide_tables.py:table_async_eventual_consistency"
|
||||
```
|
||||
|
||||
By default, a `Table` will never check for updates from other writers. To manually check for updates you can use `checkout_latest`:
|
||||
For no consistency, use `None`:
|
||||
|
||||
=== "Sync API"
|
||||
|
||||
```python
|
||||
--8<-- "python/python/tests/docs/test_guide_tables.py:table_no_consistency"
|
||||
```
|
||||
|
||||
=== "Async API"
|
||||
|
||||
```python
|
||||
--8<-- "python/python/tests/docs/test_guide_tables.py:table_async_no_consistency"
|
||||
```
|
||||
|
||||
To manually check for updates you can use `checkout_latest`:
|
||||
|
||||
=== "Sync API"
|
||||
|
||||
@@ -1059,15 +1075,25 @@ There are three possible settings for `read_consistency_interval`:
|
||||
To set strong consistency, use `0`:
|
||||
|
||||
```ts
|
||||
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 0 });
|
||||
const tbl = await db.openTable("my_table");
|
||||
--8<-- "nodejs/examples/basic.test.ts:table_strong_consistency"
|
||||
```
|
||||
|
||||
For eventual consistency, specify the update interval as seconds:
|
||||
|
||||
```ts
|
||||
const db = await lancedb.connect({ uri: "./.lancedb", readConsistencyInterval: 5 });
|
||||
const tbl = await db.openTable("my_table");
|
||||
--8<-- "nodejs/examples/basic.test.ts:table_eventual_consistency"
|
||||
```
|
||||
|
||||
For no consistency, use `null`:
|
||||
|
||||
```ts
|
||||
--8<-- "nodejs/examples/basic.test.ts:table_no_consistency"
|
||||
```
|
||||
|
||||
To manually check for updates you can use `checkoutLatest`:
|
||||
|
||||
```ts
|
||||
--8<-- "nodejs/examples/basic.test.ts:table_checkout_latest"
|
||||
```
|
||||
|
||||
<!-- Node doesn't yet support the version time travel: https://github.com/lancedb/lancedb/issues/1007
|
||||
|
||||
@@ -30,6 +30,53 @@ protected inner: Query | Promise<Query>;
|
||||
|
||||
## Methods
|
||||
|
||||
### analyzePlan()
|
||||
|
||||
```ts
|
||||
analyzePlan(): Promise<string>
|
||||
```
|
||||
|
||||
Executes the query and returns the physical query plan annotated with runtime metrics.
|
||||
|
||||
This is useful for debugging and performance analysis, as it shows how the query was executed
|
||||
and includes metrics such as elapsed time, rows processed, and I/O statistics.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`string`>
|
||||
|
||||
A query execution plan with runtime metrics for each step.
|
||||
|
||||
#### Example
|
||||
|
||||
```ts
|
||||
import * as lancedb from "@lancedb/lancedb"
|
||||
|
||||
const db = await lancedb.connect("./.lancedb");
|
||||
const table = await db.createTable("my_table", [
|
||||
{ vector: [1.1, 0.9], id: "1" },
|
||||
]);
|
||||
|
||||
const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan();
|
||||
|
||||
Example output (with runtime metrics inlined):
|
||||
AnalyzeExec verbose=true, metrics=[]
|
||||
ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs]
|
||||
Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1]
|
||||
CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs]
|
||||
GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns]
|
||||
FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs]
|
||||
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1]
|
||||
KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1]
|
||||
LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2]
|
||||
```
|
||||
|
||||
#### Inherited from
|
||||
|
||||
[`QueryBase`](QueryBase.md).[`analyzePlan`](QueryBase.md#analyzeplan)
|
||||
|
||||
***
|
||||
|
||||
### execute()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -36,6 +36,49 @@ protected inner: NativeQueryType | Promise<NativeQueryType>;
|
||||
|
||||
## Methods
|
||||
|
||||
### analyzePlan()
|
||||
|
||||
```ts
|
||||
analyzePlan(): Promise<string>
|
||||
```
|
||||
|
||||
Executes the query and returns the physical query plan annotated with runtime metrics.
|
||||
|
||||
This is useful for debugging and performance analysis, as it shows how the query was executed
|
||||
and includes metrics such as elapsed time, rows processed, and I/O statistics.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`string`>
|
||||
|
||||
A query execution plan with runtime metrics for each step.
|
||||
|
||||
#### Example
|
||||
|
||||
```ts
|
||||
import * as lancedb from "@lancedb/lancedb"
|
||||
|
||||
const db = await lancedb.connect("./.lancedb");
|
||||
const table = await db.createTable("my_table", [
|
||||
{ vector: [1.1, 0.9], id: "1" },
|
||||
]);
|
||||
|
||||
const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan();
|
||||
|
||||
Example output (with runtime metrics inlined):
|
||||
AnalyzeExec verbose=true, metrics=[]
|
||||
ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs]
|
||||
Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1]
|
||||
CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs]
|
||||
GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns]
|
||||
FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs]
|
||||
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1]
|
||||
KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1]
|
||||
LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2]
|
||||
```
|
||||
|
||||
***
|
||||
|
||||
### execute()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -48,6 +48,53 @@ addQueryVector(vector): VectorQuery
|
||||
|
||||
***
|
||||
|
||||
### analyzePlan()
|
||||
|
||||
```ts
|
||||
analyzePlan(): Promise<string>
|
||||
```
|
||||
|
||||
Executes the query and returns the physical query plan annotated with runtime metrics.
|
||||
|
||||
This is useful for debugging and performance analysis, as it shows how the query was executed
|
||||
and includes metrics such as elapsed time, rows processed, and I/O statistics.
|
||||
|
||||
#### Returns
|
||||
|
||||
`Promise`<`string`>
|
||||
|
||||
A query execution plan with runtime metrics for each step.
|
||||
|
||||
#### Example
|
||||
|
||||
```ts
|
||||
import * as lancedb from "@lancedb/lancedb"
|
||||
|
||||
const db = await lancedb.connect("./.lancedb");
|
||||
const table = await db.createTable("my_table", [
|
||||
{ vector: [1.1, 0.9], id: "1" },
|
||||
]);
|
||||
|
||||
const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan();
|
||||
|
||||
Example output (with runtime metrics inlined):
|
||||
AnalyzeExec verbose=true, metrics=[]
|
||||
ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs]
|
||||
Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1]
|
||||
CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs]
|
||||
GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns]
|
||||
FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs]
|
||||
SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1]
|
||||
KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1]
|
||||
LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2]
|
||||
```
|
||||
|
||||
#### Inherited from
|
||||
|
||||
[`QueryBase`](QueryBase.md).[`analyzePlan`](QueryBase.md#analyzeplan)
|
||||
|
||||
***
|
||||
|
||||
### bypassVectorIndex()
|
||||
|
||||
```ts
|
||||
|
||||
@@ -44,7 +44,7 @@ for testing purposes.
|
||||
### readConsistencyInterval?
|
||||
|
||||
```ts
|
||||
optional readConsistencyInterval: number;
|
||||
optional readConsistencyInterval: null | number;
|
||||
```
|
||||
|
||||
(For LanceDB OSS only): The interval, in seconds, at which to check for
|
||||
|
||||
@@ -11,6 +11,7 @@ likely that someone who knows the answer will see your question.
|
||||
## Common issues
|
||||
|
||||
* Multiprocessing with `fork` is not supported. You should use `spawn` instead.
|
||||
* Data returned by queries may not reflect the most recent writes, depending on configuration. LanceDB uses eventual consistency by default. See [consistency](/docs/src/guides/tables.md#consistency) for more information.
|
||||
|
||||
## Enabling logging
|
||||
|
||||
@@ -35,3 +36,9 @@ print the resolved query plan. You can use the `explain_plan` method to do this:
|
||||
* Python Sync: [LanceQueryBuilder.explain_plan][lancedb.query.LanceQueryBuilder.explain_plan]
|
||||
* Python Async: [AsyncQueryBase.explain_plan][lancedb.query.AsyncQueryBase.explain_plan]
|
||||
* Node @lancedb/lancedb: [LanceQueryBuilder.explainPlan](/lancedb/js/classes/QueryBase/#explainplan)
|
||||
|
||||
To understand how a query was actually executed—including metrics like execution time, number of rows processed, I/O stats, and more—use the analyze_plan method. This executes the query and returns a physical execution plan annotated with runtime metrics, making it especially helpful for performance tuning and debugging.
|
||||
|
||||
* Python Sync: [LanceQueryBuilder.analyze_plan][lancedb.query.LanceQueryBuilder.analyze_plan]
|
||||
* Python Async: [AsyncQueryBase.analyze_plan][lancedb.query.AsyncQueryBase.analyze_plan]
|
||||
* Node @lancedb/lancedb: [LanceQueryBuilder.analyzePlan](/lancedb/js/classes/QueryBase/#analyzePlan)
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
<parent>
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.18.2-beta.0</version>
|
||||
<version>0.19.0-beta.0</version>
|
||||
<relativePath>../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
<groupId>com.lancedb</groupId>
|
||||
<artifactId>lancedb-parent</artifactId>
|
||||
<version>0.18.2-beta.0</version>
|
||||
<version>0.19.0-beta.0</version>
|
||||
<packaging>pom</packaging>
|
||||
|
||||
<name>LanceDB Parent</name>
|
||||
|
||||
44
node/package-lock.json
generated
44
node/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "vectordb",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.18.3-beta.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "vectordb",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.18.3-beta.0",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
@@ -52,11 +52,11 @@
|
||||
"uuid": "^9.0.0"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"@lancedb/vectordb-darwin-arm64": "0.18.2-beta.0",
|
||||
"@lancedb/vectordb-darwin-x64": "0.18.2-beta.0",
|
||||
"@lancedb/vectordb-linux-arm64-gnu": "0.18.2-beta.0",
|
||||
"@lancedb/vectordb-linux-x64-gnu": "0.18.2-beta.0",
|
||||
"@lancedb/vectordb-win32-x64-msvc": "0.18.2-beta.0"
|
||||
"@lancedb/vectordb-darwin-arm64": "0.18.3-beta.0",
|
||||
"@lancedb/vectordb-darwin-x64": "0.18.3-beta.0",
|
||||
"@lancedb/vectordb-linux-arm64-gnu": "0.18.3-beta.0",
|
||||
"@lancedb/vectordb-linux-x64-gnu": "0.18.3-beta.0",
|
||||
"@lancedb/vectordb-win32-x64-msvc": "0.18.3-beta.0"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"@apache-arrow/ts": "^14.0.2",
|
||||
@@ -327,9 +327,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-darwin-arm64": {
|
||||
"version": "0.18.2-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.18.2-beta.0.tgz",
|
||||
"integrity": "sha512-FzIcElkS6R5I5kU1S5m7yLVTB1Duv1XcmZQtVmYl/JjNlfxS1WTtMzdzMqSBFohDcgU2Tkc5+1FpK1B94dUUbg==",
|
||||
"version": "0.18.3-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-arm64/-/vectordb-darwin-arm64-0.18.3-beta.0.tgz",
|
||||
"integrity": "sha512-dhJ5VlXV2N/L67mIpTSePhb8krX0FyQgpuz3I+4T4vYuU5JEF3cmedQ5TF5+3cGJhZim4PHRYLkfgCyTlxcqUg==",
|
||||
"cpu": [
|
||||
"arm64"
|
||||
],
|
||||
@@ -340,9 +340,9 @@
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-darwin-x64": {
|
||||
"version": "0.18.2-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.18.2-beta.0.tgz",
|
||||
"integrity": "sha512-jv+XludfLNBDm1DjdqyghwDMtd4E+ygwycQpkpK72wyZSh6Qytrgq+4dNi/zCZ3UChFLbKbIxrVxv9yENQn2Pg==",
|
||||
"version": "0.18.3-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-darwin-x64/-/vectordb-darwin-x64-0.18.3-beta.0.tgz",
|
||||
"integrity": "sha512-SHqPkuyfe87d5skf9GERzdeu6AKvVIbXMUwl5N+dVrE7HH6qiuP2HvOmiyHS2lJFgo0Ph8jSBVzPDxxtjF36Dg==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
@@ -353,9 +353,9 @@
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-linux-arm64-gnu": {
|
||||
"version": "0.18.2-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.18.2-beta.0.tgz",
|
||||
"integrity": "sha512-8/fBpbNYhhpetf/pZv0DyPnQkeAbsiICMyCoRiNu5auvQK4AsGF1XvLWrDi68u9F0GysBKvuatYuGqa/yh+Anw==",
|
||||
"version": "0.18.3-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-arm64-gnu/-/vectordb-linux-arm64-gnu-0.18.3-beta.0.tgz",
|
||||
"integrity": "sha512-ohnWsV1n9cxL5ik/GGL4FdQ04Ff9REELcNb1zgmJYyEfwyc6TH9m5HdySO/1ACPZJiLbML4gSvZ10J0Zyb+2SA==",
|
||||
"cpu": [
|
||||
"arm64"
|
||||
],
|
||||
@@ -366,9 +366,9 @@
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-linux-x64-gnu": {
|
||||
"version": "0.18.2-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.18.2-beta.0.tgz",
|
||||
"integrity": "sha512-7a1Kc/2V2ff4HlLzXyXVdK0Z0VIFUt50v2SBRdlcycJ0NLW9ZqV+9UjB/NAOwMXVgYd7d3rKjACGkQzkpvcyeg==",
|
||||
"version": "0.18.3-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-linux-x64-gnu/-/vectordb-linux-x64-gnu-0.18.3-beta.0.tgz",
|
||||
"integrity": "sha512-nhbW2CKaBSUesiYCPBd9fAsDYIJLadlGsrb2gfjODlFy+2Lpnbz6T9SuV7dNqj6KBw+KHhaRhLqta7tyMZm/EA==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
@@ -379,9 +379,9 @@
|
||||
]
|
||||
},
|
||||
"node_modules/@lancedb/vectordb-win32-x64-msvc": {
|
||||
"version": "0.18.2-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.18.2-beta.0.tgz",
|
||||
"integrity": "sha512-EeCiSf2RtJMESnkIca28GI6rAStYj2q9sVIyNCXpmIZSkJVpfQ3iswHGAbHrEfaPl0J1Re9cnRHLLuqkumwiIQ==",
|
||||
"version": "0.18.3-beta.0",
|
||||
"resolved": "https://registry.npmjs.org/@lancedb/vectordb-win32-x64-msvc/-/vectordb-win32-x64-msvc-0.18.3-beta.0.tgz",
|
||||
"integrity": "sha512-VE4TvMdZ7DIrTC8VYylGxEcH4h2UEejSwGX4PxRzrN9QsCQ4m4pOh3L/UguSO3g+Y1QEaGE20iWQoX6wgSEUhA==",
|
||||
"cpu": [
|
||||
"x64"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "vectordb",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"description": " Serverless, low-latency vector database for AI applications",
|
||||
"private": false,
|
||||
"main": "dist/index.js",
|
||||
@@ -89,10 +89,10 @@
|
||||
}
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"@lancedb/vectordb-darwin-x64": "0.18.2-beta.0",
|
||||
"@lancedb/vectordb-darwin-arm64": "0.18.2-beta.0",
|
||||
"@lancedb/vectordb-linux-x64-gnu": "0.18.2-beta.0",
|
||||
"@lancedb/vectordb-linux-arm64-gnu": "0.18.2-beta.0",
|
||||
"@lancedb/vectordb-win32-x64-msvc": "0.18.2-beta.0"
|
||||
"@lancedb/vectordb-darwin-x64": "0.19.0-beta.0",
|
||||
"@lancedb/vectordb-darwin-arm64": "0.19.0-beta.0",
|
||||
"@lancedb/vectordb-linux-x64-gnu": "0.19.0-beta.0",
|
||||
"@lancedb/vectordb-linux-arm64-gnu": "0.19.0-beta.0",
|
||||
"@lancedb/vectordb-win32-x64-msvc": "0.19.0-beta.0"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ describe('LanceDB Mirrored Store Integration test', function () {
|
||||
|
||||
fs.readdir(path.join(mirroredPath, 'data'), { withFileTypes: true }, (err, files) => {
|
||||
if (err != null) throw err
|
||||
assert.equal(files.length, 1)
|
||||
assert.equal(files.length, 1, `Found files: ${files.map(f => f.name)}`)
|
||||
assert.isTrue(files[0].name.endsWith('.lance'))
|
||||
})
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
[package]
|
||||
name = "lancedb-nodejs"
|
||||
edition.workspace = true
|
||||
version = "0.18.2-beta.0"
|
||||
version = "0.19.0-beta.0"
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
@@ -17,7 +17,7 @@ describe("when connecting", () => {
|
||||
it("should connect", async () => {
|
||||
const db = await connect(tmpDir.name);
|
||||
expect(db.display()).toBe(
|
||||
`ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=None)`,
|
||||
`ListingDatabase(uri=${tmpDir.name}, read_consistency_interval=5s)`,
|
||||
);
|
||||
});
|
||||
|
||||
|
||||
@@ -58,7 +58,7 @@ describe.each([arrow15, arrow16, arrow17, arrow18])(
|
||||
|
||||
it("be displayable", async () => {
|
||||
expect(table.display()).toMatch(
|
||||
/NativeTable\(some_table, uri=.*, read_consistency_interval=None\)/,
|
||||
/NativeTable\(some_table, uri=.*, read_consistency_interval=5s\)/,
|
||||
);
|
||||
table.close();
|
||||
expect(table.display()).toBe("ClosedTable(some_table)");
|
||||
@@ -633,6 +633,23 @@ describe("When creating an index", () => {
|
||||
expect(plan2).not.toMatch("LanceScan");
|
||||
});
|
||||
|
||||
it("should be able to run analyze plan", async () => {
|
||||
await tbl.createIndex("vec");
|
||||
await tbl.add([
|
||||
{
|
||||
id: 300,
|
||||
vec: Array(32)
|
||||
.fill(1)
|
||||
.map(() => Math.random()),
|
||||
tags: [],
|
||||
},
|
||||
]);
|
||||
|
||||
const plan = await tbl.query().nearestTo(queryVec).analyzePlan();
|
||||
expect(plan).toMatch("AnalyzeExec");
|
||||
expect(plan).toMatch("metrics=");
|
||||
});
|
||||
|
||||
it("should be able to query with row id", async () => {
|
||||
const results = await tbl
|
||||
.query()
|
||||
@@ -1346,6 +1363,30 @@ describe("when calling explainPlan", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("when calling analyzePlan", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
let table: Table;
|
||||
let queryVec: number[];
|
||||
beforeEach(async () => {
|
||||
tmpDir = tmp.dirSync({ unsafeCleanup: true });
|
||||
const con = await connect(tmpDir.name);
|
||||
table = await con.createTable("vectors", [{ id: 1, vector: [1.1, 0.9] }]);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
tmpDir.removeCallback();
|
||||
});
|
||||
|
||||
it("retrieves runtime metrics", async () => {
|
||||
queryVec = Array(2)
|
||||
.fill(1)
|
||||
.map(() => Math.random());
|
||||
const plan = await table.query().nearestTo(queryVec).analyzePlan();
|
||||
console.log("Query Plan:\n", plan); // <--- Print the plan
|
||||
expect(plan).toMatch("AnalyzeExec");
|
||||
});
|
||||
});
|
||||
|
||||
describe("column name options", () => {
|
||||
let tmpDir: tmp.DirResult;
|
||||
let table: Table;
|
||||
|
||||
@@ -202,5 +202,35 @@ test("basic table examples", async () => {
|
||||
// --8<-- [end:create_f16_table]
|
||||
await db.dropTable("f16_tbl");
|
||||
}
|
||||
const uri = databaseDir;
|
||||
await db.createTable("my_table", [{ id: 1 }, { id: 2 }]);
|
||||
{
|
||||
// --8<-- [start:table_strong_consistency]
|
||||
const db = await lancedb.connect({ uri, readConsistencyInterval: 0 });
|
||||
const tbl = await db.openTable("my_table");
|
||||
// --8<-- [end:table_strong_consistency]
|
||||
}
|
||||
{
|
||||
// --8<-- [start:table_eventual_consistency]
|
||||
const db = await lancedb.connect({ uri, readConsistencyInterval: 5 });
|
||||
const tbl = await db.openTable("my_table");
|
||||
// --8<-- [end:table_eventual_consistency]
|
||||
}
|
||||
{
|
||||
// --8<-- [start:table_no_consistency]
|
||||
const db = await lancedb.connect({ uri, readConsistencyInterval: null });
|
||||
const tbl = await db.openTable("my_table");
|
||||
// --8<-- [end:table_no_consistency]
|
||||
}
|
||||
{
|
||||
// --8<-- [start:table_checkout_latest]
|
||||
const tbl = await db.openTable("my_table");
|
||||
|
||||
// (Other writes happen to test_table_async from another process)
|
||||
|
||||
// Check for updates
|
||||
tbl.checkoutLatest();
|
||||
// --8<-- [end:table_checkout_latest]
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -348,6 +348,43 @@ export class QueryBase<NativeQueryType extends NativeQuery | NativeVectorQuery>
|
||||
return this.inner.explainPlan(verbose);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the query and returns the physical query plan annotated with runtime metrics.
|
||||
*
|
||||
* This is useful for debugging and performance analysis, as it shows how the query was executed
|
||||
* and includes metrics such as elapsed time, rows processed, and I/O statistics.
|
||||
*
|
||||
* @example
|
||||
* import * as lancedb from "@lancedb/lancedb"
|
||||
*
|
||||
* const db = await lancedb.connect("./.lancedb");
|
||||
* const table = await db.createTable("my_table", [
|
||||
* { vector: [1.1, 0.9], id: "1" },
|
||||
* ]);
|
||||
*
|
||||
* const plan = await table.query().nearestTo([0.5, 0.2]).analyzePlan();
|
||||
*
|
||||
* Example output (with runtime metrics inlined):
|
||||
* AnalyzeExec verbose=true, metrics=[]
|
||||
* ProjectionExec: expr=[id@3 as id, vector@0 as vector, _distance@2 as _distance], metrics=[output_rows=1, elapsed_compute=3.292µs]
|
||||
* Take: columns="vector, _rowid, _distance, (id)", metrics=[output_rows=1, elapsed_compute=66.001µs, batches_processed=1, bytes_read=8, iops=1, requests=1]
|
||||
* CoalesceBatchesExec: target_batch_size=1024, metrics=[output_rows=1, elapsed_compute=3.333µs]
|
||||
* GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=1, elapsed_compute=167ns]
|
||||
* FilterExec: _distance@2 IS NOT NULL, metrics=[output_rows=1, elapsed_compute=8.542µs]
|
||||
* SortExec: TopK(fetch=10), expr=[_distance@2 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=63.25µs, row_replacements=1]
|
||||
* KNNVectorDistance: metric=l2, metrics=[output_rows=1, elapsed_compute=114.333µs, output_batches=1]
|
||||
* LanceScan: uri=/path/to/data, projection=[vector], row_id=true, row_addr=false, ordered=false, metrics=[output_rows=1, elapsed_compute=103.626µs, bytes_read=549, iops=2, requests=2]
|
||||
*
|
||||
* @returns A query execution plan with runtime metrics for each step.
|
||||
*/
|
||||
async analyzePlan(): Promise<string> {
|
||||
if (this.inner instanceof Promise) {
|
||||
return this.inner.then((inner) => inner.analyzePlan());
|
||||
} else {
|
||||
return this.inner.analyzePlan();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-arm64",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.darwin-arm64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-darwin-x64",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"os": ["darwin"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.darwin-x64.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-gnu",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-arm64-musl",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["arm64"],
|
||||
"main": "lancedb.linux-arm64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-gnu",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-gnu.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-linux-x64-musl",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"os": ["linux"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.linux-x64-musl.node",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-arm64-msvc",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"os": [
|
||||
"win32"
|
||||
],
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb-win32-x64-msvc",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"os": ["win32"],
|
||||
"cpu": ["x64"],
|
||||
"main": "lancedb.win32-x64-msvc.node",
|
||||
|
||||
4
nodejs/package-lock.json
generated
4
nodejs/package-lock.json
generated
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.18.3-beta.0",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "@lancedb/lancedb",
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.18.3-beta.0",
|
||||
"cpu": [
|
||||
"x64",
|
||||
"arm64"
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
"ann"
|
||||
],
|
||||
"private": false,
|
||||
"version": "0.18.2-beta.0",
|
||||
"version": "0.19.0-beta.0",
|
||||
"main": "dist/index.js",
|
||||
"exports": {
|
||||
".": "./dist/index.js",
|
||||
|
||||
@@ -48,8 +48,16 @@ impl Connection {
|
||||
pub async fn new(uri: String, options: ConnectionOptions) -> napi::Result<Self> {
|
||||
let mut builder = ConnectBuilder::new(&uri);
|
||||
if let Some(interval) = options.read_consistency_interval {
|
||||
builder =
|
||||
builder.read_consistency_interval(std::time::Duration::from_secs_f64(interval));
|
||||
match interval {
|
||||
Either::A(seconds) => {
|
||||
builder = builder.read_consistency_interval(Some(
|
||||
std::time::Duration::from_secs_f64(seconds),
|
||||
));
|
||||
}
|
||||
Either::B(_) => {
|
||||
builder = builder.read_consistency_interval(None);
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(storage_options) = options.storage_options {
|
||||
for (key, value) in storage_options {
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use env_logger::Env;
|
||||
use napi::{bindgen_prelude::Null, Either};
|
||||
use napi_derive::*;
|
||||
|
||||
mod connection;
|
||||
@@ -18,7 +19,6 @@ mod table;
|
||||
mod util;
|
||||
|
||||
#[napi(object)]
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionOptions {
|
||||
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
|
||||
/// updates to the table from other processes. If None, then consistency is not
|
||||
@@ -29,7 +29,7 @@ pub struct ConnectionOptions {
|
||||
/// has passed since the last check, then the table will be checked for updates.
|
||||
/// Note: this consistency only applies to read operations. Write operations are
|
||||
/// always consistent.
|
||||
pub read_consistency_interval: Option<f64>,
|
||||
pub read_consistency_interval: Option<Either<f64, Null>>,
|
||||
/// (For LanceDB OSS only): configuration for object storage.
|
||||
///
|
||||
/// The available options are described at https://lancedb.github.io/lancedb/guides/storage/
|
||||
|
||||
@@ -114,6 +114,16 @@ impl Query {
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn analyze_plan(&self) -> napi::Result<String> {
|
||||
self.inner.analyze_plan().await.map_err(|e| {
|
||||
napi::Error::from_reason(format!(
|
||||
"Failed to execute analyze plan: {}",
|
||||
convert_error(&e)
|
||||
))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[napi]
|
||||
@@ -259,4 +269,14 @@ impl VectorQuery {
|
||||
))
|
||||
})
|
||||
}
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn analyze_plan(&self) -> napi::Result<String> {
|
||||
self.inner.analyze_plan().await.map_err(|e| {
|
||||
napi::Error::from_reason(format!(
|
||||
"Failed to execute analyze plan: {}",
|
||||
convert_error(&e)
|
||||
))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[tool.bumpversion]
|
||||
current_version = "0.21.3-beta.0"
|
||||
current_version = "0.22.0-beta.0"
|
||||
parse = """(?x)
|
||||
(?P<major>0|[1-9]\\d*)\\.
|
||||
(?P<minor>0|[1-9]\\d*)\\.
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-python"
|
||||
version = "0.21.3-beta.0"
|
||||
version = "0.22.0-beta.0"
|
||||
edition.workspace = true
|
||||
description = "Python bindings for LanceDB"
|
||||
license.workspace = true
|
||||
|
||||
@@ -26,7 +26,7 @@ def connect(
|
||||
api_key: Optional[str] = None,
|
||||
region: str = "us-east-1",
|
||||
host_override: Optional[str] = None,
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
|
||||
request_thread_pool: Optional[Union[int, ThreadPoolExecutor]] = None,
|
||||
client_config: Union[ClientConfig, Dict[str, Any], None] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
@@ -49,9 +49,8 @@ def connect(
|
||||
read_consistency_interval: timedelta, default None
|
||||
(For LanceDB OSS only)
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
zero seconds. Then every read will check for updates from other
|
||||
processes. If None, then consistency is not checked. For strong consistency,
|
||||
set this to zero seconds. Then every read will check for updates from other
|
||||
processes. As a compromise, you can set this to a non-zero timedelta
|
||||
for eventual consistency. If more than that interval has passed since
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
@@ -122,7 +121,7 @@ async def connect_async(
|
||||
api_key: Optional[str] = None,
|
||||
region: str = "us-east-1",
|
||||
host_override: Optional[str] = None,
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
|
||||
client_config: Optional[Union[ClientConfig, Dict[str, Any]]] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
) -> AsyncConnection:
|
||||
@@ -143,9 +142,8 @@ async def connect_async(
|
||||
read_consistency_interval: timedelta, default None
|
||||
(For LanceDB OSS only)
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
zero seconds. Then every read will check for updates from other
|
||||
processes. If None, then consistency is not checked. For strong consistency,
|
||||
set this to zero seconds. Then every read will check for updates from other
|
||||
processes. As a compromise, you can set this to a non-zero timedelta
|
||||
for eventual consistency. If more than that interval has passed since
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
|
||||
@@ -96,6 +96,7 @@ class Query:
|
||||
def nearest_to_text(self, query: dict) -> FTSQuery: ...
|
||||
async def execute(self, max_batch_length: Optional[int]) -> RecordBatchStream: ...
|
||||
async def explain_plan(self, verbose: Optional[bool]) -> str: ...
|
||||
async def analyze_plan(self) -> str: ...
|
||||
def to_query_request(self) -> PyQueryRequest: ...
|
||||
|
||||
class FTSQuery:
|
||||
|
||||
@@ -6,6 +6,7 @@ from __future__ import annotations
|
||||
|
||||
from abc import abstractmethod
|
||||
from pathlib import Path
|
||||
from datetime import timedelta
|
||||
from typing import TYPE_CHECKING, Dict, Iterable, List, Literal, Optional, Union
|
||||
|
||||
from lancedb.embeddings.registry import EmbeddingFunctionRegistry
|
||||
@@ -32,7 +33,6 @@ import deprecation
|
||||
if TYPE_CHECKING:
|
||||
import pyarrow as pa
|
||||
from .pydantic import LanceModel
|
||||
from datetime import timedelta
|
||||
|
||||
from ._lancedb import Connection as LanceDbConnection
|
||||
from .common import DATA, URI
|
||||
@@ -318,9 +318,8 @@ class LanceDBConnection(DBConnection):
|
||||
The root uri of the database.
|
||||
read_consistency_interval: timedelta, default None
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
zero seconds. Then every read will check for updates from other
|
||||
processes. If None, then consistency is not checked. For strong consistency,
|
||||
set this to zero seconds. Then every read will check for updates from other
|
||||
processes. As a compromise, you can set this to a non-zero timedelta
|
||||
for eventual consistency. If more than that interval has passed since
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
@@ -352,7 +351,7 @@ class LanceDBConnection(DBConnection):
|
||||
self,
|
||||
uri: URI,
|
||||
*,
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
read_consistency_interval: Optional[timedelta] = timedelta(seconds=5),
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
):
|
||||
if not isinstance(uri, Path):
|
||||
|
||||
@@ -659,6 +659,44 @@ class LanceQueryBuilder(ABC):
|
||||
""" # noqa: E501
|
||||
return self._table._explain_plan(self.to_query_object(), verbose=verbose)
|
||||
|
||||
def analyze_plan(self) -> str:
|
||||
"""
|
||||
Run the query and return its execution plan with runtime metrics.
|
||||
|
||||
This returns detailed metrics for each step, such as elapsed time,
|
||||
rows processed, bytes read, and I/O stats. It is useful for debugging
|
||||
and performance tuning.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> import lancedb
|
||||
>>> db = lancedb.connect("./.lancedb")
|
||||
>>> table = db.create_table("my_table", [{"vector": [99.0, 99]}])
|
||||
>>> query = [100, 100]
|
||||
>>> plan = table.search(query).analyze_plan()
|
||||
>>> print(plan) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
|
||||
AnalyzeExec verbose=true, metrics=[]
|
||||
ProjectionExec: expr=[...], metrics=[...]
|
||||
GlobalLimitExec: skip=0, fetch=10, metrics=[...]
|
||||
FilterExec: _distance@2 IS NOT NULL,
|
||||
metrics=[output_rows=..., elapsed_compute=...]
|
||||
SortExec: TopK(fetch=10), expr=[...],
|
||||
preserve_partitioning=[...],
|
||||
metrics=[output_rows=..., elapsed_compute=..., row_replacements=...]
|
||||
KNNVectorDistance: metric=l2,
|
||||
metrics=[output_rows=..., elapsed_compute=..., output_batches=...]
|
||||
LanceScan: uri=..., projection=[vector], row_id=true,
|
||||
row_addr=false, ordered=false,
|
||||
metrics=[output_rows=..., elapsed_compute=...,
|
||||
bytes_read=..., iops=..., requests=...]
|
||||
|
||||
Returns
|
||||
-------
|
||||
plan : str
|
||||
The physical query execution plan with runtime metrics.
|
||||
"""
|
||||
return self._table._analyze_plan(self.to_query_object())
|
||||
|
||||
def vector(self, vector: Union[np.ndarray, list]) -> Self:
|
||||
"""Set the vector to search for.
|
||||
|
||||
@@ -1941,6 +1979,15 @@ class AsyncQueryBase(object):
|
||||
""" # noqa: E501
|
||||
return await self._inner.explain_plan(verbose)
|
||||
|
||||
async def analyze_plan(self):
|
||||
"""Execute the query and display with runtime metrics.
|
||||
|
||||
Returns
|
||||
-------
|
||||
plan : str
|
||||
"""
|
||||
return await self._inner.analyze_plan()
|
||||
|
||||
|
||||
class AsyncQuery(AsyncQueryBase):
|
||||
def __init__(self, inner: LanceQuery):
|
||||
@@ -2510,7 +2557,7 @@ class AsyncHybridQuery(AsyncQueryBase, AsyncVectorQueryBase):
|
||||
|
||||
Returns
|
||||
-------
|
||||
plan
|
||||
plan : str
|
||||
""" # noqa: E501
|
||||
|
||||
results = ["Vector Search Plan:"]
|
||||
@@ -2519,3 +2566,23 @@ class AsyncHybridQuery(AsyncQueryBase, AsyncVectorQueryBase):
|
||||
results.append(await self._inner.to_fts_query().explain_plan(verbose))
|
||||
|
||||
return "\n".join(results)
|
||||
|
||||
async def analyze_plan(self):
|
||||
"""
|
||||
Execute the query and return the physical execution plan with runtime metrics.
|
||||
|
||||
This runs both the vector and FTS (full-text search) queries and returns
|
||||
detailed metrics for each step of execution—such as rows processed,
|
||||
elapsed time, I/O stats, and more. It’s useful for debugging and
|
||||
performance analysis.
|
||||
|
||||
Returns
|
||||
-------
|
||||
plan : str
|
||||
"""
|
||||
results = ["Vector Search Query:"]
|
||||
results.append(await self._inner.to_vector_query().analyze_plan())
|
||||
results.append("FTS Search Query:")
|
||||
results.append(await self._inner.to_fts_query().analyze_plan())
|
||||
|
||||
return "\n".join(results)
|
||||
|
||||
@@ -371,6 +371,9 @@ class RemoteTable(Table):
|
||||
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str:
|
||||
return LOOP.run(self._table._explain_plan(query, verbose))
|
||||
|
||||
def _analyze_plan(self, query: Query) -> str:
|
||||
return LOOP.run(self._table._analyze_plan(query))
|
||||
|
||||
def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
|
||||
"""Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
|
||||
that can be used to create a "merge insert" operation.
|
||||
|
||||
@@ -1010,6 +1010,9 @@ class Table(ABC):
|
||||
@abstractmethod
|
||||
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str: ...
|
||||
|
||||
@abstractmethod
|
||||
def _analyze_plan(self, query: Query) -> str: ...
|
||||
|
||||
@abstractmethod
|
||||
def _do_merge(
|
||||
self,
|
||||
@@ -2318,6 +2321,9 @@ class LanceTable(Table):
|
||||
def _explain_plan(self, query: Query, verbose: Optional[bool] = False) -> str:
|
||||
return LOOP.run(self._table._explain_plan(query, verbose))
|
||||
|
||||
def _analyze_plan(self, query: Query) -> str:
|
||||
return LOOP.run(self._table._analyze_plan(query))
|
||||
|
||||
def _do_merge(
|
||||
self,
|
||||
merge: LanceMergeInsertBuilder,
|
||||
@@ -3388,6 +3394,11 @@ class AsyncTable:
|
||||
async_query = self._sync_query_to_async(query)
|
||||
return await async_query.explain_plan(verbose)
|
||||
|
||||
async def _analyze_plan(self, query: Query) -> str:
|
||||
# This method is used by the sync table
|
||||
async_query = self._sync_query_to_async(query)
|
||||
return await async_query.analyze_plan()
|
||||
|
||||
async def _do_merge(
|
||||
self,
|
||||
merge: LanceMergeInsertBuilder,
|
||||
|
||||
@@ -315,6 +315,11 @@ def test_table():
|
||||
db = lancedb.connect(uri, read_consistency_interval=timedelta(seconds=5))
|
||||
tbl = db.open_table("test_table")
|
||||
# --8<-- [end:table_eventual_consistency]
|
||||
# --8<-- [start:table_no_consistency]
|
||||
uri = "data/sample-lancedb"
|
||||
db = lancedb.connect(uri, read_consistency_interval=None)
|
||||
tbl = db.open_table("test_table")
|
||||
# --8<-- [end:table_no_consistency]
|
||||
# --8<-- [start:table_checkout_latest]
|
||||
tbl = db.open_table("test_table")
|
||||
|
||||
@@ -562,13 +567,19 @@ async def test_table_async():
|
||||
async_db = await lancedb.connect_async(uri, read_consistency_interval=timedelta(0))
|
||||
async_tbl = await async_db.open_table("test_table_async")
|
||||
# --8<-- [end:table_async_strong_consistency]
|
||||
# --8<-- [start:table_async_ventual_consistency]
|
||||
# --8<-- [start:table_async_eventual_consistency]
|
||||
uri = "data/sample-lancedb"
|
||||
async_db = await lancedb.connect_async(
|
||||
uri, read_consistency_interval=timedelta(seconds=5)
|
||||
)
|
||||
async_tbl = await async_db.open_table("test_table_async")
|
||||
# --8<-- [end:table_async_eventual_consistency]
|
||||
# --8<-- [start:table_async_no_consistency]
|
||||
uri = "data/sample-lancedb"
|
||||
async_db = await lancedb.connect_async(uri, read_consistency_interval=None)
|
||||
async_tbl = await async_db.open_table("test_table_async")
|
||||
# --8<-- [end:table_async_no_consistency]
|
||||
|
||||
# --8<-- [start:table_async_checkout_latest]
|
||||
async_tbl = await async_db.open_table("test_table_async")
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
|
||||
import re
|
||||
from datetime import timedelta
|
||||
import os
|
||||
|
||||
import lancedb
|
||||
@@ -299,13 +298,11 @@ def test_create_exist_ok(tmp_db: lancedb.DBConnection):
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect(tmp_path):
|
||||
db = await lancedb.connect_async(tmp_path)
|
||||
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=None)"
|
||||
|
||||
db = await lancedb.connect_async(
|
||||
tmp_path, read_consistency_interval=timedelta(seconds=5)
|
||||
)
|
||||
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=5s)"
|
||||
|
||||
db = await lancedb.connect_async(tmp_path, read_consistency_interval=None)
|
||||
assert str(db) == f"ListingDatabase(uri={tmp_path}, read_consistency_interval=None)"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_close(mem_db_async: lancedb.AsyncConnection):
|
||||
@@ -453,7 +450,7 @@ async def test_open_table(tmp_path):
|
||||
assert tbl.name == "test"
|
||||
assert (
|
||||
re.search(
|
||||
r"NativeTable\(test, uri=.*test\.lance, read_consistency_interval=None\)",
|
||||
r"NativeTable\(test, uri=.*test\.lance, read_consistency_interval=5s\)",
|
||||
str(tbl),
|
||||
)
|
||||
is not None
|
||||
|
||||
@@ -114,6 +114,16 @@ async def test_explain_plan(table: AsyncTable):
|
||||
assert "LanceScan" in plan
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_plan(table: AsyncTable):
|
||||
res = await (
|
||||
table.query().nearest_to_text("dog").nearest_to([0.1, 0.1]).analyze_plan()
|
||||
)
|
||||
|
||||
assert "AnalyzeExec" in res
|
||||
assert "metrics=" in res
|
||||
|
||||
|
||||
def test_normalize_scores():
|
||||
cases = [
|
||||
(pa.array([0.1, 0.4]), pa.array([0.0, 1.0])),
|
||||
|
||||
@@ -702,6 +702,20 @@ async def test_fast_search_async(tmp_path):
|
||||
assert "LanceScan" not in plan
|
||||
|
||||
|
||||
def test_analyze_plan(table):
|
||||
q = LanceVectorQueryBuilder(table, [0, 0], "vector")
|
||||
res = q.analyze_plan()
|
||||
assert "AnalyzeExec" in res
|
||||
assert "metrics=" in res
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_analyze_plan_async(table_async: AsyncTable):
|
||||
res = await table_async.query().nearest_to(pa.array([1, 2])).analyze_plan()
|
||||
assert "AnalyzeExec" in res
|
||||
assert "metrics=" in res
|
||||
|
||||
|
||||
def test_explain_plan(table):
|
||||
q = LanceVectorQueryBuilder(table, [0, 0], "vector")
|
||||
plan = q.explain_plan(verbose=True)
|
||||
|
||||
@@ -32,7 +32,11 @@ def test_basic(mem_db: DBConnection):
|
||||
table = mem_db.create_table("test", data=data)
|
||||
|
||||
assert table.name == "test"
|
||||
assert "LanceTable(name='test', version=1, _conn=LanceDBConnection(" in repr(table)
|
||||
assert (
|
||||
"LanceTable(name='test', version=1, "
|
||||
"read_consistency_interval=datetime.timedelta(seconds=5), "
|
||||
"_conn=LanceDBConnection("
|
||||
) in repr(table)
|
||||
expected_schema = pa.schema(
|
||||
{
|
||||
"vector": pa.list_(pa.float32(), 2),
|
||||
|
||||
@@ -204,7 +204,9 @@ pub fn connect(
|
||||
}
|
||||
if let Some(read_consistency_interval) = read_consistency_interval {
|
||||
let read_consistency_interval = Duration::from_secs_f64(read_consistency_interval);
|
||||
builder = builder.read_consistency_interval(read_consistency_interval);
|
||||
builder = builder.read_consistency_interval(Some(read_consistency_interval));
|
||||
} else {
|
||||
builder = builder.read_consistency_interval(None);
|
||||
}
|
||||
if let Some(storage_options) = storage_options {
|
||||
builder = builder.storage_options(storage_options);
|
||||
|
||||
@@ -281,6 +281,16 @@ impl Query {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn analyze_plan(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner
|
||||
.analyze_plan()
|
||||
.await
|
||||
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn to_query_request(&self) -> PyQueryRequest {
|
||||
PyQueryRequest::from(AnyQuery::Query(self.inner.clone().into_request()))
|
||||
}
|
||||
@@ -365,6 +375,16 @@ impl FTSQuery {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn analyze_plan(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner
|
||||
.analyze_plan()
|
||||
.await
|
||||
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_query(&self) -> String {
|
||||
self.fts_query.query.clone()
|
||||
}
|
||||
@@ -480,6 +500,16 @@ impl VectorQuery {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn analyze_plan(self_: PyRef<'_, Self>) -> PyResult<Bound<'_, PyAny>> {
|
||||
let inner = self_.inner.clone();
|
||||
future_into_py(self_.py(), async move {
|
||||
inner
|
||||
.analyze_plan()
|
||||
.await
|
||||
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn nearest_to_text(&mut self, query: Bound<'_, PyDict>) -> PyResult<HybridQuery> {
|
||||
let base_query = self.inner.clone().into_plain();
|
||||
let fts_query = Query::new(base_query).nearest_to_text(query)?;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb-node"
|
||||
version = "0.18.2-beta.0"
|
||||
version = "0.19.0-beta.0"
|
||||
description = "Serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
edition.workspace = true
|
||||
|
||||
@@ -60,7 +60,7 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
|
||||
let mut conn_builder = connect(&path).storage_options(storage_options);
|
||||
|
||||
if let Some(interval) = read_consistency_interval {
|
||||
conn_builder = conn_builder.read_consistency_interval(interval);
|
||||
conn_builder = conn_builder.read_consistency_interval(Some(interval));
|
||||
}
|
||||
rt.spawn(async move {
|
||||
let database = conn_builder.execute().await;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lancedb"
|
||||
version = "0.18.2-beta.0"
|
||||
version = "0.19.0-beta.0"
|
||||
edition.workspace = true
|
||||
description = "LanceDB: A serverless, low-latency vector database for AI applications"
|
||||
license.workspace = true
|
||||
|
||||
@@ -12,7 +12,7 @@ use super::{
|
||||
Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest,
|
||||
OpenDatabaseRequest,
|
||||
};
|
||||
use crate::connection::ConnectRequest;
|
||||
use crate::connection::{ConnectRequest, DEFAULT_READ_CONSISTENCY_INTERVAL};
|
||||
use crate::database::listing::{ListingDatabase, ListingDatabaseOptions};
|
||||
use crate::database::{Database, DatabaseOptions};
|
||||
use crate::error::{CreateDirSnafu, Error, Result};
|
||||
@@ -214,7 +214,7 @@ impl Catalog for ListingCatalog {
|
||||
uri: db_uri,
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
read_consistency_interval: None,
|
||||
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
|
||||
options: Default::default(),
|
||||
};
|
||||
|
||||
@@ -241,7 +241,7 @@ impl Catalog for ListingCatalog {
|
||||
uri: db_path.to_string(),
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
read_consistency_interval: None,
|
||||
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
|
||||
options: Default::default(),
|
||||
};
|
||||
|
||||
@@ -311,7 +311,7 @@ mod tests {
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
options: Default::default(),
|
||||
read_consistency_interval: None,
|
||||
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
|
||||
};
|
||||
|
||||
let catalog = ListingCatalog::connect(&request).await.unwrap();
|
||||
|
||||
@@ -36,6 +36,9 @@ pub use lance_encoding::version::LanceFileVersion;
|
||||
#[cfg(feature = "remote")]
|
||||
use lance_io::object_store::StorageOptions;
|
||||
|
||||
pub(crate) const DEFAULT_READ_CONSISTENCY_INTERVAL: Option<std::time::Duration> =
|
||||
Some(std::time::Duration::from_secs(5));
|
||||
|
||||
/// A builder for configuring a [`Connection::table_names`] operation
|
||||
pub struct TableNamesBuilder {
|
||||
parent: Arc<dyn Database>,
|
||||
@@ -618,14 +621,15 @@ pub struct ConnectRequest {
|
||||
|
||||
/// The interval at which to check for updates from other processes.
|
||||
///
|
||||
/// If None, then consistency is not checked. For performance
|
||||
/// reasons, this is the default. For strong consistency, set this to
|
||||
/// If None, then consistency is not checked. For strong consistency, set this to
|
||||
/// zero seconds. Then every read will check for updates from other
|
||||
/// processes. As a compromise, you can set this to a non-zero timedelta
|
||||
/// for eventual consistency. If more than that interval has passed since
|
||||
/// the last check, then the table will be checked for updates. Note: this
|
||||
/// consistency only applies to read operations. Write operations are
|
||||
/// always consistent.
|
||||
///
|
||||
/// The default is 5 seconds.
|
||||
pub read_consistency_interval: Option<std::time::Duration>,
|
||||
}
|
||||
|
||||
@@ -643,7 +647,7 @@ impl ConnectBuilder {
|
||||
uri: uri.to_string(),
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
read_consistency_interval: None,
|
||||
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
|
||||
options: HashMap::new(),
|
||||
},
|
||||
embedding_registry: None,
|
||||
@@ -782,8 +786,7 @@ impl ConnectBuilder {
|
||||
/// The interval at which to check for updates from other processes. This
|
||||
/// only affects LanceDB OSS.
|
||||
///
|
||||
/// If left unset, consistency is not checked. For maximum read
|
||||
/// performance, this is the default. For strong consistency, set this to
|
||||
/// If left unset, consistency is not checked. For strong consistency, set this to
|
||||
/// zero seconds. Then every read will check for updates from other processes.
|
||||
/// As a compromise, set this to a non-zero duration for eventual consistency.
|
||||
/// If more than that duration has passed since the last read, the read will
|
||||
@@ -792,13 +795,15 @@ impl ConnectBuilder {
|
||||
/// This only affects read operations. Write operations are always
|
||||
/// consistent.
|
||||
///
|
||||
/// The default is 5 seconds.
|
||||
///
|
||||
/// LanceDB Cloud uses eventual consistency under the hood, and is not
|
||||
/// currently configurable.
|
||||
pub fn read_consistency_interval(
|
||||
mut self,
|
||||
read_consistency_interval: std::time::Duration,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
) -> Self {
|
||||
self.request.read_consistency_interval = Some(read_consistency_interval);
|
||||
self.request.read_consistency_interval = read_consistency_interval;
|
||||
self
|
||||
}
|
||||
|
||||
@@ -882,7 +887,7 @@ impl CatalogConnectBuilder {
|
||||
uri: uri.to_string(),
|
||||
#[cfg(feature = "remote")]
|
||||
client_config: Default::default(),
|
||||
read_consistency_interval: None,
|
||||
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
|
||||
options: HashMap::new(),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -579,6 +579,15 @@ pub trait ExecutableQuery {
|
||||
) -> impl Future<Output = Result<SendableRecordBatchStream>> + Send;
|
||||
|
||||
fn explain_plan(&self, verbose: bool) -> impl Future<Output = Result<String>> + Send;
|
||||
|
||||
fn analyze_plan(&self) -> impl Future<Output = Result<String>> + Send {
|
||||
self.analyze_plan_with_options(QueryExecutionOptions::default())
|
||||
}
|
||||
|
||||
fn analyze_plan_with_options(
|
||||
&self,
|
||||
options: QueryExecutionOptions,
|
||||
) -> impl Future<Output = Result<String>> + Send;
|
||||
}
|
||||
|
||||
/// A query filter that can be applied to a query
|
||||
@@ -765,6 +774,11 @@ impl ExecutableQuery for Query {
|
||||
let query = AnyQuery::Query(self.request.clone());
|
||||
self.parent.explain_plan(&query, verbose).await
|
||||
}
|
||||
|
||||
async fn analyze_plan_with_options(&self, options: QueryExecutionOptions) -> Result<String> {
|
||||
let query = AnyQuery::Query(self.request.clone());
|
||||
self.parent.analyze_plan(&query, options).await
|
||||
}
|
||||
}
|
||||
|
||||
/// A request for a nearest-neighbors search into a table
|
||||
@@ -1089,6 +1103,11 @@ impl ExecutableQuery for VectorQuery {
|
||||
let query = AnyQuery::VectorQuery(self.request.clone());
|
||||
self.parent.explain_plan(&query, verbose).await
|
||||
}
|
||||
|
||||
async fn analyze_plan_with_options(&self, options: QueryExecutionOptions) -> Result<String> {
|
||||
let query = AnyQuery::VectorQuery(self.request.clone());
|
||||
self.parent.analyze_plan(&query, options).await
|
||||
}
|
||||
}
|
||||
|
||||
impl HasQuery for VectorQuery {
|
||||
@@ -1370,6 +1389,31 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_analyze_plan() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let table = make_test_table(&tmp_dir).await;
|
||||
|
||||
let result = table.query().analyze_plan().await.unwrap();
|
||||
assert!(result.contains("metrics="));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_analyze_plan_with_options() {
|
||||
let tmp_dir = tempdir().unwrap();
|
||||
let table = make_test_table(&tmp_dir).await;
|
||||
|
||||
let result = table
|
||||
.query()
|
||||
.analyze_plan_with_options(QueryExecutionOptions {
|
||||
max_batch_length: 10,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.contains("metrics="));
|
||||
}
|
||||
|
||||
fn assert_plan_exists(plan: &Arc<dyn ExecutionPlan>, name: &str) -> bool {
|
||||
if plan.name() == name {
|
||||
return true;
|
||||
|
||||
@@ -434,6 +434,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
|
||||
let (request_id, response) = self.client.send(request, true).await?;
|
||||
self.check_table_response(&request_id, response).await?;
|
||||
self.checkout_latest().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -614,6 +615,48 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(final_plan)
|
||||
}
|
||||
|
||||
async fn analyze_plan(
|
||||
&self,
|
||||
query: &AnyQuery,
|
||||
_options: QueryExecutionOptions,
|
||||
) -> Result<String> {
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/analyze_plan/", self.name));
|
||||
|
||||
let query_bodies = self.prepare_query_bodies(query).await?;
|
||||
let requests: Vec<reqwest::RequestBuilder> = query_bodies
|
||||
.into_iter()
|
||||
.map(|body| request.try_clone().unwrap().json(&body))
|
||||
.collect();
|
||||
|
||||
let futures = requests.into_iter().map(|req| async move {
|
||||
let (request_id, response) = self.client.send(req, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to execute analyze plan: {}", e).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})
|
||||
});
|
||||
|
||||
let analyze_result_texts = futures::future::try_join_all(futures).await?;
|
||||
let final_analyze = if analyze_result_texts.len() > 1 {
|
||||
analyze_result_texts
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, plan)| format!("--- Query #{} ---\n{}", i + 1, plan))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n\n")
|
||||
} else {
|
||||
analyze_result_texts.into_iter().next().unwrap_or_default()
|
||||
};
|
||||
|
||||
Ok(final_analyze)
|
||||
}
|
||||
|
||||
async fn update(&self, update: UpdateBuilder) -> Result<u64> {
|
||||
self.check_mutable().await?;
|
||||
let request = self
|
||||
|
||||
@@ -33,7 +33,7 @@ use lance::dataset::{
|
||||
use lance::dataset::{MergeInsertBuilder as LanceMergeInsertBuilder, WhenNotMatchedBySource};
|
||||
use lance::index::vector::utils::infer_vector_dim;
|
||||
use lance::io::WrappingObjectStore;
|
||||
use lance_datafusion::exec::execute_plan;
|
||||
use lance_datafusion::exec::{analyze_plan as lance_analyze_plan, execute_plan};
|
||||
use lance_datafusion::utils::StreamingWriteSource;
|
||||
use lance_index::vector::hnsw::builder::HnswBuildParams;
|
||||
use lance_index::vector::ivf::IvfBuildParams;
|
||||
@@ -433,6 +433,12 @@ pub trait BaseTable: std::fmt::Display + std::fmt::Debug + Send + Sync {
|
||||
|
||||
Ok(format!("{}", display.indent(verbose)))
|
||||
}
|
||||
async fn analyze_plan(
|
||||
&self,
|
||||
query: &AnyQuery,
|
||||
options: QueryExecutionOptions,
|
||||
) -> Result<String>;
|
||||
|
||||
/// Add new records to the table.
|
||||
async fn add(
|
||||
&self,
|
||||
@@ -2192,6 +2198,15 @@ impl BaseTable for NativeTable {
|
||||
self.generic_query(query, options).await
|
||||
}
|
||||
|
||||
async fn analyze_plan(
|
||||
&self,
|
||||
query: &AnyQuery,
|
||||
options: QueryExecutionOptions,
|
||||
) -> Result<String> {
|
||||
let plan = self.create_plan(query, options).await?;
|
||||
Ok(lance_analyze_plan(plan, Default::default()).await?)
|
||||
}
|
||||
|
||||
async fn merge_insert(
|
||||
&self,
|
||||
params: MergeInsertBuilder,
|
||||
@@ -2611,7 +2626,7 @@ mod tests {
|
||||
let dataset_path = tmp_dir.path().join("test.lance");
|
||||
let uri = dataset_path.to_str().unwrap();
|
||||
let conn = connect(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.read_consistency_interval(Some(Duration::from_secs(0)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -2694,7 +2709,7 @@ mod tests {
|
||||
let dataset_path = tmp_dir.path().join("test.lance");
|
||||
let uri = dataset_path.to_str().unwrap();
|
||||
let conn = connect(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.read_consistency_interval(Some(Duration::from_secs(0)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -2891,7 +2906,7 @@ mod tests {
|
||||
let dataset_path = tmp_dir.path().join("test.lance");
|
||||
let uri = dataset_path.to_str().unwrap();
|
||||
let conn = connect(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.read_consistency_interval(Some(Duration::from_secs(0)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -3462,7 +3477,8 @@ mod tests {
|
||||
|
||||
let mut conn2 = ConnectBuilder::new(uri);
|
||||
if let Some(interval) = interval {
|
||||
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
|
||||
conn2 = conn2
|
||||
.read_consistency_interval(Some(std::time::Duration::from_millis(interval)));
|
||||
}
|
||||
let conn2 = conn2.execute().await.unwrap();
|
||||
let table2 = conn2.open_table("my_table").execute().await.unwrap();
|
||||
@@ -3498,7 +3514,7 @@ mod tests {
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.read_consistency_interval(Some(Duration::from_secs(0)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -3519,7 +3535,7 @@ mod tests {
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.read_consistency_interval(Some(Duration::from_secs(0)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -3594,7 +3610,7 @@ mod tests {
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.read_consistency_interval(Some(Duration::from_secs(0)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -3656,7 +3672,7 @@ mod tests {
|
||||
let uri = tmp_dir.path().to_str().unwrap();
|
||||
|
||||
let conn = ConnectBuilder::new(uri)
|
||||
.read_consistency_interval(Duration::from_secs(0))
|
||||
.read_consistency_interval(Some(Duration::from_secs(0)))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -7,6 +7,7 @@ use std::{
|
||||
time::{self, Duration, Instant},
|
||||
};
|
||||
|
||||
use futures::FutureExt;
|
||||
use lance::Dataset;
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
@@ -22,13 +23,16 @@ pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
|
||||
///
|
||||
/// The dataset is lazily loaded, and starts off as None. On the first access,
|
||||
/// the dataset is loaded.
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
enum DatasetRef {
|
||||
/// In this mode, the dataset is always the latest version.
|
||||
Latest {
|
||||
dataset: Dataset,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
last_consistency_check: Option<time::Instant>,
|
||||
/// A background task loading the next version of the dataset. This happens
|
||||
/// in the background so as not to block the current thread.
|
||||
refresh_task: Option<tokio::task::JoinHandle<Result<Dataset>>>,
|
||||
},
|
||||
/// In this mode, the dataset is a specific version. It cannot be mutated.
|
||||
TimeTravel { dataset: Dataset, version: u64 },
|
||||
@@ -41,9 +45,19 @@ impl DatasetRef {
|
||||
Self::Latest {
|
||||
dataset,
|
||||
last_consistency_check,
|
||||
refresh_task,
|
||||
..
|
||||
} => {
|
||||
dataset.checkout_latest().await?;
|
||||
// Replace the refresh task
|
||||
if let Some(refresh_task) = refresh_task {
|
||||
refresh_task.abort();
|
||||
}
|
||||
let mut new_dataset = dataset.clone();
|
||||
refresh_task.replace(tokio::spawn(async move {
|
||||
new_dataset.checkout_latest().await?;
|
||||
Ok(new_dataset)
|
||||
}));
|
||||
last_consistency_check.replace(Instant::now());
|
||||
}
|
||||
Self::TimeTravel { dataset, version } => {
|
||||
@@ -57,26 +71,24 @@ impl DatasetRef {
|
||||
matches!(self, Self::Latest { .. })
|
||||
}
|
||||
|
||||
async fn need_reload(&self) -> Result<bool> {
|
||||
Ok(match self {
|
||||
Self::Latest { dataset, .. } => {
|
||||
dataset.latest_version_id().await? != dataset.version().version
|
||||
}
|
||||
Self::TimeTravel { dataset, version } => dataset.version().version != *version,
|
||||
})
|
||||
fn strong_consistency(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::Latest { read_consistency_interval: Some(interval), .. }
|
||||
if interval.as_nanos() == 0
|
||||
)
|
||||
}
|
||||
|
||||
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
|
||||
match self {
|
||||
Self::Latest { .. } => Ok(()),
|
||||
Self::TimeTravel { dataset, .. } => {
|
||||
dataset
|
||||
.checkout_version(dataset.latest_version_id().await?)
|
||||
.await?;
|
||||
dataset.checkout_latest().await?;
|
||||
*self = Self::Latest {
|
||||
dataset: dataset.clone(),
|
||||
read_consistency_interval,
|
||||
last_consistency_check: Some(Instant::now()),
|
||||
refresh_task: None,
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
@@ -114,13 +126,74 @@ impl DatasetRef {
|
||||
match self {
|
||||
Self::Latest {
|
||||
dataset: ref mut ds,
|
||||
refresh_task,
|
||||
last_consistency_check,
|
||||
..
|
||||
} => {
|
||||
*ds = dataset;
|
||||
if let Some(refresh_task) = refresh_task {
|
||||
refresh_task.abort();
|
||||
}
|
||||
*refresh_task = None;
|
||||
*last_consistency_check = Some(Instant::now());
|
||||
}
|
||||
_ => unreachable!("Dataset should be in latest mode at this point"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the background refresh task to complete.
|
||||
async fn await_refresh(&mut self) -> Result<()> {
|
||||
if let Self::Latest {
|
||||
refresh_task: Some(refresh_task),
|
||||
read_consistency_interval,
|
||||
..
|
||||
} = self
|
||||
{
|
||||
let dataset = refresh_task.await.expect("Refresh task panicked")?;
|
||||
*self = Self::Latest {
|
||||
dataset,
|
||||
read_consistency_interval: *read_consistency_interval,
|
||||
last_consistency_check: Some(Instant::now()),
|
||||
refresh_task: None,
|
||||
};
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if background refresh task is done, and if so, update the dataset.
|
||||
fn check_refresh(&mut self) -> Result<()> {
|
||||
if let Self::Latest {
|
||||
refresh_task: Some(refresh_task),
|
||||
read_consistency_interval,
|
||||
..
|
||||
} = self
|
||||
{
|
||||
if refresh_task.is_finished() {
|
||||
let dataset = refresh_task
|
||||
.now_or_never()
|
||||
.unwrap()
|
||||
.expect("Refresh task panicked")?;
|
||||
*self = Self::Latest {
|
||||
dataset,
|
||||
read_consistency_interval: *read_consistency_interval,
|
||||
last_consistency_check: Some(Instant::now()),
|
||||
refresh_task: None,
|
||||
};
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn refresh_is_ready(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Self::Latest {
|
||||
refresh_task: Some(refresh_task),
|
||||
..
|
||||
}
|
||||
if refresh_task.is_finished()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl DatasetConsistencyWrapper {
|
||||
@@ -130,6 +203,7 @@ impl DatasetConsistencyWrapper {
|
||||
dataset,
|
||||
read_consistency_interval,
|
||||
last_consistency_check: Some(Instant::now()),
|
||||
refresh_task: None,
|
||||
})))
|
||||
}
|
||||
|
||||
@@ -188,18 +262,9 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
|
||||
pub async fn reload(&self) -> Result<()> {
|
||||
if !self.0.read().await.need_reload().await? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut write_guard = self.0.write().await;
|
||||
// on lock escalation -- check if someone else has already reloaded
|
||||
if !write_guard.need_reload().await? {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// actually need reloading
|
||||
write_guard.reload().await
|
||||
write_guard.reload().await?;
|
||||
write_guard.await_refresh().await
|
||||
}
|
||||
|
||||
/// Returns the version, if in time travel mode, or None otherwise
|
||||
@@ -245,9 +310,26 @@ impl DatasetConsistencyWrapper {
|
||||
/// Ensures that the dataset is loaded and up-to-date with consistency and
|
||||
/// version parameters.
|
||||
async fn ensure_up_to_date(&self) -> Result<()> {
|
||||
// We may have previously created a background task to fetch the new
|
||||
// version of the dataset. If that task is done, we should update the
|
||||
// dataset.
|
||||
{
|
||||
let read_guard = self.0.read().await;
|
||||
if read_guard.refresh_is_ready() {
|
||||
drop(read_guard);
|
||||
self.0.write().await.check_refresh()?;
|
||||
}
|
||||
}
|
||||
|
||||
if !self.is_up_to_date().await? {
|
||||
self.reload().await?;
|
||||
}
|
||||
|
||||
// If we are in strong consistency mode, we should await the refresh task.
|
||||
if self.0.read().await.strong_consistency() {
|
||||
self.0.write().await.await_refresh().await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user