Compare commits

...

2 Commits

Author SHA1 Message Date
discord9
22c61432f6 feat: wait for longer anyway&wait metrics 2025-06-11 15:20:47 +08:00
discord9
91f373e66e metrics: more generic metrics batching mode 2025-06-11 15:20:47 +08:00
6 changed files with 70 additions and 5 deletions

View File

@@ -316,7 +316,7 @@ impl StreamingEngine {
);
METRIC_FLOW_ROWS
.with_label_values(&["out"])
.with_label_values(&["out-streaming"])
.inc_by(total_rows as u64);
let now = self.tick_manager.tick();

View File

@@ -46,7 +46,7 @@ use crate::error::{
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::{Error, FlowId};
@@ -689,6 +689,9 @@ impl FlowEngine for FlowDualEngine {
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
let mut to_batch_engine = request.requests;
let mut batching_row_cnt = 0;
let mut streaming_row_cnt = 0;
{
// not locking this, or recover flows will be starved when also handling flow inserts
let src_table2flow = self.src_table2flow.read().await;
@@ -698,9 +701,11 @@ impl FlowEngine for FlowDualEngine {
let is_in_stream = src_table2flow.in_stream(table_id);
let is_in_batch = src_table2flow.in_batch(table_id);
if is_in_stream {
streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
to_stream_engine.push(req.clone());
}
if is_in_batch {
batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0);
return true;
}
if !is_in_batch && !is_in_stream {
@@ -713,6 +718,14 @@ impl FlowEngine for FlowDualEngine {
// can't use drop due to https://github.com/rust-lang/rust/pull/128846
}
METRIC_FLOW_ROWS
.with_label_values(&["in-streaming"])
.inc_by(streaming_row_cnt as u64);
METRIC_FLOW_ROWS
.with_label_values(&["in-batching"])
.inc_by(batching_row_cnt as u64);
let streaming_engine = self.streaming_engine.clone();
let stream_handler: JoinHandle<Result<(), Error>> =
common_runtime::spawn_global(async move {

View File

@@ -42,6 +42,7 @@ use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
UnexpectedSnafu, UnsupportedSnafu,
};
use crate::metrics::METRIC_FLOW_ROWS;
use crate::{CreateFlowArgs, Error, FlowId, TableName};
/// Batching mode Engine, responsible for driving all the batching mode tasks
@@ -155,6 +156,10 @@ impl BatchingEngine {
let Some(expr) = &task.config.time_window_expr else {
continue;
};
let row_cnt: usize = entry.iter().map(|rows| rows.rows.len()).sum();
METRIC_FLOW_ROWS
.with_label_values(&[&format!("{}-batching-in", task.config.flow_id)])
.inc_by(row_cnt as u64);
let involved_time_windows = expr.handle_rows(entry.clone()).await?;
let mut state = task.state.write().unwrap();
state

View File

@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME,
};
use crate::{Error, FlowId};
@@ -102,8 +103,19 @@ impl TaskState {
})
.unwrap_or(last_duration);
METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME
.with_label_values(&[
flow_id.to_string().as_str(),
time_window_size
.unwrap_or_default()
.as_secs_f64()
.to_string()
.as_str(),
])
.observe(next_duration.as_secs_f64());
// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {
/*if self.dirty_time_windows.windows.is_empty() {
self.last_update_time + next_duration
} else {
debug!(
@@ -113,7 +125,9 @@ impl TaskState {
self.dirty_time_windows.windows
);
Instant::now()
}
}*/
self.last_update_time + next_duration
}
}

View File

@@ -61,7 +61,9 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT,
METRIC_FLOW_ROWS,
};
use crate::{Error, FlowId};
@@ -371,6 +373,9 @@ impl BatchingTask {
"Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}",
elapsed
);
METRIC_FLOW_ROWS
.with_label_values(&[format!("{}-out-batching", flow_id).as_str()])
.inc_by(*affected_rows as _);
} else if let Err(err) = &res {
warn!(
"Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}",
@@ -410,6 +415,7 @@ impl BatchingTask {
engine: QueryEngineRef,
frontend_client: Arc<FrontendClient>,
) {
let flow_id_str = self.config.flow_id.to_string();
loop {
// first check if shutdown signal is received
// if so, break the loop
@@ -427,6 +433,9 @@ impl BatchingTask {
Err(TryRecvError::Empty) => (),
}
}
METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT
.with_label_values(&[&flow_id_str])
.inc();
let new_query = match self.gen_insert_plan(&engine).await {
Ok(new_query) => new_query,
@@ -473,6 +482,9 @@ impl BatchingTask {
}
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
Err(err) => {
METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT
.with_label_values(&[&flow_id_str])
.inc();
match new_query {
Some(query) => {
common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id)

View File

@@ -35,6 +35,13 @@ lazy_static! {
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_wait_time_secs",
"flow batching engine wait time between query(seconds)",
&["flow_id", "time_window_size"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)",
@@ -58,6 +65,20 @@ lazy_static! {
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_start_query_count",
"flow batching engine started query count",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec =
register_int_counter_vec!(
"greptime_flow_batching_error_count",
"flow batching engine error count per flow id",
&["flow_id"],
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(