From e88a40b58b656b5871c6b702c6819fd99eb06a41 Mon Sep 17 00:00:00 2001
From: discord9
Date: Sun, 28 Apr 2024 16:20:04 +0800
Subject: [PATCH] refactor: use separate Worker

---
 src/flow/src/adapter.rs        | 189 +++++++++++++--------------------
 src/flow/src/adapter/tests.rs  |   2 +-
 src/flow/src/adapter/worker.rs |  83 ++++++++++-----
 3 files changed, 133 insertions(+), 141 deletions(-)

diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 423e0b67aa..fa76fafc01 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -29,10 +29,11 @@ use smallvec::SmallVec;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
 use table::metadata::TableId;
-use tokio::sync::broadcast;
+use tokio::sync::{broadcast, mpsc};
 use tokio::task::LocalSet;
 
 use crate::adapter::error::{EvalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu};
+use crate::adapter::worker::WorkerHandle;
 use crate::compute::{Context, DataflowState, ErrCollector};
 use crate::expr::error::InternalSnafu;
 use crate::expr::GlobalId;
@@ -57,32 +58,25 @@ pub type TableName = Vec<String>;
 /// The choice of timestamp is just using current system timestamp for now
 ///
 /// TODO(discord9): refactor flow worker into a separate !Send struct? Flow Workers and Flow Node exists on same machine
-pub struct FlowNodeManager<'subgraph> {
-    /// The state of all tasks in the flow node
-    /// This is also the only field that's not `Send` in the struct
-    pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
+pub struct FlowNodeManager {
+    /// The handler to the worker that will run the dataflow
+    /// which is `!Send` so a handle is used
+    pub worker_handles: Vec<WorkerHandle>,
     /// The query engine that will be used to parse the query and convert it to a dataflow plan
     query_engine: Arc<dyn QueryEngine>,
+    /// Getting table name and table schema from table info manager
     srv_map: TableInfoSource,
     /// contains mapping from table name to global id, and table schema
     node_context: FlowNodeContext,
     tick_manager: FlowTickManager,
 }
 
-/// This is non-SEND struct, so we can't run it in a separate thread
-pub struct FlowWorkerManager<'subgraph> {
-    pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
-}
-
 /// Just check if NodeManager's other fields are `Send` so later we can refactor so A Flow Node Manager
 /// can manage multiple flow worker(thread) then we can run multiple flow worker in a single flow node manager
 #[test]
 fn check_is_send() {
     fn is_send<T: Send>() {}
-    is_send::();
-    is_send::();
-    is_send::();
-    is_send::>();
+    is_send::<FlowNodeManager>();
 }
 
 /// mapping of table name <-> table id should be query from tableinfo manager
@@ -220,56 +214,14 @@ pub fn diff_row_to_request(rows: Vec) -> Vec {
     reqs
 }
 
