Bump Lance version to 0.5.3 (#250)

This commit is contained in:
Lei Xu
2023-07-04 08:34:41 -07:00
committed by GitHub
parent fc725c99f0
commit 148ed82607
7 changed files with 75 additions and 3844 deletions

3793
Cargo.lock generated

File diff suppressed because it is too large. Load Diff

View File

@@ -4,3 +4,11 @@ members = [
"rust/ffi/node" "rust/ffi/node"
] ]
resolver = "2" resolver = "2"
[workspace.dependencies]
lance = "0.5.3"
arrow-array = "40.0"
arrow-data = "40.0"
arrow-schema = "40.0"
arrow-ipc = "40.0"
object_store = "0.6.1"

View File

@@ -10,12 +10,12 @@ exclude = ["index.node"]
crate-type = ["cdylib"] crate-type = ["cdylib"]
[dependencies] [dependencies]
arrow-array = "40.0" arrow-array = { workspace = true }
arrow-ipc = "40.0" arrow-ipc = { workspace = true }
arrow-schema = "40.0" arrow-schema = { workspace = true }
once_cell = "1" once_cell = "1"
futures = "0.3" futures = "0.3"
lance = "0.5.2" lance = { workspace = true }
vectordb = { path = "../../vectordb" } vectordb = { path = "../../vectordb" }
tokio = { version = "1.23", features = ["rt-multi-thread"] } tokio = { version = "1.23", features = ["rt-multi-thread"] }
neon = {version = "0.10.1", default-features = false, features = ["channel-api", "napi-6", "promise-api", "task-api"] } neon = {version = "0.10.1", default-features = false, features = ["channel-api", "napi-6", "promise-api", "task-api"] }

View File

