feat!: change default read_consistency_interval=5s (#2281)

Previously, when we loaded the next version of the table, we would block
all reads with a write lock. Now, we only do that if
`read_consistency_interval=0`. Otherwise, we load the next version
asynchronously in the background. This should mean that
`read_consistency_interval > 0` won't have a meaningful impact on
latency.

Along with this change, I felt it was safe to change the default
consistency interval to 5 seconds. The current default is `None`, which
means we will **never** check for a new version by default. I think that
default is contrary to most users expectations.
This commit is contained in:
Will Jones
2025-03-28 11:04:31 -07:00
committed by GitHub
parent dc8b75feab
commit a547c523c2
20 changed files with 246 additions and 82 deletions

View File

@@ -60,7 +60,7 @@ fn database_new(mut cx: FunctionContext) -> JsResult<JsPromise> {
let mut conn_builder = connect(&path).storage_options(storage_options);
if let Some(interval) = read_consistency_interval {
conn_builder = conn_builder.read_consistency_interval(interval);
conn_builder = conn_builder.read_consistency_interval(Some(interval));
}
rt.spawn(async move {
let database = conn_builder.execute().await;

View File

@@ -12,7 +12,7 @@ use super::{
Catalog, CatalogOptions, CreateDatabaseMode, CreateDatabaseRequest, DatabaseNamesRequest,
OpenDatabaseRequest,
};
use crate::connection::ConnectRequest;
use crate::connection::{ConnectRequest, DEFAULT_READ_CONSISTENCY_INTERVAL};
use crate::database::listing::{ListingDatabase, ListingDatabaseOptions};
use crate::database::{Database, DatabaseOptions};
use crate::error::{CreateDirSnafu, Error, Result};
@@ -214,7 +214,7 @@ impl Catalog for ListingCatalog {
uri: db_uri,
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
options: Default::default(),
};
@@ -241,7 +241,7 @@ impl Catalog for ListingCatalog {
uri: db_path.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
options: Default::default(),
};
@@ -311,7 +311,7 @@ mod tests {
#[cfg(feature = "remote")]
client_config: Default::default(),
options: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
};
let catalog = ListingCatalog::connect(&request).await.unwrap();

View File

@@ -36,6 +36,9 @@ pub use lance_encoding::version::LanceFileVersion;
#[cfg(feature = "remote")]
use lance_io::object_store::StorageOptions;
pub(crate) const DEFAULT_READ_CONSISTENCY_INTERVAL: Option<std::time::Duration> =
Some(std::time::Duration::from_secs(5));
/// A builder for configuring a [`Connection::table_names`] operation
pub struct TableNamesBuilder {
parent: Arc<dyn Database>,
@@ -618,14 +621,15 @@ pub struct ConnectRequest {
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For performance
/// reasons, this is the default. For strong consistency, set this to
/// If None, then consistency is not checked. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero timedelta
/// for eventual consistency. If more than that interval has passed since
/// the last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// always consistent.
///
/// The default is 5 seconds.
pub read_consistency_interval: Option<std::time::Duration>,
}
@@ -643,7 +647,7 @@ impl ConnectBuilder {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
options: HashMap::new(),
},
embedding_registry: None,
@@ -782,8 +786,7 @@ impl ConnectBuilder {
/// The interval at which to check for updates from other processes. This
/// only affects LanceDB OSS.
///
/// If left unset, consistency is not checked. For maximum read
/// performance, this is the default. For strong consistency, set this to
/// If left unset, consistency is not checked. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other processes.
/// As a compromise, set this to a non-zero duration for eventual consistency.
/// If more than that duration has passed since the last read, the read will
@@ -792,13 +795,15 @@ impl ConnectBuilder {
/// This only affects read operations. Write operations are always
/// consistent.
///
/// The default is 5 seconds.
///
/// LanceDB Cloud uses eventual consistency under the hood, and is not
/// currently configurable.
pub fn read_consistency_interval(
mut self,
read_consistency_interval: std::time::Duration,
read_consistency_interval: Option<std::time::Duration>,
) -> Self {
self.request.read_consistency_interval = Some(read_consistency_interval);
self.request.read_consistency_interval = read_consistency_interval;
self
}
@@ -882,7 +887,7 @@ impl CatalogConnectBuilder {
uri: uri.to_string(),
#[cfg(feature = "remote")]
client_config: Default::default(),
read_consistency_interval: None,
read_consistency_interval: DEFAULT_READ_CONSISTENCY_INTERVAL,
options: HashMap::new(),
},
}

View File

@@ -2611,7 +2611,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -2694,7 +2694,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -2891,7 +2891,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance");
let uri = dataset_path.to_str().unwrap();
let conn = connect(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -3462,7 +3462,8 @@ mod tests {
let mut conn2 = ConnectBuilder::new(uri);
if let Some(interval) = interval {
conn2 = conn2.read_consistency_interval(std::time::Duration::from_millis(interval));
conn2 = conn2
.read_consistency_interval(Some(std::time::Duration::from_millis(interval)));
}
let conn2 = conn2.execute().await.unwrap();
let table2 = conn2.open_table("my_table").execute().await.unwrap();
@@ -3498,7 +3499,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -3519,7 +3520,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -3594,7 +3595,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();
@@ -3656,7 +3657,7 @@ mod tests {
let uri = tmp_dir.path().to_str().unwrap();
let conn = ConnectBuilder::new(uri)
.read_consistency_interval(Duration::from_secs(0))
.read_consistency_interval(Some(Duration::from_secs(0)))
.execute()
.await
.unwrap();

View File

@@ -7,6 +7,7 @@ use std::{
time::{self, Duration, Instant},
};
use futures::FutureExt;
use lance::Dataset;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
@@ -22,13 +23,16 @@ pub struct DatasetConsistencyWrapper(Arc<RwLock<DatasetRef>>);
///
/// The dataset is lazily loaded, and starts off as None. On the first access,
/// the dataset is loaded.
#[derive(Debug, Clone)]
#[derive(Debug)]
enum DatasetRef {
/// In this mode, the dataset is always the latest version.
Latest {
dataset: Dataset,
read_consistency_interval: Option<Duration>,
last_consistency_check: Option<time::Instant>,
/// A background task loading the next version of the dataset. This happens
/// in the background so as not to block the current thread.
refresh_task: Option<tokio::task::JoinHandle<Result<Dataset>>>,
},
/// In this mode, the dataset is a specific version. It cannot be mutated.
TimeTravel { dataset: Dataset, version: u64 },
@@ -41,9 +45,19 @@ impl DatasetRef {
Self::Latest {
dataset,
last_consistency_check,
refresh_task,
..
} => {
dataset.checkout_latest().await?;
// Replace the refresh task
if let Some(refresh_task) = refresh_task {
refresh_task.abort();
}
let mut new_dataset = dataset.clone();
refresh_task.replace(tokio::spawn(async move {
new_dataset.checkout_latest().await?;
Ok(new_dataset)
}));
last_consistency_check.replace(Instant::now());
}
Self::TimeTravel { dataset, version } => {
@@ -57,26 +71,24 @@ impl DatasetRef {
matches!(self, Self::Latest { .. })
}
async fn need_reload(&self) -> Result<bool> {
Ok(match self {
Self::Latest { dataset, .. } => {
dataset.latest_version_id().await? != dataset.version().version
}
Self::TimeTravel { dataset, version } => dataset.version().version != *version,
})
fn strong_consistency(&self) -> bool {
matches!(
self,
Self::Latest { read_consistency_interval: Some(interval), .. }
if interval.as_nanos() == 0
)
}
async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
match self {
Self::Latest { .. } => Ok(()),
Self::TimeTravel { dataset, .. } => {
dataset
.checkout_version(dataset.latest_version_id().await?)
.await?;
dataset.checkout_latest().await?;
*self = Self::Latest {
dataset: dataset.clone(),
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
Ok(())
}
@@ -114,13 +126,74 @@ impl DatasetRef {
match self {
Self::Latest {
dataset: ref mut ds,
refresh_task,
last_consistency_check,
..
} => {
*ds = dataset;
if let Some(refresh_task) = refresh_task {
refresh_task.abort();
}
*refresh_task = None;
*last_consistency_check = Some(Instant::now());
}
_ => unreachable!("Dataset should be in latest mode at this point"),
}
}
/// Wait for the background refresh task to complete.
async fn await_refresh(&mut self) -> Result<()> {
if let Self::Latest {
refresh_task: Some(refresh_task),
read_consistency_interval,
..
} = self
{
let dataset = refresh_task.await.expect("Refresh task panicked")?;
*self = Self::Latest {
dataset,
read_consistency_interval: *read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
}
Ok(())
}
/// Check if background refresh task is done, and if so, update the dataset.
fn check_refresh(&mut self) -> Result<()> {
if let Self::Latest {
refresh_task: Some(refresh_task),
read_consistency_interval,
..
} = self
{
if refresh_task.is_finished() {
let dataset = refresh_task
.now_or_never()
.unwrap()
.expect("Refresh task panicked")?;
*self = Self::Latest {
dataset,
read_consistency_interval: *read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
};
}
}
Ok(())
}
fn refresh_is_ready(&self) -> bool {
matches!(
self,
Self::Latest {
refresh_task: Some(refresh_task),
..
}
if refresh_task.is_finished()
)
}
}
impl DatasetConsistencyWrapper {
@@ -130,6 +203,7 @@ impl DatasetConsistencyWrapper {
dataset,
read_consistency_interval,
last_consistency_check: Some(Instant::now()),
refresh_task: None,
})))
}
@@ -188,18 +262,9 @@ impl DatasetConsistencyWrapper {
}
pub async fn reload(&self) -> Result<()> {
if !self.0.read().await.need_reload().await? {
return Ok(());
}
let mut write_guard = self.0.write().await;
// on lock escalation -- check if someone else has already reloaded
if !write_guard.need_reload().await? {
return Ok(());
}
// actually need reloading
write_guard.reload().await
write_guard.reload().await?;
write_guard.await_refresh().await
}
/// Returns the version, if in time travel mode, or None otherwise
@@ -245,9 +310,26 @@ impl DatasetConsistencyWrapper {
/// Ensures that the dataset is loaded and up-to-date with consistency and
/// version parameters.
async fn ensure_up_to_date(&self) -> Result<()> {
// We may have previously created a background task to fetch the new
// version of the dataset. If that task is done, we should update the
// dataset.
{
let read_guard = self.0.read().await;
if read_guard.refresh_is_ready() {
drop(read_guard);
self.0.write().await.check_refresh()?;
}
}
if !self.is_up_to_date().await? {
self.reload().await?;
}
// If we are in strong consistency mode, we should await the refresh task.
if self.0.read().await.strong_consistency() {
self.0.write().await.await_refresh().await?;
}
Ok(())
}
}