Compare commits

...

1 Commits

Author SHA1 Message Date
Jack Ye
ed6b166fc4 feat(remote): monotonic reads via x-lancedb-min-read-version watermark
Track the highest dataset version observed in read responses (a new
x-lancedb-version response header) per table handle and send it back as
x-lancedb-min-read-version on subsequent reads. A load-balanced query node
whose cache is behind that version refreshes before serving, so successive
reads on a handle never move backward in version even when the load balancer
routes them to different nodes.

Sourced only from reads (always committed dataset versions), so unlike the
retired min_version it is unaffected by WAL writes returning WAL entry ids.
2026-06-29 17:30:33 -07:00

View File

@@ -70,18 +70,29 @@ use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
const MIN_READ_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-read-version");
const VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-version");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
/// `x-lancedb-min-timestamp`) sent on read requests.
/// Per-table state driving the freshness headers (`x-lancedb-min-version`,
/// `x-lancedb-min-timestamp`, and `x-lancedb-min-read-version`) sent on read
/// requests.
#[derive(Debug, Default, Clone, Copy)]
struct FreshnessState {
/// Provides read-your-write within a single handle: writes that return a
/// version update this, and reads send it as `x-lancedb-min-version`.
min_version: Option<u64>,
/// Highest dataset version observed in a *read* response on this handle.
/// Reads send it as `x-lancedb-min-read-version` so a load-balanced query
/// node whose cache is behind this version must refresh before serving,
/// giving monotonic reads across nodes regardless of which one the load
/// balancer routes to. Sourced only from reads (always committed dataset
/// versions), never from writes (which may return WAL entry ids), so it is
/// unaffected by the WAL/version mismatch that retired `min_version`.
min_read_version: Option<u64>,
/// Wall-clock time captured at the last [`BaseTable::checkout_latest`]
/// call. Subsequent reads send
/// `max(baseline, now - read_consistency_interval)` as
@@ -102,6 +113,7 @@ struct FreshnessState {
struct FreshnessHeaders {
min_version: Option<u64>,
min_timestamp: Option<SystemTime>,
min_read_version: Option<u64>,
}
impl FreshnessHeaders {
@@ -113,6 +125,9 @@ impl FreshnessHeaders {
let dt: chrono::DateTime<chrono::Utc> = ts.into();
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
}
if let Some(v) = self.min_read_version {
request = request.header(MIN_READ_VERSION_HEADER, v.to_string());
}
request
}
}
@@ -884,6 +899,7 @@ impl<S: HttpSend> RemoteTable<S> {
self.client.read_consistency_interval,
SystemTime::now(),
),
min_read_version: state.min_read_version,
}
}
@@ -905,6 +921,30 @@ impl<S: HttpSend> RemoteTable<S> {
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version)));
}
/// Record a dataset version observed in a *read* response so subsequent
/// reads request at least this version via `x-lancedb-min-read-version`,
/// giving monotonic reads across load-balanced query nodes. A returned `0`
/// (or absent header from an old server) is ignored.
fn track_read_version(&self, version: u64) {
if version == 0 {
return;
}
let mut state = self.freshness.lock().unwrap();
state.min_read_version = Some(state.min_read_version.map_or(version, |v| v.max(version)));
}
/// Parse the `x-lancedb-version` response header (the dataset version a read
/// reflects) and fold it into the read-version watermark.
fn track_read_version_from_headers(&self, headers: &reqwest::header::HeaderMap) {
if let Some(version) = headers
.get(&VERSION_HEADER)
.and_then(|value| value.to_str().ok())
.and_then(|value| value.parse::<u64>().ok())
{
self.track_read_version(version);
}
}
async fn execute_query(
&self,
query: &AnyQuery,
@@ -928,6 +968,7 @@ impl<S: HttpSend> RemoteTable<S> {
let futures = requests.into_iter().map(|req| async move {
let (request_id, response) = self.send(req, true).await?;
self.track_read_version_from_headers(response.headers());
self.read_arrow_stream(&request_id, response).await
});
let streams = futures::future::try_join_all(futures);
@@ -1545,11 +1586,12 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = None;
drop(write_guard);
// Drop any per-handle write tracking; subsequent reads use the
// Drop any per-handle read/write tracking; subsequent reads use the
// baseline timestamp captured now to guarantee freshness.
*self.freshness.lock().unwrap() = FreshnessState {
min_version: None,
checkout_baseline: Some(SystemTime::now()),
min_read_version: None,
};
// Invalidate schema cache since we're switching versions
@@ -1805,6 +1847,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
};
self.track_read_version_from_headers(response.headers());
let body = response.text().await.err_to_http(request_id.clone())?;
serde_json::from_str(&body).map_err(|e| Error::Http {
@@ -7124,6 +7167,7 @@ mod tests {
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(baseline),
min_read_version: None,
};
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
@@ -7148,6 +7192,7 @@ mod tests {
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(baseline),
min_read_version: None,
};
assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
@@ -7159,6 +7204,7 @@ mod tests {
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(recent_baseline),
min_read_version: None,
};
assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
@@ -7303,6 +7349,106 @@ mod tests {
);
}
/// A handler that records every request's headers and answers each read with
/// an `x-lancedb-version` response header taken from `versions` (by call
/// index, saturating at the last entry). An empty string means "no header".
fn read_version_handler(
versions: &'static [&'static str],
) -> (
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
Arc<std::sync::Mutex<Vec<http::HeaderMap>>>,
) {
let requests = Arc::new(std::sync::Mutex::new(Vec::new()));
let requests_c = requests.clone();
let call = Arc::new(AtomicUsize::new(0));
let handler = move |request: reqwest::Request| {
requests_c.lock().unwrap().push(request.headers().clone());
let i = call.fetch_add(1, Ordering::SeqCst).min(versions.len() - 1);
let mut builder = http::Response::builder().status(200);
if !versions[i].is_empty() {
builder = builder.header("x-lancedb-version", versions[i]);
}
builder.body("42".to_string()).unwrap()
};
(handler, requests)
}
#[tokio::test]
async fn test_read_version_watermark_tracked_and_sent() {
let (handler, requests) = read_version_handler(&["100", "100"]);
let table = Table::new_with_handler("my_table", handler);
// First read has no watermark yet; the response advertises version 100,
// so the second read must floor the server at 100.
table.count_rows(None).await.unwrap();
table.count_rows(None).await.unwrap();
let reqs = requests.lock().unwrap();
assert!(!reqs[0].contains_key("x-lancedb-min-read-version"));
assert_eq!(
reqs[1]
.get("x-lancedb-min-read-version")
.unwrap()
.to_str()
.unwrap(),
"100"
);
}
#[tokio::test]
async fn test_read_version_watermark_keeps_max() {
// Server reports 100 then a stale 50; the watermark must not regress.
let (handler, requests) = read_version_handler(&["100", "50", "50"]);
let table = Table::new_with_handler("my_table", handler);
table.count_rows(None).await.unwrap();
table.count_rows(None).await.unwrap();
table.count_rows(None).await.unwrap();
let reqs = requests.lock().unwrap();
assert_eq!(
reqs[2]
.get("x-lancedb-min-read-version")
.unwrap()
.to_str()
.unwrap(),
"100"
);
}
#[tokio::test]
async fn test_read_version_absent_header_no_watermark() {
// An old server that doesn't return the version header leaves the
// watermark unset, preserving backward compatibility.
let (handler, requests) = read_version_handler(&[""]);
let table = Table::new_with_handler("my_table", handler);
table.count_rows(None).await.unwrap();
table.count_rows(None).await.unwrap();
let reqs = requests.lock().unwrap();
assert!(!reqs[1].contains_key("x-lancedb-min-read-version"));
}
#[tokio::test]
async fn test_read_version_watermark_reset_on_checkout_latest() {
let (handler, requests) = read_version_handler(&["100", "100"]);
let table = Table::new_with_handler("my_table", handler);
table.count_rows(None).await.unwrap();
table.checkout_latest().await.unwrap();
table.count_rows(None).await.unwrap();
// The read after checkout_latest starts from a clean slate.
let reqs = requests.lock().unwrap();
assert!(
!reqs
.last()
.unwrap()
.contains_key("x-lancedb-min-read-version")
);
}
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
/// from every request so tests can assert on a specific endpoint.
#[allow(clippy::type_complexity)]