From 23f5dddc7c90e9cae3fad7d422ccd608fdb8e0ba Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 17 Jul 2023 17:29:58 -0700 Subject: [PATCH] [Rust] Checkout a version of dataset. (#321) * `Table::open()` from absolute path, and gives the responsibility of organizing metadata out of Table object * Fix Clippy warnings * Add `Table::checkout(version)` API --- python/pyproject.toml | 2 +- rust/ffi/node/src/lib.rs | 15 +--- rust/vectordb/src/database.rs | 47 ++++++----- rust/vectordb/src/index/vector.rs | 6 ++ rust/vectordb/src/table.rs | 135 ++++++++++++++++++------------ 5 files changed, 119 insertions(+), 86 deletions(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 08eb66fa..9aeab86a 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "lancedb" -version = "0.1.10" +version = "0.1.11" dependencies = ["pylance~=0.5.8", "ratelimiter", "retry", "tqdm", "aiohttp", "pydantic>=2", "attr"] description = "lancedb" authors = [ diff --git a/rust/ffi/node/src/lib.rs b/rust/ffi/node/src/lib.rs index 7fe18e4b..a2b1b5fd 100644 --- a/rust/ffi/node/src/lib.rs +++ b/rust/ffi/node/src/lib.rs @@ -21,7 +21,7 @@ use arrow_array::{Float32Array, RecordBatchIterator}; use arrow_ipc::writer::FileWriter; use async_trait::async_trait; use futures::{TryFutureExt, TryStreamExt}; -use lance::dataset::{ReadParams, WriteMode, WriteParams}; +use lance::dataset::{WriteMode, WriteParams}; use lance::index::vector::MetricType; use lance::io::object_store::ObjectStoreParams; use neon::prelude::*; @@ -33,7 +33,7 @@ use tokio::runtime::Runtime; use vectordb::database::Database; use vectordb::error::Error; -use vectordb::table::{OpenTableParams, Table}; +use vectordb::table::{ReadParams, Table}; use crate::arrow::arrow_buffer_to_record_batch; @@ -177,7 +177,7 @@ fn database_open_table(mut cx: FunctionContext) -> JsResult { Err(err) => return err, }; - let param = ReadParams { + let params = ReadParams { store_options: Some(ObjectStoreParams { aws_credentials: aws_creds, ..ObjectStoreParams::default() @@ -191,14 +191,7 @@ fn database_open_table(mut cx: FunctionContext) -> JsResult { let (deferred, promise) = cx.promise(); rt.spawn(async move { - let table_rst = database - .open_table_with_params( - &table_name, - OpenTableParams { - open_table_params: param, - }, - ) - .await; + let table_rst = database.open_table_with_params(&table_name, ¶ms).await; deferred.settle_with(&channel, move |mut cx| { let table = Arc::new(Mutex::new( diff --git a/rust/vectordb/src/database.rs b/rust/vectordb/src/database.rs index 9200071c..3292df04 100644 --- a/rust/vectordb/src/database.rs +++ b/rust/vectordb/src/database.rs @@ -20,8 +20,10 @@ use lance::dataset::WriteParams; use lance::io::object_store::ObjectStore; use snafu::prelude::*; -use crate::error::{CreateDirSnafu, Result}; -use crate::table::{OpenTableParams, Table}; +use crate::error::{CreateDirSnafu, InvalidTableNameSnafu, Result}; +use crate::table::{ReadParams, Table}; + +pub const LANCE_FILE_EXTENSION: &str = "lance"; pub struct Database { object_store: ObjectStore, @@ -59,7 +61,7 @@ impl Database { fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> { let path = Path::new(path); if !path.try_exists()? { - create_dir_all(&path)?; + create_dir_all(path)?; } Ok(()) } @@ -75,20 +77,15 @@ impl Database { .read_dir(self.base_path.clone()) .await? .iter() - .map(|fname| Path::new(fname)) + .map(Path::new) .filter(|path| { let is_lance = path .extension() - .map(|e| e.to_str().map(|e| e == LANCE_EXTENSION)) - .flatten(); + .and_then(|e| e.to_str()) + .map(|e| e == LANCE_EXTENSION); is_lance.unwrap_or(false) }) - .map(|p| { - p.file_stem() - .map(|s| s.to_str().map(|s| String::from(s))) - .flatten() - }) - .flatten() + .filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from))) .collect(); Ok(f) } @@ -105,7 +102,8 @@ impl Database { batches: impl RecordBatchReader + Send + 'static, params: Option, ) -> Result { - Table::create(&self.uri, name, batches, params).await + let table_uri = self.table_uri(name)?; + Table::create(&table_uri, name, batches, params).await } /// Open a table in the database. @@ -117,7 +115,7 @@ impl Database { /// /// * A [Table] object. pub async fn open_table(&self, name: &str) -> Result
{ - self.open_table_with_params(name, OpenTableParams::default()) + self.open_table_with_params(name, &ReadParams::default()) .await } @@ -130,12 +128,9 @@ impl Database { /// # Returns /// /// * A [Table] object. - pub async fn open_table_with_params( - &self, - name: &str, - params: OpenTableParams, - ) -> Result
{ - Table::open_with_params(&self.uri, name, params).await + pub async fn open_table_with_params(&self, name: &str, params: &ReadParams) -> Result
{ + let table_uri = self.table_uri(name)?; + Table::open_with_params(&table_uri, name, params).await } /// Drop a table in the database. @@ -148,6 +143,18 @@ impl Database { self.object_store.remove_dir_all(full_path).await?; Ok(()) } + + /// Get the URI of a table in the database. + fn table_uri(&self, name: &str) -> Result { + let path = Path::new(&self.uri); + let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION)); + + let uri = table_uri + .as_path() + .to_str() + .context(InvalidTableNameSnafu { name })?; + Ok(uri.to_string()) + } } #[cfg(test)] diff --git a/rust/vectordb/src/index/vector.rs b/rust/vectordb/src/index/vector.rs index 36fbd100..bfad66e6 100644 --- a/rust/vectordb/src/index/vector.rs +++ b/rust/vectordb/src/index/vector.rs @@ -35,6 +35,12 @@ pub struct IvfPQIndexBuilder { impl IvfPQIndexBuilder { pub fn new() -> IvfPQIndexBuilder { + Default::default() + } +} + +impl Default for IvfPQIndexBuilder { + fn default() -> Self { IvfPQIndexBuilder { column: None, index_name: None, diff --git a/rust/vectordb/src/table.rs b/rust/vectordb/src/table.rs index ab6e2e08..c2ed2500 100644 --- a/rust/vectordb/src/table.rs +++ b/rust/vectordb/src/table.rs @@ -12,22 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::path::Path; use std::sync::Arc; use arrow_array::{Float32Array, RecordBatchReader}; use arrow_schema::SchemaRef; -use lance::dataset::{Dataset, ReadParams, WriteParams}; +use lance::dataset::{Dataset, WriteParams}; use lance::index::IndexType; -use snafu::prelude::*; +use std::path::Path; -use crate::error::{Error, InvalidTableNameSnafu, Result}; +use crate::error::{Error, Result}; use crate::index::vector::VectorIndexBuilder; use crate::query::Query; use crate::WriteMode; +pub use lance::dataset::ReadParams; + pub const VECTOR_COLUMN_NAME: &str = "vector"; -pub const LANCE_FILE_EXTENSION: &str = "lance"; /// A table in a LanceDB database. #[derive(Debug, Clone)] @@ -43,24 +43,25 @@ impl std::fmt::Display for Table { } } -#[derive(Default)] -pub struct OpenTableParams { - pub open_table_params: ReadParams, -} - impl Table { /// Opens an existing Table /// /// # Arguments /// - /// * `base_path` - The base path where the table is located - /// * `name` The Table name + /// * `uri` - The uri to a [Table] + /// * `name` - The table name /// /// # Returns /// /// * A [Table] object. - pub async fn open(base_uri: &str, name: &str) -> Result { - Self::open_with_params(base_uri, name, OpenTableParams::default()).await + pub async fn open(uri: &str) -> Result { + let name = Self::get_table_name(uri)?; + Self::open_with_params(uri, &name, &ReadParams::default()).await + } + + /// Open an Table with a given name. + pub async fn open_with_name(uri: &str, name: &str) -> Result { + Self::open_with_params(uri, name, &ReadParams::default()).await } /// Opens an existing Table @@ -69,25 +70,13 @@ impl Table { /// /// * `base_path` - The base path where the table is located /// * `name` The Table name - /// * `params` The [OpenTableParams] to use when opening the table + /// * `params` The [ReadParams] to use when opening the table /// /// # Returns /// /// * A [Table] object. - pub async fn open_with_params( - base_uri: &str, - name: &str, - params: OpenTableParams, - ) -> Result { - let path = Path::new(base_uri); - - let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION)); - let uri = table_uri - .as_path() - .to_str() - .context(InvalidTableNameSnafu { name })?; - - let dataset = Dataset::open_with_params(uri, ¶ms.open_table_params) + pub async fn open_with_params(uri: &str, name: &str, params: &ReadParams) -> Result { + let dataset = Dataset::open_with_params(uri, params) .await .map_err(|e| match e { lance::Error::DatasetNotFound { .. } => Error::TableNotFound { @@ -104,31 +93,73 @@ impl Table { }) } + /// Checkout a specific version of this [`Table`] + /// + pub async fn checkout(uri: &str, version: u64) -> Result { + let name = Self::get_table_name(uri)?; + Self::checkout_with_params(uri, &name, version, &ReadParams::default()).await + } + + pub async fn checkout_with_name(uri: &str, name: &str, version: u64) -> Result { + Self::checkout_with_params(uri, name, version, &ReadParams::default()).await + } + + pub async fn checkout_with_params( + uri: &str, + name: &str, + version: u64, + params: &ReadParams, + ) -> Result { + let dataset = Dataset::checkout_with_params(uri, version, params) + .await + .map_err(|e| match e { + lance::Error::DatasetNotFound { .. } => Error::TableNotFound { + name: name.to_string(), + }, + e => Error::Lance { + message: e.to_string(), + }, + })?; + Ok(Table { + name: name.to_string(), + uri: uri.to_string(), + dataset: Arc::new(dataset), + }) + } + + fn get_table_name(uri: &str) -> Result { + let path = Path::new(uri); + let name = path + .file_stem() + .ok_or(Error::TableNotFound { + name: uri.to_string(), + })? + .to_str() + .ok_or(Error::InvalidTableName { + name: uri.to_string(), + })?; + Ok(name.to_string()) + } + /// Creates a new Table /// /// # Arguments /// - /// * `base_path` - The base path where the table is located + /// * `uri` - The URI to the table. /// * `name` The Table name - /// * `batches` RecordBatch to be saved in the database + /// * `batches` RecordBatch to be saved in the database. + /// * `params` - Write parameters. /// /// # Returns /// /// * A [Table] object. pub async fn create( - base_uri: &str, + uri: &str, name: &str, batches: impl RecordBatchReader + Send + 'static, params: Option, ) -> Result { - let base_path = Path::new(base_uri); - let table_uri = base_path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION)); - let uri = table_uri - .as_path() - .to_str() - .context(InvalidTableNameSnafu { name })? - .to_string(); - let dataset = Dataset::write(batches, &uri, params) + let dataset = Dataset::write(batches, uri, params) .await .map_err(|e| match e { lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists { @@ -140,7 +171,7 @@ impl Table { })?; Ok(Table { name: name.to_string(), - uri, + uri: uri.to_string(), dataset: Arc::new(dataset), }) } @@ -264,14 +295,13 @@ mod tests { async fn test_open() { let tmp_dir = tempdir().unwrap(); let dataset_path = tmp_dir.path().join("test.lance"); - let uri = tmp_dir.path().to_str().unwrap(); let batches = make_test_batches(); Dataset::write(batches, dataset_path.to_str().unwrap(), None) .await .unwrap(); - let table = Table::open(uri, "test").await.unwrap(); + let table = Table::open(dataset_path.to_str().unwrap()).await.unwrap(); assert_eq!(table.name, "test") } @@ -280,7 +310,7 @@ mod tests { async fn test_open_not_found() { let tmp_dir = tempdir().unwrap(); let uri = tmp_dir.path().to_str().unwrap(); - let table = Table::open(uri, "test").await; + let table = Table::open(uri).await; assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. })); } @@ -371,14 +401,14 @@ mod tests { async fn test_search() { let tmp_dir = tempdir().unwrap(); let dataset_path = tmp_dir.path().join("test.lance"); - let uri = tmp_dir.path().to_str().unwrap(); + let uri = dataset_path.to_str().unwrap(); let batches = make_test_batches(); Dataset::write(batches, dataset_path.to_str().unwrap(), None) .await .unwrap(); - let table = Table::open(uri, "test").await.unwrap(); + let table = Table::open(uri).await.unwrap(); let vector = Float32Array::from_iter_values([0.1, 0.2]); let query = table.search(vector.clone()); @@ -410,7 +440,7 @@ mod tests { async fn test_open_table_options() { let tmp_dir = tempdir().unwrap(); let dataset_path = tmp_dir.path().join("test.lance"); - let uri = tmp_dir.path().to_str().unwrap(); + let uri = dataset_path.to_str().unwrap(); let batches = make_test_batches(); Dataset::write(batches, dataset_path.to_str().unwrap(), None) @@ -421,15 +451,12 @@ mod tests { let mut object_store_params = ObjectStoreParams::default(); object_store_params.object_store_wrapper = Some(wrapper.clone()); - let param = OpenTableParams { - open_table_params: ReadParams { - store_options: Some(object_store_params), - ..ReadParams::default() - }, + let param = ReadParams { + store_options: Some(object_store_params), + ..Default::default() }; - assert!(!wrapper.called()); - let _ = Table::open_with_params(uri, "test", param).await.unwrap(); + let _ = Table::open_with_params(uri, "test", ¶m).await.unwrap(); assert!(wrapper.called()); }