allow passing ReadParams to dataset when opening a table (#234)

Plumb thru object store construction hook from
[lance/pull/1014](https://github.com/lancedb/lance/pull/1014)
This commit is contained in:
Rob Meng
2023-06-28 11:20:09 -04:00
committed by GitHub
parent 01abf82808
commit 0f58bd7af2
4 changed files with 89 additions and 13 deletions

4
Cargo.lock generated
View File

@@ -1636,9 +1636,9 @@ dependencies = [
[[package]]
name = "lance"
version = "0.5.0"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84dfe2a2af3e7b079a4743e303617c6ac19f43d212b7d6def8873305266f2bcd"
checksum = "76dad119202267ad3f2a5f5dd4b38a9ce7f66c13c14005c2588f9a18da7f25ab"
dependencies = [
"arrow",
"arrow-arith",

View File

@@ -15,7 +15,7 @@ arrow-ipc = "40.0"
arrow-schema = "40.0"
once_cell = "1"
futures = "0.3"
lance = "0.5.0"
lance = "0.5.1"
vectordb = { path = "../../vectordb" }
tokio = { version = "1.23", features = ["rt-multi-thread"] }
neon = {version = "0.10.1", default-features = false, features = ["channel-api", "napi-6", "promise-api", "task-api"] }

View File

@@ -14,7 +14,7 @@ arrow-data = "40.0"
arrow-schema = "40.0"
object_store = "0.6.1"
snafu = "0.7.4"
lance = "0.5.0"
lance = "0.5.1"
tokio = { version = "1.23", features = ["rt-multi-thread"] }
[dev-dependencies]

View File

@@ -16,7 +16,7 @@ use std::path::Path;
use std::sync::Arc;
use arrow_array::{Float32Array, RecordBatchReader};
use lance::dataset::{Dataset, WriteMode, WriteParams};
use lance::dataset::{Dataset, ReadParams, WriteMode, WriteParams};
use lance::index::IndexType;
use snafu::prelude::*;
@@ -41,6 +41,11 @@ impl std::fmt::Display for Table {
}
}
#[derive(Default)]
pub struct OpenTableParams {
pub(crate) dataset_read_params: ReadParams,
}
impl Table {
/// Opens an existing Table
///
@@ -53,6 +58,25 @@ impl Table {
///
/// * A [Table] object.
pub async fn open(base_uri: &str, name: &str) -> Result<Self> {
Self::open_with_params(base_uri, name, OpenTableParams::default()).await
}
/// Opens an existing Table
///
/// # Arguments
///
/// * `base_path` - The base path where the table is located
/// * `name` The Table name
/// * `params` The [OpenTableParams] to use when opening the table
///
/// # Returns
///
/// * A [Table] object.
pub async fn open_with_params(
base_uri: &str,
name: &str,
params: OpenTableParams,
) -> Result<Self> {
let path = Path::new(base_uri);
let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION));
@@ -61,14 +85,16 @@ impl Table {
.to_str()
.context(InvalidTableNameSnafu { name })?;
let dataset = Dataset::open(&uri).await.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
},
e => Error::Lance {
message: e.to_string(),
},
})?;
let dataset = Dataset::open_with_params(uri, &params.dataset_read_params)
.await
.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
},
e => Error::Lance {
message: e.to_string(),
},
})?;
Ok(Table {
name: name.to_string(),
uri: uri.to_string(),
@@ -200,6 +226,7 @@ impl Table {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use arrow_array::{
@@ -211,6 +238,7 @@ mod tests {
use lance::dataset::{Dataset, WriteMode};
use lance::index::vector::ivf::IvfBuildParams;
use lance::index::vector::pq::PQBuildParams;
use lance::io::object_store::{ObjectStoreParams, WrappingObjectStore};
use rand::Rng;
use tempfile::tempdir;
@@ -331,6 +359,54 @@ mod tests {
assert_eq!(vector, query.query_vector);
}
#[derive(Default)]
struct NoOpCacheWrapper {
called: AtomicBool,
}
impl NoOpCacheWrapper {
fn called(&self) -> bool {
self.called.load(Ordering::Relaxed)
}
}
impl WrappingObjectStore for NoOpCacheWrapper {
fn wrap(
&self,
original: Arc<dyn object_store::ObjectStore>,
) -> Arc<dyn object_store::ObjectStore> {
self.called.store(true, Ordering::Relaxed);
return original;
}
}
#[tokio::test]
async fn test_open_table_options() {
let tmp_dir = tempdir().unwrap();
let dataset_path = tmp_dir.path().join("test.lance");
let uri = tmp_dir.path().to_str().unwrap();
let mut batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches());
Dataset::write(&mut batches, dataset_path.to_str().unwrap(), None)
.await
.unwrap();
let wrapper = Arc::new(NoOpCacheWrapper::default());
let param = OpenTableParams {
dataset_read_params: ReadParams {
store_options: Some(ObjectStoreParams {
object_store_wrapper: Some(wrapper.clone()),
}),
..ReadParams::default()
},
};
assert!(!wrapper.called());
let _ = Table::open_with_params(uri, "test", param).await.unwrap();
assert!(wrapper.called());
}
fn make_test_batches() -> RecordBatchBuffer {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
RecordBatchBuffer::new(vec![RecordBatch::try_new(