mirror of
https://github.com/lancedb/lancedb.git
synced 2026-05-14 10:30:40 +00:00
fix: non-stopping dataset version check after passing the first consistency check interval (#3034)
When a table has a read consistency interval, queries within the
interval skip the version check. Once the interval expires, a list call
checks for new versions. If the version hasn't changed, the timer should
reset so that the next interval begins, but it was never reset. The timer
stayed expired, so every subsequent query triggered a list call even
though nothing had changed.
This affects all read operations (queries, schema lookups, searches) on
tables with read_consistency_interval set. Each operation adds a
list("_versions/") call to object storage, adding latency proportional
to the store's list performance. For high-QPS workloads, this can
saturate object store list throughput and significantly degrade query
latency.
Bug flow:
1. Every read operation (query, schema, search) calls
ensure_up_to_date()
2. ensure_up_to_date() calls is_up_to_date(), which compares
last_consistency_check.elapsed() against
read_consistency_interval
3. If the interval has expired, it calls reload()
4. reload() calls need_reload(), which calls latest_version_id() — this
is the list I/O operation (list("_versions/")) issued against object
storage
5. If no new version, reload() returns early without resetting
last_consistency_check
6. On the next query, step 2 sees the stale timer again → step 3 → step
4 → another list IOP
7. This repeats on every query forever
This commit is contained in:
@@ -57,15 +57,6 @@ impl DatasetRef {
|
||||
matches!(self, Self::Latest { .. })
|
||||
}
|
||||
|
||||
async fn need_reload(&self) -> Result<bool> {
|
||||
Ok(match self {
|
||||
Self::Latest { dataset, .. } => {
|
||||
dataset.latest_version_id().await? != dataset.version().version
|
||||
}
|
||||
Self::TimeTravel { dataset, version } => dataset.version().version != *version,
|
||||
})
|
||||
}
|
||||
|
||||
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
|
||||
match self {
|
||||
Self::Latest { .. } => Ok(()),
|
||||
@@ -118,6 +109,21 @@ impl DatasetRef {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn is_up_to_date(&self) -> bool {
|
||||
match self {
|
||||
Self::Latest {
|
||||
read_consistency_interval,
|
||||
last_consistency_check,
|
||||
..
|
||||
} => match (read_consistency_interval, last_consistency_check) {
|
||||
(None, _) => true,
|
||||
(Some(_), None) => false,
|
||||
(Some(interval), Some(last_check)) => last_check.elapsed() < *interval,
|
||||
},
|
||||
Self::TimeTravel { dataset, version } => dataset.version().version == *version,
|
||||
}
|
||||
}
|
||||
|
||||
fn time_travel_version(&self) -> Option<u64> {
|
||||
match self {
|
||||
Self::Latest { .. } => None,
|
||||
@@ -205,18 +211,7 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
|
||||
/// Reloads the wrapped dataset reference.
///
/// Takes the write lock and delegates to the inner reference's `reload`.
/// There is deliberately no `need_reload()` pre-check here: returning early
/// on "no new version" skipped the inner reload, which is the path that
/// left the consistency timer expired and caused a list call on every
/// subsequent read.
///
/// NOTE(review): the inner `reload` is presumably responsible for both the
/// version check and refreshing `last_consistency_check` — confirm against
/// its definition, which is not visible in this hunk.
///
/// # Errors
///
/// Propagates any error returned by the inner `reload`.
pub async fn reload(&self) -> Result<()> {
    self.0.write().await.reload().await
}
|
||||
|
||||
/// Returns the version, if in time travel mode, or None otherwise
|
||||
@@ -235,35 +230,20 @@ impl DatasetConsistencyWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
async fn is_up_to_date(&self) -> Result<bool> {
|
||||
let dataset_ref = self.0.read().await;
|
||||
match &*dataset_ref {
|
||||
DatasetRef::Latest {
|
||||
read_consistency_interval,
|
||||
last_consistency_check,
|
||||
..
|
||||
} => match (read_consistency_interval, last_consistency_check) {
|
||||
(None, _) => Ok(true),
|
||||
(Some(_), None) => Ok(false),
|
||||
(Some(read_consistency_interval), Some(last_consistency_check)) => {
|
||||
if &last_consistency_check.elapsed() < read_consistency_interval {
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
},
|
||||
DatasetRef::TimeTravel { dataset, version } => {
|
||||
Ok(dataset.version().version == *version)
|
||||
}
|
||||
}
|
||||
async fn is_up_to_date(&self) -> bool {
|
||||
self.0.read().await.is_up_to_date()
|
||||
}
|
||||
|
||||
/// Ensures that the dataset is loaded and up-to-date with consistency and
|
||||
/// version parameters.
|
||||
async fn ensure_up_to_date(&self) -> Result<()> {
|
||||
if !self.is_up_to_date().await? {
|
||||
self.reload().await?;
|
||||
if !self.is_up_to_date().await {
|
||||
// Re-check under write lock — another task may have reloaded
|
||||
// while we waited for the lock.
|
||||
let mut write_guard = self.0.write().await;
|
||||
if !write_guard.is_up_to_date() {
|
||||
write_guard.reload().await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -351,4 +331,60 @@ mod tests {
|
||||
let stats = io_stats.incremental_stats();
|
||||
assert_eq!(stats.read_iops, 1);
|
||||
}
|
||||
|
||||
/// Regression test: before the fix, the reload fast-path (no version change)
|
||||
/// did not reset `last_consistency_check`, causing a list call on every
|
||||
/// subsequent query once the interval expired.
|
||||
#[tokio::test]
|
||||
async fn test_reload_resets_consistency_timer() {
|
||||
let db = connect("memory://")
|
||||
.read_consistency_interval(Duration::from_secs(1))
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
let io_stats = IoStatsHolder::default();
|
||||
let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
|
||||
let table = db
|
||||
.create_empty_table("test", schema)
|
||||
.write_options(WriteOptions {
|
||||
lance_write_params: Some(WriteParams {
|
||||
store_params: Some(ObjectStoreParams {
|
||||
object_store_wrapper: Some(Arc::new(io_stats.clone())),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
})
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let start = Instant::now();
|
||||
io_stats.incremental_stats(); // reset
|
||||
|
||||
// Step 1: within interval — no list
|
||||
table.schema().await.unwrap();
|
||||
let s = io_stats.incremental_stats();
|
||||
assert_eq!(s.read_iops, 0, "step 1, elapsed={:?}", start.elapsed());
|
||||
|
||||
// Step 2: still within interval — no list
|
||||
table.schema().await.unwrap();
|
||||
let s = io_stats.incremental_stats();
|
||||
assert_eq!(s.read_iops, 0, "step 2, elapsed={:?}", start.elapsed());
|
||||
|
||||
// Step 3: sleep past the 1s boundary
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
// Step 4: interval expired — exactly 1 list, timer resets
|
||||
table.schema().await.unwrap();
|
||||
let s = io_stats.incremental_stats();
|
||||
assert_eq!(s.read_iops, 1, "step 4, elapsed={:?}", start.elapsed());
|
||||
|
||||
// Step 5: 10 more calls — timer just reset, no lists (THIS is the regression test).
|
||||
for _ in 0..10 {
|
||||
table.schema().await.unwrap();
|
||||
}
|
||||
let s = io_stats.incremental_stats();
|
||||
assert_eq!(s.read_iops, 0, "step 5, elapsed={:?}", start.elapsed());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user