feat: new() for FlowNodeManager

This commit is contained in:
discord9
2024-04-28 18:50:20 +08:00
parent edcbc89c38
commit 74dee25688
3 changed files with 43 additions and 5 deletions

2
Cargo.lock generated
View File

@@ -3516,10 +3516,12 @@ name = "flow"
version = "0.7.2"
dependencies = [
"api",
"async-trait",
"catalog",
"common-catalog",
"common-decimal",
"common-error",
"common-frontend",
"common-macro",
"common-meta",
"common-telemetry",

View File

@@ -15,6 +15,7 @@ workspace = true
api.workspace = true
common-decimal.workspace = true
common-error.workspace = true
common-frontend.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
@@ -25,6 +26,7 @@ datatypes.workspace = true
enum_dispatch = "0.3"
# This fork is simply for keeping our dependency in our org, and pin the version
# it is the same with upstream repo
async-trait.workspace = true
common-meta.workspace = true
greptime-proto.workspace = true
hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }

View File

@@ -17,6 +17,7 @@
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
use common_frontend::handler::FrontendInvoker;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameManager;
use hydroflow::scheduled::graph::Hydroflow;
@@ -33,7 +34,7 @@ use tokio::sync::{broadcast, mpsc};
use tokio::task::LocalSet;
use crate::adapter::error::{EvalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu};
use crate::adapter::worker::WorkerHandle;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
@@ -58,8 +59,6 @@ pub type TableName = Vec<String>;
/// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread
///
/// The choice of timestamp is just using current system timestamp for now
///
/// TODO(discord9): refactor flow worker into a separate !Send struct? Flow Workers and Flow Node exists on same machine
pub struct FlowNodeManager {
/// The handler to the worker that will run the dataflow
/// which is `!Send` so a handle is used
@@ -68,11 +67,46 @@ pub struct FlowNodeManager {
query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
srv_map: TableInfoSource,
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
/// contains mapping from table name to global id, and table schema
node_context: FlowNodeContext,
tick_manager: FlowTickManager,
}
impl FlowNodeManager {
pub fn new(
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_info: TableInfoManager,
) -> Self {
let srv_map = TableInfoSource::new(table_info);
let node_context = FlowNodeContext::default();
let tick_manager = FlowTickManager::new();
let worker_handles = Vec::new();
FlowNodeManager {
worker_handles,
query_engine,
srv_map,
frontend_invoker,
node_context,
tick_manager,
}
}
pub fn new_with_worker<'s>(
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_info: TableInfoManager,
) -> (Self, Worker<'s>) {
let mut zelf = Self::new(frontend_invoker, query_engine, table_info);
let (handle, worker) = create_worker(zelf.tick_manager.clone());
zelf.add_worker_handle(handle);
(zelf, worker)
}
pub fn add_worker_handle(&mut self, handle: WorkerHandle) {
self.worker_handles.push(handle);
}
}
/// Just check if NodeManager's other fields are `Send` so later we can refactor so A Flow Node Manager
/// can manage multiple flow worker(thread) then we can run multiple flow worker in a single flow node manager
#[test]
@@ -445,7 +479,7 @@ impl FlowNodeContext {
pub fn add_sink_receiver(&mut self, table_name: TableName) {
self.sink_receiver
.entry(table_name)
.or_insert_with(|| mpsc::unbounded_channel::<DiffRow>());
.or_insert_with(mpsc::unbounded_channel::<DiffRow>);
}
pub fn get_source_by_global_id(
@@ -479,7 +513,7 @@ impl FlowNodeContext {
.0;
self.sink_receiver
.get(&table_name)
.map(|(s, r)| s.clone())
.map(|(s, _r)| s.clone())
.with_context(|| TableNotFoundSnafu {
name: table_name.join("."),
})