@@ -17,10 +17,9 @@ use std::convert::TryFrom;
use std::ops::Deref; use std::ops::Deref;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use arrow_array::{Float32Array, RecordBatchReader}; use arrow_array::{Float32Array, RecordBatchIterator, RecordBatchReader};
use arrow_ipc::writer::FileWriter; use arrow_ipc::writer::FileWriter;
use futures::{TryFutureExt, TryStreamExt}; use futures::{TryFutureExt, TryStreamExt};
use lance::arrow::RecordBatchBuffer;
use lance::dataset::{WriteMode, WriteParams}; use lance::dataset::{WriteMode, WriteParams};
use lance::index::vector::MetricType; use lance::index::vector::MetricType;
use neon::prelude::*; use neon::prelude::*;
@@ -233,6 +232,7 @@ fn table_create(mut cx: FunctionContext) -> JsResult<JsPromise> {
let table_name = cx.argument::<JsString>(0)?.value(&mut cx); let table_name = cx.argument::<JsString>(0)?.value(&mut cx);
let buffer = cx.argument::<JsBuffer>(1)?; let buffer = cx.argument::<JsBuffer>(1)?;
let batches = arrow_buffer_to_record_batch(buffer.as_slice(&mut cx)); let batches = arrow_buffer_to_record_batch(buffer.as_slice(&mut cx));
let schema = batches[0].schema();
// Write mode // Write mode
let mode = match cx.argument::<JsString>(2)?.value(&mut cx).as_str() { let mode = match cx.argument::<JsString>(2)?.value(&mut cx).as_str() {
@@ -251,7 +251,10 @@ fn table_create(mut cx: FunctionContext) -> JsResult<JsPromise> {
let database = db.database.clone(); let database = db.database.clone();
rt.block_on(async move { rt.block_on(async move {
let batch_reader: Box<dyn RecordBatchReader> = Box::new(RecordBatchBuffer::new(batches)); let batch_reader: Box<dyn RecordBatchReader> = Box::new(RecordBatchIterator::new(
batches.into_iter().map(Ok),
schema,
));
let table_rst = database.create_table(&table_name, batch_reader, Some(params)).await; let table_rst = database.create_table(&table_name, batch_reader, Some(params)).await;
deferred.settle_with(&channel, move |mut cx| { deferred.settle_with(&channel, move |mut cx| {
@@ -275,6 +278,7 @@ fn table_add(mut cx: FunctionContext) -> JsResult<JsPromise> {
let buffer = cx.argument::<JsBuffer>(0)?; let buffer = cx.argument::<JsBuffer>(0)?;
let write_mode = cx.argument::<JsString>(1)?.value(&mut cx); let write_mode = cx.argument::<JsString>(1)?.value(&mut cx);
let batches = arrow_buffer_to_record_batch(buffer.as_slice(&mut cx)); let batches = arrow_buffer_to_record_batch(buffer.as_slice(&mut cx));
let schema = batches[0].schema();
let rt = runtime(&mut cx)?; let rt = runtime(&mut cx)?;
let channel = cx.channel(); let channel = cx.channel();
@@ -284,7 +288,10 @@ fn table_add(mut cx: FunctionContext) -> JsResult<JsPromise> {
let write_mode = write_mode_map.get(write_mode.as_str()).cloned(); let write_mode = write_mode_map.get(write_mode.as_str()).cloned();
rt.block_on(async move { rt.block_on(async move {
let batch_reader: Box<dyn RecordBatchReader> = Box::new(RecordBatchBuffer::new(batches)); let batch_reader: Box<dyn RecordBatchReader> = Box::new(RecordBatchIterator::new(
batches.into_iter().map(Ok),
schema,
));
let add_result = table.lock().unwrap().add(batch_reader, write_mode).await; let add_result = table.lock().unwrap().add(batch_reader, write_mode).await;
deferred.settle_with(&channel, move |mut cx| { deferred.settle_with(&channel, move |mut cx| {

View File

@@ -7,14 +7,13 @@ license = "Apache-2.0"
repository = "https://github.com/lancedb/lancedb" repository = "https://github.com/lancedb/lancedb"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
arrow-array = "40.0" arrow-array = { workspace = true }
arrow-data = "40.0" arrow-data = { workspace = true }
arrow-schema = "40.0" arrow-schema = { workspace = true }
object_store = "0.6.1" object_store = { workspace = true }
snafu = "0.7.4" snafu = "0.7.4"
lance = "0.5.2" lance = { workspace = true }
tokio = { version = "1.23", features = ["rt-multi-thread"] } tokio = { version = "1.23", features = ["rt-multi-thread"] }
[dev-dependencies] [dev-dependencies]

View File

@@ -164,9 +164,8 @@ impl Query {
mod tests { mod tests {
use std::sync::Arc; use std::sync::Arc;
use arrow_array::{Float32Array, RecordBatch, RecordBatchReader}; use arrow_array::{Float32Array, RecordBatch, RecordBatchIterator, RecordBatchReader};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use lance::arrow::RecordBatchBuffer;
use lance::dataset::Dataset; use lance::dataset::Dataset;
use lance::index::vector::MetricType; use lance::index::vector::MetricType;
@@ -174,7 +173,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_setters_getters() { async fn test_setters_getters() {
let mut batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let mut batches: Box<dyn RecordBatchReader> = make_test_batches();
let ds = Dataset::write(&mut batches, "memory://foo", None) let ds = Dataset::write(&mut batches, "memory://foo", None)
.await .await
.unwrap(); .unwrap();
@@ -203,7 +202,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_execute() { async fn test_execute() {
let mut batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let mut batches: Box<dyn RecordBatchReader> = make_test_batches();
let ds = Dataset::write(&mut batches, "memory://foo", None) let ds = Dataset::write(&mut batches, "memory://foo", None)
.await .await
.unwrap(); .unwrap();
@@ -214,7 +213,7 @@ mod tests {
assert_eq!(result.is_ok(), true); assert_eq!(result.is_ok(), true);
} }
fn make_test_batches() -> RecordBatchBuffer { fn make_test_batches() -> Box<dyn RecordBatchReader> {
let dim: usize = 128; let dim: usize = 128;
let schema = Arc::new(ArrowSchema::new(vec![ let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("key", DataType::Int32, false), ArrowField::new("key", DataType::Int32, false),
@@ -228,7 +227,11 @@ mod tests {
), ),
ArrowField::new("uri", DataType::Utf8, true), ArrowField::new("uri", DataType::Utf8, true),
])); ]));
Box::new(RecordBatchIterator::new(
RecordBatchBuffer::new(vec![RecordBatch::new_empty(schema.clone())]) vec![RecordBatch::new_empty(schema.clone())]
.into_iter()
.map(Ok),
schema,
))
} }
} }

View File

@@ -231,11 +231,11 @@ mod tests {
use std::sync::Arc; use std::sync::Arc;
use arrow_array::{ use arrow_array::{
Array, FixedSizeListArray, Float32Array, Int32Array, RecordBatch, RecordBatchReader, Array, FixedSizeListArray, Float32Array, Int32Array, RecordBatch, RecordBatchIterator,
RecordBatchReader,
}; };
use arrow_data::ArrayDataBuilder; use arrow_data::ArrayDataBuilder;
use arrow_schema::{DataType, Field, Schema}; use arrow_schema::{DataType, Field, Schema};
use lance::arrow::RecordBatchBuffer;
use lance::dataset::{Dataset, WriteMode}; use lance::dataset::{Dataset, WriteMode};
use lance::index::vector::ivf::IvfBuildParams; use lance::index::vector::ivf::IvfBuildParams;
use lance::index::vector::pq::PQBuildParams; use lance::index::vector::pq::PQBuildParams;
@@ -252,7 +252,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance"); let dataset_path = tmp_dir.path().join("test.lance");
let uri = tmp_dir.path().to_str().unwrap(); let uri = tmp_dir.path().to_str().unwrap();
let mut batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let mut batches: Box<dyn RecordBatchReader> = make_test_batches();
Dataset::write(&mut batches, dataset_path.to_str().unwrap(), None) Dataset::write(&mut batches, dataset_path.to_str().unwrap(), None)
.await .await
.unwrap(); .unwrap();
@@ -283,11 +283,11 @@ mod tests {
let tmp_dir = tempdir().unwrap(); let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap(); let uri = tmp_dir.path().to_str().unwrap();
let batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let batches: Box<dyn RecordBatchReader> = make_test_batches();
let _ = batches.schema().clone(); let _ = batches.schema().clone();
Table::create(&uri, "test", batches, None).await.unwrap(); Table::create(&uri, "test", batches, None).await.unwrap();
let batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let batches: Box<dyn RecordBatchReader> = make_test_batches();
let result = Table::create(&uri, "test", batches, None).await; let result = Table::create(&uri, "test", batches, None).await;
assert!(matches!( assert!(matches!(
result.unwrap_err(), result.unwrap_err(),
@@ -300,17 +300,17 @@ mod tests {
let tmp_dir = tempdir().unwrap(); let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap(); let uri = tmp_dir.path().to_str().unwrap();
let batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let batches: Box<dyn RecordBatchReader> = make_test_batches();
let schema = batches.schema().clone(); let schema = batches.schema().clone();
let mut table = Table::create(&uri, "test", batches, None).await.unwrap(); let mut table = Table::create(&uri, "test", batches, None).await.unwrap();
assert_eq!(table.count_rows().await.unwrap(), 10); assert_eq!(table.count_rows().await.unwrap(), 10);
let new_batches: Box<dyn RecordBatchReader> = let new_batches: Box<dyn RecordBatchReader> =
Box::new(RecordBatchBuffer::new(vec![RecordBatch::try_new( Box::new(RecordBatchIterator::new(vec![RecordBatch::try_new(
schema, schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(100..110))], vec![Arc::new(Int32Array::from_iter_values(100..110))],
) )
.unwrap()])); .unwrap()].into_iter().map(Ok), schema.clone()));
table.add(new_batches, None).await.unwrap(); table.add(new_batches, None).await.unwrap();
assert_eq!(table.count_rows().await.unwrap(), 20); assert_eq!(table.count_rows().await.unwrap(), 20);
@@ -322,17 +322,21 @@ mod tests {
let tmp_dir = tempdir().unwrap(); let tmp_dir = tempdir().unwrap();
let uri = tmp_dir.path().to_str().unwrap(); let uri = tmp_dir.path().to_str().unwrap();
let batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let batches: Box<dyn RecordBatchReader> = make_test_batches();
let schema = batches.schema().clone(); let schema = batches.schema().clone();
let mut table = Table::create(uri, "test", batches, None).await.unwrap(); let mut table = Table::create(uri, "test", batches, None).await.unwrap();
assert_eq!(table.count_rows().await.unwrap(), 10); assert_eq!(table.count_rows().await.unwrap(), 10);
let new_batches: Box<dyn RecordBatchReader> = let new_batches: Box<dyn RecordBatchReader> = Box::new(RecordBatchIterator::new(
Box::new(RecordBatchBuffer::new(vec![RecordBatch::try_new( vec![RecordBatch::try_new(
schema, schema.clone(),
vec![Arc::new(Int32Array::from_iter_values(100..110))], vec![Arc::new(Int32Array::from_iter_values(100..110))],
) )
.unwrap()])); .unwrap()]
.into_iter()
.map(Ok),
schema.clone(),
));
table table
.add(new_batches, Some(WriteMode::Overwrite)) .add(new_batches, Some(WriteMode::Overwrite))
@@ -348,7 +352,7 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance"); let dataset_path = tmp_dir.path().join("test.lance");
let uri = tmp_dir.path().to_str().unwrap(); let uri = tmp_dir.path().to_str().unwrap();
let mut batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let mut batches: Box<dyn RecordBatchReader> = make_test_batches();
Dataset::write(&mut batches, dataset_path.to_str().unwrap(), None) Dataset::write(&mut batches, dataset_path.to_str().unwrap(), None)
.await .await
.unwrap(); .unwrap();
@@ -387,18 +391,18 @@ mod tests {
let dataset_path = tmp_dir.path().join("test.lance"); let dataset_path = tmp_dir.path().join("test.lance");
let uri = tmp_dir.path().to_str().unwrap(); let uri = tmp_dir.path().to_str().unwrap();
let mut batches: Box<dyn RecordBatchReader> = Box::new(make_test_batches()); let mut batches: Box<dyn RecordBatchReader> = make_test_batches();
Dataset::write(&mut batches, dataset_path.to_str().unwrap(), None) Dataset::write(&mut batches, dataset_path.to_str().unwrap(), None)
.await .await
.unwrap(); .unwrap();
let wrapper = Arc::new(NoOpCacheWrapper::default()); let wrapper = Arc::new(NoOpCacheWrapper::default());
let mut object_store_params = ObjectStoreParams::default();
object_store_params.object_store_wrapper = Some(wrapper.clone());
let param = OpenTableParams { let param = OpenTableParams {
open_table_params: ReadParams { open_table_params: ReadParams {
store_options: Some(ObjectStoreParams { store_options: Some(object_store_params),
object_store_wrapper: Some(wrapper.clone()),
}),
..ReadParams::default() ..ReadParams::default()
}, },
}; };
@@ -408,13 +412,15 @@ mod tests {
assert!(wrapper.called()); assert!(wrapper.called());
} }
fn make_test_batches() -> RecordBatchBuffer { fn make_test_batches() -> Box<dyn RecordBatchReader> {
let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])); let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
RecordBatchBuffer::new(vec![RecordBatch::try_new( Box::new(RecordBatchIterator::new(
schema.clone(), vec![RecordBatch::try_new(
vec![Arc::new(Int32Array::from_iter_values(0..10))], schema.clone(),
) vec![Arc::new(Int32Array::from_iter_values(0..10))],
.unwrap()]) )],
schema,
))
} }
#[tokio::test] #[tokio::test]
@@ -447,11 +453,12 @@ mod tests {
); );
let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap()); let vectors = Arc::new(create_fixed_size_list(float_arr, dimension).unwrap());
let batches = RecordBatchBuffer::new(vec![RecordBatch::try_new( let batches = RecordBatchIterator::new(
schema.clone(), vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]
vec![vectors.clone()], .into_iter()
) .map(Ok),
.unwrap()]); schema,
);
let reader: Box<dyn RecordBatchReader + Send> = Box::new(batches); let reader: Box<dyn RecordBatchReader + Send> = Box::new(batches);
let mut table = Table::create(uri, "test", reader, None).await.unwrap(); let mut table = Table::create(uri, "test", reader, None).await.unwrap();