refactor: use separate Worker

discord9
2024-04-28 16:20:04 +08:00
parent c7647759be
commit e88a40b58b
3 changed files with 133 additions and 141 deletions

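The gist of the change: the `!Send` dataflow state (`task_states`) moves out of `FlowNodeManager` into a `Worker` that stays on its own thread, and the manager keeps only `Send` `WorkerHandle`s that reach the worker over channels. Below is a minimal sketch of that pattern using std channels and simplified stand-in types rather than the crate's tokio-based inter-thread calls; the names and shapes are illustrative only.

// Sketch: !Send state confined to a dedicated worker thread, driven through a Send handle.
use std::collections::BTreeMap;
use std::rc::Rc; // Rc makes the worker state deliberately !Send
use std::sync::mpsc;
use std::thread;

/// Requests the Send-side handle can issue to the !Send worker.
enum Request {
    CreateTask { task_id: u64 },
    ContainsTask { task_id: u64, resp: mpsc::Sender<bool> },
    RunAvailable,
    Shutdown,
}

/// Owns the !Send state; lives only on its dedicated thread.
struct Worker {
    task_states: BTreeMap<u64, Rc<String>>,
    rx: mpsc::Receiver<Request>,
}

impl Worker {
    fn run(mut self) {
        while let Ok(req) = self.rx.recv() {
            match req {
                Request::CreateTask { task_id } => {
                    self.task_states.insert(task_id, Rc::new(format!("task-{task_id}")));
                }
                Request::ContainsTask { task_id, resp } => {
                    let _ = resp.send(self.task_states.contains_key(&task_id));
                }
                Request::RunAvailable => { /* drive all ready subgraphs here */ }
                Request::Shutdown => break,
            }
        }
    }
}

/// Cheap, Send handle held by the manager; all it stores is the request sender.
#[derive(Clone)]
struct WorkerHandle {
    tx: mpsc::Sender<Request>,
}

impl WorkerHandle {
    fn contains_task(&self, task_id: u64) -> bool {
        // per-call response channel: send the request, then wait for the answer
        let (resp_tx, resp_rx) = mpsc::channel();
        self.tx
            .send(Request::ContainsTask { task_id, resp: resp_tx })
            .expect("worker thread gone");
        resp_rx.recv().expect("worker thread gone")
    }

    fn run_available(&self) {
        // fire-and-forget, mirroring a non-blocking trigger
        let _ = self.tx.send(Request::RunAvailable);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = WorkerHandle { tx };
    // The worker is constructed *inside* its own thread, so its !Send state never crosses threads.
    let worker_thread = thread::spawn(move || {
        Worker { task_states: BTreeMap::new(), rx }.run();
    });
    handle.tx.send(Request::CreateTask { task_id: 1 }).unwrap();
    assert!(handle.contains_task(1));
    handle.run_available();
    handle.tx.send(Request::Shutdown).unwrap();
    worker_thread.join().unwrap();
}

The real handle additionally distinguishes blocking calls (create/remove/contains) from fire-and-forget triggers, as the diffs below show.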
View File

@@ -29,10 +29,11 @@ use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use tokio::task::LocalSet;
use crate::adapter::error::{EvalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu};
use crate::adapter::worker::WorkerHandle;
use crate::compute::{Context, DataflowState, ErrCollector};
use crate::expr::error::InternalSnafu;
use crate::expr::GlobalId;
@@ -57,32 +58,25 @@ pub type TableName = Vec<String>;
/// The choice of timestamp is just to use the current system timestamp for now
///
/// TODO(discord9): refactor flow worker into a separate !Send struct? Flow Workers and Flow Node exist on the same machine
pub struct FlowNodeManager<'subgraph> {
/// The state of all tasks in the flow node
/// This is also the only field that's not `Send` in the struct
pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
pub struct FlowNodeManager {
/// Handles to the workers that will run the dataflow;
/// the workers are `!Send`, so handles are used instead
pub worker_handles: Vec<WorkerHandle>,
/// The query engine that will be used to parse the query and convert it to a dataflow plan
query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
srv_map: TableInfoSource,
/// contains mapping from table name to global id, and table schema
node_context: FlowNodeContext,
tick_manager: FlowTickManager,
}
/// This is a non-`Send` struct, so it can't be moved to a separate thread
pub struct FlowWorkerManager<'subgraph> {
pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
}
/// Just check that the manager's other fields are `Send`, so that a Flow Node Manager
/// can later be refactored to manage multiple flow workers (threads), i.e. run multiple flow workers in a single flow node manager
#[test]
fn check_is_send() {
fn is_send<T: Send + Sync>() {}
is_send::<TableInfoSource>();
is_send::<FlowNodeContext>();
is_send::<FlowTickManager>();
is_send::<broadcast::Sender<DiffRow>>();
is_send::<FlowNodeManager>();
}
/// mapping of table name <-> table id should be queried from the table info manager
@@ -220,56 +214,14 @@ pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
reqs
}
impl<'s> FlowNodeManager<'s> {
/// Blocking run of the dataflow's grpc service & execution in a `LocalSet`
///
/// the idiomatic way to run the dataflow
/// is to spawn a new thread, create a flow node manager, and run the dataflow
/// using this method
pub fn run_dataflow(self, rt: tokio::runtime::Runtime, local_set: LocalSet) {
local_set.block_on(&rt, async move {
// TODO(discord9): might place grpc service on another thread?
let zelf = self;
todo!("main loop");
});
}
impl FlowNodeManager {
/// Run all available subgraphs in the flow node
/// This will try to run all dataflows in this node
/// TODO(discord9): add a flag for subgraphs that have received input since the last run
pub fn run_available(&mut self) {
let now = self.tick_manager.tick();
for (task_id, task_state) in self.task_states.iter_mut() {
task_state.set_current_ts(now);
task_state.run_available();
// if there is work done, check for new data in the sink
while task_state.run_available() {
let sink_table_name = self.node_context.task_to_sink.get(task_id).unwrap();
let sink_buf = self
.node_context
.sink_buffer
.entry(sink_table_name.clone())
.or_default();
let sink_recv = self
.node_context
.sink_receiver
.get_mut(sink_table_name)
.unwrap();
// TODO(discord9): handle lagging error
while let Ok(row) = sink_recv.1.try_recv() {
sink_buf.push_back(row);
}
}
}
}
/// Take everything in the sink buffer and construct write requests which should be sent to the frontend
pub fn take_sink_request_per_table(&mut self) -> Vec<(TableName, Vec<DiffRow>)> {
std::mem::take(&mut self.node_context.sink_buffer)
.into_iter()
.map(|(name, buf)| (name, buf.into_iter().collect()))
.collect()
self.worker_handles.iter_mut().for_each(|worker| {
worker.run_available();
});
}
/// send write requests to the related source senders
@@ -283,6 +235,16 @@ impl<'s> FlowNodeManager<'s> {
Ok(())
}
pub async fn remove_task(&mut self, task_id: TaskId) -> Result<(), Error> {
for handle in self.worker_handles.iter_mut() {
if handle.contains_task(task_id)? {
handle.remove_task(task_id)?;
break;
}
}
Ok(())
}
/// Return task id if a new task is created, otherwise return None
///
/// steps to create task:
@@ -304,8 +266,10 @@ impl<'s> FlowNodeManager<'s> {
) -> Result<Option<TaskId>, Error> {
if create_if_not_exist {
// check if the task already exists
if self.task_states.contains_key(&task_id) {
return Ok(None);
for handle in self.worker_handles.iter() {
if handle.contains_task(task_id)? {
return Ok(None);
}
}
}
// assign global id to source and sink table
@@ -317,62 +281,52 @@ impl<'s> FlowNodeManager<'s> {
.assign_global_id_to_table(&self.srv_map, *source)
.await;
}
self.node_context
.register_task_src_sink(task_id, source_table_ids, sink_table_id);
// construct an active dataflow state with it
let flow_plan = sql_to_flow_plan(&mut self.node_context, &self.query_engine, &sql).await?;
// TODO(discord9): parse `expire_when`
let _ = expire_when;
let _ = comment;
let _ = task_options;
// TODO(discord9): add more than one handle
let sink_id = self
.node_context
.table_repr
.get_by_table_id(&sink_table_id)
.unwrap()
.1;
let sink_sender = self.node_context.get_sink_by_global_id(&sink_id)?;
let source_ids = source_table_ids
.iter()
.map(|id| self.node_context.table_repr.get_by_table_id(id).unwrap().1)
.collect_vec();
let source_senders = source_ids
.iter()
.map(|id| {
self.node_context
.get_source_by_global_id(id)
.map(|s| s.subscribe())
})
.collect::<Result<Vec<_>, _>>()?;
let handle = &self.worker_handles[0];
handle.create_task(
task_id,
flow_plan,
sink_id,
sink_sender,
&source_ids,
source_senders,
create_if_not_exist,
)?;
self.create_ctx_and_render(task_id, flow_plan, sink_table_id, source_table_ids)?;
Ok(Some(task_id))
}
/// create a render context, render the plan, and connect source/sink to the rendered dataflow
///
/// return the output table's assigned global id
fn create_ctx_and_render(
&mut self,
task_id: TaskId,
plan: TypedPlan,
sink_table_id: TableId,
source_table_ids: &[TableId],
) -> Result<(), Error> {
let mut cur_task_state = ActiveDataflowState::<'s>::default();
{
let sink_global_id = self
.node_context
.table_repr
.get_by_table_id(&sink_table_id)
.with_context(|| TableNotFoundSnafu {
name: sink_table_id.to_string(),
})?
.1;
let mut ctx = cur_task_state.new_ctx(sink_global_id);
// rendering source now that we have the context
for source in source_table_ids {
let source = self
.node_context
.table_repr
.get_by_table_id(source)
.with_context(|| TableNotFoundSnafu {
name: source.to_string(),
})?
.1;
let source_sender = self.node_context.get_source_by_global_id(&source)?;
let recv = source_sender.subscribe();
let bundle = ctx.render_source(recv)?;
ctx.insert_global(source, bundle);
}
let rendered_dataflow = ctx.render_plan(plan.plan)?;
let sink_sender = self.node_context.get_sink_by_global_id(&sink_global_id)?;
ctx.render_sink(rendered_dataflow, sink_sender);
}
self.task_states.insert(task_id, cur_task_state);
Ok(())
}
}
/// A context that holds the information of the dataflow
@@ -390,10 +344,13 @@ pub struct FlowNodeContext {
///
/// and send it back to the client; since we are mocking the sink table as a client, we should use the table name as the key
/// note that there should only be one sink receiver, and we are using broadcast as an mpsc channel here
pub sink_receiver:
BTreeMap<TableName, (broadcast::Sender<DiffRow>, broadcast::Receiver<DiffRow>)>,
/// store sink buffer for each sink table, used for sending data back to the frontend
pub sink_buffer: BTreeMap<TableName, VecDeque<DiffRow>>,
pub sink_receiver: BTreeMap<
TableName,
(
mpsc::UnboundedSender<DiffRow>,
mpsc::UnboundedReceiver<DiffRow>,
),
>,
/// store source rows in a buffer for each source table, in case the broadcast channel is full
pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
/// the schema of the table
@@ -469,7 +426,7 @@ impl FlowNodeContext {
pub fn add_sink_receiver(&mut self, table_name: TableName) {
self.sink_receiver
.entry(table_name)
.or_insert_with(|| broadcast::channel(BOARDCAST_CAP));
.or_insert_with(|| mpsc::unbounded_channel::<DiffRow>());
}
pub fn get_source_by_global_id(
@@ -493,7 +450,7 @@ impl FlowNodeContext {
pub fn get_sink_by_global_id(
&self,
id: &GlobalId,
) -> Result<broadcast::Sender<DiffRow>, Error> {
) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
let table_name = self
.table_repr
.get_by_global_id(id)

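Alongside the worker split, the sink path switches from a broadcast sender/receiver pair to tokio's mpsc::unbounded_channel, which matches the single-consumer sink and lets the manager drain pending rows without awaiting. A rough sketch of that drain step follows, with placeholder TableName/DiffRow definitions (the real ones live in the flow crate) and assuming a tokio version where UnboundedReceiver::try_recv is available (1.7+).

// Sketch: draining an unbounded mpsc sink channel without blocking (simplified stand-in types).
use std::collections::{BTreeMap, VecDeque};
use tokio::sync::mpsc;

type TableName = Vec<String>;
type DiffRow = (Vec<i64>, i64, i64); // (row, timestamp, diff) — illustrative only

#[derive(Default)]
struct SinkChannels {
    sink_receiver:
        BTreeMap<TableName, (mpsc::UnboundedSender<DiffRow>, mpsc::UnboundedReceiver<DiffRow>)>,
}

impl SinkChannels {
    /// Register one unbounded channel per sink table (mirrors `add_sink_receiver`).
    fn add_sink_receiver(&mut self, table_name: TableName) {
        self.sink_receiver
            .entry(table_name)
            .or_insert_with(mpsc::unbounded_channel::<DiffRow>);
    }

    /// Drain everything the worker has produced so far, without blocking:
    /// `try_recv` returns `Err` as soon as the channel is empty.
    fn take_sink_request_per_table(&mut self) -> Vec<(TableName, Vec<DiffRow>)> {
        self.sink_receiver
            .iter_mut()
            .map(|(name, (_tx, rx))| {
                let mut buf = VecDeque::new();
                while let Ok(row) = rx.try_recv() {
                    buf.push_back(row);
                }
                (name.clone(), buf.into_iter().collect())
            })
            .collect()
    }
}

fn main() {
    let mut sinks = SinkChannels::default();
    sinks.add_sink_receiver(vec!["greptime".into(), "public".into(), "out".into()]);
    // a worker would clone the sender half and push rows; the manager then drains:
    let _per_table = sinks.take_sink_request_per_table();
}

An unbounded mpsc channel also sidesteps the lagging-receiver case that the broadcast-based loop had left as a TODO.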
View File

@@ -68,7 +68,7 @@ fn mock_harness_flow_node_manager() {
{
let table_id: TableId = 451;
let key = TableInfoKey::new(table_id);
let val = todo!();
let val = new_test_table_info_with_name(table_id, "table1", vec![1, 2, 3]);
}
let kv = Arc::new(kvb) as KvBackendRef;
let info_manager = TableInfoManager::new(kv);

View File

@@ -76,7 +76,10 @@ impl WorkerHandle {
};
let ret = self.itc_client.blocking_lock().call_blocking(req)?;
if let Response::Create { task_create_result } = ret {
if let Response::Create {
result: task_create_result,
} = ret
{
task_create_result
} else {
InternalSnafu {
@@ -90,11 +93,11 @@ impl WorkerHandle {
}
/// remove task, return whether the task was actually removed
pub fn remove_task(&self, task_id: TaskId) -> Result<TaskId, Error> {
pub fn remove_task(&self, task_id: TaskId) -> Result<bool, Error> {
let req = Request::Remove { task_id };
let ret = self.itc_client.blocking_lock().call_blocking(req)?;
if let Response::Remove { task_remove_result } = ret {
task_remove_result
if let Response::Remove { result } = ret {
Ok(result)
} else {
InternalSnafu {
reason: format!(
@@ -107,10 +110,29 @@ impl WorkerHandle {
}
// trigger running the worker
pub fn trigger_run(&self) {
pub fn run_available(&self) {
self.itc_client
.blocking_lock()
.call_non_resp(Request::TriggerRun);
.call_non_blocking(Request::RunAvail);
}
pub fn contains_task(&self, task_id: TaskId) -> Result<bool, Error> {
let req = Request::ContainTask { task_id };
let ret = self.itc_client.blocking_lock().call_blocking(req).unwrap();
if let Response::ContainTask {
result: task_contain_result,
} = ret
{
Ok(task_contain_result)
} else {
InternalSnafu {
reason: format!(
"Flow Node/Worker itc failed, expect Response::ContainTask, found {ret:?}"
),
}
.fail()
.with_context(|_| EvalSnafu {})
}
}
}
@@ -156,9 +178,9 @@ impl<'s> Worker<'s> {
Ok(Some(task_id))
}
pub fn remove_task(&mut self, task_id: TaskId) -> Result<(), Error> {
self.task_states.remove(&task_id);
Ok(())
/// remove task, return true if a task is removed
pub fn remove_task(&mut self, task_id: TaskId) -> bool {
self.task_states.remove(&task_id).is_some()
}
/// run the worker until it is dropped
@@ -185,21 +207,28 @@ impl<'s> Worker<'s> {
src_recvs,
create_if_not_exist,
);
self.itc_server.lock().await.resp(
req_id,
Response::Create {
result: task_create_result,
},
);
}
Request::Remove { task_id } => {
let ret = self.remove_task(task_id);
self.itc_server
.lock()
.await
.resp(req_id, Response::Create { task_create_result });
.resp(req_id, Response::Remove { result: ret })
}
Request::Remove { task_id } => {
let ret = self.remove_task(task_id).map(|_| task_id);
self.itc_server.lock().await.resp(
req_id,
Response::Remove {
task_remove_result: ret,
},
)
Request::RunAvail => self.run_tick(),
Request::ContainTask { task_id } => {
let ret = self.task_states.contains_key(&task_id);
self.itc_server
.lock()
.await
.resp(req_id, Response::ContainTask { result: ret })
}
Request::TriggerRun => self.run_tick(),
}
}
@@ -228,16 +257,22 @@ enum Request {
task_id: TaskId,
},
/// Trigger the worker to run, useful after input buffer is full
TriggerRun,
RunAvail,
ContainTask {
task_id: TaskId,
},
}
#[derive(Debug)]
enum Response {
Create {
task_create_result: Result<Option<TaskId>, Error>,
result: Result<Option<TaskId>, Error>,
},
Remove {
task_remove_result: Result<TaskId, Error>,
result: bool,
},
ContainTask {
result: bool,
},
}
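The InterThreadCallClient below tags each request with a call id taken from a shared counter, and the worker echoes that id back via resp(req_id, ...), which is how blocking calls find their answer while call_non_blocking simply fires the request. A stripped-down sketch of that correlation scheme with std channels, hypothetical names, and no error handling:

// Sketch: blocking vs fire-and-forget calls correlated by a call id over a channel pair.
use std::sync::mpsc;
use std::thread;

type CallId = usize;

#[derive(Debug)]
enum Request {
    ContainsTask { task_id: u64 },
    RunAvailable, // fire-and-forget: the server never answers this one
}

#[derive(Debug)]
enum Response {
    ContainsTask { result: bool },
}

struct CallClient {
    next_id: CallId,
    req_tx: mpsc::Sender<(CallId, Request)>,
    resp_rx: mpsc::Receiver<(CallId, Response)>,
}

impl CallClient {
    /// Call without expecting a response (and therefore without blocking on one).
    fn call_non_blocking(&mut self, req: Request) {
        let id = self.next_id;
        self.next_id += 1;
        let _ = self.req_tx.send((id, req));
    }

    /// Blocking call: wait until a response carrying our call id arrives.
    fn call_blocking(&mut self, req: Request) -> Response {
        let id = self.next_id;
        self.next_id += 1;
        self.req_tx.send((id, req)).expect("worker gone");
        loop {
            let (resp_id, resp) = self.resp_rx.recv().expect("worker gone");
            if resp_id == id {
                return resp;
            }
            // With one client and no responses to fire-and-forget requests,
            // ids always match; a fuller version would stash stray responses.
        }
    }
}

fn main() {
    let (req_tx, req_rx) = mpsc::channel();
    let (resp_tx, resp_rx) = mpsc::channel();
    // Server/worker side: answer blocking requests with the same call id.
    thread::spawn(move || {
        while let Ok((id, req)) = req_rx.recv() {
            match req {
                Request::ContainsTask { task_id } => {
                    let _ = resp_tx.send((id, Response::ContainsTask { result: task_id == 1 }));
                }
                Request::RunAvailable => { /* run whatever is ready; no response */ }
            }
        }
    });
    let mut client = CallClient { next_id: 0, req_tx, resp_rx };
    client.call_non_blocking(Request::RunAvailable);
    println!("{:?}", client.call_blocking(Request::ContainsTask { task_id: 1 }));
}

The crate's version wraps the counter and channels in tokio mutexes and returns Results, but the id-matching idea is the same.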
@@ -263,8 +298,8 @@ struct InterThreadCallClient {
}
impl InterThreadCallClient {
/// call without expecting responses
fn call_non_resp(&mut self, req: Request) {
/// call without expecting responses or blocking
fn call_non_blocking(&mut self, req: Request) {
let call_id = {
let mut call_id = self.call_id.blocking_lock();
*call_id += 1;