diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b16a57a575..261117157b 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -56,7 +56,7 @@ use datanode::datanode::{Datanode, DatanodeBuilder}; use datanode::region_server::RegionServer; use file_engine::config::EngineConfig as FileEngineConfig; use flow::{ - FlowConfig, FlowWorkerManager, FlownodeBuilder, FlownodeInstance, FlownodeOptions, + FlowConfig, FlowStreamingEngine, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker, GrpcQueryHandlerWithBoxedError, }; use frontend::frontend::{Frontend, FrontendOptions}; @@ -703,7 +703,7 @@ pub struct StandaloneInformationExtension { region_server: RegionServer, procedure_manager: ProcedureManagerRef, start_time_ms: u64, - flow_worker_manager: RwLock<Option<Arc<FlowWorkerManager>>>, + flow_worker_manager: RwLock<Option<Arc<FlowStreamingEngine>>>, } impl StandaloneInformationExtension { @@ -717,7 +717,7 @@ impl StandaloneInformationExtension { } /// Set the flow worker manager for the standalone instance. 
- pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowWorkerManager>) { + pub async fn set_flow_worker_manager(&self, flow_worker_manager: Arc<FlowStreamingEngine>) { let mut guard = self.flow_worker_manager.write().await; *guard = Some(flow_worker_manager); } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index bd2267967d..9d27c58606 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -135,14 +135,14 @@ impl Configurable for FlownodeOptions { } /// Arc-ed FlowNodeManager, cheaper to clone -pub type FlowWorkerManagerRef = Arc<FlowWorkerManager>; +pub type FlowWorkerManagerRef = Arc<FlowStreamingEngine>; /// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread /// /// The choice of timestamp is just using current system timestamp for now /// /// TODO(discord9): rename to FlowStreamingEngine -pub struct FlowWorkerManager { +pub struct FlowStreamingEngine { /// The handler to the worker that will run the dataflow /// which is `!Send` so a handle is used pub worker_handles: Vec<WorkerHandle>, @@ -171,7 +171,7 @@ pub struct FlowWorkerManager { } /// Building FlownodeManager -impl FlowWorkerManager { +impl FlowStreamingEngine { /// set frontend invoker pub async fn set_frontend_invoker(&self, frontend: FrontendInvoker) { *self.frontend_invoker.write().await = Some(frontend); @@ -190,7 +190,7 @@ impl FlowWorkerManager { let node_context = FlownodeContext::new(Box::new(srv_map.clone()) as _); let tick_manager = FlowTickManager::new(); let worker_handles = Vec::new(); - FlowWorkerManager { + FlowStreamingEngine { worker_handles, worker_selector: Mutex::new(0), query_engine, @@ -266,7 +266,7 @@ pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Erro } /// This impl block contains methods to send writeback requests to frontend -impl FlowWorkerManager { +impl FlowStreamingEngine { /// Return the number of requests it made pub async fn send_writeback_requests(&self) -> Result<usize, Error> { let all_reqs = self.generate_writeback_request().await?; @@ -537,7 +537,7 @@ impl 
FlowWorkerManager { } /// Flow Runtime related methods -impl FlowWorkerManager { +impl FlowStreamingEngine { /// Start state report handler, which will receive a sender from HeartbeatTask to send state size report back /// /// if heartbeat task is shutdown, this future will exit too @@ -731,7 +731,7 @@ impl FlowWorkerManager { } /// Create&Remove flow -impl FlowWorkerManager { +impl FlowStreamingEngine { /// remove a flow by it's id pub async fn remove_flow_inner(&self, flow_id: FlowId) -> Result<(), Error> { for handle in self.worker_handles.iter() { diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index be16c38cb6..f99dc4961f 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -35,7 +35,7 @@ use snafu::{ensure, IntoError, OptionExt, ResultExt}; use store_api::storage::{RegionId, TableId}; use tokio::sync::{Mutex, RwLock}; -use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; +use crate::adapter::{CreateFlowArgs, FlowStreamingEngine}; use crate::batching_mode::engine::BatchingEngine; use crate::engine::FlowEngine; use crate::error::{ @@ -55,7 +55,7 @@ pub type FlowDualEngineRef = Arc<FlowDualEngine>; /// including create/drop/flush flow /// and redirect insert requests to the appropriate engine pub struct FlowDualEngine { - streaming_engine: Arc<FlowWorkerManager>, + streaming_engine: Arc<FlowStreamingEngine>, batching_engine: Arc<BatchingEngine>, /// helper struct for faster query flow by table id or vice versa src_table2flow: RwLock<SrcTableToFlow>, @@ -66,7 +66,7 @@ impl FlowDualEngine { pub fn new( - streaming_engine: Arc<FlowWorkerManager>, + streaming_engine: Arc<FlowStreamingEngine>, batching_engine: Arc<BatchingEngine>, flow_metadata_manager: Arc<FlowMetadataManager>, catalog_manager: Arc<dyn CatalogManager>, @@ -81,7 +81,7 @@ impl FlowDualEngine { } } - pub fn streaming_engine(&self) -> Arc<FlowWorkerManager> { + pub fn streaming_engine(&self) -> Arc<FlowStreamingEngine> { self.streaming_engine.clone() } @@ -692,7 +692,7 @@ fn to_meta_err( } #[async_trait::async_trait] -impl common_meta::node_manager::Flownode for FlowWorkerManager { +impl 
common_meta::node_manager::Flownode for FlowStreamingEngine { async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> { let query_ctx = request .header @@ -778,7 +778,7 @@ impl common_meta::node_manager::Flownode for FlowWorkerManager { } } -impl FlowEngine for FlowWorkerManager { +impl FlowEngine for FlowStreamingEngine { async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> { self.create_flow_inner(args).await } @@ -830,7 +830,7 @@ impl FetchFromRow { } } -impl FlowWorkerManager { +impl FlowStreamingEngine { async fn handle_inserts_inner( &self, request: InsertRequests, diff --git a/src/flow/src/adapter/refill.rs b/src/flow/src/adapter/refill.rs index 25287df62b..556f61db93 100644 --- a/src/flow/src/adapter/refill.rs +++ b/src/flow/src/adapter/refill.rs @@ -31,7 +31,7 @@ use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::TableId; use crate::adapter::table_source::ManagedTableSource; -use crate::adapter::{FlowId, FlowWorkerManager, FlowWorkerManagerRef}; +use crate::adapter::{FlowId, FlowStreamingEngine, FlowWorkerManagerRef}; use crate::error::{FlowNotFoundSnafu, JoinTaskSnafu, UnexpectedSnafu}; use crate::expr::error::ExternalSnafu; use crate::expr::utils::find_plan_time_window_expr_lower_bound; @@ -39,7 +39,7 @@ use crate::repr::RelationDesc; use crate::server::get_all_flow_ids; use crate::{Error, FrontendInvoker}; -impl FlowWorkerManager { +impl FlowStreamingEngine { /// Create and start refill flow tasks in background pub async fn create_and_start_refill_flow_tasks( self: &FlowWorkerManagerRef, diff --git a/src/flow/src/adapter/stat.rs b/src/flow/src/adapter/stat.rs index bf272cd9d0..769065756e 100644 --- a/src/flow/src/adapter/stat.rs +++ b/src/flow/src/adapter/stat.rs @@ -16,9 +16,9 @@ use std::collections::BTreeMap; use common_meta::key::flow::flow_state::FlowStat; -use crate::FlowWorkerManager; +use crate::FlowStreamingEngine; -impl FlowWorkerManager { +impl FlowStreamingEngine { pub async fn gen_state_report(&self) -> 
FlowStat { let mut full_report = BTreeMap::new(); let mut last_exec_time_map = BTreeMap::new(); diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs index 6811c29c96..d2eed7c998 100644 --- a/src/flow/src/adapter/util.rs +++ b/src/flow/src/adapter/util.rs @@ -33,8 +33,8 @@ use crate::adapter::table_source::TableDesc; use crate::adapter::{TableName, WorkerHandle, AUTO_CREATED_PLACEHOLDER_TS_COL}; use crate::error::{Error, ExternalSnafu, UnexpectedSnafu}; use crate::repr::{ColumnType, RelationDesc, RelationType}; -use crate::FlowWorkerManager; -impl FlowWorkerManager { +use crate::FlowStreamingEngine; +impl FlowStreamingEngine { /// Get a worker handle for creating flow, using round robin to select a worker pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle { let use_idx = { diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index 83443c147b..9c209841a2 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -43,7 +43,7 @@ mod utils; #[cfg(test)] mod test_utils; -pub use adapter::{FlowConfig, FlowWorkerManager, FlowWorkerManagerRef, FlownodeOptions}; +pub use adapter::{FlowConfig, FlowStreamingEngine, FlowWorkerManagerRef, FlownodeOptions}; pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError}; pub(crate) use engine::{CreateFlowArgs, FlowId, TableName}; pub use error::{Error, Result}; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d7deba3e48..33d9e78b65 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -63,7 +63,7 @@ use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; use crate::transform::register_function_to_query_engine; use crate::utils::{SizeReportSender, StateReportHandler}; -use crate::{CreateFlowArgs, Error, FlowWorkerManager, FlownodeOptions, FrontendClient}; +use crate::{CreateFlowArgs, Error, FlowStreamingEngine, FlownodeOptions, FrontendClient}; pub const 
FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; /// wrapping flow node manager to avoid orphan rule with Arc<...> @@ -489,7 +489,7 @@ impl FlownodeBuilder { async fn build_manager( &mut self, query_engine: Arc<dyn QueryEngine>, - ) -> Result<FlowWorkerManager> { + ) -> Result<FlowStreamingEngine> { let table_meta = self.table_meta.clone(); register_function_to_query_engine(&query_engine); @@ -498,7 +498,7 @@ impl FlownodeBuilder { let node_id = self.opts.node_id.map(|id| id as u32); - let mut man = FlowWorkerManager::new(node_id, query_engine, table_meta); + let mut man = FlowStreamingEngine::new(node_id, query_engine, table_meta); for worker_id in 0..num_workers { let (tx, rx) = oneshot::channel();