refactor: make worker sync only and separate thread&test

This commit is contained in:
discord9
2024-04-29 18:03:13 +08:00
parent e33afa53f4
commit c22185abce
7 changed files with 189 additions and 100 deletions

View File

@@ -46,7 +46,7 @@ use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BOARDCAST_CAP};
use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP};
use crate::transform::sql_to_flow_plan;
pub(crate) mod error;
@@ -64,29 +64,23 @@ pub const PER_REQ_MAX_ROW_CNT: usize = 8192;
pub type TaskId = u64;
pub type TableName = Vec<String>;
/// This function shouldn't be called from a async context,
///
/// better to be called from i.e. inside a `thread::spawn`
/// This function will create a new thread for flow worker and return a handle to the flow node manager
pub fn start_flow_node_and_one_worker(
frontend_invoker: Box<dyn FrontendInvoker + Send + Sync>,
query_engine: Arc<dyn QueryEngine>,
table_meta: TableMetadataManagerRef,
) {
let node_id = Some(1);
let (flow_node_manager, mut worker) =
FlowNodeManager::new_with_worker(node_id, frontend_invoker, query_engine, table_meta);
let rt = tokio::runtime::Runtime::new().unwrap();
let local = tokio::task::LocalSet::new();
) -> FlowNodeManager {
let (tx, rx) = oneshot::channel();
local.block_on(&rt, async move {
let worker_handle = tokio::task::spawn_local(async move {
worker.run().await;
});
let node_handle = tokio::task::spawn_local(async move {
flow_node_manager.run().await;
});
tokio::try_join!(worker_handle, node_handle).unwrap();
let _handle = std::thread::spawn(move || {
let node_id = Some(1);
let (flow_node_manager, mut worker) =
FlowNodeManager::new_with_worker(node_id, frontend_invoker, query_engine, table_meta);
let _ = tx.send(flow_node_manager);
worker.run();
});
rx.blocking_recv().unwrap()
}
pub type FlowNodeManagerRef = Arc<FlowNodeManager>;
@@ -116,7 +110,6 @@ impl FlowNodeManager {
/// note that this method didn't handle input mirror request, as this should be handled by grpc server
pub async fn run(&self) {
loop {
// TODO(discord9): start a server to listen for incoming request
self.run_available().await;
// TODO(discord9): error handling
let _ = self.send_writeback_requests().await;
@@ -154,7 +147,7 @@ impl FlowNodeManager {
table_meta: TableMetadataManagerRef,
) -> (Self, Worker<'s>) {
let mut zelf = Self::new(node_id, frontend_invoker, query_engine, table_meta);
let (handle, worker) = create_worker(zelf.tick_manager.clone());
let (handle, worker) = create_worker();
zelf.add_worker_handle(handle);
(zelf, worker)
}
@@ -300,8 +293,9 @@ impl FlowNodeManager {
/// However this is not blocking and can sometimes return while actual computation is still running in worker thread
/// TODO(discord9): add flag for subgraph that have input since last run
pub async fn run_available(&self) {
let now = self.tick_manager.tick();
for worker in self.worker_handles.iter() {
worker.lock().await.run_available();
worker.lock().await.run_available(now);
}
}
@@ -433,10 +427,8 @@ impl FlowNodeManager {
/// steps to create task:
/// 1. parse query into typed plan(and optional parse expire_when expr)
/// 2. render source/sink with output table id and used input table id
///
/// TODO(discord9): use greptime-proto type to create task instead
#[allow(clippy::too_many_arguments)]
pub async fn create_task(
pub async fn create_flow(
&self,
task_id: TaskId,
sink_table_id: TableId,
@@ -552,7 +544,7 @@ impl FlowNodeContext {
send_buffer.extend(rows);
let mut row_cnt = 0;
while let Some(row) = send_buffer.pop_front() {
if sender.len() >= BOARDCAST_CAP {
if sender.len() >= BROADCAST_CAP {
break;
}
row_cnt += 1;
@@ -600,7 +592,7 @@ impl FlowNodeContext {
pub fn add_source_sender(&mut self, table_id: TableId) {
self.source_sender
.entry(table_id)
.or_insert_with(|| broadcast::channel(BOARDCAST_CAP).0);
.or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
}
pub fn add_sink_receiver(&mut self, table_name: TableName) {
@@ -746,6 +738,12 @@ pub struct FlowTickManager {
anchor: Anchor,
}
impl std::fmt::Debug for FlowTickManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlowTickManager").finish()
}
}
impl FlowTickManager {
pub fn new() -> Self {
FlowTickManager {

View File

@@ -67,7 +67,7 @@ impl flow_server::Flow for FlowService {
.await?;
let ret = self
.manager
.create_task(
.create_flow(
task_id.id as u64,
sink_table_id,
&source_table_ids,
@@ -178,8 +178,9 @@ impl servers::server::Server for FlowNodeServer {
.serve_with_incoming_shutdown(incoming, rx.map(drop))
.await
.context(StartGrpcSnafu);
// TODO(discord9): better place for dataflow to run per second
});
// TODO(discord9): better place for dataflow to run per second
let manager_ref = self.flow_service.manager.clone();
let _handle_trigger_run = common_runtime::spawn_bg(async move {
manager_ref.run().await;

View File

@@ -14,6 +14,7 @@
//! For single-thread flow worker
use std::borrow::BorrowMut;
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
@@ -31,13 +32,37 @@ use crate::repr::{self, DiffRow};
pub type SharedBuf = Arc<Mutex<VecDeque<DiffRow>>>;
/// Create both worker(`!Send`) and worker handle(`Send + Sync`)
pub fn create_worker<'a>() -> (WorkerHandle, Worker<'a>) {
let (itc_client, itc_server) = create_inter_thread_call();
let worker_handle = WorkerHandle {
itc_client: Mutex::new(itc_client),
};
let worker = Worker {
task_states: BTreeMap::new(),
itc_server: Arc::new(Mutex::new(itc_server)),
};
(worker_handle, worker)
}
/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
pub(crate) struct ActiveDataflowState<'subgraph> {
df: Hydroflow<'subgraph>,
state: DataflowState,
err_collector: ErrCollector,
}
impl std::fmt::Debug for ActiveDataflowState<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ActiveDataflowState")
.field("df", &"<Hydroflow>")
.field("state", &self.state)
.field("err_collector", &self.err_collector)
.finish()
}
}
impl Default for ActiveDataflowState<'_> {
fn default() -> Self {
ActiveDataflowState {
@@ -76,23 +101,11 @@ impl<'subgraph> ActiveDataflowState<'subgraph> {
}
}
#[derive(Debug)]
pub struct WorkerHandle {
itc_client: Mutex<InterThreadCallClient>,
}
/// Create both worker(`!Send`) and worker handle(`Send + Sync`)
pub fn create_worker<'a>(tick_manager: FlowTickManager) -> (WorkerHandle, Worker<'a>) {
let (itc_client, itc_server) = create_inter_thread_call();
let worker_handle = WorkerHandle {
itc_client: Mutex::new(itc_client),
};
let worker = Worker {
task_states: BTreeMap::new(),
itc_server: Mutex::new(itc_server),
tick_manager,
};
(worker_handle, worker)
}
#[test]
fn check_if_send_sync() {
fn check<T: Send + Sync>() {}
@@ -157,11 +170,13 @@ impl WorkerHandle {
}
}
// trigger running the worker
pub fn run_available(&self) {
/// trigger running the worker, will not block, and will run the worker parallelly
///
/// will set the current timestamp to `now` for all dataflows before running them
pub fn run_available(&self, now: repr::Timestamp) {
self.itc_client
.blocking_lock()
.call_non_blocking(Request::RunAvail);
.call_non_blocking(Request::RunAvail { now });
}
pub fn contains_task(&self, task_id: TaskId) -> Result<bool, Error> {
@@ -182,13 +197,20 @@ impl WorkerHandle {
.with_context(|_| EvalSnafu {})
}
}
/// shutdown the worker
pub fn shutdown(&self) {
self.itc_client
.blocking_lock()
.call_non_blocking(Request::Shutdown);
}
}
/// The actual worker that does the work and contain active state
#[derive(Debug)]
pub struct Worker<'subgraph> {
pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
itc_server: Mutex<InterThreadCallServer>,
tick_manager: FlowTickManager,
itc_server: Arc<Mutex<InterThreadCallServer>>,
}
impl<'s> Worker<'s> {
@@ -231,65 +253,75 @@ impl<'s> Worker<'s> {
self.task_states.remove(&task_id).is_some()
}
/// run the worker until it is dropped
///
/// This method should be called inside a `LocalSet` since it's `!Send`
pub async fn run(&mut self) {
/// Run the worker, blocking, until shutdown signal is received
pub fn run(&mut self) {
loop {
let (req_id, req) = self.itc_server.lock().await.recv().await.unwrap();
match req {
Request::Create {
task_id,
plan,
sink_id,
sink_sender,
source_ids,
src_recvs,
create_if_not_exist,
} => {
let task_create_result = self.create_task(
task_id,
plan,
sink_id,
sink_sender,
&source_ids,
src_recvs,
create_if_not_exist,
);
self.itc_server.lock().await.resp(
req_id,
Response::Create {
result: task_create_result,
},
);
let (req_id, req) = self.itc_server.blocking_lock().blocking_recv().unwrap();
let ret = self.handle_req(req_id, req);
match ret {
Ok(Some((id, resp))) => {
self.itc_server.blocking_lock().resp(id, resp);
}
Request::Remove { task_id } => {
let ret = self.remove_task(task_id);
self.itc_server
.lock()
.await
.resp(req_id, Response::Remove { result: ret })
}
Request::RunAvail => self.run_tick(),
Request::ContainTask { task_id } => {
let ret = self.task_states.contains_key(&task_id);
self.itc_server
.lock()
.await
.resp(req_id, Response::ContainTask { result: ret })
Ok(None) => continue,
Err(()) => {
break;
}
}
}
}
/// return true if any task is running
pub fn run_tick(&mut self) {
let now = self.tick_manager.tick();
/// run with tick acquired from tick manager(usually means system time)
/// TODO(discord9): better tick management
pub fn run_tick(&mut self, now: repr::Timestamp) {
for (_task_id, task_state) in self.task_states.iter_mut() {
task_state.set_current_ts(now);
task_state.run_available();
}
}
/// handle request, return response if any, Err if receive shutdown signal
fn handle_req(&mut self, req_id: usize, req: Request) -> Result<Option<(usize, Response)>, ()> {
let ret = match req {
Request::Create {
task_id,
plan,
sink_id,
sink_sender,
source_ids,
src_recvs,
create_if_not_exist,
} => {
let task_create_result = self.create_task(
task_id,
plan,
sink_id,
sink_sender,
&source_ids,
src_recvs,
create_if_not_exist,
);
Some((
req_id,
Response::Create {
result: task_create_result,
},
))
}
Request::Remove { task_id } => {
let ret = self.remove_task(task_id);
Some((req_id, Response::Remove { result: ret }))
}
Request::RunAvail { now } => {
self.run_tick(now);
None
}
Request::ContainTask { task_id } => {
let ret = self.task_states.contains_key(&task_id);
Some((req_id, Response::ContainTask { result: ret }))
}
Request::Shutdown => return Err(()),
};
Ok(ret)
}
}
#[derive(Debug)]
@@ -307,10 +339,13 @@ enum Request {
task_id: TaskId,
},
/// Trigger the worker to run, useful after input buffer is full
RunAvail,
RunAvail {
now: repr::Timestamp,
},
ContainTask {
task_id: TaskId,
},
Shutdown,
}
#[derive(Debug)]
@@ -341,6 +376,7 @@ fn create_inter_thread_call() -> (InterThreadCallClient, InterThreadCallServer)
(client, server)
}
#[derive(Debug)]
struct InterThreadCallClient {
call_id: Arc<Mutex<usize>>,
arg_sender: mpsc::UnboundedSender<(usize, Request)>,
@@ -378,6 +414,7 @@ impl InterThreadCallClient {
}
}
#[derive(Debug)]
struct InterThreadCallServer {
pub arg_recv: mpsc::UnboundedReceiver<(usize, Request)>,
pub ret_sender: mpsc::UnboundedSender<(usize, Response)>,
@@ -388,8 +425,61 @@ impl InterThreadCallServer {
self.arg_recv.recv().await
}
pub fn blocking_recv(&mut self) -> Option<(usize, Request)> {
self.arg_recv.blocking_recv()
}
/// Send response back to the client
pub fn resp(&self, call_id: usize, resp: Response) {
self.ret_sender.send((call_id, resp)).unwrap();
}
}
#[cfg(test)]
mod test {
use tokio::sync::oneshot;
use super::*;
use crate::expr::Id;
use crate::plan::Plan;
use crate::repr::{RelationType, Row};
#[test]
pub fn test_simple_get_with_worker_and_handle() {
let flow_tick = FlowTickManager::new();
let (tx, rx) = oneshot::channel();
let worker_thread_handle = std::thread::spawn(move || {
let (handle, mut worker) = create_worker();
tx.send(handle).unwrap();
worker.run();
});
let handle = rx.blocking_recv().unwrap();
let src_ids = vec![GlobalId::User(1)];
let (tx, rx) = broadcast::channel::<DiffRow>(1024);
let (sink_tx, mut sink_rx) = mpsc::unbounded_channel::<DiffRow>();
let (task_id, plan) = (
1,
TypedPlan {
plan: Plan::Get {
id: Id::Global(GlobalId::User(1)),
},
typ: RelationType::new(vec![]),
},
);
handle
.create_task(
task_id,
plan,
GlobalId::User(1),
sink_tx,
&src_ids,
vec![rx],
true,
)
.unwrap();
tx.send((Row::empty(), 0, 0)).unwrap();
handle.run_available(flow_tick.tick());
assert_eq!(sink_rx.blocking_recv().unwrap().0, Row::empty());
handle.shutdown();
worker_thread_handle.join().unwrap();
}
}

View File

@@ -25,7 +25,7 @@ use crate::adapter::error::{Error, PlanSnafu};
use crate::compute::render::Context;
use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::expr::GlobalId;
use crate::repr::{DiffRow, Row, BOARDCAST_CAP};
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
#[allow(clippy::mutable_key_type)]
impl<'referred, 'df> Context<'referred, 'df> {
@@ -121,12 +121,12 @@ impl<'referred, 'df> Context<'referred, 'df> {
.add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
let data = recv.take_inner();
buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
if sender.len() >= BOARDCAST_CAP {
if sender.len() >= BROADCAST_CAP {
return;
} else {
while let Some(row) = buf.pop_front() {
// if the sender is full, stop sending
if sender.len() >= BOARDCAST_CAP {
if sender.len() >= BROADCAST_CAP {
break;
}
// TODO(discord9): handling tokio broadcast error

View File

@@ -25,7 +25,7 @@ use crate::utils::{ArrangeHandler, Arrangement};
/// input/output of a dataflow
/// One `ComputeState` manage the input/output/schedule of one `Hydroflow`
#[derive(Default)]
#[derive(Debug, Default)]
pub struct DataflowState {
/// it is important to use a deque to maintain the order of subgraph here
/// TODO(discord9): consider dedup? Also not necessary for hydroflow itself also do dedup when schedule

View File

@@ -146,7 +146,7 @@ impl CollectionBundle {
///
/// Using a `VecDeque` to preserve the order of errors
/// when running dataflow continuously and need errors in order
#[derive(Default, Clone)]
#[derive(Debug, Default, Clone)]
pub struct ErrCollector {
pub inner: Arc<Mutex<VecDeque<EvalError>>>,
}

View File

@@ -52,7 +52,7 @@ pub type DiffRow = (Row, Timestamp, Diff);
pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
/// broadcast channel capacity, set to a arbitrary value
pub const BOARDCAST_CAP: usize = 1024;
pub const BROADCAST_CAP: usize = 1024;
/// Convert a value that is or can be converted to Datetime to internal timestamp
///