refactor(flow): limit the size of query (#6216)

* refactor: not wait for slow query

* chore: clippy

* chore: fmt

* WIP: time range lock

* WIP

* refactor: rm over-complicated query pool

* chore: add more metrics& rm sql from slow query metrics
This commit is contained in:
discord9
2025-06-03 20:27:07 +08:00
committed by GitHub
parent 083c22b90a
commit 38cac301f2
3 changed files with 99 additions and 20 deletions

View File

@@ -30,6 +30,9 @@ use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
};
use crate::{Error, FlowId};
/// The state of the [`BatchingTask`].
@@ -127,10 +130,10 @@ impl DirtyTimeWindows {
/// Time window merge distance
///
/// TODO(discord9): make those configurable
const MERGE_DIST: i32 = 3;
pub const MERGE_DIST: i32 = 3;
/// Maximum number of filters allowed in a single query
const MAX_FILTER_NUM: usize = 20;
pub const MAX_FILTER_NUM: usize = 20;
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
///
@@ -154,11 +157,16 @@ impl DirtyTimeWindows {
}
/// Generate all filter expressions consuming all time windows
///
/// there is two limits:
/// - shouldn't return a too long time range(<=`window_size * window_cnt`), so that the query can be executed in a reasonable time
/// - shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing parser to overflow
pub fn gen_filter_exprs(
&mut self,
col_name: &str,
expire_lower_bound: Option<Timestamp>,
window_size: chrono::Duration,
window_cnt: usize,
flow_id: FlowId,
task_ctx: Option<&BatchingTask>,
) -> Result<Option<datafusion_expr::Expr>, Error> {
@@ -196,12 +204,33 @@ impl DirtyTimeWindows {
}
}
// get the first `MAX_FILTER_NUM` time windows
let nth = self
.windows
.iter()
.nth(Self::MAX_FILTER_NUM)
.map(|(key, _)| *key);
// get the first `window_cnt` time windows
let max_time_range = window_size * window_cnt as i32;
let nth = {
let mut cur_time_range = chrono::Duration::zero();
let mut nth_key = None;
for (idx, (start, end)) in self.windows.iter().enumerate() {
// if time range is too long, stop
if cur_time_range > max_time_range {
nth_key = Some(*start);
break;
}
// if we have enough time windows, stop
if idx >= window_cnt {
nth_key = Some(*start);
break;
}
if let Some(end) = end {
if let Some(x) = end.sub(start) {
cur_time_range += x;
}
}
}
nth_key
};
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
@@ -213,6 +242,24 @@ impl DirtyTimeWindows {
}
};
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.observe(first_nth.len() as f64);
let full_time_range = first_nth
.iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| {
if let Some(end) = end {
acc + end.sub(start).unwrap_or(chrono::Duration::zero())
} else {
acc
}
})
.num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()])
.observe(full_time_range);
let mut expr_lst = vec![];
for (start, end) in first_nth.into_iter() {
// align using time window exprs
@@ -274,6 +321,8 @@ impl DirtyTimeWindows {
}
/// Merge time windows that overlaps or get too close
///
/// TODO(discord9): not merge and prefer to send smaller time windows? how?
pub fn merge_dirty_time_windows(
&mut self,
window_size: chrono::Duration,
@@ -472,7 +521,14 @@ mod test {
.unwrap();
assert_eq!(expected, dirty.windows);
let filter_expr = dirty
.gen_filter_exprs("ts", expire_lower_bound, window_size, 0, None)
.gen_filter_exprs(
"ts",
expire_lower_bound,
window_size,
DirtyTimeWindows::MAX_FILTER_NUM,
0,
None,
)
.unwrap();
let unparser = datafusion::sql::unparser::Unparser::default();

View File

@@ -46,7 +46,7 @@ use tokio::time::Instant;
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::TaskState;
use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
@@ -387,7 +387,6 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[
flow_id.to_string().as_str(),
&plan.to_string(),
&peer_desc.unwrap_or_default().to_string(),
])
.observe(elapsed.as_secs_f64());
@@ -429,16 +428,23 @@ impl BatchingTask {
}
}
let mut new_query = None;
let mut gen_and_exec = async || {
new_query = self.gen_insert_plan(&engine).await?;
if let Some(new_query) = &new_query {
self.execute_logical_plan(&frontend_client, new_query).await
} else {
Ok(None)
let new_query = match self.gen_insert_plan(&engine).await {
Ok(new_query) => new_query,
Err(err) => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
// also sleep for a little while before try again to prevent flooding logs
tokio::time::sleep(MIN_REFRESH_DURATION).await;
continue;
}
};
match gen_and_exec().await {
let res = if let Some(new_query) = &new_query {
self.execute_logical_plan(&frontend_client, new_query).await
} else {
Ok(None)
};
match res {
// normal execute, sleep for some time before doing next query
Ok(Some(_)) => {
let sleep_until = {
@@ -583,6 +589,7 @@ impl BatchingTask {
&col_name,
Some(l),
window_size,
DirtyTimeWindows::MAX_FILTER_NUM,
self.config.flow_id,
Some(self),
)?;

View File

@@ -38,10 +38,26 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)",
&["flow_id", "sql", "peer"],
&["flow_id", "peer"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count",
&["flow_id"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query time range(seconds)",
&["flow_id"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(