fix: don't use with_schema to remove schema metadata (#2162)

`RecordBatch::with_schema` cannot be used to remove schema metadata from a batch: it fails with the error `target schema is not
superset of current schema`.

I'm not sure how the existing `test_metadata_erased` test was passing. Strangely,
the metadata was no longer present by the time the batch arrived at the
metadata eraser; it appears the schema metadata only reaches the batch when
there is a filter.

I've added a new unit test that verifies the metadata is erased when a filter
is present.
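
For context, here is a minimal standalone sketch (not part of this commit) of the failure mode and of the workaround the fix uses. It assumes a recent `arrow-array`/`arrow-schema` and an illustrative single-column schema; rebuilding the batch with `RecordBatch::try_new` reuses the existing column `Arc`s, so only the schema reference changes.

```rust
use std::{collections::HashMap, sync::Arc};

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Hypothetical schema: same field with and without metadata.
    let metadata = HashMap::from_iter(vec![("foo".to_string(), "bar".to_string())]);
    let field = Field::new("ints", DataType::Int32, true);
    let schema_with_meta = Arc::new(Schema::new(vec![field.clone()]).with_metadata(metadata));
    let schema_no_meta = Arc::new(Schema::new(vec![field]));

    let columns: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1, 2, 3]))];
    let batch = RecordBatch::try_new(schema_with_meta, columns).unwrap();

    // `with_schema` only accepts a target schema that is a superset of the
    // batch's current schema, so dropping the metadata is rejected.
    assert!(batch.clone().with_schema(schema_no_meta.clone()).is_err());

    // Rebuilding the batch from its columns sidesteps the superset check and
    // leaves the metadata behind.
    let erased = RecordBatch::try_new(schema_no_meta, batch.columns().to_vec()).unwrap();
    assert!(erased.schema().metadata().is_empty());
}
```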
Author: Weston Pace
Date: 2025-02-27 10:24:00 -08:00
Committed by: GitHub
Parent: 8877eb020d
Commit: fa1b9ad5bd
2 changed files with 85 additions and 7 deletions

Cargo.lock (generated, 8 lines changed)

@@ -3915,7 +3915,7 @@ dependencies = [
[[package]]
name = "lancedb"
version = "0.17.0"
version = "0.18.0-beta.0"
dependencies = [
"arrow",
"arrow-array",
@@ -4001,7 +4001,7 @@ dependencies = [
[[package]]
name = "lancedb-node"
version = "0.17.0"
version = "0.18.0-beta.0"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4026,7 +4026,7 @@ dependencies = [
[[package]]
name = "lancedb-nodejs"
version = "0.17.0"
version = "0.18.0-beta.0"
dependencies = [
"arrow-array",
"arrow-ipc",
@@ -4044,7 +4044,7 @@ dependencies = [
[[package]]
name = "lancedb-python"
version = "0.20.0"
version = "0.21.0-beta.0"
dependencies = [
"arrow",
"env_logger",


@@ -4,6 +4,7 @@
//! This module contains adapters to allow LanceDB tables to be used as DataFusion table providers.
use std::{collections::HashMap, sync::Arc};
+use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use async_trait::async_trait;
use datafusion_catalog::{Session, TableProvider};
@@ -104,7 +105,9 @@ impl ExecutionPlan for MetadataEraserExec {
) -> DataFusionResult<SendableRecordBatchStream> {
let stream = self.input.execute(partition, context)?;
let schema = self.schema.clone();
-let stream = stream.map_ok(move |batch| batch.with_schema(schema.clone()).unwrap());
+let stream = stream.map_ok(move |batch| {
+RecordBatch::try_new(schema.clone(), batch.columns().to_vec()).unwrap()
+});
Ok(
Box::pin(RecordBatchStreamAdapter::new(self.schema.clone(), stream))
as SendableRecordBatchStream,
@@ -201,7 +204,8 @@ pub mod tests {
use arrow::array::AsArray;
use arrow_array::{
-Int32Array, RecordBatch, RecordBatchIterator, RecordBatchReader, UInt32Array,
+BinaryArray, Float64Array, Int32Array, Int64Array, RecordBatch, RecordBatchIterator,
+RecordBatchReader, StringArray, UInt32Array,
};
use arrow_schema::{DataType, Field, Schema};
use datafusion::{datasource::provider_as_source, prelude::SessionContext};
@@ -238,9 +242,49 @@ pub mod tests {
)
}
fn make_tbl_two_test_batches() -> impl RecordBatchReader + Send + Sync + 'static {
let metadata = HashMap::from_iter(vec![("foo".to_string(), "bar".to_string())]);
let schema = Arc::new(
Schema::new(vec![
Field::new("ints", DataType::Int64, true),
Field::new("strings", DataType::Utf8, true),
Field::new("floats", DataType::Float64, true),
Field::new("jsons", DataType::Utf8, true),
Field::new("bins", DataType::Binary, true),
Field::new("nodates", DataType::Utf8, true),
])
.with_metadata(metadata),
);
RecordBatchIterator::new(
vec![RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from_iter_values(0..1000)),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| i.to_string()),
)),
Arc::new(Float64Array::from_iter_values((0..1000).map(|i| i as f64))),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| format!("{{\"i\":{}}}", i)),
)),
Arc::new(BinaryArray::from_iter_values(
(0..1000).map(|i| (i as u32).to_be_bytes().to_vec()),
)),
Arc::new(StringArray::from_iter_values(
(0..1000).map(|i| i.to_string()),
)),
],
)],
schema,
)
}
struct TestFixture {
_tmp_dir: tempfile::TempDir,
// An adapter for a table with make_test_batches batches
adapter: Arc<BaseTableAdapter>,
// An adapter for a table with make_tbl_two_test_batches batches
adapter2: Arc<BaseTableAdapter>,
}
impl TestFixture {
@@ -262,15 +306,28 @@ pub mod tests {
.await
.unwrap();
let tbl2 = db
.create_table("tbl2", make_tbl_two_test_batches())
.execute()
.await
.unwrap();
let adapter = Arc::new(
BaseTableAdapter::try_new(tbl.base_table().clone())
.await
.unwrap(),
);
let adapter2 = Arc::new(
BaseTableAdapter::try_new(tbl2.base_table().clone())
.await
.unwrap(),
);
Self {
_tmp_dir: tmp_dir,
adapter,
adapter2,
}
}
@@ -309,7 +366,7 @@ pub mod tests {
}
async fn check_plan(plan: LogicalPlan, expected: &str) {
-let physical_plan = dbg!(Self::plan_to_explain(plan).await);
+let physical_plan = Self::plan_to_explain(plan).await;
let mut lines_checked = 0;
for (actual_line, expected_line) in physical_plan.lines().zip(expected.lines()) {
lines_checked += 1;
@@ -343,6 +400,27 @@ pub mod tests {
}
}
#[tokio::test]
async fn test_metadata_erased_with_filter() {
// This is a regression test where the metadata eraser was not properly erasing metadata
let fixture = TestFixture::new().await;
assert!(fixture.adapter.schema().metadata().is_empty());
let plan = LogicalPlanBuilder::scan("foo", provider_as_source(fixture.adapter2), None)
.unwrap()
.filter(col("ints").lt(lit(10)))
.unwrap()
.build()
.unwrap();
let mut stream = TestFixture::plan_to_stream(plan).await;
while let Some(batch) = stream.try_next().await.unwrap() {
assert!(batch.schema().metadata().is_empty());
}
}
#[tokio::test]
async fn test_filter_pushdown() {
let fixture = TestFixture::new().await;