mirror of
https://github.com/lancedb/lancedb.git
synced 2026-06-30 17:40:40 +00:00
Compare commits
1 Commits
codex/upda
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f6c9d31f98 |
@@ -3,7 +3,7 @@
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use lancedb::{arrow::IntoArrow, ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
|
||||
use lancedb::{ipc::ipc_file_to_batches, table::merge::MergeInsertBuilder};
|
||||
use napi::bindgen_prelude::*;
|
||||
use napi_derive::napi;
|
||||
|
||||
@@ -66,11 +66,9 @@ impl NativeMergeInsertBuilder {
|
||||
|
||||
#[napi(catch_unwind)]
|
||||
pub async fn execute(&self, buf: Buffer) -> napi::Result<MergeResult> {
|
||||
let data = ipc_file_to_batches(buf.to_vec())
|
||||
.and_then(IntoArrow::into_arrow)
|
||||
.map_err(|e| {
|
||||
napi::Error::from_reason(format!("Failed to read IPC file: {}", convert_error(&e)))
|
||||
})?;
|
||||
let data = ipc_file_to_batches(buf.to_vec()).map_err(|e| {
|
||||
napi::Error::from_reason(format!("Failed to read IPC file: {}", convert_error(&e)))
|
||||
})?;
|
||||
|
||||
let this = self.clone();
|
||||
|
||||
|
||||
@@ -166,6 +166,10 @@ required-features = ["bedrock"]
|
||||
[[example]]
|
||||
name = "simple"
|
||||
|
||||
[[example]]
|
||||
name = "polars"
|
||||
required-features = ["polars"]
|
||||
|
||||
[[example]]
|
||||
name = "full_text_search"
|
||||
|
||||
|
||||
47
rust/lancedb/examples/polars.rs
Normal file
47
rust/lancedb/examples/polars.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
// SPDX-FileCopyrightText: Copyright The LanceDB Authors
|
||||
|
||||
//! This example demonstrates ingesting a Polars DataFrame into LanceDB and
|
||||
//! reading it back out as a Polars DataFrame.
|
||||
|
||||
use lancedb::arrow::IntoPolars;
|
||||
use lancedb::query::ExecutableQuery;
|
||||
use lancedb::{Result, connect};
|
||||
use polars::prelude::{DataFrame, NamedFrom, Series};
|
||||
|
||||
fn make_dataframe() -> DataFrame {
|
||||
let ids = Series::new("id", &[1i32, 2, 3, 4, 5]);
|
||||
let names = Series::new("name", &["Alice", "Bob", "Carol", "Dave", "Eve"]);
|
||||
let scores = Series::new("score", &[9.5f64, 8.1, 7.3, 9.0, 6.5]);
|
||||
DataFrame::new(vec![ids, names, scores]).unwrap()
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db = connect(tmp.path().to_str().unwrap()).execute().await?;
|
||||
|
||||
// Ingest a Polars DataFrame directly — DataFrame now implements Scannable.
|
||||
let df = make_dataframe();
|
||||
println!("Input DataFrame:\n{df}");
|
||||
|
||||
let table = db.create_table("people", df).execute().await?;
|
||||
|
||||
// Append more rows.
|
||||
let more = DataFrame::new(vec![
|
||||
Series::new("id", &[6i32, 7]),
|
||||
Series::new("name", &["Frank", "Grace"]),
|
||||
Series::new("score", &[7.8f64, 8.9]),
|
||||
])
|
||||
.unwrap();
|
||||
table.add(more).execute().await?;
|
||||
|
||||
// Read back as a Polars DataFrame.
|
||||
let result_df = table.query().execute().await?.into_polars().await?;
|
||||
|
||||
println!(
|
||||
"\nRound-tripped DataFrame ({} rows):\n{result_df}",
|
||||
result_df.height()
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -112,54 +112,14 @@ impl<S: Stream<Item = Result<arrow_array::RecordBatch>>> RecordBatchStream
|
||||
|
||||
/// A trait for converting incoming data to Arrow
|
||||
///
|
||||
/// Integrations should implement this trait to allow data to be
|
||||
/// imported directly from the integration. For example, implementing
|
||||
/// this trait for `Vec<Vec<...>>` would allow the `Vec` to be directly
|
||||
/// used in methods like [`crate::connection::Connection::create_table`]
|
||||
/// or [`crate::table::Table::add`]
|
||||
pub trait IntoArrow {
|
||||
/// Convert the data into an iterator of Arrow batches
|
||||
fn into_arrow(self) -> Result<Box<dyn arrow_array::RecordBatchReader + Send>>;
|
||||
}
|
||||
|
||||
pub type BoxedRecordBatchReader = Box<dyn arrow_array::RecordBatchReader + Send>;
|
||||
|
||||
impl<T: arrow_array::RecordBatchReader + Send + 'static> IntoArrow for T {
|
||||
fn into_arrow(self) -> Result<Box<dyn arrow_array::RecordBatchReader + Send>> {
|
||||
Ok(Box::new(self))
|
||||
}
|
||||
}
|
||||
|
||||
/// A trait for converting incoming data to Arrow asynchronously
|
||||
///
|
||||
/// Serves the same purpose as [`IntoArrow`], but for asynchronous data.
|
||||
///
|
||||
/// Note: Arrow has no async equivalent to RecordBatchReader and so
|
||||
pub trait IntoArrowStream {
|
||||
/// Convert the data into a stream of Arrow batches
|
||||
fn into_arrow(self) -> Result<SendableRecordBatchStream>;
|
||||
}
|
||||
|
||||
impl<S: Stream<Item = Result<arrow_array::RecordBatch>>> SimpleRecordBatchStream<S> {
|
||||
pub fn new(stream: S, schema: Arc<arrow_schema::Schema>) -> Self {
|
||||
Self { schema, stream }
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoArrowStream for SendableRecordBatchStream {
|
||||
fn into_arrow(self) -> Result<SendableRecordBatchStream> {
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoArrowStream for datafusion_physical_plan::SendableRecordBatchStream {
|
||||
fn into_arrow(self) -> Result<SendableRecordBatchStream> {
|
||||
let schema = self.schema();
|
||||
let stream = self.map_err(|df_err| df_err.into());
|
||||
Ok(Box::pin(SimpleRecordBatchStream::new(stream, schema)))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait LanceDbDatagenExt {
|
||||
fn into_ldb_stream(
|
||||
self,
|
||||
@@ -264,9 +224,7 @@ impl IntoPolars for SendableRecordBatchStream {
|
||||
#[cfg(all(test, feature = "polars"))]
|
||||
mod tests {
|
||||
use super::SendableRecordBatchStream;
|
||||
use crate::arrow::{
|
||||
IntoArrow, IntoPolars, PolarsDataFrameRecordBatchReader, SimpleRecordBatchStream,
|
||||
};
|
||||
use crate::arrow::{IntoPolars, PolarsDataFrameRecordBatchReader, SimpleRecordBatchStream};
|
||||
use polars::prelude::{DataFrame, NamedFrom, Series};
|
||||
|
||||
fn get_record_batch_reader_from_polars() -> Box<dyn arrow_array::RecordBatchReader + Send> {
|
||||
@@ -280,10 +238,7 @@ mod tests {
|
||||
float_series = Series::new("float", &[2.0]);
|
||||
let df2 = DataFrame::new(vec![string_series, int_series, float_series]).unwrap();
|
||||
|
||||
PolarsDataFrameRecordBatchReader::new(df1.vstack(&df2).unwrap())
|
||||
.unwrap()
|
||||
.into_arrow()
|
||||
.unwrap()
|
||||
Box::new(PolarsDataFrameRecordBatchReader::new(df1.vstack(&df2).unwrap()).unwrap())
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -185,6 +185,43 @@ impl Scannable for SendableRecordBatchStream {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "polars")]
|
||||
impl Scannable for polars::frame::DataFrame {
|
||||
fn schema(&self) -> SchemaRef {
|
||||
crate::polars_arrow_convertors::convert_polars_df_schema_to_arrow_rb_schema(
|
||||
self.schema().clone(),
|
||||
)
|
||||
.expect("failed to convert Polars DataFrame schema to Arrow schema")
|
||||
}
|
||||
|
||||
fn scan_as_stream(&mut self) -> SendableRecordBatchStream {
|
||||
let schema = Scannable::schema(self);
|
||||
let batches: crate::Result<Vec<RecordBatch>> =
|
||||
match crate::arrow::PolarsDataFrameRecordBatchReader::new(self.clone()) {
|
||||
Err(e) => Err(e),
|
||||
Ok(reader) => reader.map(|b| b.map_err(Into::into)).collect(),
|
||||
};
|
||||
match batches {
|
||||
Err(e) => Box::pin(SimpleRecordBatchStream {
|
||||
schema,
|
||||
stream: once(async move { Err(e) }),
|
||||
}),
|
||||
Ok(batches) => {
|
||||
let stream = futures::stream::iter(batches.into_iter().map(Ok));
|
||||
Box::pin(SimpleRecordBatchStream { schema, stream })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn num_rows(&self) -> Option<usize> {
|
||||
Some(self.height())
|
||||
}
|
||||
|
||||
fn rescannable(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StreamingWriteSource for Box<dyn Scannable> {
|
||||
fn arrow_schema(&self) -> SchemaRef {
|
||||
@@ -1089,4 +1126,60 @@ mod tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "polars")]
|
||||
mod polars_tests {
|
||||
use super::*;
|
||||
use crate::arrow::IntoPolars;
|
||||
use crate::query::ExecutableQuery;
|
||||
use polars::prelude::{DataFrame, NamedFrom, Series};
|
||||
|
||||
fn make_df() -> DataFrame {
|
||||
DataFrame::new(vec![
|
||||
Series::new("id", &[1i32, 2, 3]),
|
||||
Series::new("val", &[1.1f64, 2.2, 3.3]),
|
||||
])
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dataframe_scannable_round_trip() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let db = crate::connect(tmp.path().to_str().unwrap())
|
||||
.execute()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let df = make_df();
|
||||
let table = db.create_table("t", df.clone()).execute().await.unwrap();
|
||||
|
||||
// Append the same rows again.
|
||||
table.add(df.clone()).execute().await.unwrap();
|
||||
|
||||
let result = table
|
||||
.query()
|
||||
.execute()
|
||||
.await
|
||||
.unwrap()
|
||||
.into_polars()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(result.height(), df.height() * 2);
|
||||
assert_eq!(result.schema(), df.schema());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dataframe_scannable_rescannable() {
|
||||
let mut df = make_df();
|
||||
assert!(df.rescannable());
|
||||
|
||||
let batches1: Vec<RecordBatch> = df.scan_as_stream().try_collect().await.unwrap();
|
||||
assert_eq!(batches1.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
|
||||
|
||||
// Can be scanned again.
|
||||
let batches2: Vec<RecordBatch> = df.scan_as_stream().try_collect().await.unwrap();
|
||||
assert_eq!(batches2.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user