diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 482245f90e..77710bbdeb 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -53,6 +53,8 @@ pub type TableName = Vec; /// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread /// /// The choice of timestamp is just using current system timestamp for now +/// +/// TODO(discord9): refactor flow worker into a separate !Send struct? pub struct FlowNodeManager<'subgraph> { /// The state of all tasks in the flow node /// This is also the only field that's not `Send` in the struct @@ -61,7 +63,7 @@ pub struct FlowNodeManager<'subgraph> { query_engine: Arc, srv_map: TableInfoSource, /// contains mapping from table name to global id, and table schema - worker_context: FlowWorkerContext, + node_context: FlowNodeContext, tick_manager: FlowTickManager, } @@ -71,7 +73,7 @@ pub struct FlowNodeManager<'subgraph> { fn check_is_send() { fn is_send() {} is_send::(); - is_send::(); + is_send::(); is_send::(); } @@ -235,14 +237,14 @@ impl<'s> FlowNodeManager<'s> { // if there is work done, check for new data in the sink while task_state.run_available() { - let sink_table_name = self.worker_context.task_to_sink.get(task_id).unwrap(); + let sink_table_name = self.node_context.task_to_sink.get(task_id).unwrap(); let sink_buf = self - .worker_context + .node_context .sink_buffer .get_mut(sink_table_name) .unwrap(); let sink_recv = self - .worker_context + .node_context .sink_receiver .get_mut(sink_table_name) .unwrap(); @@ -256,7 +258,7 @@ impl<'s> FlowNodeManager<'s> { /// Take everything in sink buffer and construct write request which should be send to the frontend pub fn take_sink_request_per_table(&mut self) -> Vec<(TableName, Vec)> { - std::mem::take(&mut self.worker_context.sink_buffer) + std::mem::take(&mut self.node_context.sink_buffer) .into_iter() .map(|(name, buf)| (name, buf.into_iter().collect())) .collect() @@ -269,7 +271,7 @@ impl<'s> FlowNodeManager<'s> { rows: Vec, ) -> Result<(), Error> { let table_id = region_id.table_id(); - self.worker_context.send(table_id, rows) + self.node_context.send(table_id, rows) } /// Return task id if a new task is created, otherwise return None @@ -302,14 +304,13 @@ impl<'s> FlowNodeManager<'s> { .iter() .chain(std::iter::once(&sink_table_id)) { - self.worker_context + self.node_context .assign_global_id_to_table(&self.srv_map, *source) .await; } // construct a active dataflow state with it - let flow_plan = - sql_to_flow_plan(&mut self.worker_context, &self.query_engine, &sql).await?; + let flow_plan = sql_to_flow_plan(&mut self.node_context, &self.query_engine, &sql).await?; // TODO(discord9): parse `expire_when` @@ -331,7 +332,7 @@ impl<'s> FlowNodeManager<'s> { { let sink_global_id = self - .worker_context + .node_context .table_repr .get_by_table_id(&sink_table_id) .with_context(|| TableNotFoundSnafu { @@ -342,21 +343,21 @@ impl<'s> FlowNodeManager<'s> { // rendering source now that we have the context for source in source_table_ids { let source = self - .worker_context + .node_context .table_repr .get_by_table_id(source) .with_context(|| TableNotFoundSnafu { name: source.to_string(), })? .1; - let source_sender = self.worker_context.get_source_by_global_id(&source)?; + let source_sender = self.node_context.get_source_by_global_id(&source)?; let recv = source_sender.subscribe(); let bundle = ctx.render_source(recv)?; ctx.insert_global(source, bundle); } let rendered_dataflow = ctx.render_plan(plan.plan)?; - let sink_sender = self.worker_context.get_sink_by_global_id(&sink_global_id)?; + let sink_sender = self.node_context.get_sink_by_global_id(&sink_global_id)?; ctx.render_sink(rendered_dataflow, sink_sender); } @@ -367,7 +368,7 @@ impl<'s> FlowNodeManager<'s> { /// A context that holds the information of the dataflow #[derive(Default)] -pub struct FlowWorkerContext { +pub struct FlowNodeContext { /// mapping from source table to tasks, useful for schedule which task to run when a source table is updated pub source_to_tasks: BTreeMap>, /// mapping from task to sink table, useful for sending data back to the client when a task is done running @@ -390,7 +391,7 @@ pub struct FlowWorkerContext { pub table_repr: TriMap, } -impl FlowWorkerContext { +impl FlowNodeContext { pub fn send(&mut self, table_id: TableId, rows: Vec) -> Result<(), Error> { let sender = self .source_sender @@ -417,7 +418,7 @@ impl FlowWorkerContext { } } -impl FlowWorkerContext { +impl FlowNodeContext { /// mapping source table to task, and sink table to task in worker context /// /// also add their corrseponding broadcast sender/receiver @@ -489,7 +490,7 @@ impl FlowWorkerContext { } } -impl FlowWorkerContext { +impl FlowNodeContext { /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. /// /// Returns an error if no table has been registered with the provided names diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 8fca5f65ac..6159e02d62 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -28,7 +28,7 @@ use crate::adapter::error::{ Error, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu, NotImplementedSnafu, TableNotFoundSnafu, }; -use crate::adapter::FlowWorkerContext; +use crate::adapter::FlowNodeContext; use crate::expr::GlobalId; use crate::plan::TypedPlan; use crate::repr::RelationType; @@ -96,7 +96,7 @@ impl FunctionExtensions { /// /// TODO(discord9): check if use empty `QueryContext` influence anything pub async fn sql_to_flow_plan( - ctx: &mut FlowWorkerContext, + ctx: &mut FlowNodeContext, engine: &Arc, sql: &str, ) -> Result { @@ -140,13 +140,13 @@ mod test { use crate::adapter::TriMap; use crate::repr::ColumnType; - pub fn create_test_ctx() -> FlowWorkerContext { + pub fn create_test_ctx() -> FlowNodeContext { let gid = GlobalId::User(0); let name = vec!["numbers".to_string()]; let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); let mut tri_map = TriMap::new(); tri_map.insert(name.clone(), 0, gid); - FlowWorkerContext { + FlowNodeContext { schema: HashMap::from([(gid, schema)]), table_repr: tri_map, ..Default::default() diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index f5d49bcae4..3ff166b269 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -54,11 +54,11 @@ use crate::expr::{ }; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; -use crate::transform::{FlowWorkerContext, FunctionExtensions}; +use crate::transform::{FlowNodeContext, FunctionExtensions}; impl TypedExpr { fn from_substrait_agg_grouping( - ctx: &mut FlowWorkerContext, + ctx: &mut FlowNodeContext, groupings: &[Grouping], typ: &RelationType, extensions: &FunctionExtensions, @@ -84,7 +84,7 @@ impl TypedExpr { impl AggregateExpr { fn from_substrait_agg_measures( - ctx: &mut FlowWorkerContext, + ctx: &mut FlowNodeContext, measures: &[Measure], typ: &RelationType, extensions: &FunctionExtensions, @@ -218,7 +218,7 @@ impl KeyValPlan { impl TypedPlan { /// Convert AggregateRel into Flow's TypedPlan pub fn from_substrait_agg_rel( - ctx: &mut FlowWorkerContext, + ctx: &mut FlowNodeContext, agg: &proto::AggregateRel, extensions: &FunctionExtensions, ) -> Result { diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 7290eaf183..9422c67569 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -23,12 +23,12 @@ use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanS use crate::expr::{MapFilterProject, TypedExpr}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, RelationType}; -use crate::transform::{FlowWorkerContext, FunctionExtensions}; +use crate::transform::{FlowNodeContext, FunctionExtensions}; impl TypedPlan { /// Convert Substrait Plan into Flow's TypedPlan pub fn from_substrait_plan( - ctx: &mut FlowWorkerContext, + ctx: &mut FlowNodeContext, plan: &SubPlan, ) -> Result { // Register function extension @@ -62,7 +62,7 @@ impl TypedPlan { /// Convert Substrait Rel into Flow's TypedPlan /// TODO: SELECT DISTINCT(does it get compile with something else?) pub fn from_substrait_rel( - ctx: &mut FlowWorkerContext, + ctx: &mut FlowNodeContext, rel: &Rel, extensions: &FunctionExtensions, ) -> Result {