From 131c01d702f70a9d15c94c53eaab666ae38abad4 Mon Sep 17 00:00:00 2001 From: albertlockett Date: Tue, 19 Nov 2024 17:24:28 -0500 Subject: [PATCH] feat: support for checkout and checkout_latest in remote rust and python sdks --- python/python/lancedb/remote/table.py | 6 +++ python/python/lancedb/table.py | 12 +++++ rust/lancedb/src/remote/table.rs | 63 ++++++++++++++++++++++----- 3 files changed, 70 insertions(+), 11 deletions(-) diff --git a/python/python/lancedb/remote/table.py b/python/python/lancedb/remote/table.py index c1106bb0..c897cb6b 100644 --- a/python/python/lancedb/remote/table.py +++ b/python/python/lancedb/remote/table.py @@ -86,6 +86,12 @@ class RemoteTable(Table): """to_pandas() is not yet supported on LanceDB cloud.""" return NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.") + def checkout(self, version): + return self._loop.run_until_complete(self._table.checkout(version)) + + def checkout_latest(self): + return self._loop.run_until_complete(self._table.checkout_latest()) + def list_indices(self): """List all the indices on the table""" return self._loop.run_until_complete(self._table.list_indices()) diff --git a/python/python/lancedb/table.py b/python/python/lancedb/table.py index eee14dd9..bf737c08 100644 --- a/python/python/lancedb/table.py +++ b/python/python/lancedb/table.py @@ -1012,6 +1012,18 @@ class Table(ABC): The names of the columns to drop. """ + @abstractmethod + def checkout(self): + """ + TODO comments + """ + + @abstractmethod + def checkout_latest(self): + """ + TODO comments + """ + @cached_property def _dataset_uri(self) -> str: return _table_uri(self._conn.uri, self.name) diff --git a/rust/lancedb/src/remote/table.rs b/rust/lancedb/src/remote/table.rs index 30fb59e2..c18b0268 100644 --- a/rust/lancedb/src/remote/table.rs +++ b/rust/lancedb/src/remote/table.rs @@ -22,6 +22,7 @@ use lance::dataset::scanner::DatasetRecordBatchStream; use lance::dataset::{ColumnAlteration, NewColumnTransform}; use lance_datafusion::exec::OneShotExec; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; use crate::{ connection::NoData, @@ -43,11 +44,13 @@ pub struct RemoteTable { #[allow(dead_code)] client: RestfulLanceDbClient, name: String, + + version: RwLock>, } impl RemoteTable { pub fn new(client: RestfulLanceDbClient, name: String) -> Self { - Self { client, name } + Self { client, name, version: RwLock::new(None) } } async fn describe(&self) -> Result { @@ -251,6 +254,26 @@ impl RemoteTable { } } } + + async fn check_mutable(&self) -> Result<()> { + let read_guard = self.version.read().await; + match *read_guard { + None => Ok(()), + Some(version) => Err(Error::NotSupported { + message: format!( + "Cannot mutate table reference fixed at version {}. Call checkout_latest() to get a mutable table reference.", + version + ) + }) + } + } + + async fn current_version(&self) -> Option { + let read_guard = self.version.read().await; + let version = *read_guard; + + version + } } #[derive(Deserialize)] @@ -278,7 +301,7 @@ mod test_utils { T: Into, { let client = client_with_handler(handler); - Self { client, name } + Self { client, name, version: RwLock::new(None) } } } } @@ -297,17 +320,21 @@ impl TableInternal for RemoteTable { async fn version(&self) -> Result { self.describe().await.map(|desc| desc.version) } - async fn checkout(&self, _version: u64) -> Result<()> { - Err(Error::NotSupported { - message: "checkout is not supported on LanceDB cloud.".into(), - }) + async fn checkout(&self, version: u64) -> Result<()> { + // TODO check that the version exists + // we can do this when the list_versions changes land + // https://github.com/lancedb/lancedb/pull/1850 + let mut write_guard = self.version.write().await; + *write_guard = Some(version); + Ok(()) } async fn checkout_latest(&self) -> Result<()> { - Err(Error::NotSupported { - message: "checkout is not supported on LanceDB cloud.".into(), - }) + let mut write_guard = self.version.write().await; + *write_guard = None; + Ok(()) } async fn restore(&self) -> Result<()> { + self.check_mutable().await?; Err(Error::NotSupported { message: "restore is not supported on LanceDB cloud.".into(), }) @@ -321,10 +348,14 @@ impl TableInternal for RemoteTable { .client .post(&format!("/v1/table/{}/count_rows/", self.name)); + let version = self.current_version().await; + if let Some(filter) = filter { - request = request.json(&serde_json::json!({ "predicate": filter })); + request = request.json(&serde_json::json!({ "predicate": filter, "version": version })); } else { - request = request.json(&serde_json::json!({})); + let body = serde_json::json!({ "version": version }); + println!("body is: {}", body); // TODO delete this + request = request.json(&body); } let (request_id, response) = self.client.send(request, true).await?; @@ -344,6 +375,7 @@ impl TableInternal for RemoteTable { add: AddDataBuilder, data: Box, ) -> Result<()> { + self.check_mutable().await?; let body = Self::reader_as_body(data)?; let mut request = self .client @@ -421,6 +453,7 @@ impl TableInternal for RemoteTable { Ok(DatasetRecordBatchStream::new(stream)) } async fn update(&self, update: UpdateBuilder) -> Result { + self.check_mutable().await?; let request = self .client .post(&format!("/v1/table/{}/update/", self.name)); @@ -442,6 +475,7 @@ impl TableInternal for RemoteTable { Ok(0) // TODO: support returning number of modified rows once supported in SaaS. } async fn delete(&self, predicate: &str) -> Result<()> { + self.check_mutable().await?; let body = serde_json::json!({ "predicate": predicate }); let request = self .client @@ -453,6 +487,7 @@ impl TableInternal for RemoteTable { } async fn create_index(&self, mut index: IndexBuilder) -> Result<()> { + self.check_mutable().await?; let request = self .client .post(&format!("/v1/table/{}/create_index/", self.name)); @@ -531,6 +566,7 @@ impl TableInternal for RemoteTable { params: MergeInsertBuilder, new_data: Box, ) -> Result<()> { + self.check_mutable().await?; let query = MergeInsertRequest::try_from(params)?; let body = Self::reader_as_body(new_data)?; let request = self @@ -547,6 +583,7 @@ impl TableInternal for RemoteTable { Ok(()) } async fn optimize(&self, _action: OptimizeAction) -> Result { + self.check_mutable().await?; Err(Error::NotSupported { message: "optimize is not supported on LanceDB cloud.".into(), }) @@ -556,16 +593,19 @@ impl TableInternal for RemoteTable { _transforms: NewColumnTransform, _read_columns: Option>, ) -> Result<()> { + self.check_mutable().await?; Err(Error::NotSupported { message: "add_columns is not yet supported.".into(), }) } async fn alter_columns(&self, _alterations: &[ColumnAlteration]) -> Result<()> { + self.check_mutable().await?; Err(Error::NotSupported { message: "alter_columns is not yet supported.".into(), }) } async fn drop_columns(&self, _columns: &[&str]) -> Result<()> { + self.check_mutable().await?; Err(Error::NotSupported { message: "drop_columns is not yet supported.".into(), }) @@ -648,6 +688,7 @@ impl TableInternal for RemoteTable { Ok(Some(stats)) } async fn table_definition(&self) -> Result { + self.check_mutable().await?; Err(Error::NotSupported { message: "table_definition is not supported on LanceDB cloud.".into(), })