mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-23 06:50:40 +00:00
Compare commits
1 Commits
read-consi
...
codex/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6310ed905 |
92
Cargo.lock
generated
92
Cargo.lock
generated
@@ -3284,8 +3284,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
|
||||
|
||||
[[package]]
|
||||
name = "fsst"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"rand 0.9.4",
|
||||
@@ -4506,8 +4506,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a"
|
||||
|
||||
[[package]]
|
||||
name = "lance"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4525,6 +4525,7 @@ dependencies = [
|
||||
"async_cell",
|
||||
"aws-credential-types",
|
||||
"aws-sdk-dynamodb",
|
||||
"bitpacking",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"chrono",
|
||||
@@ -4551,9 +4552,11 @@ dependencies = [
|
||||
"lance-io",
|
||||
"lance-linalg",
|
||||
"lance-namespace",
|
||||
"lance-select",
|
||||
"lance-table",
|
||||
"lance-tokenizer",
|
||||
"log",
|
||||
"moka",
|
||||
"object_store",
|
||||
"permutation",
|
||||
"pin-project",
|
||||
@@ -4577,8 +4580,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-arrow"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4598,8 +4601,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-bitpacking"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrayref",
|
||||
"paste",
|
||||
@@ -4608,8 +4611,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-core"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4644,8 +4647,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datafusion"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4675,8 +4678,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-datagen"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4694,8 +4697,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-encoding"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4730,8 +4733,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-file"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow-arith",
|
||||
"arrow-array",
|
||||
@@ -4762,8 +4765,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-index"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"arrow",
|
||||
@@ -4800,6 +4803,7 @@ dependencies = [
|
||||
"lance-file",
|
||||
"lance-io",
|
||||
"lance-linalg",
|
||||
"lance-select",
|
||||
"lance-table",
|
||||
"lance-tokenizer",
|
||||
"libm",
|
||||
@@ -4827,8 +4831,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-io"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-arith",
|
||||
@@ -4870,8 +4874,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-linalg"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
@@ -4887,8 +4891,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"async-trait",
|
||||
@@ -4900,8 +4904,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-impls"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-ipc",
|
||||
@@ -4936,9 +4940,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-namespace-reqwest-client"
|
||||
version = "0.7.6"
|
||||
version = "0.7.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f65e31bdaa13e01dab6e7cf566da31df243c34a542f0d915d3601ec0e01e61d2"
|
||||
checksum = "6369eee4682fb11edf538388b43c61ce288b8302fe89bb40944d7daa7faaae99"
|
||||
dependencies = [
|
||||
"reqwest 0.12.28",
|
||||
"serde",
|
||||
@@ -4948,10 +4952,25 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-select"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-buffer",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"deepsize",
|
||||
"itertools 0.13.0",
|
||||
"lance-core",
|
||||
"roaring",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lance-table"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"arrow-array",
|
||||
@@ -4970,6 +4989,7 @@ dependencies = [
|
||||
"lance-core",
|
||||
"lance-file",
|
||||
"lance-io",
|
||||
"lance-select",
|
||||
"log",
|
||||
"object_store",
|
||||
"prost",
|
||||
@@ -4990,8 +5010,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-testing"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"arrow-array",
|
||||
"arrow-schema",
|
||||
@@ -5002,8 +5022,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lance-tokenizer"
|
||||
version = "7.0.0-beta.13"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1"
|
||||
version = "7.1.0-beta.2"
|
||||
source = "git+https://github.com/lance-format/lance.git?tag=v7.1.0-beta.2#24b8afec580737d61c59845175f8ba2f0f390793"
|
||||
dependencies = [
|
||||
"jieba-rs",
|
||||
"lindera",
|
||||
|
||||
28
Cargo.toml
28
Cargo.toml
@@ -13,20 +13,20 @@ categories = ["database-implementations"]
|
||||
rust-version = "1.91.0"
|
||||
|
||||
[workspace.dependencies]
|
||||
lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance = { "version" = "=7.1.0-beta.2", default-features = false, "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-core = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datagen = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-file = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-io = { "version" = "=7.1.0-beta.2", default-features = false, "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-index = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-linalg = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-namespace-impls = { "version" = "=7.1.0-beta.2", default-features = false, "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-table = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-testing = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-datafusion = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-encoding = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
lance-arrow = { "version" = "=7.1.0-beta.2", "tag" = "v7.1.0-beta.2", "git" = "https://github.com/lance-format/lance.git" }
|
||||
ahash = "0.8"
|
||||
# Note that this one does not include pyarrow
|
||||
arrow = { version = "58.0.0", optional = false }
|
||||
|
||||
@@ -70,20 +70,16 @@ client used by manifest-enabled native connections.
|
||||
optional readConsistencyInterval: number;
|
||||
```
|
||||
|
||||
The interval, in seconds, at which to check for updates to the table
|
||||
from other processes. If None, then consistency is not checked. For
|
||||
performance reasons, this is the default. For strong consistency, set
|
||||
this to zero seconds. Then every read will check for updates from other
|
||||
processes. As a compromise, you can set this to a non-zero value for
|
||||
eventual consistency. If more than that interval has passed since the
|
||||
last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
(For LanceDB OSS only): The interval, in seconds, at which to check for
|
||||
updates to the table from other processes. If None, then consistency is not
|
||||
checked. For performance reasons, this is the default. For strong
|
||||
consistency, set this to zero seconds. Then every read will check for
|
||||
updates from other processes. As a compromise, you can set this to a
|
||||
non-zero value for eventual consistency. If more than that interval
|
||||
has passed since the last check, then the table will be checked for updates.
|
||||
Note: this consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
|
||||
***
|
||||
|
||||
### region?
|
||||
|
||||
@@ -28,7 +28,7 @@
|
||||
<properties>
|
||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||
<arrow.version>15.0.0</arrow.version>
|
||||
<lance-core.version>7.0.0-beta.13</lance-core.version>
|
||||
<lance-core.version>7.1.0-beta.2</lance-core.version>
|
||||
<spotless.skip>false</spotless.skip>
|
||||
<spotless.version>2.30.0</spotless.version>
|
||||
<spotless.java.googlejavaformat.version>1.7</spotless.java.googlejavaformat.version>
|
||||
|
||||
@@ -24,19 +24,15 @@ mod util;
|
||||
#[napi(object)]
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionOptions {
|
||||
/// The interval, in seconds, at which to check for updates to the table
|
||||
/// from other processes. If None, then consistency is not checked. For
|
||||
/// performance reasons, this is the default. For strong consistency, set
|
||||
/// this to zero seconds. Then every read will check for updates from other
|
||||
/// processes. As a compromise, you can set this to a non-zero value for
|
||||
/// eventual consistency. If more than that interval has passed since the
|
||||
/// last check, then the table will be checked for updates. Note: this
|
||||
/// consistency only applies to read operations. Write operations are
|
||||
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
|
||||
/// updates to the table from other processes. If None, then consistency is not
|
||||
/// checked. For performance reasons, this is the default. For strong
|
||||
/// consistency, set this to zero seconds. Then every read will check for
|
||||
/// updates from other processes. As a compromise, you can set this to a
|
||||
/// non-zero value for eventual consistency. If more than that interval
|
||||
/// has passed since the last check, then the table will be checked for updates.
|
||||
/// Note: this consistency only applies to read operations. Write operations are
|
||||
/// always consistent.
|
||||
///
|
||||
/// Stronger consistency is not free. The smaller the interval, the more
|
||||
/// often each read pays the cost of checking for updates against object
|
||||
/// storage, raising per-read latency and cost.
|
||||
pub read_consistency_interval: Option<f64>,
|
||||
/// (For LanceDB OSS only): configuration for object storage.
|
||||
///
|
||||
|
||||
@@ -94,6 +94,7 @@ def connect(
|
||||
host_override: str, optional
|
||||
The override url for LanceDB Cloud.
|
||||
read_consistency_interval: timedelta, default None
|
||||
(For LanceDB OSS only)
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
@@ -103,10 +104,6 @@ def connect(
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
client_config: ClientConfig or dict, optional
|
||||
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
|
||||
the keys are the attributes of the ClientConfig class. If None, then the
|
||||
@@ -220,7 +217,6 @@ def connect(
|
||||
request_thread_pool=request_thread_pool,
|
||||
client_config=client_config,
|
||||
storage_options=storage_options,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
**kwargs,
|
||||
)
|
||||
_check_s3_bucket_with_dots(str(uri), storage_options)
|
||||
@@ -347,6 +343,7 @@ async def connect_async(
|
||||
host_override: str, optional
|
||||
The override url for LanceDB Cloud.
|
||||
read_consistency_interval: timedelta, default None
|
||||
(For LanceDB OSS only)
|
||||
The interval at which to check for updates to the table from other
|
||||
processes. If None, then consistency is not checked. For performance
|
||||
reasons, this is the default. For strong consistency, set this to
|
||||
@@ -356,10 +353,6 @@ async def connect_async(
|
||||
the last check, then the table will be checked for updates. Note: this
|
||||
consistency only applies to read operations. Write operations are
|
||||
always consistent.
|
||||
|
||||
Stronger consistency is not free. The smaller the interval, the more
|
||||
often each read pays the cost of checking for updates against object
|
||||
storage, raising per-read latency and cost.
|
||||
client_config: ClientConfig or dict, optional
|
||||
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
|
||||
the keys are the attributes of the ClientConfig class. If None, then the
|
||||
|
||||
@@ -50,7 +50,6 @@ class RemoteDBConnection(DBConnection):
|
||||
connection_timeout: Optional[float] = None,
|
||||
read_timeout: Optional[float] = None,
|
||||
storage_options: Optional[Dict[str, str]] = None,
|
||||
read_consistency_interval: Optional[timedelta] = None,
|
||||
):
|
||||
"""Connect to a remote LanceDB database."""
|
||||
if isinstance(client_config, dict):
|
||||
@@ -104,7 +103,6 @@ class RemoteDBConnection(DBConnection):
|
||||
host_override=host_override,
|
||||
client_config=client_config,
|
||||
storage_options=storage_options,
|
||||
read_consistency_interval=read_consistency_interval,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -812,7 +812,8 @@ impl ConnectBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// The interval at which to check for updates from other processes.
|
||||
/// The interval at which to check for updates from other processes. This
|
||||
/// only affects LanceDB OSS.
|
||||
///
|
||||
/// If left unset, consistency is not checked. For maximum read
|
||||
/// performance, this is the default. For strong consistency, set this to
|
||||
@@ -824,11 +825,8 @@ impl ConnectBuilder {
|
||||
/// This only affects read operations. Write operations are always
|
||||
/// consistent.
|
||||
///
|
||||
/// # Cost
|
||||
///
|
||||
/// Stronger consistency is not free. The smaller the interval, the more
|
||||
/// often each read pays the cost of checking for updates against object
|
||||
/// storage, raising per-read latency and cost.
|
||||
/// LanceDB Cloud uses eventual consistency under the hood, and is not
|
||||
/// currently configurable.
|
||||
pub fn read_consistency_interval(
|
||||
mut self,
|
||||
read_consistency_interval: std::time::Duration,
|
||||
@@ -888,7 +886,6 @@ impl ConnectBuilder {
|
||||
options.host_override,
|
||||
self.request.client_config,
|
||||
storage_options.into(),
|
||||
self.request.read_consistency_interval,
|
||||
)?);
|
||||
Ok(Connection {
|
||||
internal,
|
||||
|
||||
@@ -245,9 +245,6 @@ pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
|
||||
pub(crate) sender: S,
|
||||
pub(crate) id_delimiter: String,
|
||||
pub(crate) header_provider: Option<Arc<dyn HeaderProvider>>,
|
||||
/// Connection-level read consistency interval. Drives the
|
||||
/// `x-lancedb-min-timestamp` freshness header sent on read requests.
|
||||
pub(crate) read_consistency_interval: Option<Duration>,
|
||||
}
|
||||
|
||||
impl<S: HttpSend> std::fmt::Debug for RestfulLanceDbClient<S> {
|
||||
@@ -341,7 +338,6 @@ impl RestfulLanceDbClient<Sender> {
|
||||
host_override: Option<String>,
|
||||
default_headers: HeaderMap,
|
||||
client_config: ClientConfig,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
) -> Result<Self> {
|
||||
// Get the timeouts
|
||||
let timeout =
|
||||
@@ -439,7 +435,6 @@ impl RestfulLanceDbClient<Sender> {
|
||||
.clone()
|
||||
.unwrap_or("$".to_string()),
|
||||
header_provider: client_config.header_provider,
|
||||
read_consistency_interval,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -845,16 +840,6 @@ pub mod test_utils {
|
||||
pub fn client_with_handler<T>(
|
||||
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
|
||||
) -> RestfulLanceDbClient<MockSender>
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
client_with_handler_and_interval(handler, None)
|
||||
}
|
||||
|
||||
pub fn client_with_handler_and_interval<T>(
|
||||
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
) -> RestfulLanceDbClient<MockSender>
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
@@ -872,7 +857,6 @@ pub mod test_utils {
|
||||
},
|
||||
id_delimiter: "$".to_string(),
|
||||
header_provider: None,
|
||||
read_consistency_interval,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -897,7 +881,6 @@ pub mod test_utils {
|
||||
},
|
||||
id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()),
|
||||
header_provider: config.header_provider,
|
||||
read_consistency_interval: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1064,7 +1047,6 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Apply dynamic headers
|
||||
@@ -1100,7 +1082,6 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Apply dynamic headers
|
||||
@@ -1138,7 +1119,6 @@ mod tests {
|
||||
sender: Sender,
|
||||
id_delimiter: "+".to_string(),
|
||||
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
|
||||
read_consistency_interval: None,
|
||||
};
|
||||
|
||||
// Header provider errors should fail the request
|
||||
|
||||
@@ -206,7 +206,6 @@ impl RemoteDatabase {
|
||||
host_override: Option<String>,
|
||||
client_config: ClientConfig,
|
||||
options: RemoteOptions,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
) -> Result<Self> {
|
||||
let parsed = super::client::parse_db_url(uri)?;
|
||||
let header_map = RestfulLanceDbClient::<Sender>::default_headers(
|
||||
@@ -234,7 +233,6 @@ impl RemoteDatabase {
|
||||
host_override,
|
||||
header_map,
|
||||
client_config.clone(),
|
||||
read_consistency_interval,
|
||||
)?;
|
||||
|
||||
let table_cache = Cache::builder()
|
||||
|
||||
@@ -62,67 +62,15 @@ use std::collections::HashMap;
|
||||
use std::io::Cursor;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
|
||||
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
|
||||
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
|
||||
const METRIC_TYPE_KEY: &str = "metric_type";
|
||||
const INDEX_TYPE_KEY: &str = "index_type";
|
||||
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
|
||||
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
|
||||
/// `x-lancedb-min-timestamp`) sent on read requests.
|
||||
///
|
||||
/// `min_version` provides read-your-write within a single handle: writes that
|
||||
/// return a version update it, and reads send it. `checkout_baseline` is the
|
||||
/// wall-clock time captured at the last [`BaseTable::checkout_latest`] call;
|
||||
/// reads send `max(baseline, now - read_consistency_interval)`.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
struct FreshnessState {
|
||||
min_version: Option<u64>,
|
||||
checkout_baseline: Option<SystemTime>,
|
||||
}
|
||||
|
||||
/// Snapshot of the headers that should be attached to a single read request.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
struct FreshnessHeaders {
|
||||
min_version: Option<u64>,
|
||||
min_timestamp: Option<SystemTime>,
|
||||
}
|
||||
|
||||
impl FreshnessHeaders {
|
||||
fn apply(self, mut request: RequestBuilder) -> RequestBuilder {
|
||||
if let Some(v) = self.min_version {
|
||||
request = request.header(MIN_VERSION_HEADER, v.to_string());
|
||||
}
|
||||
if let Some(ts) = self.min_timestamp {
|
||||
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
||||
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
|
||||
}
|
||||
request
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_min_timestamp(
|
||||
state: &FreshnessState,
|
||||
interval: Option<Duration>,
|
||||
now: SystemTime,
|
||||
) -> Option<SystemTime> {
|
||||
let interval_based = match interval {
|
||||
None => None,
|
||||
Some(d) if d.is_zero() => Some(now),
|
||||
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
|
||||
};
|
||||
match (interval_based, state.checkout_baseline) {
|
||||
(None, None) => None,
|
||||
(Some(t), None) | (None, Some(t)) => Some(t),
|
||||
(Some(a), Some(b)) => Some(a.max(b)),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RemoteTags<'a, S: HttpSend = Sender> {
|
||||
inner: &'a RemoteTable<S>,
|
||||
}
|
||||
@@ -132,7 +80,8 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
|
||||
async fn list(&self) -> Result<HashMap<String, TagContents>> {
|
||||
let request = self
|
||||
.inner
|
||||
.post_read(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
|
||||
let (request_id, response) = self.inner.send(request, true).await?;
|
||||
let response = self
|
||||
.inner
|
||||
@@ -163,13 +112,48 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
|
||||
}
|
||||
|
||||
async fn get_version(&self, tag: &str) -> Result<u64> {
|
||||
let request = self.inner.post_read(&format!(
|
||||
"/v1/table/{}/tags/version/",
|
||||
self.inner.identifier
|
||||
));
|
||||
self.inner
|
||||
.resolve_tag_version_with_request(tag, request)
|
||||
.await
|
||||
let request = self
|
||||
.inner
|
||||
.client
|
||||
.post(&format!(
|
||||
"/v1/table/{}/tags/version/",
|
||||
self.inner.identifier
|
||||
))
|
||||
.json(&serde_json::json!({ "tag": tag }));
|
||||
|
||||
let (request_id, response) = self.inner.send(request, true).await?;
|
||||
let response = self
|
||||
.inner
|
||||
.check_table_response(&request_id, response)
|
||||
.await?;
|
||||
|
||||
match response.text().await {
|
||||
Ok(body) => {
|
||||
let value: serde_json::Value =
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse tag version: {}", e).into(),
|
||||
request_id: request_id.clone(),
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
value
|
||||
.get("version")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or_else(|| Error::Http {
|
||||
source: format!("Invalid tag version response: {}", body).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
let status_code = err.status();
|
||||
Err(Error::Http {
|
||||
source: Box::new(err),
|
||||
request_id,
|
||||
status_code,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
|
||||
@@ -231,7 +215,6 @@ pub struct RemoteTable<S: HttpSend = Sender> {
|
||||
version: RwLock<Option<u64>>,
|
||||
location: RwLock<Option<String>>,
|
||||
schema_cache: BackgroundCache<SchemaRef, Error>,
|
||||
freshness: Mutex<FreshnessState>,
|
||||
}
|
||||
|
||||
impl<S: HttpSend> std::fmt::Debug for RemoteTable<S> {
|
||||
@@ -260,7 +243,6 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
freshness: Mutex::new(FreshnessState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,56 +252,12 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn describe_version(&self, version: Option<u64>) -> Result<TableDescription> {
|
||||
let request = self.post_read(&format!("/v1/table/{}/describe/", self.identifier));
|
||||
self.describe_with_request(request, version).await
|
||||
}
|
||||
let mut request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/describe/", self.identifier));
|
||||
|
||||
async fn resolve_tag_version_with_request(
|
||||
&self,
|
||||
tag: &str,
|
||||
request: RequestBuilder,
|
||||
) -> Result<u64> {
|
||||
let request = request.json(&serde_json::json!({ "tag": tag }));
|
||||
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
match response.text().await {
|
||||
Ok(body) => {
|
||||
let value: serde_json::Value =
|
||||
serde_json::from_str(&body).map_err(|e| Error::Http {
|
||||
source: format!("Failed to parse tag version: {}", e).into(),
|
||||
request_id: request_id.clone(),
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
value
|
||||
.get("version")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or_else(|| Error::Http {
|
||||
source: format!("Invalid tag version response: {}", body).into(),
|
||||
request_id,
|
||||
status_code: None,
|
||||
})
|
||||
}
|
||||
Err(err) => {
|
||||
let status_code = err.status();
|
||||
Err(Error::Http {
|
||||
source: Box::new(err),
|
||||
request_id,
|
||||
status_code,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn describe_with_request(
|
||||
&self,
|
||||
request: RequestBuilder,
|
||||
version: Option<u64>,
|
||||
) -> Result<TableDescription> {
|
||||
let body = serde_json::json!({ "version": version });
|
||||
let request = request.json(&body);
|
||||
request = request.json(&body);
|
||||
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
|
||||
@@ -773,44 +711,14 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
*read_guard
|
||||
}
|
||||
|
||||
/// Snapshot the freshness headers to attach to a single read request.
|
||||
/// Computed at call time so that retries reuse the same snapshot.
|
||||
fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
|
||||
let state = *self.freshness.lock().unwrap();
|
||||
FreshnessHeaders {
|
||||
min_version: state.min_version,
|
||||
min_timestamp: compute_min_timestamp(
|
||||
&state,
|
||||
self.client.read_consistency_interval,
|
||||
SystemTime::now(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a POST request and attach the read-freshness headers
|
||||
/// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`).
|
||||
fn post_read(&self, uri: &str) -> RequestBuilder {
|
||||
self.snapshot_freshness_headers()
|
||||
.apply(self.client.post(uri))
|
||||
}
|
||||
|
||||
/// Record a version returned by a write so subsequent reads can request at
|
||||
/// least that version via `x-lancedb-min-version`. A returned `0` from a
|
||||
/// backward-compatible old server is ignored.
|
||||
fn track_write_version(&self, version: u64) {
|
||||
if version == 0 {
|
||||
return;
|
||||
}
|
||||
let mut state = self.freshness.lock().unwrap();
|
||||
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version)));
|
||||
}
|
||||
|
||||
async fn execute_query(
|
||||
&self,
|
||||
query: &AnyQuery,
|
||||
options: &QueryExecutionOptions,
|
||||
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/query/", self.identifier));
|
||||
let mut request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/query/", self.identifier));
|
||||
|
||||
if let Some(timeout) = options.timeout {
|
||||
// Also send to server, so it can abort the query if it takes too long.
|
||||
@@ -916,10 +824,9 @@ async fn fetch_schema<S: HttpSend>(
|
||||
identifier: &str,
|
||||
table_name: &str,
|
||||
version: Option<u64>,
|
||||
freshness_headers: FreshnessHeaders,
|
||||
) -> Result<SchemaRef> {
|
||||
let request = freshness_headers
|
||||
.apply(client.post(&format!("/v1/table/{}/describe/", identifier)))
|
||||
let request = client
|
||||
.post(&format!("/v1/table/{}/describe/", identifier))
|
||||
.json(&serde_json::json!({ "version": version }));
|
||||
|
||||
let (request_id, response) = client.send_with_retry(request, None, true).await?;
|
||||
@@ -967,9 +874,7 @@ mod test_utils {
|
||||
use super::*;
|
||||
use crate::remote::ClientConfig;
|
||||
use crate::remote::client::test_utils::client_with_handler;
|
||||
use crate::remote::client::test_utils::{
|
||||
MockSender, client_with_handler_and_config, client_with_handler_and_interval,
|
||||
};
|
||||
use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config};
|
||||
|
||||
impl RemoteTable<MockSender> {
|
||||
pub fn new_mock<F, T>(name: String, handler: F, version: Option<semver::Version>) -> Self
|
||||
@@ -987,30 +892,6 @@ mod test_utils {
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
freshness: Mutex::new(FreshnessState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_mock_with_consistency_interval<F, T>(
|
||||
name: String,
|
||||
handler: F,
|
||||
read_consistency_interval: Option<Duration>,
|
||||
) -> Self
|
||||
where
|
||||
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
let client = client_with_handler_and_interval(handler, read_consistency_interval);
|
||||
Self {
|
||||
client,
|
||||
name: name.clone(),
|
||||
namespace: vec![],
|
||||
identifier: name,
|
||||
server_version: ServerVersion::default(),
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
freshness: Mutex::new(FreshnessState::default()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1042,7 +923,6 @@ mod test_utils {
|
||||
version: RwLock::new(None),
|
||||
location: RwLock::new(None),
|
||||
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
|
||||
freshness: Mutex::new(FreshnessState::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1106,7 +986,6 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
self.track_write_version(add_result.version);
|
||||
|
||||
return Ok(add_result);
|
||||
}
|
||||
@@ -1144,7 +1023,6 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
self.track_write_version(result.version);
|
||||
return Ok(result);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1261,13 +1139,8 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
self.describe().await.map(|desc| desc.version)
|
||||
}
|
||||
async fn checkout(&self, version: u64) -> Result<()> {
|
||||
// Validate the version exists. The describe is sent without freshness
|
||||
// headers so a stale `min_version` from a previous write doesn't ride
|
||||
// along on an explicit time-travel request.
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/describe/", self.identifier));
|
||||
self.describe_with_request(request, Some(version))
|
||||
// check that the version exists
|
||||
self.describe_version(Some(version))
|
||||
.await
|
||||
.map_err(|e| match e {
|
||||
// try to map the error to a more user-friendly error telling them
|
||||
@@ -1283,10 +1156,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
*write_guard = Some(version);
|
||||
drop(write_guard);
|
||||
|
||||
// Explicit time-travel: drop any read-your-write / freshness
|
||||
// constraints so the user sees exactly the requested version.
|
||||
*self.freshness.lock().unwrap() = FreshnessState::default();
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
self.invalidate_schema_cache();
|
||||
|
||||
@@ -1297,13 +1166,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
*write_guard = None;
|
||||
drop(write_guard);
|
||||
|
||||
// Drop any per-handle write tracking; subsequent reads use the
|
||||
// baseline timestamp captured now to guarantee freshness.
|
||||
*self.freshness.lock().unwrap() = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(SystemTime::now()),
|
||||
};
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
self.invalidate_schema_cache();
|
||||
|
||||
@@ -1324,7 +1186,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn list_versions(&self) -> Result<Vec<Version>> {
|
||||
let request = self.post_read(&format!("/v1/table/{}/version/list/", self.identifier));
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/version/list/", self.identifier));
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
|
||||
@@ -1357,25 +1221,19 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
let client = self.client.clone();
|
||||
let identifier = self.identifier.clone();
|
||||
let table_name = self.name.clone();
|
||||
let freshness_headers = self.snapshot_freshness_headers();
|
||||
|
||||
self.schema_cache
|
||||
.get(move || async move {
|
||||
fetch_schema(
|
||||
&client,
|
||||
&identifier,
|
||||
&table_name,
|
||||
version,
|
||||
freshness_headers,
|
||||
)
|
||||
.await
|
||||
fetch_schema(&client, &identifier, &table_name, version).await
|
||||
})
|
||||
.await
|
||||
.map_err(unwrap_shared_error)
|
||||
}
|
||||
|
||||
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
|
||||
let mut request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/count_rows/", self.identifier));
|
||||
|
||||
let version = self.current_version().await;
|
||||
|
||||
@@ -1501,7 +1359,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
|
||||
let base_request = self.post_read(&format!("/v1/table/{}/explain_plan/", self.identifier));
|
||||
let base_request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/explain_plan/", self.identifier));
|
||||
|
||||
let query_bodies = self.prepare_query_bodies(query).await?;
|
||||
let requests: Vec<reqwest::RequestBuilder> = query_bodies
|
||||
@@ -1548,7 +1408,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
query: &AnyQuery,
|
||||
_options: QueryExecutionOptions,
|
||||
) -> Result<String> {
|
||||
let request = self.post_read(&format!("/v1/table/{}/analyze_plan/", self.identifier));
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/analyze_plan/", self.identifier));
|
||||
|
||||
let query_bodies = self.prepare_query_bodies(query).await?;
|
||||
let requests: Vec<reqwest::RequestBuilder> = query_bodies
|
||||
@@ -1618,7 +1480,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.track_write_version(update_response.version);
|
||||
Ok(update_response)
|
||||
}
|
||||
|
||||
@@ -1645,7 +1506,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
self.track_write_version(delete_response.version);
|
||||
Ok(delete_response)
|
||||
}
|
||||
|
||||
@@ -1802,7 +1662,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.track_write_version(merge_insert_response.version);
|
||||
Ok(merge_insert_response)
|
||||
}
|
||||
|
||||
@@ -1828,22 +1687,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
Ok(Box::new(RemoteTags { inner: self }))
|
||||
}
|
||||
async fn checkout_tag(&self, tag: &str) -> Result<()> {
|
||||
// Resolve the tag without attaching freshness headers; a stale
|
||||
// `min_version` from a previous write should not ride along on an
|
||||
// explicit time-travel request.
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/tags/version/", self.identifier));
|
||||
let version = self.resolve_tag_version_with_request(tag, request).await?;
|
||||
|
||||
let tags = self.tags().await?;
|
||||
let version = tags.get_version(tag).await?;
|
||||
let mut write_guard = self.version.write().await;
|
||||
*write_guard = Some(version);
|
||||
drop(write_guard);
|
||||
|
||||
// Explicit time-travel: drop any read-your-write / freshness
|
||||
// constraints so the user sees exactly the tagged version.
|
||||
*self.freshness.lock().unwrap() = FreshnessState::default();
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
self.invalidate_schema_cache();
|
||||
|
||||
@@ -1894,7 +1743,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
@@ -1949,7 +1797,6 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
@@ -1977,14 +1824,15 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
|
||||
// Make request to list the indices
|
||||
let mut request = self.post_read(&format!("/v1/table/{}/index/list/", self.identifier));
|
||||
let mut request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/index/list/", self.identifier));
|
||||
let version = self.current_version().await;
|
||||
let body = serde_json::json!({ "version": version });
|
||||
request = request.json(&body);
|
||||
@@ -2048,7 +1896,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
|
||||
let mut request = self.post_read(&format!(
|
||||
let mut request = self.client.post(&format!(
|
||||
"/v1/table/{}/index/{}/stats/",
|
||||
self.identifier, index_name
|
||||
));
|
||||
@@ -2160,7 +2008,9 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
|
||||
async fn stats(&self) -> Result<TableStatistics> {
|
||||
let request = self.post_read(&format!("/v1/table/{}/stats/", self.identifier));
|
||||
let request = self
|
||||
.client
|
||||
.post(&format!("/v1/table/{}/stats/", self.identifier));
|
||||
let (request_id, response) = self.send(request, true).await?;
|
||||
let response = self.check_table_response(&request_id, response).await?;
|
||||
let body = response.text().await.err_to_http(request_id.clone())?;
|
||||
@@ -6124,299 +5974,4 @@ mod tests {
|
||||
assert_eq!(create_count.load(Ordering::SeqCst), 2);
|
||||
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
// ---- Read freshness header tests ------------------------------------
|
||||
|
||||
#[test]
|
||||
fn test_compute_min_timestamp_combines_baseline_and_interval() {
|
||||
let now = SystemTime::now();
|
||||
let baseline = now - Duration::from_secs(60);
|
||||
|
||||
// No interval, no baseline -> no header.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&FreshnessState::default(), None, now),
|
||||
None
|
||||
);
|
||||
|
||||
// Baseline only -> baseline.
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(baseline),
|
||||
};
|
||||
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
|
||||
|
||||
// ZERO interval, no baseline -> now.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&FreshnessState::default(), Some(Duration::ZERO), now),
|
||||
Some(now)
|
||||
);
|
||||
|
||||
// Positive interval, no baseline -> now - interval.
|
||||
assert_eq!(
|
||||
compute_min_timestamp(
|
||||
&FreshnessState::default(),
|
||||
Some(Duration::from_secs(10)),
|
||||
now
|
||||
),
|
||||
Some(now - Duration::from_secs(10))
|
||||
);
|
||||
|
||||
// Both: pick the more-recent (i.e. tighter) constraint.
|
||||
// baseline = now-60, now-interval = now-10. now-10 is newer.
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(baseline),
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
|
||||
Some(now - Duration::from_secs(10))
|
||||
);
|
||||
|
||||
// Both, baseline newer: pick baseline.
|
||||
let recent_baseline = now - Duration::from_secs(5);
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(recent_baseline),
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
|
||||
Some(recent_baseline)
|
||||
);
|
||||
}
|
||||
|
||||
/// Allowed slop when comparing a header timestamp against a locally
|
||||
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
|
||||
const FRESHNESS_TOLERANCE: Duration = Duration::from_secs(1);
|
||||
|
||||
fn capturing_handler<F>(
|
||||
body_for: F,
|
||||
) -> (
|
||||
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
|
||||
Arc<std::sync::Mutex<Option<http::HeaderMap>>>,
|
||||
)
|
||||
where
|
||||
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
|
||||
{
|
||||
let captured = Arc::new(std::sync::Mutex::new(None));
|
||||
let captured_c = captured.clone();
|
||||
let handler = move |request: reqwest::Request| {
|
||||
*captured_c.lock().unwrap() = Some(request.headers().clone());
|
||||
let path = request.url().path().to_string();
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(body_for(&path))
|
||||
.unwrap()
|
||||
};
|
||||
(handler, captured)
|
||||
}
|
||||
|
||||
fn parse_min_timestamp(headers: &http::HeaderMap) -> SystemTime {
|
||||
let value = headers
|
||||
.get("x-lancedb-min-timestamp")
|
||||
.expect("expected x-lancedb-min-timestamp header")
|
||||
.to_str()
|
||||
.unwrap();
|
||||
chrono::DateTime::parse_from_rfc3339(value)
|
||||
.unwrap()
|
||||
.with_timezone(&chrono::Utc)
|
||||
.into()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_default_sends_no_headers() {
|
||||
let (handler, captured) = capturing_handler(|_| "42".to_string());
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
let _ = table.count_rows(None).await.unwrap();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_zero_interval_sends_now() {
|
||||
let (handler, captured) = capturing_handler(|_| "42".to_string());
|
||||
let table =
|
||||
Table::new_with_handler_and_interval("my_table", handler, Some(Duration::from_secs(0)));
|
||||
|
||||
let before = SystemTime::now();
|
||||
table.count_rows(None).await.unwrap();
|
||||
let after = SystemTime::now();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
let sent = parse_min_timestamp(&headers);
|
||||
assert!(
|
||||
sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE,
|
||||
"expected timestamp roughly equal to wall clock"
|
||||
);
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_positive_interval_sends_now_minus_interval() {
|
||||
let (handler, captured) = capturing_handler(|_| "42".to_string());
|
||||
let interval = Duration::from_secs(30);
|
||||
let table = Table::new_with_handler_and_interval("my_table", handler, Some(interval));
|
||||
|
||||
let before = SystemTime::now();
|
||||
table.count_rows(None).await.unwrap();
|
||||
let after = SystemTime::now();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
let sent = parse_min_timestamp(&headers);
|
||||
assert!(
|
||||
sent >= before - interval - FRESHNESS_TOLERANCE
|
||||
&& sent <= after - interval + FRESHNESS_TOLERANCE,
|
||||
"expected timestamp roughly equal to now - interval"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_latest_sets_baseline() {
|
||||
let (handler, captured) = capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
// No interval — only the baseline should drive the timestamp.
|
||||
let table = Table::new_with_handler_and_interval("my_table", handler, None);
|
||||
|
||||
let before_checkout = SystemTime::now();
|
||||
table.checkout_latest().await.unwrap();
|
||||
let after_checkout = SystemTime::now();
|
||||
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
let sent = parse_min_timestamp(&headers);
|
||||
assert!(
|
||||
sent >= before_checkout - FRESHNESS_TOLERANCE
|
||||
&& sent <= after_checkout + FRESHNESS_TOLERANCE,
|
||||
"expected timestamp captured at checkout_latest() time"
|
||||
);
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_min_version_tracked_after_write() {
|
||||
let (handler, captured) = capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
let _ = table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
// Update headers also pass through captured; reset by reading after.
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
assert_eq!(
|
||||
headers
|
||||
.get("x-lancedb-min-version")
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap(),
|
||||
"7"
|
||||
);
|
||||
}
|
||||
|
||||
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
|
||||
/// from every request so tests can assert on a specific endpoint.
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn path_capturing_handler<F>(
|
||||
body_for: F,
|
||||
) -> (
|
||||
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
|
||||
Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>>,
|
||||
)
|
||||
where
|
||||
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
|
||||
{
|
||||
let captured: Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>> =
|
||||
Arc::new(std::sync::Mutex::new(HashMap::new()));
|
||||
let captured_c = captured.clone();
|
||||
let handler = move |request: reqwest::Request| {
|
||||
let path = request.url().path().to_string();
|
||||
captured_c
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(path.clone(), request.headers().clone());
|
||||
http::Response::builder()
|
||||
.status(200)
|
||||
.body(body_for(&path))
|
||||
.unwrap()
|
||||
};
|
||||
(handler, captured)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_validation_sends_no_min_version() {
|
||||
// After a write bumps min_version, calling checkout(v) must not let
|
||||
// that stale header ride along on the validating /describe/ request.
|
||||
let (handler, captured) = path_capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
table.checkout(5).await.unwrap();
|
||||
|
||||
let captured = captured.lock().unwrap();
|
||||
let describe_headers = captured
|
||||
.get("/v1/table/my_table/describe/")
|
||||
.expect("describe should have been called by checkout(v)");
|
||||
assert!(
|
||||
!describe_headers.contains_key("x-lancedb-min-version"),
|
||||
"checkout(v) describe must not carry stale min_version",
|
||||
);
|
||||
assert!(!describe_headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_tag_resolve_sends_no_min_version() {
|
||||
// Same invariant for checkout_tag: the tag-resolve request must not
|
||||
// pick up a stale min_version from a prior write.
|
||||
let (handler, captured) = path_capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
table.checkout_tag("v_initial").await.unwrap();
|
||||
|
||||
let captured = captured.lock().unwrap();
|
||||
let resolve_headers = captured
|
||||
.get("/v1/table/my_table/tags/version/")
|
||||
.expect("tags/version should have been called by checkout_tag");
|
||||
assert!(
|
||||
!resolve_headers.contains_key("x-lancedb-min-version"),
|
||||
"checkout_tag resolve must not carry stale min_version",
|
||||
);
|
||||
assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_clears_min_version() {
|
||||
let (handler, captured) = capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
// checkout(5) needs to describe version 5 first
|
||||
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
|
||||
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
||||
_ => panic!("unexpected path: {}", path),
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
table.checkout(5).await.unwrap();
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -656,30 +656,6 @@ mod test_utils {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_handler_and_interval<T>(
|
||||
name: impl Into<String>,
|
||||
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
|
||||
read_consistency_interval: Option<std::time::Duration>,
|
||||
) -> Self
|
||||
where
|
||||
T: Into<reqwest::Body>,
|
||||
{
|
||||
let inner = Arc::new(
|
||||
crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
|
||||
name.into(),
|
||||
handler.clone(),
|
||||
read_consistency_interval,
|
||||
),
|
||||
);
|
||||
let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
|
||||
Self {
|
||||
inner,
|
||||
database: Some(database),
|
||||
// Registry is unused.
|
||||
embedding_registry: Arc::new(MemoryRegistry::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_with_handler_version<T>(
|
||||
name: impl Into<String>,
|
||||
version: semver::Version,
|
||||
|
||||
Reference in New Issue
Block a user