From cbdad17f34f170ab294281476c6d4fa16bf28199 Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Fri, 22 May 2026 20:21:39 -0700 Subject: [PATCH] feat(remote): send read freshness headers for remote table consistency --- python/python/lancedb/__init__.py | 1 + python/python/lancedb/remote/db.py | 2 + rust/lancedb/src/connection.rs | 9 +- rust/lancedb/src/remote/client.rs | 20 + rust/lancedb/src/remote/db.rs | 2 + rust/lancedb/src/remote/table.rs | 603 +++++++++++++++++++++++++---- rust/lancedb/src/table.rs | 24 ++ 7 files changed, 578 insertions(+), 83 deletions(-) diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index 9ddecc6cf..ac370d148 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -217,6 +217,7 @@ def connect( request_thread_pool=request_thread_pool, client_config=client_config, storage_options=storage_options, + read_consistency_interval=read_consistency_interval, **kwargs, ) _check_s3_bucket_with_dots(str(uri), storage_options) diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index e110cdac1..e75a3964c 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -50,6 +50,7 @@ class RemoteDBConnection(DBConnection): connection_timeout: Optional[float] = None, read_timeout: Optional[float] = None, storage_options: Optional[Dict[str, str]] = None, + read_consistency_interval: Optional[timedelta] = None, ): """Connect to a remote LanceDB database.""" if isinstance(client_config, dict): @@ -103,6 +104,7 @@ class RemoteDBConnection(DBConnection): host_override=host_override, client_config=client_config, storage_options=storage_options, + read_consistency_interval=read_consistency_interval, ) ) diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 8034c2a53..1b069eef8 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -812,8 +812,7 @@ impl ConnectBuilder { self } - /// The interval at which to check for updates from other processes. This - /// only affects LanceDB OSS. + /// The interval at which to check for updates from other processes. /// /// If left unset, consistency is not checked. For maximum read /// performance, this is the default. For strong consistency, set this to @@ -825,8 +824,9 @@ impl ConnectBuilder { /// This only affects read operations. Write operations are always /// consistent. /// - /// LanceDB Cloud uses eventual consistency under the hood, and is not - /// currently configurable. + /// For LanceDB Cloud and Enterprise, the interval is sent on every read as + /// an `x-lancedb-min-timestamp` freshness header so the server's cache + /// honors the same semantics. pub fn read_consistency_interval( mut self, read_consistency_interval: std::time::Duration, @@ -886,6 +886,7 @@ impl ConnectBuilder { options.host_override, self.request.client_config, storage_options.into(), + self.request.read_consistency_interval, )?); Ok(Connection { internal, diff --git a/rust/lancedb/src/remote/client.rs b/rust/lancedb/src/remote/client.rs index 9197ba74a..378698ca6 100644 --- a/rust/lancedb/src/remote/client.rs +++ b/rust/lancedb/src/remote/client.rs @@ -245,6 +245,9 @@ pub struct RestfulLanceDbClient { pub(crate) sender: S, pub(crate) id_delimiter: String, pub(crate) header_provider: Option>, + /// Connection-level read consistency interval. Drives the + /// `x-lancedb-min-timestamp` freshness header sent on read requests. + pub(crate) read_consistency_interval: Option, } impl std::fmt::Debug for RestfulLanceDbClient { @@ -338,6 +341,7 @@ impl RestfulLanceDbClient { host_override: Option, default_headers: HeaderMap, client_config: ClientConfig, + read_consistency_interval: Option, ) -> Result { // Get the timeouts let timeout = @@ -435,6 +439,7 @@ impl RestfulLanceDbClient { .clone() .unwrap_or("$".to_string()), header_provider: client_config.header_provider, + read_consistency_interval, }) } } @@ -840,6 +845,16 @@ pub mod test_utils { pub fn client_with_handler( handler: impl Fn(reqwest::Request) -> http::response::Response + Send + Sync + 'static, ) -> RestfulLanceDbClient + where + T: Into, + { + client_with_handler_and_interval(handler, None) + } + + pub fn client_with_handler_and_interval( + handler: impl Fn(reqwest::Request) -> http::response::Response + Send + Sync + 'static, + read_consistency_interval: Option, + ) -> RestfulLanceDbClient where T: Into, { @@ -857,6 +872,7 @@ pub mod test_utils { }, id_delimiter: "$".to_string(), header_provider: None, + read_consistency_interval, } } @@ -881,6 +897,7 @@ pub mod test_utils { }, id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()), header_provider: config.header_provider, + read_consistency_interval: None, } } } @@ -1047,6 +1064,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Apply dynamic headers @@ -1082,6 +1100,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Apply dynamic headers @@ -1119,6 +1138,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Header provider errors should fail the request diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index dfe5d0c99..c11584157 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -206,6 +206,7 @@ impl RemoteDatabase { host_override: Option, client_config: ClientConfig, options: RemoteOptions, + read_consistency_interval: Option, ) -> Result { let parsed = super::client::parse_db_url(uri)?; let header_map = RestfulLanceDbClient::::default_headers( @@ -233,6 +234,7 @@ impl RemoteDatabase { host_override, header_map, client_config.clone(), + read_consistency_interval, )?; let table_cache = Cache::builder() diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index b50ae4e7a..bf46ab5c3 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -62,15 +62,67 @@ use std::collections::HashMap; use std::io::Cursor; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tokio::sync::RwLock; const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); +const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version"); +const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp"); const METRIC_TYPE_KEY: &str = "metric_type"; const INDEX_TYPE_KEY: &str = "index_type"; const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30); const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5); +/// Per-table state driving the freshness headers (`x-lancedb-min-version` and +/// `x-lancedb-min-timestamp`) sent on read requests. +/// +/// `min_version` provides read-your-write within a single handle: writes that +/// return a version update it, and reads send it. `checkout_baseline` is the +/// wall-clock time captured at the last [`BaseTable::checkout_latest`] call; +/// reads send `max(baseline, now - read_consistency_interval)`. +#[derive(Debug, Default, Clone, Copy)] +struct FreshnessState { + min_version: Option, + checkout_baseline: Option, +} + +/// Snapshot of the headers that should be attached to a single read request. +#[derive(Debug, Default, Clone, Copy)] +struct FreshnessHeaders { + min_version: Option, + min_timestamp: Option, +} + +impl FreshnessHeaders { + fn apply(self, mut request: RequestBuilder) -> RequestBuilder { + if let Some(v) = self.min_version { + request = request.header(MIN_VERSION_HEADER, v.to_string()); + } + if let Some(ts) = self.min_timestamp { + let dt: chrono::DateTime = ts.into(); + request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339()); + } + request + } +} + +fn compute_min_timestamp( + state: &FreshnessState, + interval: Option, + now: SystemTime, +) -> Option { + let interval_based = match interval { + None => None, + Some(d) if d.is_zero() => Some(now), + Some(d) => Some(now.checked_sub(d).unwrap_or(now)), + }; + match (interval_based, state.checkout_baseline) { + (None, None) => None, + (Some(t), None) | (None, Some(t)) => Some(t), + (Some(a), Some(b)) => Some(a.max(b)), + } +} + pub struct RemoteTags<'a, S: HttpSend = Sender> { inner: &'a RemoteTable, } @@ -80,8 +132,7 @@ impl Tags for RemoteTags<'_, S> { async fn list(&self) -> Result> { let request = self .inner - .client - .post(&format!("/v1/table/{}/tags/list/", self.inner.identifier)); + .post_read(&format!("/v1/table/{}/tags/list/", self.inner.identifier)); let (request_id, response) = self.inner.send(request, true).await?; let response = self .inner @@ -112,48 +163,13 @@ impl Tags for RemoteTags<'_, S> { } async fn get_version(&self, tag: &str) -> Result { - let request = self - .inner - .client - .post(&format!( - "/v1/table/{}/tags/version/", - self.inner.identifier - )) - .json(&serde_json::json!({ "tag": tag })); - - let (request_id, response) = self.inner.send(request, true).await?; - let response = self - .inner - .check_table_response(&request_id, response) - .await?; - - match response.text().await { - Ok(body) => { - let value: serde_json::Value = - serde_json::from_str(&body).map_err(|e| Error::Http { - source: format!("Failed to parse tag version: {}", e).into(), - request_id: request_id.clone(), - status_code: None, - })?; - - value - .get("version") - .and_then(|v| v.as_u64()) - .ok_or_else(|| Error::Http { - source: format!("Invalid tag version response: {}", body).into(), - request_id, - status_code: None, - }) - } - Err(err) => { - let status_code = err.status(); - Err(Error::Http { - source: Box::new(err), - request_id, - status_code, - }) - } - } + let request = self.inner.post_read(&format!( + "/v1/table/{}/tags/version/", + self.inner.identifier + )); + self.inner + .resolve_tag_version_with_request(tag, request) + .await } async fn create(&mut self, tag: &str, version: u64) -> Result<()> { @@ -215,6 +231,7 @@ pub struct RemoteTable { version: RwLock>, location: RwLock>, schema_cache: BackgroundCache, + freshness: Mutex, } impl std::fmt::Debug for RemoteTable { @@ -243,6 +260,7 @@ impl RemoteTable { version: RwLock::new(None), location: RwLock::new(None), schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } @@ -252,12 +270,56 @@ impl RemoteTable { } async fn describe_version(&self, version: Option) -> Result { - let mut request = self - .client - .post(&format!("/v1/table/{}/describe/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/describe/", self.identifier)); + self.describe_with_request(request, version).await + } + async fn resolve_tag_version_with_request( + &self, + tag: &str, + request: RequestBuilder, + ) -> Result { + let request = request.json(&serde_json::json!({ "tag": tag })); + + let (request_id, response) = self.send(request, true).await?; + let response = self.check_table_response(&request_id, response).await?; + + match response.text().await { + Ok(body) => { + let value: serde_json::Value = + serde_json::from_str(&body).map_err(|e| Error::Http { + source: format!("Failed to parse tag version: {}", e).into(), + request_id: request_id.clone(), + status_code: None, + })?; + + value + .get("version") + .and_then(|v| v.as_u64()) + .ok_or_else(|| Error::Http { + source: format!("Invalid tag version response: {}", body).into(), + request_id, + status_code: None, + }) + } + Err(err) => { + let status_code = err.status(); + Err(Error::Http { + source: Box::new(err), + request_id, + status_code, + }) + } + } + } + + async fn describe_with_request( + &self, + request: RequestBuilder, + version: Option, + ) -> Result { let body = serde_json::json!({ "version": version }); - request = request.json(&body); + let request = request.json(&body); let (request_id, response) = self.send(request, true).await?; @@ -711,14 +773,44 @@ impl RemoteTable { *read_guard } + /// Snapshot the freshness headers to attach to a single read request. + /// Computed at call time so that retries reuse the same snapshot. + fn snapshot_freshness_headers(&self) -> FreshnessHeaders { + let state = *self.freshness.lock().unwrap(); + FreshnessHeaders { + min_version: state.min_version, + min_timestamp: compute_min_timestamp( + &state, + self.client.read_consistency_interval, + SystemTime::now(), + ), + } + } + + /// Build a POST request and attach the read-freshness headers + /// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`). + fn post_read(&self, uri: &str) -> RequestBuilder { + self.snapshot_freshness_headers() + .apply(self.client.post(uri)) + } + + /// Record a version returned by a write so subsequent reads can request at + /// least that version via `x-lancedb-min-version`. A returned `0` from a + /// backward-compatible old server is ignored. + fn track_write_version(&self, version: u64) { + if version == 0 { + return; + } + let mut state = self.freshness.lock().unwrap(); + state.min_version = Some(state.min_version.map_or(version, |v| v.max(version))); + } + async fn execute_query( &self, query: &AnyQuery, options: &QueryExecutionOptions, ) -> Result>>> { - let mut request = self - .client - .post(&format!("/v1/table/{}/query/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/query/", self.identifier)); if let Some(timeout) = options.timeout { // Also send to server, so it can abort the query if it takes too long. @@ -824,9 +916,10 @@ async fn fetch_schema( identifier: &str, table_name: &str, version: Option, + freshness_headers: FreshnessHeaders, ) -> Result { - let request = client - .post(&format!("/v1/table/{}/describe/", identifier)) + let request = freshness_headers + .apply(client.post(&format!("/v1/table/{}/describe/", identifier))) .json(&serde_json::json!({ "version": version })); let (request_id, response) = client.send_with_retry(request, None, true).await?; @@ -874,7 +967,9 @@ mod test_utils { use super::*; use crate::remote::ClientConfig; use crate::remote::client::test_utils::client_with_handler; - use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config}; + use crate::remote::client::test_utils::{ + MockSender, client_with_handler_and_config, client_with_handler_and_interval, + }; impl RemoteTable { pub fn new_mock(name: String, handler: F, version: Option) -> Self @@ -892,6 +987,30 @@ mod test_utils { version: RwLock::new(None), location: RwLock::new(None), schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), + } + } + + pub fn new_mock_with_consistency_interval( + name: String, + handler: F, + read_consistency_interval: Option, + ) -> Self + where + F: Fn(reqwest::Request) -> http::Response + Send + Sync + 'static, + T: Into, + { + let client = client_with_handler_and_interval(handler, read_consistency_interval); + Self { + client, + name: name.clone(), + namespace: vec![], + identifier: name, + server_version: ServerVersion::default(), + version: RwLock::new(None), + location: RwLock::new(None), + schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } @@ -923,6 +1042,7 @@ mod test_utils { version: RwLock::new(None), location: RwLock::new(None), schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } } @@ -986,6 +1106,7 @@ impl RemoteTable { if output.overwrite { self.invalidate_schema_cache(); } + self.track_write_version(add_result.version); return Ok(add_result); } @@ -1023,6 +1144,7 @@ impl RemoteTable { if output.overwrite { self.invalidate_schema_cache(); } + self.track_write_version(result.version); return Ok(result); } Err(e) => { @@ -1139,8 +1261,13 @@ impl BaseTable for RemoteTable { self.describe().await.map(|desc| desc.version) } async fn checkout(&self, version: u64) -> Result<()> { - // check that the version exists - self.describe_version(Some(version)) + // Validate the version exists. The describe is sent without freshness + // headers so a stale `min_version` from a previous write doesn't ride + // along on an explicit time-travel request. + let request = self + .client + .post(&format!("/v1/table/{}/describe/", self.identifier)); + self.describe_with_request(request, Some(version)) .await .map_err(|e| match e { // try to map the error to a more user-friendly error telling them @@ -1156,6 +1283,10 @@ impl BaseTable for RemoteTable { *write_guard = Some(version); drop(write_guard); + // Explicit time-travel: drop any read-your-write / freshness + // constraints so the user sees exactly the requested version. + *self.freshness.lock().unwrap() = FreshnessState::default(); + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1166,6 +1297,13 @@ impl BaseTable for RemoteTable { *write_guard = None; drop(write_guard); + // Drop any per-handle write tracking; subsequent reads use the + // baseline timestamp captured now to guarantee freshness. + *self.freshness.lock().unwrap() = FreshnessState { + min_version: None, + checkout_baseline: Some(SystemTime::now()), + }; + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1186,9 +1324,7 @@ impl BaseTable for RemoteTable { } async fn list_versions(&self) -> Result> { - let request = self - .client - .post(&format!("/v1/table/{}/version/list/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/version/list/", self.identifier)); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; @@ -1221,19 +1357,25 @@ impl BaseTable for RemoteTable { let client = self.client.clone(); let identifier = self.identifier.clone(); let table_name = self.name.clone(); + let freshness_headers = self.snapshot_freshness_headers(); self.schema_cache .get(move || async move { - fetch_schema(&client, &identifier, &table_name, version).await + fetch_schema( + &client, + &identifier, + &table_name, + version, + freshness_headers, + ) + .await }) .await .map_err(unwrap_shared_error) } async fn count_rows(&self, filter: Option) -> Result { - let mut request = self - .client - .post(&format!("/v1/table/{}/count_rows/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier)); let version = self.current_version().await; @@ -1359,9 +1501,7 @@ impl BaseTable for RemoteTable { } async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result { - let base_request = self - .client - .post(&format!("/v1/table/{}/explain_plan/", self.identifier)); + let base_request = self.post_read(&format!("/v1/table/{}/explain_plan/", self.identifier)); let query_bodies = self.prepare_query_bodies(query).await?; let requests: Vec = query_bodies @@ -1408,9 +1548,7 @@ impl BaseTable for RemoteTable { query: &AnyQuery, _options: QueryExecutionOptions, ) -> Result { - let request = self - .client - .post(&format!("/v1/table/{}/analyze_plan/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/analyze_plan/", self.identifier)); let query_bodies = self.prepare_query_bodies(query).await?; let requests: Vec = query_bodies @@ -1480,6 +1618,7 @@ impl BaseTable for RemoteTable { status_code: None, })?; + self.track_write_version(update_response.version); Ok(update_response) } @@ -1506,6 +1645,7 @@ impl BaseTable for RemoteTable { request_id, status_code: None, })?; + self.track_write_version(delete_response.version); Ok(delete_response) } @@ -1662,6 +1802,7 @@ impl BaseTable for RemoteTable { status_code: None, })?; + self.track_write_version(merge_insert_response.version); Ok(merge_insert_response) } @@ -1687,12 +1828,22 @@ impl BaseTable for RemoteTable { Ok(Box::new(RemoteTags { inner: self })) } async fn checkout_tag(&self, tag: &str) -> Result<()> { - let tags = self.tags().await?; - let version = tags.get_version(tag).await?; + // Resolve the tag without attaching freshness headers; a stale + // `min_version` from a previous write should not ride along on an + // explicit time-travel request. + let request = self + .client + .post(&format!("/v1/table/{}/tags/version/", self.identifier)); + let version = self.resolve_tag_version_with_request(tag, request).await?; + let mut write_guard = self.version.write().await; *write_guard = Some(version); drop(write_guard); + // Explicit time-travel: drop any read-your-write / freshness + // constraints so the user sees exactly the tagged version. + *self.freshness.lock().unwrap() = FreshnessState::default(); + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1743,6 +1894,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } @@ -1797,6 +1949,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } @@ -1824,15 +1977,14 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } async fn list_indices(&self) -> Result> { // Make request to list the indices - let mut request = self - .client - .post(&format!("/v1/table/{}/index/list/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/index/list/", self.identifier)); let version = self.current_version().await; let body = serde_json::json!({ "version": version }); request = request.json(&body); @@ -1896,7 +2048,7 @@ impl BaseTable for RemoteTable { } async fn index_stats(&self, index_name: &str) -> Result> { - let mut request = self.client.post(&format!( + let mut request = self.post_read(&format!( "/v1/table/{}/index/{}/stats/", self.identifier, index_name )); @@ -2008,9 +2160,7 @@ impl BaseTable for RemoteTable { } async fn stats(&self) -> Result { - let request = self - .client - .post(&format!("/v1/table/{}/stats/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/stats/", self.identifier)); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; let body = response.text().await.err_to_http(request_id.clone())?; @@ -5974,4 +6124,299 @@ mod tests { assert_eq!(create_count.load(Ordering::SeqCst), 2); assert_eq!(abort_count.load(Ordering::SeqCst), 1); } + + // ---- Read freshness header tests ------------------------------------ + + #[test] + fn test_compute_min_timestamp_combines_baseline_and_interval() { + let now = SystemTime::now(); + let baseline = now - Duration::from_secs(60); + + // No interval, no baseline -> no header. + assert_eq!( + compute_min_timestamp(&FreshnessState::default(), None, now), + None + ); + + // Baseline only -> baseline. + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(baseline), + }; + assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline)); + + // ZERO interval, no baseline -> now. + assert_eq!( + compute_min_timestamp(&FreshnessState::default(), Some(Duration::ZERO), now), + Some(now) + ); + + // Positive interval, no baseline -> now - interval. + assert_eq!( + compute_min_timestamp( + &FreshnessState::default(), + Some(Duration::from_secs(10)), + now + ), + Some(now - Duration::from_secs(10)) + ); + + // Both: pick the more-recent (i.e. tighter) constraint. + // baseline = now-60, now-interval = now-10. now-10 is newer. + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(baseline), + }; + assert_eq!( + compute_min_timestamp(&state, Some(Duration::from_secs(10)), now), + Some(now - Duration::from_secs(10)) + ); + + // Both, baseline newer: pick baseline. + let recent_baseline = now - Duration::from_secs(5); + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(recent_baseline), + }; + assert_eq!( + compute_min_timestamp(&state, Some(Duration::from_secs(60)), now), + Some(recent_baseline) + ); + } + + /// Allowed slop when comparing a header timestamp against a locally + /// captured wall-clock bound. Tests run fast enough that 1s is plenty. + const FRESHNESS_TOLERANCE: Duration = Duration::from_secs(1); + + fn capturing_handler( + body_for: F, + ) -> ( + impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + Arc>>, + ) + where + F: Fn(&str) -> String + Clone + Send + Sync + 'static, + { + let captured = Arc::new(std::sync::Mutex::new(None)); + let captured_c = captured.clone(); + let handler = move |request: reqwest::Request| { + *captured_c.lock().unwrap() = Some(request.headers().clone()); + let path = request.url().path().to_string(); + http::Response::builder() + .status(200) + .body(body_for(&path)) + .unwrap() + }; + (handler, captured) + } + + fn parse_min_timestamp(headers: &http::HeaderMap) -> SystemTime { + let value = headers + .get("x-lancedb-min-timestamp") + .expect("expected x-lancedb-min-timestamp header") + .to_str() + .unwrap(); + chrono::DateTime::parse_from_rfc3339(value) + .unwrap() + .with_timezone(&chrono::Utc) + .into() + } + + #[tokio::test] + async fn test_freshness_default_sends_no_headers() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let table = Table::new_with_handler("my_table", handler); + + let _ = table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert!(!headers.contains_key("x-lancedb-min-timestamp")); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_zero_interval_sends_now() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let table = + Table::new_with_handler_and_interval("my_table", handler, Some(Duration::from_secs(0))); + + let before = SystemTime::now(); + table.count_rows(None).await.unwrap(); + let after = SystemTime::now(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE, + "expected timestamp roughly equal to wall clock" + ); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_positive_interval_sends_now_minus_interval() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let interval = Duration::from_secs(30); + let table = Table::new_with_handler_and_interval("my_table", handler, Some(interval)); + + let before = SystemTime::now(); + table.count_rows(None).await.unwrap(); + let after = SystemTime::now(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before - interval - FRESHNESS_TOLERANCE + && sent <= after - interval + FRESHNESS_TOLERANCE, + "expected timestamp roughly equal to now - interval" + ); + } + + #[tokio::test] + async fn test_freshness_checkout_latest_sets_baseline() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + // No interval — only the baseline should drive the timestamp. + let table = Table::new_with_handler_and_interval("my_table", handler, None); + + let before_checkout = SystemTime::now(); + table.checkout_latest().await.unwrap(); + let after_checkout = SystemTime::now(); + + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before_checkout - FRESHNESS_TOLERANCE + && sent <= after_checkout + FRESHNESS_TOLERANCE, + "expected timestamp captured at checkout_latest() time" + ); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_min_version_tracked_after_write() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + let _ = table.update().column("a", "a + 1").execute().await.unwrap(); + // Update headers also pass through captured; reset by reading after. + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert_eq!( + headers + .get("x-lancedb-min-version") + .unwrap() + .to_str() + .unwrap(), + "7" + ); + } + + /// Like `capturing_handler`, but keeps a per-path snapshot of the headers + /// from every request so tests can assert on a specific endpoint. + #[allow(clippy::type_complexity)] + fn path_capturing_handler( + body_for: F, + ) -> ( + impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + Arc>>, + ) + where + F: Fn(&str) -> String + Clone + Send + Sync + 'static, + { + let captured: Arc>> = + Arc::new(std::sync::Mutex::new(HashMap::new())); + let captured_c = captured.clone(); + let handler = move |request: reqwest::Request| { + let path = request.url().path().to_string(); + captured_c + .lock() + .unwrap() + .insert(path.clone(), request.headers().clone()); + http::Response::builder() + .status(200) + .body(body_for(&path)) + .unwrap() + }; + (handler, captured) + } + + #[tokio::test] + async fn test_freshness_checkout_validation_sends_no_min_version() { + // After a write bumps min_version, calling checkout(v) must not let + // that stale header ride along on the validating /describe/ request. + let (handler, captured) = path_capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout(5).await.unwrap(); + + let captured = captured.lock().unwrap(); + let describe_headers = captured + .get("/v1/table/my_table/describe/") + .expect("describe should have been called by checkout(v)"); + assert!( + !describe_headers.contains_key("x-lancedb-min-version"), + "checkout(v) describe must not carry stale min_version", + ); + assert!(!describe_headers.contains_key("x-lancedb-min-timestamp")); + } + + #[tokio::test] + async fn test_freshness_checkout_tag_resolve_sends_no_min_version() { + // Same invariant for checkout_tag: the tag-resolve request must not + // pick up a stale min_version from a prior write. + let (handler, captured) = path_capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout_tag("v_initial").await.unwrap(); + + let captured = captured.lock().unwrap(); + let resolve_headers = captured + .get("/v1/table/my_table/tags/version/") + .expect("tags/version should have been called by checkout_tag"); + assert!( + !resolve_headers.contains_key("x-lancedb-min-version"), + "checkout_tag resolve must not carry stale min_version", + ); + assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp")); + } + + #[tokio::test] + async fn test_freshness_checkout_clears_min_version() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + // checkout(5) needs to describe version 5 first + "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(), + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout(5).await.unwrap(); + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert!(!headers.contains_key("x-lancedb-min-version")); + assert!(!headers.contains_key("x-lancedb-min-timestamp")); + } } diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 03f967e6e..e3c0ab1ef 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -656,6 +656,30 @@ mod test_utils { } } + pub fn new_with_handler_and_interval( + name: impl Into, + handler: impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + read_consistency_interval: Option, + ) -> Self + where + T: Into, + { + let inner = Arc::new( + crate::remote::table::RemoteTable::new_mock_with_consistency_interval( + name.into(), + handler.clone(), + read_consistency_interval, + ), + ); + let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler)); + Self { + inner, + database: Some(database), + // Registry is unused. + embedding_registry: Arc::new(MemoryRegistry::new()), + } + } + pub fn new_with_handler_version( name: impl Into, version: semver::Version,