feat(log-query): support binary op, scalar fn & is_true/is_false (#6659)

* rename symbol

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* handle binary op

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update test results

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/query/src/log_query/planner.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reduce duplication

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Ruihang Xia
2025-08-05 21:38:25 -07:00
committed by GitHub
parent 309e9d978c
commit c8da35c7e5
2 changed files with 195 additions and 52 deletions

View File

@@ -82,7 +82,7 @@ pub enum LogExpr {
},
BinaryOp {
left: Box<LogExpr>,
op: String,
op: BinaryOperator,
right: Box<LogExpr>,
},
Alias {
@@ -326,14 +326,38 @@ pub enum ContentFilter {
inclusive: bool,
},
In(Vec<String>),
// TODO(ruihang): arithmetic operations
IsTrue,
IsFalse,
// Compound filters
Compound(Vec<ContentFilter>, BinaryOperator),
Compound(Vec<ContentFilter>, ConjunctionOperator),
}
/// Logical connective used by `ContentFilter::Compound` to combine its sub-filters.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ConjunctionOperator {
/// Every sub-filter must match (the sub-filters are AND-ed together).
And,
/// At least one sub-filter must match (the sub-filters are OR-ed together).
Or,
}
/// Binary operators for LogExpr::BinaryOp.
///
/// Each variant describes how the `left` and `right` operands of a
/// `LogExpr::BinaryOp` are combined. The variants fall into three groups:
/// comparison, arithmetic and logical.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOperator {
// Comparison operators
/// `left` is equal to `right`.
Eq,
/// `left` is not equal to `right`.
Ne,
/// `left` is strictly less than `right`.
Lt,
/// `left` is less than or equal to `right`.
Le,
/// `left` is strictly greater than `right`.
Gt,
/// `left` is greater than or equal to `right`.
Ge,
// Arithmetic operators
/// Addition of `left` and `right`.
Plus,
/// Subtraction of `right` from `left`.
Minus,
/// Multiplication of `left` and `right`.
Multiply,
/// Division of `left` by `right`.
Divide,
/// Remainder of dividing `left` by `right`.
Modulo,
// Logical operators
/// Logical AND of `left` and `right`.
And,
/// Logical OR of `left` and `right`.
Or,
}

View File

@@ -19,10 +19,10 @@ use datafusion::datasource::DefaultTableSource;
use datafusion::execution::SessionState;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_expr::{col, lit, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
use datafusion_sql::TableReference;
use datatypes::schema::Schema;
use log_query::{LogExpr, LogQuery, TimeFilter};
use log_query::{BinaryOperator, LogExpr, LogQuery, TimeFilter};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
@@ -146,7 +146,9 @@ impl LogQueryPlanner {
column_filter: &log_query::ColumnFilters,
schema: &ArrowSchema,
) -> Result<Option<Expr>> {
let col_expr = self.log_expr_to_column_expr(&column_filter.expr, schema)?;
// `log_expr_to_df_expr` works on a `DFSchema`, so convert the Arrow schema first.
let df_schema = DFSchema::try_from(schema.clone()).context(DataFusionPlanningSnafu)?;
let col_expr = self.log_expr_to_df_expr(&column_filter.expr, &df_schema)?;
let filter_exprs = column_filter
.filters
@@ -238,6 +240,8 @@ impl LogQueryPlanner {
.collect();
Ok(Some(col_expr.in_list(values, false)))
}
log_query::ContentFilter::IsTrue => Ok(Some(col_expr.is_true())),
log_query::ContentFilter::IsFalse => Ok(Some(col_expr.is_false())),
log_query::ContentFilter::Compound(filters, op) => {
let exprs = filters
.iter()
@@ -252,8 +256,8 @@ impl LogQueryPlanner {
}
match op {
log_query::BinaryOperator::And => Ok(conjunction(exprs)),
log_query::BinaryOperator::Or => {
log_query::ConjunctionOperator::And => Ok(conjunction(exprs)),
log_query::ConjunctionOperator::Or => {
// Build a disjunction (OR) of expressions
Ok(exprs.into_iter().reduce(|a, b| a.or(b)))
}
@@ -278,38 +282,61 @@ impl LogQueryPlanner {
})?;
let args = args
.iter()
.map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.try_collect::<Vec<_>>()?;
let group_exprs = by
.iter()
.map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.try_collect::<Vec<_>>()?;
let aggr_expr = aggr_fn.call(args);
Ok((aggr_expr, group_exprs))
}
/// Converts a log expression to a column expression.
///
/// A column expression here can be a column identifier, a positional identifier, or a literal.
/// They don't rely on the context of the query or other columns.
fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &ArrowSchema) -> Result<Expr> {
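/// Converts a LogExpr to a DataFusion Expr.
///
/// Identifiers, literals, binary operations, scalar functions and aliases are converted
/// (recursively where they contain sub-expressions). Aggregate functions, filters and
/// decompose expressions cannot be represented as a plain expression and are rejected
/// with an unexpected-expression error.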
fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
match expr {
LogExpr::NamedIdent(name) => Ok(col(name)),
LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
_ => UnexpectedLogExprSnafu {
expr: expr.clone(),
expected: "named identifier, positional identifier, or literal",
LogExpr::BinaryOp { left, op, right } => {
let left_expr = self.log_expr_to_df_expr(left, schema)?;
let right_expr = self.log_expr_to_df_expr(right, schema)?;
let df_op = Self::binary_operator_to_df_operator(op);
Ok(Expr::BinaryExpr(BinaryExpr {
left: Box::new(left_expr),
op: df_op,
right: Box::new(right_expr),
}))
}
LogExpr::ScalarFunc { name, args, alias } => {
self.build_scalar_func(schema, name, args, alias)
}
LogExpr::Alias { expr, alias } => {
let df_expr = self.log_expr_to_df_expr(expr, schema)?;
Ok(df_expr.alias(alias))
}
LogExpr::AggrFunc { .. } | LogExpr::Filter { .. } | LogExpr::Decompose { .. } => {
UnexpectedLogExprSnafu {
expr: expr.clone(),
expected: "not a typical expression",
}
.fail()
}
.fail(),
}
}
fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result<Expr> {
fn build_scalar_func(
&self,
schema: &DFSchema,
name: &str,
args: &[LogExpr],
alias: &Option<String>,
) -> Result<Expr> {
let args = args
.iter()
.map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.try_collect::<Vec<_>>()?;
let func = self.session_state.scalar_functions().get(name).context(
UnknownScalarFunctionSnafu {
@@ -318,7 +345,30 @@ impl LogQueryPlanner {
)?;
let expr = func.call(args);
Ok(expr)
if let Some(alias) = alias {
Ok(expr.alias(alias))
} else {
Ok(expr)
}
}
/// Convert BinaryOperator to DataFusion's Operator.
fn binary_operator_to_df_operator(op: &BinaryOperator) -> Operator {
match op {
BinaryOperator::Eq => Operator::Eq,
BinaryOperator::Ne => Operator::NotEq,
BinaryOperator::Lt => Operator::Lt,
BinaryOperator::Le => Operator::LtEq,
BinaryOperator::Gt => Operator::Gt,
BinaryOperator::Ge => Operator::GtEq,
BinaryOperator::Plus => Operator::Plus,
BinaryOperator::Minus => Operator::Minus,
BinaryOperator::Multiply => Operator::Multiply,
BinaryOperator::Divide => Operator::Divide,
BinaryOperator::Modulo => Operator::Modulo,
BinaryOperator::And => Operator::And,
BinaryOperator::Or => Operator::Or,
}
}
/// Process LogExpr recursively.
@@ -359,24 +409,30 @@ impl LogQueryPlanner {
}
LogExpr::ScalarFunc { name, args, alias } => {
let schema = plan_builder.schema();
let mut expr = self.build_scalar_func(schema, name, args)?;
if let Some(alias) = alias {
expr = expr.alias(alias);
}
let expr = self.build_scalar_func(schema, name, args, alias)?;
plan_builder = plan_builder
.project([expr.clone()])
.project([expr])
.context(DataFusionPlanningSnafu)?;
}
LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
// nothing to do, return empty vec.
}
LogExpr::Alias { expr, alias } => {
let expr = self.log_expr_to_column_expr(expr, plan_builder.schema().as_arrow())?;
let aliased_expr = expr.alias(alias);
let schema = plan_builder.schema();
let df_expr = self.log_expr_to_df_expr(expr, schema)?;
let aliased_expr = df_expr.alias(alias);
plan_builder = plan_builder
.project([aliased_expr.clone()])
.context(DataFusionPlanningSnafu)?;
}
LogExpr::BinaryOp { .. } => {
let schema = plan_builder.schema();
let binary_expr = self.log_expr_to_df_expr(expr, schema)?;
plan_builder = plan_builder
.project([binary_expr])
.context(DataFusionPlanningSnafu)?;
}
_ => {
UnimplementedSnafu {
feature: "log expression",
@@ -399,7 +455,7 @@ mod tests {
use datafusion::execution::SessionStateBuilder;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use log_query::{BinaryOperator, ColumnFilters, ContentFilter, Context, Limit, LogExpr};
use log_query::{ColumnFilters, ConjunctionOperator, ContentFilter, Context, Limit, LogExpr};
use session::context::QueryContext;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::table_name::TableName;
@@ -425,12 +481,17 @@ mod tests {
ConcreteDataType::string_datatype(),
true,
),
ColumnSchema::new(
"is_active".to_string(),
ConcreteDataType::boolean_datatype(),
true,
),
];
Arc::new(Schema::new(columns))
}
/// Registers table under `greptime`, with `message` and `timestamp` and `host` columns.
/// Registers table under `greptime`, with `message`, `timestamp`, `host`, and `is_active` columns.
async fn build_test_table_provider(
table_name_tuples: &[(String, String)],
) -> DfTableSourceProvider {
@@ -499,9 +560,9 @@ mod tests {
};
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
let expected = "Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -620,9 +681,9 @@ mod tests {
};
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
let expected = "Limit: skip=10, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -655,9 +716,9 @@ mod tests {
};
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
let expected = "Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") AND greptime.public.test_table.message LIKE Utf8(\"%error%\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -702,9 +763,9 @@ mod tests {
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\
\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -742,9 +803,9 @@ mod tests {
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\
\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -825,9 +886,9 @@ mod tests {
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\
\n Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\
\n Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
\n Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N, is_active:Boolean;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -867,7 +928,7 @@ mod tests {
ContentFilter::Contains("error".to_string()),
ContentFilter::Prefix("WARN".to_string()),
],
BinaryOperator::Or,
ConjunctionOperator::Or,
)],
};
let expr = planner
@@ -892,10 +953,10 @@ mod tests {
ContentFilter::Prefix("WARN".to_string()),
ContentFilter::Exact("DEBUG".to_string()),
],
BinaryOperator::Or,
ConjunctionOperator::Or,
),
],
BinaryOperator::And,
ConjunctionOperator::And,
)],
};
let expr = planner
@@ -1042,4 +1103,62 @@ mod tests {
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
/// `ContentFilter::IsTrue` and `ContentFilter::IsFalse` applied to the boolean
/// `is_active` column must each plan to the matching DataFusion boolean test
/// (`IsTrue` / `IsFalse`) wrapped around that column.
#[tokio::test]
async fn test_build_is_true_filter() {
    let table_provider =
        build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
    let session_state = SessionStateBuilder::new().with_default_features().build();
    let planner = LogQueryPlanner::new(table_provider, session_state);
    let schema = mock_schema();

    // Both boolean filters share the same planning path, so cover them with one table.
    let cases = [
        (
            ContentFilter::IsTrue,
            "IsTrue(Column(Column { relation: None, name: \"is_active\" }))",
        ),
        (
            ContentFilter::IsFalse,
            "IsFalse(Column(Column { relation: None, name: \"is_active\" }))",
        ),
    ];

    for (filter, expected_expr_string) in cases {
        let column_filter = ColumnFilters {
            expr: Box::new(LogExpr::NamedIdent("is_active".to_string())),
            filters: vec![filter],
        };

        let expr = planner
            .build_column_filter(&column_filter, schema.arrow_schema())
            .unwrap()
            .expect("a boolean content filter must produce a predicate");

        assert_eq!(format!("{:?}", expr), expected_expr_string);
    }
}
/// A column filter whose subject is itself an expression, here
/// `character_length(message) > "100"`, must be planned as that expression followed by
/// the content filter applied to it (`IS TRUE`).
#[tokio::test]
async fn test_build_filter_with_scalar_fn() {
    let provider =
        build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
    let state = SessionStateBuilder::new().with_default_features().build();
    let planner = LogQueryPlanner::new(provider, state);
    let schema = mock_schema();

    // Build the expression tree bottom-up: character_length(message) > "100".
    let message_length = LogExpr::ScalarFunc {
        name: "character_length".to_string(),
        args: vec![LogExpr::NamedIdent("message".to_string())],
        alias: None,
    };
    let threshold = LogExpr::Literal("100".to_string());
    let comparison = LogExpr::BinaryOp {
        left: Box::new(message_length),
        op: BinaryOperator::Gt,
        right: Box::new(threshold),
    };

    let column_filter = ColumnFilters {
        expr: Box::new(comparison),
        filters: vec![ContentFilter::IsTrue],
    };

    let planned = planner
        .build_column_filter(&column_filter, schema.arrow_schema())
        .unwrap();
    assert!(planned.is_some());

    let rendered = planned.unwrap().to_string();
    assert_eq!(
        rendered,
        "character_length(message) > Utf8(\"100\") IS TRUE"
    );
}
}