-impl<'s> FlowNodeManager<'s> {
-    /// blocking run the dataflow's grpc service & execution in a `LocalSet`
-    ///
-    /// the idomic way to run the dataflow
-    /// is spawn a new thread, then create a flow node manager, and run the dataflow
-    /// using this method
-    pub fn run_dataflow(self, rt: tokio::runtime::Runtime, local_set: LocalSet) {
-        local_set.block_on(&rt, async move {
-            // TODO(discord9): might place grpc service on another thread?
-            let zelf = self;
-            todo!("main loop");
-        });
-    }
-
+impl FlowNodeManager {
     /// Run all available subgraph in the flow node
     /// This will try to run all dataflow in this node
     /// TODO(discord9): add flag for subgraph that have input since last run
     pub fn run_available(&mut self) {
-        let now = self.tick_manager.tick();
-        for (task_id, task_state) in self.task_states.iter_mut() {
-            task_state.set_current_ts(now);
-            task_state.run_available();
-
-            // if there is work done, check for new data in the sink
-            while task_state.run_available() {
-                let sink_table_name = self.node_context.task_to_sink.get(task_id).unwrap();
-                let sink_buf = self
-                    .node_context
-                    .sink_buffer
-                    .entry(sink_table_name.clone())
-                    .or_default();
-                let sink_recv = self
-                    .node_context
-                    .sink_receiver
-                    .get_mut(sink_table_name)
-                    .unwrap();
-                // TODO(discord9): handle lagging eror
-                while let Ok(row) = sink_recv.1.try_recv() {
-                    sink_buf.push_back(row);
-                }
-            }
-        }
-    }
-
-    /// Take everything in sink buffer and construct write request which should be send to the frontend
-    pub fn take_sink_request_per_table(&mut self) -> Vec<(TableName, Vec<DiffRow>)> {
-        std::mem::take(&mut self.node_context.sink_buffer)
-            .into_iter()
-            .map(|(name, buf)| (name, buf.into_iter().collect()))
-            .collect()
+        self.worker_handles.iter_mut().for_each(|worker| {
+            worker.run_available();
+        });
     }
 
     /// send write request to related source sender
@@ -283,6 +235,16 @@ impl<'s> FlowNodeManager<'s> {
         Ok(())
     }
 
+    pub async fn remove_task(&mut self, task_id: TaskId) -> Result<(), Error> {
+        for handle in self.worker_handles.iter_mut() {
+            if handle.contains_task(task_id)? {
+                handle.remove_task(task_id)?;
+                break;
+            }
+        }
+        Ok(())
+    }
+
     /// Return task id if a new task is created, otherwise return None
     ///
     /// steps to create task:
@@ -304,8 +266,10 @@ impl<'s> FlowNodeManager<'s> {
     ) -> Result<Option<TaskId>, Error> {
         if create_if_not_exist {
             // check if the task already exists
-            if self.task_states.contains_key(&task_id) {
-                return Ok(None);
+            for handle in self.worker_handles.iter() {
+                if handle.contains_task(task_id)? {
+                    return Ok(None);
+                }
             }
         }
         // assign global id to source and sink table
@@ -317,62 +281,52 @@ impl<'s> FlowNodeManager<'s> {
                 .assign_global_id_to_table(&self.srv_map, *source)
                 .await;
         }
+        self.node_context
+            .register_task_src_sink(task_id, source_table_ids, sink_table_id);
 
         // construct a active dataflow state with it
         let flow_plan = sql_to_flow_plan(&mut self.node_context, &self.query_engine, &sql).await?;
 
         // TODO(discord9): parse `expire_when`
+        let _ = expire_when;
+        let _ = comment;
+        let _ = task_options;
+
+        // TODO(discord9): add more than one handles
+        let sink_id = self
+            .node_context
+            .table_repr
+            .get_by_table_id(&sink_table_id)
+            .unwrap()
+            .1;
+        let sink_sender = self.node_context.get_sink_by_global_id(&sink_id)?;
+
+        let source_ids = source_table_ids
+            .iter()
+            .map(|id| self.node_context.table_repr.get_by_table_id(id).unwrap().1)
+            .collect_vec();
+        let source_senders = source_ids
+            .iter()
+            .map(|id| {
+                self.node_context
+                    .get_source_by_global_id(id)
+                    .map(|s| s.subscribe())
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let handle = &self.worker_handles[0];
+        handle.create_task(
+            task_id,
+            flow_plan,
+            sink_id,
+            sink_sender,
+            &source_ids,
+            source_senders,
+            create_if_not_exist,
+        )?;
 
-        self.create_ctx_and_render(task_id, flow_plan, sink_table_id, source_table_ids)?;
         Ok(Some(task_id))
     }
-
-    /// create a render context, render the plan, and connect source/sink to the rendered dataflow
-    ///
-    /// return the output table's assigned global id
-    fn create_ctx_and_render(
-        &mut self,
-        task_id: TaskId,
-        plan: TypedPlan,
-        sink_table_id: TableId,
-        source_table_ids: &[TableId],
-    ) -> Result<(), Error> {
-        let mut cur_task_state = ActiveDataflowState::<'s>::default();
-
-        {
-            let sink_global_id = self
-                .node_context
-                .table_repr
-                .get_by_table_id(&sink_table_id)
-                .with_context(|| TableNotFoundSnafu {
-                    name: sink_table_id.to_string(),
-                })?
-                .1;
-            let mut ctx = cur_task_state.new_ctx(sink_global_id);
-            // rendering source now that we have the context
-            for source in source_table_ids {
-                let source = self
-                    .node_context
-                    .table_repr
-                    .get_by_table_id(source)
-                    .with_context(|| TableNotFoundSnafu {
-                        name: source.to_string(),
-                    })?
-                    .1;
-                let source_sender = self.node_context.get_source_by_global_id(&source)?;
-                let recv = source_sender.subscribe();
-                let bundle = ctx.render_source(recv)?;
-                ctx.insert_global(source, bundle);
-            }
-
-            let rendered_dataflow = ctx.render_plan(plan.plan)?;
-            let sink_sender = self.node_context.get_sink_by_global_id(&sink_global_id)?;
-            ctx.render_sink(rendered_dataflow, sink_sender);
-        }
-
-        self.task_states.insert(task_id, cur_task_state);
-        Ok(())
-    }
 }
 
 /// A context that holds the information of the dataflow
@@ -390,10 +344,13 @@ pub struct FlowNodeContext {
     ///
     /// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
     /// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here
-    pub sink_receiver:
-        BTreeMap<TableName, (broadcast::Sender<DiffRow>, broadcast::Receiver<DiffRow>)>,
-    /// store sink buffer for each sink table, used for sending data back to the frontend
-    pub sink_buffer: BTreeMap<TableName, VecDeque<DiffRow>>,
+    pub sink_receiver: BTreeMap<
+        TableName,
+        (
+            mpsc::UnboundedSender<DiffRow>,
+            mpsc::UnboundedReceiver<DiffRow>,
+        ),
+    >,
     /// store source in buffer for each source table, in case broadcast channel is full
     pub send_buffer: BTreeMap>,
     /// the schema of the table
@@ -469,7 +426,7 @@ impl FlowNodeContext {
     pub fn add_sink_receiver(&mut self, table_name: TableName) {
         self.sink_receiver
             .entry(table_name)
-            .or_insert_with(|| broadcast::channel(BOARDCAST_CAP));
+            .or_insert_with(|| mpsc::unbounded_channel::<DiffRow>());
     }
 
     pub fn get_source_by_global_id(
@@ -493,7 +450,7 @@ impl FlowNodeContext {
     pub fn get_sink_by_global_id(
         &self,
         id: &GlobalId,
-    ) -> Result<broadcast::Sender<DiffRow>, Error> {
+    ) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
         let table_name = self
             .table_repr
            .get_by_global_id(id)
diff --git a/src/flow/src/adapter/tests.rs b/src/flow/src/adapter/tests.rs
index e2a948f6bf..672fff4675 100644
--- a/src/flow/src/adapter/tests.rs
+++ b/src/flow/src/adapter/tests.rs
@@ -68,7 +68,7 @@ fn mock_harness_flow_node_manager() {
     {
         let table_id: TableId = 451;
         let key = TableInfoKey::new(table_id);
-        let val = todo!();
+        let val = new_test_table_info_with_name(table_id, "table1", vec![1, 2, 3]);
     }
     let kv = Arc::new(kvb) as KvBackendRef;
     let info_manager = TableInfoManager::new(kv);
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index 44e2820c67..1b884b24c2 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -76,7 +76,10 @@ impl WorkerHandle {
         };
 
         let ret = self.itc_client.blocking_lock().call_blocking(req)?;
-        if let Response::Create { task_create_result } = ret {
+        if let Response::Create {
+            result: task_create_result,
+        } = ret
+        {
             task_create_result
         } else {
             InternalSnafu {
@@ -90,11 +93,11 @@ impl WorkerHandle {
     }
 
     /// remove task, return task id
-    pub fn remove_task(&self, task_id: TaskId) -> Result<TaskId, Error> {
+    pub fn remove_task(&self, task_id: TaskId) -> Result<bool, Error> {
         let req = Request::Remove { task_id };
         let ret = self.itc_client.blocking_lock().call_blocking(req)?;
-        if let Response::Remove { task_remove_result } = ret {
-            task_remove_result
+        if let Response::Remove { result } = ret {
+            Ok(result)
         } else {
             InternalSnafu {
                 reason: format!(
@@ -107,10 +110,29 @@ impl WorkerHandle {
     }
 
     // trigger running the worker
-    pub fn trigger_run(&self) {
+    pub fn run_available(&self) {
         self.itc_client
             .blocking_lock()
-            .call_non_resp(Request::TriggerRun);
+            .call_non_blocking(Request::RunAvail);
     }
+
+    pub fn contains_task(&self, task_id: TaskId) -> Result<bool, Error> {
+        let req = Request::ContainTask { task_id };
+        let ret = self.itc_client.blocking_lock().call_blocking(req).unwrap();
+        if let Response::ContainTask {
+            result: task_contain_result,
+        } = ret
+        {
+            Ok(task_contain_result)
+        } else {
+            InternalSnafu {
+                reason: format!(
+                    "Flow Node/Worker itc failed, expect Response::ContainTask, found {ret:?}"
+                ),
+            }
+            .fail()
+            .with_context(|_| EvalSnafu {})
+        }
+    }
 
 }
@@ -156,9 +178,9 @@ impl<'s> Worker<'s> {
         Ok(Some(task_id))
     }
 
-    pub fn remove_task(&mut self, task_id: TaskId) -> Result<(), Error> {
-        self.task_states.remove(&task_id);
-        Ok(())
+    /// remove task, return true if a task is removed
+    pub fn remove_task(&mut self, task_id: TaskId) -> bool {
+        self.task_states.remove(&task_id).is_some()
     }
 
     /// run the worker until it is dropped
@@ -185,21 +207,28 @@ impl<'s> Worker<'s> {
                     src_recvs,
                     create_if_not_exist,
                 );
+                self.itc_server.lock().await.resp(
+                    req_id,
+                    Response::Create {
+                        result: task_create_result,
+                    },
+                );
+            }
+            Request::Remove { task_id } => {
+                let ret = self.remove_task(task_id);
                 self.itc_server
                     .lock()
                     .await
-                    .resp(req_id, Response::Create { task_create_result });
+                    .resp(req_id, Response::Remove { result: ret })
             }
-            Request::Remove { task_id } => {
-                let ret = self.remove_task(task_id).map(|_| task_id);
-                self.itc_server.lock().await.resp(
-                    req_id,
-                    Response::Remove {
-                        task_remove_result: ret,
-                    },
-                )
+            Request::RunAvail => self.run_tick(),
+            Request::ContainTask { task_id } => {
+                let ret = self.task_states.contains_key(&task_id);
+                self.itc_server
+                    .lock()
+                    .await
+                    .resp(req_id, Response::ContainTask { result: ret })
             }
-            Request::TriggerRun => self.run_tick(),
         }
     }
 
@@ -228,16 +257,22 @@ enum Request {
         task_id: TaskId,
     },
     /// Trigger the worker to run, useful after input buffer is full
-    TriggerRun,
+    RunAvail,
+    ContainTask {
+        task_id: TaskId,
+    },
 }
 
 #[derive(Debug)]
 enum Response {
     Create {
-        task_create_result: Result<Option<TaskId>, Error>,
+        result: Result<Option<TaskId>, Error>,
     },
     Remove {
-        task_remove_result: Result<TaskId, Error>,
+        result: bool,
    },
+    ContainTask {
+        result: bool,
+    },
 }
 
@@ -263,8 +298,8 @@ struct InterThreadCallClient {
 }
 
 impl InterThreadCallClient {
-    /// call without expecting responses
-    fn call_non_resp(&mut self, req: Request) {
+    /// call without expecting responses or blocking
+    fn call_non_blocking(&mut self, req: Request) {
         let call_id = {
             let mut call_id = self.call_id.blocking_lock();
             *call_id += 1;