refactor: FlowNodeContext

discord9
2024-04-26 19:26:54 +08:00
parent ea40691c71
commit d8a191a2db
4 changed files with 30 additions and 29 deletions

View File

@@ -53,6 +53,8 @@ pub type TableName = Vec<String>;
/// FlowNodeManager manages the state of all tasks in the flow node; they should all run on the same thread
///
/// For now, the timestamp used is simply the current system timestamp
+///
+/// TODO(discord9): refactor flow worker into a separate !Send struct?
pub struct FlowNodeManager<'subgraph> {
/// The state of all tasks in the flow node
/// This is also the only field that's not `Send` in the struct
@@ -61,7 +63,7 @@ pub struct FlowNodeManager<'subgraph> {
query_engine: Arc<dyn QueryEngine>,
srv_map: TableInfoSource,
/// contains mapping from table name to global id, and table schema
-worker_context: FlowWorkerContext,
+node_context: FlowNodeContext,
tick_manager: FlowTickManager,
}
@@ -71,7 +73,7 @@ pub struct FlowNodeManager<'subgraph> {
fn check_is_send() {
fn is_send<T: Send>() {}
is_send::<TableInfoSource>();
-is_send::<FlowWorkerContext>();
+is_send::<FlowNodeContext>();
is_send::<FlowTickManager>();
}
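
Note: the `is_send::<T>()` helper above only compiles when `T: Send`, so `check_is_send` doubles as a compile-time assertion that every listed field stays thread-safe across refactors like this rename. A minimal standalone sketch of the idiom, with a hypothetical `Rc`-holding struct for contrast:

use std::rc::Rc;

// `Rc` is !Send, so any struct holding one is !Send as well.
struct NotThreadSafe {
    state: Rc<Vec<u8>>,
}

struct ThreadSafe {
    state: Vec<u8>,
}

fn check_is_send() {
    fn is_send<T: Send>() {}
    is_send::<ThreadSafe>(); // compiles
    // is_send::<NotThreadSafe>(); // uncommenting this fails to compile
}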
@@ -235,14 +237,14 @@ impl<'s> FlowNodeManager<'s> {
// if there is work done, check for new data in the sink
while task_state.run_available() {
-let sink_table_name = self.worker_context.task_to_sink.get(task_id).unwrap();
+let sink_table_name = self.node_context.task_to_sink.get(task_id).unwrap();
let sink_buf = self
-.worker_context
+.node_context
.sink_buffer
.get_mut(sink_table_name)
.unwrap();
let sink_recv = self
-.worker_context
+.node_context
.sink_receiver
.get_mut(sink_table_name)
.unwrap();
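
Note: the loop above alternates between running the dataflow (`run_available`) and moving whatever arrived on the task's sink channel into the per-table sink buffer. A minimal sketch of that drain step, assuming an mpsc-style channel (the concrete channel type is not shown in this diff):

use tokio::sync::mpsc;

// Move everything currently queued on `recv` into `buf`; stop once empty.
fn drain_into(buf: &mut Vec<u64>, recv: &mut mpsc::UnboundedReceiver<u64>) {
    while let Ok(row) = recv.try_recv() {
        buf.push(row);
    }
}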
@@ -256,7 +258,7 @@ impl<'s> FlowNodeManager<'s> {
/// Take everything in the sink buffer and construct write requests, which should be sent to the frontend
pub fn take_sink_request_per_table(&mut self) -> Vec<(TableName, Vec<DiffRow>)> {
-std::mem::take(&mut self.worker_context.sink_buffer)
+std::mem::take(&mut self.node_context.sink_buffer)
.into_iter()
.map(|(name, buf)| (name, buf.into_iter().collect()))
.collect()
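
Note: `std::mem::take` swaps the field with its `Default` value and returns the old contents, which is what lets `take_sink_request_per_table` drain `sink_buffer` in one move without leaving `self` in an invalid state. A minimal sketch of the pattern, with simplified field types:

use std::collections::BTreeMap;

struct Ctx {
    sink_buffer: BTreeMap<String, Vec<u64>>,
}

impl Ctx {
    // After this call, `sink_buffer` is an empty map again.
    fn drain(&mut self) -> Vec<(String, Vec<u64>)> {
        std::mem::take(&mut self.sink_buffer).into_iter().collect()
    }
}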
@@ -269,7 +271,7 @@ impl<'s> FlowNodeManager<'s> {
rows: Vec<DiffRow>,
) -> Result<(), Error> {
let table_id = region_id.table_id();
-self.worker_context.send(table_id, rows)
+self.node_context.send(table_id, rows)
}
/// Returns the task id if a new task is created, otherwise returns None
@@ -302,14 +304,13 @@ impl<'s> FlowNodeManager<'s> {
.iter()
.chain(std::iter::once(&sink_table_id))
{
-self.worker_context
+self.node_context
.assign_global_id_to_table(&self.srv_map, *source)
.await;
}
// construct an active dataflow state with it
-let flow_plan =
-sql_to_flow_plan(&mut self.worker_context, &self.query_engine, &sql).await?;
+let flow_plan = sql_to_flow_plan(&mut self.node_context, &self.query_engine, &sql).await?;
// TODO(discord9): parse `expire_when`
@@ -331,7 +332,7 @@ impl<'s> FlowNodeManager<'s> {
{
let sink_global_id = self
-.worker_context
+.node_context
.table_repr
.get_by_table_id(&sink_table_id)
.with_context(|| TableNotFoundSnafu {
@@ -342,21 +343,21 @@ impl<'s> FlowNodeManager<'s> {
// render the sources now that we have the context
for source in source_table_ids {
let source = self
-.worker_context
+.node_context
.table_repr
.get_by_table_id(source)
.with_context(|| TableNotFoundSnafu {
name: source.to_string(),
})?
.1;
-let source_sender = self.worker_context.get_source_by_global_id(&source)?;
+let source_sender = self.node_context.get_source_by_global_id(&source)?;
let recv = source_sender.subscribe();
let bundle = ctx.render_source(recv)?;
ctx.insert_global(source, bundle);
}
let rendered_dataflow = ctx.render_plan(plan.plan)?;
-let sink_sender = self.worker_context.get_sink_by_global_id(&sink_global_id)?;
+let sink_sender = self.node_context.get_sink_by_global_id(&sink_global_id)?;
ctx.render_sink(rendered_dataflow, sink_sender);
}
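
Note: `source_sender.subscribe()` implies a broadcast-style channel, where each rendered dataflow gets its own receiver over the same stream of rows. A sketch of that wiring, assuming tokio's broadcast channel (the actual channel type is not visible in this diff):

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // One sender per source table; each dataflow reading it subscribes.
    let (source_sender, _keepalive) = broadcast::channel::<u64>(16);
    let mut recv_a = source_sender.subscribe();
    let mut recv_b = source_sender.subscribe();

    source_sender.send(1).unwrap();
    // Both subscribers observe the same row independently.
    assert_eq!(recv_a.recv().await.unwrap(), 1);
    assert_eq!(recv_b.recv().await.unwrap(), 1);
}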
@@ -367,7 +368,7 @@ impl<'s> FlowNodeManager<'s> {
/// A context that holds information about the dataflow
#[derive(Default)]
-pub struct FlowWorkerContext {
+pub struct FlowNodeContext {
/// mapping from source table to tasks, useful for scheduling which tasks to run when a source table is updated
pub source_to_tasks: BTreeMap<TableId, BTreeSet<TaskId>>,
/// mapping from task to sink table, useful for sending data back to the client when a task is done running
@@ -390,7 +391,7 @@ pub struct FlowWorkerContext {
pub table_repr: TriMap,
}
-impl FlowWorkerContext {
+impl FlowNodeContext {
pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<(), Error> {
let sender = self
.source_sender
@@ -417,7 +418,7 @@ impl FlowWorkerContext {
}
}
-impl FlowWorkerContext {
+impl FlowNodeContext {
/// maps source tables to tasks, and sink tables to tasks, in the worker context
///
/// also adds their corresponding broadcast sender/receiver
@@ -489,7 +490,7 @@ impl FlowWorkerContext {
}
}
-impl FlowWorkerContext {
+impl FlowNodeContext {
/// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function.
///
/// Returns an error if no table has been registered with the provided names
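
Note: `table_repr` above is a `TriMap`, a three-way association between table name, `TableId`, and `GlobalId`, so a table can be resolved starting from any of the three. A rough sketch of that shape (`new`, `insert`, and `get_by_table_id` appear in this diff; the internal layout here is an assumption):

use std::collections::BTreeMap;

type TableName = Vec<String>;
type TableId = u32;
type GlobalId = u64; // stand-in; the real GlobalId is a richer type

#[derive(Default)]
struct TriMap {
    by_table_id: BTreeMap<TableId, (TableName, GlobalId)>,
    by_name: BTreeMap<TableName, (TableId, GlobalId)>,
}

impl TriMap {
    fn new() -> Self {
        Self::default()
    }
    fn insert(&mut self, name: TableName, id: TableId, gid: GlobalId) {
        self.by_table_id.insert(id, (name.clone(), gid));
        self.by_name.insert(name, (id, gid));
    }
    fn get_by_table_id(&self, id: &TableId) -> Option<&(TableName, GlobalId)> {
        self.by_table_id.get(id)
    }
}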

View File

@@ -28,7 +28,7 @@ use crate::adapter::error::{
Error, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu,
NotImplementedSnafu, TableNotFoundSnafu,
};
-use crate::adapter::FlowWorkerContext;
+use crate::adapter::FlowNodeContext;
use crate::expr::GlobalId;
use crate::plan::TypedPlan;
use crate::repr::RelationType;
@@ -96,7 +96,7 @@ impl FunctionExtensions {
///
/// TODO(discord9): check if using an empty `QueryContext` influences anything
pub async fn sql_to_flow_plan(
-ctx: &mut FlowWorkerContext,
+ctx: &mut FlowNodeContext,
engine: &Arc<dyn QueryEngine>,
sql: &str,
) -> Result<TypedPlan, Error> {
@@ -140,13 +140,13 @@ mod test {
use crate::adapter::TriMap;
use crate::repr::ColumnType;
-pub fn create_test_ctx() -> FlowWorkerContext {
+pub fn create_test_ctx() -> FlowNodeContext {
let gid = GlobalId::User(0);
let name = vec!["numbers".to_string()];
let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
let mut tri_map = TriMap::new();
tri_map.insert(name.clone(), 0, gid);
-FlowWorkerContext {
+FlowNodeContext {
schema: HashMap::from([(gid, schema)]),
table_repr: tri_map,
..Default::default()
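
Note: with `create_test_ctx` above, a transform test can run end to end: build the context, convert SQL into a flow plan, and inspect the result. A hypothetical usage sketch (`create_test_query_engine` is an invented helper for illustration):

#[tokio::test]
async fn test_numbers_query() {
    let mut ctx = create_test_ctx();
    // Hypothetical: a QueryEngine that knows the `numbers` test table.
    let engine = create_test_query_engine();
    let _flow_plan = sql_to_flow_plan(&mut ctx, &engine, "SELECT number FROM numbers")
        .await
        .expect("SQL should lower to a flow plan");
}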

View File

@@ -54,11 +54,11 @@ use crate::expr::{
};
use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan};
use crate::repr::{self, ColumnType, RelationType};
-use crate::transform::{FlowWorkerContext, FunctionExtensions};
+use crate::transform::{FlowNodeContext, FunctionExtensions};
impl TypedExpr {
fn from_substrait_agg_grouping(
-ctx: &mut FlowWorkerContext,
+ctx: &mut FlowNodeContext,
groupings: &[Grouping],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -84,7 +84,7 @@ impl TypedExpr {
impl AggregateExpr {
fn from_substrait_agg_measures(
-ctx: &mut FlowWorkerContext,
+ctx: &mut FlowNodeContext,
measures: &[Measure],
typ: &RelationType,
extensions: &FunctionExtensions,
@@ -218,7 +218,7 @@ impl KeyValPlan {
impl TypedPlan {
/// Convert AggregateRel into Flow's TypedPlan
pub fn from_substrait_agg_rel(
-ctx: &mut FlowWorkerContext,
+ctx: &mut FlowNodeContext,
agg: &proto::AggregateRel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {

View File

@@ -23,12 +23,12 @@ use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanS
use crate::expr::{MapFilterProject, TypedExpr};
use crate::plan::{Plan, TypedPlan};
use crate::repr::{self, RelationType};
-use crate::transform::{FlowWorkerContext, FunctionExtensions};
+use crate::transform::{FlowNodeContext, FunctionExtensions};
impl TypedPlan {
/// Convert Substrait Plan into Flow's TypedPlan
pub fn from_substrait_plan(
-ctx: &mut FlowWorkerContext,
+ctx: &mut FlowNodeContext,
plan: &SubPlan,
) -> Result<TypedPlan, Error> {
// Register function extension
@@ -62,7 +62,7 @@ impl TypedPlan {
/// Convert Substrait Rel into Flow's TypedPlan
/// TODO: SELECT DISTINCT (does it get compiled to something else?)
pub fn from_substrait_rel(
-ctx: &mut FlowWorkerContext,
+ctx: &mut FlowNodeContext,
rel: &Rel,
extensions: &FunctionExtensions,
) -> Result<TypedPlan, Error> {
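
Note: `from_substrait_rel` is the recursive workhorse here: each relational operator first converts its input, then wraps the result in the corresponding Flow plan node. A schematic sketch of that dispatch, with simplified stand-in enums (the real variants live in substrait's `Rel` and Flow's `Plan`):

// Simplified stand-ins for substrait's Rel and Flow's Plan.
enum Rel {
    Read { table: String },
    Filter { input: Box<Rel> },
    Project { input: Box<Rel> },
}

enum Plan {
    Source(String),
    Mfp(Box<Plan>), // a map/filter/project layered over an input plan
}

fn from_rel(rel: &Rel) -> Result<Plan, String> {
    match rel {
        Rel::Read { table } => Ok(Plan::Source(table.clone())),
        // Convert the input recursively, then layer this operator on top.
        Rel::Filter { input } | Rel::Project { input } => {
            Ok(Plan::Mfp(Box::new(from_rel(input)?)))
        }
    }
}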