Compare commits

...

3 Commits

Author     SHA1         Message                                        Date
discord9   af90fad499   feat: truly limit time range by split window   2025-06-11 15:26:47 +08:00
discord9   22c61432f6   feat: wait for longer anyway&wait metrics      2025-06-11 15:20:47 +08:00
discord9   91f373e66e   metrics: more generic metrics batching mode    2025-06-11 15:20:47 +08:00
6 changed files with 175 additions and 38 deletions

View File

@@ -316,7 +316,7 @@ impl StreamingEngine {
         );
         METRIC_FLOW_ROWS
-            .with_label_values(&["out"])
+            .with_label_values(&["out-streaming"])
             .inc_by(total_rows as u64);
         let now = self.tick_manager.tick();

View File

@@ -46,7 +46,7 @@ use crate::error::{
     IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
     NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
 };
-use crate::metrics::METRIC_FLOW_TASK_COUNT;
+use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
 use crate::repr::{self, DiffRow};
 use crate::{Error, FlowId};
@@ -689,6 +689,9 @@ impl FlowEngine for FlowDualEngine {
         let mut to_stream_engine = Vec::with_capacity(request.requests.len());
         let mut to_batch_engine = request.requests;
+        let mut batching_row_cnt = 0;
+        let mut streaming_row_cnt = 0;
         {
             // not locking this, or recover flows will be starved when also handling flow inserts
             let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +701,11 @@ impl FlowEngine for FlowDualEngine {
                 let is_in_stream = src_table2flow.in_stream(table_id);
                 let is_in_batch = src_table2flow.in_batch(table_id);
                 if is_in_stream {
+                    streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                     to_stream_engine.push(req.clone());
                 }
                 if is_in_batch {
+                    batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
                     return true;
                 }
                 if !is_in_batch && !is_in_stream {
@@ -713,6 +718,14 @@ impl FlowEngine for FlowDualEngine {
             // can't use drop due to https://github.com/rust-lang/rust/pull/128846
         }
+        METRIC_FLOW_ROWS
+            .with_label_values(&["in-streaming"])
+            .inc_by(streaming_row_cnt as u64);
+        METRIC_FLOW_ROWS
+            .with_label_values(&["in-batching"])
+            .inc_by(batching_row_cnt as u64);
         let streaming_engine = self.streaming_engine.clone();
         let stream_handler: JoinHandle<Result<(), Error>> =
             common_runtime::spawn_global(async move {
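
Note: the pattern in this hunk (and in the "out-streaming" change above) is one shared row counter with a direction label rather than a separate metric per path, which is what the "more generic metrics batching mode" commit refers to. A minimal standalone sketch of that labeling scheme, using the prometheus crate directly; the metric name and function below are illustrative, not definitions from this repository:

    use lazy_static::lazy_static;
    use prometheus::{register_int_counter_vec, IntCounterVec};

    lazy_static! {
        // Hypothetical stand-in for METRIC_FLOW_ROWS: one counter with a "direction" label.
        static ref FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
            "example_flow_rows_total",
            "rows flowing in and out of the streaming and batching engines",
            &["direction"]
        )
        .unwrap();
    }

    // Same increment pattern as the hunk above: count rows per engine, then bump the
    // shared counter with a direction label instead of a dedicated metric per path.
    fn record_in_rows(streaming_row_cnt: usize, batching_row_cnt: usize) {
        FLOW_ROWS
            .with_label_values(&["in-streaming"])
            .inc_by(streaming_row_cnt as u64);
        FLOW_ROWS
            .with_label_values(&["in-batching"])
            .inc_by(batching_row_cnt as u64);
    }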

View File

@@ -42,6 +42,7 @@ use crate::error::{
     ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
     UnexpectedSnafu, UnsupportedSnafu,
 };
+use crate::metrics::METRIC_FLOW_ROWS;
 use crate::{CreateFlowArgs, Error, FlowId, TableName};
 /// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -155,6 +156,10 @@ impl BatchingEngine {
             let Some(expr) = &task.config.time_window_expr else {
                 continue;
             };
+            let row_cnt: usize = entry.iter().map(|rows| rows.rows.len()).sum();
+            METRIC_FLOW_ROWS
+                .with_label_values(&[&format!("{}-batching-in", task.config.flow_id)])
+                .inc_by(row_cnt as u64);
             let involved_time_windows = expr.handle_rows(entry.clone()).await?;
             let mut state = task.state.write().unwrap();
             state

View File

@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
 use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
 use crate::metrics::{
     METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
+    METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME,
 };
 use crate::{Error, FlowId};
@@ -102,8 +103,19 @@ impl TaskState {
             })
             .unwrap_or(last_duration);
+        METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME
+            .with_label_values(&[
+                flow_id.to_string().as_str(),
+                time_window_size
+                    .unwrap_or_default()
+                    .as_secs_f64()
+                    .to_string()
+                    .as_str(),
+            ])
+            .observe(next_duration.as_secs_f64());
         // if have dirty time window, execute immediately to clean dirty time window
-        if self.dirty_time_windows.windows.is_empty() {
+        /*if self.dirty_time_windows.windows.is_empty() {
             self.last_update_time + next_duration
         } else {
             debug!(
@@ -113,7 +125,9 @@ impl TaskState {
                 self.dirty_time_windows.windows
             );
             Instant::now()
-        }
+        }*/
+        self.last_update_time + next_duration
     }
 }
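
Note: the two hunks above correspond to the "wait for longer anyway&wait metrics" commit: the dirty-window shortcut is commented out and the wait before each query is observed in a histogram labeled by flow id and time window size. A rough sketch of the resulting scheduling decision, with a simplified signature (the real TaskState method takes more context than this):

    use std::time::{Duration, Instant};

    // Simplified stand-in for the scheduling decision after this change: dirty time
    // windows no longer force an immediate run; the task always waits out the computed
    // interval, and that wait is what the new histogram records.
    fn next_run_at(last_update_time: Instant, next_duration: Duration) -> Instant {
        // Old behaviour, roughly: if dirty windows existed, return Instant::now() so
        // they were cleaned immediately; otherwise wait. New behaviour: always wait.
        last_update_time + next_duration
    }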
@@ -206,47 +220,59 @@ impl DirtyTimeWindows {
         // get the first `window_cnt` time windows
         let max_time_range = window_size * window_cnt as i32;
-        let nth = {
-            let mut cur_time_range = chrono::Duration::zero();
-            let mut nth_key = None;
-            for (idx, (start, end)) in self.windows.iter().enumerate() {
-                // if time range is too long, stop
-                if cur_time_range > max_time_range {
-                    nth_key = Some(*start);
-                    break;
-                }
-                // if we have enough time windows, stop
-                if idx >= window_cnt {
-                    nth_key = Some(*start);
-                    break;
-                }
-                if let Some(end) = end {
-                    if let Some(x) = end.sub(start) {
-                        cur_time_range += x;
-                    }
-                }
-            }
-            nth_key
-        };
-        let first_nth = {
-            if let Some(nth) = nth {
-                let mut after = self.windows.split_off(&nth);
-                std::mem::swap(&mut self.windows, &mut after);
-                after
-            } else {
-                std::mem::take(&mut self.windows)
-            }
-        };
+        let mut to_be_query = BTreeMap::new();
+        let mut new_windows = self.windows.clone();
+        let mut cur_time_range = chrono::Duration::zero();
+        for (idx, (start, end)) in self.windows.iter().enumerate() {
+            let first_end = start
+                .add_duration(window_size.to_std().unwrap())
+                .context(TimeSnafu)?;
+            let end = end.unwrap_or(first_end);
+            // if time range is too long, stop
+            if cur_time_range >= max_time_range {
+                break;
+            }
+            // if we have enough time windows, stop
+            if idx >= window_cnt {
+                break;
+            }
+            if let Some(x) = end.sub(start) {
+                if cur_time_range + x <= max_time_range {
+                    to_be_query.insert(*start, Some(end));
+                    new_windows.remove(start);
+                    cur_time_range += x;
+                } else {
+                    // too large a window, split it
+                    // split at window_size * times
+                    let surplus = max_time_range - cur_time_range;
+                    let times = surplus.num_seconds() / window_size.num_seconds();
+                    let split_offset = window_size * times as i32;
+                    let split_at = start
+                        .add_duration(split_offset.to_std().unwrap())
+                        .context(TimeSnafu)?;
+                    to_be_query.insert(*start, Some(split_at));
+                    // remove the original window
+                    new_windows.remove(start);
+                    new_windows.insert(split_at, Some(end));
+                    cur_time_range += split_offset;
+                    break;
+                }
+            }
+        }
+        self.windows = new_windows;
         METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
             .with_label_values(&[flow_id.to_string().as_str()])
-            .observe(first_nth.len() as f64);
-        let full_time_range = first_nth
+            .observe(to_be_query.len() as f64);
+        let full_time_range = to_be_query
             .iter()
             .fold(chrono::Duration::zero(), |acc, (start, end)| {
                 if let Some(end) = end {
@@ -261,7 +287,7 @@ impl DirtyTimeWindows {
             .observe(full_time_range);
         let mut expr_lst = vec![];
-        for (start, end) in first_nth.into_iter() {
+        for (start, end) in to_be_query.into_iter() {
             // align using time window exprs
             let (start, end) = if let Some(ctx) = task_ctx {
                 let Some(time_window_expr) = &ctx.config.time_window_expr else {
@@ -495,6 +521,64 @@ mod test {
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))", "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))",
) )
), ),
// split range
(
Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
60
)),
),
(
Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)),
Some(Timestamp::new_second(
60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 2 min into 1 min
(
Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
40 * 3
)),
)]),
Some(
"((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))",
)
),
// split 3s + 1min into 3s + 57s
(
Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))),
(chrono::Duration::seconds(3), None),
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(
3
)),
),(
Timestamp::new_second(20),
Some(Timestamp::new_second(
140
)),
)]),
Some(
"(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))",
)
),
// expired // expired
( (
vec![ vec![
@@ -511,6 +595,8 @@ mod test {
                 None
             ),
         ];
+        // let len = testcases.len();
+        // let testcases = testcases[(len - 2)..(len - 1)].to_vec();
         for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in
             testcases
         {
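
Note: the split logic in the DirtyTimeWindows hunk above ("feat: truly limit time range by split window") caps each query at max_time_range = window_size * window_cnt and cuts an oversized dirty window at a multiple of the window size, leaving the remainder dirty for a later query. A small self-contained sketch of that arithmetic in plain chrono, with no flow types; the 20-window cap below is an assumption chosen to mirror the "split 2 min into 1 min" test case, not a value stated in this diff:

    use chrono::Duration;

    /// Given the range already accumulated and the configured cap, return how much of an
    /// oversized dirty window to query now: the largest multiple of `window_size` that
    /// still fits under the cap. The remainder of the window stays dirty.
    fn split_offset(cur_time_range: Duration, max_time_range: Duration, window_size: Duration) -> Duration {
        let surplus = max_time_range - cur_time_range;
        let times = surplus.num_seconds() / window_size.num_seconds();
        window_size * times as i32
    }

    fn main() {
        // Mirrors the "split 2 min into 1 min" test: one 120 s dirty window, 3 s windows,
        // and an assumed cap of 20 windows, i.e. max_time_range = 60 s.
        let window_size = Duration::seconds(3);
        let max_time_range = window_size * 20;
        let offset = split_offset(Duration::zero(), max_time_range, window_size);
        assert_eq!(offset, Duration::seconds(60)); // query [0, 60); [60, 120) stays dirty
    }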

View File

@@ -61,7 +61,9 @@ use crate::error::{
     SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
 };
 use crate::metrics::{
-    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
+    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
+    METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
+    METRIC_FLOW_ROWS,
 };
 use crate::{Error, FlowId};
@@ -371,6 +373,9 @@ impl BatchingTask {
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}", "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
elapsed elapsed
); );
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(*affected_rows as _);
} else if let Err(err) = &res { } else if let Err(err) = &res {
warn!( warn!(
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}", "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -410,6 +415,7 @@ impl BatchingTask {
         engine: QueryEngineRef,
         frontend_client: Arc<FrontendClient>,
     ) {
+        let flow_id_str = self.config.flow_id.to_string();
         loop {
             // first check if shutdown signal is received
             // if so, break the loop
@@ -427,6 +433,9 @@ impl BatchingTask {
                     Err(TryRecvError::Empty) => (),
                 }
             }
+            METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
+                .with_label_values(&[&flow_id_str])
+                .inc();
             let new_query = match self.gen_insert_plan(&engine).await {
                 Ok(new_query) => new_query,
@@ -473,6 +482,9 @@ impl BatchingTask {
                 }
                 // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
                 Err(err) => {
+                    METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
+                        .with_label_values(&[&flow_id_str])
+                        .inc();
                     match new_query {
                         Some(query) => {
                             common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)

View File

@@ -35,6 +35,13 @@ lazy_static! {
         vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
     )
     .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME: HistogramVec = register_histogram_vec!(
+        "greptime_flow_batching_engine_wait_time_secs",
+        "flow batching engine wait time between query(seconds)",
+        &["flow_id", "time_window_size"],
+        vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
+    )
+    .unwrap();
     pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
         "greptime_flow_batching_engine_slow_query_secs",
         "flow batching engine slow query(seconds)",
@@ -58,6 +65,20 @@ lazy_static! {
         vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
     )
     .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
+        register_int_counter_vec!(
+            "greptime_flow_batching_start_query_count",
+            "flow batching engine started query count",
+            &["flow_id"],
+        )
+        .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
+        register_int_counter_vec!(
+            "greptime_flow_batching_error_count",
+            "flow batching engine error count per flow id",
+            &["flow_id"],
+        )
+        .unwrap();
     pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
         register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
     pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(