[Rust] Checkout a version of dataset. (#321)

* `Table::open()` from absolute path, and gives the responsibility of
organizing metadata out of Table object
* Fix Clippy warnings
* Add `Table::checkout(version)` API
This commit is contained in:
Lei Xu
2023-07-17 17:29:58 -07:00
committed by GitHub
parent 9748406cba
commit 23f5dddc7c
5 changed files with 119 additions and 86 deletions

View File

@@ -1,6 +1,6 @@
[project]
name = "lancedb"
version = "0.1.10"
version = "0.1.11"
dependencies = ["pylance~=0.5.8", "ratelimiter", "retry", "tqdm", "aiohttp", "pydantic>=2", "attr"]
description = "lancedb"
authors = [

View File

@@ -21,7 +21,7 @@ use arrow_array::{Float32Array, RecordBatchIterator};
use arrow_ipc::writer::FileWriter;
use async_trait::async_trait;
use futures::{TryFutureExt, TryStreamExt};
use lance::dataset::{ReadParams, WriteMode, WriteParams};
use lance::dataset::{WriteMode, WriteParams};
use lance::index::vector::MetricType;
use lance::io::object_store::ObjectStoreParams;
use neon::prelude::*;
@@ -33,7 +33,7 @@ use tokio::runtime::Runtime;
use vectordb::database::Database;
use vectordb::error::Error;
use vectordb::table::{OpenTableParams, Table};
use vectordb::table::{ReadParams, Table};
use crate::arrow::arrow_buffer_to_record_batch;
@@ -177,7 +177,7 @@ fn database_open_table(mut cx: FunctionContext) -> JsResult<JsPromise> {
Err(err) => return err,
};
let param = ReadParams {
let params = ReadParams {
store_options: Some(ObjectStoreParams {
aws_credentials: aws_creds,
..ObjectStoreParams::default()
@@ -191,14 +191,7 @@ fn database_open_table(mut cx: FunctionContext) -> JsResult<JsPromise> {
let (deferred, promise) = cx.promise();
rt.spawn(async move {
let table_rst = database
.open_table_with_params(
&table_name,
OpenTableParams {
open_table_params: param,
},
)
.await;
let table_rst = database.open_table_with_params(&table_name, &params).await;
deferred.settle_with(&channel, move |mut cx| {
let table = Arc::new(Mutex::new(

View File

@@ -20,8 +20,10 @@ use lance::dataset::WriteParams;
use lance::io::object_store::ObjectStore;
use snafu::prelude::*;
use crate::error::{CreateDirSnafu, Result};
use crate::table::{OpenTableParams, Table};
use crate::error::{CreateDirSnafu, InvalidTableNameSnafu, Result};
use crate::table::{ReadParams, Table};
pub const LANCE_FILE_EXTENSION: &str = "lance";
pub struct Database {
object_store: ObjectStore,
@@ -59,7 +61,7 @@ impl Database {
fn try_create_dir(path: &str) -> core::result::Result<(), std::io::Error> {
let path = Path::new(path);
if !path.try_exists()? {
create_dir_all(&path)?;
create_dir_all(path)?;
}
Ok(())
}
@@ -75,20 +77,15 @@ impl Database {
.read_dir(self.base_path.clone())
.await?
.iter()
.map(|fname| Path::new(fname))
.map(Path::new)
.filter(|path| {
let is_lance = path
.extension()
.map(|e| e.to_str().map(|e| e == LANCE_EXTENSION))
.flatten();
.and_then(|e| e.to_str())
.map(|e| e == LANCE_EXTENSION);
is_lance.unwrap_or(false)
})
.map(|p| {
p.file_stem()
.map(|s| s.to_str().map(|s| String::from(s)))
.flatten()
})
.flatten()
.filter_map(|p| p.file_stem().and_then(|s| s.to_str().map(String::from)))
.collect();
Ok(f)
}
@@ -105,7 +102,8 @@ impl Database {
batches: impl RecordBatchReader + Send + 'static,
params: Option<WriteParams>,
) -> Result<Table> {
Table::create(&self.uri, name, batches, params).await
let table_uri = self.table_uri(name)?;
Table::create(&table_uri, name, batches, params).await
}
/// Open a table in the database.
@@ -117,7 +115,7 @@ impl Database {
///
/// * A [Table] object.
pub async fn open_table(&self, name: &str) -> Result<Table> {
self.open_table_with_params(name, OpenTableParams::default())
self.open_table_with_params(name, &ReadParams::default())
.await
}
@@ -130,12 +128,9 @@ impl Database {
/// # Returns
///
/// * A [Table] object.
pub async fn open_table_with_params(
&self,
name: &str,
params: OpenTableParams,
) -> Result<Table> {
Table::open_with_params(&self.uri, name, params).await
pub async fn open_table_with_params(&self, name: &str, params: &ReadParams) -> Result<Table> {
let table_uri = self.table_uri(name)?;
Table::open_with_params(&table_uri, name, params).await
}
/// Drop a table in the database.
@@ -148,6 +143,18 @@ impl Database {
self.object_store.remove_dir_all(full_path).await?;
Ok(())
}
/// Get the URI of a table in the database.
fn table_uri(&self, name: &str) -> Result<String> {
let path = Path::new(&self.uri);
let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION));
let uri = table_uri
.as_path()
.to_str()
.context(InvalidTableNameSnafu { name })?;
Ok(uri.to_string())
}
}
#[cfg(test)]

View File

@@ -35,6 +35,12 @@ pub struct IvfPQIndexBuilder {
impl IvfPQIndexBuilder {
pub fn new() -> IvfPQIndexBuilder {
Default::default()
}
}
impl Default for IvfPQIndexBuilder {
fn default() -> Self {
IvfPQIndexBuilder {
column: None,
index_name: None,

View File

@@ -12,22 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::sync::Arc;
use arrow_array::{Float32Array, RecordBatchReader};
use arrow_schema::SchemaRef;
use lance::dataset::{Dataset, ReadParams, WriteParams};
use lance::dataset::{Dataset, WriteParams};
use lance::index::IndexType;
use snafu::prelude::*;
use std::path::Path;
use crate::error::{Error, InvalidTableNameSnafu, Result};
use crate::error::{Error, Result};
use crate::index::vector::VectorIndexBuilder;
use crate::query::Query;
use crate::WriteMode;
pub use lance::dataset::ReadParams;
pub const VECTOR_COLUMN_NAME: &str = "vector";
pub const LANCE_FILE_EXTENSION: &str = "lance";
/// A table in a LanceDB database.
#[derive(Debug, Clone)]
@@ -43,24 +43,25 @@ impl std::fmt::Display for Table {
}
}
#[derive(Default)]
pub struct OpenTableParams {
pub open_table_params: ReadParams,
}
impl Table {
/// Opens an existing Table
///
/// # Arguments
///
/// * `base_path` - The base path where the table is located
/// * `name` The Table name
/// * `uri` - The uri to a [Table]
/// * `name` - The table name
///
/// # Returns
///
/// * A [Table] object.
pub async fn open(base_uri: &str, name: &str) -> Result<Self> {
Self::open_with_params(base_uri, name, OpenTableParams::default()).await
pub async fn open(uri: &str) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::open_with_params(uri, &name, &ReadParams::default()).await
}
/// Open an Table with a given name.
pub async fn open_with_name(uri: &str, name: &str) -> Result<Self> {
Self::open_with_params(uri, name, &ReadParams::default()).await
}
/// Opens an existing Table
@@ -69,25 +70,13 @@ impl Table {
///
/// * `base_path` - The base path where the table is located
/// * `name` The Table name
/// * `params` The [OpenTableParams] to use when opening the table
/// * `params` The [ReadParams] to use when opening the table
///
/// # Returns
///
/// * A [Table] object.
pub async fn open_with_params(
base_uri: &str,
name: &str,
params: OpenTableParams,
) -> Result<Self> {
let path = Path::new(base_uri);
let table_uri = path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION));
let uri = table_uri
.as_path()
.to_str()
.context(InvalidTableNameSnafu { name })?;
let dataset = Dataset::open_with_params(uri, &params.open_table_params)
pub async fn open_with_params(uri: &str, name: &str, params: &ReadParams) -> Result<Self> {
let dataset = Dataset::open_with_params(uri, params)
.await
.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
@@ -104,31 +93,73 @@ impl Table {
})
}
/// Checkout a specific version of this [`Table`]
///
pub async fn checkout(uri: &str, version: u64) -> Result<Self> {
let name = Self::get_table_name(uri)?;
Self::checkout_with_params(uri, &name, version, &ReadParams::default()).await
}
pub async fn checkout_with_name(uri: &str, name: &str, version: u64) -> Result<Self> {
Self::checkout_with_params(uri, name, version, &ReadParams::default()).await
}
pub async fn checkout_with_params(
uri: &str,
name: &str,
version: u64,
params: &ReadParams,
) -> Result<Self> {
let dataset = Dataset::checkout_with_params(uri, version, params)
.await
.map_err(|e| match e {
lance::Error::DatasetNotFound { .. } => Error::TableNotFound {
name: name.to_string(),
},
e => Error::Lance {
message: e.to_string(),
},
})?;
Ok(Table {
name: name.to_string(),
uri: uri.to_string(),
dataset: Arc::new(dataset),
})
}
fn get_table_name(uri: &str) -> Result<String> {
let path = Path::new(uri);
let name = path
.file_stem()
.ok_or(Error::TableNotFound {
name: uri.to_string(),
})?
.to_str()
.ok_or(Error::InvalidTableName {
name: uri.to_string(),
})?;
Ok(name.to_string())
}
/// Creates a new Table
///
/// # Arguments
///
/// * `base_path` - The base path where the table is located
/// * `uri` - The URI to the table.
/// * `name` The Table name
/// * `batches` RecordBatch to be saved in the database
/// * `batches` RecordBatch to be saved in the database.
/// * `params` - Write parameters.
///
/// # Returns
///
/// * A [Table] object.
pub async fn create(
base_uri: &str,
uri: &str,
name: &str,
batches: impl RecordBatchReader + Send + 'static,
params: Option<WriteParams>,
) -> Result<Self> {
let base_path = Path::new(base_uri);
let table_uri = base_path.join(format!("{}.{}", name, LANCE_FILE_EXTENSION));
let uri = table_uri
.as_path()
.to_str()
.context(InvalidTableNameSnafu { name })?
.to_string();
let dataset = Dataset::write(batches, &uri, params)
let dataset = Dataset::write(batches, uri, params)
.await
.map_err(|e| match e {
lance::Error::DatasetAlreadyExists { .. } => Error::TableAlreadyExists {
@@ -140,7 +171,7 @@ impl Table {
})?;
Ok(Table {
name: name.to_string(),
uri,
uri: uri.to_string(),
dataset: Arc::new(dataset),
})
}
@@ -264,14 +295,13 @@ mod tests {
async fn test_open() {
let tmp_dir = tempdir().unwrap();
let dataset_path = tmp_dir.path().join("test.lance");
let uri = tmp_dir.path().to_str().unwrap();
let batches = make_test_batches();
Dataset::write(batches, dataset_path.to_str().unwrap(), None)
.await
.unwrap();
let table = Table::open(uri, "test").await.unwrap();
let table = Table::open(dataset_path.to_str().unwrap()).await.unwrap();
assert_eq!(table.name, "test")
}
@@ -280,7 +310,7 @@ mod tests {
async fn test_open_not_found() {
let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap();
let table = Table::open(uri, "test").await;
let table = Table::open(uri).await;
assert!(matches!(table.unwrap_err(), Error::TableNotFound { .. }));
}
@@ -371,14 +401,14 @@ mod tests {
async fn test_search() {
let tmp_dir = tempdir().unwrap();
let dataset_path = tmp_dir.path().join("test.lance");
let uri = tmp_dir.path().to_str().unwrap();
let uri = dataset_path.to_str().unwrap();
let batches = make_test_batches();
Dataset::write(batches, dataset_path.to_str().unwrap(), None)
.await
.unwrap();
let table = Table::open(uri, "test").await.unwrap();
let table = Table::open(uri).await.unwrap();
let vector = Float32Array::from_iter_values([0.1, 0.2]);
let query = table.search(vector.clone());
@@ -410,7 +440,7 @@ mod tests {
async fn test_open_table_options() {
let tmp_dir = tempdir().unwrap();
let dataset_path = tmp_dir.path().join("test.lance");
let uri = tmp_dir.path().to_str().unwrap();
let uri = dataset_path.to_str().unwrap();
let batches = make_test_batches();
Dataset::write(batches, dataset_path.to_str().unwrap(), None)
@@ -421,15 +451,12 @@ mod tests {
let mut object_store_params = ObjectStoreParams::default();
object_store_params.object_store_wrapper = Some(wrapper.clone());
let param = OpenTableParams {
open_table_params: ReadParams {
store_options: Some(object_store_params),
..ReadParams::default()
},
let param = ReadParams {
store_options: Some(object_store_params),
..Default::default()
};
assert!(!wrapper.called());
let _ = Table::open_with_params(uri, "test", param).await.unwrap();
let _ = Table::open_with_params(uri, "test", &param).await.unwrap();
assert!(wrapper.called());
}