feat: add time travel operations to the async API (#1070)

This commit is contained in:
Weston Pace
2024-03-12 09:20:23 -07:00
parent f822255683
commit 47daf9b7b0
13 changed files with 472 additions and 87 deletions

View File

@@ -356,6 +356,15 @@ pub struct ConnectBuilder {
aws_creds: Option<AwsCredential>,
/// The interval at which to check for updates from other processes.
///
/// If None, then consistency is not checked. For performance
/// reasons, this is the default. For strong consistency, set this to
/// zero seconds. Then every read will check for updates from other
/// processes. As a compromise, you can set this to a non-zero duration
/// for eventual consistency. If more than that interval has passed since
/// the last check, then the table will be checked for updates. Note: this
/// consistency only applies to read operations. Write operations are
/// always consistent.
read_consistency_interval: Option<std::time::Duration>,
}

View File

@@ -45,6 +45,18 @@ impl TableInternal for RemoteTable {
// Returns the name stored on this `RemoteTable`.
fn name(&self) -> &str {
&self.name
}
// Not implemented for remote tables yet: calling this panics via `todo!()`.
async fn version(&self) -> Result<u64> {
todo!()
}
// Not implemented for remote tables yet: calling this panics via `todo!()`.
// The `_version` argument is currently ignored.
async fn checkout(&self, _version: u64) -> Result<()> {
todo!()
}
// Not implemented for remote tables yet: calling this panics via `todo!()`.
async fn checkout_latest(&self) -> Result<()> {
todo!()
}
// Not implemented for remote tables yet: calling this panics via `todo!()`.
async fn restore(&self) -> Result<()> {
todo!()
}
// Not implemented for remote tables yet: calling this panics via `todo!()`.
async fn schema(&self) -> Result<SchemaRef> {
todo!()
}

View File

@@ -177,6 +177,10 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn
) -> Result<()>;
async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<()>;
async fn drop_columns(&self, columns: &[&str]) -> Result<()>;
/// Returns the version number of the table.
async fn version(&self) -> Result<u64>;
/// Switches the table to the given `version` (time travel).
async fn checkout(&self, version: u64) -> Result<()>;
/// Points the table back at the latest version.
async fn checkout_latest(&self) -> Result<()>;
/// Restores the table to the currently checked out version.
async fn restore(&self) -> Result<()>;
}
/// A Table is a collection of strong typed Rows.
@@ -533,6 +537,56 @@ impl Table {
pub async fn drop_columns(&self, columns: &[&str]) -> Result<()> {
self.inner.drop_columns(columns).await
}
/// Retrieve the version of the table
///
/// LanceDB supports versioning. Every operation that modifies the table increases
/// the version. As long as a version hasn't been deleted you can [`Self::checkout`]
/// that version to view the data at that point. In addition, you can
/// [`Self::restore`] the version to replace the current table with a previous version.
pub async fn version(&self) -> Result<u64> {
self.inner.version().await
}
/// Checks out a specific version of the Table
///
/// Any read operation on the table will now access the data at the checked out version.
/// As a consequence, calling this method will disable any read consistency interval
/// that was previously set.
///
/// This is a read-only operation that turns the table into a sort of "view"
/// or "detached head". Other table instances will not be affected. To make the change
/// permanent you can use the [`Self::restore`] method.
///
/// Any operation that modifies the table will fail while the table is in a checked
/// out state.
///
/// To return the table to a normal state use [`Self::checkout_latest`].
pub async fn checkout(&self, version: u64) -> Result<()> {
self.inner.checkout(version).await
}
/// Ensures the table is pointing at the latest version
///
/// This can be used to manually update a table when the `read_consistency_interval`
/// is `None`. It can also be used to undo a [`Self::checkout`] operation.
pub async fn checkout_latest(&self) -> Result<()> {
self.inner.checkout_latest().await
}
/// Restore the table to the currently checked out version
///
/// This operation will fail if [`Self::checkout`] has not been called previously.
///
/// This operation will overwrite the latest version of the table with a
/// previous version. Any changes made since the checked out version will
/// no longer be visible.
///
/// Once the operation concludes the table will no longer be in a checked
/// out state and the `read_consistency_interval`, if any, will apply.
pub async fn restore(&self) -> Result<()> {
self.inner.restore().await
}
}
impl From<NativeTable> for Table {
@@ -639,55 +693,6 @@ impl NativeTable {
})
}
/// Open the table stored at `uri` pinned to `version`.
///
/// The table name is derived from `uri`; default read parameters are used and
/// neither a store wrapper nor a read consistency interval is configured.
pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
    let table_name = Self::get_table_name(uri)?;
    let default_params = ReadParams::default();
    Self::checkout_with_params(uri, &table_name, version, None, default_params, None).await
}
/// Open the table at `uri` pinned to `version`, with explicit read settings.
///
/// When `write_store_wrapper` is provided, `params` is first patched with it.
/// The loaded dataset is wrapped in time travel mode for `version`.
pub async fn checkout_with_params(
    uri: &str,
    name: &str,
    version: u64,
    write_store_wrapper: Option<Arc<dyn WrappingObjectStore>>,
    params: ReadParams,
    read_consistency_interval: Option<std::time::Duration>,
) -> Result<Self> {
    // The wrapper is cloned because it is also stored on the returned table.
    let read_params = if let Some(wrapper) = write_store_wrapper.clone() {
        params.patch_with_store_wrapper(wrapper)?
    } else {
        params
    };
    let loaded = DatasetBuilder::from_uri(uri)
        .with_version(version)
        .with_read_params(read_params)
        .load()
        .await?;
    Ok(Self {
        name: name.to_string(),
        uri: uri.to_string(),
        dataset: DatasetConsistencyWrapper::new_time_travel(loaded, version),
        store_wrapper: write_store_wrapper,
        read_consistency_interval,
    })
}
/// Return a copy of this [NativeTable] that points at the latest version.
///
/// The dataset wrapper is duplicated, switched to latest mode and reloaded,
/// so the reload happens regardless of the `read_consistency_interval` set.
pub async fn checkout_latest(&self) -> Result<Self> {
    let mut fresh = self.dataset.duplicate().await;
    fresh.as_latest(self.read_consistency_interval).await?;
    fresh.reload().await?;
    // Every other field is carried over from the current table.
    let mut table = self.clone();
    table.dataset = fresh;
    Ok(table)
}
fn get_table_name(uri: &str) -> Result<String> {
let path = Path::new(uri);
let name = path
@@ -788,11 +793,6 @@ impl NativeTable {
.await
}
/// Version number of the dataset this table currently reads from.
pub async fn version(&self) -> Result<u64> {
    let dataset = self.dataset.get().await?;
    Ok(dataset.version().version)
}
async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> {
info!("LanceDB: optimizing indices: {:?}", options);
self.dataset
@@ -1046,6 +1046,43 @@ impl TableInternal for NativeTable {
self.name.as_str()
}
// Version number of the dataset this table currently reads from.
async fn version(&self) -> Result<u64> {
    let dataset = self.dataset.get().await?;
    Ok(dataset.version().version)
}
// Put the dataset wrapper into time travel mode at `version`.
async fn checkout(&self, version: u64) -> Result<()> {
    let wrapper = &self.dataset;
    wrapper.as_time_travel(version).await
}
// Switch the wrapper back to latest mode, then reload the dataset.
async fn checkout_latest(&self) -> Result<()> {
    let interval = self.read_consistency_interval;
    self.dataset.as_latest(interval).await?;
    self.dataset.reload().await
}
// Restores the table to the version that is currently checked out.
//
// Returns `Error::InvalidInput` when the wrapper reports no time travel
// version, i.e. `checkout` has not been run. Otherwise `restore` is called on
// the checked out dataset and the wrapper is then switched back to latest
// mode using this table's `read_consistency_interval`.
async fn restore(&self) -> Result<()> {
let version =
self.dataset
.time_travel_version()
.await
.ok_or_else(|| Error::InvalidInput {
message: "you must run checkout before running restore".to_string(),
})?;
{
// Use get_mut_unchecked as restore is the only "write" operation that is allowed
// when the table is in time travel mode.
// Also, drop the guard after .restore because as_latest will need it
let mut dataset = self.dataset.get_mut_unchecked().await?;
// Sanity check (debug builds only): the guarded dataset is at the version read above.
debug_assert_eq!(dataset.version().version, version);
dataset.restore().await?;
}
self.dataset
.as_latest(self.read_consistency_interval)
.await?;
Ok(())
}
async fn schema(&self) -> Result<SchemaRef> {
let lance_schema = self.dataset.get().await?.schema().clone();
Ok(Arc::new(Schema::from(&lance_schema)))
@@ -1077,6 +1114,8 @@ impl TableInternal for NativeTable {
None => lance_params,
};
self.dataset.ensure_mutable().await?;
let dataset = Dataset::write(add.data, &self.uri, Some(lance_params)).await?;
self.dataset.set_latest(dataset).await;
Ok(())
@@ -1972,14 +2011,20 @@ mod tests {
Ok(FixedSizeListArray::from(data))
}
#[tokio::test]
async fn test_read_consistency_interval() {
/// Build a reader that yields one batch: a single non-nullable Int32 column
/// named "i" holding the value 1.
fn some_sample_data() -> impl RecordBatchReader {
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1]))],
    )
    .unwrap();
    RecordBatchIterator::new(vec![Ok(batch)], schema)
}
#[tokio::test]
async fn test_read_consistency_interval() {
let intervals = vec![
None,
Some(0),
@@ -1987,12 +2032,14 @@ mod tests {
];
for interval in intervals {
let data = some_sample_data();
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let conn1 = ConnectBuilder::new(uri).execute().await.unwrap();
let table1 = conn1
.create_empty_table("my_table", batch.schema())
.create_empty_table("my_table", data.schema())
.execute()
.await
.unwrap();
@@ -2007,22 +2054,14 @@ mod tests {
assert_eq!(table1.count_rows(None).await.unwrap(), 0);
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
table1
.add(Box::new(RecordBatchIterator::new(
vec![Ok(batch.clone())],
batch.schema(),
)))
.execute()
.await
.unwrap();
table1.add(Box::new(data)).execute().await.unwrap();
assert_eq!(table1.count_rows(None).await.unwrap(), 1);
match interval {
None => {
assert_eq!(table2.count_rows(None).await.unwrap(), 0);
let table2_native =
table2.as_native().unwrap().checkout_latest().await.unwrap();
assert_eq!(table2_native.count_rows(None).await.unwrap(), 1);
table2.checkout_latest().await.unwrap();
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
}
Some(0) => {
assert_eq!(table2.count_rows(None).await.unwrap(), 1);
@@ -2036,4 +2075,33 @@ mod tests {
}
}
}
// Adding data must fail while an earlier version of the table is checked out.
#[tokio::test]
async fn test_time_travel_write() {
    let scratch_dir = tempdir().unwrap();
    let db_uri = scratch_dir.path().to_str().unwrap();

    let conn = ConnectBuilder::new(db_uri)
        .read_consistency_interval(Duration::from_secs(0))
        .execute()
        .await
        .unwrap();
    let table = conn
        .create_table("my_table", Box::new(some_sample_data()))
        .execute()
        .await
        .unwrap();

    // Remember the version right after creation, then write once more.
    let initial_version = table.version().await.unwrap();
    table
        .add(Box::new(some_sample_data()))
        .execute()
        .await
        .unwrap();

    // With the earlier version checked out, a further write must be rejected.
    table.checkout(initial_version).await.unwrap();
    let write_attempt = table.add(Box::new(some_sample_data())).execute().await;
    assert!(write_attempt.is_err());
}
}

View File

@@ -83,6 +83,33 @@ impl DatasetRef {
}
}
/// Switch this reference to time travel mode at `target_version`.
///
/// Nothing happens when it is already in time travel mode at that version.
/// Otherwise `target_version` is checked out from the current dataset and
/// `self` is replaced with a `TimeTravel` entry; a failed checkout leaves
/// `self` unchanged and returns the error.
async fn as_time_travel(&mut self, target_version: u64) -> Result<()> {
    let current = match self {
        Self::Latest { dataset, .. } => dataset,
        Self::TimeTravel { dataset, version } => {
            if *version == target_version {
                return Ok(());
            }
            dataset
        }
    };
    let checked_out = current.checkout_version(target_version).await?;
    *self = Self::TimeTravel {
        dataset: checked_out,
        version: target_version,
    };
    Ok(())
}
/// The pinned version when in time travel mode, `None` when tracking latest.
fn time_travel_version(&self) -> Option<u64> {
    match *self {
        Self::TimeTravel { version, .. } => Some(version),
        Self::Latest { .. } => None,
    }
}
fn set_latest(&mut self, dataset: Dataset) {
match self {
Self::Latest {
@@ -106,23 +133,6 @@ impl DatasetConsistencyWrapper {
})))
}
/// Build a wrapper that starts in time travel mode, pinned to `version`.
pub fn new_time_travel(dataset: Dataset, version: u64) -> Self {
    let pinned = DatasetRef::TimeTravel { dataset, version };
    Self(Arc::new(RwLock::new(pinned)))
}
/// Create an independent copy of self.
///
/// Unlike Clone, this will track versions independently of the original wrapper and
/// will be tied to a different RwLock.
pub async fn duplicate(&self) -> Self {
let ds_ref = self.0.read().await;
Self(Arc::new(RwLock::new((*ds_ref).clone())))
}
/// Get an immutable reference to the dataset.
pub async fn get(&self) -> Result<DatasetReadGuard<'_>> {
self.ensure_up_to_date().await?;
@@ -132,7 +142,19 @@ impl DatasetConsistencyWrapper {
}
/// Acquire write access to the dataset.
///
/// Returns an error when the dataset is in time travel mode. Otherwise the
/// dataset is brought up to date before the write lock is taken.
pub async fn get_mut(&self) -> Result<DatasetWriteGuard<'_>> {
    self.ensure_mutable().await?;
    self.ensure_up_to_date().await?;
    let guard = self.0.write().await;
    Ok(DatasetWriteGuard { guard })
}
/// Get a mutable reference to the dataset without requiring the
/// dataset to be in a Latest mode.
pub async fn get_mut_unchecked(&self) -> Result<DatasetWriteGuard<'_>> {
self.ensure_up_to_date().await?;
Ok(DatasetWriteGuard {
guard: self.0.write().await,
@@ -140,7 +162,7 @@ impl DatasetConsistencyWrapper {
}
/// Convert into a wrapper in latest version mode
pub async fn as_latest(&mut self, read_consistency_interval: Option<Duration>) -> Result<()> {
pub async fn as_latest(&self, read_consistency_interval: Option<Duration>) -> Result<()> {
self.0
.write()
.await
@@ -148,6 +170,10 @@ impl DatasetConsistencyWrapper {
.await
}
/// Put the wrapped dataset into time travel mode at `target_version`.
///
/// The write lock is held for the duration of the switch.
pub async fn as_time_travel(&self, target_version: u64) -> Result<()> {
    let mut state = self.0.write().await;
    state.as_time_travel(target_version).await
}
/// Provide a known latest version of the dataset.
///
/// This is usually done after some write operation, which inherently will
@@ -160,6 +186,22 @@ impl DatasetConsistencyWrapper {
self.0.write().await.reload().await
}
/// The checked out version when in time travel mode, otherwise `None`.
pub async fn time_travel_version(&self) -> Option<u64> {
    let state = self.0.read().await;
    state.time_travel_version()
}
pub async fn ensure_mutable(&self) -> Result<()> {
let dataset_ref = self.0.read().await;
match &*dataset_ref {
DatasetRef::Latest { .. } => Ok(()),
DatasetRef::TimeTravel { .. } => Err(crate::Error::InvalidInput {
message: "table cannot be modified when a specific version is checked out"
.to_string(),
}),
}
}
async fn is_up_to_date(&self) -> Result<bool> {
let dataset_ref = self.0.read().await;
match &*dataset_ref {