diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index fa5a03a9dc..6be6cf8c43 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -23,7 +23,7 @@ use api::v1::{OpType, SemanticType}; use common_telemetry::error; use datafusion::physical_plan::PhysicalExpr; use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr; -use datatypes::arrow::array::{ArrayRef, BooleanArray}; +use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray}; use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; @@ -739,7 +739,13 @@ impl RangeBase { reason: "Failed to downcast to BooleanArray".to_string(), })?; - Ok(boolean_array.values().clone()) + // Nulls in the partition filter result must be handled too: a null value is treated as false (filtered out). + let mut mask = boolean_array.values().clone(); + if let Some(nulls) = boolean_array.nulls() { + mask = mask.bitand(nulls.inner()); + } + + Ok(mask) } /// Builds a `RecordBatch` from the input `Batch` matching the given schema. diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs index d592881af8..569c8ff738 100644 --- a/src/partition/src/error.rs +++ b/src/partition/src/error.rs @@ -213,6 +213,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to convert partition expr value to ScalarValue: {:?}", value))] + ConvertPartitionExprValue { + value: Value, + #[snafu(implicit)] + location: Location, + source: datatypes::error::Error, + }, + #[snafu(display("Duplicate expr: {:?}", expr))] DuplicateExpr { expr: PartitionExpr, @@ -268,6 +276,7 @@ impl ErrorExt for Error { Error::ToDFSchema { .. } => StatusCode::Internal, Error::CreatePhysicalExpr { .. } => StatusCode::Internal, Error::UnsupportedPartitionExprValue { .. } => StatusCode::InvalidArguments, + Error::ConvertPartitionExprValue { .. 
} => StatusCode::InvalidArguments, } } diff --git a/src/partition/src/expr.rs b/src/partition/src/expr.rs index 1e7b573acb..1ef875dae0 100644 --- a/src/partition/src/expr.rs +++ b/src/partition/src/expr.rs @@ -83,7 +83,8 @@ impl Operand { Value::Date(v) => ScalarValue::Date32(Some(v.val())), Value::Null => ScalarValue::Null, Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())), - Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(), + Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())) + .context(error::ConvertPartitionExprValueSnafu { value: v.clone() })?, Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())), Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())), Value::IntervalMonthDayNano(v) => { @@ -301,28 +302,50 @@ impl PartitionExpr { self.op, RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq ) { + // Keep filtering semantics aligned with direct PartitionExpr evaluation (null-first ordering). + // In DataFusion SQL semantics, range comparisons with NULL yield NULL, so we append + // `OR col IS NULL` to upper-bound comparisons and `AND col IS NOT NULL` to lower-bound ones. 
if matches!(self.lhs.as_ref(), Operand::Column(_)) { let column_expr = self.lhs.try_as_logical_expr()?; let other_expr = self.rhs.try_as_logical_expr()?; let base = match self.op { - RestrictedOp::Lt => column_expr.clone().lt(other_expr), - RestrictedOp::LtEq => column_expr.clone().lt_eq(other_expr), - RestrictedOp::Gt => column_expr.clone().gt(other_expr), - RestrictedOp::GtEq => column_expr.clone().gt_eq(other_expr), + RestrictedOp::Lt => { + column_expr.clone().lt(other_expr).or(column_expr.is_null()) + } + RestrictedOp::LtEq => column_expr + .clone() + .lt_eq(other_expr) + .or(column_expr.is_null()), + RestrictedOp::Gt => column_expr + .clone() + .gt(other_expr) + .and(column_expr.is_not_null()), + RestrictedOp::GtEq => column_expr + .clone() + .gt_eq(other_expr) + .and(column_expr.is_not_null()), _ => unreachable!(), }; - return Ok(datafusion_expr::or(base, column_expr.is_null())); + return Ok(base); } else if matches!(self.rhs.as_ref(), Operand::Column(_)) { let other_expr = self.lhs.try_as_logical_expr()?; let column_expr = self.rhs.try_as_logical_expr()?; let base = match self.op { - RestrictedOp::Lt => other_expr.lt(column_expr.clone()), - RestrictedOp::LtEq => other_expr.lt_eq(column_expr.clone()), - RestrictedOp::Gt => other_expr.gt(column_expr.clone()), - RestrictedOp::GtEq => other_expr.gt_eq(column_expr.clone()), + RestrictedOp::Lt => other_expr + .lt(column_expr.clone()) + .and(column_expr.is_not_null()), + RestrictedOp::LtEq => other_expr + .lt_eq(column_expr.clone()) + .and(column_expr.is_not_null()), + RestrictedOp::Gt => { + other_expr.gt(column_expr.clone()).or(column_expr.is_null()) + } + RestrictedOp::GtEq => other_expr + .gt_eq(column_expr.clone()) + .or(column_expr.is_null()), _ => unreachable!(), }; - return Ok(datafusion_expr::or(base, column_expr.is_null())); + return Ok(base); } } @@ -552,7 +575,7 @@ mod tests { .try_as_logical_expr() .unwrap() .to_string(), - "a > Int64(10) OR a IS NULL" + "a > Int64(10) AND a IS NOT NULL" ); // Test Gt 
with column on LHS @@ -563,7 +586,7 @@ mod tests { ); assert_eq!( gt_expr.try_as_logical_expr().unwrap().to_string(), - "a > Int64(10) OR a IS NULL" + "a > Int64(10) AND a IS NOT NULL" ); // Test Gt with column on RHS @@ -588,7 +611,7 @@ mod tests { ); assert_eq!( gteq_expr.try_as_logical_expr().unwrap().to_string(), - "a >= Int64(10) OR a IS NULL" + "a >= Int64(10) AND a IS NOT NULL" ); // Test LtEq with column on LHS @@ -601,6 +624,160 @@ mod tests { lteq_expr.try_as_logical_expr().unwrap().to_string(), "a <= Int64(10) OR a IS NULL" ); + + let gteq_expr_rhs_column = PartitionExpr::new( + Operand::Value(Value::Int64(10)), + RestrictedOp::GtEq, + Operand::Column("a".to_string()), + ); + assert_eq!( + gteq_expr_rhs_column + .try_as_logical_expr() + .unwrap() + .to_string(), + "a <= Int64(10) OR a IS NULL" + ); + + let lteq_expr_rhs_column = PartitionExpr::new( + Operand::Value(Value::Int64(10)), + RestrictedOp::LtEq, + Operand::Column("a".to_string()), + ); + assert_eq!( + lteq_expr_rhs_column + .try_as_logical_expr() + .unwrap() + .to_string(), + "a >= Int64(10) AND a IS NOT NULL" + ); + + let and_expr = PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::LtEq, + Operand::Value(Value::Int64(10)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("b".to_string()), + RestrictedOp::Gt, + Operand::Value(Value::Int64(5)), + )), + ); + assert_eq!( + and_expr.try_as_logical_expr().unwrap().to_string(), + "(a <= Int64(10) OR a IS NULL) AND b > Int64(5) AND b IS NOT NULL" + ); + + let and_expr = PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::LtEq, + Operand::Value(Value::Int64(10)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Gt, + Operand::Value(Value::Int64(5)), + )), + ); + assert_eq!( + and_expr.try_as_logical_expr().unwrap().to_string(), + "(a <= 
Int64(10) OR a IS NULL) AND a > Int64(5) AND a IS NOT NULL" + ); + + let and_expr_strict_lower = PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Lt, + Operand::Value(Value::Int64(10)), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::GtEq, + Operand::Value(Value::Int64(5)), + )), + ); + assert_eq!( + and_expr_strict_lower + .try_as_logical_expr() + .unwrap() + .to_string(), + "(a < Int64(10) OR a IS NULL) AND a >= Int64(5) AND a IS NOT NULL" + ); + + let and_expr_rhs_column = PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Value(Value::Int64(10)), + RestrictedOp::GtEq, + Operand::Column("a".to_string()), + )), + RestrictedOp::And, + Operand::Expr(PartitionExpr::new( + Operand::Value(Value::Int64(5)), + RestrictedOp::Lt, + Operand::Column("a".to_string()), + )), + ); + assert_eq!( + and_expr_rhs_column + .try_as_logical_expr() + .unwrap() + .to_string(), + "(a <= Int64(10) OR a IS NULL) AND a > Int64(5) AND a IS NOT NULL" + ); + + let or_expr_same_column = PartitionExpr::new( + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::LtEq, + Operand::Value(Value::Int64(10)), + )), + RestrictedOp::Or, + Operand::Expr(PartitionExpr::new( + Operand::Column("a".to_string()), + RestrictedOp::Gt, + Operand::Value(Value::Int64(5)), + )), + ); + assert_eq!( + or_expr_same_column + .try_as_logical_expr() + .unwrap() + .to_string(), + "a <= Int64(10) OR a IS NULL OR a > Int64(5) AND a IS NOT NULL" + ); + } + + #[test] + fn test_try_as_logical_expr_rhs_column_without_canonicalize() { + let gt_expr_rhs_column = PartitionExpr { + lhs: Box::new(Operand::Value(Value::Int64(10))), + op: RestrictedOp::Gt, + rhs: Box::new(Operand::Column("a".to_string())), + }; + assert_eq!( + gt_expr_rhs_column + .try_as_logical_expr() + .unwrap() + .to_string(), + "Int64(10) > a OR a IS NULL" + ); + + let gteq_expr_rhs_column = 
PartitionExpr { + lhs: Box::new(Operand::Value(Value::Int64(10))), + op: RestrictedOp::GtEq, + rhs: Box::new(Operand::Column("a".to_string())), + }; + assert_eq!( + gteq_expr_rhs_column + .try_as_logical_expr() + .unwrap() + .to_string(), + "Int64(10) >= a OR a IS NULL" + ); } #[test]