Compare commits

...

1 Commits

Author SHA1 Message Date
Brendan Clement
cb6a1fafa6 refactor(remote): drop x-lancedb-min-version; use timestamp baseline for read-your-write 2026-05-28 20:05:10 -07:00

View File

@@ -66,47 +66,36 @@ use std::time::{Duration, SystemTime};
use tokio::sync::RwLock; use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp"); const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
const METRIC_TYPE_KEY: &str = "metric_type"; const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type"; const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30); const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5); const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and /// Per-table state driving the `x-lancedb-min-timestamp` freshness header
/// `x-lancedb-min-timestamp`) sent on read requests. /// sent on read requests.
#[derive(Debug, Default, Clone, Copy)] #[derive(Debug, Default, Clone, Copy)]
struct FreshnessState { struct FreshnessState {
/// Provides read-your-write within a single handle: writes that return a /// Wall-clock floor for read freshness, bumped to `now` whenever this
/// version update this, and reads send it as `x-lancedb-min-version`. /// handle performs a write or an explicit [`BaseTable::checkout_latest`].
min_version: Option<u64>, /// Subsequent reads send `max(baseline, now - read_consistency_interval)`
/// Wall-clock time captured at the last [`BaseTable::checkout_latest`] /// as `x-lancedb-min-timestamp`.
/// call. Subsequent reads send
/// `max(baseline, now - read_consistency_interval)` as
/// `x-lancedb-min-timestamp`.
/// ///
/// Without this, `checkout_latest()` would have no effect on subsequent /// This is what provides read-your-write on a single handle: after a write
/// reads when `read_consistency_interval` is unset (the default): a /// the next read forces the server cache past the write time. It also
/// server-side cache could still serve a snapshot older than the moment /// preserves the `checkout_latest()` signal when `read_consistency_interval`
/// the user explicitly asked for "latest". The baseline forces the /// is unset (the default), where there is no interval-based floor.
/// server to skip any cache entry older than the checkout time, so the freshness_baseline: Option<SystemTime>,
/// `checkout_latest()` signal is preserved across reads on the same
/// handle regardless of the configured consistency interval.
checkout_baseline: Option<SystemTime>,
} }
/// Snapshot of the headers that should be attached to a single read request. /// Snapshot of the headers that should be attached to a single read request.
#[derive(Debug, Default, Clone, Copy)] #[derive(Debug, Default, Clone, Copy)]
struct FreshnessHeaders { struct FreshnessHeaders {
min_version: Option<u64>,
min_timestamp: Option<SystemTime>, min_timestamp: Option<SystemTime>,
} }
impl FreshnessHeaders { impl FreshnessHeaders {
fn apply(self, mut request: RequestBuilder) -> RequestBuilder { fn apply(self, mut request: RequestBuilder) -> RequestBuilder {
if let Some(v) = self.min_version {
request = request.header(MIN_VERSION_HEADER, v.to_string());
}
if let Some(ts) = self.min_timestamp { if let Some(ts) = self.min_timestamp {
let dt: chrono::DateTime<chrono::Utc> = ts.into(); let dt: chrono::DateTime<chrono::Utc> = ts.into();
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339()); request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
@@ -115,6 +104,14 @@ impl FreshnessHeaders {
} }
} }
/// Monotonic floor for the freshness baseline. `SystemTime` is not monotonic
/// (NTP steps, hibernate/resume can move it backward), so a write must never
/// lower the baseline below a prior write's — that would let the next read send
/// a `min_timestamp` earlier than an earlier write and break read-your-write.
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
prev.map_or(now, |prev| prev.max(now))
}
fn compute_min_timestamp( fn compute_min_timestamp(
state: &FreshnessState, state: &FreshnessState,
interval: Option<Duration>, interval: Option<Duration>,
@@ -125,7 +122,7 @@ fn compute_min_timestamp(
Some(d) if d.is_zero() => Some(now), Some(d) if d.is_zero() => Some(now),
Some(d) => Some(now.checked_sub(d).unwrap_or(now)), Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
}; };
match (interval_based, state.checkout_baseline) { match (interval_based, state.freshness_baseline) {
(None, None) => None, (None, None) => None,
(Some(t), None) | (None, Some(t)) => Some(t), (Some(t), None) | (None, Some(t)) => Some(t),
(Some(a), Some(b)) => Some(a.max(b)), (Some(a), Some(b)) => Some(a.max(b)),
@@ -787,7 +784,6 @@ impl<S: HttpSend> RemoteTable<S> {
fn snapshot_freshness_headers(&self) -> FreshnessHeaders { fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
let state = *self.freshness.lock().unwrap(); let state = *self.freshness.lock().unwrap();
FreshnessHeaders { FreshnessHeaders {
min_version: state.min_version,
min_timestamp: compute_min_timestamp( min_timestamp: compute_min_timestamp(
&state, &state,
self.client.read_consistency_interval, self.client.read_consistency_interval,
@@ -796,22 +792,20 @@ impl<S: HttpSend> RemoteTable<S> {
} }
} }
/// Build a POST request and attach the read-freshness headers /// Build a POST request and attach the `x-lancedb-min-timestamp` freshness
/// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`). /// header.
fn post_read(&self, uri: &str) -> RequestBuilder { fn post_read(&self, uri: &str) -> RequestBuilder {
self.snapshot_freshness_headers() self.snapshot_freshness_headers()
.apply(self.client.post(uri)) .apply(self.client.post(uri))
} }
/// Record a version returned by a write so subsequent reads can request at /// Record that this handle just performed a write, so the next read forces
/// least that version via `x-lancedb-min-version`. A returned `0` from a /// the server cache past the write time (read-your-write on a single
/// backward-compatible old server is ignored. /// handle).
fn track_write_version(&self, version: u64) { fn bump_freshness_baseline(&self) {
if version == 0 { let now = SystemTime::now();
return;
}
let mut state = self.freshness.lock().unwrap(); let mut state = self.freshness.lock().unwrap();
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version))); state.freshness_baseline = Some(next_freshness_baseline(state.freshness_baseline, now));
} }
async fn execute_query( async fn execute_query(
@@ -1115,7 +1109,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
if output.overwrite { if output.overwrite {
self.invalidate_schema_cache(); self.invalidate_schema_cache();
} }
self.track_write_version(add_result.version); self.bump_freshness_baseline();
return Ok(add_result); return Ok(add_result);
} }
@@ -1153,7 +1147,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
if output.overwrite { if output.overwrite {
self.invalidate_schema_cache(); self.invalidate_schema_cache();
} }
self.track_write_version(result.version); self.bump_freshness_baseline();
return Ok(result); return Ok(result);
} }
Err(e) => { Err(e) => {
@@ -1271,7 +1265,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
} }
async fn checkout(&self, version: u64) -> Result<()> { async fn checkout(&self, version: u64) -> Result<()> {
// Validate the version exists. The describe is sent without freshness // Validate the version exists. The describe is sent without freshness
// headers so a stale `min_version` from a previous write doesn't ride // headers so a stale baseline from a previous write doesn't ride
// along on an explicit time-travel request. // along on an explicit time-travel request.
let request = self let request = self
.client .client
@@ -1306,11 +1300,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = None; *write_guard = None;
drop(write_guard); drop(write_guard);
// Drop any per-handle write tracking; subsequent reads use the // Reset the freshness baseline to now so subsequent reads see at least
// baseline timestamp captured now to guarantee freshness. // the state as of this explicit `checkout_latest()`.
*self.freshness.lock().unwrap() = FreshnessState { *self.freshness.lock().unwrap() = FreshnessState {
min_version: None, freshness_baseline: Some(SystemTime::now()),
checkout_baseline: Some(SystemTime::now()),
}; };
// Invalidate schema cache since we're switching versions // Invalidate schema cache since we're switching versions
@@ -1627,7 +1620,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None, status_code: None,
})?; })?;
self.track_write_version(update_response.version); self.bump_freshness_baseline();
Ok(update_response) Ok(update_response)
} }
@@ -1658,7 +1651,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
request_id, request_id,
status_code: None, status_code: None,
})?; })?;
self.track_write_version(delete_response.version); self.bump_freshness_baseline();
Ok(delete_response) Ok(delete_response)
} }
@@ -1815,7 +1808,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None, status_code: None,
})?; })?;
self.track_write_version(merge_insert_response.version); self.bump_freshness_baseline();
Ok(merge_insert_response) Ok(merge_insert_response)
} }
@@ -1842,7 +1835,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
} }
async fn checkout_tag(&self, tag: &str) -> Result<()> { async fn checkout_tag(&self, tag: &str) -> Result<()> {
// Resolve the tag without attaching freshness headers; a stale // Resolve the tag without attaching freshness headers; a stale
// `min_version` from a previous write should not ride along on an // baseline from a previous write should not ride along on an
// explicit time-travel request. // explicit time-travel request.
let request = self let request = self
.client .client
@@ -1907,7 +1900,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?; })?;
self.invalidate_schema_cache(); self.invalidate_schema_cache();
self.track_write_version(result.version); self.bump_freshness_baseline();
Ok(result) Ok(result)
} }
@@ -1962,7 +1955,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?; })?;
self.invalidate_schema_cache(); self.invalidate_schema_cache();
self.track_write_version(result.version); self.bump_freshness_baseline();
Ok(result) Ok(result)
} }
@@ -1990,7 +1983,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?; })?;
self.invalidate_schema_cache(); self.invalidate_schema_cache();
self.track_write_version(result.version); self.bump_freshness_baseline();
Ok(result) Ok(result)
} }
@@ -6167,6 +6160,21 @@ mod tests {
// ---- Read freshness header tests ------------------------------------ // ---- Read freshness header tests ------------------------------------
#[test]
fn test_next_freshness_baseline_is_monotonic() {
let t100 = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
let t101 = SystemTime::UNIX_EPOCH + Duration::from_secs(101);
let t99 = SystemTime::UNIX_EPOCH + Duration::from_secs(99);
// No prior baseline -> take the current time.
assert_eq!(next_freshness_baseline(None, t100), t100);
// Clock moved forward -> advance to it.
assert_eq!(next_freshness_baseline(Some(t100), t101), t101);
// Clock moved backward (NTP step / resume) -> keep the higher prior
// baseline so read-your-write isn't lowered below an earlier write.
assert_eq!(next_freshness_baseline(Some(t100), t99), t100);
}
#[test] #[test]
fn test_compute_min_timestamp_combines_baseline_and_interval() { fn test_compute_min_timestamp_combines_baseline_and_interval() {
let now = SystemTime::now(); let now = SystemTime::now();
@@ -6180,8 +6188,7 @@ mod tests {
// Baseline only -> baseline. // Baseline only -> baseline.
let state = FreshnessState { let state = FreshnessState {
min_version: None, freshness_baseline: Some(baseline),
checkout_baseline: Some(baseline),
}; };
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline)); assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
@@ -6204,8 +6211,7 @@ mod tests {
// Both: pick the more-recent (i.e. tighter) constraint. // Both: pick the more-recent (i.e. tighter) constraint.
// baseline = now-60, now-interval = now-10. now-10 is newer. // baseline = now-60, now-interval = now-10. now-10 is newer.
let state = FreshnessState { let state = FreshnessState {
min_version: None, freshness_baseline: Some(baseline),
checkout_baseline: Some(baseline),
}; };
assert_eq!( assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now), compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
@@ -6215,8 +6221,7 @@ mod tests {
// Both, baseline newer: pick baseline. // Both, baseline newer: pick baseline.
let recent_baseline = now - Duration::from_secs(5); let recent_baseline = now - Duration::from_secs(5);
let state = FreshnessState { let state = FreshnessState {
min_version: None, freshness_baseline: Some(recent_baseline),
checkout_baseline: Some(recent_baseline),
}; };
assert_eq!( assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now), compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
@@ -6271,7 +6276,6 @@ mod tests {
let headers = captured.lock().unwrap().clone().unwrap(); let headers = captured.lock().unwrap().clone().unwrap();
assert!(!headers.contains_key("x-lancedb-min-timestamp")); assert!(!headers.contains_key("x-lancedb-min-timestamp"));
assert!(!headers.contains_key("x-lancedb-min-version"));
} }
#[tokio::test] #[tokio::test]
@@ -6290,7 +6294,6 @@ mod tests {
sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE, sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE,
"expected timestamp roughly equal to wall clock" "expected timestamp roughly equal to wall clock"
); );
assert!(!headers.contains_key("x-lancedb-min-version"));
} }
#[tokio::test] #[tokio::test]
@@ -6334,11 +6337,10 @@ mod tests {
&& sent <= after_checkout + FRESHNESS_TOLERANCE, && sent <= after_checkout + FRESHNESS_TOLERANCE,
"expected timestamp captured at checkout_latest() time" "expected timestamp captured at checkout_latest() time"
); );
assert!(!headers.contains_key("x-lancedb-min-version"));
} }
#[tokio::test] #[tokio::test]
async fn test_freshness_min_version_tracked_after_write() { async fn test_freshness_baseline_bumped_after_write() {
let (handler, captured) = capturing_handler(|path| match path { let (handler, captured) = capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/count_rows/" => "42".to_string(), "/v1/table/my_table/count_rows/" => "42".to_string(),
@@ -6346,19 +6348,17 @@ mod tests {
}); });
let table = Table::new_with_handler("my_table", handler); let table = Table::new_with_handler("my_table", handler);
let before = SystemTime::now();
let _ = table.update().column("a", "a + 1").execute().await.unwrap(); let _ = table.update().column("a", "a + 1").execute().await.unwrap();
// Update headers also pass through captured; reset by reading after.
table.count_rows(None).await.unwrap(); table.count_rows(None).await.unwrap();
let after = SystemTime::now();
// The write bumped the freshness baseline, so the next read carries a
// min-timestamp at or after the write time (read-your-write).
let headers = captured.lock().unwrap().clone().unwrap(); let headers = captured.lock().unwrap().clone().unwrap();
assert_eq!( let ts = parse_min_timestamp(&headers);
headers assert!(ts >= before - FRESHNESS_TOLERANCE);
.get("x-lancedb-min-version") assert!(ts <= after + FRESHNESS_TOLERANCE);
.unwrap()
.to_str()
.unwrap(),
"7"
);
} }
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers /// Like `capturing_handler`, but keeps a per-path snapshot of the headers
@@ -6391,9 +6391,10 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn test_freshness_checkout_validation_sends_no_min_version() { async fn test_freshness_checkout_validation_sends_no_freshness_headers() {
// After a write bumps min_version, calling checkout(v) must not let // After a write bumps the baseline, calling checkout(v) must not let
// that stale header ride along on the validating /describe/ request. // that stale freshness header ride along on the validating /describe/
// request.
let (handler, captured) = path_capturing_handler(|path| match path { let (handler, captured) = path_capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(), "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
@@ -6409,16 +6410,15 @@ mod tests {
.get("/v1/table/my_table/describe/") .get("/v1/table/my_table/describe/")
.expect("describe should have been called by checkout(v)"); .expect("describe should have been called by checkout(v)");
assert!( assert!(
!describe_headers.contains_key("x-lancedb-min-version"), !describe_headers.contains_key("x-lancedb-min-timestamp"),
"checkout(v) describe must not carry stale min_version", "checkout(v) describe must not carry a stale freshness baseline",
); );
assert!(!describe_headers.contains_key("x-lancedb-min-timestamp"));
} }
#[tokio::test] #[tokio::test]
async fn test_freshness_checkout_tag_resolve_sends_no_min_version() { async fn test_freshness_checkout_tag_resolve_sends_no_freshness_headers() {
// Same invariant for checkout_tag: the tag-resolve request must not // Same invariant for checkout_tag: the tag-resolve request must not
// pick up a stale min_version from a prior write. // pick up a stale freshness baseline from a prior write.
let (handler, captured) = path_capturing_handler(|path| match path { let (handler, captured) = path_capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(), "/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
@@ -6434,14 +6434,13 @@ mod tests {
.get("/v1/table/my_table/tags/version/") .get("/v1/table/my_table/tags/version/")
.expect("tags/version should have been called by checkout_tag"); .expect("tags/version should have been called by checkout_tag");
assert!( assert!(
!resolve_headers.contains_key("x-lancedb-min-version"), !resolve_headers.contains_key("x-lancedb-min-timestamp"),
"checkout_tag resolve must not carry stale min_version", "checkout_tag resolve must not carry a stale freshness baseline",
); );
assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp"));
} }
#[tokio::test] #[tokio::test]
async fn test_freshness_checkout_clears_min_version() { async fn test_freshness_checkout_clears_baseline() {
let (handler, captured) = capturing_handler(|path| match path { let (handler, captured) = capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
// checkout(5) needs to describe version 5 first // checkout(5) needs to describe version 5 first
@@ -6456,7 +6455,6 @@ mod tests {
table.count_rows(None).await.unwrap(); table.count_rows(None).await.unwrap();
let headers = captured.lock().unwrap().clone().unwrap(); let headers = captured.lock().unwrap().clone().unwrap();
assert!(!headers.contains_key("x-lancedb-min-version"));
assert!(!headers.contains_key("x-lancedb-min-timestamp")); assert!(!headers.contains_key("x-lancedb-min-timestamp"));
} }
} }