diff --git a/rust/lancedb/src/table/dataset.rs b/rust/lancedb/src/table/dataset.rs index 50f6b6472..bbec316c7 100644 --- a/rust/lancedb/src/table/dataset.rs +++ b/rust/lancedb/src/table/dataset.rs @@ -57,15 +57,6 @@ impl DatasetRef { matches!(self, Self::Latest { .. }) } - async fn need_reload(&self) -> Result { - Ok(match self { - Self::Latest { dataset, .. } => { - dataset.latest_version_id().await? != dataset.version().version - } - Self::TimeTravel { dataset, version } => dataset.version().version != *version, - }) - } - async fn as_latest(&mut self, read_consistency_interval: Option) -> Result<()> { match self { Self::Latest { .. } => Ok(()), @@ -118,6 +109,21 @@ impl DatasetRef { Ok(()) } + fn is_up_to_date(&self) -> bool { + match self { + Self::Latest { + read_consistency_interval, + last_consistency_check, + .. + } => match (read_consistency_interval, last_consistency_check) { + (None, _) => true, + (Some(_), None) => false, + (Some(interval), Some(last_check)) => last_check.elapsed() < *interval, + }, + Self::TimeTravel { dataset, version } => dataset.version().version == *version, + } + } + fn time_travel_version(&self) -> Option { match self { Self::Latest { .. } => None, @@ -205,18 +211,7 @@ impl DatasetConsistencyWrapper { } pub async fn reload(&self) -> Result<()> { - if !self.0.read().await.need_reload().await? { - return Ok(()); - } - - let mut write_guard = self.0.write().await; - // on lock escalation -- check if someone else has already reloaded - if !write_guard.need_reload().await? { - return Ok(()); - } - - // actually need reloading - write_guard.reload().await + self.0.write().await.reload().await } /// Returns the version, if in time travel mode, or None otherwise @@ -235,35 +230,20 @@ impl DatasetConsistencyWrapper { } } - async fn is_up_to_date(&self) -> Result { - let dataset_ref = self.0.read().await; - match &*dataset_ref { - DatasetRef::Latest { - read_consistency_interval, - last_consistency_check, - .. - } => match (read_consistency_interval, last_consistency_check) { - (None, _) => Ok(true), - (Some(_), None) => Ok(false), - (Some(read_consistency_interval), Some(last_consistency_check)) => { - if &last_consistency_check.elapsed() < read_consistency_interval { - Ok(true) - } else { - Ok(false) - } - } - }, - DatasetRef::TimeTravel { dataset, version } => { - Ok(dataset.version().version == *version) - } - } + async fn is_up_to_date(&self) -> bool { + self.0.read().await.is_up_to_date() } /// Ensures that the dataset is loaded and up-to-date with consistency and /// version parameters. async fn ensure_up_to_date(&self) -> Result<()> { - if !self.is_up_to_date().await? { - self.reload().await?; + if !self.is_up_to_date().await { + // Re-check under write lock — another task may have reloaded + // while we waited for the lock. + let mut write_guard = self.0.write().await; + if !write_guard.is_up_to_date() { + write_guard.reload().await?; + } } Ok(()) } @@ -351,4 +331,60 @@ mod tests { let stats = io_stats.incremental_stats(); assert_eq!(stats.read_iops, 1); } + + /// Regression test: before the fix, the reload fast-path (no version change) + /// did not reset `last_consistency_check`, causing a list call on every + /// subsequent query once the interval expired. + #[tokio::test] + async fn test_reload_resets_consistency_timer() { + let db = connect("memory://") + .read_consistency_interval(Duration::from_secs(1)) + .execute() + .await + .unwrap(); + let io_stats = IoStatsHolder::default(); + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let table = db + .create_empty_table("test", schema) + .write_options(WriteOptions { + lance_write_params: Some(WriteParams { + store_params: Some(ObjectStoreParams { + object_store_wrapper: Some(Arc::new(io_stats.clone())), + ..Default::default() + }), + ..Default::default() + }), + }) + .execute() + .await + .unwrap(); + + let start = Instant::now(); + io_stats.incremental_stats(); // reset + + // Step 1: within interval — no list + table.schema().await.unwrap(); + let s = io_stats.incremental_stats(); + assert_eq!(s.read_iops, 0, "step 1, elapsed={:?}", start.elapsed()); + + // Step 2: still within interval — no list + table.schema().await.unwrap(); + let s = io_stats.incremental_stats(); + assert_eq!(s.read_iops, 0, "step 2, elapsed={:?}", start.elapsed()); + + // Step 3: sleep past the 1s boundary + tokio::time::sleep(Duration::from_secs(1)).await; + + // Step 4: interval expired — exactly 1 list, timer resets + table.schema().await.unwrap(); + let s = io_stats.incremental_stats(); + assert_eq!(s.read_iops, 1, "step 4, elapsed={:?}", start.elapsed()); + + // Step 5: 10 more calls — timer just reset, no lists (THIS is the regression test). + for _ in 0..10 { + table.schema().await.unwrap(); + } + let s = io_stats.incremental_stats(); + assert_eq!(s.read_iops, 0, "step 5, elapsed={:?}", start.elapsed()); + } }