feat(remote): send read freshness headers for remote table consistency

This commit is contained in:
Brendan Clement
2026-05-22 20:21:39 -07:00
parent ccec91d957
commit cbdad17f34
7 changed files with 578 additions and 83 deletions

View File

@@ -217,6 +217,7 @@ def connect(
request_thread_pool=request_thread_pool,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
**kwargs,
)
_check_s3_bucket_with_dots(str(uri), storage_options)

View File

@@ -50,6 +50,7 @@ class RemoteDBConnection(DBConnection):
connection_timeout: Optional[float] = None,
read_timeout: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
read_consistency_interval: Optional[timedelta] = None,
):
"""Connect to a remote LanceDB database."""
if isinstance(client_config, dict):
@@ -103,6 +104,7 @@ class RemoteDBConnection(DBConnection):
host_override=host_override,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
)
)

View File

@@ -812,8 +812,7 @@ impl ConnectBuilder {
self
}
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
/// The interval at which to check for updates from other processes.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
@@ -825,8 +824,9 @@ impl ConnectBuilder {
/// This only affects read operations. Write operations are always
/// consistent.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
/// For LanceDB Cloud and Enterprise, the interval is sent on every read as
/// an `x-lancedb-min-timestamp` freshness header so the server's cache
/// honors the same semantics.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
@@ -886,6 +886,7 @@ impl ConnectBuilder {
options.host_override,
self.request.client_config,
storage_options.into(),
self.request.read_consistency_interval,
)?);
Ok(Connection {
internal,

View File

@@ -245,6 +245,9 @@ pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
pub(crate) sender: S,
pub(crate) id_delimiter: String,
pub(crate) header_provider: Option<Arc<dyn HeaderProvider>>,
/// Connection-level read consistency interval. Drives the
/// `x-lancedb-min-timestamp` freshness header sent on read requests.
pub(crate) read_consistency_interval: Option<Duration>,
}
impl<S: HttpSend> std::fmt::Debug for RestfulLanceDbClient<S> {
@@ -338,6 +341,7 @@ impl RestfulLanceDbClient<Sender> {
host_override: Option<String>,
default_headers: HeaderMap,
client_config: ClientConfig,
read_consistency_interval: Option<Duration>,
) -> Result<Self> {
// Get the timeouts
let timeout =
@@ -435,6 +439,7 @@ impl RestfulLanceDbClient<Sender> {
.clone()
.unwrap_or("$".to_string()),
header_provider: client_config.header_provider,
read_consistency_interval,
})
}
}
@@ -840,6 +845,16 @@ pub mod test_utils {
pub fn client_with_handler<T>(
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
) -> RestfulLanceDbClient<MockSender>
where
T: Into<reqwest::Body>,
{
client_with_handler_and_interval(handler, None)
}
pub fn client_with_handler_and_interval<T>(
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
read_consistency_interval: Option<Duration>,
) -> RestfulLanceDbClient<MockSender>
where
T: Into<reqwest::Body>,
{
@@ -857,6 +872,7 @@ pub mod test_utils {
},
id_delimiter: "$".to_string(),
header_provider: None,
read_consistency_interval,
}
}
@@ -881,6 +897,7 @@ pub mod test_utils {
},
id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()),
header_provider: config.header_provider,
read_consistency_interval: None,
}
}
}
@@ -1047,6 +1064,7 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Apply dynamic headers
@@ -1082,6 +1100,7 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Apply dynamic headers
@@ -1119,6 +1138,7 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Header provider errors should fail the request

View File

