From 47daf9b7b04fd2755cca61dd32bdd0d4e705197a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 12 Mar 2024 09:20:23 -0700 Subject: [PATCH] feat: add time travel operations to the async API (#1070) --- nodejs/__test__/table.test.ts | 45 +++++++ nodejs/lancedb/native.d.ts | 4 + nodejs/lancedb/table.ts | 54 ++++++++ nodejs/src/table.rs | 27 ++++ python/python/lancedb/_lancedb.pyi | 4 + python/python/lancedb/db.py | 2 +- python/python/lancedb/table.py | 56 ++++++++ python/python/tests/test_table.py | 34 +++++ python/src/table.rs | 30 +++++ rust/lancedb/src/connection.rs | 9 ++ rust/lancedb/src/remote/table.rs | 12 ++ rust/lancedb/src/table.rs | 204 +++++++++++++++++++---------- rust/lancedb/src/table/dataset.rs | 78 ++++++++--- 13 files changed, 472 insertions(+), 87 deletions(-) diff --git a/nodejs/__test__/table.test.ts b/nodejs/__test__/table.test.ts index 93e8081f..e9e465ec 100644 --- a/nodejs/__test__/table.test.ts +++ b/nodejs/__test__/table.test.ts @@ -332,3 +332,48 @@ describe("schema evolution", function () { expect(await table.schema()).toEqual(expectedSchema); }); }); + +describe("when dealing with versioning", () => { + let tmpDir: tmp.DirResult; + beforeEach(() => { + tmpDir = tmp.dirSync({ unsafeCleanup: true }); + }); + afterEach(() => { + tmpDir.removeCallback(); + }); + + it("can travel in time", async () => { + // Setup + const con = await connect(tmpDir.name); + const table = await con.createTable("vectors", [ + { id: 1n, vector: [0.1, 0.2] }, + ]); + const version = await table.version(); + await table.add([{ id: 2n, vector: [0.1, 0.2] }]); + expect(await table.countRows()).toBe(2); + // Make sure we can rewind + await table.checkout(version); + expect(await table.countRows()).toBe(1); + // Can't add data in time travel mode + await expect(table.add([{ id: 3n, vector: [0.1, 0.2] }])).rejects.toThrow( + "table cannot be modified when a specific version is checked out", + ); + // Can go back to normal mode + await table.checkoutLatest(); + expect(await table.countRows()).toBe(2); + // Should be able to add data again + await table.add([{ id: 2n, vector: [0.1, 0.2] }]); + expect(await table.countRows()).toBe(3); + // Now checkout and restore + await table.checkout(version); + await table.restore(); + expect(await table.countRows()).toBe(1); + // Should be able to add data + await table.add([{ id: 2n, vector: [0.1, 0.2] }]); + expect(await table.countRows()).toBe(2); + // Can't use restore if not checked out + await expect(table.restore()).rejects.toThrow( + "checkout before running restore", + ); + }); +}); diff --git a/nodejs/lancedb/native.d.ts b/nodejs/lancedb/native.d.ts index 9e1f83d3..f208b66d 100644 --- a/nodejs/lancedb/native.d.ts +++ b/nodejs/lancedb/native.d.ts @@ -117,4 +117,8 @@ export class Table { addColumns(transforms: Array): Promise alterColumns(alterations: Array): Promise dropColumns(columns: Array): Promise + version(): Promise + checkout(version: number): Promise + checkoutLatest(): Promise + restore(): Promise } diff --git a/nodejs/lancedb/table.ts b/nodejs/lancedb/table.ts index 8bb35f94..85bf22e8 100644 --- a/nodejs/lancedb/table.ts +++ b/nodejs/lancedb/table.ts @@ -235,4 +235,58 @@ export class Table { async dropColumns(columnNames: string[]): Promise { await this.inner.dropColumns(columnNames); } + + /** Retrieve the version of the table + * + * LanceDb supports versioning. Every operation that modifies the table increases + * version. As long as a version hasn't been deleted you can `[Self::checkout]` that + * version to view the data at that point. In addition, you can `[Self::restore]` the + * version to replace the current table with a previous version. + */ + async version(): Promise { + return await this.inner.version(); + } + + /** Checks out a specific version of the Table + * + * Any read operation on the table will now access the data at the checked out version. + * As a consequence, calling this method will disable any read consistency interval + * that was previously set. + * + * This is a read-only operation that turns the table into a sort of "view" + * or "detached head". Other table instances will not be affected. To make the change + * permanent you can use the `[Self::restore]` method. + * + * Any operation that modifies the table will fail while the table is in a checked + * out state. + * + * To return the table to a normal state use `[Self::checkout_latest]` + */ + async checkout(version: number): Promise { + await this.inner.checkout(version); + } + + /** Ensures the table is pointing at the latest version + * + * This can be used to manually update a table when the read_consistency_interval is None + * It can also be used to undo a `[Self::checkout]` operation + */ + async checkoutLatest(): Promise { + await this.inner.checkoutLatest(); + } + + /** Restore the table to the currently checked out version + * + * This operation will fail if checkout has not been called previously + * + * This operation will overwrite the latest version of the table with a + * previous version. Any changes made since the checked out version will + * no longer be visible. + * + * Once the operation concludes the table will no longer be in a checked + * out state and the read_consistency_interval, if any, will apply. + */ + async restore(): Promise { + await this.inner.restore(); + } } diff --git a/nodejs/src/table.rs b/nodejs/src/table.rs index 80afd63a..9bfbb4c8 100644 --- a/nodejs/src/table.rs +++ b/nodejs/src/table.rs @@ -214,6 +214,33 @@ impl Table { })?; Ok(()) } + + #[napi] + pub async fn version(&self) -> napi::Result { + self.inner_ref()? + .version() + .await + .map(|val| val as i64) + .default_error() + } + + #[napi] + pub async fn checkout(&self, version: i64) -> napi::Result<()> { + self.inner_ref()? + .checkout(version as u64) + .await + .default_error() + } + + #[napi] + pub async fn checkout_latest(&self) -> napi::Result<()> { + self.inner_ref()?.checkout_latest().await.default_error() + } + + #[napi] + pub async fn restore(&self) -> napi::Result<()> { + self.inner_ref()?.restore().await.default_error() + } } /// A definition of a column alteration. The alteration changes the column at diff --git a/python/python/lancedb/_lancedb.pyi b/python/python/lancedb/_lancedb.pyi index f5e95d30..6605c934 100644 --- a/python/python/lancedb/_lancedb.pyi +++ b/python/python/lancedb/_lancedb.pyi @@ -34,6 +34,10 @@ class Table: async def create_index( self, column: str, config: Optional[Index], replace: Optional[bool] ): ... + async def version(self) -> int: ... + async def checkout(self, version): ... + async def checkout_latest(self): ... + async def restore(self): ... async def connect( uri: str, diff --git a/python/python/lancedb/db.py b/python/python/lancedb/db.py index f4f2a429..486cd30b 100644 --- a/python/python/lancedb/db.py +++ b/python/python/lancedb/db.py @@ -529,7 +529,7 @@ class AsyncConnection(object): on_bad_vectors: Optional[str] = None, fill_value: Optional[float] = None, embedding_functions: Optional[List[EmbeddingFunctionConfig]] = None, - ) -> Table: + ) -> AsyncTable: """Create a [Table][lancedb.table.Table] in the database. Parameters diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index f4e4f8b7..86e9a9c0 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -2354,3 +2354,59 @@ class AsyncTable: The names of the columns to drop. """ raise NotImplementedError + + async def version(self) -> int: + """ + Retrieve the version of the table + + LanceDb supports versioning. Every operation that modifies the table increases + version. As long as a version hasn't been deleted you can `[Self::checkout]` + that version to view the data at that point. In addition, you can + `[Self::restore]` the version to replace the current table with a previous + version. + """ + return await self._inner.version() + + async def checkout(self, version): + """ + Checks out a specific version of the Table + + Any read operation on the table will now access the data at the checked out + version. As a consequence, calling this method will disable any read consistency + interval that was previously set. + + This is a read-only operation that turns the table into a sort of "view" + or "detached head". Other table instances will not be affected. To make the + change permanent you can use the `[Self::restore]` method. + + Any operation that modifies the table will fail while the table is in a checked + out state. + + To return the table to a normal state use `[Self::checkout_latest]` + """ + await self._inner.checkout(version) + + async def checkout_latest(self): + """ + Ensures the table is pointing at the latest version + + This can be used to manually update a table when the read_consistency_interval + is None + It can also be used to undo a `[Self::checkout]` operation + """ + await self._inner.checkout_latest() + + async def restore(self): + """ + Restore the table to the currently checked out version + + This operation will fail if checkout has not been called previously + + This operation will overwrite the latest version of the table with a + previous version. Any changes made since the checked out version will + no longer be visible. + + Once the operation concludes the table will no longer be in a checked + out state and the read_consistency_interval, if any, will apply. + """ + await self._inner.restore() diff --git a/python/python/tests/test_table.py b/python/python/tests/test_table.py index ccad7d58..d04261c2 100644 --- a/python/python/tests/test_table.py +++ b/python/python/tests/test_table.py @@ -974,3 +974,37 @@ def test_drop_columns(tmp_path): table = LanceTable.create(db, "my_table", data=data) table.drop_columns(["category"]) assert table.to_arrow().column_names == ["id"] + + +@pytest.mark.asyncio +async def test_time_travel(db_async: AsyncConnection): + # Setup + table = await db_async.create_table("some_table", data=[{"id": 0}]) + version = await table.version() + await table.add([{"id": 1}]) + assert await table.count_rows() == 2 + # Make sure we can rewind + await table.checkout(version) + assert await table.count_rows() == 1 + # Can't add data in time travel mode + with pytest.raises( + ValueError, + match="table cannot be modified when a specific version is checked out", + ): + await table.add([{"id": 2}]) + # Can go back to normal mode + await table.checkout_latest() + assert await table.count_rows() == 2 + # Should be able to add data again + await table.add([{"id": 3}]) + assert await table.count_rows() == 3 + # Now checkout and restore + await table.checkout(version) + await table.restore() + assert await table.count_rows() == 1 + # Should be able to add data + await table.add([{"id": 4}]) + assert await table.count_rows() == 2 + # Can't use restore if not checked out + with pytest.raises(ValueError, match="checkout before running restore"): + await table.restore() diff --git a/python/src/table.rs b/python/src/table.rs index 93f4fda2..5231204f 100644 --- a/python/src/table.rs +++ b/python/src/table.rs @@ -109,4 +109,34 @@ impl Table { Some(inner) => inner.to_string(), } } + + pub fn version(self_: PyRef<'_, Self>) -> PyResult<&PyAny> { + let inner = self_.inner_ref()?.clone(); + future_into_py( + self_.py(), + async move { inner.version().await.infer_error() }, + ) + } + + pub fn checkout(self_: PyRef<'_, Self>, version: u64) -> PyResult<&PyAny> { + let inner = self_.inner_ref()?.clone(); + future_into_py(self_.py(), async move { + inner.checkout(version).await.infer_error() + }) + } + + pub fn checkout_latest(self_: PyRef<'_, Self>) -> PyResult<&PyAny> { + let inner = self_.inner_ref()?.clone(); + future_into_py(self_.py(), async move { + inner.checkout_latest().await.infer_error() + }) + } + + pub fn restore(self_: PyRef<'_, Self>) -> PyResult<&PyAny> { + let inner = self_.inner_ref()?.clone(); + future_into_py( + self_.py(), + async move { inner.restore().await.infer_error() }, + ) + } } diff --git a/rust/lancedb/src/connection.rs b/rust/lancedb/src/connection.rs index 8394879c..3c540b4f 100644 --- a/rust/lancedb/src/connection.rs +++ b/rust/lancedb/src/connection.rs @@ -356,6 +356,15 @@ pub struct ConnectBuilder { aws_creds: Option, /// The interval at which to check for updates from other processes. + /// + /// If None, then consistency is not checked. For performance + /// reasons, this is the default. For strong consistency, set this to + /// zero seconds. Then every read will check for updates from other + /// processes. As a compromise, you can set this to a non-zero timedelta + /// for eventual consistency. If more than that interval has passed since + /// the last check, then the table will be checked for updates. Note: this + /// consistency only applies to read operations. Write operations are + /// always consistent. read_consistency_interval: Option, } diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 649e46b4..32779fb5 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -45,6 +45,18 @@ impl TableInternal for RemoteTable { fn name(&self) -> &str { &self.name } + async fn version(&self) -> Result { + todo!() + } + async fn checkout(&self, _version: u64) -> Result<()> { + todo!() + } + async fn checkout_latest(&self) -> Result<()> { + todo!() + } + async fn restore(&self) -> Result<()> { + todo!() + } async fn schema(&self) -> Result { todo!() } diff --git a/rust/lancedb/src/table.rs b/rust/lancedb/src/table.rs index 80a212e6..a51144dc 100644 --- a/rust/lancedb/src/table.rs +++ b/rust/lancedb/src/table.rs @@ -177,6 +177,10 @@ pub(crate) trait TableInternal: std::fmt::Display + std::fmt::Debug + Send + Syn ) -> Result<()>; async fn alter_columns(&self, alterations: &[ColumnAlteration]) -> Result<()>; async fn drop_columns(&self, columns: &[&str]) -> Result<()>; + async fn version(&self) -> Result; + async fn checkout(&self, version: u64) -> Result<()>; + async fn checkout_latest(&self) -> Result<()>; + async fn restore(&self) -> Result<()>; } /// A Table is a collection of strong typed Rows. @@ -533,6 +537,56 @@ impl Table { pub async fn drop_columns(&self, columns: &[&str]) -> Result<()> { self.inner.drop_columns(columns).await } + + /// Retrieve the version of the table + /// + /// LanceDb supports versioning. Every operation that modifies the table increases + /// version. As long as a version hasn't been deleted you can `[Self::checkout]` that + /// version to view the data at that point. In addition, you can `[Self::restore]` the + /// version to replace the current table with a previous version. + pub async fn version(&self) -> Result { + self.inner.version().await + } + + /// Checks out a specific version of the Table + /// + /// Any read operation on the table will now access the data at the checked out version. + /// As a consequence, calling this method will disable any read consistency interval + /// that was previously set. + /// + /// This is a read-only operation that turns the table into a sort of "view" + /// or "detached head". Other table instances will not be affected. To make the change + /// permanent you can use the `[Self::restore]` method. + /// + /// Any operation that modifies the table will fail while the table is in a checked + /// out state. + /// + /// To return the table to a normal state use `[Self::checkout_latest]` + pub async fn checkout(&self, version: u64) -> Result<()> { + self.inner.checkout(version).await + } + + /// Ensures the table is pointing at the latest version + /// + /// This can be used to manually update a table when the read_consistency_interval is None + /// It can also be used to undo a `[Self::checkout]` operation + pub async fn checkout_latest(&self) -> Result<()> { + self.inner.checkout_latest().await + } + + /// Restore the table to the currently checked out version + /// + /// This operation will fail if checkout has not been called previously + /// + /// This operation will overwrite the latest version of the table with a + /// previous version. Any changes made since the checked out version will + /// no longer be visible. + /// + /// Once the operation concludes the table will no longer be in a checked + /// out state and the read_consistency_interval, if any, will apply. + pub async fn restore(&self) -> Result<()> { + self.inner.restore().await + } } impl From for Table { @@ -639,55 +693,6 @@ impl NativeTable { }) } - /// Checkout a specific version of this [NativeTable] - pub async fn checkout(uri: &str, version: u64) -> Result { - let name = Self::get_table_name(uri)?; - Self::checkout_with_params(uri, &name, version, None, ReadParams::default(), None).await - } - - pub async fn checkout_with_params( - uri: &str, - name: &str, - version: u64, - write_store_wrapper: Option>, - params: ReadParams, - read_consistency_interval: Option, - ) -> Result { - // patch the params if we have a write store wrapper - let params = match write_store_wrapper.clone() { - Some(wrapper) => params.patch_with_store_wrapper(wrapper)?, - None => params, - }; - let dataset = DatasetBuilder::from_uri(uri) - .with_version(version) - .with_read_params(params) - .load() - .await?; - let dataset = DatasetConsistencyWrapper::new_time_travel(dataset, version); - - Ok(Self { - name: name.to_string(), - uri: uri.to_string(), - dataset, - store_wrapper: write_store_wrapper, - read_consistency_interval, - }) - } - - /// Checkout the latest version of this [NativeTable]. - /// - /// This will force the table to be reloaded from disk, regardless of the - /// `read_consistency_interval` set. - pub async fn checkout_latest(&self) -> Result { - let mut dataset = self.dataset.duplicate().await; - dataset.as_latest(self.read_consistency_interval).await?; - dataset.reload().await?; - Ok(Self { - dataset, - ..self.clone() - }) - } - fn get_table_name(uri: &str) -> Result { let path = Path::new(uri); let name = path @@ -788,11 +793,6 @@ impl NativeTable { .await } - /// Version of this Table - pub async fn version(&self) -> Result { - Ok(self.dataset.get().await?.version().version) - } - async fn optimize_indices(&self, options: &OptimizeOptions) -> Result<()> { info!("LanceDB: optimizing indices: {:?}", options); self.dataset @@ -1046,6 +1046,43 @@ impl TableInternal for NativeTable { self.name.as_str() } + async fn version(&self) -> Result { + Ok(self.dataset.get().await?.version().version) + } + + async fn checkout(&self, version: u64) -> Result<()> { + self.dataset.as_time_travel(version).await + } + + async fn checkout_latest(&self) -> Result<()> { + self.dataset + .as_latest(self.read_consistency_interval) + .await?; + self.dataset.reload().await + } + + async fn restore(&self) -> Result<()> { + let version = + self.dataset + .time_travel_version() + .await + .ok_or_else(|| Error::InvalidInput { + message: "you must run checkout before running restore".to_string(), + })?; + { + // Use get_mut_unchecked as restore is the only "write" operation that is allowed + // when the table is in time travel mode. + // Also, drop the guard after .restore because as_latest will need it + let mut dataset = self.dataset.get_mut_unchecked().await?; + debug_assert_eq!(dataset.version().version, version); + dataset.restore().await?; + } + self.dataset + .as_latest(self.read_consistency_interval) + .await?; + Ok(()) + } + async fn schema(&self) -> Result { let lance_schema = self.dataset.get().await?.schema().clone(); Ok(Arc::new(Schema::from(&lance_schema))) @@ -1077,6 +1114,8 @@ impl TableInternal for NativeTable { None => lance_params, }; + self.dataset.ensure_mutable().await?; + let dataset = Dataset::write(add.data, &self.uri, Some(lance_params)).await?; self.dataset.set_latest(dataset).await; Ok(()) @@ -1972,14 +2011,20 @@ mod tests { Ok(FixedSizeListArray::from(data)) } - #[tokio::test] - async fn test_read_consistency_interval() { + fn some_sample_data() -> impl RecordBatchReader { let batch = RecordBatch::try_new( Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])), vec![Arc::new(Int32Array::from(vec![1]))], ) .unwrap(); + let schema = batch.schema().clone(); + let batch = Ok(batch); + RecordBatchIterator::new(vec![batch], schema) + } + + #[tokio::test] + async fn test_read_consistency_interval() { let intervals = vec![ None, Some(0), @@ -1987,12 +2032,14 @@ mod tests { ]; for interval in intervals { + let data = some_sample_data(); + let tmp_dir = tempdir().unwrap(); let uri = tmp_dir.path().to_str().unwrap(); let conn1 = ConnectBuilder::new(uri).execute().await.unwrap(); let table1 = conn1 - .create_empty_table("my_table", batch.schema()) + .create_empty_table("my_table", data.schema()) .execute() .await .unwrap(); @@ -2007,22 +2054,14 @@ mod tests { assert_eq!(table1.count_rows(None).await.unwrap(), 0); assert_eq!(table2.count_rows(None).await.unwrap(), 0); - table1 - .add(Box::new(RecordBatchIterator::new( - vec![Ok(batch.clone())], - batch.schema(), - ))) - .execute() - .await - .unwrap(); + table1.add(Box::new(data)).execute().await.unwrap(); assert_eq!(table1.count_rows(None).await.unwrap(), 1); match interval { None => { assert_eq!(table2.count_rows(None).await.unwrap(), 0); - let table2_native = - table2.as_native().unwrap().checkout_latest().await.unwrap(); - assert_eq!(table2_native.count_rows(None).await.unwrap(), 1); + table2.checkout_latest().await.unwrap(); + assert_eq!(table2.count_rows(None).await.unwrap(), 1); } Some(0) => { assert_eq!(table2.count_rows(None).await.unwrap(), 1); @@ -2036,4 +2075,33 @@ mod tests { } } } + + #[tokio::test] + async fn test_time_travel_write() { + let tmp_dir = tempdir().unwrap(); + let uri = tmp_dir.path().to_str().unwrap(); + + let conn = ConnectBuilder::new(uri) + .read_consistency_interval(Duration::from_secs(0)) + .execute() + .await + .unwrap(); + let table = conn + .create_table("my_table", Box::new(some_sample_data())) + .execute() + .await + .unwrap(); + let version = table.version().await.unwrap(); + table + .add(Box::new(some_sample_data())) + .execute() + .await + .unwrap(); + table.checkout(version).await.unwrap(); + assert!(table + .add(Box::new(some_sample_data())) + .execute() + .await + .is_err()) + } } diff --git a/rust/lancedb/src/table/dataset.rs b/rust/lancedb/src/table/dataset.rs index 322518e5..91c605e0 100644 --- a/rust/lancedb/src/table/dataset.rs +++ b/rust/lancedb/src/table/dataset.rs @@ -83,6 +83,33 @@ impl DatasetRef { } } + async fn as_time_travel(&mut self, target_version: u64) -> Result<()> { + match self { + Self::Latest { dataset, .. } => { + *self = Self::TimeTravel { + dataset: dataset.checkout_version(target_version).await?, + version: target_version, + }; + } + Self::TimeTravel { dataset, version } => { + if *version != target_version { + *self = Self::TimeTravel { + dataset: dataset.checkout_version(target_version).await?, + version: target_version, + }; + } + } + } + Ok(()) + } + + fn time_travel_version(&self) -> Option { + match self { + Self::Latest { .. } => None, + Self::TimeTravel { version, .. } => Some(*version), + } + } + fn set_latest(&mut self, dataset: Dataset) { match self { Self::Latest { @@ -106,23 +133,6 @@ impl DatasetConsistencyWrapper { }))) } - /// Create a new wrapper in the time travel mode. - pub fn new_time_travel(dataset: Dataset, version: u64) -> Self { - Self(Arc::new(RwLock::new(DatasetRef::TimeTravel { - dataset, - version, - }))) - } - - /// Create an independent copy of self. - /// - /// Unlike Clone, this will track versions independently of the original wrapper and - /// will be tied to a different RwLock. - pub async fn duplicate(&self) -> Self { - let ds_ref = self.0.read().await; - Self(Arc::new(RwLock::new((*ds_ref).clone()))) - } - /// Get an immutable reference to the dataset. pub async fn get(&self) -> Result> { self.ensure_up_to_date().await?; @@ -132,7 +142,19 @@ impl DatasetConsistencyWrapper { } /// Get a mutable reference to the dataset. + /// + /// If the dataset is in time travel mode this will fail pub async fn get_mut(&self) -> Result> { + self.ensure_mutable().await?; + self.ensure_up_to_date().await?; + Ok(DatasetWriteGuard { + guard: self.0.write().await, + }) + } + + /// Get a mutable reference to the dataset without requiring the + /// dataset to be in a Latest mode. + pub async fn get_mut_unchecked(&self) -> Result> { self.ensure_up_to_date().await?; Ok(DatasetWriteGuard { guard: self.0.write().await, @@ -140,7 +162,7 @@ impl DatasetConsistencyWrapper { } /// Convert into a wrapper in latest version mode - pub async fn as_latest(&mut self, read_consistency_interval: Option) -> Result<()> { + pub async fn as_latest(&self, read_consistency_interval: Option) -> Result<()> { self.0 .write() .await @@ -148,6 +170,10 @@ impl DatasetConsistencyWrapper { .await } + pub async fn as_time_travel(&self, target_version: u64) -> Result<()> { + self.0.write().await.as_time_travel(target_version).await + } + /// Provide a known latest version of the dataset. /// /// This is usually done after some write operation, which inherently will @@ -160,6 +186,22 @@ impl DatasetConsistencyWrapper { self.0.write().await.reload().await } + /// Returns the version, if in time travel mode, or None otherwise + pub async fn time_travel_version(&self) -> Option { + self.0.read().await.time_travel_version() + } + + pub async fn ensure_mutable(&self) -> Result<()> { + let dataset_ref = self.0.read().await; + match &*dataset_ref { + DatasetRef::Latest { .. } => Ok(()), + DatasetRef::TimeTravel { .. } => Err(crate::Error::InvalidInput { + message: "table cannot be modified when a specific version is checked out" + .to_string(), + }), + } + } + async fn is_up_to_date(&self) -> Result { let dataset_ref = self.0.read().await; match &*dataset_ref {