feat: retry with a smaller time window count when a query fails

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-06-12 21:10:55 +08:00
parent 11e4a8abb3
commit 5dd6f92b60
3 changed files with 113 additions and 39 deletions

View File

@@ -59,6 +59,11 @@ pub struct TaskState {
pub(crate) min_run_interval: Option<u64>,
/// max filter number per query
pub(crate) max_filter_num: Option<usize>,
/// Current filter count, will grow when query succeeds(capped by `max_filter_num`),
/// and reset to 1 when query fails.
///
/// This is useful for controlling resource usage
pub(crate) cur_filter_cnt: usize,
}
impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -72,6 +77,7 @@ impl TaskState {
task_handle: None,
min_run_interval: None,
max_filter_num: None,
cur_filter_cnt: 1,
}
}
@@ -140,6 +146,17 @@ pub struct DirtyTimeWindows {
windows: BTreeMap<Timestamp, Option<Timestamp>>,
}
/// Time windows that are being worked on, which are not dirty but are currently being processed
///
/// Bundles the raw windows handed to a query together with the filter expression built
/// from them, so the caller can put the windows back into the dirty set if the query fails.
#[derive(Debug, Clone, Default)]
pub struct WorkingTimeWindows {
/// Non-overlapping windows, stored as `start -> end`.
/// `end` is exclusive (and optional).
pub windows: BTreeMap<Timestamp, Option<Timestamp>>,
/// Filter expression selecting the data that falls inside `windows`:
/// the per-window expressions combined with `OR`.
/// `None` when no per-window expression was generated (nothing to query).
pub filter: Option<datafusion_expr::Expr>,
}
impl DirtyTimeWindows {
/// Time window merge distance
///
@@ -177,6 +194,12 @@ impl DirtyTimeWindows {
self.windows.insert(start, end);
}
/// Insert every `start -> end` pair from `windows` into the dirty set.
///
/// If a window with the same `start` is already present, its `end` is replaced by the
/// incoming one. No merging of adjacent or overlapping windows is done here.
pub fn add_windows(&mut self, windows: BTreeMap<Timestamp, Option<Timestamp>>) {
    self.windows.extend(windows);
}
/// Clean all dirty time windows, useful when the time window expr can't be found
pub fn clean(&mut self) {
self.windows.clear();
@@ -195,7 +218,7 @@ impl DirtyTimeWindows {
window_cnt: usize,
flow_id: FlowId,
task_ctx: Option<&BatchingTask>,
) -> Result<Option<datafusion_expr::Expr>, Error> {
) -> Result<WorkingTimeWindows, Error> {
debug!(
"expire_lower_bound: {:?}, window_size: {:?}",
expire_lower_bound.map(|t| t.to_iso8601_string()),
@@ -318,7 +341,7 @@ impl DirtyTimeWindows {
.observe(stalled_time_range.num_seconds() as f64);
let mut expr_lst = vec![];
for (start, end) in to_be_query.into_iter() {
for (start, end) in to_be_query.clone().into_iter() {
// align using time window exprs
let (start, end) = if let Some(ctx) = task_ctx {
let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -350,7 +373,12 @@ impl DirtyTimeWindows {
expr_lst.push(expr);
}
let expr = expr_lst.into_iter().reduce(|a, b| a.or(b));
Ok(expr)
let working = WorkingTimeWindows {
windows: to_be_query,
filter: expr,
};
Ok(working)
}
fn align_time_window(
@@ -646,7 +674,8 @@ mod test {
0,
None,
)
.unwrap();
.unwrap()
.filter;
let unparser = datafusion::sql::unparser::Unparser::default();
let to_sql = filter_expr

View File

@@ -46,7 +46,7 @@ use tokio::time::Instant;
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
use crate::batching_mode::state::{DirtyTimeWindows, TaskState, WorkingTimeWindows};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
@@ -61,9 +61,9 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
METRIC_FLOW_ROWS,
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT, METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT,
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};
@@ -110,6 +110,13 @@ enum QueryType {
Sql,
}
/// A plan with working time windows, used to track the time windows that are currently being processed
#[derive(Debug, Clone)]
pub struct PlanWithWindow {
/// The generated logical plan, or `None` when there is no new data and hence nothing to run.
pub plan: Option<LogicalPlan>,
/// The time windows this plan covers; on execution failure they are added back to the
/// dirty time windows so a later query can retry them.
pub working_windows: WorkingTimeWindows,
}
#[derive(Clone)]
pub struct BatchingTask {
pub config: Arc<TaskConfig>,
@@ -218,30 +225,29 @@ impl BatchingTask {
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
if let Some(new_query) = self.gen_insert_plan(engine).await? {
if let Some(new_query) = self.gen_insert_plan(engine).await?.plan {
debug!("Generate new query: {}", new_query);
self.execute_logical_plan(frontend_client, &new_query).await
self.execute_logical_plan(frontend_client, &new_query)
.await
.map(Some)
} else {
debug!("Generate no query");
Ok(None)
}
}
pub async fn gen_insert_plan(
&self,
engine: &QueryEngineRef,
) -> Result<Option<LogicalPlan>, Error> {
pub async fn gen_insert_plan(&self, engine: &QueryEngineRef) -> Result<PlanWithWindow, Error> {
let (table, df_schema) = get_table_info_df_schema(
self.config.catalog_manager.clone(),
self.config.sink_table_name.clone(),
)
.await?;
let new_query = self
let new_query_info = self
.gen_query_with_time_window(engine.clone(), &table.meta.schema)
.await?;
let insert_into = if let Some((new_query, _column_cnt)) = new_query {
let insert_into = if let Some(new_query) = new_query_info.plan {
// first check if all columns in input query exists in sink table
// since insert into ref to names in record batch generate by given query
let table_columns = df_schema
@@ -272,12 +278,15 @@ impl BatchingTask {
Arc::new(new_query),
))
} else {
return Ok(None);
return Ok(new_query_info);
};
let insert_into = insert_into.recompute_schema().context(DatafusionSnafu {
context: "Failed to recompute schema",
})?;
Ok(Some(insert_into))
Ok(PlanWithWindow {
plan: Some(insert_into),
working_windows: new_query_info.working_windows,
})
}
pub async fn create_table(
@@ -297,7 +306,7 @@ impl BatchingTask {
&self,
frontend_client: &Arc<FrontendClient>,
plan: &LogicalPlan,
) -> Result<Option<(u32, Duration)>, Error> {
) -> Result<(u32, Duration), Error> {
let instant = Instant::now();
let flow_id = self.config.flow_id;
@@ -410,7 +419,7 @@ impl BatchingTask {
let res = res?;
Ok(Some((res, elapsed)))
Ok((res, elapsed))
}
/// start executing query in a loop, break when receive shutdown signal
@@ -443,7 +452,7 @@ impl BatchingTask {
.with_label_values(&[&flow_id_str])
.inc();
let new_query = match self.gen_insert_plan(&engine).await {
let new_query_info = match self.gen_insert_plan(&engine).await {
Ok(new_query) => new_query,
Err(err) => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
@@ -453,8 +462,10 @@ impl BatchingTask {
}
};
let res = if let Some(new_query) = &new_query {
self.execute_logical_plan(&frontend_client, new_query).await
let res = if let Some(new_query) = &new_query_info.plan {
self.execute_logical_plan(&frontend_client, new_query)
.await
.map(Some)
} else {
Ok(None)
};
@@ -463,7 +474,17 @@ impl BatchingTask {
// normal execute, sleep for some time before doing next query
Ok(Some(_)) => {
let sleep_until = {
let state = self.state.write().unwrap();
let mut state = self.state.write().unwrap();
// double cur_filter_cnt
state.cur_filter_cnt = state.cur_filter_cnt.saturating_mul(2).min(
state
.max_filter_num
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM),
);
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT
.with_label_values(&[&flow_id_str])
.set(state.cur_filter_cnt as i64);
state.get_next_start_query_time(
self.config.flow_id,
@@ -491,7 +512,7 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
match new_query {
match new_query_info.plan {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)
}
@@ -499,6 +520,17 @@ impl BatchingTask {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id)
}
}
{
// return working windows to dirty windows, and reset current filter cnt so next time we generate query only generate a small query
let mut state = self.state.write().unwrap();
state
.dirty_time_windows
.add_windows(new_query_info.working_windows.windows);
state.cur_filter_cnt = 1;
METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT
.with_label_values(&[&flow_id_str])
.set(state.cur_filter_cnt as i64);
}
// also sleep for a little while before trying again to prevent flooding logs
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
@@ -527,7 +559,7 @@ impl BatchingTask {
&self,
engine: QueryEngineRef,
sink_table_schema: &Arc<Schema>,
) -> Result<Option<(LogicalPlan, usize)>, Error> {
) -> Result<PlanWithWindow, Error> {
let query_ctx = self.state.read().unwrap().query_ctx.clone();
let start = SystemTime::now();
let since_the_epoch = start
@@ -540,7 +572,6 @@ impl BatchingTask {
.unwrap_or(u64::MIN);
let low_bound = Timestamp::new_second(low_bound as i64);
let schema_len = self.config.output_schema.fields().len();
let expire_time_window_bound = self
.config
@@ -573,10 +604,12 @@ impl BatchingTask {
context: format!("Failed to rewrite plan:\n {}\n", plan),
})?
.data;
let schema_len = plan.schema().fields().len();
// since no time window lower/upper bound is found, just return the original query(with auto columns)
return Ok(Some((plan, schema_len)));
return Ok(PlanWithWindow {
plan: Some(plan),
working_windows: WorkingTimeWindows::default(),
});
};
debug!(
@@ -598,16 +631,14 @@ impl BatchingTask {
),
})?;
let expr = {
let working_windows = {
let mut state = self.state.write().unwrap();
let max_window_cnt = state
.max_filter_num
.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
let cur_wnd_cnt = state.cur_filter_cnt;
state.dirty_time_windows.gen_filter_exprs(
&col_name,
Some(l),
window_size,
max_window_cnt,
cur_wnd_cnt,
self.config.flow_id,
Some(self),
)?
@@ -616,7 +647,9 @@ impl BatchingTask {
debug!(
"Flow id={:?}, Generated filter expr: {:?}",
self.config.flow_id,
expr.as_ref()
working_windows
.filter
.as_ref()
.map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu {
context: format!("Failed to generate filter expr from {expr:?}"),
}))
@@ -624,13 +657,16 @@ impl BatchingTask {
.map(|s| s.to_string())
);
let Some(expr) = expr else {
let Some(expr) = &working_windows.filter else {
// no new data, hence no need to update
debug!("Flow id={:?}, no new data, not update", self.config.flow_id);
return Ok(None);
return Ok(PlanWithWindow {
plan: None,
working_windows,
});
};
let mut add_filter = AddFilterRewriter::new(expr);
let mut add_filter = AddFilterRewriter::new(expr.clone());
let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone());
let plan =
@@ -646,7 +682,10 @@ impl BatchingTask {
// only apply optimize after complex rewrite is done
let new_plan = apply_df_optimizer(rewrite).await?;
Ok(Some((new_plan, schema_len)))
Ok(PlanWithWindow {
plan: Some(new_plan),
working_windows,
})
}
}

View File

@@ -96,6 +96,12 @@ lazy_static! {
&["flow_id"]
)
.unwrap();
/// Per-flow gauge (label: `flow_id`) holding the task's current `cur_filter_cnt`.
/// It is set after the count is doubled on a successful query and after it is
/// reset to 1 on a failed one.
pub static ref METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT: IntGaugeVec = register_int_gauge_vec!(
"greptime_flow_batching_current_window_count",
"flow batching engine current query window count per flow id",
&["flow_id"]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_guess_fe_load",