Compare commits

...

3 Commits

Author SHA1 Message Date
Brendan Clement
9a09286460 docs(js): regenerate typedoc for read_consistency_interval 2026-05-22 20:52:16 -07:00
Brendan Clement
e363e1661f docs: clarify read_consistency_interval scope and cost 2026-05-22 20:40:33 -07:00
Brendan Clement
cbdad17f34 feat(remote): send read freshness headers for remote table consistency 2026-05-22 20:29:31 -07:00
9 changed files with 612 additions and 101 deletions

View File

@@ -70,16 +70,20 @@ client used by manifest-enabled native connections.
optional readConsistencyInterval: number;
```
(For LanceDB OSS only): The interval, in seconds, at which to check for
updates to the table from other processes. If None, then consistency is not
checked. For performance reasons, this is the default. For strong
consistency, set this to zero seconds. Then every read will check for
updates from other processes. As a compromise, you can set this to a
non-zero value for eventual consistency. If more than that interval
has passed since the last check, then the table will be checked for updates.
Note: this consistency only applies to read operations. Write operations are
The interval, in seconds, at which to check for updates to the table
from other processes. If None, then consistency is not checked. For
performance reasons, this is the default. For strong consistency, set
this to zero seconds. Then every read will check for updates from other
processes. As a compromise, you can set this to a non-zero value for
eventual consistency. If more than that interval has passed since the
last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
***
### region?

View File

@@ -24,15 +24,19 @@ mod util;
#[napi(object)]
#[derive(Debug)]
pub struct ConnectionOptions {
/// (For LanceDB OSS only): The interval, in seconds, at which to check for
/// updates to the table from other processes. If None, then consistency is not
/// checked. For performance reasons, this is the default. For strong
/// consistency, set this to zero seconds. Then every read will check for
/// updates from other processes. As a compromise, you can set this to a
/// non-zero value for eventual consistency. If more than that interval
/// has passed since the last check, then the table will be checked for updates.
/// Note: this consistency only applies to read operations. Write operations are
/// The interval, in seconds, at which to check for updates to the table
/// from other processes. If None, then consistency is not checked. For
/// performance reasons, this is the default. For strong consistency, set
/// this to zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero value for
/// eventual consistency. If more than that interval has passed since the
/// last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// always consistent.
///
/// Stronger consistency is not free. The smaller the interval, the more
/// often each read pays the cost of checking for updates against object
/// storage, raising per-read latency and cost.
pub read_consistency_interval: Option<f64>,
/// (For LanceDB OSS only): configuration for object storage.
///

View File

@@ -94,7 +94,6 @@ def connect(
host_override: str, optional
The override url for LanceDB Cloud.
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
@@ -104,6 +103,10 @@ def connect(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
client_config: ClientConfig or dict, optional
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the
@@ -217,6 +220,7 @@ def connect(
request_thread_pool=request_thread_pool,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
**kwargs,
)
_check_s3_bucket_with_dots(str(uri), storage_options)
@@ -343,7 +347,6 @@ async def connect_async(
host_override: str, optional
The override url for LanceDB Cloud.
read_consistency_interval: timedelta, default None
(For LanceDB OSS only)
The interval at which to check for updates to the table from other
processes. If None, then consistency is not checked. For performance
reasons, this is the default. For strong consistency, set this to
@@ -353,6 +356,10 @@ async def connect_async(
the last check, then the table will be checked for updates. Note: this
consistency only applies to read operations. Write operations are
always consistent.
Stronger consistency is not free. The smaller the interval, the more
often each read pays the cost of checking for updates against object
storage, raising per-read latency and cost.
client_config: ClientConfig or dict, optional
Configuration options for the LanceDB Cloud HTTP client. If a dict, then
the keys are the attributes of the ClientConfig class. If None, then the

View File

@@ -50,6 +50,7 @@ class RemoteDBConnection(DBConnection):
connection_timeout: Optional[float] = None,
read_timeout: Optional[float] = None,
storage_options: Optional[Dict[str, str]] = None,
read_consistency_interval: Optional[timedelta] = None,
):
"""Connect to a remote LanceDB database."""
if isinstance(client_config, dict):
@@ -103,6 +104,7 @@ class RemoteDBConnection(DBConnection):
host_override=host_override,
client_config=client_config,
storage_options=storage_options,
read_consistency_interval=read_consistency_interval,
)
)

View File

@@ -812,8 +812,7 @@ impl ConnectBuilder {
self
}
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
/// The interval at which to check for updates from other processes.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
@@ -825,8 +824,11 @@ impl ConnectBuilder {
/// This only affects read operations. Write operations are always
/// consistent.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
/// # Cost
///
/// Stronger consistency is not free. The smaller the interval, the more
/// often each read pays the cost of checking for updates against object
/// storage, raising per-read latency and cost.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
@@ -886,6 +888,7 @@ impl ConnectBuilder {
options.host_override,
self.request.client_config,
storage_options.into(),
self.request.read_consistency_interval,
)?);
Ok(Connection {
internal,

View File

@@ -245,6 +245,9 @@ pub struct RestfulLanceDbClient<S: HttpSend = Sender> {
pub(crate) sender: S,
pub(crate) id_delimiter: String,
pub(crate) header_provider: Option<Arc<dyn HeaderProvider>>,
/// Connection-level read consistency interval. Drives the
/// `x-lancedb-min-timestamp` freshness header sent on read requests.
pub(crate) read_consistency_interval: Option<Duration>,
}
impl<S: HttpSend> std::fmt::Debug for RestfulLanceDbClient<S> {
@@ -338,6 +341,7 @@ impl RestfulLanceDbClient<Sender> {
host_override: Option<String>,
default_headers: HeaderMap,
client_config: ClientConfig,
read_consistency_interval: Option<Duration>,
) -> Result<Self> {
// Get the timeouts
let timeout =
@@ -435,6 +439,7 @@ impl RestfulLanceDbClient<Sender> {
.clone()
.unwrap_or("$".to_string()),
header_provider: client_config.header_provider,
read_consistency_interval,
})
}
}
@@ -840,6 +845,16 @@ pub mod test_utils {
/// Build a `RestfulLanceDbClient<MockSender>` from `handler` with no read
/// consistency interval configured.
///
/// This delegates to `client_with_handler_and_interval`, passing `None` as
/// the interval.
pub fn client_with_handler<T>(
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
) -> RestfulLanceDbClient<MockSender>
where
T: Into<reqwest::Body>,
{
client_with_handler_and_interval(handler, None)
}
pub fn client_with_handler_and_interval<T>(
handler: impl Fn(reqwest::Request) -> http::response::Response<T> + Send + Sync + 'static,
read_consistency_interval: Option<Duration>,
) -> RestfulLanceDbClient<MockSender>
where
T: Into<reqwest::Body>,
{
@@ -857,6 +872,7 @@ pub mod test_utils {
},
id_delimiter: "$".to_string(),
header_provider: None,
read_consistency_interval,
}
}
@@ -881,6 +897,7 @@ pub mod test_utils {
},
id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()),
header_provider: config.header_provider,
read_consistency_interval: None,
}
}
}
@@ -1047,6 +1064,7 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Apply dynamic headers
@@ -1082,6 +1100,7 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Apply dynamic headers
@@ -1119,6 +1138,7 @@ mod tests {
sender: Sender,
id_delimiter: "+".to_string(),
header_provider: Some(Arc::new(provider) as Arc<dyn HeaderProvider>),
read_consistency_interval: None,
};
// Header provider errors should fail the request

View File

@@ -206,6 +206,7 @@ impl RemoteDatabase {
host_override: Option<String>,
client_config: ClientConfig,
options: RemoteOptions,
read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
let parsed = super::client::parse_db_url(uri)?;
let header_map = RestfulLanceDbClient::<Sender>::default_headers(
@@ -233,6 +234,7 @@ impl RemoteDatabase {
host_override,
header_map,
client_config.clone(),
read_consistency_interval,
)?;
let table_cache = Cache::builder()

View File

@@ -62,15 +62,67 @@ use std::collections::HashMap;
use std::io::Cursor;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime};
use tokio::sync::RwLock;
const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms");
const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version");
const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp");
const METRIC_TYPE_KEY: &str = "metric_type";
const INDEX_TYPE_KEY: &str = "index_type";
const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30);
const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5);
/// Per-table state driving the freshness headers (`x-lancedb-min-version` and
/// `x-lancedb-min-timestamp`) sent on read requests.
///
/// `min_version` provides read-your-write within a single handle: writes that
/// return a version update it, and reads send it. `checkout_baseline` is the
/// wall-clock time captured at the last [`BaseTable::checkout_latest`] call;
/// reads send `max(baseline, now - read_consistency_interval)`.
#[derive(Debug, Default, Clone, Copy)]
struct FreshnessState {
/// Highest non-zero version recorded by `track_write_version`; `None` until
/// a write reports one, and reset whenever the state is replaced on checkout.
min_version: Option<u64>,
/// Wall-clock time stored by `checkout_latest`; `None` in the default state.
checkout_baseline: Option<SystemTime>,
}
/// Snapshot of the headers that should be attached to a single read request.
///
/// Each field maps to one header in `FreshnessHeaders::apply`; a `None` field
/// means that header is not added.
#[derive(Debug, Default, Clone, Copy)]
struct FreshnessHeaders {
/// Value for `x-lancedb-min-version`, sent as a decimal string.
min_version: Option<u64>,
/// Value for `x-lancedb-min-timestamp`, sent as an RFC 3339 UTC timestamp.
min_timestamp: Option<SystemTime>,
}
impl FreshnessHeaders {
    /// Attach the freshness headers held by this snapshot to `request`.
    ///
    /// `x-lancedb-min-version` is added when `min_version` is set, and
    /// `x-lancedb-min-timestamp` (RFC 3339, UTC) when `min_timestamp` is set.
    /// A snapshot with neither field set returns the request unchanged.
    fn apply(self, request: RequestBuilder) -> RequestBuilder {
        let request = match self.min_version {
            Some(version) => request.header(MIN_VERSION_HEADER, version.to_string()),
            None => request,
        };
        match self.min_timestamp {
            Some(timestamp) => {
                let utc = chrono::DateTime::<chrono::Utc>::from(timestamp);
                request.header(MIN_TIMESTAMP_HEADER, utc.to_rfc3339())
            }
            None => request,
        }
    }
}
/// Compute the `x-lancedb-min-timestamp` lower bound for a read issued at `now`.
///
/// Two bounds may apply, and the later (stricter) one wins:
///
/// * the interval bound `now - interval` (an interval of zero yields `now`);
/// * `state.checkout_baseline`.
///
/// Returns `None` when neither bound applies.
///
/// An interval too large to subtract from `now` (for example `Duration::MAX`)
/// contributes no interval bound. Such a value asks for the loosest possible
/// consistency, so it must not collapse into the strictest bound (`now`).
fn compute_min_timestamp(
    state: &FreshnessState,
    interval: Option<Duration>,
    now: SystemTime,
) -> Option<SystemTime> {
    // `checked_sub` returns `None` on underflow; `and_then` folds that into
    // "no interval bound". A zero interval subtracts to `now` itself.
    let interval_based = interval.and_then(|d| now.checked_sub(d));

    // `Option` orders `None` below every `Some`, so `max` keeps whichever
    // bound is present and, when both are, the more recent of the two.
    interval_based.max(state.checkout_baseline)
}
pub struct RemoteTags<'a, S: HttpSend = Sender> {
inner: &'a RemoteTable<S>,
}
@@ -80,8 +132,7 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
async fn list(&self) -> Result<HashMap<String, TagContents>> {
let request = self
.inner
.client
.post(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
.post_read(&format!("/v1/table/{}/tags/list/", self.inner.identifier));
let (request_id, response) = self.inner.send(request, true).await?;
let response = self
.inner
@@ -112,48 +163,13 @@ impl<S: HttpSend + 'static> Tags for RemoteTags<'_, S> {
}
async fn get_version(&self, tag: &str) -> Result<u64> {
let request = self
.inner
.client
.post(&format!(
"/v1/table/{}/tags/version/",
self.inner.identifier
))
.json(&serde_json::json!({ "tag": tag }));
let (request_id, response) = self.inner.send(request, true).await?;
let response = self
.inner
.check_table_response(&request_id, response)
.await?;
match response.text().await {
Ok(body) => {
let value: serde_json::Value =
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse tag version: {}", e).into(),
request_id: request_id.clone(),
status_code: None,
})?;
value
.get("version")
.and_then(|v| v.as_u64())
.ok_or_else(|| Error::Http {
source: format!("Invalid tag version response: {}", body).into(),
request_id,
status_code: None,
})
}
Err(err) => {
let status_code = err.status();
Err(Error::Http {
source: Box::new(err),
request_id,
status_code,
})
}
}
let request = self.inner.post_read(&format!(
"/v1/table/{}/tags/version/",
self.inner.identifier
));
self.inner
.resolve_tag_version_with_request(tag, request)
.await
}
async fn create(&mut self, tag: &str, version: u64) -> Result<()> {
@@ -215,6 +231,7 @@ pub struct RemoteTable<S: HttpSend = Sender> {
version: RwLock<Option<u64>>,
location: RwLock<Option<String>>,
schema_cache: BackgroundCache<SchemaRef, Error>,
freshness: Mutex<FreshnessState>,
}
impl<S: HttpSend> std::fmt::Debug for RemoteTable<S> {
@@ -243,6 +260,7 @@ impl<S: HttpSend> RemoteTable<S> {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
@@ -252,12 +270,56 @@ impl<S: HttpSend> RemoteTable<S> {
}
async fn describe_version(&self, version: Option<u64>) -> Result<TableDescription> {
let mut request = self
.client
.post(&format!("/v1/table/{}/describe/", self.identifier));
let request = self.post_read(&format!("/v1/table/{}/describe/", self.identifier));
self.describe_with_request(request, version).await
}
async fn resolve_tag_version_with_request(
&self,
tag: &str,
request: RequestBuilder,
) -> Result<u64> {
let request = request.json(&serde_json::json!({ "tag": tag }));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
match response.text().await {
Ok(body) => {
let value: serde_json::Value =
serde_json::from_str(&body).map_err(|e| Error::Http {
source: format!("Failed to parse tag version: {}", e).into(),
request_id: request_id.clone(),
status_code: None,
})?;
value
.get("version")
.and_then(|v| v.as_u64())
.ok_or_else(|| Error::Http {
source: format!("Invalid tag version response: {}", body).into(),
request_id,
status_code: None,
})
}
Err(err) => {
let status_code = err.status();
Err(Error::Http {
source: Box::new(err),
request_id,
status_code,
})
}
}
}
async fn describe_with_request(
&self,
request: RequestBuilder,
version: Option<u64>,
) -> Result<TableDescription> {
let body = serde_json::json!({ "version": version });
request = request.json(&body);
let request = request.json(&body);
let (request_id, response) = self.send(request, true).await?;
@@ -711,14 +773,44 @@ impl<S: HttpSend> RemoteTable<S> {
*read_guard
}
/// Snapshot the freshness headers to attach to a single read request.
/// Computed at call time so that retries reuse the same snapshot.
fn snapshot_freshness_headers(&self) -> FreshnessHeaders {
let state = *self.freshness.lock().unwrap();
FreshnessHeaders {
min_version: state.min_version,
min_timestamp: compute_min_timestamp(
&state,
self.client.read_consistency_interval,
SystemTime::now(),
),
}
}
/// Start a POST request to `uri` carrying the current read-freshness headers
/// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`).
fn post_read(&self, uri: &str) -> RequestBuilder {
    let headers = self.snapshot_freshness_headers();
    let request = self.client.post(uri);
    headers.apply(request)
}
/// Remember the version reported by a write so later reads can ask for at
/// least that version through `x-lancedb-min-version`.
///
/// The stored value only ever grows. A `version` of `0`, as returned by a
/// backward-compatible old server, is ignored.
fn track_write_version(&self, version: u64) {
    if version != 0 {
        let mut guard = self.freshness.lock().unwrap();
        // `None` sorts below every `Some`, so this stores `version` when
        // nothing is tracked yet and the larger of the two otherwise.
        guard.min_version = guard.min_version.max(Some(version));
    }
}
async fn execute_query(
&self,
query: &AnyQuery,
options: &QueryExecutionOptions,
) -> Result<Vec<Pin<Box<dyn RecordBatchStream + Send>>>> {
let mut request = self
.client
.post(&format!("/v1/table/{}/query/", self.identifier));
let mut request = self.post_read(&format!("/v1/table/{}/query/", self.identifier));
if let Some(timeout) = options.timeout {
// Also send to server, so it can abort the query if it takes too long.
@@ -824,9 +916,10 @@ async fn fetch_schema<S: HttpSend>(
identifier: &str,
table_name: &str,
version: Option<u64>,
freshness_headers: FreshnessHeaders,
) -> Result<SchemaRef> {
let request = client
.post(&format!("/v1/table/{}/describe/", identifier))
let request = freshness_headers
.apply(client.post(&format!("/v1/table/{}/describe/", identifier)))
.json(&serde_json::json!({ "version": version }));
let (request_id, response) = client.send_with_retry(request, None, true).await?;
@@ -874,7 +967,9 @@ mod test_utils {
use super::*;
use crate::remote::ClientConfig;
use crate::remote::client::test_utils::client_with_handler;
use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config};
use crate::remote::client::test_utils::{
MockSender, client_with_handler_and_config, client_with_handler_and_interval,
};
impl RemoteTable<MockSender> {
pub fn new_mock<F, T>(name: String, handler: F, version: Option<semver::Version>) -> Self
@@ -892,6 +987,30 @@ mod test_utils {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
/// Build a mock `RemoteTable` named `name` whose client is created from
/// `handler` with the given `read_consistency_interval`.
///
/// The table starts with an empty namespace, the default server version,
/// no checked-out version or location, and default freshness state.
pub fn new_mock_with_consistency_interval<F, T>(
    name: String,
    handler: F,
    read_consistency_interval: Option<Duration>,
) -> Self
where
    F: Fn(reqwest::Request) -> http::Response<T> + Send + Sync + 'static,
    T: Into<reqwest::Body>,
{
    Self {
        client: client_with_handler_and_interval(handler, read_consistency_interval),
        // The identifier is the bare table name.
        identifier: name.clone(),
        name,
        namespace: Vec::new(),
        server_version: ServerVersion::default(),
        version: RwLock::new(None),
        location: RwLock::new(None),
        schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
        freshness: Mutex::new(FreshnessState::default()),
    }
}
@@ -923,6 +1042,7 @@ mod test_utils {
version: RwLock::new(None),
location: RwLock::new(None),
schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW),
freshness: Mutex::new(FreshnessState::default()),
}
}
}
@@ -986,6 +1106,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
if output.overwrite {
self.invalidate_schema_cache();
}
self.track_write_version(add_result.version);
return Ok(add_result);
}
@@ -1023,6 +1144,7 @@ impl<S: HttpSend + 'static> RemoteTable<S> {
if output.overwrite {
self.invalidate_schema_cache();
}
self.track_write_version(result.version);
return Ok(result);
}
Err(e) => {
@@ -1139,8 +1261,13 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
self.describe().await.map(|desc| desc.version)
}
async fn checkout(&self, version: u64) -> Result<()> {
// check that the version exists
self.describe_version(Some(version))
// Validate the version exists. The describe is sent without freshness
// headers so a stale `min_version` from a previous write doesn't ride
// along on an explicit time-travel request.
let request = self
.client
.post(&format!("/v1/table/{}/describe/", self.identifier));
self.describe_with_request(request, Some(version))
.await
.map_err(|e| match e {
// try to map the error to a more user-friendly error telling them
@@ -1156,6 +1283,10 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = Some(version);
drop(write_guard);
// Explicit time-travel: drop any read-your-write / freshness
// constraints so the user sees exactly the requested version.
*self.freshness.lock().unwrap() = FreshnessState::default();
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1166,6 +1297,13 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
*write_guard = None;
drop(write_guard);
// Drop any per-handle write tracking; subsequent reads use the
// baseline timestamp captured now to guarantee freshness.
*self.freshness.lock().unwrap() = FreshnessState {
min_version: None,
checkout_baseline: Some(SystemTime::now()),
};
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1186,9 +1324,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn list_versions(&self) -> Result<Vec<Version>> {
let request = self
.client
.post(&format!("/v1/table/{}/version/list/", self.identifier));
let request = self.post_read(&format!("/v1/table/{}/version/list/", self.identifier));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
@@ -1221,19 +1357,25 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
let client = self.client.clone();
let identifier = self.identifier.clone();
let table_name = self.name.clone();
let freshness_headers = self.snapshot_freshness_headers();
self.schema_cache
.get(move || async move {
fetch_schema(&client, &identifier, &table_name, version).await
fetch_schema(
&client,
&identifier,
&table_name,
version,
freshness_headers,
)
.await
})
.await
.map_err(unwrap_shared_error)
}
async fn count_rows(&self, filter: Option<Filter>) -> Result<usize> {
let mut request = self
.client
.post(&format!("/v1/table/{}/count_rows/", self.identifier));
let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier));
let version = self.current_version().await;
@@ -1359,9 +1501,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result<String> {
let base_request = self
.client
.post(&format!("/v1/table/{}/explain_plan/", self.identifier));
let base_request = self.post_read(&format!("/v1/table/{}/explain_plan/", self.identifier));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
@@ -1408,9 +1548,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
query: &AnyQuery,
_options: QueryExecutionOptions,
) -> Result<String> {
let request = self
.client
.post(&format!("/v1/table/{}/analyze_plan/", self.identifier));
let request = self.post_read(&format!("/v1/table/{}/analyze_plan/", self.identifier));
let query_bodies = self.prepare_query_bodies(query).await?;
let requests: Vec<reqwest::RequestBuilder> = query_bodies
@@ -1480,6 +1618,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None,
})?;
self.track_write_version(update_response.version);
Ok(update_response)
}
@@ -1506,6 +1645,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
request_id,
status_code: None,
})?;
self.track_write_version(delete_response.version);
Ok(delete_response)
}
@@ -1662,6 +1802,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
status_code: None,
})?;
self.track_write_version(merge_insert_response.version);
Ok(merge_insert_response)
}
@@ -1687,12 +1828,22 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
Ok(Box::new(RemoteTags { inner: self }))
}
async fn checkout_tag(&self, tag: &str) -> Result<()> {
let tags = self.tags().await?;
let version = tags.get_version(tag).await?;
// Resolve the tag without attaching freshness headers; a stale
// `min_version` from a previous write should not ride along on an
// explicit time-travel request.
let request = self
.client
.post(&format!("/v1/table/{}/tags/version/", self.identifier));
let version = self.resolve_tag_version_with_request(tag, request).await?;
let mut write_guard = self.version.write().await;
*write_guard = Some(version);
drop(write_guard);
// Explicit time-travel: drop any read-your-write / freshness
// constraints so the user sees exactly the tagged version.
*self.freshness.lock().unwrap() = FreshnessState::default();
// Invalidate schema cache since we're switching versions
self.invalidate_schema_cache();
@@ -1743,6 +1894,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.track_write_version(result.version);
Ok(result)
}
@@ -1797,6 +1949,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.track_write_version(result.version);
Ok(result)
}
@@ -1824,15 +1977,14 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
})?;
self.invalidate_schema_cache();
self.track_write_version(result.version);
Ok(result)
}
async fn list_indices(&self) -> Result<Vec<IndexConfig>> {
// Make request to list the indices
let mut request = self
.client
.post(&format!("/v1/table/{}/index/list/", self.identifier));
let mut request = self.post_read(&format!("/v1/table/{}/index/list/", self.identifier));
let version = self.current_version().await;
let body = serde_json::json!({ "version": version });
request = request.json(&body);
@@ -1896,7 +2048,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn index_stats(&self, index_name: &str) -> Result<Option<IndexStatistics>> {
let mut request = self.client.post(&format!(
let mut request = self.post_read(&format!(
"/v1/table/{}/index/{}/stats/",
self.identifier, index_name
));
@@ -2008,9 +2160,7 @@ impl<S: HttpSend> BaseTable for RemoteTable<S> {
}
async fn stats(&self) -> Result<TableStatistics> {
let request = self
.client
.post(&format!("/v1/table/{}/stats/", self.identifier));
let request = self.post_read(&format!("/v1/table/{}/stats/", self.identifier));
let (request_id, response) = self.send(request, true).await?;
let response = self.check_table_response(&request_id, response).await?;
let body = response.text().await.err_to_http(request_id.clone())?;
@@ -5974,4 +6124,299 @@ mod tests {
assert_eq!(create_count.load(Ordering::SeqCst), 2);
assert_eq!(abort_count.load(Ordering::SeqCst), 1);
}
// ---- Read freshness header tests ------------------------------------
#[test]
fn test_compute_min_timestamp_combines_baseline_and_interval() {
    let now = SystemTime::now();
    let secs = Duration::from_secs;
    let unset = FreshnessState::default();
    let with_baseline = |baseline: SystemTime| FreshnessState {
        min_version: None,
        checkout_baseline: Some(baseline),
    };
    let old_baseline = now - secs(60);
    let recent_baseline = now - secs(5);

    // Neither an interval nor a baseline: nothing to send.
    assert_eq!(compute_min_timestamp(&unset, None, now), None);

    // A baseline alone is passed through as-is.
    assert_eq!(
        compute_min_timestamp(&with_baseline(old_baseline), None, now),
        Some(old_baseline)
    );

    // A zero interval alone means "as of now".
    assert_eq!(
        compute_min_timestamp(&unset, Some(Duration::ZERO), now),
        Some(now)
    );

    // A positive interval alone means "now minus the interval".
    assert_eq!(
        compute_min_timestamp(&unset, Some(secs(10)), now),
        Some(now - secs(10))
    );

    // With both set, the more recent (tighter) bound wins. Here the
    // baseline is now-60 and the interval bound is now-10.
    assert_eq!(
        compute_min_timestamp(&with_baseline(old_baseline), Some(secs(10)), now),
        Some(now - secs(10))
    );

    // With both set and the baseline more recent, the baseline wins.
    assert_eq!(
        compute_min_timestamp(&with_baseline(recent_baseline), Some(secs(60)), now),
        Some(recent_baseline)
    );
}
/// Allowed slop when comparing a header timestamp against a locally
/// captured wall-clock bound. Tests run fast enough that 1s is plenty.
const FRESHNESS_TOLERANCE: Duration = Duration::from_secs(1);
fn capturing_handler<F>(
body_for: F,
) -> (
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
Arc<std::sync::Mutex<Option<http::HeaderMap>>>,
)
where
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
{
let captured = Arc::new(std::sync::Mutex::new(None));
let captured_c = captured.clone();
let handler = move |request: reqwest::Request| {
*captured_c.lock().unwrap() = Some(request.headers().clone());
let path = request.url().path().to_string();
http::Response::builder()
.status(200)
.body(body_for(&path))
.unwrap()
};
(handler, captured)
}
/// Read the `x-lancedb-min-timestamp` header and parse it from RFC 3339 into
/// a `SystemTime`. Panics if the header is missing or malformed.
fn parse_min_timestamp(headers: &http::HeaderMap) -> SystemTime {
    let raw = headers
        .get("x-lancedb-min-timestamp")
        .expect("expected x-lancedb-min-timestamp header");
    let text = raw.to_str().unwrap();
    let parsed = chrono::DateTime::parse_from_rfc3339(text).unwrap();
    SystemTime::from(parsed.with_timezone(&chrono::Utc))
}
/// With no interval, no checkout and no prior write, a read carries neither
/// freshness header.
#[tokio::test]
async fn test_freshness_default_sends_no_headers() {
    let (handler, captured) = capturing_handler(|_| "42".to_string());
    let table = Table::new_with_handler("my_table", handler);

    table.count_rows(None).await.unwrap();

    let headers = captured.lock().unwrap().clone().unwrap();
    for name in ["x-lancedb-min-timestamp", "x-lancedb-min-version"] {
        assert!(!headers.contains_key(name));
    }
}
/// A zero interval sends a timestamp equal (within tolerance) to the wall
/// clock at read time, and no version header.
#[tokio::test]
async fn test_freshness_zero_interval_sends_now() {
    let (handler, captured) = capturing_handler(|_| "42".to_string());
    let table = Table::new_with_handler_and_interval("my_table", handler, Some(Duration::ZERO));

    let before = SystemTime::now();
    table.count_rows(None).await.unwrap();
    let after = SystemTime::now();

    let headers = captured.lock().unwrap().clone().unwrap();
    let sent = parse_min_timestamp(&headers);
    let earliest = before - FRESHNESS_TOLERANCE;
    let latest = after + FRESHNESS_TOLERANCE;
    assert!(
        earliest <= sent && sent <= latest,
        "expected timestamp roughly equal to wall clock"
    );
    assert!(!headers.contains_key("x-lancedb-min-version"));
}
/// A positive interval sends a timestamp equal (within tolerance) to the
/// wall clock at read time minus that interval.
#[tokio::test]
async fn test_freshness_positive_interval_sends_now_minus_interval() {
    let interval = Duration::from_secs(30);
    let (handler, captured) = capturing_handler(|_| "42".to_string());
    let table = Table::new_with_handler_and_interval("my_table", handler, Some(interval));

    let before = SystemTime::now();
    table.count_rows(None).await.unwrap();
    let after = SystemTime::now();

    let headers = captured.lock().unwrap().clone().unwrap();
    let sent = parse_min_timestamp(&headers);
    let earliest = before - interval - FRESHNESS_TOLERANCE;
    let latest = after - interval + FRESHNESS_TOLERANCE;
    assert!(
        earliest <= sent && sent <= latest,
        "expected timestamp roughly equal to now - interval"
    );
}
/// With no interval configured, `checkout_latest()` alone drives the
/// timestamp header: it carries the time of the checkout, not of the read.
#[tokio::test]
async fn test_freshness_checkout_latest_sets_baseline() {
    let (handler, captured) = capturing_handler(|path| {
        if path == "/v1/table/my_table/count_rows/" {
            "42".to_string()
        } else {
            panic!("unexpected path: {}", path)
        }
    });
    // No interval: the baseline is the only source of a timestamp.
    let table = Table::new_with_handler_and_interval("my_table", handler, None);

    let before_checkout = SystemTime::now();
    table.checkout_latest().await.unwrap();
    let after_checkout = SystemTime::now();

    table.count_rows(None).await.unwrap();

    let headers = captured.lock().unwrap().clone().unwrap();
    let sent = parse_min_timestamp(&headers);
    let earliest = before_checkout - FRESHNESS_TOLERANCE;
    let latest = after_checkout + FRESHNESS_TOLERANCE;
    assert!(
        earliest <= sent && sent <= latest,
        "expected timestamp captured at checkout_latest() time"
    );
    assert!(!headers.contains_key("x-lancedb-min-version"));
}
/// After a write whose response reports `version: 7`, the next read must
/// send `x-lancedb-min-version: 7`.
#[tokio::test]
async fn test_freshness_min_version_tracked_after_write() {
    let (handler, captured) = capturing_handler(|path| match path {
        "/v1/table/my_table/count_rows/" => "42".to_string(),
        "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
        _ => panic!("unexpected path: {}", path),
    });
    let table = Table::new_with_handler("my_table", handler);

    table.update().column("a", "a + 1").execute().await.unwrap();
    // The update request's headers were captured as well; issuing the read
    // afterwards replaces them with the read's headers.
    table.count_rows(None).await.unwrap();

    let snapshot = captured.lock().unwrap().clone();
    let headers = snapshot.unwrap();
    let min_version = headers
        .get("x-lancedb-min-version")
        .unwrap()
        .to_str()
        .unwrap();
    assert_eq!(min_version, "7");
}
/// Like `capturing_handler`, but keeps a per-path snapshot of the headers
/// from every request so tests can assert on a specific endpoint.
#[allow(clippy::type_complexity)]
fn path_capturing_handler<F>(
body_for: F,
) -> (
impl Fn(reqwest::Request) -> http::Response<String> + Clone + Send + Sync + 'static,
Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>>,
)
where
F: Fn(&str) -> String + Clone + Send + Sync + 'static,
{
let captured: Arc<std::sync::Mutex<HashMap<String, http::HeaderMap>>> =
Arc::new(std::sync::Mutex::new(HashMap::new()));
let captured_c = captured.clone();
let handler = move |request: reqwest::Request| {
let path = request.url().path().to_string();
captured_c
.lock()
.unwrap()
.insert(path.clone(), request.headers().clone());
http::Response::builder()
.status(200)
.body(body_for(&path))
.unwrap()
};
(handler, captured)
}
/// A write raises the tracked min_version; a following `checkout(v)` must
/// not leak that value (or a min-timestamp) into the `/describe/` request it
/// issues to validate the version.
#[tokio::test]
async fn test_freshness_checkout_validation_sends_no_min_version() {
    let (handler, captured) = path_capturing_handler(|path| match path {
        "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
        "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
        _ => panic!("unexpected path: {}", path),
    });
    let table = Table::new_with_handler("my_table", handler);

    table.update().column("a", "a + 1").execute().await.unwrap();
    table.checkout(5).await.unwrap();

    let by_path = captured.lock().unwrap();
    let describe_headers = by_path
        .get("/v1/table/my_table/describe/")
        .expect("describe should have been called by checkout(v)");
    assert!(
        !describe_headers.contains_key("x-lancedb-min-version"),
        "checkout(v) describe must not carry stale min_version",
    );
    assert!(!describe_headers.contains_key("x-lancedb-min-timestamp"));
}
/// Counterpart of the `checkout(v)` check for `checkout_tag`: the request
/// that resolves the tag to a version must carry neither the min_version
/// left behind by an earlier write nor a min-timestamp.
#[tokio::test]
async fn test_freshness_checkout_tag_resolve_sends_no_min_version() {
    let (handler, captured) = path_capturing_handler(|path| match path {
        "/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(),
        "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
        _ => panic!("unexpected path: {}", path),
    });
    let table = Table::new_with_handler("my_table", handler);

    table.update().column("a", "a + 1").execute().await.unwrap();
    table.checkout_tag("v_initial").await.unwrap();

    let by_path = captured.lock().unwrap();
    let resolve_headers = by_path
        .get("/v1/table/my_table/tags/version/")
        .expect("tags/version should have been called by checkout_tag");
    assert!(
        !resolve_headers.contains_key("x-lancedb-min-version"),
        "checkout_tag resolve must not carry stale min_version",
    );
    assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp"));
}
/// Checking out a specific version after a write must drop the tracked
/// freshness state: the next read sends neither `x-lancedb-min-version` nor
/// `x-lancedb-min-timestamp`.
#[tokio::test]
async fn test_freshness_checkout_clears_min_version() {
    let (handler, captured) = capturing_handler(|path| match path {
        "/v1/table/my_table/count_rows/" => "42".to_string(),
        // checkout(5) describes version 5 before switching to it.
        "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(),
        "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(),
        _ => panic!("unexpected path: {}", path),
    });
    let table = Table::new_with_handler("my_table", handler);

    table.update().column("a", "a + 1").execute().await.unwrap();
    table.checkout(5).await.unwrap();
    table.count_rows(None).await.unwrap();

    let snapshot = captured.lock().unwrap().clone();
    let headers = snapshot.unwrap();
    for absent in ["x-lancedb-min-version", "x-lancedb-min-timestamp"] {
        assert!(!headers.contains_key(absent));
    }
}
}

View File

@@ -656,6 +656,30 @@ mod test_utils {
}
}
/// Test-only constructor: builds a table backed by a mock remote table that
/// is created with the given `read_consistency_interval`.
///
/// `handler` answers every request. A clone of it drives the mock table and
/// the original drives the accompanying mock database.
pub fn new_with_handler_and_interval<T>(
    name: impl Into<String>,
    handler: impl Fn(reqwest::Request) -> http::Response<T> + Clone + Send + Sync + 'static,
    read_consistency_interval: Option<std::time::Duration>,
) -> Self
where
    T: Into<reqwest::Body>,
{
    let mock_table = Arc::new(
        crate::remote::table::RemoteTable::new_mock_with_consistency_interval(
            name.into(),
            handler.clone(),
            read_consistency_interval,
        ),
    );
    let mock_database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler));
    Self {
        inner: mock_table,
        database: Some(mock_database),
        // Placeholder only: the registry is unused here.
        embedding_registry: Arc::new(MemoryRegistry::new()),
    }
}
pub fn new_with_handler_version<T>(
name: impl Into<String>,
version: semver::Version,