fix: null first for part expr as logical expr (#7747)

* fix: null first for part expr as logical expr

Signed-off-by: discord9 <discord9@163.com>

* test: update tests

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* fix: null handling & non-null filter

Signed-off-by: discord9 <discord9@163.com>

* chore: doc test

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-06 10:53:05 +08:00
committed by GitHub
parent f544b02408
commit 4c30b9efaf
3 changed files with 208 additions and 16 deletions

View File

@@ -23,7 +23,7 @@ use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
@@ -739,7 +739,13 @@ impl RangeBase {
reason: "Failed to downcast to BooleanArray".to_string(),
})?;
Ok(boolean_array.values().clone())
// Also account for nulls in the partition filter result: a null value is treated as false (filtered out).
let mut mask = boolean_array.values().clone();
if let Some(nulls) = boolean_array.nulls() {
mask = mask.bitand(nulls.inner());
}
Ok(mask)
}
/// Builds a `RecordBatch` from the input `Batch` matching the given schema.

View File

@@ -213,6 +213,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to convert partition expr value to ScalarValue: {:?}", value))]
ConvertPartitionExprValue {
value: Value,
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
},
#[snafu(display("Duplicate expr: {:?}", expr))]
DuplicateExpr {
expr: PartitionExpr,
@@ -268,6 +276,7 @@ impl ErrorExt for Error {
Error::ToDFSchema { .. } => StatusCode::Internal,
Error::CreatePhysicalExpr { .. } => StatusCode::Internal,
Error::UnsupportedPartitionExprValue { .. } => StatusCode::InvalidArguments,
Error::ConvertPartitionExprValue { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -83,7 +83,8 @@ impl Operand {
Value::Date(v) => ScalarValue::Date32(Some(v.val())),
Value::Null => ScalarValue::Null,
Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value()))
.context(error::ConvertPartitionExprValueSnafu { value: v.clone() })?,
Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
Value::IntervalMonthDayNano(v) => {
@@ -301,28 +302,50 @@ impl PartitionExpr {
self.op,
RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
) {
// Keep filtering semantics aligned with direct PartitionExpr evaluation (null-first ordering).
// In DataFusion SQL semantics, range comparisons with NULL yield NULL, so we inject
// `OR col IS NULL` on the null-first side of the comparison and
// `AND col IS NOT NULL` on the opposite side.
if matches!(self.lhs.as_ref(), Operand::Column(_)) {
let column_expr = self.lhs.try_as_logical_expr()?;
let other_expr = self.rhs.try_as_logical_expr()?;
let base = match self.op {
RestrictedOp::Lt => column_expr.clone().lt(other_expr),
RestrictedOp::LtEq => column_expr.clone().lt_eq(other_expr),
RestrictedOp::Gt => column_expr.clone().gt(other_expr),
RestrictedOp::GtEq => column_expr.clone().gt_eq(other_expr),
RestrictedOp::Lt => {
column_expr.clone().lt(other_expr).or(column_expr.is_null())
}
RestrictedOp::LtEq => column_expr
.clone()
.lt_eq(other_expr)
.or(column_expr.is_null()),
RestrictedOp::Gt => column_expr
.clone()
.gt(other_expr)
.and(column_expr.is_not_null()),
RestrictedOp::GtEq => column_expr
.clone()
.gt_eq(other_expr)
.and(column_expr.is_not_null()),
_ => unreachable!(),
};
return Ok(datafusion_expr::or(base, column_expr.is_null()));
return Ok(base);
} else if matches!(self.rhs.as_ref(), Operand::Column(_)) {
let other_expr = self.lhs.try_as_logical_expr()?;
let column_expr = self.rhs.try_as_logical_expr()?;
let base = match self.op {
RestrictedOp::Lt => other_expr.lt(column_expr.clone()),
RestrictedOp::LtEq => other_expr.lt_eq(column_expr.clone()),
RestrictedOp::Gt => other_expr.gt(column_expr.clone()),
RestrictedOp::GtEq => other_expr.gt_eq(column_expr.clone()),
RestrictedOp::Lt => other_expr
.lt(column_expr.clone())
.and(column_expr.is_not_null()),
RestrictedOp::LtEq => other_expr
.lt_eq(column_expr.clone())
.and(column_expr.is_not_null()),
RestrictedOp::Gt => {
other_expr.gt(column_expr.clone()).or(column_expr.is_null())
}
RestrictedOp::GtEq => other_expr
.gt_eq(column_expr.clone())
.or(column_expr.is_null()),
_ => unreachable!(),
};
return Ok(datafusion_expr::or(base, column_expr.is_null()));
return Ok(base);
}
}
@@ -552,7 +575,7 @@ mod tests {
.try_as_logical_expr()
.unwrap()
.to_string(),
"a > Int64(10) OR a IS NULL"
"a > Int64(10) AND a IS NOT NULL"
);
// Test Gt with column on LHS
@@ -563,7 +586,7 @@ mod tests {
);
assert_eq!(
gt_expr.try_as_logical_expr().unwrap().to_string(),
"a > Int64(10) OR a IS NULL"
"a > Int64(10) AND a IS NOT NULL"
);
// Test Gt with column on RHS
@@ -588,7 +611,7 @@ mod tests {
);
assert_eq!(
gteq_expr.try_as_logical_expr().unwrap().to_string(),
"a >= Int64(10) OR a IS NULL"
"a >= Int64(10) AND a IS NOT NULL"
);
// Test LtEq with column on LHS
@@ -601,6 +624,160 @@ mod tests {
lteq_expr.try_as_logical_expr().unwrap().to_string(),
"a <= Int64(10) OR a IS NULL"
);
let gteq_expr_rhs_column = PartitionExpr::new(
Operand::Value(Value::Int64(10)),
RestrictedOp::GtEq,
Operand::Column("a".to_string()),
);
assert_eq!(
gteq_expr_rhs_column
.try_as_logical_expr()
.unwrap()
.to_string(),
"a <= Int64(10) OR a IS NULL"
);
let lteq_expr_rhs_column = PartitionExpr::new(
Operand::Value(Value::Int64(10)),
RestrictedOp::LtEq,
Operand::Column("a".to_string()),
);
assert_eq!(
lteq_expr_rhs_column
.try_as_logical_expr()
.unwrap()
.to_string(),
"a >= Int64(10) AND a IS NOT NULL"
);
let and_expr = PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::LtEq,
Operand::Value(Value::Int64(10)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("b".to_string()),
RestrictedOp::Gt,
Operand::Value(Value::Int64(5)),
)),
);
assert_eq!(
and_expr.try_as_logical_expr().unwrap().to_string(),
"(a <= Int64(10) OR a IS NULL) AND b > Int64(5) AND b IS NOT NULL"
);
let and_expr = PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::LtEq,
Operand::Value(Value::Int64(10)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Gt,
Operand::Value(Value::Int64(5)),
)),
);
assert_eq!(
and_expr.try_as_logical_expr().unwrap().to_string(),
"(a <= Int64(10) OR a IS NULL) AND a > Int64(5) AND a IS NOT NULL"
);
let and_expr_strict_lower = PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Lt,
Operand::Value(Value::Int64(10)),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::GtEq,
Operand::Value(Value::Int64(5)),
)),
);
assert_eq!(
and_expr_strict_lower
.try_as_logical_expr()
.unwrap()
.to_string(),
"(a < Int64(10) OR a IS NULL) AND a >= Int64(5) AND a IS NOT NULL"
);
let and_expr_rhs_column = PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Value(Value::Int64(10)),
RestrictedOp::GtEq,
Operand::Column("a".to_string()),
)),
RestrictedOp::And,
Operand::Expr(PartitionExpr::new(
Operand::Value(Value::Int64(5)),
RestrictedOp::Lt,
Operand::Column("a".to_string()),
)),
);
assert_eq!(
and_expr_rhs_column
.try_as_logical_expr()
.unwrap()
.to_string(),
"(a <= Int64(10) OR a IS NULL) AND a > Int64(5) AND a IS NOT NULL"
);
let or_expr_same_column = PartitionExpr::new(
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::LtEq,
Operand::Value(Value::Int64(10)),
)),
RestrictedOp::Or,
Operand::Expr(PartitionExpr::new(
Operand::Column("a".to_string()),
RestrictedOp::Gt,
Operand::Value(Value::Int64(5)),
)),
);
assert_eq!(
or_expr_same_column
.try_as_logical_expr()
.unwrap()
.to_string(),
"a <= Int64(10) OR a IS NULL OR a > Int64(5) AND a IS NOT NULL"
);
}
/// Checks the logical-expression rendering of `Gt`/`GtEq` comparisons whose
/// column operand sits on the right-hand side.
///
/// The expressions are built as struct literals rather than through
/// `PartitionExpr::new`, so the operands stay in the order written here
/// (presumably `new` canonicalizes operand order; confirm against its definition).
#[test]
fn test_try_as_logical_expr_rhs_column_without_canonicalize() {
    // Renders `10 <op> a` as a logical-expression string.
    let render = |op: RestrictedOp| {
        let expr = PartitionExpr {
            lhs: Box::new(Operand::Value(Value::Int64(10))),
            op,
            rhs: Box::new(Operand::Column("a".to_string())),
        };
        expr.try_as_logical_expr().unwrap().to_string()
    };

    // NULL sorts first, so `10 > a` and `10 >= a` must also admit NULL values of `a`.
    assert_eq!(render(RestrictedOp::Gt), "Int64(10) > a OR a IS NULL");
    assert_eq!(render(RestrictedOp::GtEq), "Int64(10) >= a OR a IS NULL");
}
#[test]