feat: use column expr with filters in LogQuery (#6646)

* feat: use column expr with filters in LogQuery

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove some clone

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2025-08-05 11:35:09 -07:00
Committed by: GitHub
Parent: e64469bbc4
Commit: ea024874e7
2 changed files with 180 additions and 142 deletions


@@ -42,7 +42,7 @@ pub struct LogQuery {
     // Filters
     /// Conjunction of filters to apply for the raw logs.
     ///
-    /// Filters here can only refer to the columns from the original log.
+    /// Filters here can apply to any LogExpr.
     pub filters: Vec<ColumnFilters>,
     /// Adjacent lines to return. Applies to all filters above.
     ///
@@ -90,8 +90,7 @@ pub enum LogExpr {
         alias: String,
     },
     Filter {
-        expr: Box<LogExpr>,
-        filter: ContentFilter,
+        filter: ColumnFilters,
     },
 }
@@ -282,12 +281,12 @@ impl TimeFilter {
     }
 }

-/// Represents a column with filters to query.
-#[derive(Debug, Serialize, Deserialize)]
+/// Represents an expression with filters to query.
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct ColumnFilters {
-    /// Case-sensitive column name to query.
-    pub column_name: String,
-    /// Filters to apply to the column. Can be empty.
+    /// Expression to apply filters to. Can be a column reference or any other LogExpr.
+    pub expr: Box<LogExpr>,
+    /// Filters to apply to the expression result. Can be empty.
     pub filters: Vec<ContentFilter>,
 }
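
For orientation, a minimal sketch of how a filter is built against the new shape (types and variants as defined in this diff; the column name and filter values are invented for illustration):

use log_query::{ColumnFilters, ContentFilter, LogExpr};

// Filter the `message` column for lines containing "error". Under the old
// structure this would have been
// `ColumnFilters { column_name: "message".into(), filters: vec![...] }`.
fn contains_error_filter() -> ColumnFilters {
    ColumnFilters {
        // The target is now an expression, not just a column name.
        expr: Box::new(LogExpr::NamedIdent("message".to_string())),
        filters: vec![ContentFilter::Contains("error".to_string())],
    }
}

// The same structure is embedded in LogExpr::Filter, which previously carried
// a separate `expr` plus a single ContentFilter.
fn warn_prefix_filter_expr() -> LogExpr {
    LogExpr::Filter {
        filter: ColumnFilters {
            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
            filters: vec![ContentFilter::Prefix("WARN".to_string())],
        },
    }
}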


@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use arrow_schema::Schema as ArrowSchema;
 use catalog::table_source::DfTableSourceProvider;
 use common_function::utils::escape_like_pattern;
 use datafusion::datasource::DefaultTableSource;
@@ -21,7 +22,7 @@ use datafusion_expr::utils::conjunction;
 use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
 use datafusion_sql::TableReference;
 use datatypes::schema::Schema;
-use log_query::{ColumnFilters, LogExpr, LogQuery, TimeFilter};
+use log_query::{LogExpr, LogQuery, TimeFilter};
 use snafu::{OptionExt, ResultExt};
 use table::table::adapter::DfTableProviderAdapter;
@@ -68,6 +69,7 @@ impl LogQueryPlanner {
         // Build the initial scan plan
         let mut plan_builder = LogicalPlanBuilder::scan(table_ref, table_source, None)
             .context(DataFusionPlanningSnafu)?;
+        let df_schema = plan_builder.schema().clone();

         // Collect filter expressions
         let mut filters = Vec::new();
@@ -76,8 +78,8 @@ impl LogQueryPlanner {
         filters.push(self.build_time_filter(&query.time_filter, &schema)?);

         // Column filters
-        for column_filter in &query.filters {
-            if let Some(expr) = self.build_column_filter(column_filter)? {
+        for filter in &query.filters {
+            if let Some(expr) = self.build_column_filter(filter, df_schema.as_arrow())? {
                 filters.push(expr);
             }
         }
@@ -138,61 +140,56 @@ impl LogQueryPlanner {
         Ok(expr)
     }

-    /// Returns filter expressions
-    fn build_column_filter(&self, column_filter: &ColumnFilters) -> Result<Option<Expr>> {
-        if column_filter.filters.is_empty() {
-            return Ok(None);
-        }
-        self.build_content_filters(&column_filter.column_name, &column_filter.filters)
-    }
-
-    /// Builds filter expressions from content filters for a specific column
-    fn build_content_filters(
+    /// Builds filter expression from ColumnFilters (new structure with expr + filters)
+    fn build_column_filter(
         &self,
-        column_name: &str,
-        filters: &[log_query::ContentFilter],
+        column_filter: &log_query::ColumnFilters,
+        schema: &ArrowSchema,
     ) -> Result<Option<Expr>> {
-        if filters.is_empty() {
-            return Ok(None);
-        }
-        let exprs = filters
+        let col_expr = self.log_expr_to_column_expr(&column_filter.expr, schema)?;
+        let filter_exprs = column_filter
+            .filters
             .iter()
-            .map(|filter| self.build_content_filter(column_name, filter))
+            .filter_map(|filter| {
+                self.build_content_filter_with_expr(col_expr.clone(), filter)
+                    .transpose()
+            })
             .try_collect::<Vec<_>>()?;

-        Ok(conjunction(exprs))
+        if filter_exprs.is_empty() {
+            return Ok(None);
+        }
+
+        // Combine all filters with AND logic
+        Ok(conjunction(filter_exprs))
     }

-    /// Builds a single content filter expression
+    /// Builds filter expression from a single ContentFilter using a provided column expression
     #[allow(clippy::only_used_in_recursion)]
-    fn build_content_filter(
+    fn build_content_filter_with_expr(
         &self,
-        column_name: &str,
+        col_expr: Expr,
         filter: &log_query::ContentFilter,
-    ) -> Result<Expr> {
+    ) -> Result<Option<Expr>> {
         match filter {
-            log_query::ContentFilter::Exact(pattern) => {
-                Ok(col(column_name)
-                    .like(lit(ScalarValue::Utf8(Some(escape_like_pattern(pattern))))))
-            }
-            log_query::ContentFilter::Prefix(pattern) => Ok(col(column_name).like(lit(
-                ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(pattern)))),
-            ))),
-            log_query::ContentFilter::Postfix(pattern) => Ok(col(column_name).like(lit(
-                ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(pattern)))),
-            ))),
-            log_query::ContentFilter::Contains(pattern) => Ok(col(column_name).like(lit(
-                ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(pattern)))),
-            ))),
-            log_query::ContentFilter::Regex(..) => Err::<Expr, _>(
-                UnimplementedSnafu {
-                    feature: "regex filter",
-                }
-                .build(),
-            ),
-            log_query::ContentFilter::Exist => Ok(col(column_name).is_not_null()),
+            log_query::ContentFilter::Exact(value) => Ok(Some(
+                col_expr.like(lit(ScalarValue::Utf8(Some(escape_like_pattern(value))))),
+            )),
+            log_query::ContentFilter::Prefix(value) => Ok(Some(col_expr.like(lit(
+                ScalarValue::Utf8(Some(format!("{}%", escape_like_pattern(value)))),
+            )))),
+            log_query::ContentFilter::Postfix(value) => Ok(Some(col_expr.like(lit(
+                ScalarValue::Utf8(Some(format!("%{}", escape_like_pattern(value)))),
+            )))),
+            log_query::ContentFilter::Contains(value) => Ok(Some(col_expr.like(lit(
+                ScalarValue::Utf8(Some(format!("%{}%", escape_like_pattern(value)))),
+            )))),
+            log_query::ContentFilter::Regex(_pattern) => Err(UnimplementedSnafu {
+                feature: "regex filter",
+            }
+            .build()),
+            log_query::ContentFilter::Exist => Ok(Some(col_expr.is_not_null())),
             log_query::ContentFilter::Between {
                 start,
                 end,
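Note: for a concrete sense of what the reworked method produces, here is a sketch of the expression expected for ColumnFilters { expr: NamedIdent("message"), filters: [Contains("error"), Prefix("WARN")] } — each ContentFilter becomes one predicate on the resolved expression and the predicates are AND-ed (this mirrors the test expectations later in this diff; the ScalarValue import path is an assumption):

use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit, Expr};

fn expected_filter_expr() -> Expr {
    // Contains("error") -> LIKE '%error%', Prefix("WARN") -> LIKE 'WARN%',
    // combined with AND by `conjunction`.
    col("message")
        .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
        .and(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))))
}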
@@ -200,56 +197,65 @@ impl LogQueryPlanner {
                 end_inclusive,
             } => {
                 let left = if *start_inclusive {
-                    Expr::gt_eq
+                    col_expr
+                        .clone()
+                        .gt_eq(lit(ScalarValue::Utf8(Some(start.clone()))))
                 } else {
-                    Expr::gt
+                    col_expr
+                        .clone()
+                        .gt(lit(ScalarValue::Utf8(Some(start.clone()))))
                 };
                 let right = if *end_inclusive {
-                    Expr::lt_eq
+                    col_expr.lt_eq(lit(ScalarValue::Utf8(Some(end.clone()))))
                 } else {
-                    Expr::lt
+                    col_expr.lt(lit(ScalarValue::Utf8(Some(end.clone()))))
                 };
-                Ok(left(
-                    col(column_name),
-                    lit(ScalarValue::Utf8(Some(escape_like_pattern(start)))),
-                )
-                .and(right(
-                    col(column_name),
-                    lit(ScalarValue::Utf8(Some(escape_like_pattern(end)))),
-                )))
+                Ok(Some(left.and(right)))
             }
             log_query::ContentFilter::GreatThan { value, inclusive } => {
-                let expr = if *inclusive { Expr::gt_eq } else { Expr::gt };
-                Ok(expr(
-                    col(column_name),
-                    lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
-                ))
+                let comparison_expr = if *inclusive {
+                    col_expr.gt_eq(lit(ScalarValue::Utf8(Some(value.clone()))))
+                } else {
+                    col_expr.gt(lit(ScalarValue::Utf8(Some(value.clone()))))
+                };
+                Ok(Some(comparison_expr))
             }
             log_query::ContentFilter::LessThan { value, inclusive } => {
-                let expr = if *inclusive { Expr::lt_eq } else { Expr::lt };
-                Ok(expr(
-                    col(column_name),
-                    lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))),
-                ))
+                if *inclusive {
+                    Ok(Some(
+                        col_expr.lt_eq(lit(ScalarValue::Utf8(Some(value.clone())))),
+                    ))
+                } else {
+                    Ok(Some(
+                        col_expr.lt(lit(ScalarValue::Utf8(Some(value.clone())))),
+                    ))
+                }
             }
             log_query::ContentFilter::In(values) => {
-                let list = values
+                let values: Vec<_> = values
                     .iter()
-                    .map(|value| lit(ScalarValue::Utf8(Some(escape_like_pattern(value)))))
+                    .map(|v| lit(ScalarValue::Utf8(Some(v.clone()))))
                     .collect();
-                Ok(col(column_name).in_list(list, false))
+                Ok(Some(col_expr.in_list(values, false)))
             }
             log_query::ContentFilter::Compound(filters, op) => {
                 let exprs = filters
                     .iter()
-                    .map(|filter| self.build_content_filter(column_name, filter))
+                    .filter_map(|filter| {
+                        self.build_content_filter_with_expr(col_expr.clone(), filter)
+                            .transpose()
+                    })
                     .try_collect::<Vec<_>>()?;
+                if exprs.is_empty() {
+                    return Ok(None);
+                }
                 match op {
-                    log_query::BinaryOperator::And => Ok(conjunction(exprs).unwrap()),
+                    log_query::BinaryOperator::And => Ok(conjunction(exprs)),
                     log_query::BinaryOperator::Or => {
                         // Build a disjunction (OR) of expressions
-                        Ok(exprs.into_iter().reduce(|a, b| a.or(b)).unwrap())
+                        Ok(exprs.into_iter().reduce(|a, b| a.or(b)))
                     }
                 }
             }
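Note: a rough sketch of the expressions the comparison and compound arms now produce (column name and values invented; import paths assumed):

use datafusion_common::ScalarValue;
use datafusion_expr::{col, lit, Expr};

fn between_and_compound_sketch() -> (Expr, Expr) {
    // Between { start: "a", end: "z", start_inclusive: true, end_inclusive: false }
    // applied to the resolved column expression `col("message")`:
    let between = col("message")
        .gt_eq(lit(ScalarValue::Utf8(Some("a".to_string()))))
        .and(col("message").lt(lit(ScalarValue::Utf8(Some("z".to_string())))));

    // Compound([Contains("error"), Prefix("WARN")], Or): a disjunction of LIKE patterns.
    let compound = col("message")
        .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
        .or(col("message").like(lit(ScalarValue::Utf8(Some("WARN%".to_string())))));

    (between, compound)
}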
@@ -272,11 +278,11 @@ impl LogQueryPlanner {
                 })?;
                 let args = args
                     .iter()
-                    .map(|expr| self.log_expr_to_column_expr(expr, schema))
+                    .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
                     .try_collect::<Vec<_>>()?;
                 let group_exprs = by
                     .iter()
-                    .map(|expr| self.log_expr_to_column_expr(expr, schema))
+                    .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
                     .try_collect::<Vec<_>>()?;

                 let aggr_expr = aggr_fn.call(args);
@@ -287,7 +293,7 @@ impl LogQueryPlanner {
     ///
     /// A column expression here can be a column identifier, a positional identifier, or a literal.
     /// They don't rely on the context of the query or other columns.
-    fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
+    fn log_expr_to_column_expr(&self, expr: &LogExpr, schema: &ArrowSchema) -> Result<Expr> {
         match expr {
             LogExpr::NamedIdent(name) => Ok(col(name)),
             LogExpr::PositionalIdent(index) => Ok(col(schema.field(*index).name())),
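Note: a small sketch of what the ArrowSchema parameter is used for — positional identifiers resolve to the corresponding field name, while named identifiers become column references directly (the schema below is hand-built for illustration only):

use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use datafusion_expr::{col, Expr};

fn positional_to_column(schema: &ArrowSchema, index: usize) -> Expr {
    // PositionalIdent(index) resolves to the name of the index-th field.
    col(schema.field(index).name())
}

fn example() -> Expr {
    let schema = ArrowSchema::new(vec![
        Field::new("timestamp", DataType::Int64, false),
        Field::new("message", DataType::Utf8, true),
    ]);
    // Resolves to `col("message")`.
    positional_to_column(&schema, 1)
}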
@@ -303,7 +309,7 @@ impl LogQueryPlanner {
     fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result<Expr> {
         let args = args
             .iter()
-            .map(|expr| self.log_expr_to_column_expr(expr, schema))
+            .map(|expr| self.log_expr_to_column_expr(expr, schema.as_arrow()))
             .try_collect::<Vec<_>>()?;
         let func = self.session_state.scalar_functions().get(name).context(
             UnknownScalarFunctionSnafu {
@@ -343,15 +349,13 @@ impl LogQueryPlanner {
                     .aggregate(group_exprs, [aggr_expr.clone()])
                     .context(DataFusionPlanningSnafu)?;
             }
-            LogExpr::Filter { expr, filter } => {
+            LogExpr::Filter { filter } => {
                 let schema = plan_builder.schema();
-                let expr = self.log_expr_to_column_expr(expr, schema)?;
-                let col_name = expr.schema_name().to_string();
-                let filter_expr = self.build_content_filter(&col_name, filter)?;
-                plan_builder = plan_builder
-                    .filter(filter_expr)
-                    .context(DataFusionPlanningSnafu)?;
+                if let Some(filter_expr) = self.build_column_filter(filter, schema.as_arrow())? {
+                    plan_builder = plan_builder
+                        .filter(filter_expr)
+                        .context(DataFusionPlanningSnafu)?;
+                }
             }
             LogExpr::ScalarFunc { name, args, alias } => {
                 let schema = plan_builder.schema();
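Note: one behavioral consequence here is that the old code always appended a Filter node, while the new code skips it when the ColumnFilters produced no predicate. A minimal sketch of that control flow (the helper name is invented; datafusion_common::Result is an assumed import path):

use datafusion_common::Result as DfResult;
use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder};

// Append a Filter node only when a predicate was actually produced.
fn apply_optional_filter(
    builder: LogicalPlanBuilder,
    predicate: Option<Expr>,
) -> DfResult<LogicalPlan> {
    match predicate {
        Some(expr) => builder.filter(expr)?.build(),
        None => builder.build(),
    }
}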
@@ -367,7 +371,7 @@ impl LogQueryPlanner {
                 // nothing to do, return empty vec.
             }
             LogExpr::Alias { expr, alias } => {
-                let expr = self.log_expr_to_column_expr(expr, plan_builder.schema())?;
+                let expr = self.log_expr_to_column_expr(expr, plan_builder.schema().as_arrow())?;
                 let aliased_expr = expr.alias(alias);
                 plan_builder = plan_builder
                     .project([aliased_expr.clone()])
@@ -395,7 +399,7 @@ mod tests {
     use datafusion::execution::SessionStateBuilder;
     use datatypes::prelude::ConcreteDataType;
     use datatypes::schema::{ColumnSchema, SchemaRef};
-    use log_query::{BinaryOperator, ContentFilter, Context, Limit};
+    use log_query::{BinaryOperator, ColumnFilters, ContentFilter, Context, Limit, LogExpr};
     use session::context::QueryContext;
     use table::metadata::{TableInfoBuilder, TableMetaBuilder};
     use table::table_name::TableName;
@@ -482,7 +486,7 @@ mod tests {
                 span: None,
             },
             filters: vec![ColumnFilters {
-                column_name: "message".to_string(),
+                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
             limit: Limit {
@@ -559,21 +563,24 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_build_column_filter() {
+    async fn test_build_content_filter() {
         let table_provider =
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();

         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![
                 ContentFilter::Contains("error".to_string()),
                 ContentFilter::Prefix("WARN".to_string()),
             ],
         };

-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());

         let expr = expr_option.unwrap();
@@ -600,7 +607,7 @@ mod tests {
                 span: None,
             },
             filters: vec![ColumnFilters {
-                column_name: "message".to_string(),
+                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
             limit: Limit {
@@ -635,7 +642,7 @@ mod tests {
                 span: None,
             },
             filters: vec![ColumnFilters {
-                column_name: "message".to_string(),
+                expr: Box::new(LogExpr::NamedIdent("message".to_string())),
                 filters: vec![ContentFilter::Contains("error".to_string())],
             }],
             limit: Limit {
@@ -743,14 +750,15 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_build_column_filter_between() {
+    async fn test_build_content_filter_between() {
         let table_provider =
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();

         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::Between {
                 start: "a".to_string(),
                 end: "z".to_string(),
@@ -759,7 +767,9 @@ mod tests {
             }],
         };

-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());

         let expr = expr_option.unwrap();
@@ -828,16 +838,20 @@ mod tests {
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();

         // Test AND compound
-        let filter = ContentFilter::Compound(
-            vec![
+        let column_filter = ColumnFilters {
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
+            filters: vec![
                 ContentFilter::Contains("error".to_string()),
                 ContentFilter::Prefix("WARN".to_string()),
             ],
-            BinaryOperator::And,
-        );
-        let expr = planner.build_content_filter("message", &filter).unwrap();
+        };
+        let expr = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap()
+            .unwrap();

         let expected_expr = col("message")
             .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
@@ -845,15 +859,21 @@ mod tests {
         assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));

-        // Test OR compound
-        let filter = ContentFilter::Compound(
-            vec![
-                ContentFilter::Contains("error".to_string()),
-                ContentFilter::Prefix("WARN".to_string()),
-            ],
-            BinaryOperator::Or,
-        );
-        let expr = planner.build_content_filter("message", &filter).unwrap();
+        // Test OR compound - use Compound filter for OR logic
+        let column_filter = ColumnFilters {
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
+            filters: vec![ContentFilter::Compound(
+                vec![
+                    ContentFilter::Contains("error".to_string()),
+                    ContentFilter::Prefix("WARN".to_string()),
+                ],
+                BinaryOperator::Or,
+            )],
+        };
+        let expr = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap()
+            .unwrap();

         let expected_expr = col("message")
             .like(lit(ScalarValue::Utf8(Some("%error%".to_string()))))
@@ -862,20 +882,26 @@ mod tests {
         assert_eq!(format!("{:?}", expr), format!("{:?}", expected_expr));

         // Test nested compound
-        let filter = ContentFilter::Compound(
-            vec![
-                ContentFilter::Contains("error".to_string()),
-                ContentFilter::Compound(
-                    vec![
-                        ContentFilter::Prefix("WARN".to_string()),
-                        ContentFilter::Exact("DEBUG".to_string()),
-                    ],
-                    BinaryOperator::Or,
-                ),
-            ],
-            BinaryOperator::And,
-        );
-        let expr = planner.build_content_filter("message", &filter).unwrap();
+        let column_filter = ColumnFilters {
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
+            filters: vec![ContentFilter::Compound(
+                vec![
+                    ContentFilter::Contains("error".to_string()),
+                    ContentFilter::Compound(
+                        vec![
+                            ContentFilter::Prefix("WARN".to_string()),
+                            ContentFilter::Exact("DEBUG".to_string()),
+                        ],
+                        BinaryOperator::Or,
+                    ),
+                ],
+                BinaryOperator::And,
+            )],
+        };
+        let expr = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap()
+            .unwrap();

         let expected_nested = col("message")
             .like(lit(ScalarValue::Utf8(Some("WARN%".to_string()))))
@@ -893,17 +919,20 @@ mod tests {
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();

         // Test GreatThan with inclusive=true
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::GreatThan {
                 value: "error".to_string(),
                 inclusive: true,
             }],
         };

-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());

         let expr = expr_option.unwrap();
@@ -913,14 +942,16 @@ mod tests {
         // Test GreatThan with inclusive=false
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::GreatThan {
                 value: "error".to_string(),
                 inclusive: false,
             }],
         };

-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());

         let expr = expr_option.unwrap();
@@ -935,17 +966,20 @@ mod tests {
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();

         // Test LessThan with inclusive=true
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::LessThan {
                 value: "error".to_string(),
                 inclusive: true,
             }],
         };

-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());

         let expr = expr_option.unwrap();
@@ -955,14 +989,16 @@ mod tests {
         // Test LessThan with inclusive=false
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::LessThan {
                 value: "error".to_string(),
                 inclusive: false,
             }],
         };

-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());

         let expr = expr_option.unwrap();
@@ -977,10 +1013,11 @@ mod tests {
             build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
         let session_state = SessionStateBuilder::new().with_default_features().build();
         let planner = LogQueryPlanner::new(table_provider, session_state);
+        let schema = mock_schema();

         // Test In filter with multiple values
         let column_filter = ColumnFilters {
-            column_name: "message".to_string(),
+            expr: Box::new(LogExpr::NamedIdent("message".to_string())),
             filters: vec![ContentFilter::In(vec![
                 "error".to_string(),
                 "warning".to_string(),
@@ -988,7 +1025,9 @@ mod tests {
             ])],
         };

-        let expr_option = planner.build_column_filter(&column_filter).unwrap();
+        let expr_option = planner
+            .build_column_filter(&column_filter, schema.arrow_schema())
+            .unwrap();
         assert!(expr_option.is_some());

         let expr = expr_option.unwrap();