feat(log-query): implement the first part of log query expr (#5548)

* feat(log-query): implement the first part of log query expr

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-02-19 10:25:41 -08:00
committed by GitHub
parent 53b25c04a2
commit e8788088a8
4 changed files with 236 additions and 16 deletions

View File

@@ -55,7 +55,7 @@ pub struct LogQuery {
}
/// Expression to calculate on log after filtering.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogExpr {
NamedIdent(String),
PositionalIdent(usize),
@@ -289,7 +289,7 @@ pub struct ColumnFilters {
pub filters: Vec<ContentFilter>,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ContentFilter {
// Search-based filters
/// Only match the exact content.
@@ -322,7 +322,7 @@ pub enum ContentFilter {
Compound(Vec<ContentFilter>, BinaryOperator),
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum BinaryOperator {
And,
Or,

View File

@@ -18,6 +18,7 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datafusion::error::DataFusionError;
use log_query::LogExpr;
use snafu::{Location, Snafu};
#[derive(Snafu)]
@@ -57,6 +58,28 @@ pub enum Error {
location: Location,
feature: String,
},
/// The requested aggregate function name could not be resolved.
#[snafu(display("Unknown aggregate function: {name}"))]
UnknownAggregateFunction {
/// Function name as given in the log query.
name: String,
#[snafu(implicit)]
location: Location,
},
/// The requested scalar function name could not be resolved.
#[snafu(display("Unknown scalar function: {name}"))]
UnknownScalarFunction {
/// Function name as given in the log query.
name: String,
#[snafu(implicit)]
location: Location,
},
/// A log expression appeared where a different kind of expression is required.
#[snafu(display("Unexpected log expression: {expr:?}, expected {expected}"))]
UnexpectedLogExpr {
/// The offending expression, rendered with `Debug` in the message.
expr: LogExpr,
/// Human-readable description of what would have been accepted.
expected: String,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -67,6 +90,9 @@ impl ErrorExt for Error {
DataFusionPlanning { .. } => StatusCode::External,
UnknownTable { .. } | TimeIndexNotFound { .. } => StatusCode::Internal,
Unimplemented { .. } => StatusCode::Unsupported,
UnknownAggregateFunction { .. }
| UnknownScalarFunction { .. }
| UnexpectedLogExpr { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -15,17 +15,19 @@
use catalog::table_source::DfTableSourceProvider;
use common_function::utils::escape_like_pattern;
use datafusion::datasource::DefaultTableSource;
use datafusion_common::ScalarValue;
use datafusion::execution::SessionState;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::TableReference;
use datatypes::schema::Schema;
use log_query::{ColumnFilters, LogQuery, TimeFilter};
use log_query::{ColumnFilters, LogExpr, LogQuery, TimeFilter};
use snafu::{OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
use crate::log_query::error::{
CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnimplementedSnafu,
CatalogSnafu, DataFusionPlanningSnafu, Result, TimeIndexNotFoundSnafu, UnexpectedLogExprSnafu,
UnimplementedSnafu, UnknownAggregateFunctionSnafu, UnknownScalarFunctionSnafu,
UnknownTableSnafu,
};
@@ -33,11 +35,15 @@ const DEFAULT_LIMIT: usize = 1000;
/// Planner that turns a [`LogQuery`] into a DataFusion [`LogicalPlan`].
pub struct LogQueryPlanner {
/// Table source provider handed in at construction.
/// NOTE(review): presumably used to resolve the queried table — the usage is
/// outside the visible hunks, confirm.
table_provider: DfTableSourceProvider,
/// Session state whose aggregate and scalar function registries are consulted
/// when a log expression names a function.
session_state: SessionState,
}
impl LogQueryPlanner {
pub fn new(table_provider: DfTableSourceProvider) -> Self {
Self { table_provider }
/// Creates a planner from a table source provider and the session state used
/// to look up aggregate and scalar functions by name.
pub fn new(table_provider: DfTableSourceProvider, session_state: SessionState) -> Self {
Self {
table_provider,
session_state,
}
}
pub async fn query_to_plan(&mut self, query: LogQuery) -> Result<LogicalPlan> {
@@ -100,6 +106,54 @@ impl LogQueryPlanner {
)
.context(DataFusionPlanningSnafu)?;
// Apply log expressions in order; each one stacks another node on top of the
// plan built so far.
for expr in &query.exprs {
    match expr {
        LogExpr::AggrFunc {
            name,
            args,
            by,
            // The range is not consumed by this planning step.
            range: _range,
        } => {
            let schema = plan_builder.schema();
            // `build_aggr_func` yields `(aggregate call, group-by expressions)`.
            let (aggr_expr, group_exprs) = self.build_aggr_func(schema, name, args, by)?;
            // `LogicalPlanBuilder::aggregate` takes the grouping keys first and
            // the aggregate expressions second. Passing them the other way
            // round groups by the aggregate call and "aggregates" the keys.
            plan_builder = plan_builder
                .aggregate(group_exprs, [aggr_expr])
                .context(DataFusionPlanningSnafu)?;
        }
        LogExpr::Filter { expr, filter } => {
            let schema = plan_builder.schema();
            let expr = self.log_expr_to_df_expr(expr, schema)?;
            // The column filter builder works on a column name, so the
            // expression is addressed by its schema name.
            let col_name = expr.schema_name().to_string();
            let filter = self.build_column_filter(&ColumnFilters {
                column_name: col_name,
                filters: vec![filter.clone()],
            })?;
            // `None` means the filter produced no predicate; leave the plan as is.
            if let Some(filter) = filter {
                plan_builder = plan_builder
                    .filter(filter)
                    .context(DataFusionPlanningSnafu)?;
            }
        }
        LogExpr::ScalarFunc { name, args } => {
            let schema = plan_builder.schema();
            let expr = self.build_scalar_func(schema, name, args)?;
            // The projection keeps only the function result.
            plan_builder = plan_builder
                .project([expr])
                .context(DataFusionPlanningSnafu)?;
        }
        LogExpr::NamedIdent(_) | LogExpr::PositionalIdent(_) => {
            // nothing to do
        }
        _ => {
            UnimplementedSnafu {
                feature: "log expression",
            }
            .fail()?;
        }
    }
}
// Build the final plan
let plan = plan_builder.build().context(DataFusionPlanningSnafu)?;
@@ -199,6 +253,61 @@ impl LogQueryPlanner {
Ok(conjunction(exprs))
}
/// Builds the DataFusion expressions for an aggregate log expression.
///
/// `fn_name` is looked up among the aggregate functions registered in the
/// session state; `args` become the arguments of the call and `by` becomes
/// the grouping keys. Both lists are converted with `log_expr_to_df_expr`
/// against `schema`.
///
/// # Returns
/// `(aggregate call, group-by expressions)` — in that order.
///
/// # Errors
/// `UnknownAggregateFunction` when `fn_name` is not registered, or whatever
/// `log_expr_to_df_expr` reports for an unsupported argument or key.
fn build_aggr_func(
    &self,
    schema: &DFSchema,
    fn_name: &str,
    args: &[LogExpr],
    by: &[LogExpr],
) -> Result<(Expr, Vec<Expr>)> {
    // Resolve the function first so an unknown name is reported before any
    // argument problem.
    let Some(udaf) = self.session_state.aggregate_functions().get(fn_name) else {
        return UnknownAggregateFunctionSnafu { name: fn_name }.fail();
    };

    // Shared conversion for both expression lists; stops at the first error.
    let convert = |exprs: &[LogExpr]| -> Result<Vec<Expr>> {
        exprs
            .iter()
            .map(|item| self.log_expr_to_df_expr(item, schema))
            .collect()
    };

    let call_args = convert(args)?;
    let group_keys = convert(by)?;

    Ok((udaf.call(call_args), group_keys))
}
/// Converts a leaf [`LogExpr`] into a DataFusion [`Expr`].
///
/// * `NamedIdent` becomes a column reference with that name.
/// * `PositionalIdent` becomes a column reference to the field at that index
///   of `schema`.
/// * `Literal` becomes a UTF-8 string literal.
///
/// # Errors
/// Returns `UnexpectedLogExpr` for every other variant, and for a positional
/// index that is not within `schema` (the index comes from the query, so it
/// must not be allowed to panic the planner).
fn log_expr_to_df_expr(&self, expr: &LogExpr, schema: &DFSchema) -> Result<Expr> {
    match expr {
        LogExpr::NamedIdent(name) => Ok(col(name)),
        LogExpr::PositionalIdent(index) => {
            // Checked lookup: `DFSchema::field` indexes directly and would
            // panic on an out-of-range position.
            let fields = schema.fields();
            let field = fields
                .get(*index)
                .with_context(|| UnexpectedLogExprSnafu {
                    expr: expr.clone(),
                    expected: format!(
                        "positional identifier within the {} available columns",
                        fields.len()
                    ),
                })?;
            Ok(col(field.name()))
        }
        LogExpr::Literal(literal) => Ok(lit(ScalarValue::Utf8(Some(literal.clone())))),
        _ => UnexpectedLogExprSnafu {
            expr: expr.clone(),
            expected: "named identifier, positional identifier, or literal",
        }
        .fail(),
    }
}
/// Builds a call to the scalar function `name` with `args` as its arguments.
///
/// Arguments are converted with `log_expr_to_df_expr` against `schema`, and
/// the function is looked up among the scalar functions registered in the
/// session state.
///
/// # Errors
/// Whatever `log_expr_to_df_expr` reports for an unsupported argument, or
/// `UnknownScalarFunction` when `name` is not registered. Arguments are
/// converted before the lookup, so an argument error takes precedence.
fn build_scalar_func(&self, schema: &DFSchema, name: &str, args: &[LogExpr]) -> Result<Expr> {
    let mut call_args = Vec::with_capacity(args.len());
    for arg in args {
        call_args.push(self.log_expr_to_df_expr(arg, schema)?);
    }

    let Some(udf) = self.session_state.scalar_functions().get(name) else {
        return UnknownScalarFunctionSnafu { name }.fail();
    };

    Ok(udf.call(call_args))
}
}
#[cfg(test)]
@@ -209,6 +318,7 @@ mod tests {
use catalog::RegisterTableRequest;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_query::test_util::DummyDecoder;
use datafusion::execution::SessionStateBuilder;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaRef};
use log_query::{ContentFilter, Context, Limit};
@@ -287,7 +397,8 @@ mod tests {
async fn test_query_to_plan() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let mut planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let mut planner = LogQueryPlanner::new(table_provider, session_state);
let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
@@ -321,7 +432,8 @@ mod tests {
async fn test_build_time_filter() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let time_filter = TimeFilter {
start: Some("2021-01-01T00:00:00Z".to_string()),
@@ -348,7 +460,8 @@ mod tests {
async fn test_build_time_filter_without_end() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let time_filter = TimeFilter {
start: Some("2021-01-01T00:00:00Z".to_string()),
@@ -375,7 +488,8 @@ mod tests {
async fn test_build_column_filter() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let column_filter = ColumnFilters {
column_name: "message".to_string(),
@@ -401,7 +515,8 @@ mod tests {
async fn test_query_to_plan_with_only_skip() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let mut planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let mut planner = LogQueryPlanner::new(table_provider, session_state);
let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
@@ -435,7 +550,8 @@ mod tests {
async fn test_query_to_plan_without_limit() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let mut planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let mut planner = LogQueryPlanner::new(table_provider, session_state);
let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
@@ -473,11 +589,89 @@ mod tests {
assert_eq!(escape_like_pattern("te\\st"), "te\\\\st");
}
/// Plans `count(message)` grouped by `host` and checks the resulting plan.
#[tokio::test]
async fn test_query_to_plan_with_aggr_func() {
    let table_provider =
        build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
    let session_state = SessionStateBuilder::new().with_default_features().build();
    let mut planner = LogQueryPlanner::new(table_provider, session_state);
    let log_query = LogQuery {
        table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
        time_filter: TimeFilter {
            start: Some("2021-01-01T00:00:00Z".to_string()),
            end: Some("2021-01-02T00:00:00Z".to_string()),
            span: None,
        },
        filters: vec![],
        limit: Limit {
            skip: None,
            fetch: Some(100),
        },
        context: Context::None,
        columns: vec![],
        exprs: vec![LogExpr::AggrFunc {
            name: "count".to_string(),
            args: vec![LogExpr::NamedIdent("message".to_string())],
            by: vec![LogExpr::NamedIdent("host".to_string())],
            range: None,
        }],
    };
    let plan = planner.query_to_plan(log_query).await.unwrap();
    // The query groups by `host` and counts `message`, so `host` must appear
    // under `groupBy` and the `count` call under `aggr`; the output schema
    // lists the grouping column before the aggregate column.
    let expected = "Aggregate: groupBy=[[greptime.public.test_table.host]], aggr=[[count(greptime.public.test_table.message)]] [host:Utf8;N, count(greptime.public.test_table.message):Int64]\
    \n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
    \n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
    \n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
    assert_eq!(plan.display_indent_schema().to_string(), expected);
}
/// Plans a `date_trunc(timestamp, "day")` scalar expression and checks that it
/// is emitted as a projection on top of the limited, time-filtered scan.
#[tokio::test]
async fn test_query_to_plan_with_scalar_func() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
// Default features register the built-in functions, including `date_trunc`.
let session_state = SessionStateBuilder::new().with_default_features().build();
let mut planner = LogQueryPlanner::new(table_provider, session_state);
let log_query = LogQuery {
table: TableName::new(DEFAULT_CATALOG_NAME, "public", "test_table"),
time_filter: TimeFilter {
start: Some("2021-01-01T00:00:00Z".to_string()),
end: Some("2021-01-02T00:00:00Z".to_string()),
span: None,
},
filters: vec![],
limit: Limit {
skip: None,
fetch: Some(100),
},
context: Context::None,
columns: vec![],
// The literal argument is planned as a UTF-8 string literal.
exprs: vec![LogExpr::ScalarFunc {
name: "date_trunc".to_string(),
args: vec![
LogExpr::NamedIdent("timestamp".to_string()),
LogExpr::Literal("day".to_string()),
],
}],
};
let plan = planner.query_to_plan(log_query).await.unwrap();
// Snapshot of the indented plan with schemas; the projection is the outermost node.
let expected = "Projection: date_trunc(greptime.public.test_table.timestamp, Utf8(\"day\")) [date_trunc(greptime.public.test_table.timestamp,Utf8(\"day\")):Timestamp(Nanosecond, None);N]\
\n Limit: skip=0, fetch=100 [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n Filter: greptime.public.test_table.timestamp >= Utf8(\"2021-01-01T00:00:00Z\") AND greptime.public.test_table.timestamp <= Utf8(\"2021-01-02T00:00:00Z\") [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]\
\n TableScan: greptime.public.test_table [message:Utf8, timestamp:Timestamp(Millisecond, None), host:Utf8;N]";
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
#[tokio::test]
async fn test_build_column_filter_between() {
let table_provider =
build_test_table_provider(&[("public".to_string(), "test_table".to_string())]).await;
let planner = LogQueryPlanner::new(table_provider);
let session_state = SessionStateBuilder::new().with_default_features().build();
let planner = LogQueryPlanner::new(table_provider, session_state);
let column_filter = ColumnFilters {
column_name: "message".to_string(),

View File

@@ -220,7 +220,7 @@ impl LogicalPlanner for DfLogicalPlanner {
.enable_ident_normalization,
);
let mut planner = LogQueryPlanner::new(table_provider);
let mut planner = LogQueryPlanner::new(table_provider, self.session_state.clone());
planner
.query_to_plan(query)
.await