feat(log-query): implement compound filter and alias expr (#5596)

* refine alias behavior

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement compound

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* support gt, lt, and in

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-03-03 10:52:13 -08:00
committed by GitHub
parent dc24c462dc
commit 6c90f25299
2 changed files with 436 additions and 115 deletions

View File

@@ -63,6 +63,7 @@ pub enum LogExpr {
ScalarFunc {
name: String,
args: Vec<LogExpr>,
alias: Option<String>,
},
AggrFunc {
name: String,
@@ -70,6 +71,7 @@ pub enum LogExpr {
/// Optional range function parameter. Stands for the time range for both step and align.
range: Option<String>,
by: Vec<LogExpr>,
alias: Option<String>,
},
Decompose {
expr: Box<LogExpr>,
@@ -316,6 +318,15 @@ pub enum ContentFilter {
start_inclusive: bool,
end_inclusive: bool,
},
GreatThan {
value: String,
inclusive: bool,
},
LessThan {
value: String,
inclusive: bool,
},
In(Vec<String>),
// TODO(ruihang): arithmetic operations
// Compound filters

View File

@@ -108,50 +108,7 @@ impl LogQueryPlanner {
// Apply log expressions
for expr in &query.exprs {
match expr {
LogExpr::AggrFunc {
name,
args,
by,
range: _range,
} => {
let schema = plan_builder.schema();
let (group_expr, aggr_exprs) = self.build_aggr_func(schema, name, args, by)?;
plan_builder = plan_builder
.aggregate([group_expr], aggr_exprs)
.context(DataFusionPlanningSnafu)?;
}
LogExpr::Filter { expr, filter } => {
let schema = plan_builder.schema();
let expr = self.log_expr_to_df_expr(expr, schema)?;
let col_name = expr.schema_name().to_string();
let filter = self.build_column_filter(&ColumnFilters {
column_name: col_name,
filters: vec![filter.clone()],
})?;
if let Some(filter) = filter {
plan_builder = plan_builder
.filter(filter)
.context(DataFusionPlanningSnafu)?;
}
}
LogExpr::ScalarFunc { name, args } => {
let schema = plan_builder.schema();
let expr = self.build_scalar_func(schema, name, args)?;
plan_builder = plan_builder
.project([expr])
.context(DataFusionPlanningSnafu)?;
}
LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
// nothing to do
}
_ => {
UnimplementedSnafu {
feature: "log expression",
}
.fail()?;
}
}
plan_builder = self.process_log_expr(plan_builder, expr)?;
}
// Build the final plan
@@ -187,73 +144,118 @@ impl LogQueryPlanner {
return Ok(None);
}
let exprs = column_filter
.filters
self.build_content_filters(&column_filter.column_name, &column_filter.filters)
}
/// Builds filter expressions from content filters for a specific column
fn build_content_filters(
&self,
column_name: &str,
filters: &[log_query::ContentFilter],
) -> Result<Option<Expr>> {
if filters.is_empty() {
return Ok(None);
}
let exprs = filters
.iter()
.map(|filter| match filter {
log_query::ContentFilter::Exact(pattern) => Ok(col(&column_filter.column_name)
.like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern)))))),
log_query::ContentFilter::Prefix(pattern) => Ok(col(&column_filter.column_name)
.like(lit(ScalarValue::Utf8(Some(format!(
"{}%",
escape_like_pattern(pattern)
)))))),
log_query::ContentFilter::Postfix(pattern) => Ok(col(&column_filter.column_name)
.like(lit(ScalarValue::Utf8(Some(format!(
"%{}",
escape_like_pattern(pattern)
)))))),
log_query::ContentFilter::Contains(pattern) => Ok(col(&column_filter.column_name)
.like(lit(ScalarValue::Utf8(Some(format!(
"%{}%",
escape_like_pattern(pattern)
)))))),
log_query::ContentFilter::Regex(..) => Err::<Expr, _>(
UnimplementedSnafu {
feature: "regex filter",
}
.build(),
),
log_query::ContentFilter::Exist => {
Ok(col(&column_filter.column_name).is_not_null())
}
log_query::ContentFilter::Between {
start,
end,
start_inclusive,
end_inclusive,
} => {
let left = if *start_inclusive {
Expr::gt_eq
} else {
Expr::gt
};
let right = if *end_inclusive {
Expr::lt_eq
} else {
Expr::lt
};
Ok(left(
col(&column_filter.column_name),
lit(ScalarValue::Utf8(Some(escape_like_pattern(start)))),
)
.and(right(
col(&column_filter.column_name),
lit(ScalarValue::Utf8(Some(escape_like_pattern(end)))),
)))
}
log_query::ContentFilter::Compound(..) => Err::<Expr, _>(
UnimplementedSnafu {
feature: "compound filter",
}
.build(),
),
})
.map(|filter| self.build_content_filter(column_name, filter))
.try_collect::<Vec<_>>()?;
Ok(conjunction(exprs))
}
/// Builds a single content filter expression
#[allow(clippy::only_used_in_recursion)]
fn build_content_filter(
&self,
column_name: &str,
filter: &log_query::ContentFilter,
) -> Result<Expr> {
match filter {
log_query::ContentFilter::Exact(pattern) => {
Ok(col(column_name)
.like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern))))))
}
log_query::ContentFilter::Prefix(pattern) => Ok(col(column_name).like(lit(
ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(pattern)))),
))),
log_query::ContentFilter::Postfix(pattern) => Ok(col(column_name).like(lit(
ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(pattern)))),
))),
log_query::ContentFilter::Contains(pattern) => Ok(col(column_name).like(lit(
ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(pattern)))),
))),
log_query::ContentFilter::Regex(..) => Err::<Expr, _>(
UnimplementedSnafu {
feature: "regex filter",
}
.build(),
),
log_query::ContentFilter::Exist => Ok(col(column_name).is_not_null()),
log_query::ContentFilter::Between {
start,
end,
start_inclusive,
end_inclusive,
} => {
let left = if *start_inclusive {
Expr::gt_eq
} else {
Expr::gt
};
let right = if *end_inclusive {
Expr::lt_eq
} else {
Expr::lt
};
Ok(left(
col(column_name),
lit(ScalarValue::Utf8(Some(escape_like_pattern(start)))),
)
.and(right(
col(column_name),
lit(ScalarValue::Utf8(Some(escape_like_pattern(end)))),
)))
}
log_query::ContentFilter::GreatThan { value, inclusive } => {
let expr = if *inclusive { Expr::gt_eq } else { Expr::gt };
Ok(expr(
col(column_name),
lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
))
}
log_query::ContentFilter::LessThan { value, inclusive } => {
let expr = if *inclusive { Expr::lt_eq } else { Expr::lt };
Ok(expr(
col(column_name),
lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
))
}
log_query::ContentFilter::In(values) => {
let list = values
.iter()
.map(|value| lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))))
.collect();
Ok(col(column_name).in_list(list, false))
}
log_query::ContentFilter::Compound(filters, op) => {
let exprs = filters
.iter()
.map(|filter| self.build_content_filter(column_name, filter))
.try_collect::<Vec<_>>()?;
match op {
log_query::BinaryOperator::And => Ok(conjunction(exprs).unwrap()),
log_query::BinaryOperator::Or => {
// Build a disjunction (OR) of expressions
Ok(exprs.into_iter().reduce(|a, b| a.or(b)).unwrap())
}
}
}
}
}
fn build_aggr_func(
&self,
schema: &DFSchema,
@@ -270,18 +272,22 @@ impl LogQueryPlanner {
})?;
let args = args
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.map(|expr| self.log_expr_to_column_expr(expr, schema))
.try_collect::<Vec<_>>()?;
let group_exprs = by
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.map(|expr| self.log_expr_to_column_expr(expr, schema))
.try_collect::<Vec<_>>()?;
let aggr_expr = aggr_fn.call(args);
Ok((aggr_expr, group_exprs))
}
fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
/// Converts a log expression to a column expression.
///
/// A column expression here can be a column identifier, a positional identifier, or a literal.
/// They don't rely on the context of the query or other columns.
fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
match expr {
LogExpr::NamedIdent(name) => Ok(col(name)),
LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
@@ -297,7 +303,7 @@ impl LogQueryPlanner {
fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result<Expr> {
let args = args
.iter()
.map(|expr| self.log_expr_to_df_expr(expr, schema))
.map(|expr| self.log_expr_to_column_expr(expr, schema))
.try_collect::<Vec<_>>()?;
let func = self.session_state.scalar_functions().get(name).context(
UnknownScalarFunctionSnafu {
@@ -308,6 +314,74 @@ impl LogQueryPlanner {
Ok(expr)
}
/// Process LogExpr recursively.
///
/// Returns the [`LogicalPlanBuilder`] after applying the given expression.
fn process_log_expr(
&self,
plan_builder: LogicalPlanBuilder,
expr: &LogExpr,
) -> Result<LogicalPlanBuilder> {
let mut plan_builder = plan_builder;
match expr {
LogExpr::AggrFunc {
name,
args,
by,
range: _range,
alias,
} => {
let schema = plan_builder.schema();
let (mut aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?;
if let Some(alias) = alias {
aggr_expr = aggr_expr.alias(alias);
}
plan_builder = plan_builder
.aggregate(group_exprs, [aggr_expr.clone()])
.context(DataFusionPlanningSnafu)?;
}
LogExpr::Filter { expr, filter } => {
let schema = plan_builder.schema();
let expr = self.log_expr_to_column_expr(expr, schema)?;
let col_name = expr.schema_name().to_string();
let filter_expr = self.build_content_filter(&col_name, filter)?;
plan_builder = plan_builder
.filter(filter_expr)
.context(DataFusionPlanningSnafu)?;
}
LogExpr::ScalarFunc { name, args, alias } => {
let schema = plan_builder.schema();
let mut expr = self.build_scalar_func(schema, name, args)?;
if let Some(alias) = alias {
expr = expr.alias(alias);
}
plan_builder = plan_builder
.project([expr.clone()])
.context(DataFusionPlanningSnafu)?;
}
LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
// nothing to do; bare identifiers are resolved where they are referenced.
}
LogExpr::Alias { expr, alias } => {
let expr = self.log_expr_to_column_expr(expr, plan_builder.schema())?;
let aliased_expr = expr.alias(alias);
plan_builder = plan_builder
.project([aliased_expr.clone()])
.context(DataFusionPlanningSnafu)?;
}
_ => {
UnimplementedSnafu {
feature: "log expression",
}
.fail()?;
}
}
Ok(plan_builder)
}
}
#[cfg(test)]
@@ -321,7 +395,7 @@ mod tests {
use datafusion::execution::SessionStateBuilder;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use log_query::{ContentFilter, Context, Limit};
use log_query::{BinaryOperator, ContentFilter, Context, Limit};
use session::context::QueryContext;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::table_name::TableName;
@@ -615,14 +689,15 @@ mod tests {
args: vec![LogExpr::NamedIdent("message".to_string())],
by: vec![LogExpr::NamedIdent("host".to_string())],
range: None,
alias: Some("count_result".to_string()),
}],
};
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Aggregate: groupBy=[[count(greptime.public.test_table.message)]], aggr=[[greptime.public.test_table.host]] [count(greptime.public.test_table.message):Int64, host:Utf8;N]\
\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message) AS count_result]] [host:Utf8;N, count_result:Int64]\
\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
@@ -654,11 +729,12 @@ mod tests {
LogExpr::NamedIdent("timestamp".to_string()),
LogExpr::Literal("day".to_string()),
],
alias: Some("time_bucket".to_string()),
}],
};
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) [date_trunc(greptime.public.test_table.timestamp,Utf8(\"day\")):Timestamp(Nanosecond, None);N]\
let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) AS time_bucket [time_bucket:Timestamp(Nanosecond, None);N]\
\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
@@ -693,4 +769,238 @@ mod tests {
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
#[tokio::test]
async fn test_query_to_plan_with_date_histogram() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let mut planner = LogQueryPlanner::new(table_provider, session_state);
let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
time_filter: TimeFilter {
start: Some("2021-01-01T00:00:00Z".to_string()),
end: Some("2021-01-02T00:00:00Z".to_string()),
span: None,
},
filters: vec![],
limit: Limit {
skip: Some(0),
fetch: None,
},
context: Context::None,
columns: vec![],
exprs: vec![
LogExpr::ScalarFunc {
name: "date_bin".to_string(),
args: vec![
LogExpr::Literal("30 seconds".to_string()),
LogExpr::NamedIdent("timestamp".to_string()),
],
alias: Some("2__date_histogram__time_bucket".to_string()),
},
LogExpr::AggrFunc {
name: "count".to_string(),
args: vec![LogExpr::PositionalIdent(0)],
by: vec![LogExpr::NamedIdent(
"2__date_histogram__time_bucket".to_string(),
)],
range: None,
alias: Some("count_result".to_string()),
},
],
};
let plan = planner.query_to_plan(log_query).await.unwrap();
let expected = "Aggregate: groupBy=[[2__date_histogram__time_bucket]], aggr=[[count(2__date_histogram__time_bucket) AS count_result]] [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N, count_result:Int64]\
\n Projection: date_bin(Utf8(\"30 seconds\"), greptime.public.test_table.timestamp) AS 2__date_histogram__time_bucket [2__date_histogram__time_bucket:Timestamp(Nanosecond, None);N]\
\n Limit: skip=0, fetch=1000 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
#[tokio::test]
async fn test_build_compound_filter() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
// Test AND compound
let filter = ContentFilter::Compound(
vec![
ContentFilter::Contains("error".to_string()),
ContentFilter::Prefix("WARN".to_string()),
],
BinaryOperator::And,
);
let expr = planner.build_content_filter("message", &filter).unwrap();
let expected_expr = col("message")
.like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
.and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
// Test OR compound
let filter = ContentFilter::Compound(
vec![
ContentFilter::Contains("error".to_string()),
ContentFilter::Prefix("WARN".to_string()),
],
BinaryOperator::Or,
);
let expr = planner.build_content_filter("message", &filter).unwrap();
let expected_expr = col("message")
.like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
.or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
// Test nested compound
let filter = ContentFilter::Compound(
vec![
ContentFilter::Contains("error".to_string()),
ContentFilter::Compound(
vec![
ContentFilter::Prefix("WARN".to_string()),
ContentFilter::Exact("DEBUG".to_string()),
],
BinaryOperator::Or,
),
],
BinaryOperator::And,
);
let expr = planner.build_content_filter("message", &filter).unwrap();
let expected_nested = col("message")
.like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
.or(col("message").like(lit(ScalarValue::Utf8(Some("DEBUG".to_string())))));
let expected_expr = col("message")
.like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
.and(expected_nested);
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
#[tokio::test]
async fn test_build_great_than_filter() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
// Test GreatThan with inclusive=true
let column_filter = ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::GreatThan {
value: "error".to_string(),
inclusive: true,
}],
};
let expr_option = planner.build_column_filter(&column_filter).unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
let expected_expr = col("message").gt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
// Test GreatThan with inclusive=false
let column_filter = ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::GreatThan {
value: "error".to_string(),
inclusive: false,
}],
};
let expr_option = planner.build_column_filter(&column_filter).unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
let expected_expr = col("message").gt(lit(ScalarValue::Utf8(Some("error".to_string()))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
#[tokio::test]
async fn test_build_less_than_filter() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
// Test LessThan with inclusive=true
let column_filter = ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::LessThan {
value: "error".to_string(),
inclusive: true,
}],
};
let expr_option = planner.build_column_filter(&column_filter).unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
let expected_expr = col("message").lt_eq(lit(ScalarValue::Utf8(Some("error".to_string()))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
// Test LessThan with inclusive=false
let column_filter = ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::LessThan {
value: "error".to_string(),
inclusive: false,
}],
};
let expr_option = planner.build_column_filter(&column_filter).unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
let expected_expr = col("message").lt(lit(ScalarValue::Utf8(Some("error".to_string()))));
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
#[tokio::test]
async fn test_build_in_filter() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
// Test In filter with multiple values
let column_filter = ColumnFilters {
column_name: "message".to_string(),
filters: vec![ContentFilter::In(vec![
"error".to_string(),
"warning".to_string(),
"info".to_string(),
])],
};
let expr_option = planner.build_column_filter(&column_filter).unwrap();
assert!(expr_option.is_some());
let expr = expr_option.unwrap();
let expected_expr = col("message").in_list(
vec![
lit(ScalarValue::Utf8(Some("error".to_string()))),
lit(ScalarValue::Utf8(Some("warning".to_string()))),
lit(ScalarValue::Utf8(Some("info".to_string()))),
],
false,
);
assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));
}
}