feat: provide plan info when flow exec (#6783)

* feat: provide  plan info when flow exec

Signed-off-by: discord9 <discord9@163.com>

* backoff?

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-22 11:47:26 +08:00
committed by GitHub
parent eaceae4c91
commit 2b43ff30b6
2 changed files with 113 additions and 37 deletions

View File

@@ -192,6 +192,12 @@ impl DirtyTimeWindows {
self.windows.insert(start, end);
}
/// Add every `(start, end)` range in `time_ranges` to the dirty time windows.
///
/// Each range is inserted with its start as the key and `Some(end)` as the value.
pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) {
    time_ranges
        .into_iter()
        .for_each(|(window_start, window_end)| {
            self.windows.insert(window_start, Some(window_end));
        });
}
/// Clean all dirty time windows, useful when the time window expr can't be found
pub fn clean(&mut self) {
self.windows.clear();
@@ -242,7 +248,7 @@ impl DirtyTimeWindows {
window_cnt: usize,
flow_id: FlowId,
task_ctx: Option<&BatchingTask>,
) -> Result<Option<datafusion_expr::Expr>, Error> {
) -> Result<Option<FilterExprInfo>, Error> {
ensure!(
window_size.num_seconds() > 0,
UnexpectedSnafu {
@@ -372,7 +378,15 @@ impl DirtyTimeWindows {
.with_label_values(&[flow_id.to_string().as_str()])
.observe(stalled_time_range.num_seconds() as f64);
let std_window_size = window_size.to_std().map_err(|e| {
InternalSnafu {
reason: e.to_string(),
}
.build()
})?;
let mut expr_lst = vec![];
let mut time_ranges = vec![];
for (start, end) in to_be_query.into_iter() {
// align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx {
@@ -386,26 +400,31 @@ impl DirtyTimeWindows {
} else {
(start, end)
};
let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?);
time_ranges.push((start, end));
debug!(
"Time window start: {:?}, end: {:?}",
start.to_iso8601_string(),
end.map(|t| t.to_iso8601_string())
end.to_iso8601_string()
);
use datafusion_expr::{col, lit};
let lower = to_df_literal(start)?;
let upper = end.map(to_df_literal).transpose()?;
let expr = if let Some(upper) = upper {
col(col_name)
.gt_eq(lit(lower))
.and(col(col_name).lt(lit(upper)))
} else {
col(col_name).gt_eq(lit(lower))
};
let upper = to_df_literal(end)?;
let expr = col(col_name)
.gt_eq(lit(lower))
.and(col(col_name).lt(lit(upper)));
expr_lst.push(expr);
}
let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
Ok(expr)
let ret = expr.map(|expr| FilterExprInfo {
expr,
col_name: col_name.to_string(),
time_ranges,
window_size,
});
Ok(ret)
}
fn align_time_window(
@@ -519,6 +538,25 @@ enum ExecState {
Executing,
}
/// Filter Expression's information
///
/// Bundles a generated time-window filter expression with the data it was
/// built from.
#[derive(Debug, Clone)]
pub struct FilterExprInfo {
/// The filter expression: one `col >= start AND col < end` predicate per
/// time range, combined with `OR`.
pub expr: datafusion_expr::Expr,
/// Name of the column that `expr` filters on.
pub col_name: String,
/// The `(start, end)` pair of every time window covered by `expr`.
pub time_ranges: Vec<(Timestamp, Timestamp)>,
/// The window size that was supplied when the filter was generated.
pub window_size: chrono::Duration,
}
impl FilterExprInfo {
    /// Total length of all time ranges, i.e. the sum of `end - start` over
    /// every entry of `time_ranges`.
    ///
    /// A range whose length cannot be computed contributes zero to the sum.
    pub fn total_window_length(&self) -> chrono::Duration {
        let mut total = chrono::Duration::zero();
        for (start, end) in &self.time_ranges {
            let length = end.sub(start).unwrap_or(chrono::Duration::zero());
            total = total + length;
        }
        total
    }
}
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
@@ -702,7 +740,8 @@ mod test {
0,
None,
)
.unwrap();
.unwrap()
.map(|e| e.expr);
let unparser = datafusion::sql::unparser::Unparser::default();
let to_sql = filter_expr

View File

@@ -46,7 +46,7 @@ use tokio::time::Instant;
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::TaskState;
use crate::batching_mode::state::{FilterExprInfo, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
@@ -130,6 +130,11 @@ pub struct TaskArgs<'a> {
pub batch_opts: Arc<BatchingModeOptions>,
}
/// A logical plan together with the filter information (if any) that it was
/// built with.
pub struct PlanInfo {
/// The logical plan itself.
pub plan: LogicalPlan,
/// Information about the time-window filter used for `plan`; `None` when
/// the plan was produced without such a filter.
pub filter: Option<FilterExprInfo>,
}
impl BatchingTask {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
@@ -232,8 +237,9 @@ impl BatchingTask {
max_window_cnt: Option<usize>,
) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? {
debug!("Generate new query: {}", new_query);
self.execute_logical_plan(frontend_client, &new_query).await
debug!("Generate new query: {}", new_query.plan);
self.execute_logical_plan(frontend_client, &new_query.plan)
.await
} else {
debug!("Generate no query");
Ok(None)
@@ -244,7 +250,7 @@ impl BatchingTask {
&self,
engine: &QueryEngineRef,
max_window_cnt: Option<usize>,
) -> Result<Option<LogicalPlan>, Error> {
) -> Result<Option<PlanInfo>, Error> {
let (table, df_schema) = get_table_info_df_schema(
self.config.catalog_manager.clone(),
self.config.sink_table_name.clone(),
@@ -259,7 +265,7 @@ impl BatchingTask {
)
.await?;
let insert_into = if let Some((new_query, _column_cnt)) = new_query {
let insert_into_info = if let Some(new_query) = new_query {
// first check that all columns in the input query exist in the sink table,
// since `insert into` refers to the column names of the record batch generated by the given query
let table_columns = df_schema
@@ -267,7 +273,7 @@ impl BatchingTask {
.into_iter()
.map(|c| c.name)
.collect::<BTreeSet<_>>();
for column in new_query.schema().columns() {
for column in new_query.plan.schema().columns() {
ensure!(
table_columns.contains(column.name()),
InvalidQuerySnafu {
@@ -283,7 +289,7 @@ impl BatchingTask {
let table_source = Arc::new(DefaultTableSource::new(table_provider));
// update_at& time index placeholder (if exists) should have default value
LogicalPlan::Dml(DmlStatement::new(
let plan = LogicalPlan::Dml(DmlStatement::new(
datafusion_common::TableReference::Full {
catalog: self.config.sink_table_name[0].clone().into(),
schema: self.config.sink_table_name[1].clone().into(),
@@ -291,15 +297,26 @@ impl BatchingTask {
},
table_source,
WriteOp::Insert(datafusion_expr::dml::InsertOp::Append),
Arc::new(new_query),
))
Arc::new(new_query.plan),
));
PlanInfo {
plan,
filter: new_query.filter,
}
} else {
return Ok(None);
};
let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
context: "Failed to recompute schema",
})?;
Ok(Some(insert_into))
let insert_into = insert_into_info
.plan
.recompute_schema()
.context(DatafusionSnafu {
context: "Failed to recompute schema",
})?;
Ok(Some(PlanInfo {
plan: insert_into,
filter: insert_into_info.filter,
}))
}
pub async fn create_table(
@@ -434,6 +451,7 @@ impl BatchingTask {
frontend_client: Arc<FrontendClient>,
) {
let flow_id_str = self.config.flow_id.to_string();
let mut max_window_cnt = None;
loop {
// first check if shutdown signal is received
// if so, break the loop
@@ -457,7 +475,7 @@ impl BatchingTask {
let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
let new_query = match self.gen_insert_plan(&engine, None).await {
let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await {
Ok(new_query) => new_query,
Err(err) => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
@@ -468,7 +486,8 @@ impl BatchingTask {
};
let res = if let Some(new_query) = &new_query {
self.execute_logical_plan(&frontend_client, new_query).await
self.execute_logical_plan(&frontend_client, &new_query.plan)
.await
} else {
Ok(None)
};
@@ -476,6 +495,10 @@ impl BatchingTask {
match res {
// normal execute, sleep for some time before doing next query
Ok(Some(_)) => {
// can increase max_window_cnt to query more windows next time
max_window_cnt = max_window_cnt.map(|cnt| {
(cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query)
});
let sleep_until = {
let state = self.state.write().unwrap();
@@ -511,7 +534,16 @@ impl BatchingTask {
.inc();
match new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan);
// Re-add dirty windows back since query failed
self.state.write().unwrap().dirty_time_windows.add_windows(
query.filter.map(|f| f.time_ranges).unwrap_or_default(),
);
// TODO(discord9): add some backoff here? e.g. halve the query time window
// backoff here means using a smaller `max_window_cnt` for the next query:
// since the last query failed, we should not try to query too many windows
max_window_cnt = Some(1);
}
None => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
@@ -546,7 +578,7 @@ impl BatchingTask {
engine: QueryEngineRef,
sink_table_schema: &Arc<Schema>,
max_window_cnt: Option<usize>,
) -> Result<Option<(LogicalPlan, usize)>, Error> {
) -> Result<Option<PlanInfo>, Error> {
let query_ctx = self.state.read().unwrap().query_ctx.clone();
let start = SystemTime::now();
let since_the_epoch = start
@@ -559,7 +591,6 @@ impl BatchingTask {
.unwrap_or(u64::MIN);
let low_bound = Timestamp::new_second(low_bound as i64);
let schema_len = self.config.output_schema.fields().len();
let expire_time_window_bound = self
.config
@@ -592,10 +623,9 @@ impl BatchingTask {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
let schema_len = plan.schema().fields().len();
// since no time window lower/upper bound is found, just return the original query(with auto columns)
return Ok(Some((plan, schema_len)));
return Ok(Some(PlanInfo { plan, filter: None }));
};
debug!(
@@ -636,9 +666,11 @@ impl BatchingTask {
"Flow id={:?}, Generated filter expr: {:?}",
self.config.flow_id,
expr.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
.map(
|expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
})
)
.transpose()?
.map(|s| s.to_string())
);
@@ -649,7 +681,7 @@ impl BatchingTask {
return Ok(None);
};
let mut add_filter = AddFilterRewriter::new(expr);
let mut add_filter = AddFilterRewriter::new(expr.expr.clone());
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
let plan =
@@ -665,7 +697,12 @@ impl BatchingTask {
// only apply optimize after complex rewrite is done
let new_plan = apply_df_optimizer(rewrite).await?;
Ok(Some((new_plan, schema_len)))
let info = PlanInfo {
plan: new_plan.clone(),
filter: Some(expr),
};
Ok(Some(info))
}
}