@@ -206,6 +206,7 @@ impl RemoteDatabase {
host_override: Option<String>,
client_config: ClientConfig,
options: RemoteOptions,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let parsed = super::client::parse_db_url(uri)?;
let header_map = RestfulLanceDbClient::<Sender>::default_headers(
@@ -233,6 +234,7 @@ impl RemoteDatabase {
host_override,
header_map,
client_config.clone(),
read_consistency_interval,
)?;
let table_cache = Cache::builder()

View File

@@ -62,15 +62,67 @@ use std::collections::HashMap;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
/// `x-lancedb-min-timestamp`) sent on read requests.
///
/// `min_version` provides read-your-write within a single handle: writes that
/// return a version update it, and reads send it. `checkout_baseline` is the
/// wall-clock time captured at the last [`BaseTable::checkout_latest`] call;
/// reads send `max(baseline, now - read_consistency_interval)`.
#[derive(Debug, Default, Clone, Copy)]
struct FreshnessState {
min_version: Option<u64>,
checkout_baseline: Option<SystemTime>,
}
/// Snapshot of the headers that should be attached to a single read request.
#[derive(Debug, Default, Clone, Copy)]
struct FreshnessHeaders {
min_version: Option<u64>,
min_timestamp: Option<SystemTime>,
}
impl FreshnessHeaders {
fn apply(self, mut request: RequestBuilder) -> RequestBuilder {
if let Some(v) = self.min_version {
request = request.header(MIN_VERSION_HEADER, v.to_string());
}
if let Some(ts) = self.min_timestamp {
let dt: chrono::DateTime<chrono::Utc> = ts.into();
request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339());
}
request
}
}
fn compute_min_timestamp(
state: &FreshnessState,
interval: Option<Duration>,
now: SystemTime,
) -> Option<SystemTime> {
let interval_based = match interval {
None => None,
Some(d) if d.is_zero() => Some(now),
Some(d) => Some(now.checked_sub(d).unwrap_or(now)),
};
match (interval_based, state.checkout_baseline) {
(None, None) => None,
(Some(t), None) | (None, Some(t)) => Some(t),
(Some(a), Some(b)) => Some(a.max(b)),
}
}
pub struct RemoteTags<'a, S: HttpSend = Sender> {
inner: &'a RemoteTable<S>,
}
@@ -80,8 +132,7 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
async fn list(&self) -> Result<HashMap<String, TagContents>> {
let request = self
.inner
.client
.post(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
.post_read(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
let (request_id, response) = self.inner.send(request, true).await?;
let response = self
.inner
@@ -112,48 +163,13 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
}
async fn get_version(&self, tag: &str) -> Result<u64> {
let request = self
.inner
.client
.post(&format!(
"/v1/table/{}/tags/version/",
self.inner.identifier
))
.json(&serde_json::json!({ "tag": tag }));
let (request_id, response) = self.inner.send(request, true).await?;
let response = self
.inner
.check_table_response(&request_id, response)
.await?;
match response.text().await {
Ok(body) => {
let value: serde_json::Value =
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse tag version: {}", e).into(),
request_id: request_id.clone(),
status_code: None,
})?;
value
.get("version")
.and_then(|v| v.as_u64())
.ok_or_else(|| Error::Http {
source: format!("Invalid tag version response: {}", body).into(),
request_id,
status_code: None,
})
}
Err(err) => {
let status_code = err.status();
Err(Error::Http {
source: Box::new(err),
request_id,
status_code,
})
}
}
let request = self.inner.post_read(&format!(
"/v1/table/{}/tags/version/",
self.inner.identifier
));
self.inner
.resolve_tag_version_with_request(tag, request)
.await
}
async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
@@ -215,6 +231,7 @@ pub struct RemoteTable<S: HttpSend = Sender> {
version: RwLock<Option<u64>>,
location: RwLock<Option<String>>,
schema_cache: BackgroundCache<SchemaRef, Error>,
freshness: Mutex<FreshnessState>,
}
impl<S: HttpSend> std::fmt::Debug for RemoteTable<S> {
@@ -243,6 +260,7 @@ impl<S: HttpSend> RemoteTable<S> {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
@@ -252,12 +270,56 @@ impl<S: HttpSend> RemoteTable<S> {
}
async fn describe_version(&self, version: Option<u64>) -> Result<TableDescription> {
let mut request = self
.client
.post(&format!("/v1/table/{}/describe/", self.identifier));
let request = self.post_read(&format!("/v1/table/{}/describe/", self.identifier));
self.describe_with_request(request, version).await
}
async fn resolve_tag_version_with_request(
&self,
tag: &str,
request: RequestBuilder,
) -> Result<u64> {
let request = request.json(&serde_json::json!({ "tag": tag }));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
match response.text().await {
Ok(body) => {
let value: serde_json::Value =
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse tag version: {}", e).into(),
request_id: request_id.clone(),
status_code: None,
})?;
value
.get("version")
.and_then(|v| v.as_u64())
.ok_or_else(|| Error::Http {
source: format!("Invalid tag version response: {}", body).into(),
request_id,
status_code: None,
})
}
Err(err) => {
let status_code = err.status();
Err(Error::Http {
source: Box::new(err),
request_id,
status_code,
})
}
}
}
async fn describe_with_request(
&self,
request: RequestBuilder,
version: Option<u64>,
) -> Result<TableDescription> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let request = request.json(&body);
let (request_id, response) = self.send(request, true).await?;
@@ -711,14 +773,44 @@ impl<S: HttpSend> RemoteTable<S> {
*read_guard
}
/// Snapshot the freshness headers to attach to a single read request.
/// Computed at call time so that retries reuse the same snapshot.
fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
let state = *self.freshness.lock().unwrap();
FreshnessHeaders {
min_version: state.min_version,
min_timestamp: compute_min_timestamp(
&state,
self.client.read_consistency_interval,
SystemTime::now(),
),
}
}
/// Build a POST request and attach the read-freshness headers
/// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`).
fn post_read(&self, uri: &str) -> RequestBuilder {
self.snapshot_freshness_headers()
.apply(self.client.post(uri))
}
/// Record a version returned by a write so subsequent reads can request at
/// least that version via `x-lancedb-min-version`. A returned `0` from a
/// backward-compatible old server is ignored.
fn track_write_version(&self, version: u64) {
if version == 0 {
return;
}
let mut state = self.freshness.lock().unwrap();
state.min_version = Some(state.min_version.map_or(version, |v| v.max(version)));
}
async fn execute_query(
&self,
query: &AnyQuery,
options: &QueryExecutionOptions,
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
let mut request = self
.client
.post(&format!("/v1/table/{}/query/", self.identifier));
let mut request = self.post_read(&format!("/v1/table/{}/query/", self.identifier));
if let Some(timeout) = options.timeout {
// Also send to server, so it can abort the query if it takes too long.
@@ -824,9 +916,10 @@ async fn fetch_schema<S: HttpSend>(
identifier: &str,
table_name: &str,
version: Option<u64>,
freshness_headers: FreshnessHeaders,
) -> Result<SchemaRef> {
let request = client
.post(&format!("/v1/table/{}/describe/", identifier))
let request = freshness_headers
.apply(client.post(&format!("/v1/table/{}/describe/", identifier)))
.json(&serde_json::json!({ "version": version }));
let (request_id, response) = client.send_with_retry(request, None, true).await?;
@@ -874,7 +967,9 @@ mod test_utils {
use super::*;
use crate::remote::ClientConfig;
use crate::remote::client::test_utils::client_with_handler;
use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config};
use crate::remote::client::test_utils::{
MockSender, client_with_handler_and_config, client_with_handler_and_interval,
};
impl RemoteTable<MockSender> {
pub fn new_mock<F, T>(name: String, handler: F, version: Option<semver::Version>) -> Self
@@ -892,6 +987,30 @@ mod test_utils {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
pub fn new_mock_with_consistency_interval<F, T>(
name: String,
handler: F,
read_consistency_interval: Option<Duration>,
) -> Self
where
F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
T: Into<reqwest::Body>,
{
let client = client_with_handler_and_interval(handler, read_consistency_interval);
Self {
client,
name: name.clone(),
namespace: vec![],
identifier: name,
server_version: ServerVersion::default(),
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
@@ -923,6 +1042,7 @@ mod test_utils {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
}
@@ -986,6 +1106,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
if output.overwrite {
self.invalidate_schema_cache();
}
self.track_write_version(add_result.version);
return Ok(add_result);
}
@@ -1023,6 +1144,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
if output.overwrite {
self.invalidate_schema_cache();
}
self.track_write_version(result.version);
return Ok(result);
}
Err(e) => {
@@ -1139,8 +1261,13 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.describe().await.map(|desc| desc.version)
}
async fn checkout(&self, version: u64) -> Result<()> {
// check that the version exists
self.describe_version(Some(version))
// Validate the version exists. The describe is sent without freshness
// headers so a stale `min_version` from a previous write doesn't ride
// along on an explicit time-travel request.
let request = self
.client
.post(&format!("/v1/table/{}/describe/", self.identifier));
self.describe_with_request(request, Some(version))
.await
.map_err(|e| match e {
// try to map the error to a more user-friendly error telling them
@@ -1156,6 +1283,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = Some(version);
drop(write_guard);
// Explicit time-travel: drop any read-your-write / freshness
// constraints so the user sees exactly the requested version.
*self.freshness.lock().unwrap() = FreshnessState::default();
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1166,6 +1297,13 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = None;
drop(write_guard);
// Drop any per-handle write tracking; subsequent reads use the
// baseline timestamp captured now to guarantee freshness.
*self.freshness.lock().unwrap() = FreshnessState {
min_version: None,
checkout_baseline: Some(SystemTime::now()),
};
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1186,9 +1324,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn list_versions(&self) -> Result<Vec<Version>> {
let request = self
.client
.post(&format!("/v1/table/{}/version/list/", self.identifier));
let request = self.post_read(&format!("/v1/table/{}/version/list/", self.identifier));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -1221,19 +1357,25 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let client = self.client.clone();
let identifier = self.identifier.clone();
let table_name = self.name.clone();
let freshness_headers = self.snapshot_freshness_headers();
self.schema_cache
.get(move || async move {
fetch_schema(&client, &identifier, &table_name, version).await
fetch_schema(
&client,
&identifier,
&table_name,
version,
freshness_headers,
)
.await
})
.await
.map_err(unwrap_shared_error)
}
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
let mut request = self
.client
.post(&format!("/v1/table/{}/count_rows/", self.identifier));
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
let version = self.current_version().await;
@@ -1359,9 +1501,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
let base_request = self
.client
.post(&format!("/v1/table/{}/explain_plan/", self.identifier));
let base_request = self.post_read(&format!("/v1/table/{}/explain_plan/", self.identifier));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
@@ -1408,9 +1548,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<String> {
let request = self
.client
.post(&format!("/v1/table/{}/analyze_plan/", self.identifier));
let request = self.post_read(&format!("/v1/table/{}/analyze_plan/", self.identifier));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
@@ -1480,6 +1618,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None,
})?;
self.track_write_version(update_response.version);
Ok(update_response)
}
@@ -1506,6 +1645,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
request_id,
status_code: None,
})?;
self.track_write_version(delete_response.version);
Ok(delete_response)
}
@@ -1662,6 +1802,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None,
})?;
self.track_write_version(merge_insert_response.version);
Ok(merge_insert_response)
}
@@ -1687,12 +1828,22 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(Box::new(RemoteTags { inner: self }))
}
async fn checkout_tag(&self, tag: &str) -> Result<()> {
let tags = self.tags().await?;
let version = tags.get_version(tag).await?;
// Resolve the tag without attaching freshness headers; a stale
// `min_version` from a previous write should not ride along on an
// explicit time-travel request.
let request = self
.client
.post(&format!("/v1/table/{}/tags/version/", self.identifier));
let version = self.resolve_tag_version_with_request(tag, request).await?;
let mut write_guard = self.version.write().await;
*write_guard = Some(version);
drop(write_guard);
// Explicit time-travel: drop any read-your-write / freshness
// constraints so the user sees exactly the tagged version.
*self.freshness.lock().unwrap() = FreshnessState::default();
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1743,6 +1894,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.track_write_version(result.version);
Ok(result)
}
@@ -1797,6 +1949,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.track_write_version(result.version);
Ok(result)
}
@@ -1824,15 +1977,14 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.track_write_version(result.version);
Ok(result)
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
// Make request to list the indices
let mut request = self
.client
.post(&format!("/v1/table/{}/index/list/", self.identifier));
let mut request = self.post_read(&format!("/v1/table/{}/index/list/", self.identifier));
let version = self.current_version().await;
let body = serde_json::json!({ "version": version });
request = request.json(&body);
@@ -1896,7 +2048,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
let mut request = self.client.post(&format!(
let mut request = self.post_read(&format!(
"/v1/table/{}/index/{}/stats/",
self.identifier, index_name
));
@@ -2008,9 +2160,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn stats(&self) -> Result<TableStatistics> {
let request = self
.client
.post(&format!("/v1/table/{}/stats/", self.identifier));
let request = self.post_read(&format!("/v1/table/{}/stats/", self.identifier));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -5974,4 +6124,299 @@ mod tests {
assert_eq!(create_count.load(Ordering::SeqCst), 2);
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
}
// ---- Read freshness header tests ------------------------------------
#[test]
fn test_compute_min_timestamp_combines_baseline_and_interval() {
let now = SystemTime::now();
let baseline = now - Duration::from_secs(60);
// No interval, no baseline -> no header.
assert_eq!(
compute_min_timestamp(&FreshnessState::default(), None, now),
None
);
// Baseline only -> baseline.
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(baseline),
};
assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline));
// ZERO interval, no baseline -> now.
assert_eq!(
compute_min_timestamp(&FreshnessState::default(), Some(Duration::ZERO), now),
Some(now)
);
// Positive interval, no baseline -> now - interval.
assert_eq!(
compute_min_timestamp(
&FreshnessState::default(),
Some(Duration::from_secs(10)),
now
),
Some(now - Duration::from_secs(10))
);
// Both: pick the more-recent (i.e. tighter) constraint.
// baseline = now-60, now-interval = now-10. now-10 is newer.
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(baseline),
};
assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(10)), now),
Some(now - Duration::from_secs(10))
);
// Both, baseline newer: pick baseline.
let recent_baseline = now - Duration::from_secs(5);
let state = FreshnessState {
min_version: None,
checkout_baseline: Some(recent_baseline),
};
assert_eq!(
compute_min_timestamp(&state, Some(Duration::from_secs(60)), now),
Some(recent_baseline)
);
}
/// Allowed slop when comparing a header timestamp against a locally
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
const FRESHNESS_TOLERANCE: Duration = Duration::from_secs(1);
fn capturing_handler<F>(
body_for: F,
) -> (
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
Arc<std::sync::Mutex<Option<http::HeaderMap>>>,
)
where
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
{
let captured = Arc::new(std::sync::Mutex::new(None));
let captured_c = captured.clone();
let handler = move |request: reqwest::Request| {
*captured_c.lock().unwrap() = Some(request.headers().clone());
let path = request.url().path().to_string();
http::Response::builder()
.status(200)
.body(body_for(&path))
.unwrap()
};
(handler, captured)
}
fn parse_min_timestamp(headers: &http::HeaderMap) -> SystemTime {
let value = headers
.get("x-lancedb-min-timestamp")
.expect("expected x-lancedb-min-timestamp header")
.to_str()
.unwrap();
chrono::DateTime::parse_from_rfc3339(value)
.unwrap()
.with_timezone(&chrono::Utc)
.into()
}
#[tokio::test]
async fn test_freshness_default_sends_no_headers() {
let (handler, captured) = capturing_handler(|_| "42".to_string());
let table = Table::new_with_handler("my_table", handler);
let _ = table.count_rows(None).await.unwrap();
let headers = captured.lock().unwrap().clone().unwrap();
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
assert!(!headers.contains_key("x-lancedb-min-version"));
}
#[tokio::test]
async fn test_freshness_zero_interval_sends_now() {
let (handler, captured) = capturing_handler(|_| "42".to_string());
let table =
Table::new_with_handler_and_interval("my_table", handler, Some(Duration::from_secs(0)));
let before = SystemTime::now();
table.count_rows(None).await.unwrap();
let after = SystemTime::now();
let headers = captured.lock().unwrap().clone().unwrap();
let sent = parse_min_timestamp(&headers);
assert!(
sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE,
"expected timestamp roughly equal to wall clock"
);
assert!(!headers.contains_key("x-lancedb-min-version"));
}
#[tokio::test]
async fn test_freshness_positive_interval_sends_now_minus_interval() {
let (handler, captured) = capturing_handler(|_| "42".to_string());
let interval = Duration::from_secs(30);
let table = Table::new_with_handler_and_interval("my_table", handler, Some(interval));
let before = SystemTime::now();
table.count_rows(None).await.unwrap();
let after = SystemTime::now();
let headers = captured.lock().unwrap().clone().unwrap();
let sent = parse_min_timestamp(&headers);
assert!(
sent >= before - interval - FRESHNESS_TOLERANCE
&& sent <= after - interval + FRESHNESS_TOLERANCE,
"expected timestamp roughly equal to now - interval"
);
}
#[tokio::test]
async fn test_freshness_checkout_latest_sets_baseline() {
let (handler, captured) = capturing_handler(|path| match path {
"/v1/table/my_table/count_rows/" => "42".to_string(),
_ => panic!("unexpected path: {}", path),
});
// No interval — only the baseline should drive the timestamp.
let table = Table::new_with_handler_and_interval("my_table", handler, None);
let before_checkout = SystemTime::now();
table.checkout_latest().await.unwrap();
let after_checkout = SystemTime::now();
table.count_rows(None).await.unwrap();
let headers = captured.lock().unwrap().clone().unwrap();
let sent = parse_min_timestamp(&headers);
assert!(
sent >= before_checkout - FRESHNESS_TOLERANCE
&& sent <= after_checkout + FRESHNESS_TOLERANCE,
"expected timestamp captured at checkout_latest() time"
);
assert!(!headers.contains_key("x-lancedb-min-version"));
}
#[tokio::test]
async fn test_freshness_min_version_tracked_after_write() {
let (handler, captured) = capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/count_rows/" => "42".to_string(),
_ => panic!("unexpected path: {}", path),
});
let table = Table::new_with_handler("my_table", handler);
let _ = table.update().column("a", "a + 1").execute().await.unwrap();
// Update headers also pass through captured; reset by reading after.
table.count_rows(None).await.unwrap();
let headers = captured.lock().unwrap().clone().unwrap();
assert_eq!(
headers
.get("x-lancedb-min-version")
.unwrap()
.to_str()
.unwrap(),
"7"
);
}
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
/// from every request so tests can assert on a specific endpoint.
#[allow(clippy::type_complexity)]
fn path_capturing_handler<F>(
body_for: F,
) -> (
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>>,
)
where
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
{
let captured: Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>> =
Arc::new(std::sync::Mutex::new(HashMap::new()));
let captured_c = captured.clone();
let handler = move |request: reqwest::Request| {
let path = request.url().path().to_string();
captured_c
.lock()
.unwrap()
.insert(path.clone(), request.headers().clone());
http::Response::builder()
.status(200)
.body(body_for(&path))
.unwrap()
};
(handler, captured)
}
#[tokio::test]
async fn test_freshness_checkout_validation_sends_no_min_version() {
// After a write bumps min_version, calling checkout(v) must not let
// that stale header ride along on the validating /describe/ request.
let (handler, captured) = path_capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
_ => panic!("unexpected path: {}", path),
});
let table = Table::new_with_handler("my_table", handler);
table.update().column("a", "a + 1").execute().await.unwrap();
table.checkout(5).await.unwrap();
let captured = captured.lock().unwrap();
let describe_headers = captured
.get("/v1/table/my_table/describe/")
.expect("describe should have been called by checkout(v)");
assert!(
!describe_headers.contains_key("x-lancedb-min-version"),
"checkout(v) describe must not carry stale min_version",
);
assert!(!describe_headers.contains_key("x-lancedb-min-timestamp"));
}
#[tokio::test]
async fn test_freshness_checkout_tag_resolve_sends_no_min_version() {
// Same invariant for checkout_tag: the tag-resolve request must not
// pick up a stale min_version from a prior write.
let (handler, captured) = path_capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
"/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
_ => panic!("unexpected path: {}", path),
});
let table = Table::new_with_handler("my_table", handler);
table.update().column("a", "a + 1").execute().await.unwrap();
table.checkout_tag("v_initial").await.unwrap();
let captured = captured.lock().unwrap();
let resolve_headers = captured
.get("/v1/table/my_table/tags/version/")
.expect("tags/version should have been called by checkout_tag");
assert!(
!resolve_headers.contains_key("x-lancedb-min-version"),
"checkout_tag resolve must not carry stale min_version",
);
assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp"));
}
#[tokio::test]
async fn test_freshness_checkout_clears_min_version() {
let (handler, captured) = capturing_handler(|path| match path {
"/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
// checkout(5) needs to describe version 5 first
"/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
"/v1/table/my_table/count_rows/" => "42".to_string(),
_ => panic!("unexpected path: {}", path),
});
let table = Table::new_with_handler("my_table", handler);
table.update().column("a", "a + 1").execute().await.unwrap();
table.checkout(5).await.unwrap();
table.count_rows(None).await.unwrap();
let headers = captured.lock().unwrap().clone().unwrap();
assert!(!headers.contains_key("x-lancedb-min-version"));
assert!(!headers.contains_key("x-lancedb-min-timestamp"));
}
}

View File

@@ -656,6 +656,30 @@ mod test_utils {
}
}
pub fn new_with_handler_and_interval<T>(
name: impl Into<String>,
handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
read_consistency_interval: Option<std::time::Duration>,
) -> Self
where
T: Into<reqwest::Body>,
{
let inner = Arc::new(
crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
name.into(),
handler.clone(),
read_consistency_interval,
),
);
let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
Self {
inner,
database: Some(database),
// Registry is unused.
embedding_registry: Arc::new(MemoryRegistry::new()),
}
}
pub fn new_with_handler_version<T>(
name: impl Into<String>,
version: semver::Version,