mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-01 12:50:40 +00:00
chore: add and/or for log query (#6681)
* chore: add and/or for log query * chore: remove impl From<Vec<ColumnFilters>> for Filters
This commit is contained in:
@@ -43,7 +43,7 @@ pub struct LogQuery {
|
||||
/// Conjunction of filters to apply for the raw logs.
|
||||
///
|
||||
/// Filters here can apply to any LogExpr.
|
||||
pub filters: Vec<ColumnFilters>,
|
||||
pub filters: Filters,
|
||||
/// Adjacent lines to return. Applies to all filters above.
|
||||
///
|
||||
/// TODO(ruihang): Do we need per-filter context?
|
||||
@@ -54,6 +54,44 @@ pub struct LogQuery {
|
||||
pub exprs: Vec<LogExpr>,
|
||||
}
|
||||
|
||||
/// Nested filter structure that supports and/or relationships
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum Filters {
|
||||
/// Single filter condition
|
||||
Single(ColumnFilters),
|
||||
/// Multiple filters with AND relationship
|
||||
And(Vec<Filters>),
|
||||
/// Multiple filters with OR relationship
|
||||
Or(Vec<Filters>),
|
||||
Not(Box<Filters>),
|
||||
}
|
||||
|
||||
impl Default for Filters {
|
||||
fn default() -> Self {
|
||||
Filters::And(vec![])
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ColumnFilters> for Filters {
    /// Wraps a single [`ColumnFilters`] as a [`Filters::Single`] leaf.
    fn from(filter: ColumnFilters) -> Self {
        Self::Single(filter)
    }
}
|
||||
|
||||
impl Filters {
|
||||
pub fn and<T: Into<Filters>>(other: Vec<T>) -> Filters {
|
||||
Filters::And(other.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
pub fn or<T: Into<Filters>>(other: Vec<T>) -> Filters {
|
||||
Filters::Or(other.into_iter().map(Into::into).collect())
|
||||
}
|
||||
|
||||
pub fn single(filter: ColumnFilters) -> Filters {
|
||||
Filters::Single(filter)
|
||||
}
|
||||
}
|
||||
|
||||
/// Expression to calculate on log after filtering.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum LogExpr {
|
||||
@@ -99,7 +137,7 @@ impl Default for LogQuery {
|
||||
Self {
|
||||
table: TableName::new("", "", ""),
|
||||
time_filter: Default::default(),
|
||||
filters: vec![],
|
||||
filters: Filters::And(vec![]),
|
||||
limit: Limit::default(),
|
||||
context: Default::default(),
|
||||
columns: vec![],
|
||||
|
||||
@@ -18,8 +18,8 @@ use common_function::utils::escape_like_pattern;
|
||||
use datafusion::datasource::DefaultTableSource;
|
||||
use datafusion::execution::SessionState;
|
||||
use datafusion_common::{DFSchema, ScalarValue};
|
||||
use datafusion_expr::utils::conjunction;
|
||||
use datafusion_expr::{col, lit, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
|
||||
use datafusion_expr::utils::{conjunction, disjunction};
|
||||
use datafusion_expr::{col, lit, not, BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator};
|
||||
use datafusion_sql::TableReference;
|
||||
use datatypes::schema::Schema;
|
||||
use log_query::{BinaryOperator, LogExpr, LogQuery, TimeFilter};
|
||||
@@ -77,11 +77,8 @@ impl LogQueryPlanner {
|
||||
// Time filter
|
||||
filters.push(self.build_time_filter(&query.time_filter, &schema)?);
|
||||
|
||||
// Column filters
|
||||
for filter in &query.filters {
|
||||
if let Some(expr) = self.build_column_filter(filter, df_schema.as_arrow())? {
|
||||
filters.push(expr);
|
||||
}
|
||||
if let Some(filters_expr) = self.build_filters(&query.filters, df_schema.as_arrow())? {
|
||||
filters.push(filters_expr);
|
||||
}
|
||||
|
||||
// Apply filters
|
||||
@@ -139,6 +136,48 @@ impl LogQueryPlanner {
|
||||
|
||||
Ok(expr)
|
||||
}
|
||||
//disjunction
|
||||
fn build_filters(
|
||||
&self,
|
||||
filters: &log_query::Filters,
|
||||
schema: &ArrowSchema,
|
||||
) -> Result<Option<Expr>> {
|
||||
match filters {
|
||||
log_query::Filters::And(filters) => {
|
||||
let exprs = filters
|
||||
.iter()
|
||||
.filter_map(|filter| self.build_filters(filter, schema).transpose())
|
||||
.try_collect::<Vec<_>>()?;
|
||||
if exprs.is_empty() {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(conjunction(exprs))
|
||||
}
|
||||
}
|
||||
log_query::Filters::Or(filters) => {
|
||||
let exprs = filters
|
||||
.iter()
|
||||
.filter_map(|filter| self.build_filters(filter, schema).transpose())
|
||||
.try_collect::<Vec<_>>()?;
|
||||
if exprs.is_empty() {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(disjunction(exprs))
|
||||
}
|
||||
}
|
||||
log_query::Filters::Not(filter) => {
|
||||
if let Some(expr) = self.build_filters(filter, schema)? {
|
||||
Ok(Some(not(expr)))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
log_query::Filters::Single(column_filters) => {
|
||||
// Build a single column filter
|
||||
self.build_column_filter(column_filters, schema)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds filter expression from ColumnFilters (new structure with expr + filters)
|
||||
fn build_column_filter(
|
||||
@@ -455,7 +494,9 @@ mod tests {
|
||||
use datafusion::execution::SessionStateBuilder;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, SchemaRef};
|
||||
use log_query::{ColumnFilters, ConjunctionOperator, ContentFilter, Context, Limit, LogExpr};
|
||||
use log_query::{
|
||||
ColumnFilters, ConjunctionOperator, ContentFilter, Context, Filters, Limit, LogExpr,
|
||||
};
|
||||
use session::context::QueryContext;
|
||||
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
|
||||
use table::table_name::TableName;
|
||||
@@ -546,10 +587,10 @@ mod tests {
|
||||
end: Some("2021-01-02T00:00:00Z".to_string()),
|
||||
span: None,
|
||||
},
|
||||
filters: vec![ColumnFilters {
|
||||
filters: Filters::Single(ColumnFilters {
|
||||
expr: Box::new(LogExpr::NamedIdent("message".to_string())),
|
||||
filters: vec![ContentFilter::Contains("error".to_string())],
|
||||
}],
|
||||
}),
|
||||
limit: Limit {
|
||||
skip: None,
|
||||
fetch: Some(100),
|
||||
@@ -667,10 +708,10 @@ mod tests {
|
||||
end: Some("2021-01-02T00:00:00Z".to_string()),
|
||||
span: None,
|
||||
},
|
||||
filters: vec![ColumnFilters {
|
||||
filters: Filters::Single(ColumnFilters {
|
||||
expr: Box::new(LogExpr::NamedIdent("message".to_string())),
|
||||
filters: vec![ContentFilter::Contains("error".to_string())],
|
||||
}],
|
||||
}),
|
||||
limit: Limit {
|
||||
skip: Some(10),
|
||||
fetch: None,
|
||||
@@ -702,10 +743,10 @@ mod tests {
|
||||
end: Some("2021-01-02T00:00:00Z".to_string()),
|
||||
span: None,
|
||||
},
|
||||
filters: vec![ColumnFilters {
|
||||
filters: Filters::Single(ColumnFilters {
|
||||
expr: Box::new(LogExpr::NamedIdent("message".to_string())),
|
||||
filters: vec![ContentFilter::Contains("error".to_string())],
|
||||
}],
|
||||
}),
|
||||
limit: Limit {
|
||||
skip: None,
|
||||
fetch: None,
|
||||
@@ -745,7 +786,7 @@ mod tests {
|
||||
end: Some("2021-01-02T00:00:00Z".to_string()),
|
||||
span: None,
|
||||
},
|
||||
filters: vec![],
|
||||
filters: Default::default(),
|
||||
limit: Limit {
|
||||
skip: None,
|
||||
fetch: Some(100),
|
||||
@@ -784,7 +825,7 @@ mod tests {
|
||||
end: Some("2021-01-02T00:00:00Z".to_string()),
|
||||
span: None,
|
||||
},
|
||||
filters: vec![],
|
||||
filters: Default::default(),
|
||||
limit: Limit {
|
||||
skip: None,
|
||||
fetch: Some(100),
|
||||
@@ -855,7 +896,7 @@ mod tests {
|
||||
end: Some("2021-01-02T00:00:00Z".to_string()),
|
||||
span: None,
|
||||
},
|
||||
filters: vec![],
|
||||
filters: Default::default(),
|
||||
limit: Limit {
|
||||
skip: Some(0),
|
||||
fetch: None,
|
||||
|
||||
@@ -5009,7 +5009,7 @@ pub async fn test_log_query(store_type: StorageType) {
|
||||
fetch: Some(1),
|
||||
},
|
||||
columns: vec!["ts".to_string(), "message".to_string()],
|
||||
filters: vec![],
|
||||
filters: Default::default(),
|
||||
context: Context::None,
|
||||
exprs: vec![],
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user