refactor: remove unused code for pruning row groups (#2973)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi
2023-12-21 15:54:29 +08:00
committed by GitHub
parent 6a1f5751c6
commit b5c5458798


@@ -15,27 +15,23 @@
 use std::sync::Arc;

 use common_query::logical_plan::{DfExpr, Expr};
-use common_telemetry::{debug, error, warn};
+use common_telemetry::{error, warn};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::parquet::file::metadata::RowGroupMetaData;
 use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use datafusion_common::{ScalarValue, ToDFSchema};
+use datafusion_common::ToDFSchema;
 use datafusion_expr::expr::InList;
-use datafusion_expr::{Between, BinaryExpr, ColumnarValue, Operator};
+use datafusion_expr::{Between, BinaryExpr, Operator};
 use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
 use datatypes::arrow;
-use datatypes::arrow::array::BooleanArray;
 use datatypes::schema::SchemaRef;
 use datatypes::value::scalar_value_to_timestamp;
 use snafu::ResultExt;

-use crate::error;
-use crate::predicate::stats::RowGroupPruningStatistics;

+#[cfg(test)]
 mod stats;

 #[derive(Debug, Clone)]
@@ -77,83 +73,6 @@ impl Predicate {
             .collect::<Vec<_>>())
     }

-    /// Builds an empty predicate from given schema.
-    pub fn empty() -> Self {
-        Self { exprs: vec![] }
-    }
-
-    /// Evaluates the predicate against row group metadata.
-    /// Returns a vector of boolean values, among which `false` means the row group can be skipped.
-    pub fn prune_row_groups(
-        &self,
-        row_groups: &[RowGroupMetaData],
-        schema: SchemaRef,
-    ) -> Vec<bool> {
-        let mut res = vec![true; row_groups.len()];
-        let Ok(physical_exprs) = self.to_physical_exprs(schema.arrow_schema()) else {
-            return res;
-        };
-
-        let arrow_schema = schema.arrow_schema();
-        for expr in &physical_exprs {
-            match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
-                Ok(p) => {
-                    let stat = RowGroupPruningStatistics::new(row_groups, &schema);
-                    match p.prune(&stat) {
-                        Ok(r) => {
-                            for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
-                                *res &= curr_val
-                            }
-                        }
-                        Err(e) => {
-                            warn!("Failed to prune row groups, error: {:?}", e);
-                        }
-                    }
-                }
-                Err(e) => {
-                    error!("Failed to create predicate for expr, error: {:?}", e);
-                }
-            }
-        }
-        res
-    }
-
-    /// Prunes primary keys
-    pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result<bool> {
-        let pk_schema = primary_key.schema();
-        let physical_exprs = self.to_physical_exprs(&pk_schema)?;
-        for expr in &physical_exprs {
-            // evaluate every filter against primary key
-            let Ok(eva) = expr.evaluate(primary_key) else {
-                continue;
-            };
-            let result = match eva {
-                ColumnarValue::Array(array) => {
-                    let predicate_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
-                    predicate_array
-                        .into_iter()
-                        .map(|x| x.unwrap_or(true))
-                        .next()
-                        .unwrap_or(true)
-                }
-                // result was a scalar
-                ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
-                _ => {
-                    unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key);
-                }
-            };
-            debug!(
-                "Evaluate primary key {:?} against filter: {:?}, result: {:?}",
-                primary_key, expr, result
-            );
-            if !result {
-                return Ok(false);
-            }
-        }
-        Ok(true)
-    }
-
     /// Evaluates the predicate against the `stats`.
     /// Returns a vector of boolean values, among which `false` means the row group can be skipped.
     pub fn prune_with_stats<S: PruningStatistics>(
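
Note on the removed logic: `prune_row_groups` combined per-expression verdicts conservatively. Every row group starts as kept (`true`), each `PruningPredicate` result is ANDed in, and any error while building or evaluating a predicate leaves the entries untouched, so a failing filter can only keep data, never skip it. The surviving `prune_with_stats` documents the same contract (`false` means the row group can be skipped) but is generic over any `PruningStatistics` source. A minimal standalone sketch of that combine step; the helper name `combine_verdicts` is hypothetical, for illustration only:

// Standalone sketch of the verdict-combining step in the removed
// `prune_row_groups` above. `combine_verdicts` is a hypothetical helper,
// not part of the codebase.
fn combine_verdicts(num_row_groups: usize, per_expr: &[Vec<bool>]) -> Vec<bool> {
    // Start by keeping every row group; pruning may only narrow this down.
    let mut res = vec![true; num_row_groups];
    for verdicts in per_expr {
        // AND in one predicate's verdicts; a predicate that errored simply
        // contributes nothing, which is equivalent to all-`true`.
        for (keep, v) in res.iter_mut().zip(verdicts.iter()) {
            *keep &= *v;
        }
    }
    res
}

fn main() {
    // Two predicates over three row groups: only row group 0 passes both.
    let per_expr = vec![vec![true, false, true], vec![true, true, false]];
    assert_eq!(combine_verdicts(3, &per_expr), vec![true, false, false]);
}
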
@@ -443,6 +362,7 @@ mod tests {
     use parquet::file::properties::WriterProperties;

     use super::*;
+    use crate::predicate::stats::RowGroupPruningStatistics;

     fn check_build_predicate(expr: DfExpr, expect: TimestampRange) {
         assert_eq!(
@@ -568,6 +488,7 @@ mod tests {
             TimestampRange::until_end(Timestamp::new_millisecond(1000), false),
         );
     }
+
     #[test]
     fn test_lt_eq() {
         // ts <= 1ms
@@ -651,8 +572,8 @@ mod tests {
         expect: Vec<bool>,
     ) {
         let dir = create_temp_dir("prune_parquet");
-        let (path, schema) = gen_test_parquet_file(&dir, array_cnt).await;
-        let schema = Arc::new(datatypes::schema::Schema::try_from(schema).unwrap());
+        let (path, arrow_schema) = gen_test_parquet_file(&dir, array_cnt).await;
+        let schema = Arc::new(datatypes::schema::Schema::try_from(arrow_schema.clone()).unwrap());
         let arrow_predicate = Predicate::new(filters);
         let builder = ParquetRecordBatchStreamBuilder::new(
             tokio::fs::OpenOptions::new()
@@ -665,7 +586,9 @@ mod tests {
         .unwrap();
         let metadata = builder.metadata().clone();
         let row_groups = metadata.row_groups();
-        let res = arrow_predicate.prune_row_groups(row_groups, schema);
+
+        let stats = RowGroupPruningStatistics::new(row_groups, &schema);
+        let res = arrow_predicate.prune_with_stats(&stats, &arrow_schema);
         assert_eq!(expect, res);
     }
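
Taken together, the test change above is the whole migration in miniature. A condensed before/after sketch, with variable names as in the updated test; note that `mod stats` is now gated behind `#[cfg(test)]`, so `RowGroupPruningStatistics` is only reachable from this crate's tests:

// Before (removed in this commit): Predicate built the statistics internally.
// let res = arrow_predicate.prune_row_groups(row_groups, schema);

// After: the caller constructs the statistics wrapper explicitly and goes
// through the generic entry point.
let stats = RowGroupPruningStatistics::new(row_groups, &schema);
let res = arrow_predicate.prune_with_stats(&stats, &arrow_schema);

The design effect is that `Predicate` no longer depends on parquet's `RowGroupMetaData` type at all (its import is dropped in the first hunk); any `PruningStatistics` implementation can drive `prune_with_stats`.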