feat: flow full aggr only trigger on new data (#6880)

* fix: flow full aggr only trigger on new data

Signed-off-by: discord9 <discord9@163.com>

* chore: better debug msg

Signed-off-by: discord9 <discord9@163.com>

* refactor: per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-09-03 16:45:24 +08:00
committed by GitHub
parent ee4b8708d7
commit a65db7121e
4 changed files with 102 additions and 38 deletions

View File

@@ -156,9 +156,11 @@ impl BatchingEngine {
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut all_dirty_windows = HashSet::new();
let mut is_dirty = false;
for src_table_name in src_table_names {
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
is_dirty = true;
continue;
};
for timestamp in timestamps {
@@ -173,6 +175,9 @@ impl BatchingEngine {
}
}
let mut state = task.state.write().unwrap();
if is_dirty {
state.dirty_time_windows.set_dirty();
}
let flow_id_label = task.config.flow_id.to_string();
for timestamp in all_dirty_windows {
state.dirty_time_windows.add_window(timestamp, None);
@@ -274,9 +279,12 @@ impl BatchingEngine {
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
let src_table_names = &task.config.source_table_names;
let mut is_dirty = false;
for src_table_name in src_table_names {
if let Some(entry) = group_by_table_name.get(src_table_name) {
let Some(expr) = &task.config.time_window_expr else {
is_dirty = true;
continue;
};
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
@@ -286,6 +294,10 @@ impl BatchingEngine {
.add_lower_bounds(involved_time_windows.into_iter());
}
}
if is_dirty {
task.state.write().unwrap().dirty_time_windows.set_dirty();
}
Ok(())
});
handles.push(handle);

View File

@@ -203,11 +203,21 @@ impl DirtyTimeWindows {
self.windows.clear();
}
/// Set windows to be dirty, only useful for full aggr without time window
/// to mark some new data is inserted
pub fn set_dirty(&mut self) {
self.windows.insert(Timestamp::new_second(0), None);
}
/// Number of dirty windows.
pub fn len(&self) -> usize {
self.windows.len()
}
pub fn is_empty(&self) -> bool {
self.windows.is_empty()
}
/// Get the effective count of time windows, which is the number of time windows that can be
/// used for query, compute from total time window range divided by `window_size`.
pub fn effective_count(&self, window_size: &Duration) -> usize {

View File

@@ -48,8 +48,8 @@ use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{FilterExprInfo, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
FindGroupByFinalName,
gen_plan_with_matching_schema, get_table_info_df_schema, sql_to_df_plan, AddFilterRewriter,
ColumnMatcherRewriter, FindGroupByFinalName,
};
use crate::batching_mode::BatchingModeOptions;
use crate::df_optimizer::apply_df_optimizer;
@@ -618,42 +618,63 @@ impl BatchingTask {
.map(|expr| expr.eval(low_bound))
.transpose()?;
let (Some((Some(l), Some(u))), QueryType::Sql) =
(expire_time_window_bound, &self.config.query_type)
else {
// either no time window or not a sql query, then just use the original query
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
self.state.write().unwrap().dirty_time_windows.clean();
let (expire_lower_bound, expire_upper_bound) =
match (expire_time_window_bound, &self.config.query_type) {
(Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
(None, QueryType::Sql) => {
// if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!(
"Flow id = {:?}, no time window, using the same query",
self.config.flow_id
);
// clean dirty time window too, this could be from create flow's check_execute
let is_dirty = !self.state.read().unwrap().dirty_time_windows.is_empty();
self.state.write().unwrap().dirty_time_windows.clean();
// TODO(discord9): not add auto column for tql query?
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
if !is_dirty {
// no dirty data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
}
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false)
.await?;
let plan = gen_plan_with_matching_schema(
&self.config.query,
query_ctx,
engine,
sink_table_schema.clone(),
)
.await?;
let plan = plan
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
return Ok(Some(PlanInfo { plan, filter: None }));
}
_ => {
// clean for tql have no use for time window
self.state.write().unwrap().dirty_time_windows.clean();
// since no time window lower/upper bound is found, just return the original query(with auto columns)
return Ok(Some(PlanInfo { plan, filter: None }));
};
let plan = gen_plan_with_matching_schema(
&self.config.query,
query_ctx,
engine,
sink_table_schema.clone(),
)
.await?;
return Ok(Some(PlanInfo { plan, filter: None }));
}
};
debug!(
"Flow id = {:?}, found time window: precise_lower_bound={:?}, precise_upper_bound={:?} with dirty time windows: {:?}",
self.config.flow_id, l, u, self.state.read().unwrap().dirty_time_windows
self.config.flow_id, expire_lower_bound, expire_upper_bound, self.state.read().unwrap().dirty_time_windows
);
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let window_size = expire_upper_bound
.sub(&expire_lower_bound)
.with_context(|| UnexpectedSnafu {
reason: format!(
"Can't get window size from {expire_upper_bound:?} - {expire_lower_bound:?}"
),
})?;
let col_name = self
.config
.time_window_expr
@@ -673,7 +694,7 @@ impl BatchingTask {
.dirty_time_windows
.gen_filter_exprs(
&col_name,
Some(l),
Some(expire_lower_bound),
window_size,
max_window_cnt
.unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
@@ -701,7 +722,7 @@ impl BatchingTask {
};
let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema.clone());
let plan =
sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.config.query, false).await?;

View File

@@ -24,7 +24,7 @@ use datafusion::error::Result as DfResult;
use datafusion::logical_expr::Expr;
use datafusion::sql::unparser::Unparser;
use datafusion_common::tree_node::{
Transformed, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
Transformed, TreeNode as _, TreeNodeRecursion, TreeNodeRewriter, TreeNodeVisitor,
};
use datafusion_common::{DFSchema, DataFusionError, ScalarValue};
use datafusion_expr::{Distinct, LogicalPlan, Projection};
@@ -135,6 +135,27 @@ pub async fn sql_to_df_plan(
Ok(plan)
}
/// Generate a plan that matches the schema of the sink table
/// from given sql by alias and adding auto columns
pub(crate) async fn gen_plan_with_matching_schema(
sql: &str,
query_ctx: QueryContextRef,
engine: QueryEngineRef,
sink_table_schema: SchemaRef,
) -> Result<LogicalPlan, Error> {
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), sql, false).await?;
let mut add_auto_column = ColumnMatcherRewriter::new(sink_table_schema);
let plan = plan
.clone()
.rewrite(&mut add_auto_column)
.with_context(|_| DatafusionSnafu {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
Ok(plan)
}
pub fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
/// A dialect that forces identifiers to be quoted when have uppercase
struct ForceQuoteIdentifiers;
@@ -239,19 +260,19 @@ impl TreeNodeVisitor<'_> for FindGroupByFinalName {
}
}
/// Add to the final select columns like `update_at`
/// Optionally add to the final select columns like `update_at` if the sink table has such column
/// (which doesn't necessary need to have exact name just need to be a extra timestamp column)
/// and `__ts_placeholder`(this column need to have exact this name and be a timestamp)
/// with values like `now()` and `0`
///
/// it also give existing columns alias to column in sink table if needed
#[derive(Debug)]
pub struct AddAutoColumnRewriter {
pub struct ColumnMatcherRewriter {
pub schema: SchemaRef,
pub is_rewritten: bool,
}
impl AddAutoColumnRewriter {
impl ColumnMatcherRewriter {
pub fn new(schema: SchemaRef) -> Self {
Self {
schema,
@@ -348,7 +369,7 @@ impl AddAutoColumnRewriter {
}
}
impl TreeNodeRewriter for AddAutoColumnRewriter {
impl TreeNodeRewriter for ColumnMatcherRewriter {
type Node = LogicalPlan;
fn f_down(&mut self, mut node: Self::Node) -> DfResult<Transformed<Self::Node>> {
if self.is_rewritten {
@@ -696,7 +717,7 @@ mod test {
let ctx = QueryContext::arc();
for (before, after, column_schemas) in testcases {
let schema = Arc::new(Schema::new(column_schemas));
let mut add_auto_column_rewriter = AddAutoColumnRewriter::new(schema);
let mut add_auto_column_rewriter = ColumnMatcherRewriter::new(schema);
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), before, false)
.await