diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index f3976d4af..3dfb8218c 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -70,18 +70,29 @@ use tokio::sync::RwLock; const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version"); const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp"); +const MIN_READ_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-read-version"); +const VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-version"); const METRIC_TYPE_KEY: &str = "metric_type"; const INDEX_TYPE_KEY: &str = "index_type"; const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30); const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5); -/// Per-table state driving the freshness headers (`x-lancedb-min-version` and -/// `x-lancedb-min-timestamp`) sent on read requests. +/// Per-table state driving the freshness headers (`x-lancedb-min-version`, +/// `x-lancedb-min-timestamp`, and `x-lancedb-min-read-version`) sent on read +/// requests. #[derive(Debug, Default, Clone, Copy)] struct FreshnessState { /// Provides read-your-write within a single handle: writes that return a /// version update this, and reads send it as `x-lancedb-min-version`. min_version: Option, + /// Highest dataset version observed in a *read* response on this handle. + /// Reads send it as `x-lancedb-min-read-version` so a load-balanced query + /// node whose cache is behind this version must refresh before serving, + /// giving monotonic reads across nodes regardless of which one the load + /// balancer routes to. Sourced only from reads (always committed dataset + /// versions), never from writes (which may return WAL entry ids), so it is + /// unaffected by the WAL/version mismatch that retired `min_version`. + min_read_version: Option, /// Wall-clock time captured at the last [`BaseTable::checkout_latest`] /// call. Subsequent reads send /// `max(baseline, now - read_consistency_interval)` as @@ -102,6 +113,7 @@ struct FreshnessState { struct FreshnessHeaders { min_version: Option, min_timestamp: Option, + min_read_version: Option, } impl FreshnessHeaders { @@ -113,6 +125,9 @@ impl FreshnessHeaders { let dt: chrono::DateTime = ts.into(); request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339()); } + if let Some(v) = self.min_read_version { + request = request.header(MIN_READ_VERSION_HEADER, v.to_string()); + } request } } @@ -884,6 +899,7 @@ impl RemoteTable { self.client.read_consistency_interval, SystemTime::now(), ), + min_read_version: state.min_read_version, } } @@ -905,6 +921,30 @@ impl RemoteTable { state.min_version = Some(state.min_version.map_or(version, |v| v.max(version))); } + /// Record a dataset version observed in a *read* response so subsequent + /// reads request at least this version via `x-lancedb-min-read-version`, + /// giving monotonic reads across load-balanced query nodes. A returned `0` + /// (or absent header from an old server) is ignored. + fn track_read_version(&self, version: u64) { + if version == 0 { + return; + } + let mut state = self.freshness.lock().unwrap(); + state.min_read_version = Some(state.min_read_version.map_or(version, |v| v.max(version))); + } + + /// Parse the `x-lancedb-version` response header (the dataset version a read + /// reflects) and fold it into the read-version watermark. + fn track_read_version_from_headers(&self, headers: &reqwest::header::HeaderMap) { + if let Some(version) = headers + .get(&VERSION_HEADER) + .and_then(|value| value.to_str().ok()) + .and_then(|value| value.parse::().ok()) + { + self.track_read_version(version); + } + } + async fn execute_query( &self, query: &AnyQuery, @@ -928,6 +968,7 @@ impl RemoteTable { let futures = requests.into_iter().map(|req| async move { let (request_id, response) = self.send(req, true).await?; + self.track_read_version_from_headers(response.headers()); self.read_arrow_stream(&request_id, response).await }); let streams = futures::future::try_join_all(futures); @@ -1545,11 +1586,12 @@ impl BaseTable for RemoteTable { *write_guard = None; drop(write_guard); - // Drop any per-handle write tracking; subsequent reads use the + // Drop any per-handle read/write tracking; subsequent reads use the // baseline timestamp captured now to guarantee freshness. *self.freshness.lock().unwrap() = FreshnessState { min_version: None, checkout_baseline: Some(SystemTime::now()), + min_read_version: None, }; // Invalidate schema cache since we're switching versions @@ -1805,6 +1847,7 @@ impl BaseTable for RemoteTable { } }; + self.track_read_version_from_headers(response.headers()); let body = response.text().await.err_to_http(request_id.clone())?; serde_json::from_str(&body).map_err(|e| Error::Http { @@ -7124,6 +7167,7 @@ mod tests { let state = FreshnessState { min_version: None, checkout_baseline: Some(baseline), + min_read_version: None, }; assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline)); @@ -7148,6 +7192,7 @@ mod tests { let state = FreshnessState { min_version: None, checkout_baseline: Some(baseline), + min_read_version: None, }; assert_eq!( compute_min_timestamp(&state, Some(Duration::from_secs(10)), now), @@ -7159,6 +7204,7 @@ mod tests { let state = FreshnessState { min_version: None, checkout_baseline: Some(recent_baseline), + min_read_version: None, }; assert_eq!( compute_min_timestamp(&state, Some(Duration::from_secs(60)), now), @@ -7303,6 +7349,106 @@ mod tests { ); } + /// A handler that records every request's headers and answers each read with + /// an `x-lancedb-version` response header taken from `versions` (by call + /// index, saturating at the last entry). An empty string means "no header". + fn read_version_handler( + versions: &'static [&'static str], + ) -> ( + impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + Arc>>, + ) { + let requests = Arc::new(std::sync::Mutex::new(Vec::new())); + let requests_c = requests.clone(); + let call = Arc::new(AtomicUsize::new(0)); + let handler = move |request: reqwest::Request| { + requests_c.lock().unwrap().push(request.headers().clone()); + let i = call.fetch_add(1, Ordering::SeqCst).min(versions.len() - 1); + let mut builder = http::Response::builder().status(200); + if !versions[i].is_empty() { + builder = builder.header("x-lancedb-version", versions[i]); + } + builder.body("42".to_string()).unwrap() + }; + (handler, requests) + } + + #[tokio::test] + async fn test_read_version_watermark_tracked_and_sent() { + let (handler, requests) = read_version_handler(&["100", "100"]); + let table = Table::new_with_handler("my_table", handler); + + // First read has no watermark yet; the response advertises version 100, + // so the second read must floor the server at 100. + table.count_rows(None).await.unwrap(); + table.count_rows(None).await.unwrap(); + + let reqs = requests.lock().unwrap(); + assert!(!reqs[0].contains_key("x-lancedb-min-read-version")); + assert_eq!( + reqs[1] + .get("x-lancedb-min-read-version") + .unwrap() + .to_str() + .unwrap(), + "100" + ); + } + + #[tokio::test] + async fn test_read_version_watermark_keeps_max() { + // Server reports 100 then a stale 50; the watermark must not regress. + let (handler, requests) = read_version_handler(&["100", "50", "50"]); + let table = Table::new_with_handler("my_table", handler); + + table.count_rows(None).await.unwrap(); + table.count_rows(None).await.unwrap(); + table.count_rows(None).await.unwrap(); + + let reqs = requests.lock().unwrap(); + assert_eq!( + reqs[2] + .get("x-lancedb-min-read-version") + .unwrap() + .to_str() + .unwrap(), + "100" + ); + } + + #[tokio::test] + async fn test_read_version_absent_header_no_watermark() { + // An old server that doesn't return the version header leaves the + // watermark unset, preserving backward compatibility. + let (handler, requests) = read_version_handler(&[""]); + let table = Table::new_with_handler("my_table", handler); + + table.count_rows(None).await.unwrap(); + table.count_rows(None).await.unwrap(); + + let reqs = requests.lock().unwrap(); + assert!(!reqs[1].contains_key("x-lancedb-min-read-version")); + } + + #[tokio::test] + async fn test_read_version_watermark_reset_on_checkout_latest() { + let (handler, requests) = read_version_handler(&["100", "100"]); + let table = Table::new_with_handler("my_table", handler); + + table.count_rows(None).await.unwrap(); + table.checkout_latest().await.unwrap(); + table.count_rows(None).await.unwrap(); + + // The read after checkout_latest starts from a clean slate. + let reqs = requests.lock().unwrap(); + assert!( + !reqs + .last() + .unwrap() + .contains_key("x-lancedb-min-read-version") + ); + } + /// Like `capturing_handler`, but keeps a per-path snapshot of the headers /// from every request so tests can assert on a specific endpoint. #[allow(clippy::type_complexity)]