diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index e5472b42a0..fe9def4a94 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -46,7 +46,7 @@ use crate::compute::{Context, DataflowState, ErrCollector};
 use crate::expr::error::InternalSnafu;
 use crate::expr::GlobalId;
 use crate::plan::{Plan, TypedPlan};
-use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BOARDCAST_CAP};
+use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP};
 use crate::transform::sql_to_flow_plan;

 pub(crate) mod error;
@@ -64,29 +64,23 @@ pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
 pub type TaskId = u64;
 pub type TableName = Vec<String>;

-/// This function shouldn't be called from a async context,
-///
-/// better to be called from i.e. inside a `thread::spawn`
+/// Spawn a new thread for the flow worker and return a handle to the flow node manager
 pub fn start_flow_node_and_one_worker(
     frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
     query_engine: Arc<dyn QueryEngine>,
     table_meta: TableMetadataManagerRef,
-) {
-    let node_id = Some(1);
-    let (flow_node_manager, mut worker) =
-        FlowNodeManager::new_with_worker(node_id, frontend_invoker, query_engine, table_meta);
-    let rt = tokio::runtime::Runtime::new().unwrap();
-    let local = tokio::task::LocalSet::new();
+) -> FlowNodeManager {
+    let (tx, rx) = oneshot::channel();

-    local.block_on(&rt, async move {
-        let worker_handle = tokio::task::spawn_local(async move {
-            worker.run().await;
-        });
-        let node_handle = tokio::task::spawn_local(async move {
-            flow_node_manager.run().await;
-        });
-        tokio::try_join!(worker_handle, node_handle).unwrap();
+    let _handle = std::thread::spawn(move || {
+        let node_id = Some(1);
+        let (flow_node_manager, mut worker) =
+            FlowNodeManager::new_with_worker(node_id, frontend_invoker, query_engine, table_meta);
+        let _ = tx.send(flow_node_manager);
+        worker.run();
     });
+
+    rx.blocking_recv().unwrap()
 }

 pub type FlowNodeManagerRef = Arc<FlowNodeManager>;
@@ -116,7 +110,6 @@ impl FlowNodeManager {
     /// note that this method didn't handle input mirror request, as this should be handled by grpc server
     pub async fn run(&self) {
         loop {
-            // TODO(discord9): start a server to listen for incoming request
             self.run_available().await;
             // TODO(discord9): error handling
             let _ = self.send_writeback_requests().await;
@@ -154,7 +147,7 @@ impl FlowNodeManager {
         table_meta: TableMetadataManagerRef,
     ) -> (Self, Worker<'s>) {
         let mut zelf = Self::new(node_id, frontend_invoker, query_engine, table_meta);
-        let (handle, worker) = create_worker(zelf.tick_manager.clone());
+        let (handle, worker) = create_worker();
         zelf.add_worker_handle(handle);
         (zelf, worker)
     }
@@ -300,8 +293,9 @@ impl FlowNodeManager {
     /// However this is not blocking and can sometimes return while actual computation is still running in worker thread
     /// TODO(discord9): add flag for subgraph that have input since last run
     pub async fn run_available(&self) {
+        let now = self.tick_manager.tick();
         for worker in self.worker_handles.iter() {
-            worker.lock().await.run_available();
+            worker.lock().await.run_available(now);
         }
     }

@@ -433,10 +427,8 @@ impl FlowNodeManager {
     /// steps to create task:
     /// 1. parse query into typed plan(and optional parse expire_when expr)
     /// 2. render source/sink with output table id and used input table id
-    ///
-    /// TODO(discord9): use greptime-proto type to create task instead
     #[allow(clippy::too_many_arguments)]
-    pub async fn create_task(
+    pub async fn create_flow(
         &self,
         task_id: TaskId,
         sink_table_id: TableId,
@@ -552,7 +544,7 @@ impl FlowNodeContext {
         send_buffer.extend(rows);
         let mut row_cnt = 0;
         while let Some(row) = send_buffer.pop_front() {
-            if sender.len() >= BOARDCAST_CAP {
+            if sender.len() >= BROADCAST_CAP {
                 break;
             }
             row_cnt += 1;
@@ -600,7 +592,7 @@ impl FlowNodeContext {
     pub fn add_source_sender(&mut self, table_id: TableId) {
         self.source_sender
             .entry(table_id)
-            .or_insert_with(|| broadcast::channel(BOARDCAST_CAP).0);
+            .or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
     }

     pub fn add_sink_receiver(&mut self, table_name: TableName) {
@@ -746,6 +738,12 @@ pub struct FlowTickManager {
     anchor: Anchor,
 }

+impl std::fmt::Debug for FlowTickManager {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FlowTickManager").finish()
+    }
+}
+
 impl FlowTickManager {
     pub fn new() -> Self {
         FlowTickManager {
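The rewritten `start_flow_node_and_one_worker` above hands the `FlowNodeManager` back to the caller over a oneshot channel while the `!Send` worker stays on its own OS thread. A minimal sketch of that hand-off pattern, with stand-in `Manager`/`Worker` types rather than the real ones (only tokio's `oneshot` is assumed):

```rust
use tokio::sync::oneshot;

struct Manager;
struct Worker;

impl Worker {
    // stand-in for Worker::run: a blocking loop that lives on this thread
    fn run(&mut self) {}
}

fn start_manager_and_worker() -> Manager {
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        // both halves are built on the worker thread; only the Send half
        // is handed back to the caller before entering the blocking loop
        let (manager, mut worker) = (Manager, Worker);
        let _ = tx.send(manager);
        worker.run();
    });
    // blocking_recv must not be called from inside an async runtime,
    // which is why the manager is returned from a plain (non-async) function
    rx.blocking_recv().expect("worker thread dropped the sender")
}

fn main() {
    let _manager = start_manager_and_worker();
}
```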
diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs
index 06042994fd..607e072860 100644
--- a/src/flow/src/adapter/server.rs
+++ b/src/flow/src/adapter/server.rs
@@ -67,7 +67,7 @@ impl flow_server::Flow for FlowService {
             .await?;
         let ret = self
             .manager
-            .create_task(
+            .create_flow(
                 task_id.id as u64,
                 sink_table_id,
                 &source_table_ids,
@@ -178,8 +178,9 @@ impl servers::server::Server for FlowNodeServer {
                 .serve_with_incoming_shutdown(incoming, rx.map(drop))
                 .await
                 .context(StartGrpcSnafu);
-            // TODO(discord9): better place for dataflow to run per second
         });
+
+        // TODO(discord9): better place for dataflow to run per second
         let manager_ref = self.flow_service.manager.clone();
         let _handle_trigger_run = common_runtime::spawn_bg(async move {
             manager_ref.run().await;
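The server now kicks off the manager loop in a background task right after starting the gRPC service; the TODO above still asks for a better place to run the dataflow once per second. A hedged sketch of such a periodic trigger with plain tokio (`Manager::run_available` and `spawn_periodic_run` are illustrative names, and `tokio::spawn` stands in for `common_runtime::spawn_bg`):

```rust
use std::sync::Arc;
use std::time::Duration;

struct Manager;

impl Manager {
    async fn run_available(&self) {
        // drive all dataflows that have pending input
    }
}

// Tick once per second and let the manager run whatever work is available.
fn spawn_periodic_run(manager: Arc<Manager>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            interval.tick().await;
            manager.run_available().await;
        }
    })
}

#[tokio::main]
async fn main() {
    let trigger = spawn_periodic_run(Arc::new(Manager));
    // let it tick a few times, then stop it
    tokio::time::sleep(Duration::from_millis(2500)).await;
    trigger.abort();
}
```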
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index b69d2ed5e7..cddfe8682d 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -14,6 +14,7 @@
 //! For single-thread flow worker

+use std::borrow::BorrowMut;
 use std::collections::{BTreeMap, VecDeque};
 use std::sync::Arc;

@@ -31,13 +32,37 @@ use crate::repr::{self, DiffRow};

 pub type SharedBuf = Arc<Mutex<VecDeque<DiffRow>>>;

+/// Create both the worker (`!Send`) and the worker handle (`Send + Sync`)
+pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
+    let (itc_client, itc_server) = create_inter_thread_call();
+    let worker_handle = WorkerHandle {
+        itc_client: Mutex::new(itc_client),
+    };
+    let worker = Worker {
+        task_states: BTreeMap::new(),
+        itc_server: Arc::new(Mutex::new(itc_server)),
+    };
+    (worker_handle, worker)
+}
+
 /// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
+
 pub(crate) struct ActiveDataflowState<'subgraph> {
     df: Hydroflow<'subgraph>,
     state: DataflowState,
     err_collector: ErrCollector,
 }

+impl std::fmt::Debug for ActiveDataflowState<'_> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ActiveDataflowState")
+            .field("df", &"<Hydroflow>")
+            .field("state", &self.state)
+            .field("err_collector", &self.err_collector)
+            .finish()
+    }
+}
+
 impl Default for ActiveDataflowState<'_> {
     fn default() -> Self {
         ActiveDataflowState {
@@ -76,23 +101,11 @@ impl<'subgraph> ActiveDataflowState<'subgraph> {
     }
 }

+#[derive(Debug)]
 pub struct WorkerHandle {
     itc_client: Mutex<InterThreadCallClient>,
 }

-/// Create both worker(`!Send`) and worker handle(`Send + Sync`)
-pub fn create_worker<'a>(tick_manager: FlowTickManager) -> (WorkerHandle, Worker<'a>) {
-    let (itc_client, itc_server) = create_inter_thread_call();
-    let worker_handle = WorkerHandle {
-        itc_client: Mutex::new(itc_client),
-    };
-    let worker = Worker {
-        task_states: BTreeMap::new(),
-        itc_server: Mutex::new(itc_server),
-        tick_manager,
-    };
-    (worker_handle, worker)
-}
 #[test]
 fn check_if_send_sync() {
     fn check<T: Send + Sync>() {}
@@ -157,11 +170,13 @@ impl WorkerHandle {
         }
     }

-    // trigger running the worker
-    pub fn run_available(&self) {
+    /// Trigger the worker to run; this does not block, and the worker runs concurrently in its own thread
+    ///
+    /// The current timestamp is set to `now` for all dataflows before they run
+    pub fn run_available(&self, now: repr::Timestamp) {
         self.itc_client
             .blocking_lock()
-            .call_non_blocking(Request::RunAvail);
+            .call_non_blocking(Request::RunAvail { now });
     }

     pub fn contains_task(&self, task_id: TaskId) -> Result<bool, Error> {
@@ -182,13 +197,20 @@ impl WorkerHandle {
             .with_context(|_| EvalSnafu {})
         }
     }
+
+    /// Shut down the worker
+    pub fn shutdown(&self) {
+        self.itc_client
+            .blocking_lock()
+            .call_non_blocking(Request::Shutdown);
+    }
 }

 /// The actual worker that does the work and contain active state
+#[derive(Debug)]
 pub struct Worker<'subgraph> {
     pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
-    itc_server: Mutex<InterThreadCallServer>,
-    tick_manager: FlowTickManager,
+    itc_server: Arc<Mutex<InterThreadCallServer>>,
 }

 impl<'s> Worker<'s> {
@@ -231,65 +253,75 @@ impl<'s> Worker<'s> {
         self.task_states.remove(&task_id).is_some()
     }

-    /// run the worker until it is dropped
-    ///
-    /// This method should be called inside a `LocalSet` since it's `!Send`
-    pub async fn run(&mut self) {
+    /// Run the worker, blocking, until a shutdown signal is received
+    pub fn run(&mut self) {
         loop {
-            let (req_id, req) = self.itc_server.lock().await.recv().await.unwrap();
-            match req {
-                Request::Create {
-                    task_id,
-                    plan,
-                    sink_id,
-                    sink_sender,
-                    source_ids,
-                    src_recvs,
-                    create_if_not_exist,
-                } => {
-                    let task_create_result = self.create_task(
-                        task_id,
-                        plan,
-                        sink_id,
-                        sink_sender,
-                        &source_ids,
-                        src_recvs,
-                        create_if_not_exist,
-                    );
-                    self.itc_server.lock().await.resp(
-                        req_id,
-                        Response::Create {
-                            result: task_create_result,
-                        },
-                    );
+            let (req_id, req) = self.itc_server.blocking_lock().blocking_recv().unwrap();
+            let ret = self.handle_req(req_id, req);
+            match ret {
+                Ok(Some((id, resp))) => {
+                    self.itc_server.blocking_lock().resp(id, resp);
                 }
-                Request::Remove { task_id } => {
-                    let ret = self.remove_task(task_id);
-                    self.itc_server
-                        .lock()
-                        .await
-                        .resp(req_id, Response::Remove { result: ret })
-                }
-                Request::RunAvail => self.run_tick(),
-                Request::ContainTask { task_id } => {
-                    let ret = self.task_states.contains_key(&task_id);
-                    self.itc_server
-                        .lock()
-                        .await
-                        .resp(req_id, Response::ContainTask { result: ret })
-                }
+                Ok(None) => continue,
+                Err(()) => {
+                    break;
+                }
             }
         }
     }

-    /// return true if any task is running
-    pub fn run_tick(&mut self) {
-        let now = self.tick_manager.tick();
+    /// Run with the tick acquired from the tick manager (usually the system time)
+    /// TODO(discord9): better tick management
+    pub fn run_tick(&mut self, now: repr::Timestamp) {
         for (_task_id, task_state) in self.task_states.iter_mut() {
             task_state.set_current_ts(now);
             task_state.run_available();
         }
     }
+
+    /// Handle a request; returns the response, if any, or `Err(())` when a shutdown signal is received
+    fn handle_req(&mut self, req_id: usize, req: Request) -> Result<Option<(usize, Response)>, ()> {
+        let ret = match req {
+            Request::Create {
+                task_id,
+                plan,
+                sink_id,
+                sink_sender,
+                source_ids,
+                src_recvs,
+                create_if_not_exist,
+            } => {
+                let task_create_result = self.create_task(
+                    task_id,
+                    plan,
+                    sink_id,
+                    sink_sender,
+                    &source_ids,
+                    src_recvs,
+                    create_if_not_exist,
+                );
+                Some((
+                    req_id,
+                    Response::Create {
+                        result: task_create_result,
+                    },
+                ))
+            }
+            Request::Remove { task_id } => {
+                let ret = self.remove_task(task_id);
+                Some((req_id, Response::Remove { result: ret }))
+            }
+            Request::RunAvail { now } => {
+                self.run_tick(now);
+                None
+            }
+            Request::ContainTask { task_id } => {
+                let ret = self.task_states.contains_key(&task_id);
+                Some((req_id, Response::ContainTask { result: ret }))
+            }
+            Request::Shutdown => return Err(()),
+        };
+        Ok(ret)
+    }
 }

 #[derive(Debug)]
@@ -307,10 +339,13 @@ enum Request {
         task_id: TaskId,
     },
     /// Trigger the worker to run, useful after input buffer is full
-    RunAvail,
+    RunAvail {
+        now: repr::Timestamp,
+    },
     ContainTask {
         task_id: TaskId,
     },
+    Shutdown,
 }

 #[derive(Debug)]
@@ -341,6 +376,7 @@ fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer)
     (client, server)
 }

+#[derive(Debug)]
 struct InterThreadCallClient {
     call_id: Arc<Mutex<usize>>,
     arg_sender: mpsc::UnboundedSender<(usize, Request)>,
@@ -378,6 +414,7 @@ impl InterThreadCallClient {
     }
 }

+#[derive(Debug)]
 struct InterThreadCallServer {
     pub arg_recv: mpsc::UnboundedReceiver<(usize, Request)>,
     pub ret_sender: mpsc::UnboundedSender<(usize, Response)>,
@@ -388,8 +425,61 @@ impl InterThreadCallServer {
         self.arg_recv.recv().await
     }

+    pub fn blocking_recv(&mut self) -> Option<(usize, Request)> {
+        self.arg_recv.blocking_recv()
+    }
+
     /// Send response back to the client
     pub fn resp(&self, call_id: usize, resp: Response) {
         self.ret_sender.send((call_id, resp)).unwrap();
     }
 }
+
+#[cfg(test)]
+mod test {
+    use tokio::sync::oneshot;
+
+    use super::*;
+    use crate::expr::Id;
+    use crate::plan::Plan;
+    use crate::repr::{RelationType, Row};
+    #[test]
+    pub fn test_simple_get_with_worker_and_handle() {
+        let flow_tick = FlowTickManager::new();
+        let (tx, rx) = oneshot::channel();
+        let worker_thread_handle = std::thread::spawn(move || {
+            let (handle, mut worker) = create_worker();
+            tx.send(handle).unwrap();
+            worker.run();
+        });
+
+        let handle = rx.blocking_recv().unwrap();
+        let src_ids = vec![GlobalId::User(1)];
+        let (tx, rx) = broadcast::channel::<DiffRow>(1024);
+        let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<DiffRow>();
+        let (task_id, plan) = (
+            1,
+            TypedPlan {
+                plan: Plan::Get {
+                    id: Id::Global(GlobalId::User(1)),
+                },
+                typ: RelationType::new(vec![]),
+            },
+        );
+        handle
+            .create_task(
+                task_id,
+                plan,
+                GlobalId::User(1),
+                sink_tx,
+                &src_ids,
+                vec![rx],
+                true,
+            )
+            .unwrap();
+        tx.send((Row::empty(), 0, 0)).unwrap();
+        handle.run_available(flow_tick.tick());
+        assert_eq!(sink_rx.blocking_recv().unwrap().0, Row::empty());
+        handle.shutdown();
+        worker_thread_handle.join().unwrap();
+    }
+}
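`Worker::run` is now a blocking loop over an inter-thread call channel: the handle sends `(call_id, Request)` pairs, the worker replies only to requests that need a `Response`, and a `Shutdown` request ends the loop. A stripped-down sketch of that protocol with simplified `Request`/`Response` enums (only tokio's mpsc is assumed, everything else is a stand-in):

```rust
use tokio::sync::mpsc;

enum Request {
    Ping,
    RunAvail { now: i64 },
    Shutdown,
}

enum Response {
    Pong,
}

// Blocking worker loop: answer requests by call id, exit on Shutdown.
fn worker_loop(
    mut rx: mpsc::UnboundedReceiver<(usize, Request)>,
    tx: mpsc::UnboundedSender<(usize, Response)>,
) {
    while let Some((id, req)) = rx.blocking_recv() {
        match req {
            Request::Ping => {
                let _ = tx.send((id, Response::Pong));
            }
            // fire-and-forget request: run the dataflows, send no reply
            Request::RunAvail { now: _ } => {}
            Request::Shutdown => break,
        }
    }
}

fn main() {
    let (req_tx, req_rx) = mpsc::unbounded_channel();
    let (resp_tx, mut resp_rx) = mpsc::unbounded_channel();
    let worker = std::thread::spawn(move || worker_loop(req_rx, resp_tx));

    req_tx.send((0, Request::Ping)).unwrap();
    let (id, _pong) = resp_rx.blocking_recv().unwrap();
    assert_eq!(id, 0);

    req_tx.send((1, Request::RunAvail { now: 0 })).unwrap();
    req_tx.send((2, Request::Shutdown)).unwrap();
    worker.join().unwrap();
}
```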
diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index 3900b38190..e6d6b9f6e6 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -25,7 +25,7 @@ use crate::adapter::error::{Error, PlanSnafu};
 use crate::compute::render::Context;
 use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
 use crate::expr::GlobalId;
-use crate::repr::{DiffRow, Row, BOARDCAST_CAP};
+use crate::repr::{DiffRow, Row, BROADCAST_CAP};

 #[allow(clippy::mutable_key_type)]
 impl<'referred, 'df> Context<'referred, 'df> {
@@ -121,12 +121,12 @@ impl<'referred, 'df> Context<'referred, 'df> {
             .add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
                 let data = recv.take_inner();
                 buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
-                if sender.len() >= BOARDCAST_CAP {
+                if sender.len() >= BROADCAST_CAP {
                     return;
                 } else {
                     while let Some(row) = buf.pop_front() {
                         // if the sender is full, stop sending
-                        if sender.len() >= BOARDCAST_CAP {
+                        if sender.len() >= BROADCAST_CAP {
                             break;
                         }
                         // TODO(discord9): handling tokio broadcast error
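The sink above only drains its local buffer while `sender.len()` stays below `BROADCAST_CAP`, so rows that don't fit wait in the `VecDeque` until the next run instead of overwriting queued values. A small stand-alone sketch of that guard with plain `u64` rows instead of `DiffRow` (one deliberate difference: the row that doesn't fit is pushed back rather than dropped):

```rust
use std::collections::VecDeque;

use tokio::sync::broadcast;

const BROADCAST_CAP: usize = 1024;

// Move rows from the local buffer into the broadcast channel until the
// channel reports it is full; return how many rows were actually sent.
fn flush(buf: &mut VecDeque<u64>, sender: &broadcast::Sender<u64>) -> usize {
    let mut sent = 0;
    while let Some(row) = buf.pop_front() {
        if sender.len() >= BROADCAST_CAP {
            // channel is full: keep the row for the next flush
            buf.push_front(row);
            break;
        }
        // a send error only means there is currently no receiver
        let _ = sender.send(row);
        sent += 1;
    }
    sent
}

fn main() {
    let (tx, _rx) = broadcast::channel(BROADCAST_CAP);
    let mut buf: VecDeque<u64> = (0..10).collect();
    assert_eq!(flush(&mut buf, &tx), 10);
    assert!(buf.is_empty());
}
```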
diff --git a/src/flow/src/compute/state.rs b/src/flow/src/compute/state.rs
index 5bcb3a7ab1..a9a431de97 100644
--- a/src/flow/src/compute/state.rs
+++ b/src/flow/src/compute/state.rs
@@ -25,7 +25,7 @@ use crate::utils::{ArrangeHandler, Arrangement};
 /// input/output of a dataflow
 /// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
-#[derive(Default)]
+#[derive(Debug, Default)]
 pub struct DataflowState {
     /// it is important to use a deque to maintain the order of subgraph here
     /// TODO(discord9): consider dedup? Also not necessary for hydroflow itself also do dedup when schedule
diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs
index 4098f3c548..42b102e072 100644
--- a/src/flow/src/compute/types.rs
+++ b/src/flow/src/compute/types.rs
@@ -146,7 +146,7 @@ impl CollectionBundle {
 ///
 /// Using a `VecDeque` to preserve the order of errors
 /// when running dataflow continuously and need errors in order
-#[derive(Default, Clone)]
+#[derive(Debug, Default, Clone)]
 pub struct ErrCollector {
     pub inner: Arc<Mutex<VecDeque<EvalError>>>,
 }
diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs
index 6d3622f245..94f2c15497 100644
--- a/src/flow/src/repr.rs
+++ b/src/flow/src/repr.rs
@@ -52,7 +52,7 @@ pub type DiffRow = (Row, Timestamp, Diff);
 pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);

 /// broadcast channel capacity, set to a arbitrary value
-pub const BOARDCAST_CAP: usize = 1024;
+pub const BROADCAST_CAP: usize = 1024;

 /// Convert a value that is or can be converted to Datetime to internal timestamp
 ///
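For context on why the `BROADCAST_CAP` guard matters: a tokio broadcast channel keeps at most `capacity` values in flight, older ones are overwritten, and a receiver that falls behind gets `RecvError::Lagged` (rows are silently skipped) rather than back-pressure. A small demonstration, independent of the flow code:

```rust
use tokio::sync::broadcast;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = broadcast::channel::<u32>(4);
    // queue twice the capacity before the receiver gets a chance to read
    for v in 0..8 {
        tx.send(v).unwrap();
    }
    // the first recv reports how many values this receiver missed
    match rx.recv().await {
        Err(broadcast::error::RecvError::Lagged(skipped)) => assert_eq!(skipped, 4),
        other => panic!("expected a lag error, got {other:?}"),
    }
    // the receiver then resumes at the oldest value still buffered
    assert_eq!(rx.recv().await.unwrap(), 4);
}
```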