From e081708cce2ffcceff309b4f2611c726eef5b89f Mon Sep 17 00:00:00 2001 From: LuQQiu Date: Mon, 16 Feb 2026 15:49:14 -0800 Subject: [PATCH] fix: non-stopping dataset version check after passing the first consistency check interval (#3034) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a table has a read consistency interval, queries within the interval skip the version check. Once the interval expires, a list call checks for new versions. If the version hasn't changed, the timer should reset so the next interval begins, but it didn't. The timer stayed expired, so every query after that triggered a list call, even though nothing changed. This affects all read operations (queries, schema lookups, searches) on tables with read_consistency_interval set. Each operation adds a list("_versions/") call to object storage, adding latency proportional to the store's list performance. For high-QPS workloads, this can saturate object store list throughput and significantly degrade query latency. Bug flow: 1. Every read operation (query, schema, search) calls ensure_up_to_date() 2. ensure_up_to_date() calls is_up_to_date(), which compares last_consistency_check.elapsed() against read_consistency_interval 3. If the interval has expired, it calls reload() 4. reload() calls need_reload(), which calls latest_version_id() — this is the list IOP (list("_versions/")) 5. If no new version, reload() returns early without resetting last_consistency_check 6. On the next query, step 2 sees the stale timer again → step 3 → step 4 → another list IOP 7. This repeats on every query forever --- rust/lancedb/src/table/dataset.rs | 126 +++++++++++++++++++----------- 1 file changed, 81 insertions(+), 45 deletions(-) diff --git a/rust/lancedb/src/table/dataset.rs b/rust/lancedb/src/table/dataset.rs index 50f6b6472..bbec316c7 100644 --- a/rust/lancedb/src/table/dataset.rs +++ b/rust/lancedb/src/table/dataset.rs @@ -57,15 +57,6 @@ impl DatasetRef { matches!(self, Self::Latest { .. }) } - async fn need_reload(&self) -> Result { - Ok(match self { - Self::Latest { dataset, .. } => { - dataset.latest_version_id().await? != dataset.version().version - } - Self::TimeTravel { dataset, version } => dataset.version().version != *version, - }) - } - async fn as_latest(&mut self, read_consistency_interval: Option) -> Result<()> { match self { Self::Latest { .. } => Ok(()), @@ -118,6 +109,21 @@ impl DatasetRef { Ok(()) } + fn is_up_to_date(&self) -> bool { + match self { + Self::Latest { + read_consistency_interval, + last_consistency_check, + .. + } => match (read_consistency_interval, last_consistency_check) { + (None, _) => true, + (Some(_), None) => false, + (Some(interval), Some(last_check)) => last_check.elapsed() < *interval, + }, + Self::TimeTravel { dataset, version } => dataset.version().version == *version, + } + } + fn time_travel_version(&self) -> Option { match self { Self::Latest { .. } => None, @@ -205,18 +211,7 @@ impl DatasetConsistencyWrapper { } pub async fn reload(&self) -> Result<()> { - if !self.0.read().await.need_reload().await? { - return Ok(()); - } - - let mut write_guard = self.0.write().await; - // on lock escalation -- check if someone else has already reloaded - if !write_guard.need_reload().await? { - return Ok(()); - } - - // actually need reloading - write_guard.reload().await + self.0.write().await.reload().await } /// Returns the version, if in time travel mode, or None otherwise @@ -235,35 +230,20 @@ impl DatasetConsistencyWrapper { } } - async fn is_up_to_date(&self) -> Result { - let dataset_ref = self.0.read().await; - match &*dataset_ref { - DatasetRef::Latest { - read_consistency_interval, - last_consistency_check, - .. - } => match (read_consistency_interval, last_consistency_check) { - (None, _) => Ok(true), - (Some(_), None) => Ok(false), - (Some(read_consistency_interval), Some(last_consistency_check)) => { - if &last_consistency_check.elapsed() < read_consistency_interval { - Ok(true) - } else { - Ok(false) - } - } - }, - DatasetRef::TimeTravel { dataset, version } => { - Ok(dataset.version().version == *version) - } - } + async fn is_up_to_date(&self) -> bool { + self.0.read().await.is_up_to_date() } /// Ensures that the dataset is loaded and up-to-date with consistency and /// version parameters. async fn ensure_up_to_date(&self) -> Result<()> { - if !self.is_up_to_date().await? { - self.reload().await?; + if !self.is_up_to_date().await { + // Re-check under write lock — another task may have reloaded + // while we waited for the lock. + let mut write_guard = self.0.write().await; + if !write_guard.is_up_to_date() { + write_guard.reload().await?; + } } Ok(()) } @@ -351,4 +331,60 @@ mod tests { let stats = io_stats.incremental_stats(); assert_eq!(stats.read_iops, 1); } + + /// Regression test: before the fix, the reload fast-path (no version change) + /// did not reset `last_consistency_check`, causing a list call on every + /// subsequent query once the interval expired. + #[tokio::test] + async fn test_reload_resets_consistency_timer() { + let db = connect("memory://") + .read_consistency_interval(Duration::from_secs(1)) + .execute() + .await + .unwrap(); + let io_stats = IoStatsHolder::default(); + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let table = db + .create_empty_table("test", schema) + .write_options(WriteOptions { + lance_write_params: Some(WriteParams { + store_params: Some(ObjectStoreParams { + object_store_wrapper: Some(Arc::new(io_stats.clone())), + ..Default::default() + }), + ..Default::default() + }), + }) + .execute() + .await + .unwrap(); + + let start = Instant::now(); + io_stats.incremental_stats(); // reset + + // Step 1: within interval — no list + table.schema().await.unwrap(); + let s = io_stats.incremental_stats(); + assert_eq!(s.read_iops, 0, "step 1, elapsed={:?}", start.elapsed()); + + // Step 2: still within interval — no list + table.schema().await.unwrap(); + let s = io_stats.incremental_stats(); + assert_eq!(s.read_iops, 0, "step 2, elapsed={:?}", start.elapsed()); + + // Step 3: sleep past the 1s boundary + tokio::time::sleep(Duration::from_secs(1)).await; + + // Step 4: interval expired — exactly 1 list, timer resets + table.schema().await.unwrap(); + let s = io_stats.incremental_stats(); + assert_eq!(s.read_iops, 1, "step 4, elapsed={:?}", start.elapsed()); + + // Step 5: 10 more calls — timer just reset, no lists (THIS is the regression test). + for _ in 0..10 { + table.schema().await.unwrap(); + } + let s = io_stats.incremental_stats(); + assert_eq!(s.read_iops, 0, "step 5, elapsed={:?}", start.elapsed()); + } }