From 15e75804c4cbff2c34df2a622c7a29e8d64bd7cb Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Tue, 26 May 2026 13:38:07 -0700 Subject: [PATCH 1/4] feat(remote): send read freshness headers for remote table consistency (#3439) Closes client side work of #3370 ### Summary - Plumbs `read_consistency_interval` from `ConnectBuilder` through `RestfulLanceDbClient` so remote reads attach an `x-lancedb-min-timestamp` freshness header. None = no header (default), zero = "now", positive = `now - interval`. - Adds per-table `FreshnessState` on `RemoteTable`: write responses (`update`, `delete`, `merge_insert`, `add_columns`, `alter_columns`, `drop_columns`) track the committed version, and the next read sends `x-lancedb-min-version` so the server's cache honors read-your-write. - `checkout(v)` / `checkout_tag(t)` / `checkout_latest()` / `restore()` reset the freshness state appropriately; the validating `/describe/` and tag-resolve requests are sent without freshness headers so they don't carry stale state. - Updates Rust, Python, and Node docstrings and calls out that stronger consistency raises per-read latency and cost. ### Testing - Unit tests cover default behavior, interval=0, positive interval, checkout_latest baseline, min_version-after-write, checkout clears state, and the two no-stale-header invariants on `checkout(v)` and `checkout_tag(t)`. - Ran smoke tests against local remote table to verify functionality --- docs/src/js/interfaces/ConnectionOptions.md | 20 +- nodejs/src/lib.rs | 20 +- python/python/lancedb/__init__.py | 11 +- python/python/lancedb/remote/db.py | 2 + rust/lancedb/src/connection.rs | 11 +- rust/lancedb/src/remote/client.rs | 20 + rust/lancedb/src/remote/db.rs | 2 + rust/lancedb/src/remote/table.rs | 612 +++++++++++++++++--- rust/lancedb/src/table.rs | 24 + 9 files changed, 621 insertions(+), 101 deletions(-) diff --git a/docs/src/js/interfaces/ConnectionOptions.md b/docs/src/js/interfaces/ConnectionOptions.md index 1ad0e127a..de2083a9b 100644 --- a/docs/src/js/interfaces/ConnectionOptions.md +++ b/docs/src/js/interfaces/ConnectionOptions.md @@ -70,16 +70,20 @@ client used by manifest-enabled native connections. optional readConsistencyInterval: number; ``` -(For LanceDB OSS only): The interval, in seconds, at which to check for -updates to the table from other processes. If None, then consistency is not -checked. For performance reasons, this is the default. For strong -consistency, set this to zero seconds. Then every read will check for -updates from other processes. As a compromise, you can set this to a -non-zero value for eventual consistency. If more than that interval -has passed since the last check, then the table will be checked for updates. -Note: this consistency only applies to read operations. Write operations are +The interval, in seconds, at which to check for updates to the table +from other processes. If None, then consistency is not checked. For +performance reasons, this is the default. For strong consistency, set +this to zero seconds. Then every read will check for updates from other +processes. As a compromise, you can set this to a non-zero value for +eventual consistency. If more than that interval has passed since the +last check, then the table will be checked for updates. Note: this +consistency only applies to read operations. Write operations are always consistent. +Stronger consistency is not free. The smaller the interval, the more +often each read pays the cost of checking for updates against object +storage, raising per-read latency and cost. + *** ### region? diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index f241fb81f..53c630c93 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -24,15 +24,19 @@ mod util; #[napi(object)] #[derive(Debug)] pub struct ConnectionOptions { - /// (For LanceDB OSS only): The interval, in seconds, at which to check for - /// updates to the table from other processes. If None, then consistency is not - /// checked. For performance reasons, this is the default. For strong - /// consistency, set this to zero seconds. Then every read will check for - /// updates from other processes. As a compromise, you can set this to a - /// non-zero value for eventual consistency. If more than that interval - /// has passed since the last check, then the table will be checked for updates. - /// Note: this consistency only applies to read operations. Write operations are + /// The interval, in seconds, at which to check for updates to the table + /// from other processes. If None, then consistency is not checked. For + /// performance reasons, this is the default. For strong consistency, set + /// this to zero seconds. Then every read will check for updates from other + /// processes. As a compromise, you can set this to a non-zero value for + /// eventual consistency. If more than that interval has passed since the + /// last check, then the table will be checked for updates. Note: this + /// consistency only applies to read operations. Write operations are /// always consistent. + /// + /// Stronger consistency is not free. The smaller the interval, the more + /// often each read pays the cost of checking for updates against object + /// storage, raising per-read latency and cost. pub read_consistency_interval: Option, /// (For LanceDB OSS only): configuration for object storage. /// diff --git a/python/python/lancedb/__init__.py b/python/python/lancedb/__init__.py index 9ddecc6cf..c169633c4 100644 --- a/python/python/lancedb/__init__.py +++ b/python/python/lancedb/__init__.py @@ -94,7 +94,6 @@ def connect( host_override: str, optional The override url for LanceDB Cloud. read_consistency_interval: timedelta, default None - (For LanceDB OSS only) The interval at which to check for updates to the table from other processes. If None, then consistency is not checked. For performance reasons, this is the default. For strong consistency, set this to @@ -104,6 +103,10 @@ def connect( the last check, then the table will be checked for updates. Note: this consistency only applies to read operations. Write operations are always consistent. + + Stronger consistency is not free. The smaller the interval, the more + often each read pays the cost of checking for updates against object + storage, raising per-read latency and cost. client_config: ClientConfig or dict, optional Configuration options for the LanceDB Cloud HTTP client. If a dict, then the keys are the attributes of the ClientConfig class. If None, then the @@ -217,6 +220,7 @@ def connect( request_thread_pool=request_thread_pool, client_config=client_config, storage_options=storage_options, + read_consistency_interval=read_consistency_interval, **kwargs, ) _check_s3_bucket_with_dots(str(uri), storage_options) @@ -343,7 +347,6 @@ async def connect_async( host_override: str, optional The override url for LanceDB Cloud. read_consistency_interval: timedelta, default None - (For LanceDB OSS only) The interval at which to check for updates to the table from other processes. If None, then consistency is not checked. For performance reasons, this is the default. For strong consistency, set this to @@ -353,6 +356,10 @@ async def connect_async( the last check, then the table will be checked for updates. Note: this consistency only applies to read operations. Write operations are always consistent. + + Stronger consistency is not free. The smaller the interval, the more + often each read pays the cost of checking for updates against object + storage, raising per-read latency and cost. client_config: ClientConfig or dict, optional Configuration options for the LanceDB Cloud HTTP client. If a dict, then the keys are the attributes of the ClientConfig class. If None, then the diff --git a/python/python/lancedb/remote/db.py b/python/python/lancedb/remote/db.py index e110cdac1..e75a3964c 100644 --- a/python/python/lancedb/remote/db.py +++ b/python/python/lancedb/remote/db.py @@ -50,6 +50,7 @@ class RemoteDBConnection(DBConnection): connection_timeout: Optional[float] = None, read_timeout: Optional[float] = None, storage_options: Optional[Dict[str, str]] = None, + read_consistency_interval: Optional[timedelta] = None, ): """Connect to a remote LanceDB database.""" if isinstance(client_config, dict): @@ -103,6 +104,7 @@ class RemoteDBConnection(DBConnection): host_override=host_override, client_config=client_config, storage_options=storage_options, + read_consistency_interval=read_consistency_interval, ) ) diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 8034c2a53..c1d475c9c 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -812,8 +812,7 @@ impl ConnectBuilder { self } - /// The interval at which to check for updates from other processes. This - /// only affects LanceDB OSS. + /// The interval at which to check for updates from other processes. /// /// If left unset, consistency is not checked. For maximum read /// performance, this is the default. For strong consistency, set this to @@ -825,8 +824,11 @@ impl ConnectBuilder { /// This only affects read operations. Write operations are always /// consistent. /// - /// LanceDB Cloud uses eventual consistency under the hood, and is not - /// currently configurable. + /// # Cost + /// + /// Stronger consistency is not free. The smaller the interval, the more + /// often each read pays the cost of checking for updates against object + /// storage, raising per-read latency and cost. pub fn read_consistency_interval( mut self, read_consistency_interval: std::time::Duration, @@ -886,6 +888,7 @@ impl ConnectBuilder { options.host_override, self.request.client_config, storage_options.into(), + self.request.read_consistency_interval, )?); Ok(Connection { internal, diff --git a/rust/lancedb/src/remote/client.rs b/rust/lancedb/src/remote/client.rs index 9197ba74a..378698ca6 100644 --- a/rust/lancedb/src/remote/client.rs +++ b/rust/lancedb/src/remote/client.rs @@ -245,6 +245,9 @@ pub struct RestfulLanceDbClient { pub(crate) sender: S, pub(crate) id_delimiter: String, pub(crate) header_provider: Option>, + /// Connection-level read consistency interval. Drives the + /// `x-lancedb-min-timestamp` freshness header sent on read requests. + pub(crate) read_consistency_interval: Option, } impl std::fmt::Debug for RestfulLanceDbClient { @@ -338,6 +341,7 @@ impl RestfulLanceDbClient { host_override: Option, default_headers: HeaderMap, client_config: ClientConfig, + read_consistency_interval: Option, ) -> Result { // Get the timeouts let timeout = @@ -435,6 +439,7 @@ impl RestfulLanceDbClient { .clone() .unwrap_or("$".to_string()), header_provider: client_config.header_provider, + read_consistency_interval, }) } } @@ -840,6 +845,16 @@ pub mod test_utils { pub fn client_with_handler( handler: impl Fn(reqwest::Request) -> http::response::Response + Send + Sync + 'static, ) -> RestfulLanceDbClient + where + T: Into, + { + client_with_handler_and_interval(handler, None) + } + + pub fn client_with_handler_and_interval( + handler: impl Fn(reqwest::Request) -> http::response::Response + Send + Sync + 'static, + read_consistency_interval: Option, + ) -> RestfulLanceDbClient where T: Into, { @@ -857,6 +872,7 @@ pub mod test_utils { }, id_delimiter: "$".to_string(), header_provider: None, + read_consistency_interval, } } @@ -881,6 +897,7 @@ pub mod test_utils { }, id_delimiter: config.id_delimiter.unwrap_or_else(|| "$".to_string()), header_provider: config.header_provider, + read_consistency_interval: None, } } } @@ -1047,6 +1064,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Apply dynamic headers @@ -1082,6 +1100,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Apply dynamic headers @@ -1119,6 +1138,7 @@ mod tests { sender: Sender, id_delimiter: "+".to_string(), header_provider: Some(Arc::new(provider) as Arc), + read_consistency_interval: None, }; // Header provider errors should fail the request diff --git a/rust/lancedb/src/remote/db.rs b/rust/lancedb/src/remote/db.rs index dfe5d0c99..c11584157 100644 --- a/rust/lancedb/src/remote/db.rs +++ b/rust/lancedb/src/remote/db.rs @@ -206,6 +206,7 @@ impl RemoteDatabase { host_override: Option, client_config: ClientConfig, options: RemoteOptions, + read_consistency_interval: Option, ) -> Result { let parsed = super::client::parse_db_url(uri)?; let header_map = RestfulLanceDbClient::::default_headers( @@ -233,6 +234,7 @@ impl RemoteDatabase { host_override, header_map, client_config.clone(), + read_consistency_interval, )?; let table_cache = Cache::builder() diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 47831fee2..4f034ba43 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -62,15 +62,76 @@ use std::collections::HashMap; use std::io::Cursor; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tokio::sync::RwLock; const REQUEST_TIMEOUT_HEADER: HeaderName = HeaderName::from_static("x-request-timeout-ms"); +const MIN_VERSION_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-version"); +const MIN_TIMESTAMP_HEADER: HeaderName = HeaderName::from_static("x-lancedb-min-timestamp"); const METRIC_TYPE_KEY: &str = "metric_type"; const INDEX_TYPE_KEY: &str = "index_type"; const SCHEMA_CACHE_TTL: Duration = Duration::from_secs(30); const SCHEMA_CACHE_REFRESH_WINDOW: Duration = Duration::from_secs(5); +/// Per-table state driving the freshness headers (`x-lancedb-min-version` and +/// `x-lancedb-min-timestamp`) sent on read requests. +#[derive(Debug, Default, Clone, Copy)] +struct FreshnessState { + /// Provides read-your-write within a single handle: writes that return a + /// version update this, and reads send it as `x-lancedb-min-version`. + min_version: Option, + /// Wall-clock time captured at the last [`BaseTable::checkout_latest`] + /// call. Subsequent reads send + /// `max(baseline, now - read_consistency_interval)` as + /// `x-lancedb-min-timestamp`. + /// + /// Without this, `checkout_latest()` would have no effect on subsequent + /// reads when `read_consistency_interval` is unset (the default): a + /// server-side cache could still serve a snapshot older than the moment + /// the user explicitly asked for "latest". The baseline forces the + /// server to skip any cache entry older than the checkout time, so the + /// `checkout_latest()` signal is preserved across reads on the same + /// handle regardless of the configured consistency interval. + checkout_baseline: Option, +} + +/// Snapshot of the headers that should be attached to a single read request. +#[derive(Debug, Default, Clone, Copy)] +struct FreshnessHeaders { + min_version: Option, + min_timestamp: Option, +} + +impl FreshnessHeaders { + fn apply(self, mut request: RequestBuilder) -> RequestBuilder { + if let Some(v) = self.min_version { + request = request.header(MIN_VERSION_HEADER, v.to_string()); + } + if let Some(ts) = self.min_timestamp { + let dt: chrono::DateTime = ts.into(); + request = request.header(MIN_TIMESTAMP_HEADER, dt.to_rfc3339()); + } + request + } +} + +fn compute_min_timestamp( + state: &FreshnessState, + interval: Option, + now: SystemTime, +) -> Option { + let interval_based = match interval { + None => None, + Some(d) if d.is_zero() => Some(now), + Some(d) => Some(now.checked_sub(d).unwrap_or(now)), + }; + match (interval_based, state.checkout_baseline) { + (None, None) => None, + (Some(t), None) | (None, Some(t)) => Some(t), + (Some(a), Some(b)) => Some(a.max(b)), + } +} + pub struct RemoteTags<'a, S: HttpSend = Sender> { inner: &'a RemoteTable, } @@ -80,8 +141,7 @@ impl Tags for RemoteTags<'_, S> { async fn list(&self) -> Result> { let request = self .inner - .client - .post(&format!("/v1/table/{}/tags/list/", self.inner.identifier)); + .post_read(&format!("/v1/table/{}/tags/list/", self.inner.identifier)); let (request_id, response) = self.inner.send(request, true).await?; let response = self .inner @@ -112,48 +172,13 @@ impl Tags for RemoteTags<'_, S> { } async fn get_version(&self, tag: &str) -> Result { - let request = self - .inner - .client - .post(&format!( - "/v1/table/{}/tags/version/", - self.inner.identifier - )) - .json(&serde_json::json!({ "tag": tag })); - - let (request_id, response) = self.inner.send(request, true).await?; - let response = self - .inner - .check_table_response(&request_id, response) - .await?; - - match response.text().await { - Ok(body) => { - let value: serde_json::Value = - serde_json::from_str(&body).map_err(|e| Error::Http { - source: format!("Failed to parse tag version: {}", e).into(), - request_id: request_id.clone(), - status_code: None, - })?; - - value - .get("version") - .and_then(|v| v.as_u64()) - .ok_or_else(|| Error::Http { - source: format!("Invalid tag version response: {}", body).into(), - request_id, - status_code: None, - }) - } - Err(err) => { - let status_code = err.status(); - Err(Error::Http { - source: Box::new(err), - request_id, - status_code, - }) - } - } + let request = self.inner.post_read(&format!( + "/v1/table/{}/tags/version/", + self.inner.identifier + )); + self.inner + .resolve_tag_version_with_request(tag, request) + .await } async fn create(&mut self, tag: &str, version: u64) -> Result<()> { @@ -215,6 +240,7 @@ pub struct RemoteTable { version: RwLock>, location: RwLock>, schema_cache: BackgroundCache, + freshness: Mutex, } impl std::fmt::Debug for RemoteTable { @@ -243,6 +269,7 @@ impl RemoteTable { version: RwLock::new(None), location: RwLock::new(None), schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } @@ -252,12 +279,56 @@ impl RemoteTable { } async fn describe_version(&self, version: Option) -> Result { - let mut request = self - .client - .post(&format!("/v1/table/{}/describe/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/describe/", self.identifier)); + self.describe_with_request(request, version).await + } + async fn resolve_tag_version_with_request( + &self, + tag: &str, + request: RequestBuilder, + ) -> Result { + let request = request.json(&serde_json::json!({ "tag": tag })); + + let (request_id, response) = self.send(request, true).await?; + let response = self.check_table_response(&request_id, response).await?; + + match response.text().await { + Ok(body) => { + let value: serde_json::Value = + serde_json::from_str(&body).map_err(|e| Error::Http { + source: format!("Failed to parse tag version: {}", e).into(), + request_id: request_id.clone(), + status_code: None, + })?; + + value + .get("version") + .and_then(|v| v.as_u64()) + .ok_or_else(|| Error::Http { + source: format!("Invalid tag version response: {}", body).into(), + request_id, + status_code: None, + }) + } + Err(err) => { + let status_code = err.status(); + Err(Error::Http { + source: Box::new(err), + request_id, + status_code, + }) + } + } + } + + async fn describe_with_request( + &self, + request: RequestBuilder, + version: Option, + ) -> Result { let body = serde_json::json!({ "version": version }); - request = request.json(&body); + let request = request.json(&body); let (request_id, response) = self.send(request, true).await?; @@ -711,14 +782,44 @@ impl RemoteTable { *read_guard } + /// Snapshot the freshness headers to attach to a single read request. + /// Computed at call time so that retries reuse the same snapshot. + fn snapshot_freshness_headers(&self) -> FreshnessHeaders { + let state = *self.freshness.lock().unwrap(); + FreshnessHeaders { + min_version: state.min_version, + min_timestamp: compute_min_timestamp( + &state, + self.client.read_consistency_interval, + SystemTime::now(), + ), + } + } + + /// Build a POST request and attach the read-freshness headers + /// (`x-lancedb-min-version`, `x-lancedb-min-timestamp`). + fn post_read(&self, uri: &str) -> RequestBuilder { + self.snapshot_freshness_headers() + .apply(self.client.post(uri)) + } + + /// Record a version returned by a write so subsequent reads can request at + /// least that version via `x-lancedb-min-version`. A returned `0` from a + /// backward-compatible old server is ignored. + fn track_write_version(&self, version: u64) { + if version == 0 { + return; + } + let mut state = self.freshness.lock().unwrap(); + state.min_version = Some(state.min_version.map_or(version, |v| v.max(version))); + } + async fn execute_query( &self, query: &AnyQuery, options: &QueryExecutionOptions, ) -> Result>>> { - let mut request = self - .client - .post(&format!("/v1/table/{}/query/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/query/", self.identifier)); if let Some(timeout) = options.timeout { // Also send to server, so it can abort the query if it takes too long. @@ -824,9 +925,10 @@ async fn fetch_schema( identifier: &str, table_name: &str, version: Option, + freshness_headers: FreshnessHeaders, ) -> Result { - let request = client - .post(&format!("/v1/table/{}/describe/", identifier)) + let request = freshness_headers + .apply(client.post(&format!("/v1/table/{}/describe/", identifier))) .json(&serde_json::json!({ "version": version })); let (request_id, response) = client.send_with_retry(request, None, true).await?; @@ -874,7 +976,9 @@ mod test_utils { use super::*; use crate::remote::ClientConfig; use crate::remote::client::test_utils::client_with_handler; - use crate::remote::client::test_utils::{MockSender, client_with_handler_and_config}; + use crate::remote::client::test_utils::{ + MockSender, client_with_handler_and_config, client_with_handler_and_interval, + }; impl RemoteTable { pub fn new_mock(name: String, handler: F, version: Option) -> Self @@ -892,6 +996,30 @@ mod test_utils { version: RwLock::new(None), location: RwLock::new(None), schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), + } + } + + pub fn new_mock_with_consistency_interval( + name: String, + handler: F, + read_consistency_interval: Option, + ) -> Self + where + F: Fn(reqwest::Request) -> http::Response + Send + Sync + 'static, + T: Into, + { + let client = client_with_handler_and_interval(handler, read_consistency_interval); + Self { + client, + name: name.clone(), + namespace: vec![], + identifier: name, + server_version: ServerVersion::default(), + version: RwLock::new(None), + location: RwLock::new(None), + schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } @@ -923,6 +1051,7 @@ mod test_utils { version: RwLock::new(None), location: RwLock::new(None), schema_cache: BackgroundCache::new(SCHEMA_CACHE_TTL, SCHEMA_CACHE_REFRESH_WINDOW), + freshness: Mutex::new(FreshnessState::default()), } } } @@ -986,6 +1115,7 @@ impl RemoteTable { if output.overwrite { self.invalidate_schema_cache(); } + self.track_write_version(add_result.version); return Ok(add_result); } @@ -1023,6 +1153,7 @@ impl RemoteTable { if output.overwrite { self.invalidate_schema_cache(); } + self.track_write_version(result.version); return Ok(result); } Err(e) => { @@ -1139,8 +1270,13 @@ impl BaseTable for RemoteTable { self.describe().await.map(|desc| desc.version) } async fn checkout(&self, version: u64) -> Result<()> { - // check that the version exists - self.describe_version(Some(version)) + // Validate the version exists. The describe is sent without freshness + // headers so a stale `min_version` from a previous write doesn't ride + // along on an explicit time-travel request. + let request = self + .client + .post(&format!("/v1/table/{}/describe/", self.identifier)); + self.describe_with_request(request, Some(version)) .await .map_err(|e| match e { // try to map the error to a more user-friendly error telling them @@ -1156,6 +1292,10 @@ impl BaseTable for RemoteTable { *write_guard = Some(version); drop(write_guard); + // Explicit time-travel: drop any read-your-write / freshness + // constraints so the user sees exactly the requested version. + *self.freshness.lock().unwrap() = FreshnessState::default(); + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1166,6 +1306,13 @@ impl BaseTable for RemoteTable { *write_guard = None; drop(write_guard); + // Drop any per-handle write tracking; subsequent reads use the + // baseline timestamp captured now to guarantee freshness. + *self.freshness.lock().unwrap() = FreshnessState { + min_version: None, + checkout_baseline: Some(SystemTime::now()), + }; + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1186,9 +1333,7 @@ impl BaseTable for RemoteTable { } async fn list_versions(&self) -> Result> { - let request = self - .client - .post(&format!("/v1/table/{}/version/list/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/version/list/", self.identifier)); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; @@ -1221,19 +1366,25 @@ impl BaseTable for RemoteTable { let client = self.client.clone(); let identifier = self.identifier.clone(); let table_name = self.name.clone(); + let freshness_headers = self.snapshot_freshness_headers(); self.schema_cache .get(move || async move { - fetch_schema(&client, &identifier, &table_name, version).await + fetch_schema( + &client, + &identifier, + &table_name, + version, + freshness_headers, + ) + .await }) .await .map_err(unwrap_shared_error) } async fn count_rows(&self, filter: Option) -> Result { - let mut request = self - .client - .post(&format!("/v1/table/{}/count_rows/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/count_rows/", self.identifier)); let version = self.current_version().await; @@ -1359,9 +1510,7 @@ impl BaseTable for RemoteTable { } async fn explain_plan(&self, query: &AnyQuery, verbose: bool) -> Result { - let base_request = self - .client - .post(&format!("/v1/table/{}/explain_plan/", self.identifier)); + let base_request = self.post_read(&format!("/v1/table/{}/explain_plan/", self.identifier)); let query_bodies = self.prepare_query_bodies(query).await?; let requests: Vec = query_bodies @@ -1408,9 +1557,7 @@ impl BaseTable for RemoteTable { query: &AnyQuery, _options: QueryExecutionOptions, ) -> Result { - let request = self - .client - .post(&format!("/v1/table/{}/analyze_plan/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/analyze_plan/", self.identifier)); let query_bodies = self.prepare_query_bodies(query).await?; let requests: Vec = query_bodies @@ -1480,6 +1627,7 @@ impl BaseTable for RemoteTable { status_code: None, })?; + self.track_write_version(update_response.version); Ok(update_response) } @@ -1510,6 +1658,7 @@ impl BaseTable for RemoteTable { request_id, status_code: None, })?; + self.track_write_version(delete_response.version); Ok(delete_response) } @@ -1666,6 +1815,7 @@ impl BaseTable for RemoteTable { status_code: None, })?; + self.track_write_version(merge_insert_response.version); Ok(merge_insert_response) } @@ -1691,12 +1841,22 @@ impl BaseTable for RemoteTable { Ok(Box::new(RemoteTags { inner: self })) } async fn checkout_tag(&self, tag: &str) -> Result<()> { - let tags = self.tags().await?; - let version = tags.get_version(tag).await?; + // Resolve the tag without attaching freshness headers; a stale + // `min_version` from a previous write should not ride along on an + // explicit time-travel request. + let request = self + .client + .post(&format!("/v1/table/{}/tags/version/", self.identifier)); + let version = self.resolve_tag_version_with_request(tag, request).await?; + let mut write_guard = self.version.write().await; *write_guard = Some(version); drop(write_guard); + // Explicit time-travel: drop any read-your-write / freshness + // constraints so the user sees exactly the tagged version. + *self.freshness.lock().unwrap() = FreshnessState::default(); + // Invalidate schema cache since we're switching versions self.invalidate_schema_cache(); @@ -1747,6 +1907,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } @@ -1801,6 +1962,7 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } @@ -1828,15 +1990,14 @@ impl BaseTable for RemoteTable { })?; self.invalidate_schema_cache(); + self.track_write_version(result.version); Ok(result) } async fn list_indices(&self) -> Result> { // Make request to list the indices - let mut request = self - .client - .post(&format!("/v1/table/{}/index/list/", self.identifier)); + let mut request = self.post_read(&format!("/v1/table/{}/index/list/", self.identifier)); let version = self.current_version().await; let body = serde_json::json!({ "version": version }); request = request.json(&body); @@ -1900,7 +2061,7 @@ impl BaseTable for RemoteTable { } async fn index_stats(&self, index_name: &str) -> Result> { - let mut request = self.client.post(&format!( + let mut request = self.post_read(&format!( "/v1/table/{}/index/{}/stats/", self.identifier, index_name )); @@ -2012,9 +2173,7 @@ impl BaseTable for RemoteTable { } async fn stats(&self) -> Result { - let request = self - .client - .post(&format!("/v1/table/{}/stats/", self.identifier)); + let request = self.post_read(&format!("/v1/table/{}/stats/", self.identifier)); let (request_id, response) = self.send(request, true).await?; let response = self.check_table_response(&request_id, response).await?; let body = response.text().await.err_to_http(request_id.clone())?; @@ -6005,4 +6164,299 @@ mod tests { assert_eq!(create_count.load(Ordering::SeqCst), 2); assert_eq!(abort_count.load(Ordering::SeqCst), 1); } + + // ---- Read freshness header tests ------------------------------------ + + #[test] + fn test_compute_min_timestamp_combines_baseline_and_interval() { + let now = SystemTime::now(); + let baseline = now - Duration::from_secs(60); + + // No interval, no baseline -> no header. + assert_eq!( + compute_min_timestamp(&FreshnessState::default(), None, now), + None + ); + + // Baseline only -> baseline. + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(baseline), + }; + assert_eq!(compute_min_timestamp(&state, None, now), Some(baseline)); + + // ZERO interval, no baseline -> now. + assert_eq!( + compute_min_timestamp(&FreshnessState::default(), Some(Duration::ZERO), now), + Some(now) + ); + + // Positive interval, no baseline -> now - interval. + assert_eq!( + compute_min_timestamp( + &FreshnessState::default(), + Some(Duration::from_secs(10)), + now + ), + Some(now - Duration::from_secs(10)) + ); + + // Both: pick the more-recent (i.e. tighter) constraint. + // baseline = now-60, now-interval = now-10. now-10 is newer. + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(baseline), + }; + assert_eq!( + compute_min_timestamp(&state, Some(Duration::from_secs(10)), now), + Some(now - Duration::from_secs(10)) + ); + + // Both, baseline newer: pick baseline. + let recent_baseline = now - Duration::from_secs(5); + let state = FreshnessState { + min_version: None, + checkout_baseline: Some(recent_baseline), + }; + assert_eq!( + compute_min_timestamp(&state, Some(Duration::from_secs(60)), now), + Some(recent_baseline) + ); + } + + /// Allowed slop when comparing a header timestamp against a locally + /// captured wall-clock bound. Tests run fast enough that 1s is plenty. + const FRESHNESS_TOLERANCE: Duration = Duration::from_secs(1); + + fn capturing_handler( + body_for: F, + ) -> ( + impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + Arc>>, + ) + where + F: Fn(&str) -> String + Clone + Send + Sync + 'static, + { + let captured = Arc::new(std::sync::Mutex::new(None)); + let captured_c = captured.clone(); + let handler = move |request: reqwest::Request| { + *captured_c.lock().unwrap() = Some(request.headers().clone()); + let path = request.url().path().to_string(); + http::Response::builder() + .status(200) + .body(body_for(&path)) + .unwrap() + }; + (handler, captured) + } + + fn parse_min_timestamp(headers: &http::HeaderMap) -> SystemTime { + let value = headers + .get("x-lancedb-min-timestamp") + .expect("expected x-lancedb-min-timestamp header") + .to_str() + .unwrap(); + chrono::DateTime::parse_from_rfc3339(value) + .unwrap() + .with_timezone(&chrono::Utc) + .into() + } + + #[tokio::test] + async fn test_freshness_default_sends_no_headers() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let table = Table::new_with_handler("my_table", handler); + + let _ = table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert!(!headers.contains_key("x-lancedb-min-timestamp")); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_zero_interval_sends_now() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let table = + Table::new_with_handler_and_interval("my_table", handler, Some(Duration::from_secs(0))); + + let before = SystemTime::now(); + table.count_rows(None).await.unwrap(); + let after = SystemTime::now(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before - FRESHNESS_TOLERANCE && sent <= after + FRESHNESS_TOLERANCE, + "expected timestamp roughly equal to wall clock" + ); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_positive_interval_sends_now_minus_interval() { + let (handler, captured) = capturing_handler(|_| "42".to_string()); + let interval = Duration::from_secs(30); + let table = Table::new_with_handler_and_interval("my_table", handler, Some(interval)); + + let before = SystemTime::now(); + table.count_rows(None).await.unwrap(); + let after = SystemTime::now(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before - interval - FRESHNESS_TOLERANCE + && sent <= after - interval + FRESHNESS_TOLERANCE, + "expected timestamp roughly equal to now - interval" + ); + } + + #[tokio::test] + async fn test_freshness_checkout_latest_sets_baseline() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + // No interval — only the baseline should drive the timestamp. + let table = Table::new_with_handler_and_interval("my_table", handler, None); + + let before_checkout = SystemTime::now(); + table.checkout_latest().await.unwrap(); + let after_checkout = SystemTime::now(); + + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + let sent = parse_min_timestamp(&headers); + assert!( + sent >= before_checkout - FRESHNESS_TOLERANCE + && sent <= after_checkout + FRESHNESS_TOLERANCE, + "expected timestamp captured at checkout_latest() time" + ); + assert!(!headers.contains_key("x-lancedb-min-version")); + } + + #[tokio::test] + async fn test_freshness_min_version_tracked_after_write() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + let _ = table.update().column("a", "a + 1").execute().await.unwrap(); + // Update headers also pass through captured; reset by reading after. + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert_eq!( + headers + .get("x-lancedb-min-version") + .unwrap() + .to_str() + .unwrap(), + "7" + ); + } + + /// Like `capturing_handler`, but keeps a per-path snapshot of the headers + /// from every request so tests can assert on a specific endpoint. + #[allow(clippy::type_complexity)] + fn path_capturing_handler( + body_for: F, + ) -> ( + impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + Arc>>, + ) + where + F: Fn(&str) -> String + Clone + Send + Sync + 'static, + { + let captured: Arc>> = + Arc::new(std::sync::Mutex::new(HashMap::new())); + let captured_c = captured.clone(); + let handler = move |request: reqwest::Request| { + let path = request.url().path().to_string(); + captured_c + .lock() + .unwrap() + .insert(path.clone(), request.headers().clone()); + http::Response::builder() + .status(200) + .body(body_for(&path)) + .unwrap() + }; + (handler, captured) + } + + #[tokio::test] + async fn test_freshness_checkout_validation_sends_no_min_version() { + // After a write bumps min_version, calling checkout(v) must not let + // that stale header ride along on the validating /describe/ request. + let (handler, captured) = path_capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout(5).await.unwrap(); + + let captured = captured.lock().unwrap(); + let describe_headers = captured + .get("/v1/table/my_table/describe/") + .expect("describe should have been called by checkout(v)"); + assert!( + !describe_headers.contains_key("x-lancedb-min-version"), + "checkout(v) describe must not carry stale min_version", + ); + assert!(!describe_headers.contains_key("x-lancedb-min-timestamp")); + } + + #[tokio::test] + async fn test_freshness_checkout_tag_resolve_sends_no_min_version() { + // Same invariant for checkout_tag: the tag-resolve request must not + // pick up a stale min_version from a prior write. + let (handler, captured) = path_capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + "/v1/table/my_table/tags/version/" => r#"{"version":5}"#.to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout_tag("v_initial").await.unwrap(); + + let captured = captured.lock().unwrap(); + let resolve_headers = captured + .get("/v1/table/my_table/tags/version/") + .expect("tags/version should have been called by checkout_tag"); + assert!( + !resolve_headers.contains_key("x-lancedb-min-version"), + "checkout_tag resolve must not carry stale min_version", + ); + assert!(!resolve_headers.contains_key("x-lancedb-min-timestamp")); + } + + #[tokio::test] + async fn test_freshness_checkout_clears_min_version() { + let (handler, captured) = capturing_handler(|path| match path { + "/v1/table/my_table/update/" => r#"{"rows_updated":1,"version":7}"#.to_string(), + // checkout(5) needs to describe version 5 first + "/v1/table/my_table/describe/" => r#"{"version":5,"schema":{"fields":[]}}"#.to_string(), + "/v1/table/my_table/count_rows/" => "42".to_string(), + _ => panic!("unexpected path: {}", path), + }); + let table = Table::new_with_handler("my_table", handler); + + table.update().column("a", "a + 1").execute().await.unwrap(); + table.checkout(5).await.unwrap(); + table.count_rows(None).await.unwrap(); + + let headers = captured.lock().unwrap().clone().unwrap(); + assert!(!headers.contains_key("x-lancedb-min-version")); + assert!(!headers.contains_key("x-lancedb-min-timestamp")); + } } diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 6d639a95b..d4e8d3be3 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -686,6 +686,30 @@ mod test_utils { } } + pub fn new_with_handler_and_interval( + name: impl Into, + handler: impl Fn(reqwest::Request) -> http::Response + Clone + Send + Sync + 'static, + read_consistency_interval: Option, + ) -> Self + where + T: Into, + { + let inner = Arc::new( + crate::remote::table::RemoteTable::new_mock_with_consistency_interval( + name.into(), + handler.clone(), + read_consistency_interval, + ), + ); + let database = Arc::new(crate::remote::db::RemoteDatabase::new_mock(handler)); + Self { + inner, + database: Some(database), + // Registry is unused. + embedding_registry: Arc::new(MemoryRegistry::new()), + } + } + pub fn new_with_handler_version( name: impl Into, version: semver::Version, From 87bd6694b6c25b2aa36bc33bc7b346099d19768f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 14:28:40 -0700 Subject: [PATCH 2/4] chore(deps): bump the rust-minor-patch group across 1 directory with 2 updates (#3440) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bumps the rust-minor-patch group with 2 updates in the / directory: [serde_json](https://github.com/serde-rs/json) and [aws-smithy-runtime](https://github.com/smithy-lang/smithy-rs). Updates `serde_json` from 1.0.149 to 1.0.150
Release notes

Sourced from serde_json's releases.

v1.0.150

Commits
  • a1ae73a Release 1.0.150
  • 1a360b0 Merge pull request #1324 from puneetdixit200/reject-non-string-enum-keys
  • 2037b63 Reject non-string enum object keys
  • 5d30df6 Resolve manual_assert_eq pedantic clippy lint
  • dc8003a Raise required compiler for preserve_order feature to 1.85
  • a42fa98 Unpin CI miri toolchain
  • 684a60e Pin CI miri to nightly-2026-02-11
  • 7c7da33 Raise required compiler to Rust 1.71
  • acf4850 Simplify Number::is_f64
  • 6b8ceab Resolve unnecessary_map_or clippy lint
  • Additional commits viewable in compare view

Updates `aws-smithy-runtime` from 1.11.1 to 1.11.3
Commits

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting `@dependabot rebase`. [//]: # (dependabot-automerge-start) [//]: # (dependabot-automerge-end) ---
Dependabot commands and options
You can trigger Dependabot actions by commenting on this PR: - `@dependabot rebase` will rebase this PR - `@dependabot recreate` will recreate this PR, overwriting any edits that have been made to it - `@dependabot show ignore conditions` will show all of the ignore conditions of the specified dependency - `@dependabot ignore major version` will close this group update PR and stop Dependabot creating any more for the specific dependency's major version (unless you unignore this specific dependency's major version or upgrade to it yourself) - `@dependabot ignore minor version` will close this group update PR and stop Dependabot creating any more for the specific dependency's minor version (unless you unignore this specific dependency's minor version or upgrade to it yourself) - `@dependabot ignore ` will close this group update PR and stop Dependabot creating any more for the specific dependency (unless you unignore this specific dependency or upgrade to it yourself) - `@dependabot unignore ` will remove all of the ignore conditions of the specified dependency - `@dependabot unignore ` will remove the ignore condition of the specified dependency and ignore conditions
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e8a9fce0..fec2d0c5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -976,15 +976,16 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.11.1" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0504b1ab12debb5959e5165ee5fe97dd387e7aa7ea6a477bfd7635dfe769a4f5" +checksum = "b8e6f5caf6fea86f8c2206541ab5857cfcda9013426cdbe8fa0098b9e2d32182" dependencies = [ "aws-smithy-async", "aws-smithy-http", "aws-smithy-http-client", "aws-smithy-observability", "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", "bytes", "fastrand", @@ -1001,9 +1002,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b71a13df6ada0aafbf21a73bdfcdf9324cfa9df77d96b8446045be3cde61b42e" +checksum = "dc117c179ecf39a62a0a3f49f600e9ac26a7ad7dd172177999f83933af776c32" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api-macros", @@ -1029,10 +1030,21 @@ dependencies = [ ] [[package]] -name = "aws-smithy-types" -version = "1.4.7" +name = "aws-smithy-schema" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" +checksum = "7442cb268338f0eb8278140a107c046756aa01093d8ef5e99628d34ae09c94f5" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "http 1.4.0", +] + +[[package]] +name = "aws-smithy-types" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "056b66dbce2f81cc0c1e2b05bb402eb58f8a3530479d650efadd5bbae9a4050b" dependencies = [ "base64-simd", "bytes", @@ -8301,9 +8313,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.149" +version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ "itoa", "memchr", From 7dba793629cb56b939c0c4183a7e52718edd25da Mon Sep 17 00:00:00 2001 From: devteamaegis Date: Tue, 26 May 2026 18:26:34 -0400 Subject: [PATCH 3/4] fix(rerankers): inverted scores and incorrect missing-FTS penalty in LinearCombinationReranker (#3437) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem `LinearCombinationReranker.merge_results` has two related bugs that make it return **inverted relevance rankings** — the least relevant document ranks first (closes #3154). ### Bug 1 — `_combine_score` subtracts from 1, inverting the final ranking ```python def _combine_score(self, vector_score, fts_score): return 1 - (self.weight * vector_score + (1 - self.weight) * fts_score) ``` Both `vector_score` (already converted via `_invert_score`) and `fts_score` (BM25 relevance) are in **higher-is-better** space. Wrapping the weighted average in `1 - (...)` flips the direction: a perfectly matching document (`vector_score=1, fts_score=1`) gets `_relevance_score = 0.0`, while a non-matching document gets a high score. ### Bug 2 — Documents missing an FTS score are rewarded, not penalised ```python fts_score = result.get("_score", fill) # fill=1.0 by default ``` When a document has no FTS match, `fts_score = fill = 1.0`. In `_combine_score` (with the bug-1 formula), this large value becomes a **negative penalty** via `1 - (... + 0.3 * 1.0)`, counterintuitively *boosting* the document's score. By contrast, missing vector results correctly receive `_invert_score(fill) = 0.0` (penalised). ## Fix **Bug 1** — remove the `1 -` inversion from `_combine_score`: ```python def _combine_score(self, vector_score, fts_score): return self.weight * vector_score + (1 - self.weight) * fts_score ``` **Bug 2** — use `1 - fill` for missing FTS scores so both penalties are symmetric (mirror of what `_invert_score(fill)` already does for missing vector scores): ```python fts_score = result.get("_score", 1 - fill) # was: fill ``` With `fill=1.0` (default): `1 - 1.0 = 0.0` — missing-FTS entries contribute `0` to the FTS term, identical to how missing-vector entries contribute `0` to the vector term. ## Verification Concrete example from the issue. With `weight=0.7`, `fill=1.0`: | Document | `_distance` | `_score` | Old `_relevance_score` | New `_relevance_score` | |----------|-------------|----------|------------------------|------------------------| | `apple orange` | 0.0 (best) | 2.41 (only FTS) | 0.30 (**wrong: ranked 2nd**) | 1.42 (**correct: ranked 1st**) | | `banana grape` | 0.9999 (worst) | — | 0.70 (**wrong: ranked 1st**) | 0.00 (**correct: ranked last**) | ## Tests Two regression tests added to `python/python/tests/test_rerankers.py`: - `test_linear_combination_best_match_ranks_first` — the document with the smallest distance **and** an FTS match must have the highest `_relevance_score`. - `test_linear_combination_missing_fts_is_penalised` — a document with any FTS score must beat an otherwise-equal document with no FTS match. --------- Co-authored-by: Will Jones --- .../lancedb/rerankers/linear_combination.py | 17 +++- python/python/tests/test_rerankers.py | 86 +++++++++++++++++++ 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/python/python/lancedb/rerankers/linear_combination.py b/python/python/lancedb/rerankers/linear_combination.py index 9f1d645c9..74f23ea61 100644 --- a/python/python/lancedb/rerankers/linear_combination.py +++ b/python/python/lancedb/rerankers/linear_combination.py @@ -102,8 +102,15 @@ class LinearCombinationReranker(Reranker): combined_list = [] for row_id, result in results.items(): + # Convert vector distance to a relevance score in [0, 1] where + # higher is better. Missing vector entries are penalised with + # `_invert_score(fill)` = 1 - fill (= 0.0 for the default fill=1). vector_score = self._invert_score(result.get("_distance", fill)) - fts_score = result.get("_score", fill) + # FTS scores (BM25) are already in a "higher = more relevant" space. + # Missing FTS entries are penalised symmetrically: we use + # `1 - fill` so that the same `fill` value drives both missing-vector + # and missing-FTS penalties in the same direction. + fts_score = result.get("_score", 1 - fill) result["_relevance_score"] = self._combine_score(vector_score, fts_score) combined_list.append(result) @@ -123,8 +130,12 @@ class LinearCombinationReranker(Reranker): return tbl def _combine_score(self, vector_score, fts_score): - # these scores represent distance - return 1 - (self.weight * vector_score + (1 - self.weight) * fts_score) + # Both vector_score (inverted distance) and fts_score are in a + # "higher = more relevant" space. A straight weighted average gives + # higher _relevance_score to better matches, as expected. + # Previously this returned `1 - (...)` which inverted the final + # ranking so that the *least* relevant document ranked first. + return self.weight * vector_score + (1 - self.weight) * fts_score def _invert_score(self, dist: float): # Invert the score between relevance and distance diff --git a/python/python/tests/test_rerankers.py b/python/python/tests/test_rerankers.py index 3d028cb3a..c886772bb 100644 --- a/python/python/tests/test_rerankers.py +++ b/python/python/tests/test_rerankers.py @@ -603,3 +603,89 @@ def test_cross_encoder_reranker_return_all(tmp_path): assert "_relevance_score" in result.column_names assert "_score" in result.column_names assert "_distance" in result.column_names + + +# --------------------------------------------------------------------------- +# Regression tests for LinearCombinationReranker scoring bugs (issue #3154) +# --------------------------------------------------------------------------- + + +def test_linear_combination_best_match_ranks_first(): + """ + The document that is BOTH the closest vector match AND the only FTS match + must rank first. Previously _combine_score subtracted from 1, inverting + the ranking so the worst document ranked highest. + """ + reranker = LinearCombinationReranker(weight=0.7, return_score="all") + + # rowid 0: perfect vector match, sole FTS match → should rank 1st + # rowid 1: mediocre vector, no FTS match + # rowid 2: bad vector, no FTS match + vector_results = pa.Table.from_pydict( + { + "_rowid": [0, 1, 2], + "_distance": [0.0, 0.5, 0.9], + } + ) + fts_results = pa.Table.from_pydict( + { + "_rowid": [0], + "_score": [1.0], + } + ) + + combined = reranker.merge_results(vector_results, fts_results, fill=1.0) + scores = dict( + zip( + combined["_rowid"].to_pylist(), + combined["_relevance_score"].to_pylist(), + ) + ) + + # rowid 0 must have the highest relevance score + assert scores[0] > scores[1], ( + f"Best match (rowid 0, score={scores[0]:.4f}) should beat " + f"mid match (rowid 1, score={scores[1]:.4f})" + ) + assert scores[1] > scores[2], ( + f"Mid match (rowid 1, score={scores[1]:.4f}) should beat " + f"bad match (rowid 2, score={scores[2]:.4f})" + ) + + +def test_linear_combination_missing_fts_is_penalised(): + """ + A document with no FTS match must score *lower* than a document that + has a mediocre FTS match, everything else being equal. Previously + missing-FTS entries used fill=1.0 directly, which gave them a reward + (via the 1-(...) inversion) instead of a penalty. + """ + reranker = LinearCombinationReranker(weight=0.5, return_score="all") + + vector_results = pa.Table.from_pydict( + { + "_rowid": [0, 1], + "_distance": [0.2, 0.2], # identical vector scores + } + ) + fts_results = pa.Table.from_pydict( + { + "_rowid": [0], # rowid 1 has no FTS match + "_score": [0.3], # small FTS score + } + ) + + combined = reranker.merge_results(vector_results, fts_results, fill=1.0) + scores = dict( + zip( + combined["_rowid"].to_pylist(), + combined["_relevance_score"].to_pylist(), + ) + ) + + # rowid 0 has a small FTS score; rowid 1 has none. + # Even a small FTS contribution should beat having none at all. + assert scores[0] > scores[1], ( + f"Document with FTS score (rowid 0, {scores[0]:.4f}) should beat " + f"document with no FTS match (rowid 1, {scores[1]:.4f})" + ) From a7d9f2e99dfd1e80278e8c5c0ffbc6b4862398d9 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 26 May 2026 17:35:28 -0700 Subject: [PATCH 4/4] fix: remove primary key constraint from MemWAL bucket sharding (#3435) ## Summary - Bump lance dependency from `v7.0.0-beta.13` to `v7.0.0-rc.1` - Remove PK constraint from `LsmWriteSpec::Bucket` docs and `Table::set_lsm_write_spec` docs - Remove test assertions that expected rejection when no PK is set or when bucket column != PK Closes https://github.com/lance-format/lance/issues/6917 --- Cargo.lock | 74 +++++++++++----------- Cargo.toml | 28 ++++---- nodejs/__test__/connection.test.ts | 32 ++++++---- python/python/tests/test_db.py | 9 ++- python/python/tests/test_lsm_write_spec.py | 11 ---- rust/lancedb/src/table.rs | 36 ++--------- 6 files changed, 82 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fec2d0c5a..1db3d8485 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3296,8 +3296,8 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" [[package]] name = "fsst" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow-array", "rand 0.9.4", @@ -4518,8 +4518,8 @@ checksum = "e037a2e1d8d5fdbd49b16a4ea09d5d6401c1f29eca5ff29d03d3824dba16256a" [[package]] name = "lance" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arc-swap", "arrow", @@ -4537,6 +4537,7 @@ dependencies = [ "async_cell", "aws-credential-types", "aws-sdk-dynamodb", + "bitpacking", "byteorder", "bytes", "chrono", @@ -4566,6 +4567,7 @@ dependencies = [ "lance-table", "lance-tokenizer", "log", + "moka", "object_store", "permutation", "pin-project", @@ -4589,8 +4591,8 @@ dependencies = [ [[package]] name = "lance-arrow" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow-array", "arrow-buffer", @@ -4610,8 +4612,8 @@ dependencies = [ [[package]] name = "lance-bitpacking" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrayref", "paste", @@ -4620,8 +4622,8 @@ dependencies = [ [[package]] name = "lance-core" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow-array", "arrow-buffer", @@ -4656,8 +4658,8 @@ dependencies = [ [[package]] name = "lance-datafusion" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow", "arrow-array", @@ -4687,8 +4689,8 @@ dependencies = [ [[package]] name = "lance-datagen" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow", "arrow-array", @@ -4706,8 +4708,8 @@ dependencies = [ [[package]] name = "lance-encoding" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow-arith", "arrow-array", @@ -4742,8 +4744,8 @@ dependencies = [ [[package]] name = "lance-file" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow-arith", "arrow-array", @@ -4774,8 +4776,8 @@ dependencies = [ [[package]] name = "lance-index" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arc-swap", "arrow", @@ -4839,8 +4841,8 @@ dependencies = [ [[package]] name = "lance-io" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow", "arrow-arith", @@ -4882,8 +4884,8 @@ dependencies = [ [[package]] name = "lance-linalg" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow-array", "arrow-buffer", @@ -4899,8 +4901,8 @@ dependencies = [ [[package]] name = "lance-namespace" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow", "async-trait", @@ -4912,8 +4914,8 @@ dependencies = [ [[package]] name = "lance-namespace-impls" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow", "arrow-ipc", @@ -4948,9 +4950,9 @@ dependencies = [ [[package]] name = "lance-namespace-reqwest-client" -version = "0.7.6" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f65e31bdaa13e01dab6e7cf566da31df243c34a542f0d915d3601ec0e01e61d2" +checksum = "6369eee4682fb11edf538388b43c61ce288b8302fe89bb40944d7daa7faaae99" dependencies = [ "reqwest 0.12.28", "serde", @@ -4962,8 +4964,8 @@ dependencies = [ [[package]] name = "lance-table" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow", "arrow-array", @@ -5002,8 +5004,8 @@ dependencies = [ [[package]] name = "lance-testing" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "arrow-array", "arrow-schema", @@ -5014,8 +5016,8 @@ dependencies = [ [[package]] name = "lance-tokenizer" -version = "7.0.0-beta.13" -source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-beta.13#929166e3ff51ed61b1fa42de2c63feaf51967ea1" +version = "7.0.0-rc.1" +source = "git+https://github.com/lance-format/lance.git?tag=v7.0.0-rc.1#06f52e901dd9bd99d299263130fb2bda0a3f91af" dependencies = [ "jieba-rs", "lindera", diff --git a/Cargo.toml b/Cargo.toml index 1c87a4f25..f829a878a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,20 +13,20 @@ categories = ["database-implementations"] rust-version = "1.91.0" [workspace.dependencies] -lance = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-core = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-datagen = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-file = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-io = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-index = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-linalg = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-namespace = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-namespace-impls = { "version" = "=7.0.0-beta.13", default-features = false, "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-table = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-testing = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-datafusion = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-encoding = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } -lance-arrow = { "version" = "=7.0.0-beta.13", "tag" = "v7.0.0-beta.13", "git" = "https://github.com/lance-format/lance.git" } +lance = { "version" = "=7.0.0-rc.1", default-features = false, "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-core = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-datagen = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-file = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-io = { "version" = "=7.0.0-rc.1", default-features = false, "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-index = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-linalg = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-namespace = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-namespace-impls = { "version" = "=7.0.0-rc.1", default-features = false, "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-table = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-testing = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-datafusion = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-encoding = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } +lance-arrow = { "version" = "=7.0.0-rc.1", "tag" = "v7.0.0-rc.1", "git" = "https://github.com/lance-format/lance.git" } ahash = "0.8" # Note that this one does not include pyarrow arrow = { version = "58.0.0", optional = false } diff --git a/nodejs/__test__/connection.test.ts b/nodejs/__test__/connection.test.ts index d195a0ca4..68180471a 100644 --- a/nodejs/__test__/connection.test.ts +++ b/nodejs/__test__/connection.test.ts @@ -171,18 +171,22 @@ describe("given a connection", () => { let manifestDir = tmpDir.name + "/test_manifest_paths_v2_empty.lance/_versions"; - readdirSync(manifestDir).forEach((file) => { - expect(file).toMatch(/^\d{20}\.manifest$/); - }); + readdirSync(manifestDir) + .filter((f) => f.endsWith(".manifest")) + .forEach((file) => { + expect(file).toMatch(/^\d{20}\.manifest$/); + }); table = (await db.createTable("test_manifest_paths_v2", [{ id: 1 }], { enableV2ManifestPaths: true, })) as LocalTable; expect(await table.usesV2ManifestPaths()).toBe(true); manifestDir = tmpDir.name + "/test_manifest_paths_v2.lance/_versions"; - readdirSync(manifestDir).forEach((file) => { - expect(file).toMatch(/^\d{20}\.manifest$/); - }); + readdirSync(manifestDir) + .filter((f) => f.endsWith(".manifest")) + .forEach((file) => { + expect(file).toMatch(/^\d{20}\.manifest$/); + }); }); it("should be able to migrate tables to the V2 manifest paths", async () => { @@ -199,16 +203,20 @@ describe("given a connection", () => { const manifestDir = tmpDir.name + "/test_manifest_path_migration.lance/_versions"; - readdirSync(manifestDir).forEach((file) => { - expect(file).toMatch(/^\d\.manifest$/); - }); + readdirSync(manifestDir) + .filter((f) => f.endsWith(".manifest")) + .forEach((file) => { + expect(file).toMatch(/^\d\.manifest$/); + }); await table.migrateManifestPathsV2(); expect(await table.usesV2ManifestPaths()).toBe(true); - readdirSync(manifestDir).forEach((file) => { - expect(file).toMatch(/^\d{20}\.manifest$/); - }); + readdirSync(manifestDir) + .filter((f) => f.endsWith(".manifest")) + .forEach((file) => { + expect(file).toMatch(/^\d{20}\.manifest$/); + }); }); }); diff --git a/python/python/tests/test_db.py b/python/python/tests/test_db.py index d3db372de..9495fb330 100644 --- a/python/python/tests/test_db.py +++ b/python/python/tests/test_db.py @@ -466,7 +466,8 @@ async def test_create_table_v2_manifest_paths_async(tmp_path): assert await tbl.uses_v2_manifest_paths() manifests_dir = tmp_path / "test_v2_manifest_paths.lance" / "_versions" for manifest in os.listdir(manifests_dir): - assert re.match(r"\d{20}\.manifest", manifest) + if manifest.endswith(".manifest"): + assert re.match(r"\d{20}\.manifest", manifest) # Start a table in V1 mode then migrate tbl = await db_no_v2_paths.create_table( @@ -476,13 +477,15 @@ async def test_create_table_v2_manifest_paths_async(tmp_path): assert not await tbl.uses_v2_manifest_paths() manifests_dir = tmp_path / "test_v2_migration.lance" / "_versions" for manifest in os.listdir(manifests_dir): - assert re.match(r"\d\.manifest", manifest) + if manifest.endswith(".manifest"): + assert re.match(r"\d\.manifest", manifest) await tbl.migrate_manifest_paths_v2() assert await tbl.uses_v2_manifest_paths() for manifest in os.listdir(manifests_dir): - assert re.match(r"\d{20}\.manifest", manifest) + if manifest.endswith(".manifest"): + assert re.match(r"\d{20}\.manifest", manifest) @pytest.mark.asyncio diff --git a/python/python/tests/test_lsm_write_spec.py b/python/python/tests/test_lsm_write_spec.py index b81153994..d9d75d3a9 100644 --- a/python/python/tests/test_lsm_write_spec.py +++ b/python/python/tests/test_lsm_write_spec.py @@ -40,16 +40,6 @@ def _make_table(tmp_path): def test_set_lsm_write_spec_validates(tmp_path): _db, table = _make_table(tmp_path) - # No PK set yet. - with pytest.raises(Exception, match="primary key"): - table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4)) - - table.set_unenforced_primary_key("id") - - # Column mismatch. - with pytest.raises(Exception, match="match"): - table.set_lsm_write_spec(LsmWriteSpec.bucket("v", 4)) - # Out-of-range num_buckets. with pytest.raises(Exception, match="num_buckets"): table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 0)) @@ -70,7 +60,6 @@ def test_unset_lsm_write_spec(tmp_path): table.unset_lsm_write_spec() # Install a spec, then remove it; afterwards a fresh spec can be set. - table.set_unenforced_primary_key("id") table.set_lsm_write_spec(LsmWriteSpec.bucket("id", 4)) table.unset_lsm_write_spec() # A second unset errors — there is no spec left to remove. diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index d4e8d3be3..60debbd22 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -312,17 +312,15 @@ pub use self::merge::MergeResult; /// date) and [`LsmWriteSpec::with_writer_config_defaults`] (default /// `ShardWriter` configuration recorded in the MemWAL index). /// -/// All variants require the table to have an unenforced primary key. -/// /// Install a spec with [`Table::set_lsm_write_spec`] and remove it with /// [`Table::unset_lsm_write_spec`]. The actual `merge_insert` dispatch /// onto the MemWAL writer is a follow-up. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub enum LsmWriteSpec { - /// Hash-bucket sharding by the unenforced primary key column. + /// Hash-bucket sharding by a scalar column. /// - /// `column` must equal the table's currently-set single-column - /// unenforced primary key. `num_buckets` must be in `[1, 1024]`. + /// `column` must be a non-nested column with a supported scalar type. + /// `num_buckets` must be in `[1, 1024]`. /// Iceberg-compatible Murmur3-x86-32 (seed 0) is used so each row's /// `bucket(column, num_buckets)` value is stable across processes. Bucket { @@ -1360,21 +1358,15 @@ impl Table { /// /// [`LsmWriteSpec`] chooses one of three sharding strategies: /// - /// - [`LsmWriteSpec::bucket`] — hash-bucket writes by the single-column - /// unenforced primary key. + /// - [`LsmWriteSpec::bucket`] — hash-bucket writes by a scalar column. /// - [`LsmWriteSpec::identity`] — shard by the raw value of a scalar column. /// - [`LsmWriteSpec::unsharded`] — route every write to a single shard. /// - /// All variants require the table to have an unenforced primary key - /// ([`Table::set_unenforced_primary_key`]); bucket sharding additionally - /// requires it to be the single column being bucketed. - /// /// # Example /// /// ``` /// # use lancedb::table::{LsmWriteSpec, Table}; /// # async fn example(table: &Table) -> Result<(), Box> { - /// table.set_unenforced_primary_key(["id"]).await?; /// table /// .set_lsm_write_spec( /// LsmWriteSpec::bucket("id", 16).with_maintained_indexes(["id_idx"]), @@ -4661,21 +4653,6 @@ mod tests { .unwrap(); let table = conn.create_table("t", reader).execute().await.unwrap(); - // Reject when no PK is set. - let err = table - .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4)) - .await - .expect_err("should reject without PK"); - assert!(matches!(err, Error::Lance { .. }), "got {:?}", err); - - // Set PK, then a mismatched column on the spec must be rejected. - table.set_unenforced_primary_key(["id"]).await.unwrap(); - let err = table - .set_lsm_write_spec(LsmWriteSpec::bucket("name", 4)) - .await - .expect_err("should reject column != PK"); - assert!(matches!(err, Error::Lance { .. }), "got {:?}", err); - // Reject num_buckets out of range. for bad in [0u32, 1025] { let err = table @@ -4741,9 +4718,6 @@ mod tests { .unwrap(); let table = conn.create_table("t", reader).execute().await.unwrap(); - // Lance's MemWAL still requires *some* unenforced primary key on - // the dataset; Unsharded just skips the per-row hashing step. - table.set_unenforced_primary_key(["id"]).await.unwrap(); table .set_lsm_write_spec(LsmWriteSpec::unsharded()) .await @@ -4790,7 +4764,6 @@ mod tests { .unwrap(); let table = conn.create_table("t", reader).execute().await.unwrap(); - table.set_unenforced_primary_key(["id"]).await.unwrap(); table .set_lsm_write_spec( LsmWriteSpec::identity("region") @@ -4846,7 +4819,6 @@ mod tests { table.unset_lsm_write_spec().await.unwrap_err(); // Install a spec, then unset it. - table.set_unenforced_primary_key(["id"]).await.unwrap(); table .set_lsm_write_spec(LsmWriteSpec::bucket("id", 4)) .await