fix: revert change default read_consistency_interval=5s (#2327)

This reverts commit a547c523c2 or #2281

The current implementation can cause panics and performance degradation.
I will bring this back with more testing in
https://github.com/lancedb/lancedb/pull/2311

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Documentation**
- Enhanced clarity on read consistency settings with updated
descriptions and default behavior.
- Removed outdated warnings about eventual consistency from the
troubleshooting guide.

- **Refactor**
- Streamlined the handling of the read consistency interval across
integrations, now defaulting to "None" for improved performance.
  - Simplified internal logic to offer a more consistent experience.

- **Tests**
- Updated test expectations to reflect the new default representation
for the read consistency interval.
- Removed redundant tests related to "no consistency" settings for
streamlined testing.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
This commit is contained in:
Will Jones
2025-04-14 10:48:15 -05:00
committed by GitHub
parent 4708b60bb1
commit b3a4efd587
20 changed files with 83 additions and 246 deletions

View File

@@ -60,7 +60,7 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
let mut conn_builder = connect(&path).storage_options(storage_options);
if let Some(interval) = read_consistency_interval {
conn_builder = conn_builder.read_consistency_interval(Some(interval));
conn_builder = conn_builder.read_consistency_interval(interval);
}
rt.spawn(async move {
let database = conn_builder.execute().await;

View File

@@ -12,7 +12,7 @@ use super::{
Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest,
OpenDatabaseRequest,
};
use crate::connection::{ConnectRequest, DEFAULT_READ_CONSISTENCY_INTERVAL};
use crate::connection::ConnectRequest;
use crate::database::listing::{ListingDatabase, ListingDatabaseOptions};
use crate::database::{Database, DatabaseOptions};
use crate::error::{CreateDirSnafu, Error, Result};
@@ -214,7 +214,7 @@ impl Catalog for ListingCatalog {
uri: db_uri,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
options: Default::default(),
};
@@ -241,7 +241,7 @@ impl Catalog for ListingCatalog {
uri: db_path.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
options: Default::default(),
};
@@ -311,7 +311,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();

View File

@@ -36,9 +36,6 @@ pub use lance_encoding::version::LanceFileVersion;
#[cfg(feature = "remote")]
use lance_io::object_store::StorageOptions;
pub(crate) const DEFAULT_READ_CONSISTENCY_INTERVAL: Option<std::time::Duration> =
Some(std::time::Duration::from_secs(5));
/// A builder for configuring a [`Connection::table_names`] operation
pub struct TableNamesBuilder {
parent: Arc<dyn Database>,
@@ -621,15 +618,14 @@ pub struct ConnectRequest {
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For strong consistency, set this to
/// If None, then consistency is not checked. For performance
/// reasons, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero timedelta
/// for eventual consistency. If more than that interval has passed since
/// the last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// always consistent.
///
/// The default is 5 seconds.
pub read_consistency_interval: Option<std::time::Duration>,
}
@@ -647,7 +643,7 @@ impl ConnectBuilder {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
options: HashMap::new(),
},
embedding_registry: None,
@@ -786,7 +782,8 @@ impl ConnectBuilder {
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For strong consistency, set this to
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other processes.
/// As a compromise, set this to a non-zero duration for eventual consistency.
/// If more than that duration has passed since the last read, the read will
@@ -795,15 +792,13 @@ impl ConnectBuilder {
/// This only affects read operations. Write operations are always
/// consistent.
///
/// The default is 5 seconds.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: Option<std::time::Duration>,
read_consistency_interval: std::time::Duration,
) -> Self {
self.request.read_consistency_interval = read_consistency_interval;
self.request.read_consistency_interval = Some(read_consistency_interval);
self
}
@@ -887,7 +882,7 @@ impl CatalogConnectBuilder {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
read_consistency_interval: None,
options: HashMap::new(),
},
}

View File

@@ -2629,7 +2629,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -2712,7 +2712,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -2909,7 +2909,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -3480,8 +3480,7 @@ mod tests {
let mut conn2 = ConnectBuilder::new(uri);
if let Some(interval) = interval {
conn2 = conn2
.read_consistency_interval(Some(std::time::Duration::from_millis(interval)));
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
}
let conn2 = conn2.execute().await.unwrap();
let table2 = conn2.open_table("my_table").execute().await.unwrap();
@@ -3517,7 +3516,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -3538,7 +3537,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -3613,7 +3612,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();
@@ -3675,7 +3674,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Some(Duration::from_secs(0)))
.read_consistency_interval(Duration::from_secs(0))
.execute()
.await
.unwrap();

View File

@@ -7,7 +7,6 @@ use std::{
time::{self, Duration, Instant},
};
use futures::FutureExt;
use lance::Dataset;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -23,16 +22,13 @@ pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
///
/// The dataset is lazily loaded, and starts off as None. On the first access,
/// the dataset is loaded.
#[derive(Debug)]
#[derive(Debug, Clone)]
enum DatasetRef {
/// In this mode, the dataset is always the latest version.
Latest {
dataset: Dataset,
read_consistency_interval: Option<Duration>,
last_consistency_check: Option<time::Instant>,
/// A background task loading the next version of the dataset. This happens
/// in the background so as not to block the current thread.
refresh_task: Option<tokio::task::JoinHandle<Result<Dataset>>>,
},
/// In this mode, the dataset is a specific version. It cannot be mutated.
TimeTravel { dataset: Dataset, version: u64 },
@@ -45,18 +41,9 @@ impl DatasetRef {
Self::Latest {
dataset,
last_consistency_check,
refresh_task,
..
} => {
// Replace the refresh task
if let Some(refresh_task) = refresh_task {
refresh_task.abort();
}
let mut new_dataset = dataset.clone();
refresh_task.replace(tokio::spawn(async move {
new_dataset.checkout_latest().await?;
Ok(new_dataset)
}));
dataset.checkout_latest().await?;
last_consistency_check.replace(Instant::now());
}
Self::TimeTravel { dataset, version } => {
@@ -70,24 +57,26 @@ impl DatasetRef {
matches!(self, Self::Latest { .. })
}
fn strong_consistency(&self) -> bool {
matches!(
self,
Self::Latest { read_consistency_interval: Some(interval), .. }
if interval.as_nanos() == 0
)
async fn need_reload(&self) -> Result<bool> {
Ok(match self {
Self::Latest { dataset, .. } => {
dataset.latest_version_id().await? != dataset.version().version
}
Self::TimeTravel { dataset, version } => dataset.version().version != *version,
})
}
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
match self {
Self::Latest { .. } => Ok(()),
Self::TimeTravel { dataset, .. } => {
dataset.checkout_latest().await?;
dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
*self = Self::Latest {
dataset: dataset.clone(),
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
Ok(())
}
@@ -125,74 +114,13 @@ impl DatasetRef {
match self {
Self::Latest {
dataset: ref mut ds,
refresh_task,
last_consistency_check,
..
} => {
*ds = dataset;
if let Some(refresh_task) = refresh_task {
refresh_task.abort();
}
*refresh_task = None;
*last_consistency_check = Some(Instant::now());
}
_ => unreachable!("Dataset should be in latest mode at this point"),
}
}
/// Wait for the background refresh task to complete.
async fn await_refresh(&mut self) -> Result<()> {
if let Self::Latest {
refresh_task: Some(refresh_task),
read_consistency_interval,
..
} = self
{
let dataset = refresh_task.await.expect("Refresh task panicked")?;
*self = Self::Latest {
dataset,
read_consistency_interval: *read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
}
Ok(())
}
/// Check if background refresh task is done, and if so, update the dataset.
fn check_refresh(&mut self) -> Result<()> {
if let Self::Latest {
refresh_task: Some(refresh_task),
read_consistency_interval,
..
} = self
{
if refresh_task.is_finished() {
let dataset = refresh_task
.now_or_never()
.unwrap()
.expect("Refresh task panicked")?;
*self = Self::Latest {
dataset,
read_consistency_interval: *read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
}
}
Ok(())
}
fn refresh_is_ready(&self) -> bool {
matches!(
self,
Self::Latest {
refresh_task: Some(refresh_task),
..
}
if refresh_task.is_finished()
)
}
}
impl DatasetConsistencyWrapper {
@@ -202,7 +130,6 @@ impl DatasetConsistencyWrapper {
dataset,
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
})))
}
@@ -261,9 +188,18 @@ impl DatasetConsistencyWrapper {
}
pub async fn reload(&self) -> Result<()> {
if !self.0.read().await.need_reload().await? {
return Ok(());
}
let mut write_guard = self.0.write().await;
write_guard.reload().await?;
write_guard.await_refresh().await
// on lock escalation -- check if someone else has already reloaded
if !write_guard.need_reload().await? {
return Ok(());
}
// actually need reloading
write_guard.reload().await
}
/// Returns the version, if in time travel mode, or None otherwise
@@ -309,26 +245,9 @@ impl DatasetConsistencyWrapper {
/// Ensures that the dataset is loaded and up-to-date with consistency and
/// version parameters.
async fn ensure_up_to_date(&self) -> Result<()> {
// We may have previously created a background task to fetch the new
// version of the dataset. If that task is done, we should update the
// dataset.
{
let read_guard = self.0.read().await;
if read_guard.refresh_is_ready() {
drop(read_guard);
self.0.write().await.check_refresh()?;
}
}
if !self.is_up_to_date().await? {
self.reload().await?;
}
// If we are in strong consistency mode, we should await the refresh task.
if self.0.read().await.strong_consistency() {
self.0.write().await.await_refresh().await?;
}
Ok(())
}
}
@@ -384,7 +303,7 @@ mod tests {
#[tokio::test]
async fn test_iops_open_strong_consistency() {
let db = connect("memory://")
.read_consistency_interval(Some(Duration::ZERO))
.read_consistency_interval(Duration::ZERO)
.execute()
.await
.expect("Failed to connect to database");