mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-31 19:00:39 +00:00
refactor(remote): drop x-lancedb-min-version; use timestamp baseline for read-your-write
This commit is contained in:
@@ -66,47 +66,36 @@ use std::time::{Duration, SystemTime};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
|
||||
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
|
||||
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
|
||||
const METRIC_TYPE_KEY: &str = "metric_type";
|
||||
const INDEX_TYPE_KEY: &str = "index_type";
|
||||
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
|
||||
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
|
||||
/// `x-lancedb-min-timestamp`) sent on read requests.
|
||||
/// Per-table state driving the `x-lancedb-min-timestamp` freshness header
|
||||
/// sent on read requests.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
struct FreshnessState {
|
||||
/// Provides read-your-write within a single handle: writes that return a
|
||||
/// version update this, and reads send it as `x-lancedb-min-version`.
|
||||
min_version: Option<u64>,
|
||||
/// Wall-clock time captured at the last [`BaseTable::checkout_latest`]
|
||||
/// call. Subsequent reads send
|
||||
/// `max(baseline, now - read_consistency_interval)` as
|
||||
/// `x-lancedb-min-timestamp`.
|
||||
/// Wall-clock floor for read freshness, bumped to `now` whenever this
|
||||
/// handle performs a write or an explicit [`BaseTable::checkout_latest`].
|
||||
/// Subsequent reads send `max(baseline, now - read_consistency_interval)`
|
||||
/// as `x-lancedb-min-timestamp`.
|
||||
///
|
||||
/// Without this, `checkout_latest()` would have no effect on subsequent
|
||||
/// reads when `read_consistency_interval` is unset (the default): a
|
||||
/// server-side cache could still serve a snapshot older than the moment
|
||||
/// the user explicitly asked for "latest". The baseline forces the
|
||||
/// server to skip any cache entry older than the checkout time, so the
|
||||
/// `checkout_latest()` signal is preserved across reads on the same
|
||||
/// handle regardless of the configured consistency interval.
|
||||
checkout_baseline: Option<SystemTime>,
|
||||
/// This is what provides read-your-write on a single handle: after a write
|
||||
/// the next read forces the server cache past the write time. It also
|
||||
/// preserves the `checkout_latest()` signal when `read_consistency_interval`
|
||||
/// is unset (the default), where there is no interval-based floor.
|
||||
freshness_baseline: Option<SystemTime>,
|
||||
}
|
||||
|
||||
/// Snapshot of the headers that should be attached to a single read request.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
struct FreshnessHeaders {
|
||||
min_version: Option<u64>,
|
||||
min_timestamp: Option<SystemTime>,
|
||||
}
|
||||
|
||||
impl FreshnessHeaders {
|
||||
fn apply(self, mut request: RequestBuilder) -> RequestBuilder {
|
||||
if let Some(v) = self.min_version {
|
||||
request = request.header(MIN_VERSION_HEADER, v.to_string());
|
||||
}
|
||||
if let Some(ts) = self.min_timestamp {
|
||||
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
||||
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
|
||||
@@ -115,6 +104,14 @@ impl FreshnessHeaders {
|
||||
}
|
||||
}
|
||||
|
||||
/// Pick the next freshness baseline without ever letting it move backward.
///
/// `SystemTime` carries no monotonicity guarantee: an NTP step or a
/// hibernate/resume cycle can make `now` land before a previously recorded
/// baseline. Accepting such a value would allow the next read to send a
/// `min_timestamp` older than an earlier write, which breaks read-your-write.
///
/// Returns `now` when there is no prior baseline or when the clock has not
/// regressed; otherwise returns the prior baseline unchanged.
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
    match prev {
        // The wall clock went backward: hold the floor at the earlier value.
        Some(recorded) if recorded > now => recorded,
        // First bump on this handle, or the clock advanced (or stood still).
        _ => now,
    }
}
|
||||
|
||||
fn compute_min_timestamp(
|
||||
state: &FreshnessState,
|
||||
interval: Option<Duration>,
|
||||
@@ -125,7 +122,7 @@ fn compute_min_timestamp(
|
||||
Some(d) if d.is_zero() => Some(now),
|
||||
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
|
||||
};
|
||||
match (interval_based, state.checkout_baseline) {
|
||||
match (interval_based, state.freshness_baseline) {
|
||||
(None, None) => None,
|
||||
(Some(t), None) | (None, Some(t)) => Some(t),
|
||||
(Some(a), Some(b)) => Some(a.max(b)),
|
||||
@@ -787,7 +784,6 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
|
||||
let state = *self.freshness.lock().unwrap();
|
||||
FreshnessHeaders {
|
||||
min_version: state.min_version,
|
||||
min_timestamp: compute_min_timestamp(
|
||||
&state,
|
||||
self.client.read_consistency_interval,
|
||||
@@ -796,22 +792,20 @@ impl<S: HttpSend> RemoteTable<S> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a POST request and attach the read-freshness headers
|
||||
/// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`).
|
||||
/// Build a POST request and attach the `x-lancedb-min-timestamp` freshness
|
||||
/// header.
|
||||
fn post_read(&self, uri: &str) -> RequestBuilder {
|
||||
self.snapshot_freshness_headers()
|
||||
.apply(self.client.post(uri))
|
||||
}
|
||||
|
||||
/// Record a version returned by a write so subsequent reads can request at
|
||||
/// least that version via `x-lancedb-min-version`. A returned `0` from a
|
||||
/// backward-compatible old server is ignored.
|
||||
fn track_write_version(&self, version: u64) {
|
||||
if version == 0 {
|
||||
return;
|
||||
}
|
||||
/// Record that this handle just performed a write, so the next read forces
|
||||
/// the server cache past the write time (read-your-write on a single
|
||||
/// handle).
|
||||
fn bump_freshness_baseline(&self) {
|
||||
let now = SystemTime::now();
|
||||
let mut state = self.freshness.lock().unwrap();
|
||||
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version)));
|
||||
state.freshness_baseline = Some(next_freshness_baseline(state.freshness_baseline, now));
|
||||
}
|
||||
|
||||
async fn execute_query(
|
||||
@@ -1115,7 +1109,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
self.track_write_version(add_result.version);
|
||||
self.bump_freshness_baseline();
|
||||
|
||||
return Ok(add_result);
|
||||
}
|
||||
@@ -1153,7 +1147,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
||||
if output.overwrite {
|
||||
self.invalidate_schema_cache();
|
||||
}
|
||||
self.track_write_version(result.version);
|
||||
self.bump_freshness_baseline();
|
||||
return Ok(result);
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -1271,7 +1265,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
async fn checkout(&self, version: u64) -> Result<()> {
|
||||
// Validate the version exists. The describe is sent without freshness
|
||||
// headers so a stale `min_version` from a previous write doesn't ride
|
||||
// headers so a stale baseline from a previous write doesn't ride
|
||||
// along on an explicit time-travel request.
|
||||
let request = self
|
||||
.client
|
||||
@@ -1306,11 +1300,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
*write_guard = None;
|
||||
drop(write_guard);
|
||||
|
||||
// Drop any per-handle write tracking; subsequent reads use the
|
||||
// baseline timestamp captured now to guarantee freshness.
|
||||
// Reset the freshness baseline to now so subsequent reads see at least
|
||||
// the state as of this explicit `checkout_latest()`.
|
||||
*self.freshness.lock().unwrap() = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(SystemTime::now()),
|
||||
freshness_baseline: Some(SystemTime::now()),
|
||||
};
|
||||
|
||||
// Invalidate schema cache since we're switching versions
|
||||
@@ -1627,7 +1620,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.track_write_version(update_response.version);
|
||||
self.bump_freshness_baseline();
|
||||
Ok(update_response)
|
||||
}
|
||||
|
||||
@@ -1658,7 +1651,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
request_id,
|
||||
status_code: None,
|
||||
})?;
|
||||
self.track_write_version(delete_response.version);
|
||||
self.bump_freshness_baseline();
|
||||
Ok(delete_response)
|
||||
}
|
||||
|
||||
@@ -1815,7 +1808,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
status_code: None,
|
||||
})?;
|
||||
|
||||
self.track_write_version(merge_insert_response.version);
|
||||
self.bump_freshness_baseline();
|
||||
Ok(merge_insert_response)
|
||||
}
|
||||
|
||||
@@ -1842,7 +1835,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
}
|
||||
async fn checkout_tag(&self, tag: &str) -> Result<()> {
|
||||
// Resolve the tag without attaching freshness headers; a stale
|
||||
// `min_version` from a previous write should not ride along on an
|
||||
// baseline from a previous write should not ride along on an
|
||||
// explicit time-travel request.
|
||||
let request = self
|
||||
.client
|
||||
@@ -1907,7 +1900,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
self.bump_freshness_baseline();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
@@ -1962,7 +1955,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
self.bump_freshness_baseline();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
@@ -1990,7 +1983,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
||||
})?;
|
||||
|
||||
self.invalidate_schema_cache();
|
||||
self.track_write_version(result.version);
|
||||
self.bump_freshness_baseline();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
@@ -6167,6 +6160,21 @@ mod tests {
|
||||
|
||||
// ---- Read freshness header tests ------------------------------------
|
||||
|
||||
#[test]
fn test_next_freshness_baseline_is_monotonic() {
    // Fixed instants expressed as whole seconds past the Unix epoch, so the
    // test does not depend on the real wall clock.
    let at = |secs: u64| SystemTime::UNIX_EPOCH + Duration::from_secs(secs);
    let (earlier, baseline, later) = (at(99), at(100), at(101));

    // With nothing recorded yet, the current time becomes the baseline.
    assert_eq!(next_freshness_baseline(None, baseline), baseline);
    // A clock that moved forward pulls the baseline along with it.
    assert_eq!(next_freshness_baseline(Some(baseline), later), later);
    // A clock that moved backward (NTP step / resume) must not lower the
    // baseline, otherwise read-your-write would fall below an earlier write.
    assert_eq!(next_freshness_baseline(Some(baseline), earlier), baseline);
}
|
||||
|
||||
#[test]
|
||||
fn test_compute_min_timestamp_combines_baseline_and_interval() {
|
||||
let now = SystemTime::now();
|
||||
@@ -6180,8 +6188,7 @@ mod tests {
|
||||
|
||||
// Baseline only -> baseline.
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(baseline),
|
||||
freshness_baseline: Some(baseline),
|
||||
};
|
||||
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
|
||||
|
||||
@@ -6204,8 +6211,7 @@ mod tests {
|
||||
// Both: pick the more-recent (i.e. tighter) constraint.
|
||||
// baseline = now-60, now-interval = now-10. now-10 is newer.
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(baseline),
|
||||
freshness_baseline: Some(baseline),
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
|
||||
@@ -6215,8 +6221,7 @@ mod tests {
|
||||
// Both, baseline newer: pick baseline.
|
||||
let recent_baseline = now - Duration::from_secs(5);
|
||||
let state = FreshnessState {
|
||||
min_version: None,
|
||||
checkout_baseline: Some(recent_baseline),
|
||||
freshness_baseline: Some(recent_baseline),
|
||||
};
|
||||
assert_eq!(
|
||||
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
|
||||
@@ -6271,7 +6276,6 @@ mod tests {
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -6290,7 +6294,6 @@ mod tests {
|
||||
sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE,
|
||||
"expected timestamp roughly equal to wall clock"
|
||||
);
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -6334,11 +6337,10 @@ mod tests {
|
||||
&& sent <= after_checkout + FRESHNESS_TOLERANCE,
|
||||
"expected timestamp captured at checkout_latest() time"
|
||||
);
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_min_version_tracked_after_write() {
|
||||
async fn test_freshness_baseline_bumped_after_write() {
|
||||
let (handler, captured) = capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
||||
@@ -6346,19 +6348,17 @@ mod tests {
|
||||
});
|
||||
let table = Table::new_with_handler("my_table", handler);
|
||||
|
||||
let before = SystemTime::now();
|
||||
let _ = table.update().column("a", "a + 1").execute().await.unwrap();
|
||||
// Update headers also pass through captured; reset by reading after.
|
||||
table.count_rows(None).await.unwrap();
|
||||
let after = SystemTime::now();
|
||||
|
||||
// The write bumped the freshness baseline, so the next read carries a
|
||||
// min-timestamp at or after the write time (read-your-write).
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
assert_eq!(
|
||||
headers
|
||||
.get("x-lancedb-min-version")
|
||||
.unwrap()
|
||||
.to_str()
|
||||
.unwrap(),
|
||||
"7"
|
||||
);
|
||||
let ts = parse_min_timestamp(&headers);
|
||||
assert!(ts >= before - FRESHNESS_TOLERANCE);
|
||||
assert!(ts <= after + FRESHNESS_TOLERANCE);
|
||||
}
|
||||
|
||||
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
|
||||
@@ -6391,9 +6391,10 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_validation_sends_no_min_version() {
|
||||
// After a write bumps min_version, calling checkout(v) must not let
|
||||
// that stale header ride along on the validating /describe/ request.
|
||||
async fn test_freshness_checkout_validation_sends_no_freshness_headers() {
|
||||
// After a write bumps the baseline, calling checkout(v) must not let
|
||||
// that stale freshness header ride along on the validating /describe/
|
||||
// request.
|
||||
let (handler, captured) = path_capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
|
||||
@@ -6409,16 +6410,15 @@ mod tests {
|
||||
.get("/v1/table/my_table/describe/")
|
||||
.expect("describe should have been called by checkout(v)");
|
||||
assert!(
|
||||
!describe_headers.contains_key("x-lancedb-min-version"),
|
||||
"checkout(v) describe must not carry stale min_version",
|
||||
!describe_headers.contains_key("x-lancedb-min-timestamp"),
|
||||
"checkout(v) describe must not carry a stale freshness baseline",
|
||||
);
|
||||
assert!(!describe_headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_tag_resolve_sends_no_min_version() {
|
||||
async fn test_freshness_checkout_tag_resolve_sends_no_freshness_headers() {
|
||||
// Same invariant for checkout_tag: the tag-resolve request must not
|
||||
// pick up a stale min_version from a prior write.
|
||||
// pick up a stale freshness baseline from a prior write.
|
||||
let (handler, captured) = path_capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
"/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
|
||||
@@ -6434,14 +6434,13 @@ mod tests {
|
||||
.get("/v1/table/my_table/tags/version/")
|
||||
.expect("tags/version should have been called by checkout_tag");
|
||||
assert!(
|
||||
!resolve_headers.contains_key("x-lancedb-min-version"),
|
||||
"checkout_tag resolve must not carry stale min_version",
|
||||
!resolve_headers.contains_key("x-lancedb-min-timestamp"),
|
||||
"checkout_tag resolve must not carry a stale freshness baseline",
|
||||
);
|
||||
assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_freshness_checkout_clears_min_version() {
|
||||
async fn test_freshness_checkout_clears_baseline() {
|
||||
let (handler, captured) = capturing_handler(|path| match path {
|
||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||
// checkout(5) needs to describe version 5 first
|
||||
@@ -6456,7 +6455,6 @@ mod tests {
|
||||
table.count_rows(None).await.unwrap();
|
||||
|
||||
let headers = captured.lock().unwrap().clone().unwrap();
|
||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user