From 15e75804c4cbff2c34df2a622c7a29e8d64bd7cb Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Tue, 26 May 2026 13:38:07 -0700 Subject: [PATCH] feat(remote): send read freshness headers for remote table consistency (#3439) Closes the client-side work of #3370 ### Summary - Plumbs `read_consistency_interval` from `ConnectBuilder` through `RestfulLanceDbClient` so remote reads attach an `x-lancedb-min-timestamp` freshness header. None = no header (default), zero = "now", positive = `now - interval`. - Adds per-table `FreshnessState` on `RemoteTable`: write responses (`add`, `update`, `delete`, `merge_insert`, `add_columns`, `alter_columns`, `drop_columns`) track the committed version, and the next read sends `x-lancedb-min-version` so the server's cache honors read-your-write. - `checkout(v)` / `checkout_tag(t)` / `checkout_latest()` / `restore()` reset the freshness state appropriately; the validating `/describe/` and tag-resolve requests are sent without freshness headers so they don't carry stale state. - Updates Rust, Python, and Node docstrings and calls out that stronger consistency raises per-read latency and cost. ### Testing - Unit tests cover default behavior, interval=0, positive interval, checkout_latest baseline, min_version-after-write, checkout clears state, and the two no-stale-header invariants on `checkout(v)` and `checkout_tag(t)`. 
- Ran smoke tests against local remote table to verify functionality --- docs/src/js/interfaces/ConnectionOptions.md | 20 +- nodejs/src/lib.rs | 20 +- python/python/lancedb/__init__.py | 11 +- python/python/lancedb/remote/db.py | 2 + rust/lancedb/src/connection.rs | 11 +- rust/lancedb/src/remote/client.rs | 20 + rust/lancedb/src/remote/db.rs | 2 + rust/lancedb/src/remote/table.rs | 612 +++++++++++++++++--- rust/lancedb/src/table.rs | 24 + 9 files changed, 621 insertions(+), 101 deletions(-) diff --git a/docs/src/js/interfaces/ConnectionOptions.md b/docs/src/js/interfaces/ConnectionOptions.md index 1ad0e127a..de2083a9b 100644 --- a/docs/src/js/interfaces/ConnectionOptions.md +++ b/docs/src/js/interfaces/ConnectionOptions.md @@ -70,16 +70,20 @@ client used by manifest-enabled native connections. optional readConsistencyInterval: number; ``` -(For LanceDB OSS only): The interval, in seconds, at which to check for -updates to the table from other processes. If None, then consistency is not -checked. For performance reasons, this is the default. For strong -consistency, set this to zero seconds. Then every read will check for -updates from other processes. As a compromise, you can set this to a -non-zero value for eventual consistency. If more than that interval -has passed since the last check, then the table will be checked for updates. -Note: this consistency only applies to read operations. Write operations are +The interval, in seconds, at which to check for updates to the table +from other processes. If None, then consistency is not checked. For +performance reasons, this is the default. For strong consistency, set +this to zero seconds. Then every read will check for updates from other +processes. As a compromise, you can set this to a non-zero value for +eventual consistency. If more than that interval has passed since the +last check, then the table will be checked for updates. Note: this +consistency only applies to read operations. 
Write operations are always consistent. +Stronger consistency is not free. The smaller the interval, the more +often each read pays the cost of checking for updates against object +storage, raising per-read latency and cost. + *** ### region? diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index f241fb81f..53c630c93 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -24,15 +24,19 @@ mod util; #[napi(object)] #[derive(Debug)] pub struct ConnectionOptions { - /// (For LanceDB OSS only): The interval, in seconds, at which to check for - /// updates to the table from other processes. If None, then consistency is not - /// checked. For performance reasons, this is the default. For strong - /// consistency, set this to zero seconds. Then every read will check for - /// updates from other processes. As a compromise, you can set this to a - /// non-zero value for eventual consistency. If more than that interval - /// has passed since the last check, then the table will be checked for updates. - /// Note: this consistency only applies to read operations. Write operations are + /// The interval, in seconds, at which to check for updates to the table + /// from other processes. If None, then consistency is not checked. For + /// performance reasons, this is the default. For strong consistency, set + /// this to zero seconds. Then every read will check for updates from other + /// processes. As a compromise, you can set this to a non-zero value for + /// eventual consistency. If more than that interval has passed since the + /// last check, then the table will be checked for updates. Note: this + /// consistency only applies to read operations. Write operations are /// always consistent. + /// + /// Stronger consistency is not free. The smaller the interval, the more + /// often each read pays the cost of checking for updates against object + /// storage, raising per-read latency and cost. 
pub read_consistency_interval: Option, /// (For LanceDB OSS only): configuration for object storage. /// diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index 9ddecc6cf..c169633c4 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -94,7 +94,6 @@ def connect( host_override: str, optional The override url for LanceDB Cloud. read_consistency_interval: timedelta, default None - (For LanceDB OSS only) The interval at which to check for updates to the table from other processes. If None, then consistency is not checked. For performance reasons, this is the default. For strong consistency, set this to @@ -104,6 +103,10 @@ def connect( the last check, then the table will be checked for updates. Note: this consistency only applies to read operations. Write operations are always consistent. + + Stronger consistency is not free. The smaller the interval, the more + often each read pays the cost of checking for updates against object + storage, raising per-read latency and cost. client_config: ClientConfig or dict, optional Configuration options for the LanceDB Cloud HTTP client. If a dict, then the keys are the attributes of the ClientConfig class. If None, then the @@ -217,6 +220,7 @@ def connect( request_thread_pool=request_thread_pool, client_config=client_config, storage_options=storage_options, + read_consistency_interval=read_consistency_interval, **kwargs, ) _check_s3_bucket_with_dots(str(uri), storage_options) @@ -343,7 +347,6 @@ async def connect_async( host_override: str, optional The override url for LanceDB Cloud. read_consistency_interval: timedelta, default None - (For LanceDB OSS only) The interval at which to check for updates to the table from other processes. If None, then consistency is not checked. For performance reasons, this is the default. For strong consistency, set this to @@ -353,6 +356,10 @@ async def connect_async( the last check, then the table will be checked for updates. 
Note: this consistency only applies to read operations. Write operations are always consistent. + + Stronger consistency is not free. The smaller the interval, the more + often each read pays the cost of checking for updates against object + storage, raising per-read latency and cost. client_config: ClientConfig or dict, optional Configuration options for the LanceDB Cloud HTTP client. If a dict, then the keys are the attributes of the ClientConfig class. If None, then the diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index e110cdac1..e75a3964c 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -50,6 +50,7 @@ class RemoteDBConnection(DBConnection): connection_timeout: Optional[float] = None, read_timeout: Optional[float] = None, storage_options: Optional[Dict[str, str]] = None, + read_consistency_interval: Optional[timedelta] = None, ): """Connect to a remote LanceDB database.""" if isinstance(client_config, dict): @@ -103,6 +104,7 @@ class RemoteDBConnection(DBConnection): host_override=host_override, client_config=client_config, storage_options=storage_options, + read_consistency_interval=read_consistency_interval, ) ) diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 8034c2a53..c1d475c9c 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -812,8 +812,7 @@ impl ConnectBuilder { self } - /// The interval at which to check for updates from other processes. This - /// only affects LanceDB OSS. + /// The interval at which to check for updates from other processes. /// /// If left unset, consistency is not checked. For maximum read /// performance, this is the default. For strong consistency, set this to @@ -825,8 +824,11 @@ impl ConnectBuilder { /// This only affects read operations. Write operations are always /// consistent. 
/// - /// LanceDB Cloud uses eventual consistency under the hood, and is not - /// currently configurable. + /// # Cost + /// + /// Stronger consistency is not free. The smaller the interval, the more + /// often each read pays the cost of checking for updates against object + /// storage, raising per-read latency and cost. pub fn read_consistency_interval( mut self, read_consistency_interval: std::time::Duration, @@ -886,6 +888,7 @@ impl ConnectBuilder { options.host_override, self.request.client_config, storage_options.into(), + self.request.read_consistency_interval, )?); Ok(Connection { internal, diff --git a/rust/lancedb/src/remote/client.rs b/rust/lancedb/src/remote/client.rs index 9197ba74a..378698ca6 100644 --- a/rust/lancedb/src/remote/client.rs +++ b/rust/lancedb/src/remote/client.rs @@ -245,6 +245,9 @@ pub struct RestfulLanceDbClient { pub(crate) sender: S, pub(crate) id_delimiter: String, pub(crate) header_provider: Option>, + /// Connection-level read consistency interval. Drives the + /// `x-lancedb-min-timestamp` freshness header sent on read requests. 
+ pub(crate) read_consistency_interval: Option, } impl std::fmt::Debug for RestfulLanceDbClient { @@ -338,6 +341,7 @@ impl RestfulLanceDbClient { host_override: Option, default_headers: HeaderMap, client_config: ClientConfig, + read_consistency_interval: Option, ) -> Result { // Get the timeouts let timeout = @@ -435,6 +439,7 @@ impl RestfulLanceDbClient { .clone() .unwrap_or("$".to_string()), header_provider: client_config.header_provider, + read_consistency_interval, }) } } @@ -840,6 +845,16 @@ pub mod test_utils { pub fn client_with_handler( handler: impl Fn(reqwest::Request) -> http::response::Response + Send + Sync + 'static, ) -> RestfulLanceDbClient + where + T: Into, + { + client_with_handler_and_interval(handler, None) + } + + pub fn client_with_handler_and_interval( + handler: impl Fn(reqwest::Request) -> http::response::Response + Send + Sync + 'static, + read_consistency_interval: Option, + ) -> RestfulLanceDbClient where T: Into, { @@ -857,6 +872,7 @@ pub mod test_utils { }, id_delimiter: "$".to_string(), header_provider: None, + read_consistency_interval, } } @@ -881,6 +897,7 @@ pub mod test_utils { }, id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()), header_provider: config.header_provider, + read_consistency_interval: None, } } } @@ -1047,6 +1064,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Apply dynamic headers @@ -1082,6 +1100,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Apply dynamic headers @@ -1119,6 +1138,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Header provider errors should fail the request diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index dfe5d0c99..c11584157 
100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -206,6 +206,7 @@ impl RemoteDatabase { host_override: Option, client_config: ClientConfig, options: RemoteOptions, + read_consistency_interval: Option, ) -> Result { let parsed = super::client::parse_db_url(uri)?; let header_map = RestfulLanceDbClient::::default_headers( @@ -233,6 +234,7 @@ impl RemoteDatabase { host_override, header_map, client_config.clone(), + read_consistency_interval, )?; let table_cache = Cache::builder() diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 47831fee2..4f034ba43 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -62,15 +62,76 @@ use std::collections::HashMap; use std::io::Cursor; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tokio::sync::RwLock; const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); +const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version"); +const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp"); const METRIC_TYPE_KEY: &str = "metric_type"; const INDEX_TYPE_KEY: &str = "index_type"; const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30); const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5); +/// Per-table state driving the freshness headers (`x-lancedb-min-version` and +/// `x-lancedb-min-timestamp`) sent on read requests. +#[derive(Debug, Default, Clone, Copy)] +struct FreshnessState { + /// Provides read-your-write within a single handle: writes that return a + /// version update this, and reads send it as `x-lancedb-min-version`. + min_version: Option, + /// Wall-clock time captured at the last [`BaseTable::checkout_latest`] + /// call. Subsequent reads send + /// `max(baseline, now - read_consistency_interval)` as + /// `x-lancedb-min-timestamp`. 
+ /// + /// Without this, `checkout_latest()` would have no effect on subsequent + /// reads when `read_consistency_interval` is unset (the default): a + /// server-side cache could still serve a snapshot older than the moment + /// the user explicitly asked for "latest". The baseline forces the + /// server to skip any cache entry older than the checkout time, so the + /// `checkout_latest()` signal is preserved across reads on the same + /// handle regardless of the configured consistency interval. + checkout_baseline: Option, +} + +/// Snapshot of the headers that should be attached to a single read request. +#[derive(Debug, Default, Clone, Copy)] +struct FreshnessHeaders { + min_version: Option, + min_timestamp: Option, +} + +impl FreshnessHeaders { + fn apply(self, mut request: RequestBuilder) -> RequestBuilder { + if let Some(v) = self.min_version { + request = request.header(MIN_VERSION_HEADER, v.to_string()); + } + if let Some(ts) = self.min_timestamp { + let dt: chrono::DateTime = ts.into(); + request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339()); + } + request + } +} + +fn compute_min_timestamp( + state: &FreshnessState, + interval: Option, + now: SystemTime, +) -> Option { + let interval_based = match interval { + None => None, + Some(d) if d.is_zero() => Some(now), + Some(d) => Some(now.checked_sub(d).unwrap_or(now)), + }; + match (interval_based, state.checkout_baseline) { + (None, None) => None, + (Some(t), None) | (None, Some(t)) => Some(t), + (Some(a), Some(b)) => Some(a.max(b)), + } +} + pub struct RemoteTags<'a, S: HttpSend = Sender> { inner: &'a RemoteTable, } @@ -80,8 +141,7 @@ impl Tags for RemoteTags<'_, S> { async fn list(&self) -> Result> { let request = self .inner - .client - .post(&format!("/v1/table/{}/tags/list/", self.inner.identifier)); + .post_read(&format!("/v1/table/{}/tags/list/", self.inner.identifier)); let (request_id, response) = self.inner.send(request, true).await?; let response = self .inner @@ -112,48 +172,13 @@ 
impl Tags for RemoteTags<'_, S> { } async fn get_version(&self, tag: &str) -> Result { - let request = self - .inner - .client - .post(&format!( - "/v1/table/{}/tags/version/", - self.inner.identifier - )) - .json(&serde_json::json!({ "tag": tag })); - - let (request_id, response) = self.inner.send(request, true).await?; - let response = self - .inner - .check_table_response(&request_id, response) - .await?; - - match response.text().await { - Ok(body) => { - let value: serde_json::Value = - serde_json::from_str(&body).map_err(|e| Error::Http { - source: format!("Failed to parse tag version: {}", e).into(), - request_id: request_id.clone(), - status_code: None, - })?; - - value - .get("version") - .and_then(|v| v.as_u64()) - .ok_or_else(|| Error::Http { - source: format!("Invalid tag version response: {}", body).into(), - request_id, - status_code: None, - }) - } - Err(err) => { - let status_code = err.status(); - Err(Error::Http { - source: Box::new(err), - request_id, - status_code, - }) - } - } + let request = self.inner.post_read(&format!( + "/v1/table/{}/tags/version/", + self.inner.identifier + )); + self.inner + .resolve_tag_version_with_request(tag, request) + .await } async fn create(&mut self, tag: &str, version: u64) -> Result<()> { @@ -215,6 +240,7 @@ pub struct RemoteTable { version: RwLock>, location: RwLock>, schema_cache: BackgroundCache, + freshness: Mutex, } impl std::fmt::Debug for RemoteTable { @@ -243,6 +269,7 @@ impl RemoteTable { version: RwLock::new(None), location: RwLock::new(None), schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } @@ -252,12 +279,56 @@ impl RemoteTable { } async fn describe_version(&self, version: Option) -> Result { - let mut request = self - .client - .post(&format!("/v1/table/{}/describe/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/describe/", self.identifier)); + self.describe_with_request(request, 
version).await + } + async fn resolve_tag_version_with_request( + &self, + tag: &str, + request: RequestBuilder, + ) -> Result { + let request = request.json(&serde_json::json!({ "tag": tag })); + + let (request_id, response) = self.send(request, true).await?; + let response = self.check_table_response(&request_id, response).await?; + + match response.text().await { + Ok(body) => { + let value: serde_json::Value = + serde_json::from_str(&body).map_err(|e| Error::Http { + source: format!("Failed to parse tag version: {}", e).into(), + request_id: request_id.clone(), + status_code: None, + })?; + + value + .get("version") + .and_then(|v| v.as_u64()) + .ok_or_else(|| Error::Http { + source: format!("Invalid tag version response: {}", body).into(), + request_id, + status_code: None, + }) + } + Err(err) => { + let status_code = err.status(); + Err(Error::Http { + source: Box::new(err), + request_id, + status_code, + }) + } + } + } + + async fn describe_with_request( + &self, + request: RequestBuilder, + version: Option, + ) -> Result { let body = serde_json::json!({ "version": version }); - request = request.json(&body); + let request = request.json(&body); let (request_id, response) = self.send(request, true).await?; @@ -711,14 +782,44 @@ impl RemoteTable { *read_guard } + /// Snapshot the freshness headers to attach to a single read request. + /// Computed at call time so that retries reuse the same snapshot. + fn snapshot_freshness_headers(&self) -> FreshnessHeaders { + let state = *self.freshness.lock().unwrap(); + FreshnessHeaders { + min_version: state.min_version, + min_timestamp: compute_min_timestamp( + &state, + self.client.read_consistency_interval, + SystemTime::now(), + ), + } + } + + /// Build a POST request and attach the read-freshness headers + /// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`). 
+ fn post_read(&self, uri: &str) -> RequestBuilder { + self.snapshot_freshness_headers() + .apply(self.client.post(uri)) + } + + /// Record a version returned by a write so subsequent reads can request at + /// least that version via `x-lancedb-min-version`. A returned `0` from a + /// backward-compatible old server is ignored. + fn track_write_version(&self, version: u64) { + if version == 0 { + return; + } + let mut state = self.freshness.lock().unwrap(); + state.min_version = Some(state.min_version.map_or(version, |v| v.max(version))); + } + async fn execute_query( &self, query: &AnyQuery, options: &QueryExecutionOptions, ) -> Result>>> { - let mut request = self - .client - .post(&format!("/v1/table/{}/query/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/query/", self.identifier)); if let Some(timeout) = options.timeout { // Also send to server, so it can abort the query if it takes too long. @@ -824,9 +925,10 @@ async fn fetch_schema( identifier: &str, table_name: &str, version: Option, + freshness_headers: FreshnessHeaders, ) -> Result { - let request = client - .post(&format!("/v1/table/{}/describe/", identifier)) + let request = freshness_headers + .apply(client.post(&format!("/v1/table/{}/describe/", identifier))) .json(&serde_json::json!({ "version": version })); let (request_id, response) = client.send_with_retry(request, None, true).await?; @@ -874,7 +976,9 @@ mod test_utils { use super::*; use crate::remote::ClientConfig; use crate::remote::client::test_utils::client_with_handler; - use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config}; + use crate::remote::client::test_utils::{ + MockSender, client_with_handler_and_config, client_with_handler_and_interval, + }; impl RemoteTable { pub fn new_mock(name: String, handler: F, version: Option) -> Self @@ -892,6 +996,30 @@ mod test_utils { version: RwLock::new(None), location: RwLock::new(None), schema_cache: 
BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), + } + } + + pub fn new_mock_with_consistency_interval( + name: String, + handler: F, + read_consistency_interval: Option, + ) -> Self + where + F: Fn(reqwest::Request) -> http::Response + Send + Sync + 'static, + T: Into, + { + let client = client_with_handler_and_interval(handler, read_consistency_interval); + Self { + client, + name: name.clone(), + namespace: vec![], + identifier: name, + server_version: ServerVersion::default(), + version: RwLock::new(None), + location: RwLock::new(None), + schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } @@ -923,6 +1051,7 @@ mod test_utils { version: RwLock::new(None), location: RwLock::new(None), schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } } @@ -986,6 +1115,7 @@ impl RemoteTable { if output.overwrite { self.invalidate_schema_cache(); } + self.track_write_version(add_result.version); return Ok(add_result); } @@ -1023,6 +1153,7 @@ impl RemoteTable { if output.overwrite { self.invalidate_schema_cache(); } + self.track_write_version(result.version); return Ok(result); } Err(e) => { @@ -1139,8 +1270,13 @@ impl BaseTable for RemoteTable { self.describe().await.map(|desc| desc.version) } async fn checkout(&self, version: u64) -> Result<()> { - // check that the version exists - self.describe_version(Some(version)) + // Validate the version exists. The describe is sent without freshness + // headers so a stale `min_version` from a previous write doesn't ride + // along on an explicit time-travel request. 
+ let request = self + .client + .post(&format!("/v1/table/{}/describe/", self.identifier)); + self.describe_with_request(request, Some(version)) .await .map_err(|e| match e { // try to map the error to a more user-friendly error telling them @@ -1156,6 +1292,10 @@ impl BaseTable for RemoteTable { *write_guard = Some(version); drop(write_guard); + // Explicit time-travel: drop any read-your-write / freshness + // constraints so the user sees exactly the requested version. + *self.freshness.lock().unwrap() = FreshnessState::default(); + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1166,6 +1306,13 @@ impl BaseTable for RemoteTable { *write_guard = None; drop(write_guard); + // Drop any per-handle write tracking; subsequent reads use the + // baseline timestamp captured now to guarantee freshness. + *self.freshness.lock().unwrap() = FreshnessState { + min_version: None, + checkout_baseline: Some(SystemTime::now()), + }; + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1186,9 +1333,7 @@ impl BaseTable for RemoteTable { } async fn list_versions(&self) -> Result> { - let request = self - .client - .post(&format!("/v1/table/{}/version/list/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/version/list/", self.identifier)); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; @@ -1221,19 +1366,25 @@ impl BaseTable for RemoteTable { let client = self.client.clone(); let identifier = self.identifier.clone(); let table_name = self.name.clone(); + let freshness_headers = self.snapshot_freshness_headers(); self.schema_cache .get(move || async move { - fetch_schema(&client, &identifier, &table_name, version).await + fetch_schema( + &client, + &identifier, + &table_name, + version, + freshness_headers, + ) + .await }) .await .map_err(unwrap_shared_error) } async fn 
count_rows(&self, filter: Option) -> Result { - let mut request = self - .client - .post(&format!("/v1/table/{}/count_rows/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier)); let version = self.current_version().await; @@ -1359,9 +1510,7 @@ impl BaseTable for RemoteTable { } async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result { - let base_request = self - .client - .post(&format!("/v1/table/{}/explain_plan/", self.identifier)); + let base_request = self.post_read(&format!("/v1/table/{}/explain_plan/", self.identifier)); let query_bodies = self.prepare_query_bodies(query).await?; let requests: Vec = query_bodies @@ -1408,9 +1557,7 @@ impl BaseTable for RemoteTable { query: &AnyQuery, _options: QueryExecutionOptions, ) -> Result { - let request = self - .client - .post(&format!("/v1/table/{}/analyze_plan/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/analyze_plan/", self.identifier)); let query_bodies = self.prepare_query_bodies(query).await?; let requests: Vec = query_bodies @@ -1480,6 +1627,7 @@ impl BaseTable for RemoteTable { status_code: None, })?; + self.track_write_version(update_response.version); Ok(update_response) } @@ -1510,6 +1658,7 @@ impl BaseTable for RemoteTable { request_id, status_code: None, })?; + self.track_write_version(delete_response.version); Ok(delete_response) } @@ -1666,6 +1815,7 @@ impl BaseTable for RemoteTable { status_code: None, })?; + self.track_write_version(merge_insert_response.version); Ok(merge_insert_response) } @@ -1691,12 +1841,22 @@ impl BaseTable for RemoteTable { Ok(Box::new(RemoteTags { inner: self })) } async fn checkout_tag(&self, tag: &str) -> Result<()> { - let tags = self.tags().await?; - let version = tags.get_version(tag).await?; + // Resolve the tag without attaching freshness headers; a stale + // `min_version` from a previous write should not ride along on an + // explicit time-travel request. 
+ let request = self + .client + .post(&format!("/v1/table/{}/tags/version/", self.identifier)); + let version = self.resolve_tag_version_with_request(tag, request).await?; + let mut write_guard = self.version.write().await; *write_guard = Some(version); drop(write_guard); + // Explicit time-travel: drop any read-your-write / freshness + // constraints so the user sees exactly the tagged version. + *self.freshness.lock().unwrap() = FreshnessState::default(); + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1747,6 +1907,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } @@ -1801,6 +1962,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } @@ -1828,15 +1990,14 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } async fn list_indices(&self) -> Result> { // Make request to list the indices - let mut request = self - .client - .post(&format!("/v1/table/{}/index/list/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/index/list/", self.identifier)); let version = self.current_version().await; let body = serde_json::json!({ "version": version }); request = request.json(&body); @@ -1900,7 +2061,7 @@ impl BaseTable for RemoteTable { } async fn index_stats(&self, index_name: &str) -> Result> { - let mut request = self.client.post(&format!( + let mut request = self.post_read(&format!( "/v1/table/{}/index/{}/stats/", self.identifier, index_name )); @@ -2012,9 +2173,7 @@ impl BaseTable for RemoteTable { } async fn stats(&self) -> Result { - let request = self - .client - .post(&format!("/v1/table/{}/stats/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/stats/", self.identifier)); let (request_id, response) = self.send(request, true).await?; 
let response = self.check_table_response(&request_id, response).await?; let body = response.text().await.err_to_http(request_id.clone())?; @@ -6005,4 +6164,299 @@ mod tests { assert_eq!(create_count.load(Ordering::SeqCst), 2); assert_eq!(abort_count.load(Ordering::SeqCst), 1); } + + // ---- Read freshness header tests ------------------------------------ + + #[test] + fn test_compute_min_timestamp_combines_baseline_and_interval() { + let now = SystemTime::now(); + let baseline = now - Duration::from_secs(60); + + // No interval, no baseline -> no header. + assert_eq!( + compute_min_timestamp(&FreshnessState::default(), None, now), + None + ); + + // Baseline only -> baseline. + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(baseline), + }; + assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline)); + + // ZERO interval, no baseline -> now. + assert_eq!( + compute_min_timestamp(&FreshnessState::default(), Some(Duration::ZERO), now), + Some(now) + ); + + // Positive interval, no baseline -> now - interval. + assert_eq!( + compute_min_timestamp( + &FreshnessState::default(), + Some(Duration::from_secs(10)), + now + ), + Some(now - Duration::from_secs(10)) + ); + + // Both: pick the more-recent (i.e. tighter) constraint. + // baseline = now-60, now-interval = now-10. now-10 is newer. + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(baseline), + }; + assert_eq!( + compute_min_timestamp(&state, Some(Duration::from_secs(10)), now), + Some(now - Duration::from_secs(10)) + ); + + // Both, baseline newer: pick baseline. + let recent_baseline = now - Duration::from_secs(5); + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(recent_baseline), + }; + assert_eq!( + compute_min_timestamp(&state, Some(Duration::from_secs(60)), now), + Some(recent_baseline) + ); + } + + /// Allowed slop when comparing a header timestamp against a locally + /// captured wall-clock bound. 
Tests run fast enough that 1s is plenty. + const FRESHNESS_TOLERANCE: Duration = Duration::from_secs(1); + + fn capturing_handler( + body_for: F, + ) -> ( + impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + Arc>>, + ) + where + F: Fn(&str) -> String + Clone + Send + Sync + 'static, + { + let captured = Arc::new(std::sync::Mutex::new(None)); + let captured_c = captured.clone(); + let handler = move |request: reqwest::Request| { + *captured_c.lock().unwrap() = Some(request.headers().clone()); + let path = request.url().path().to_string(); + http::Response::builder() + .status(200) + .body(body_for(&path)) + .unwrap() + }; + (handler, captured) + } + + fn parse_min_timestamp(headers: &http::HeaderMap) -> SystemTime { + let value = headers + .get("x-lancedb-min-timestamp") + .expect("expected x-lancedb-min-timestamp header") + .to_str() + .unwrap(); + chrono::DateTime::parse_from_rfc3339(value) + .unwrap() + .with_timezone(&chrono::Utc) + .into() + } + + #[tokio::test] + async fn test_freshness_default_sends_no_headers() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let table = Table::new_with_handler("my_table", handler); + + let _ = table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert!(!headers.contains_key("x-lancedb-min-timestamp")); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_zero_interval_sends_now() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let table = + Table::new_with_handler_and_interval("my_table", handler, Some(Duration::from_secs(0))); + + let before = SystemTime::now(); + table.count_rows(None).await.unwrap(); + let after = SystemTime::now(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE, + "expected 
timestamp roughly equal to wall clock" + ); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_positive_interval_sends_now_minus_interval() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let interval = Duration::from_secs(30); + let table = Table::new_with_handler_and_interval("my_table", handler, Some(interval)); + + let before = SystemTime::now(); + table.count_rows(None).await.unwrap(); + let after = SystemTime::now(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before - interval - FRESHNESS_TOLERANCE + && sent <= after - interval + FRESHNESS_TOLERANCE, + "expected timestamp roughly equal to now - interval" + ); + } + + #[tokio::test] + async fn test_freshness_checkout_latest_sets_baseline() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + // No interval — only the baseline should drive the timestamp. 
+ let table = Table::new_with_handler_and_interval("my_table", handler, None); + + let before_checkout = SystemTime::now(); + table.checkout_latest().await.unwrap(); + let after_checkout = SystemTime::now(); + + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before_checkout - FRESHNESS_TOLERANCE + && sent <= after_checkout + FRESHNESS_TOLERANCE, + "expected timestamp captured at checkout_latest() time" + ); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_min_version_tracked_after_write() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + let _ = table.update().column("a", "a + 1").execute().await.unwrap(); + // Update headers also pass through captured; reset by reading after. + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert_eq!( + headers + .get("x-lancedb-min-version") + .unwrap() + .to_str() + .unwrap(), + "7" + ); + } + + /// Like `capturing_handler`, but keeps a per-path snapshot of the headers + /// from every request so tests can assert on a specific endpoint. 
+ #[allow(clippy::type_complexity)] + fn path_capturing_handler( + body_for: F, + ) -> ( + impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + Arc>>, + ) + where + F: Fn(&str) -> String + Clone + Send + Sync + 'static, + { + let captured: Arc>> = + Arc::new(std::sync::Mutex::new(HashMap::new())); + let captured_c = captured.clone(); + let handler = move |request: reqwest::Request| { + let path = request.url().path().to_string(); + captured_c + .lock() + .unwrap() + .insert(path.clone(), request.headers().clone()); + http::Response::builder() + .status(200) + .body(body_for(&path)) + .unwrap() + }; + (handler, captured) + } + + #[tokio::test] + async fn test_freshness_checkout_validation_sends_no_min_version() { + // After a write bumps min_version, calling checkout(v) must not let + // that stale header ride along on the validating /describe/ request. + let (handler, captured) = path_capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout(5).await.unwrap(); + + let captured = captured.lock().unwrap(); + let describe_headers = captured + .get("/v1/table/my_table/describe/") + .expect("describe should have been called by checkout(v)"); + assert!( + !describe_headers.contains_key("x-lancedb-min-version"), + "checkout(v) describe must not carry stale min_version", + ); + assert!(!describe_headers.contains_key("x-lancedb-min-timestamp")); + } + + #[tokio::test] + async fn test_freshness_checkout_tag_resolve_sends_no_min_version() { + // Same invariant for checkout_tag: the tag-resolve request must not + // pick up a stale min_version from a prior write. 
+ let (handler, captured) = path_capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout_tag("v_initial").await.unwrap(); + + let captured = captured.lock().unwrap(); + let resolve_headers = captured + .get("/v1/table/my_table/tags/version/") + .expect("tags/version should have been called by checkout_tag"); + assert!( + !resolve_headers.contains_key("x-lancedb-min-version"), + "checkout_tag resolve must not carry stale min_version", + ); + assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp")); + } + + #[tokio::test] + async fn test_freshness_checkout_clears_min_version() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + // checkout(5) needs to describe version 5 first + "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(), + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout(5).await.unwrap(); + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert!(!headers.contains_key("x-lancedb-min-version")); + assert!(!headers.contains_key("x-lancedb-min-timestamp")); + } } diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 6d639a95b..d4e8d3be3 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -686,6 +686,30 @@ mod test_utils { } } + pub fn new_with_handler_and_interval( + name: impl Into, + handler: impl 
Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + read_consistency_interval: Option, + ) -> Self + where + T: Into, + { + let inner = Arc::new( + crate::remote::table::RemoteTable::new_mock_with_consistency_interval( + name.into(), + handler.clone(), + read_consistency_interval, + ), + ); + let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler)); + Self { + inner, + database: Some(database), + // Registry is unused. + embedding_registry: Arc::new(MemoryRegistry::new()), + } + } + pub fn new_with_handler_version( name: impl Into, version: semver::Version,