From 74dee25688aeff7942999eb875ba7b8eefe2cb9b Mon Sep 17 00:00:00 2001 From: discord9 Date: Sun, 28 Apr 2024 18:50:20 +0800 Subject: [PATCH] feat: new() for FlowNodeManager --- Cargo.lock | 2 ++ src/flow/Cargo.toml | 2 ++ src/flow/src/adapter.rs | 44 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b9dd164883..862a534918 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3516,10 +3516,12 @@ name = "flow" version = "0.7.2" dependencies = [ "api", + "async-trait", "catalog", "common-catalog", "common-decimal", "common-error", + "common-frontend", "common-macro", "common-meta", "common-telemetry", diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index e5274704f9..c5b57d1c74 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -15,6 +15,7 @@ workspace = true api.workspace = true common-decimal.workspace = true common-error.workspace = true +common-frontend.workspace = true common-macro.workspace = true common-telemetry.workspace = true common-time.workspace = true @@ -25,6 +26,7 @@ datatypes.workspace = true enum_dispatch = "0.3" # This fork is simply for keeping our dependency in our org, and pin the version # it is the same with upstream repo +async-trait.workspace = true common-meta.workspace = true greptime-proto.workspace = true hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 58a850835f..5f32408c6c 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -17,6 +17,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::sync::Arc; +use common_frontend::handler::FrontendInvoker; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::TableNameManager; use hydroflow::scheduled::graph::Hydroflow; @@ -33,7 +34,7 @@ use tokio::sync::{broadcast, mpsc}; use tokio::task::LocalSet; use crate::adapter::error::{EvalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu}; -use crate::adapter::worker::WorkerHandle; +use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::{Context, DataflowState, ErrCollector}; use crate::expr::error::InternalSnafu; use crate::expr::GlobalId; @@ -58,8 +59,6 @@ pub type TableName = Vec; /// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread /// /// The choice of timestamp is just using current system timestamp for now -/// -/// TODO(discord9): refactor flow worker into a separate !Send struct? Flow Workers and Flow Node exists on same machine pub struct FlowNodeManager { /// The handler to the worker that will run the dataflow /// which is `!Send` so a handle is used @@ -68,11 +67,46 @@ pub struct FlowNodeManager { query_engine: Arc, /// Getting table name and table schema from table info manager srv_map: TableInfoSource, + frontend_invoker: Box, /// contains mapping from table name to global id, and table schema node_context: FlowNodeContext, tick_manager: FlowTickManager, } +impl FlowNodeManager { + pub fn new( + frontend_invoker: Box, + query_engine: Arc, + table_info: TableInfoManager, + ) -> Self { + let srv_map = TableInfoSource::new(table_info); + let node_context = FlowNodeContext::default(); + let tick_manager = FlowTickManager::new(); + let worker_handles = Vec::new(); + FlowNodeManager { + worker_handles, + query_engine, + srv_map, + frontend_invoker, + node_context, + tick_manager, + } + } + pub fn new_with_worker<'s>( + frontend_invoker: Box, + query_engine: Arc, + table_info: TableInfoManager, + ) -> (Self, Worker<'s>) { + let mut zelf = Self::new(frontend_invoker, query_engine, table_info); + let (handle, worker) = create_worker(zelf.tick_manager.clone()); + zelf.add_worker_handle(handle); + (zelf, worker) + } + pub fn add_worker_handle(&mut self, handle: WorkerHandle) { + self.worker_handles.push(handle); + } +} + /// Just check if NodeManager's other fields are `Send` so later we can refactor so A Flow Node Manager /// can manage multiple flow worker(thread) then we can run multiple flow worker in a single flow node manager #[test] @@ -445,7 +479,7 @@ impl FlowNodeContext { pub fn add_sink_receiver(&mut self, table_name: TableName) { self.sink_receiver .entry(table_name) - .or_insert_with(|| mpsc::unbounded_channel::()); + .or_insert_with(mpsc::unbounded_channel::); } pub fn get_source_by_global_id( @@ -479,7 +513,7 @@ impl FlowNodeContext { .0; self.sink_receiver .get(&table_name) - .map(|(s, r)| s.clone()) + .map(|(s, _r)| s.clone()) .with_context(|| TableNotFoundSnafu { name: table_name.join("."), })