feat: support for checkout and checkout_latest in remote rust and python sdks

This commit is contained in:
albertlockett
2024-11-19 17:24:28 -05:00
parent b2f88f0b29
commit 131c01d702
3 changed files with 70 additions and 11 deletions

View File

@@ -86,6 +86,12 @@ class RemoteTable(Table):
"""to_pandas() is not yet supported on LanceDB cloud."""
return NotImplementedError("to_pandas() is not yet supported on LanceDB cloud.")
def checkout(self, version):
return self._loop.run_until_complete(self._table.checkout(version))
def checkout_latest(self):
return self._loop.run_until_complete(self._table.checkout_latest())
def list_indices(self):
"""List all the indices on the table"""
return self._loop.run_until_complete(self._table.list_indices())

View File

@@ -1012,6 +1012,18 @@ class Table(ABC):
The names of the columns to drop.
"""
@abstractmethod
def checkout(self):
"""
TODO comments
"""
@abstractmethod
def checkout_latest(self):
"""
TODO comments
"""
@cached_property
def _dataset_uri(self) -> str:
return _table_uri(self._conn.uri, self.name)

View File

@@ -22,6 +22,7 @@ use lance::dataset::scanner::DatasetRecordBatchStream;
use lance::dataset::{ColumnAlteration, NewColumnTransform};
use lance_datafusion::exec::OneShotExec;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::{
connection::NoData,
@@ -43,11 +44,13 @@ pub struct RemoteTable<S: HttpSend = Sender> {
#[allow(dead_code)]
client: RestfulLanceDbClient<S>,
name: String,
version: RwLock<Option<u64>>,
}
impl<S: HttpSend> RemoteTable<S> {
pub fn new(client: RestfulLanceDbClient<S>, name: String) -> Self {
Self { client, name }
Self { client, name, version: RwLock::new(None) }
}
async fn describe(&self) -> Result<TableDescription> {
@@ -251,6 +254,26 @@ impl<S: HttpSend> RemoteTable<S> {
}
}
}
async fn check_mutable(&self) -> Result<()> {
let read_guard = self.version.read().await;
match *read_guard {
None => Ok(()),
Some(version) => Err(Error::NotSupported {
message: format!(
"Cannot mutate table reference fixed at version {}. Call checkout_latest() to get a mutable table reference.",
version
)
})
}
}
async fn current_version(&self) -> Option<u64> {
let read_guard = self.version.read().await;
let version = *read_guard;
version
}
}
#[derive(Deserialize)]
@@ -278,7 +301,7 @@ mod test_utils {
T: Into<reqwest::Body>,
{
let client = client_with_handler(handler);
Self { client, name }
Self { client, name, version: RwLock::new(None) }
}
}
}
@@ -297,17 +320,21 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
async fn version(&self) -> Result<u64> {
self.describe().await.map(|desc| desc.version)
}
async fn checkout(&self, _version: u64) -> Result<()> {
Err(Error::NotSupported {
message: "checkout is not supported on LanceDB cloud.".into(),
})
async fn checkout(&self, version: u64) -> Result<()> {
// TODO check that the version exists
// we can do this when the list_versions changes land
// https://github.com/lancedb/lancedb/pull/1850
let mut write_guard = self.version.write().await;
*write_guard = Some(version);
Ok(())
}
async fn checkout_latest(&self) -> Result<()> {
Err(Error::NotSupported {
message: "checkout is not supported on LanceDB cloud.".into(),
})
let mut write_guard = self.version.write().await;
*write_guard = None;
Ok(())
}
async fn restore(&self) -> Result<()> {
self.check_mutable().await?;
Err(Error::NotSupported {
message: "restore is not supported on LanceDB cloud.".into(),
})
@@ -321,10 +348,14 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
.client
.post(&format!("/v1/table/{}/count_rows/", self.name));
let version = self.current_version().await;
if let Some(filter) = filter {
request = request.json(&serde_json::json!({ "predicate": filter }));
request = request.json(&serde_json::json!({ "predicate": filter, "version": version }));
} else {
request = request.json(&serde_json::json!({}));
let body = serde_json::json!({ "version": version });
println!("body is: {}", body); // TODO delete this
request = request.json(&body);
}
let (request_id, response) = self.client.send(request, true).await?;
@@ -344,6 +375,7 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
add: AddDataBuilder<NoData>,
data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
self.check_mutable().await?;
let body = Self::reader_as_body(data)?;
let mut request = self
.client
@@ -421,6 +453,7 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
Ok(DatasetRecordBatchStream::new(stream))
}
async fn update(&self, update: UpdateBuilder) -> Result<u64> {
self.check_mutable().await?;
let request = self
.client
.post(&format!("/v1/table/{}/update/", self.name));
@@ -442,6 +475,7 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
Ok(0) // TODO: support returning number of modified rows once supported in SaaS.
}
async fn delete(&self, predicate: &str) -> Result<()> {
self.check_mutable().await?;
let body = serde_json::json!({ "predicate": predicate });
let request = self
.client
@@ -453,6 +487,7 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
}
async fn create_index(&self, mut index: IndexBuilder) -> Result<()> {
self.check_mutable().await?;
let request = self
.client
.post(&format!("/v1/table/{}/create_index/", self.name));
@@ -531,6 +566,7 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
params: MergeInsertBuilder,
new_data: Box<dyn RecordBatchReader + Send>,
) -> Result<()> {
self.check_mutable().await?;
let query = MergeInsertRequest::try_from(params)?;
let body = Self::reader_as_body(new_data)?;
let request = self
@@ -547,6 +583,7 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
Ok(())
}
async fn optimize(&self, _action: OptimizeAction) -> Result<OptimizeStats> {
self.check_mutable().await?;
Err(Error::NotSupported {
message: "optimize is not supported on LanceDB cloud.".into(),
})
@@ -556,16 +593,19 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
_transforms: NewColumnTransform,
_read_columns: Option<Vec<String>>,
) -> Result<()> {
self.check_mutable().await?;
Err(Error::NotSupported {
message: "add_columns is not yet supported.".into(),
})
}
async fn alter_columns(&self, _alterations: &[ColumnAlteration]) -> Result<()> {
self.check_mutable().await?;
Err(Error::NotSupported {
message: "alter_columns is not yet supported.".into(),
})
}
async fn drop_columns(&self, _columns: &[&str]) -> Result<()> {
self.check_mutable().await?;
Err(Error::NotSupported {
message: "drop_columns is not yet supported.".into(),
})
@@ -648,6 +688,7 @@ impl<S: HttpSend> TableInternal for RemoteTable<S> {
Ok(Some(stats))
}
async fn table_definition(&self) -> Result<TableDefinition> {
self.check_mutable().await?;
Err(Error::NotSupported {
message: "table_definition is not supported on LanceDB cloud.".into(),
})