From cb6a1fafa6438277187ee13e2a5f43a5ba1ddfd9 Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Thu, 28 May 2026 14:41:31 -0700 Subject: [PATCH] refactor(remote): drop x-lancedb-min-version; use timestamp baseline for read-your-write --- rust/lancedb/src/remote/table.rs | 158 +++++++++++++++---------------- 1 file changed, 78 insertions(+), 80 deletions(-) diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 4f034ba43..73f042ed2 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -66,47 +66,36 @@ use std::time::{Duration, SystemTime}; use tokio::sync::RwLock; const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); -const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version"); const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp"); const METRIC_TYPE_KEY: &str = "metric_type"; const INDEX_TYPE_KEY: &str = "index_type"; const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30); const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5); -/// Per-table state driving the freshness headers (`x-lancedb-min-version` and -/// `x-lancedb-min-timestamp`) sent on read requests. +/// Per-table state driving the `x-lancedb-min-timestamp` freshness header +/// sent on read requests. #[derive(Debug, Default, Clone, Copy)] struct FreshnessState { - /// Provides read-your-write within a single handle: writes that return a - /// version update this, and reads send it as `x-lancedb-min-version`. - min_version: Option, - /// Wall-clock time captured at the last [`BaseTable::checkout_latest`] - /// call. Subsequent reads send - /// `max(baseline, now - read_consistency_interval)` as - /// `x-lancedb-min-timestamp`. + /// Wall-clock floor for read freshness, bumped to `now` whenever this + /// handle performs a write or an explicit [`BaseTable::checkout_latest`]. + /// Subsequent reads send `max(baseline, now - read_consistency_interval)` + /// as `x-lancedb-min-timestamp`. /// - /// Without this, `checkout_latest()` would have no effect on subsequent - /// reads when `read_consistency_interval` is unset (the default): a - /// server-side cache could still serve a snapshot older than the moment - /// the user explicitly asked for "latest". The baseline forces the - /// server to skip any cache entry older than the checkout time, so the - /// `checkout_latest()` signal is preserved across reads on the same - /// handle regardless of the configured consistency interval. - checkout_baseline: Option, + /// This is what provides read-your-write on a single handle: after a write + /// the next read forces the server cache past the write time. It also + /// preserves the `checkout_latest()` signal when `read_consistency_interval` + /// is unset (the default), where there is no interval-based floor. + freshness_baseline: Option, } /// Snapshot of the headers that should be attached to a single read request. #[derive(Debug, Default, Clone, Copy)] struct FreshnessHeaders { - min_version: Option, min_timestamp: Option, } impl FreshnessHeaders { fn apply(self, mut request: RequestBuilder) -> RequestBuilder { - if let Some(v) = self.min_version { - request = request.header(MIN_VERSION_HEADER, v.to_string()); - } if let Some(ts) = self.min_timestamp { let dt: chrono::DateTime = ts.into(); request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339()); @@ -115,6 +104,14 @@ impl FreshnessHeaders { } } +/// Monotonic floor for the freshness baseline. `SystemTime` is not monotonic +/// (NTP steps, hibernate/resume can move it backward), so a write must never +/// lower the baseline below a prior write's — that would let the next read send +/// a `min_timestamp` earlier than an earlier write and break read-your-write. +fn next_freshness_baseline(prev: Option, now: SystemTime) -> SystemTime { + prev.map_or(now, |prev| prev.max(now)) +} + fn compute_min_timestamp( state: &FreshnessState, interval: Option, @@ -125,7 +122,7 @@ fn compute_min_timestamp( Some(d) if d.is_zero() => Some(now), Some(d) => Some(now.checked_sub(d).unwrap_or(now)), }; - match (interval_based, state.checkout_baseline) { + match (interval_based, state.freshness_baseline) { (None, None) => None, (Some(t), None) | (None, Some(t)) => Some(t), (Some(a), Some(b)) => Some(a.max(b)), @@ -787,7 +784,6 @@ impl RemoteTable { fn snapshot_freshness_headers(&self) -> FreshnessHeaders { let state = *self.freshness.lock().unwrap(); FreshnessHeaders { - min_version: state.min_version, min_timestamp: compute_min_timestamp( &state, self.client.read_consistency_interval, @@ -796,22 +792,20 @@ impl RemoteTable { } } - /// Build a POST request and attach the read-freshness headers - /// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`). + /// Build a POST request and attach the `x-lancedb-min-timestamp` freshness + /// header. fn post_read(&self, uri: &str) -> RequestBuilder { self.snapshot_freshness_headers() .apply(self.client.post(uri)) } - /// Record a version returned by a write so subsequent reads can request at - /// least that version via `x-lancedb-min-version`. A returned `0` from a - /// backward-compatible old server is ignored. - fn track_write_version(&self, version: u64) { - if version == 0 { - return; - } + /// Record that this handle just performed a write, so the next read forces + /// the server cache past the write time (read-your-write on a single + /// handle). + fn bump_freshness_baseline(&self) { + let now = SystemTime::now(); let mut state = self.freshness.lock().unwrap(); - state.min_version = Some(state.min_version.map_or(version, |v| v.max(version))); + state.freshness_baseline = Some(next_freshness_baseline(state.freshness_baseline, now)); } async fn execute_query( @@ -1115,7 +1109,7 @@ impl RemoteTable { if output.overwrite { self.invalidate_schema_cache(); } - self.track_write_version(add_result.version); + self.bump_freshness_baseline(); return Ok(add_result); } @@ -1153,7 +1147,7 @@ impl RemoteTable { if output.overwrite { self.invalidate_schema_cache(); } - self.track_write_version(result.version); + self.bump_freshness_baseline(); return Ok(result); } Err(e) => { @@ -1271,7 +1265,7 @@ impl BaseTable for RemoteTable { } async fn checkout(&self, version: u64) -> Result<()> { // Validate the version exists. The describe is sent without freshness - // headers so a stale `min_version` from a previous write doesn't ride + // headers so a stale baseline from a previous write doesn't ride // along on an explicit time-travel request. let request = self .client @@ -1306,11 +1300,10 @@ impl BaseTable for RemoteTable { *write_guard = None; drop(write_guard); - // Drop any per-handle write tracking; subsequent reads use the - // baseline timestamp captured now to guarantee freshness. + // Reset the freshness baseline to now so subsequent reads see at least + // the state as of this explicit `checkout_latest()`. *self.freshness.lock().unwrap() = FreshnessState { - min_version: None, - checkout_baseline: Some(SystemTime::now()), + freshness_baseline: Some(SystemTime::now()), }; // Invalidate schema cache since we're switching versions @@ -1627,7 +1620,7 @@ impl BaseTable for RemoteTable { status_code: None, })?; - self.track_write_version(update_response.version); + self.bump_freshness_baseline(); Ok(update_response) } @@ -1658,7 +1651,7 @@ impl BaseTable for RemoteTable { request_id, status_code: None, })?; - self.track_write_version(delete_response.version); + self.bump_freshness_baseline(); Ok(delete_response) } @@ -1815,7 +1808,7 @@ impl BaseTable for RemoteTable { status_code: None, })?; - self.track_write_version(merge_insert_response.version); + self.bump_freshness_baseline(); Ok(merge_insert_response) } @@ -1842,7 +1835,7 @@ impl BaseTable for RemoteTable { } async fn checkout_tag(&self, tag: &str) -> Result<()> { // Resolve the tag without attaching freshness headers; a stale - // `min_version` from a previous write should not ride along on an + // baseline from a previous write should not ride along on an // explicit time-travel request. let request = self .client @@ -1907,7 +1900,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); - self.track_write_version(result.version); + self.bump_freshness_baseline(); Ok(result) } @@ -1962,7 +1955,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); - self.track_write_version(result.version); + self.bump_freshness_baseline(); Ok(result) } @@ -1990,7 +1983,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); - self.track_write_version(result.version); + self.bump_freshness_baseline(); Ok(result) } @@ -6167,6 +6160,21 @@ mod tests { // ---- Read freshness header tests ------------------------------------ + #[test] + fn test_next_freshness_baseline_is_monotonic() { + let t100 = SystemTime::UNIX_EPOCH + Duration::from_secs(100); + let t101 = SystemTime::UNIX_EPOCH + Duration::from_secs(101); + let t99 = SystemTime::UNIX_EPOCH + Duration::from_secs(99); + + // No prior baseline -> take the current time. + assert_eq!(next_freshness_baseline(None, t100), t100); + // Clock moved forward -> advance to it. + assert_eq!(next_freshness_baseline(Some(t100), t101), t101); + // Clock moved backward (NTP step / resume) -> keep the higher prior + // baseline so read-your-write isn't lowered below an earlier write. + assert_eq!(next_freshness_baseline(Some(t100), t99), t100); + } + #[test] fn test_compute_min_timestamp_combines_baseline_and_interval() { let now = SystemTime::now(); @@ -6180,8 +6188,7 @@ mod tests { // Baseline only -> baseline. let state = FreshnessState { - min_version: None, - checkout_baseline: Some(baseline), + freshness_baseline: Some(baseline), }; assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline)); @@ -6204,8 +6211,7 @@ mod tests { // Both: pick the more-recent (i.e. tighter) constraint. // baseline = now-60, now-interval = now-10. now-10 is newer. let state = FreshnessState { - min_version: None, - checkout_baseline: Some(baseline), + freshness_baseline: Some(baseline), }; assert_eq!( compute_min_timestamp(&state, Some(Duration::from_secs(10)), now), @@ -6215,8 +6221,7 @@ mod tests { // Both, baseline newer: pick baseline. let recent_baseline = now - Duration::from_secs(5); let state = FreshnessState { - min_version: None, - checkout_baseline: Some(recent_baseline), + freshness_baseline: Some(recent_baseline), }; assert_eq!( compute_min_timestamp(&state, Some(Duration::from_secs(60)), now), @@ -6271,7 +6276,6 @@ mod tests { let headers = captured.lock().unwrap().clone().unwrap(); assert!(!headers.contains_key("x-lancedb-min-timestamp")); - assert!(!headers.contains_key("x-lancedb-min-version")); } #[tokio::test] @@ -6290,7 +6294,6 @@ mod tests { sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE, "expected timestamp roughly equal to wall clock" ); - assert!(!headers.contains_key("x-lancedb-min-version")); } #[tokio::test] @@ -6334,11 +6337,10 @@ mod tests { && sent <= after_checkout + FRESHNESS_TOLERANCE, "expected timestamp captured at checkout_latest() time" ); - assert!(!headers.contains_key("x-lancedb-min-version")); } #[tokio::test] - async fn test_freshness_min_version_tracked_after_write() { + async fn test_freshness_baseline_bumped_after_write() { let (handler, captured) = capturing_handler(|path| match path { "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), "/v1/table/my_table/count_rows/" => "42".to_string(), @@ -6346,19 +6348,17 @@ mod tests { }); let table = Table::new_with_handler("my_table", handler); + let before = SystemTime::now(); let _ = table.update().column("a", "a + 1").execute().await.unwrap(); - // Update headers also pass through captured; reset by reading after. table.count_rows(None).await.unwrap(); + let after = SystemTime::now(); + // The write bumped the freshness baseline, so the next read carries a + // min-timestamp at or after the write time (read-your-write). let headers = captured.lock().unwrap().clone().unwrap(); - assert_eq!( - headers - .get("x-lancedb-min-version") - .unwrap() - .to_str() - .unwrap(), - "7" - ); + let ts = parse_min_timestamp(&headers); + assert!(ts >= before - FRESHNESS_TOLERANCE); + assert!(ts <= after + FRESHNESS_TOLERANCE); } /// Like `capturing_handler`, but keeps a per-path snapshot of the headers @@ -6391,9 +6391,10 @@ mod tests { } #[tokio::test] - async fn test_freshness_checkout_validation_sends_no_min_version() { - // After a write bumps min_version, calling checkout(v) must not let - // that stale header ride along on the validating /describe/ request. + async fn test_freshness_checkout_validation_sends_no_freshness_headers() { + // After a write bumps the baseline, calling checkout(v) must not let + // that stale freshness header ride along on the validating /describe/ + // request. let (handler, captured) = path_capturing_handler(|path| match path { "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(), @@ -6409,16 +6410,15 @@ mod tests { .get("/v1/table/my_table/describe/") .expect("describe should have been called by checkout(v)"); assert!( - !describe_headers.contains_key("x-lancedb-min-version"), - "checkout(v) describe must not carry stale min_version", + !describe_headers.contains_key("x-lancedb-min-timestamp"), + "checkout(v) describe must not carry a stale freshness baseline", ); - assert!(!describe_headers.contains_key("x-lancedb-min-timestamp")); } #[tokio::test] - async fn test_freshness_checkout_tag_resolve_sends_no_min_version() { + async fn test_freshness_checkout_tag_resolve_sends_no_freshness_headers() { // Same invariant for checkout_tag: the tag-resolve request must not - // pick up a stale min_version from a prior write. + // pick up a stale freshness baseline from a prior write. let (handler, captured) = path_capturing_handler(|path| match path { "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), "/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(), @@ -6434,14 +6434,13 @@ mod tests { .get("/v1/table/my_table/tags/version/") .expect("tags/version should have been called by checkout_tag"); assert!( - !resolve_headers.contains_key("x-lancedb-min-version"), - "checkout_tag resolve must not carry stale min_version", + !resolve_headers.contains_key("x-lancedb-min-timestamp"), + "checkout_tag resolve must not carry a stale freshness baseline", ); - assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp")); } #[tokio::test] - async fn test_freshness_checkout_clears_min_version() { + async fn test_freshness_checkout_clears_baseline() { let (handler, captured) = capturing_handler(|path| match path { "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), // checkout(5) needs to describe version 5 first @@ -6456,7 +6455,6 @@ mod tests { table.count_rows(None).await.unwrap(); let headers = captured.lock().unwrap().clone().unwrap(); - assert!(!headers.contains_key("x-lancedb-min-version")); assert!(!headers.contains_key("x-lancedb-min-timestamp")); } }