fix: add partition statistics to MetadataEraser (#2637)

Some of the data fusion optimizers optimize based on data statistics
(e.g. total bytes, number of rows).
If those statistics are not supplied, optimizers cannot optimize on top.
One example is Anti Hash Join which can optimize from LeftAnti (Left:
big table, Right: small table) to RightAnti (Left: small table, Right:
big table). Left Anti requires reading the whole big & small table while
RightAnti only requires reading the whole left table and supports limit
push down to only read partial of big table
This commit is contained in:
LuQQiu
2025-09-09 09:13:22 -07:00
committed by GitHub
parent d19c64e29b
commit 79960b254e

View File

@@ -121,6 +121,10 @@ impl ExecutionPlan for MetadataEraserExec {
as SendableRecordBatchStream, as SendableRecordBatchStream,
) )
} }
fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
self.input.partition_statistics(partition)
}
} }
#[derive(Debug)] #[derive(Debug)]
@@ -227,6 +231,7 @@ pub mod tests {
prelude::{SessionConfig, SessionContext}, prelude::{SessionConfig, SessionContext},
}; };
use datafusion_catalog::TableProvider; use datafusion_catalog::TableProvider;
use datafusion_common::stats::Precision;
use datafusion_execution::SendableRecordBatchStream; use datafusion_execution::SendableRecordBatchStream;
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder}; use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder};
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
@@ -509,4 +514,24 @@ pub mod tests {
TestFixture::check_plan(plan, "").await; TestFixture::check_plan(plan, "").await;
} }
#[tokio::test]
async fn test_metadata_eraser_propagates_statistics() {
let fixture = TestFixture::new().await;
let plan =
LogicalPlanBuilder::scan("foo", provider_as_source(fixture.adapter.clone()), None)
.unwrap()
.build()
.unwrap();
let ctx = SessionContext::new();
let physical_plan = ctx.state().create_physical_plan(&plan).await.unwrap();
assert_eq!(physical_plan.name(), "MetadataEraserExec");
let partition_stats = physical_plan.partition_statistics(None).unwrap();
assert!(matches!(partition_stats.num_rows, Precision::Exact(10)));
}
} }