feat: report flow stats from streaming and batching engines (#7701)

* fix: report flow stats from streaming and batching engines

* handle restart report handler

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename fn name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-02-11 15:38:09 +08:00
committed by GitHub
parent db46849f40
commit 77013d9085
13 changed files with 216 additions and 58 deletions

View File

@@ -457,9 +457,8 @@ impl StartCommand {
// set the ref to query for the local flow state
{
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
information_extension
.set_flow_streaming_engine(flow_streaming_engine)
.set_flow_engine(flownode.flow_engine())
.await;
}

View File

@@ -75,7 +75,6 @@ pub(crate) mod table_source;
use crate::FrontendInvoker;
use crate::error::Error;
use crate::utils::StateReportHandler;
// `GREPTIME_TIMESTAMP` is not used to distinguish when table is created automatically by flow
pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder";
@@ -178,8 +177,6 @@ pub struct StreamingEngine {
///
/// So that a series of event like `inserts -> flush` can be handled correctly
flush_lock: RwLock<()>,
/// receive a oneshot sender to send state size report
state_report_handler: RwLock<Option<StateReportHandler>>,
}
/// Building FlownodeManager
@@ -215,15 +212,9 @@ impl StreamingEngine {
tick_manager,
node_id,
flush_lock: RwLock::new(()),
state_report_handler: RwLock::new(None),
}
}
/// Installs `handler` as this engine's state report handler and hands the engine back.
///
/// Takes `self` by value and returns it, so the call can be chained while the engine is
/// being built.
pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self {
// Overwrites whatever handler was stored before.
*self.state_report_handler.write().await = Some(handler);
self
}
/// Create a flownode manager with one worker
pub fn new_with_workers<'s>(
node_id: Option<u32>,
@@ -554,37 +545,13 @@ impl StreamingEngine {
/// Flow Runtime related methods
impl StreamingEngine {
/// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back
///
/// The stored handler is taken out of `state_report_handler` (leaving `None` behind). When a
/// handler was present, a background task is spawned that answers every received sender with
/// a freshly generated state report, and the task's join handle is returned. When no handler
/// was stored, nothing is spawned and `None` is returned.
///
/// if heartbeat task is shutdown, this future will exit too
async fn start_state_report_handler(self: Arc<Self>) -> Option<JoinHandle<()>> {
let state_report_handler = self.state_report_handler.write().await.take();
if let Some(mut handler) = state_report_handler {
let zelf = self.clone();
let handler = common_runtime::spawn_global(async move {
// The loop ends as soon as `recv` stops yielding senders.
while let Some(ret_handler) = handler.recv().await {
let state_report = zelf.gen_state_report().await;
// A failed send is only logged; the task keeps serving later requests.
ret_handler.send(state_report).unwrap_or_else(|err| {
common_telemetry::error!(err; "Send state size report error");
});
}
});
Some(handler)
} else {
None
}
}
/// run in common_runtime background runtime
pub fn run_background(
self: Arc<Self>,
shutdown: Option<broadcast::Receiver<()>>,
) -> JoinHandle<()> {
info!("Starting flownode manager's background task");
common_runtime::spawn_global(async move {
let _state_report_handler = self.clone().start_state_report_handler().await;
self.run(shutdown).await;
})
common_runtime::spawn_global(async move { self.run(shutdown).await })
}
/// log all flow errors

View File

@@ -28,6 +28,7 @@ use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::error::Result as MetaResult;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value;
@@ -41,7 +42,7 @@ use tokio::sync::{Mutex, RwLock};
use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::engine::{FlowEngine, FlowStatProvider};
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
@@ -49,6 +50,7 @@ use crate::error::{
};
use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT};
use crate::repr::{self, DiffRow};
use crate::utils::StateReportHandler;
use crate::{Error, FlowId};
/// Ref to [`FlowDualEngine`]
@@ -61,6 +63,8 @@ pub type FlowDualEngineRef = Arc<FlowDualEngine>;
pub struct FlowDualEngine {
streaming_engine: Arc<StreamingEngine>,
batching_engine: Arc<BatchingEngine>,
/// receive a oneshot sender to send state report
state_report_handler: RwLock<Option<StateReportHandler>>,
/// helper struct for faster query flow by table id or vice versa
src_table2flow: RwLock<SrcTableToFlow>,
flow_metadata_manager: Arc<FlowMetadataManager>,
@@ -81,6 +85,7 @@ impl FlowDualEngine {
Self {
streaming_engine,
batching_engine,
state_report_handler: Default::default(),
src_table2flow: RwLock::new(SrcTableToFlow::default()),
flow_metadata_manager,
catalog_manager,
@@ -152,6 +157,47 @@ impl FlowDualEngine {
self.batching_engine.clone()
}
/// Stores `handler` so that `start_state_report_task` can take it later.
///
/// Any handler stored earlier is replaced.
pub async fn set_state_report_handler(&self, handler: StateReportHandler) {
    let mut slot = self.state_report_handler.write().await;
    *slot = Some(handler);
}
/// Builds one [`FlowStat`] covering both engines.
///
/// The streaming engine is queried first, then the batching engine; the batching entries
/// are merged into the streaming ones with `extend`, so on a key collision the batching
/// entry ends up in the result.
pub async fn gen_state_report(&self) -> FlowStat {
    let FlowStat {
        mut state_size,
        mut last_exec_time_map,
    } = self.streaming_engine.flow_stat().await;

    let batching_stat = self.batching_engine.flow_stat().await;
    state_size.extend(batching_stat.state_size);
    last_exec_time_map.extend(batching_stat.last_exec_time_map);

    FlowStat {
        state_size,
        last_exec_time_map,
    }
}
/// Spawns the state report task if a handler has been set.
///
/// The stored handler is taken out of `state_report_handler`, so calling this again
/// without setting a new handler returns `None`. The spawned task waits for senders
/// arriving on the handler and answers each one with a freshly generated report.
///
/// If the heartbeat task is shut down, this future exits too.
pub async fn start_state_report_task(self: Arc<Self>) -> Option<JoinHandle<()>> {
    // No handler stored: nothing to spawn.
    let mut handler = self.state_report_handler.write().await.take()?;

    let task = common_runtime::spawn_global(async move {
        while let Some(reply_tx) = handler.recv().await {
            let report = self.gen_state_report().await;
            // A failed send is only logged; keep serving later requests.
            if let Err(err) = reply_tx.send(report) {
                common_telemetry::error!(err; "Send state report error");
            }
        }
    });
    Some(task)
}
/// In distributed mode, scan periodically (every 1s) until an available frontend is found or the timeout is reached;
/// in standalone mode, return immediately.
/// Note that if any frontend appears in the cluster info, this function returns immediately.

View File

@@ -17,11 +17,13 @@ use std::collections::BTreeMap;
use common_meta::key::flow::flow_state::FlowStat;
use crate::StreamingEngine;
use crate::engine::FlowStatProvider;
impl StreamingEngine {
pub async fn gen_state_report(&self) -> FlowStat {
impl FlowStatProvider for StreamingEngine {
async fn flow_stat(&self) -> FlowStat {
let mut full_report = BTreeMap::new();
let mut last_exec_time_map = BTreeMap::new();
for worker in self.worker_handles.iter() {
match worker.get_state_size().await {
Ok(state_size) => {

View File

@@ -24,6 +24,7 @@ use common_error::ext::BoxedError;
use common_meta::ddl::create_flow::FlowType;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_runtime::JoinHandle;
use common_telemetry::tracing::warn;
@@ -46,7 +47,7 @@ use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::engine::FlowEngine;
use crate::engine::{FlowEngine, FlowStatProvider};
use crate::error::{
CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu,
InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu,
@@ -92,6 +93,18 @@ impl BatchingEngine {
}
}
/// Collects the last execution timestamp (millisecond) of every batching flow.
///
/// Flows whose task reports no last execution time are left out of the map.
pub async fn get_last_exec_time_map(&self) -> BTreeMap<FlowId, i64> {
    let tasks = self.tasks.read().await;
    let mut last_exec_times = BTreeMap::new();
    for (flow_id, task) in tasks.iter() {
        if let Some(timestamp) = task.last_execution_time_millis() {
            last_exec_times.insert(*flow_id, timestamp);
        }
    }
    last_exec_times
}
pub async fn handle_mark_dirty_time_window(
&self,
reqs: DirtyWindowRequests,
@@ -320,6 +333,20 @@ impl BatchingEngine {
}
}
impl FlowStatProvider for BatchingEngine {
    /// Reports the batching engine's runtime stats.
    ///
    /// `state_size` is always empty here; `last_exec_time_map` is built from
    /// `get_last_exec_time_map`, with each flow id converted to `u32`.
    async fn flow_stat(&self) -> FlowStat {
        // NOTE(review): `as u32` truncates silently if `FlowId` is wider than 32 bits —
        // confirm flow ids always fit.
        let last_exec_time_map = self
            .get_last_exec_time_map()
            .await
            .into_iter()
            .map(|(id, millis)| (id as u32, millis))
            .collect();

        FlowStat {
            state_size: BTreeMap::new(),
            last_exec_time_map,
        }
    }
}
async fn get_table_name(
table_info: &TableInfoManager,
table_id: &TableId,

View File

@@ -44,6 +44,8 @@ pub struct TaskState {
last_update_time: Instant,
/// last time query duration
last_query_duration: Duration,
/// Last successful execution time in unix timestamp milliseconds.
last_exec_time_millis: Option<i64>,
/// Dirty Time windows need to be updated
/// mapping of `start -> end` and non-overlapping
pub(crate) dirty_time_windows: DirtyTimeWindows,
@@ -59,6 +61,7 @@ impl TaskState {
query_ctx,
last_update_time: Instant::now(),
last_query_duration: Duration::from_secs(0),
last_exec_time_millis: None,
dirty_time_windows: Default::default(),
exec_state: ExecState::Idle,
shutdown_rx,
@@ -68,10 +71,17 @@ impl TaskState {
/// called after last query is done
/// `is_succ` indicate whether the last query is successful
pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
pub fn after_query_exec(&mut self, elapsed: Duration, is_succ: bool) {
self.exec_state = ExecState::Idle;
self.last_query_duration = elapsed;
self.last_update_time = Instant::now();
if is_succ {
self.last_exec_time_millis = Some(common_time::util::current_time_millis());
}
}
/// Returns the stored `last_exec_time_millis` value.
///
/// `None` means no execution time has been recorded in this state yet.
pub fn last_execution_time_millis(&self) -> Option<i64> {
self.last_exec_time_millis
}
/// Compute the next query delay based on the time window size or the last query duration.
@@ -578,6 +588,20 @@ mod test {
use crate::batching_mode::utils::sql_to_df_plan;
use crate::test_utils::create_test_query_engine;
#[test]
fn test_task_state_records_last_execution_time() {
    let elapsed = std::time::Duration::from_millis(1);
    let (_tx, rx) = tokio::sync::oneshot::channel();
    let mut state = TaskState::new(QueryContext::arc(), rx);

    // Nothing is recorded before any query has run.
    assert_eq!(None, state.last_execution_time_millis());

    // A failed query must not record an execution time.
    state.after_query_exec(elapsed, false);
    assert_eq!(None, state.last_execution_time_millis());

    // A successful query records one.
    state.after_query_exec(elapsed, true);
    assert!(state.last_execution_time_millis().is_some());
}
#[test]
fn test_merge_dirty_time_windows() {
let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;

View File

@@ -180,6 +180,10 @@ impl BatchingTask {
})
}
/// Returns the last execution time (milliseconds) reported by this task's state, if any.
///
/// # Panics
///
/// Panics if `read()` on the state lock returns an error (the result is unwrapped).
pub fn last_execution_time_millis(&self) -> Option<i64> {
    let state = self.state.read().unwrap();
    state.last_execution_time_millis()
}
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
///
/// useful for flush_flow to flush dirty time windows range

View File

@@ -16,6 +16,7 @@
use std::collections::HashMap;
use common_meta::key::flow::flow_state::FlowStat;
use session::context::QueryContext;
use table::metadata::TableId;
@@ -99,3 +100,9 @@ pub trait FlowEngine {
req: api::v1::flow::DirtyWindowRequests,
) -> Result<(), Error>;
}
/// Provides flow runtime statistics for information schema and heartbeat reporting.
///
/// An implementor exposes its statistics as a single [`FlowStat`] value.
pub trait FlowStatProvider {
/// Returns current runtime stats of an engine.
///
/// The call is asynchronous and only borrows the engine (`&self`).
async fn flow_stat(&self) -> FlowStat;
}

View File

@@ -42,6 +42,7 @@ mod utils;
#[cfg(test)]
mod test_utils;
pub use adapter::flownode_impl::FlowDualEngineRef;
pub use adapter::{FlowConfig, FlowStreamingEngineRef, StreamingEngine};
pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
pub use engine::FlowAuthHeader;

View File

@@ -167,6 +167,8 @@ struct FlownodeServerInner {
server_shutdown_tx: Mutex<broadcast::Sender<()>>,
/// streaming task handler
streaming_task_handler: Mutex<Option<JoinHandle<()>>>,
/// state report task handler
state_report_task_handler: Mutex<Option<JoinHandle<()>>>,
flow_service: FlowService,
}
@@ -180,6 +182,7 @@ impl FlownodeServer {
worker_shutdown_tx: Mutex::new(tx),
server_shutdown_tx: Mutex::new(server_tx),
streaming_task_handler: Mutex::new(None),
state_report_task_handler: Mutex::new(None),
}),
}
}
@@ -189,6 +192,11 @@ impl FlownodeServer {
/// Should be called only after the heartbeat is established, so that cluster info is available
async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.dual_engine.clone();
let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await;
if state_report_task_handler.is_none() {
*state_report_task_handler = manager_ref.clone().start_state_report_task().await;
}
drop(state_report_task_handler);
let handle = manager_ref
.streaming_engine()
.run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe()));
@@ -213,6 +221,8 @@ impl FlownodeServer {
if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown");
}
// Keep state_report_task_handler alive across worker restarts.
// Dropping it here would permanently lose the report channel receiver.
self.inner
.flow_service
.dual_engine
@@ -364,15 +374,18 @@ impl FlownodeBuilder {
self.catalog_manager.clone(),
self.opts.flow.batching_mode.clone(),
));
let dual = FlowDualEngine::new(
let dual = Arc::new(FlowDualEngine::new(
manager.clone(),
batching,
self.flow_metadata_manager.clone(),
self.catalog_manager.clone(),
self.plugins.clone(),
);
));
if let Some(handler) = self.state_report_handler.take() {
dual.set_state_report_handler(handler).await;
}
let server = FlownodeServer::new(FlowService::new(Arc::new(dual)));
let server = FlownodeServer::new(FlowService::new(dual));
let heartbeat_task = self.heartbeat_task;
@@ -418,9 +431,6 @@ impl FlownodeBuilder {
})?;
man.add_worker_handle(worker_handle);
}
if let Some(handler) = self.state_report_handler.take() {
man = man.with_state_report_handler(handler).await;
}
info!("Flow Node Manager started");
Ok(man)
}
@@ -662,3 +672,76 @@ pub(crate) async fn get_all_flow_ids(
Ok(ret)
}
// Tests for the flownode server's state report task across worker restarts.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use catalog::memory::new_memory_catalog_manager;
use common_base::Plugins;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use query::options::QueryOptions;
use super::*;
use crate::adapter::flownode_impl::FlowDualEngine;
use crate::batching_mode::BatchingModeOptions;
use crate::batching_mode::engine::BatchingEngine;
use crate::utils::SizeReportSender;
/// Builds a `FlownodeServer` backed by an in-memory kv backend and catalog manager.
///
/// Returns the server together with the `SizeReportSender` whose handler half has been
/// installed on the dual engine, so the test can request state reports.
async fn new_test_flownode_server() -> (FlownodeServer, SizeReportSender) {
// Metadata managers share one in-memory kv backend.
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_meta = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_meta.init().await.unwrap();
let flow_meta = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let catalog_manager = new_memory_catalog_manager().unwrap();
let query_engine = crate::test_utils::create_test_query_engine();
let streaming_engine = Arc::new(StreamingEngine::new(
None,
query_engine.clone(),
table_meta.clone(),
));
// `_handler` is bound to a named variable so it lives until the end of this function.
let (frontend_client, _handler) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
let batching_engine = Arc::new(BatchingEngine::new(
Arc::new(frontend_client),
query_engine,
flow_meta.clone(),
table_meta,
catalog_manager.clone(),
BatchingModeOptions::default(),
));
let dual_engine = Arc::new(FlowDualEngine::new(
streaming_engine,
batching_engine,
flow_meta,
catalog_manager,
Plugins::new(),
));
// The handler must be set before the server starts its workers, which is when the
// state report task takes it.
let (report_sender, report_handler) = SizeReportSender::new();
dual_engine.set_state_report_handler(report_handler).await;
let server = FlownodeServer::new(FlowService::new(dual_engine));
(server, report_sender)
}
/// Checks that state report queries keep succeeding while workers are started, stopped,
/// and started again.
///
/// NOTE(review): the `Duration` passed to `query` is presumably a timeout — confirm
/// against `SizeReportSender::query`.
#[tokio::test]
async fn test_state_report_handler_survives_worker_restart() {
let (server, report_sender) = new_test_flownode_server().await;
// First start: the report task is spawned and answers queries.
server.start_workers().await.unwrap();
report_sender.query(Duration::from_secs(3)).await.unwrap();
// Stopping the workers must not stop report handling.
server.stop_workers().await.unwrap();
report_sender.query(Duration::from_secs(3)).await.unwrap();
// Restart: queries must still be answered.
server.start_workers().await.unwrap();
report_sender.query(Duration::from_secs(3)).await.unwrap();
server.stop_workers().await.unwrap();
}
}

View File

@@ -26,7 +26,7 @@ use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_query::request::QueryRequest;
use common_stat::{ResourceStatImpl, ResourceStatRef};
use datanode::region_server::RegionServer;
use flow::StreamingEngine;
use flow::FlowDualEngineRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::RwLock;
@@ -35,7 +35,7 @@ pub struct StandaloneInformationExtension {
region_server: RegionServer,
procedure_manager: ProcedureManagerRef,
start_time_ms: u64,
flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
flow_engine: RwLock<Option<FlowDualEngineRef>>,
resource_stat: ResourceStatRef,
}
@@ -47,15 +47,15 @@ impl StandaloneInformationExtension {
region_server,
procedure_manager,
start_time_ms: common_time::util::current_time_millis() as u64,
flow_streaming_engine: RwLock::new(None),
flow_engine: RwLock::new(None),
resource_stat: Arc::new(resource_stat),
}
}
/// Set the flow streaming engine for the standalone instance.
pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
let mut guard = self.flow_streaming_engine.write().await;
*guard = Some(flow_streaming_engine);
/// Set the flow engine for the standalone instance.
pub async fn set_flow_engine(&self, flow_engine: FlowDualEngineRef) {
let mut guard = self.flow_engine.write().await;
*guard = Some(flow_engine);
}
}
@@ -144,7 +144,7 @@ impl InformationExtension for StandaloneInformationExtension {
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(Some(
self.flow_streaming_engine
self.flow_engine
.read()
.await
.as_ref()

View File

@@ -72,13 +72,12 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
Affected Rows: 4
-- TODO(discord9): fix flow stat update for batching mode flow
SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
| information_schema.flows.created_time < information_schema.flows.last_execution_time | information_schema.flows.created_time IS NOT NULL | information_schema.flows.last_execution_time IS NOT NULL | source_table_names |
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
| | true | false | greptime.public.ngx_access_log |
| true | true | true | greptime.public.ngx_access_log |
+--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+
DROP TABLE ngx_access_log;

View File

@@ -32,7 +32,6 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512
-- SQLNESS SLEEP 10s
INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z');
-- TODO(discord9): fix flow stat update for batching mode flow
SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow';
DROP TABLE ngx_access_log;