mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-20 12:40:39 +00:00
Compare commits
1 Commits
python-v0.
...
drop-min-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cb6a1fafa6 |
@@ -66,47 +66,36 @@ use std::time::{Duration, SystemTime};
|
|||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
|
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
|
||||||
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
|
|
||||||
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
|
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
|
||||||
const METRIC_TYPE_KEY: &str = "metric_type";
|
const METRIC_TYPE_KEY: &str = "metric_type";
|
||||||
const INDEX_TYPE_KEY: &str = "index_type";
|
const INDEX_TYPE_KEY: &str = "index_type";
|
||||||
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
|
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
|
||||||
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
|
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
|
/// Per-table state driving the `x-lancedb-min-timestamp` freshness header
|
||||||
/// `x-lancedb-min-timestamp`) sent on read requests.
|
/// sent on read requests.
|
||||||
#[derive(Debug, Default, Clone, Copy)]
|
#[derive(Debug, Default, Clone, Copy)]
|
||||||
struct FreshnessState {
|
struct FreshnessState {
|
||||||
/// Provides read-your-write within a single handle: writes that return a
|
/// Wall-clock floor for read freshness, bumped to `now` whenever this
|
||||||
/// version update this, and reads send it as `x-lancedb-min-version`.
|
/// handle performs a write or an explicit [`BaseTable::checkout_latest`].
|
||||||
min_version: Option<u64>,
|
/// Subsequent reads send `max(baseline, now - read_consistency_interval)`
|
||||||
/// Wall-clock time captured at the last [`BaseTable::checkout_latest`]
|
/// as `x-lancedb-min-timestamp`.
|
||||||
/// call. Subsequent reads send
|
|
||||||
/// `max(baseline, now - read_consistency_interval)` as
|
|
||||||
/// `x-lancedb-min-timestamp`.
|
|
||||||
///
|
///
|
||||||
/// Without this, `checkout_latest()` would have no effect on subsequent
|
/// This is what provides read-your-write on a single handle: after a write
|
||||||
/// reads when `read_consistency_interval` is unset (the default): a
|
/// the next read forces the server cache past the write time. It also
|
||||||
/// server-side cache could still serve a snapshot older than the moment
|
/// preserves the `checkout_latest()` signal when `read_consistency_interval`
|
||||||
/// the user explicitly asked for "latest". The baseline forces the
|
/// is unset (the default), where there is no interval-based floor.
|
||||||
/// server to skip any cache entry older than the checkout time, so the
|
freshness_baseline: Option<SystemTime>,
|
||||||
/// `checkout_latest()` signal is preserved across reads on the same
|
|
||||||
/// handle regardless of the configured consistency interval.
|
|
||||||
checkout_baseline: Option<SystemTime>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Snapshot of the headers that should be attached to a single read request.
|
/// Snapshot of the headers that should be attached to a single read request.
|
||||||
#[derive(Debug, Default, Clone, Copy)]
|
#[derive(Debug, Default, Clone, Copy)]
|
||||||
struct FreshnessHeaders {
|
struct FreshnessHeaders {
|
||||||
min_version: Option<u64>,
|
|
||||||
min_timestamp: Option<SystemTime>,
|
min_timestamp: Option<SystemTime>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FreshnessHeaders {
|
impl FreshnessHeaders {
|
||||||
fn apply(self, mut request: RequestBuilder) -> RequestBuilder {
|
fn apply(self, mut request: RequestBuilder) -> RequestBuilder {
|
||||||
if let Some(v) = self.min_version {
|
|
||||||
request = request.header(MIN_VERSION_HEADER, v.to_string());
|
|
||||||
}
|
|
||||||
if let Some(ts) = self.min_timestamp {
|
if let Some(ts) = self.min_timestamp {
|
||||||
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
let dt: chrono::DateTime<chrono::Utc> = ts.into();
|
||||||
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
|
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
|
||||||
@@ -115,6 +104,14 @@ impl FreshnessHeaders {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Monotonic floor for the freshness baseline. `SystemTime` is not monotonic
|
||||||
|
/// (NTP steps, hibernate/resume can move it backward), so a write must never
|
||||||
|
/// lower the baseline below a prior write's — that would let the next read send
|
||||||
|
/// a `min_timestamp` earlier than an earlier write and break read-your-write.
|
||||||
|
fn next_freshness_baseline(prev: Option<SystemTime>, now: SystemTime) -> SystemTime {
|
||||||
|
prev.map_or(now, |prev| prev.max(now))
|
||||||
|
}
|
||||||
|
|
||||||
fn compute_min_timestamp(
|
fn compute_min_timestamp(
|
||||||
state: &FreshnessState,
|
state: &FreshnessState,
|
||||||
interval: Option<Duration>,
|
interval: Option<Duration>,
|
||||||
@@ -125,7 +122,7 @@ fn compute_min_timestamp(
|
|||||||
Some(d) if d.is_zero() => Some(now),
|
Some(d) if d.is_zero() => Some(now),
|
||||||
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
|
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
|
||||||
};
|
};
|
||||||
match (interval_based, state.checkout_baseline) {
|
match (interval_based, state.freshness_baseline) {
|
||||||
(None, None) => None,
|
(None, None) => None,
|
||||||
(Some(t), None) | (None, Some(t)) => Some(t),
|
(Some(t), None) | (None, Some(t)) => Some(t),
|
||||||
(Some(a), Some(b)) => Some(a.max(b)),
|
(Some(a), Some(b)) => Some(a.max(b)),
|
||||||
@@ -787,7 +784,6 @@ impl<S: HttpSend> RemoteTable<S> {
|
|||||||
fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
|
fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
|
||||||
let state = *self.freshness.lock().unwrap();
|
let state = *self.freshness.lock().unwrap();
|
||||||
FreshnessHeaders {
|
FreshnessHeaders {
|
||||||
min_version: state.min_version,
|
|
||||||
min_timestamp: compute_min_timestamp(
|
min_timestamp: compute_min_timestamp(
|
||||||
&state,
|
&state,
|
||||||
self.client.read_consistency_interval,
|
self.client.read_consistency_interval,
|
||||||
@@ -796,22 +792,20 @@ impl<S: HttpSend> RemoteTable<S> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build a POST request and attach the read-freshness headers
|
/// Build a POST request and attach the `x-lancedb-min-timestamp` freshness
|
||||||
/// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`).
|
/// header.
|
||||||
fn post_read(&self, uri: &str) -> RequestBuilder {
|
fn post_read(&self, uri: &str) -> RequestBuilder {
|
||||||
self.snapshot_freshness_headers()
|
self.snapshot_freshness_headers()
|
||||||
.apply(self.client.post(uri))
|
.apply(self.client.post(uri))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Record a version returned by a write so subsequent reads can request at
|
/// Record that this handle just performed a write, so the next read forces
|
||||||
/// least that version via `x-lancedb-min-version`. A returned `0` from a
|
/// the server cache past the write time (read-your-write on a single
|
||||||
/// backward-compatible old server is ignored.
|
/// handle).
|
||||||
fn track_write_version(&self, version: u64) {
|
fn bump_freshness_baseline(&self) {
|
||||||
if version == 0 {
|
let now = SystemTime::now();
|
||||||
return;
|
|
||||||
}
|
|
||||||
let mut state = self.freshness.lock().unwrap();
|
let mut state = self.freshness.lock().unwrap();
|
||||||
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version)));
|
state.freshness_baseline = Some(next_freshness_baseline(state.freshness_baseline, now));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn execute_query(
|
async fn execute_query(
|
||||||
@@ -1115,7 +1109,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
|||||||
if output.overwrite {
|
if output.overwrite {
|
||||||
self.invalidate_schema_cache();
|
self.invalidate_schema_cache();
|
||||||
}
|
}
|
||||||
self.track_write_version(add_result.version);
|
self.bump_freshness_baseline();
|
||||||
|
|
||||||
return Ok(add_result);
|
return Ok(add_result);
|
||||||
}
|
}
|
||||||
@@ -1153,7 +1147,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
|
|||||||
if output.overwrite {
|
if output.overwrite {
|
||||||
self.invalidate_schema_cache();
|
self.invalidate_schema_cache();
|
||||||
}
|
}
|
||||||
self.track_write_version(result.version);
|
self.bump_freshness_baseline();
|
||||||
return Ok(result);
|
return Ok(result);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -1271,7 +1265,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
}
|
}
|
||||||
async fn checkout(&self, version: u64) -> Result<()> {
|
async fn checkout(&self, version: u64) -> Result<()> {
|
||||||
// Validate the version exists. The describe is sent without freshness
|
// Validate the version exists. The describe is sent without freshness
|
||||||
// headers so a stale `min_version` from a previous write doesn't ride
|
// headers so a stale baseline from a previous write doesn't ride
|
||||||
// along on an explicit time-travel request.
|
// along on an explicit time-travel request.
|
||||||
let request = self
|
let request = self
|
||||||
.client
|
.client
|
||||||
@@ -1306,11 +1300,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
*write_guard = None;
|
*write_guard = None;
|
||||||
drop(write_guard);
|
drop(write_guard);
|
||||||
|
|
||||||
// Drop any per-handle write tracking; subsequent reads use the
|
// Reset the freshness baseline to now so subsequent reads see at least
|
||||||
// baseline timestamp captured now to guarantee freshness.
|
// the state as of this explicit `checkout_latest()`.
|
||||||
*self.freshness.lock().unwrap() = FreshnessState {
|
*self.freshness.lock().unwrap() = FreshnessState {
|
||||||
min_version: None,
|
freshness_baseline: Some(SystemTime::now()),
|
||||||
checkout_baseline: Some(SystemTime::now()),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Invalidate schema cache since we're switching versions
|
// Invalidate schema cache since we're switching versions
|
||||||
@@ -1627,7 +1620,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
status_code: None,
|
status_code: None,
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.track_write_version(update_response.version);
|
self.bump_freshness_baseline();
|
||||||
Ok(update_response)
|
Ok(update_response)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1658,7 +1651,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
request_id,
|
request_id,
|
||||||
status_code: None,
|
status_code: None,
|
||||||
})?;
|
})?;
|
||||||
self.track_write_version(delete_response.version);
|
self.bump_freshness_baseline();
|
||||||
Ok(delete_response)
|
Ok(delete_response)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1815,7 +1808,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
status_code: None,
|
status_code: None,
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.track_write_version(merge_insert_response.version);
|
self.bump_freshness_baseline();
|
||||||
Ok(merge_insert_response)
|
Ok(merge_insert_response)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1842,7 +1835,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
}
|
}
|
||||||
async fn checkout_tag(&self, tag: &str) -> Result<()> {
|
async fn checkout_tag(&self, tag: &str) -> Result<()> {
|
||||||
// Resolve the tag without attaching freshness headers; a stale
|
// Resolve the tag without attaching freshness headers; a stale
|
||||||
// `min_version` from a previous write should not ride along on an
|
// baseline from a previous write should not ride along on an
|
||||||
// explicit time-travel request.
|
// explicit time-travel request.
|
||||||
let request = self
|
let request = self
|
||||||
.client
|
.client
|
||||||
@@ -1907,7 +1900,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.invalidate_schema_cache();
|
self.invalidate_schema_cache();
|
||||||
self.track_write_version(result.version);
|
self.bump_freshness_baseline();
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
@@ -1962,7 +1955,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.invalidate_schema_cache();
|
self.invalidate_schema_cache();
|
||||||
self.track_write_version(result.version);
|
self.bump_freshness_baseline();
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
@@ -1990,7 +1983,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
|
|||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.invalidate_schema_cache();
|
self.invalidate_schema_cache();
|
||||||
self.track_write_version(result.version);
|
self.bump_freshness_baseline();
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
@@ -6167,6 +6160,21 @@ mod tests {
|
|||||||
|
|
||||||
// ---- Read freshness header tests ------------------------------------
|
// ---- Read freshness header tests ------------------------------------
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_next_freshness_baseline_is_monotonic() {
|
||||||
|
let t100 = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
|
||||||
|
let t101 = SystemTime::UNIX_EPOCH + Duration::from_secs(101);
|
||||||
|
let t99 = SystemTime::UNIX_EPOCH + Duration::from_secs(99);
|
||||||
|
|
||||||
|
// No prior baseline -> take the current time.
|
||||||
|
assert_eq!(next_freshness_baseline(None, t100), t100);
|
||||||
|
// Clock moved forward -> advance to it.
|
||||||
|
assert_eq!(next_freshness_baseline(Some(t100), t101), t101);
|
||||||
|
// Clock moved backward (NTP step / resume) -> keep the higher prior
|
||||||
|
// baseline so read-your-write isn't lowered below an earlier write.
|
||||||
|
assert_eq!(next_freshness_baseline(Some(t100), t99), t100);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_compute_min_timestamp_combines_baseline_and_interval() {
|
fn test_compute_min_timestamp_combines_baseline_and_interval() {
|
||||||
let now = SystemTime::now();
|
let now = SystemTime::now();
|
||||||
@@ -6180,8 +6188,7 @@ mod tests {
|
|||||||
|
|
||||||
// Baseline only -> baseline.
|
// Baseline only -> baseline.
|
||||||
let state = FreshnessState {
|
let state = FreshnessState {
|
||||||
min_version: None,
|
freshness_baseline: Some(baseline),
|
||||||
checkout_baseline: Some(baseline),
|
|
||||||
};
|
};
|
||||||
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
|
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
|
||||||
|
|
||||||
@@ -6204,8 +6211,7 @@ mod tests {
|
|||||||
// Both: pick the more-recent (i.e. tighter) constraint.
|
// Both: pick the more-recent (i.e. tighter) constraint.
|
||||||
// baseline = now-60, now-interval = now-10. now-10 is newer.
|
// baseline = now-60, now-interval = now-10. now-10 is newer.
|
||||||
let state = FreshnessState {
|
let state = FreshnessState {
|
||||||
min_version: None,
|
freshness_baseline: Some(baseline),
|
||||||
checkout_baseline: Some(baseline),
|
|
||||||
};
|
};
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
|
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
|
||||||
@@ -6215,8 +6221,7 @@ mod tests {
|
|||||||
// Both, baseline newer: pick baseline.
|
// Both, baseline newer: pick baseline.
|
||||||
let recent_baseline = now - Duration::from_secs(5);
|
let recent_baseline = now - Duration::from_secs(5);
|
||||||
let state = FreshnessState {
|
let state = FreshnessState {
|
||||||
min_version: None,
|
freshness_baseline: Some(recent_baseline),
|
||||||
checkout_baseline: Some(recent_baseline),
|
|
||||||
};
|
};
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
|
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
|
||||||
@@ -6271,7 +6276,6 @@ mod tests {
|
|||||||
|
|
||||||
let headers = captured.lock().unwrap().clone().unwrap();
|
let headers = captured.lock().unwrap().clone().unwrap();
|
||||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -6290,7 +6294,6 @@ mod tests {
|
|||||||
sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE,
|
sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE,
|
||||||
"expected timestamp roughly equal to wall clock"
|
"expected timestamp roughly equal to wall clock"
|
||||||
);
|
);
|
||||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -6334,11 +6337,10 @@ mod tests {
|
|||||||
&& sent <= after_checkout + FRESHNESS_TOLERANCE,
|
&& sent <= after_checkout + FRESHNESS_TOLERANCE,
|
||||||
"expected timestamp captured at checkout_latest() time"
|
"expected timestamp captured at checkout_latest() time"
|
||||||
);
|
);
|
||||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_freshness_min_version_tracked_after_write() {
|
async fn test_freshness_baseline_bumped_after_write() {
|
||||||
let (handler, captured) = capturing_handler(|path| match path {
|
let (handler, captured) = capturing_handler(|path| match path {
|
||||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||||
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
"/v1/table/my_table/count_rows/" => "42".to_string(),
|
||||||
@@ -6346,19 +6348,17 @@ mod tests {
|
|||||||
});
|
});
|
||||||
let table = Table::new_with_handler("my_table", handler);
|
let table = Table::new_with_handler("my_table", handler);
|
||||||
|
|
||||||
|
let before = SystemTime::now();
|
||||||
let _ = table.update().column("a", "a + 1").execute().await.unwrap();
|
let _ = table.update().column("a", "a + 1").execute().await.unwrap();
|
||||||
// Update headers also pass through captured; reset by reading after.
|
|
||||||
table.count_rows(None).await.unwrap();
|
table.count_rows(None).await.unwrap();
|
||||||
|
let after = SystemTime::now();
|
||||||
|
|
||||||
|
// The write bumped the freshness baseline, so the next read carries a
|
||||||
|
// min-timestamp at or after the write time (read-your-write).
|
||||||
let headers = captured.lock().unwrap().clone().unwrap();
|
let headers = captured.lock().unwrap().clone().unwrap();
|
||||||
assert_eq!(
|
let ts = parse_min_timestamp(&headers);
|
||||||
headers
|
assert!(ts >= before - FRESHNESS_TOLERANCE);
|
||||||
.get("x-lancedb-min-version")
|
assert!(ts <= after + FRESHNESS_TOLERANCE);
|
||||||
.unwrap()
|
|
||||||
.to_str()
|
|
||||||
.unwrap(),
|
|
||||||
"7"
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
|
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
|
||||||
@@ -6391,9 +6391,10 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_freshness_checkout_validation_sends_no_min_version() {
|
async fn test_freshness_checkout_validation_sends_no_freshness_headers() {
|
||||||
// After a write bumps min_version, calling checkout(v) must not let
|
// After a write bumps the baseline, calling checkout(v) must not let
|
||||||
// that stale header ride along on the validating /describe/ request.
|
// that stale freshness header ride along on the validating /describe/
|
||||||
|
// request.
|
||||||
let (handler, captured) = path_capturing_handler(|path| match path {
|
let (handler, captured) = path_capturing_handler(|path| match path {
|
||||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||||
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
|
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
|
||||||
@@ -6409,16 +6410,15 @@ mod tests {
|
|||||||
.get("/v1/table/my_table/describe/")
|
.get("/v1/table/my_table/describe/")
|
||||||
.expect("describe should have been called by checkout(v)");
|
.expect("describe should have been called by checkout(v)");
|
||||||
assert!(
|
assert!(
|
||||||
!describe_headers.contains_key("x-lancedb-min-version"),
|
!describe_headers.contains_key("x-lancedb-min-timestamp"),
|
||||||
"checkout(v) describe must not carry stale min_version",
|
"checkout(v) describe must not carry a stale freshness baseline",
|
||||||
);
|
);
|
||||||
assert!(!describe_headers.contains_key("x-lancedb-min-timestamp"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_freshness_checkout_tag_resolve_sends_no_min_version() {
|
async fn test_freshness_checkout_tag_resolve_sends_no_freshness_headers() {
|
||||||
// Same invariant for checkout_tag: the tag-resolve request must not
|
// Same invariant for checkout_tag: the tag-resolve request must not
|
||||||
// pick up a stale min_version from a prior write.
|
// pick up a stale freshness baseline from a prior write.
|
||||||
let (handler, captured) = path_capturing_handler(|path| match path {
|
let (handler, captured) = path_capturing_handler(|path| match path {
|
||||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||||
"/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
|
"/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
|
||||||
@@ -6434,14 +6434,13 @@ mod tests {
|
|||||||
.get("/v1/table/my_table/tags/version/")
|
.get("/v1/table/my_table/tags/version/")
|
||||||
.expect("tags/version should have been called by checkout_tag");
|
.expect("tags/version should have been called by checkout_tag");
|
||||||
assert!(
|
assert!(
|
||||||
!resolve_headers.contains_key("x-lancedb-min-version"),
|
!resolve_headers.contains_key("x-lancedb-min-timestamp"),
|
||||||
"checkout_tag resolve must not carry stale min_version",
|
"checkout_tag resolve must not carry a stale freshness baseline",
|
||||||
);
|
);
|
||||||
assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_freshness_checkout_clears_min_version() {
|
async fn test_freshness_checkout_clears_baseline() {
|
||||||
let (handler, captured) = capturing_handler(|path| match path {
|
let (handler, captured) = capturing_handler(|path| match path {
|
||||||
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
|
||||||
// checkout(5) needs to describe version 5 first
|
// checkout(5) needs to describe version 5 first
|
||||||
@@ -6456,7 +6455,6 @@ mod tests {
|
|||||||
table.count_rows(None).await.unwrap();
|
table.count_rows(None).await.unwrap();
|
||||||
|
|
||||||
let headers = captured.lock().unwrap().clone().unwrap();
|
let headers = captured.lock().unwrap().clone().unwrap();
|
||||||
assert!(!headers.contains_key("x-lancedb-min-version"));
|
|
||||||
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user