diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b605c7a66f..92638d3c4a 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -457,9 +457,8 @@ impl StartCommand { // set the ref to query for the local flow state { - let flow_streaming_engine = flownode.flow_engine().streaming_engine(); information_extension - .set_flow_streaming_engine(flow_streaming_engine) + .set_flow_engine(flownode.flow_engine()) .await; } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 2091c53d1d..7dc4bc93d8 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -75,7 +75,6 @@ pub(crate) mod table_source; use crate::FrontendInvoker; use crate::error::Error; -use crate::utils::StateReportHandler; // `GREPTIME_TIMESTAMP` is not used to distinguish when table is created automatically by flow pub const AUTO_CREATED_PLACEHOLDER_TS_COL: &str = "__ts_placeholder"; @@ -178,8 +177,6 @@ pub struct StreamingEngine { /// /// So that a series of event like `inserts -> flush` can be handled correctly flush_lock: RwLock<()>, - /// receive a oneshot sender to send state size report - state_report_handler: RwLock>, } /// Building FlownodeManager @@ -215,15 +212,9 @@ impl StreamingEngine { tick_manager, node_id, flush_lock: RwLock::new(()), - state_report_handler: RwLock::new(None), } } - pub async fn with_state_report_handler(self, handler: StateReportHandler) -> Self { - *self.state_report_handler.write().await = Some(handler); - self - } - /// Create a flownode manager with one worker pub fn new_with_workers<'s>( node_id: Option, @@ -554,37 +545,13 @@ impl StreamingEngine { /// Flow Runtime related methods impl StreamingEngine { - /// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back - /// - /// if heartbeat task is shutdown, this future will exit too - async fn start_state_report_handler(self: Arc) -> Option> { - let state_report_handler = self.state_report_handler.write().await.take(); - if let Some(mut handler) = state_report_handler { - let zelf = self.clone(); - let handler = common_runtime::spawn_global(async move { - while let Some(ret_handler) = handler.recv().await { - let state_report = zelf.gen_state_report().await; - ret_handler.send(state_report).unwrap_or_else(|err| { - common_telemetry::error!(err; "Send state size report error"); - }); - } - }); - Some(handler) - } else { - None - } - } - /// run in common_runtime background runtime pub fn run_background( self: Arc, shutdown: Option>, ) -> JoinHandle<()> { info!("Starting flownode manager's background task"); - common_runtime::spawn_global(async move { - let _state_report_handler = self.clone().start_state_report_handler().await; - self.run(shutdown).await; - }) + common_runtime::spawn_global(async move { self.run(shutdown).await }) } /// log all flow errors diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 1391bf2374..d1e4600e23 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -28,6 +28,7 @@ use common_error::ext::BoxedError; use common_meta::ddl::create_flow::FlowType; use common_meta::error::Result as MetaResult; use common_meta::key::flow::FlowMetadataManager; +use common_meta::key::flow::flow_state::FlowStat; use common_runtime::JoinHandle; use common_telemetry::{error, info, trace, warn}; use datatypes::value::Value; @@ -41,7 +42,7 @@ use tokio::sync::{Mutex, RwLock}; use crate::adapter::{CreateFlowArgs, StreamingEngine}; use crate::batching_mode::engine::BatchingEngine; -use crate::engine::FlowEngine; +use crate::engine::{FlowEngine, FlowStatProvider}; use crate::error::{ CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu, IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, @@ -49,6 +50,7 @@ use crate::error::{ }; use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT}; use crate::repr::{self, DiffRow}; +use crate::utils::StateReportHandler; use crate::{Error, FlowId}; /// Ref to [`FlowDualEngine`] @@ -61,6 +63,8 @@ pub type FlowDualEngineRef = Arc; pub struct FlowDualEngine { streaming_engine: Arc, batching_engine: Arc, + /// receive a oneshot sender to send state report + state_report_handler: RwLock>, /// helper struct for faster query flow by table id or vice versa src_table2flow: RwLock, flow_metadata_manager: Arc, @@ -81,6 +85,7 @@ impl FlowDualEngine { Self { streaming_engine, batching_engine, + state_report_handler: Default::default(), src_table2flow: RwLock::new(SrcTableToFlow::default()), flow_metadata_manager, catalog_manager, @@ -152,6 +157,47 @@ impl FlowDualEngine { self.batching_engine.clone() } + pub async fn set_state_report_handler(&self, handler: StateReportHandler) { + *self.state_report_handler.write().await = Some(handler); + } + + pub async fn gen_state_report(&self) -> FlowStat { + let streaming = self.streaming_engine.flow_stat().await; + let batching = self.batching_engine.flow_stat().await; + + let mut state_size = streaming.state_size; + state_size.extend(batching.state_size); + + let mut last_exec_time_map = streaming.last_exec_time_map; + last_exec_time_map.extend(batching.last_exec_time_map); + + FlowStat { + state_size, + last_exec_time_map, + } + } + + /// Start state report task, which receives a sender from heartbeat task and sends report back. + /// + /// if heartbeat task is shutdown, this future exits too. + pub async fn start_state_report_task(self: Arc) -> Option> { + let state_report_handler = self.state_report_handler.write().await.take(); + if let Some(mut handler) = state_report_handler { + let zelf = self.clone(); + let handler = common_runtime::spawn_global(async move { + while let Some(ret_handler) = handler.recv().await { + let state_report = zelf.gen_state_report().await; + ret_handler.send(state_report).unwrap_or_else(|err| { + common_telemetry::error!(err; "Send state report error"); + }); + } + }); + Some(handler) + } else { + None + } + } + /// In distributed mode, scan periodically(1s) until available frontend is found, or timeout, /// in standalone mode, return immediately /// notice here if any frontend appear in cluster info this function will return immediately diff --git a/src/flow/src/adapter/stat.rs b/src/flow/src/adapter/stat.rs index fe1727a30e..68f8160c87 100644 --- a/src/flow/src/adapter/stat.rs +++ b/src/flow/src/adapter/stat.rs @@ -17,11 +17,13 @@ use std::collections::BTreeMap; use common_meta::key::flow::flow_state::FlowStat; use crate::StreamingEngine; +use crate::engine::FlowStatProvider; -impl StreamingEngine { - pub async fn gen_state_report(&self) -> FlowStat { +impl FlowStatProvider for StreamingEngine { + async fn flow_stat(&self) -> FlowStat { let mut full_report = BTreeMap::new(); let mut last_exec_time_map = BTreeMap::new(); + for worker in self.worker_handles.iter() { match worker.get_state_size().await { Ok(state_size) => { diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 5c677e6e38..054f5db9d6 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -24,6 +24,7 @@ use common_error::ext::BoxedError; use common_meta::ddl::create_flow::FlowType; use common_meta::key::TableMetadataManagerRef; use common_meta::key::flow::FlowMetadataManagerRef; +use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_runtime::JoinHandle; use common_telemetry::tracing::warn; @@ -46,7 +47,7 @@ use crate::batching_mode::frontend_client::FrontendClient; use crate::batching_mode::task::{BatchingTask, TaskArgs}; use crate::batching_mode::time_window::{TimeWindowExpr, find_time_window_expr}; use crate::batching_mode::utils::sql_to_df_plan; -use crate::engine::FlowEngine; +use crate::engine::{FlowEngine, FlowStatProvider}; use crate::error::{ CreateFlowSnafu, DatafusionSnafu, ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, InvalidQuerySnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, @@ -92,6 +93,18 @@ impl BatchingEngine { } } + /// Returns last execution timestamps (millisecond) for all batching flows. + pub async fn get_last_exec_time_map(&self) -> BTreeMap { + let tasks = self.tasks.read().await; + tasks + .iter() + .filter_map(|(flow_id, task)| { + task.last_execution_time_millis() + .map(|timestamp| (*flow_id, timestamp)) + }) + .collect() + } + pub async fn handle_mark_dirty_time_window( &self, reqs: DirtyWindowRequests, @@ -320,6 +333,20 @@ impl BatchingEngine { } } +impl FlowStatProvider for BatchingEngine { + async fn flow_stat(&self) -> FlowStat { + FlowStat { + state_size: BTreeMap::new(), + last_exec_time_map: self + .get_last_exec_time_map() + .await + .into_iter() + .map(|(flow_id, timestamp)| (flow_id as u32, timestamp)) + .collect(), + } + } +} + async fn get_table_name( table_info: &TableInfoManager, table_id: &TableId, diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index f9cc0ba9c2..d90023ae46 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -44,6 +44,8 @@ pub struct TaskState { last_update_time: Instant, /// last time query duration last_query_duration: Duration, + /// Last successful execution time in unix timestamp milliseconds. + last_exec_time_millis: Option, /// Dirty Time windows need to be updated /// mapping of `start -> end` and non-overlapping pub(crate) dirty_time_windows: DirtyTimeWindows, @@ -59,6 +61,7 @@ impl TaskState { query_ctx, last_update_time: Instant::now(), last_query_duration: Duration::from_secs(0), + last_exec_time_millis: None, dirty_time_windows: Default::default(), exec_state: ExecState::Idle, shutdown_rx, @@ -68,10 +71,17 @@ impl TaskState { /// called after last query is done /// `is_succ` indicate whether the last query is successful - pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) { + pub fn after_query_exec(&mut self, elapsed: Duration, is_succ: bool) { self.exec_state = ExecState::Idle; self.last_query_duration = elapsed; self.last_update_time = Instant::now(); + if is_succ { + self.last_exec_time_millis = Some(common_time::util::current_time_millis()); + } + } + + pub fn last_execution_time_millis(&self) -> Option { + self.last_exec_time_millis } /// Compute the next query delay based on the time window size or the last query duration. @@ -578,6 +588,20 @@ mod test { use crate::batching_mode::utils::sql_to_df_plan; use crate::test_utils::create_test_query_engine; + #[test] + fn test_task_state_records_last_execution_time() { + let query_ctx = QueryContext::arc(); + let (_tx, rx) = tokio::sync::oneshot::channel(); + let mut state = TaskState::new(query_ctx, rx); + + assert_eq!(None, state.last_execution_time_millis()); + state.after_query_exec(std::time::Duration::from_millis(1), false); + assert_eq!(None, state.last_execution_time_millis()); + + state.after_query_exec(std::time::Duration::from_millis(1), true); + assert!(state.last_execution_time_millis().is_some()); + } + #[test] fn test_merge_dirty_time_windows() { let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold; diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 252735cdec..03c7afc460 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -180,6 +180,10 @@ impl BatchingTask { }) } + pub fn last_execution_time_millis(&self) -> Option { + self.state.read().unwrap().last_execution_time_millis() + } + /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set) /// /// useful for flush_flow to flush dirty time windows range diff --git a/src/flow/src/engine.rs b/src/flow/src/engine.rs index eb3a00f580..d7ac7cdb45 100644 --- a/src/flow/src/engine.rs +++ b/src/flow/src/engine.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; +use common_meta::key::flow::flow_state::FlowStat; use session::context::QueryContext; use table::metadata::TableId; @@ -99,3 +100,9 @@ pub trait FlowEngine { req: api::v1::flow::DirtyWindowRequests, ) -> Result<(), Error>; } + +/// Provides flow runtime statistics for information schema and heartbeat reporting. +pub trait FlowStatProvider { + /// Returns current runtime stats of an engine. + async fn flow_stat(&self) -> FlowStat; +} diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 3556fc32a7..bd4cbd9f08 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -42,6 +42,7 @@ mod utils; #[cfg(test)] mod test_utils; +pub use adapter::flownode_impl::FlowDualEngineRef; pub use adapter::{FlowConfig, FlowStreamingEngineRef, StreamingEngine}; pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError}; pub use engine::FlowAuthHeader; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 6e9036d6cd..3dcb54c0a7 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -167,6 +167,8 @@ struct FlownodeServerInner { server_shutdown_tx: Mutex>, /// streaming task handler streaming_task_handler: Mutex>>, + /// state report task handler + state_report_task_handler: Mutex>>, flow_service: FlowService, } @@ -180,6 +182,7 @@ impl FlownodeServer { worker_shutdown_tx: Mutex::new(tx), server_shutdown_tx: Mutex::new(server_tx), streaming_task_handler: Mutex::new(None), + state_report_task_handler: Mutex::new(None), }), } } @@ -189,6 +192,11 @@ impl FlownodeServer { /// Should be called only after heartbeat is establish, hence can get cluster info async fn start_workers(&self) -> Result<(), Error> { let manager_ref = self.inner.flow_service.dual_engine.clone(); + let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await; + if state_report_task_handler.is_none() { + *state_report_task_handler = manager_ref.clone().start_state_report_task().await; + } + drop(state_report_task_handler); let handle = manager_ref .streaming_engine() .run_background(Some(self.inner.worker_shutdown_tx.lock().await.subscribe())); @@ -213,6 +221,8 @@ impl FlownodeServer { if tx.send(()).is_err() { info!("Receiver dropped, the flow node server has already shutdown"); } + // Keep state_report_task_handler alive across worker restarts. + // Dropping it here would permanently lose the report channel receiver. self.inner .flow_service .dual_engine @@ -364,15 +374,18 @@ impl FlownodeBuilder { self.catalog_manager.clone(), self.opts.flow.batching_mode.clone(), )); - let dual = FlowDualEngine::new( + let dual = Arc::new(FlowDualEngine::new( manager.clone(), batching, self.flow_metadata_manager.clone(), self.catalog_manager.clone(), self.plugins.clone(), - ); + )); + if let Some(handler) = self.state_report_handler.take() { + dual.set_state_report_handler(handler).await; + } - let server = FlownodeServer::new(FlowService::new(Arc::new(dual))); + let server = FlownodeServer::new(FlowService::new(dual)); let heartbeat_task = self.heartbeat_task; @@ -418,9 +431,6 @@ impl FlownodeBuilder { })?; man.add_worker_handle(worker_handle); } - if let Some(handler) = self.state_report_handler.take() { - man = man.with_state_report_handler(handler).await; - } info!("Flow Node Manager started"); Ok(man) } @@ -662,3 +672,76 @@ pub(crate) async fn get_all_flow_ids( Ok(ret) } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use catalog::memory::new_memory_catalog_manager; + use common_base::Plugins; + use common_meta::key::TableMetadataManager; + use common_meta::key::flow::FlowMetadataManager; + use common_meta::kv_backend::memory::MemoryKvBackend; + use query::options::QueryOptions; + + use super::*; + use crate::adapter::flownode_impl::FlowDualEngine; + use crate::batching_mode::BatchingModeOptions; + use crate::batching_mode::engine::BatchingEngine; + use crate::utils::SizeReportSender; + + async fn new_test_flownode_server() -> (FlownodeServer, SizeReportSender) { + let kv_backend = Arc::new(MemoryKvBackend::new()); + let table_meta = Arc::new(TableMetadataManager::new(kv_backend.clone())); + table_meta.init().await.unwrap(); + let flow_meta = Arc::new(FlowMetadataManager::new(kv_backend.clone())); + let catalog_manager = new_memory_catalog_manager().unwrap(); + let query_engine = crate::test_utils::create_test_query_engine(); + + let streaming_engine = Arc::new(StreamingEngine::new( + None, + query_engine.clone(), + table_meta.clone(), + )); + let (frontend_client, _handler) = + FrontendClient::from_empty_grpc_handler(QueryOptions::default()); + let batching_engine = Arc::new(BatchingEngine::new( + Arc::new(frontend_client), + query_engine, + flow_meta.clone(), + table_meta, + catalog_manager.clone(), + BatchingModeOptions::default(), + )); + let dual_engine = Arc::new(FlowDualEngine::new( + streaming_engine, + batching_engine, + flow_meta, + catalog_manager, + Plugins::new(), + )); + + let (report_sender, report_handler) = SizeReportSender::new(); + dual_engine.set_state_report_handler(report_handler).await; + + let server = FlownodeServer::new(FlowService::new(dual_engine)); + (server, report_sender) + } + + #[tokio::test] + async fn test_state_report_handler_survives_worker_restart() { + let (server, report_sender) = new_test_flownode_server().await; + + server.start_workers().await.unwrap(); + report_sender.query(Duration::from_secs(3)).await.unwrap(); + + server.stop_workers().await.unwrap(); + report_sender.query(Duration::from_secs(3)).await.unwrap(); + + server.start_workers().await.unwrap(); + report_sender.query(Duration::from_secs(3)).await.unwrap(); + + server.stop_workers().await.unwrap(); + } +} diff --git a/src/standalone/src/information_extension.rs b/src/standalone/src/information_extension.rs index 852da25e65..fb3bc3e5fb 100644 --- a/src/standalone/src/information_extension.rs +++ b/src/standalone/src/information_extension.rs @@ -26,7 +26,7 @@ use common_procedure::{ProcedureInfo, ProcedureManagerRef}; use common_query::request::QueryRequest; use common_stat::{ResourceStatImpl, ResourceStatRef}; use datanode::region_server::RegionServer; -use flow::StreamingEngine; +use flow::FlowDualEngineRef; use snafu::ResultExt; use store_api::storage::RegionId; use tokio::sync::RwLock; @@ -35,7 +35,7 @@ pub struct StandaloneInformationExtension { region_server: RegionServer, procedure_manager: ProcedureManagerRef, start_time_ms: u64, - flow_streaming_engine: RwLock>>, + flow_engine: RwLock>, resource_stat: ResourceStatRef, } @@ -47,15 +47,15 @@ impl StandaloneInformationExtension { region_server, procedure_manager, start_time_ms: common_time::util::current_time_millis() as u64, - flow_streaming_engine: RwLock::new(None), + flow_engine: RwLock::new(None), resource_stat: Arc::new(resource_stat), } } - /// Set the flow streaming engine for the standalone instance. - pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc) { - let mut guard = self.flow_streaming_engine.write().await; - *guard = Some(flow_streaming_engine); + /// Set the flow engine for the standalone instance. + pub async fn set_flow_engine(&self, flow_engine: FlowDualEngineRef) { + let mut guard = self.flow_engine.write().await; + *guard = Some(flow_engine); } } @@ -144,7 +144,7 @@ impl InformationExtension for StandaloneInformationExtension { async fn flow_stats(&self) -> std::result::Result, Self::Error> { Ok(Some( - self.flow_streaming_engine + self.flow_engine .read() .await .as_ref() diff --git a/tests/cases/standalone/common/flow/flow_view.result b/tests/cases/standalone/common/flow/flow_view.result index 086f823136..ec54a12aa8 100644 --- a/tests/cases/standalone/common/flow/flow_view.result +++ b/tests/cases/standalone/common/flow/flow_view.result @@ -72,13 +72,12 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512 Affected Rows: 4 --- TODO(discord9): fix flow stat update for batching mode flow SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow'; +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ | information_schema.flows.created_time < information_schema.flows.last_execution_time | information_schema.flows.created_time IS NOT NULL | information_schema.flows.last_execution_time IS NOT NULL | source_table_names | +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ -| | true | false | greptime.public.ngx_access_log | +| true | true | true | greptime.public.ngx_access_log | +--------------------------------------------------------------------------------------+---------------------------------------------------+----------------------------------------------------------+--------------------------------+ DROP TABLE ngx_access_log; diff --git a/tests/cases/standalone/common/flow/flow_view.sql b/tests/cases/standalone/common/flow/flow_view.sql index 61aff064a9..28e5e2608e 100644 --- a/tests/cases/standalone/common/flow/flow_view.sql +++ b/tests/cases/standalone/common/flow/flow_view.sql @@ -32,7 +32,6 @@ INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512 -- SQLNESS SLEEP 10s INSERT INTO ngx_access_log VALUES ('192.168.1.1', 'GET', '/index.html', 200, 512, 'Mozilla/5.0', 1024, '2023-10-01T10:00:00Z'), ('192.168.1.2', 'POST', '/submit', 201, 256, 'curl/7.68.0', 512, '2023-10-01T10:01:00Z'), ('192.168.1.1', 'GET', '/about.html', 200, 128, 'Mozilla/5.0', 256, '2023-10-01T10:02:00Z'), ('192.168.1.3', 'GET', '/contact', 404, 64, 'curl/7.68.0', 128, '2023-10-01T10:03:00Z'); --- TODO(discord9): fix flow stat update for batching mode flow SELECT created_time < last_execution_time, created_time IS NOT NULL, last_execution_time IS NOT NULL, source_table_names FROM information_schema.flows WHERE flow_name = 'user_agent_flow'; DROP TABLE ngx_access_log;