From 0f58bd7af29c2837ff39097a51b784440a845af7 Mon Sep 17 00:00:00 2001 From: Rob Meng Date: Wed, 28 Jun 2023 11:20:09 -0400 Subject: [PATCH] allow passing ReadParams to dataset when opening a table (#234) Plumb thru object store construction hook from [lance/pull/1014](https://github.com/lancedb/lance/pull/1014) --- Cargo.lock | 4 +- rust/ffi/node/Cargo.toml | 2 +- rust/vectordb/Cargo.toml | 2 +- rust/vectordb/src/table.rs | 94 ++++++++++++++++++++++++++++++++++---- 4 files changed, 89 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 776162e4c..9b54a7188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1636,9 +1636,9 @@ dependencies = [ [[package]] name = "lance" -version = "0.5.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84dfe2a2af3e7b079a4743e303617c6ac19f43d212b7d6def8873305266f2bcd" +checksum = "76dad119202267ad3f2a5f5dd4b38a9ce7f66c13c14005c2588f9a18da7f25ab" dependencies = [ "arrow", "arrow-arith", diff --git a/rust/ffi/node/Cargo.toml b/rust/ffi/node/Cargo.toml index 4d7855b79..da6cdba50 100644 --- a/rust/ffi/node/Cargo.toml +++ b/rust/ffi/node/Cargo.toml @@ -15,7 +15,7 @@ arrow-ipc = "40.0" arrow-schema = "40.0" once_cell = "1" futures = "0.3" -lance = "0.5.0" +lance = "0.5.1" vectordb = { path = "../../vectordb" } tokio = { version = "1.23", features = ["rt-multi-thread"] } neon = {version = "0.10.1", default-features = false, features = ["channel-api", "napi-6", "promise-api", "task-api"] } diff --git a/rust/vectordb/Cargo.toml b/rust/vectordb/Cargo.toml index 5e822af66..51d6680da 100644 --- a/rust/vectordb/Cargo.toml +++ b/rust/vectordb/Cargo.toml @@ -14,7 +14,7 @@ arrow-data = "40.0" arrow-schema = "40.0" object_store = "0.6.1" snafu = "0.7.4" -lance = "0.5.0" +lance = "0.5.1" tokio = { version = "1.23", features = ["rt-multi-thread"] } [dev-dependencies] diff --git a/rust/vectordb/src/table.rs b/rust/vectordb/src/table.rs index ab62b598f..d441f2306 100644 --- a/rust/vectordb/src/table.rs +++ b/rust/vectordb/src/table.rs @@ -16,7 +16,7 @@ use std::path::Path; use std::sync::Arc; use arrow_array::{Float32Array, RecordBatchReader}; -use lance::dataset::{Dataset, WriteMode, WriteParams}; +use lance::dataset::{Dataset, ReadParams, WriteMode, WriteParams}; use lance::index::IndexType; use snafu::prelude::*; @@ -41,6 +41,11 @@ impl std::fmt::Display for Table { } } +#[derive(Default)] +pub struct OpenTableParams { + pub(crate) dataset_read_params: ReadParams, +} + impl Table { /// Opens an existing Table /// @@ -53,6 +58,25 @@ impl Table { /// /// * A [Table] object. pub async fn open(base_uri: &str, name: &str) -> Result { + Self::open_with_params(base_uri, name, OpenTableParams::default()).await + } + + /// Opens an existing Table + /// + /// # Arguments + /// + /// * `base_path` - The base path where the table is located + /// * `name` The Table name + /// * `params` The [OpenTableParams] to use when opening the table + /// + /// # Returns + /// + /// * A [Table] object. + pub async fn open_with_params( + base_uri: &str, + name: &str, + params: OpenTableParams, + ) -> Result { let path = Path::new(base_uri); let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION)); @@ -61,14 +85,16 @@ impl Table { .to_str() .context(InvalidTableNameSnafu { name })?; - let dataset = Dataset::open(&uri).await.map_err(|e| match e { - lance::Error::DatasetNotFound { .. } => Error::TableNotFound { - name: name.to_string(), - }, - e => Error::Lance { - message: e.to_string(), - }, - })?; + let dataset = Dataset::open_with_params(uri, ¶ms.dataset_read_params) + .await + .map_err(|e| match e { + lance::Error::DatasetNotFound { .. } => Error::TableNotFound { + name: name.to_string(), + }, + e => Error::Lance { + message: e.to_string(), + }, + })?; Ok(Table { name: name.to_string(), uri: uri.to_string(), @@ -200,6 +226,7 @@ impl Table { #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use arrow_array::{ @@ -211,6 +238,7 @@ mod tests { use lance::dataset::{Dataset, WriteMode}; use lance::index::vector::ivf::IvfBuildParams; use lance::index::vector::pq::PQBuildParams; + use lance::io::object_store::{ObjectStoreParams, WrappingObjectStore}; use rand::Rng; use tempfile::tempdir; @@ -331,6 +359,54 @@ mod tests { assert_eq!(vector, query.query_vector); } + #[derive(Default)] + struct NoOpCacheWrapper { + called: AtomicBool, + } + + impl NoOpCacheWrapper { + fn called(&self) -> bool { + self.called.load(Ordering::Relaxed) + } + } + + impl WrappingObjectStore for NoOpCacheWrapper { + fn wrap( + &self, + original: Arc, + ) -> Arc { + self.called.store(true, Ordering::Relaxed); + return original; + } + } + + #[tokio::test] + async fn test_open_table_options() { + let tmp_dir = tempdir().unwrap(); + let dataset_path = tmp_dir.path().join("test.lance"); + let uri = tmp_dir.path().to_str().unwrap(); + + let mut batches: Box = Box::new(make_test_batches()); + Dataset::write(&mut batches, dataset_path.to_str().unwrap(), None) + .await + .unwrap(); + + let wrapper = Arc::new(NoOpCacheWrapper::default()); + + let param = OpenTableParams { + dataset_read_params: ReadParams { + store_options: Some(ObjectStoreParams { + object_store_wrapper: Some(wrapper.clone()), + }), + ..ReadParams::default() + }, + }; + + assert!(!wrapper.called()); + let _ = Table::open_with_params(uri, "test", param).await.unwrap(); + assert!(wrapper.called()); + } + fn make_test_batches() -> RecordBatchBuffer { let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); RecordBatchBuffer::new(vec![RecordBatch::try_new(