diff --git a/Cargo.lock b/Cargo.lock
index f0b0e93057..c3130c0bfe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3511,6 +3511,7 @@ dependencies = [
  "common-decimal",
  "common-error",
  "common-macro",
+ "common-meta",
  "common-telemetry",
  "common-time",
  "datafusion-common",
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index 28be2e4dae..ce0ac07ad5 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -21,15 +21,20 @@ datatypes.workspace = true
 enum_dispatch = "0.3"
 # This fork is simply for keeping our dependency in our org, and pin the version
 # it is the same with upstream repo
+common-meta.workspace = true
 hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
 itertools.workspace = true
 num-traits = "0.2"
+prost.workspace = true
+query.workspace = true
 serde.workspace = true
 servers.workspace = true
+session.workspace = true
 smallvec.workspace = true
 snafu.workspace = true
 strum.workspace = true
 substrait.workspace = true
+table.workspace = true
 tokio.workspace = true
 tonic.workspace = true
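Aside on the broadcast channels used below: adapter.rs introduces a `BROADCAST_CAP` of 1024 for `tokio::sync::broadcast` channels. With that channel type, a subscriber that falls further behind than the capacity never blocks the sender; it instead observes `RecvError::Lagged` with the count of skipped values and resumes from the oldest retained value. A minimal standalone sketch of that behavior (the tiny capacity and prints are illustrative only, not code from this patch):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // capacity 2 to force lagging quickly; the flow node uses 1024
    let (tx, mut rx) = broadcast::channel::<u64>(2);

    // send more values than the channel can buffer
    for v in 0..4u64 {
        // send only fails when no receiver is alive; `rx` keeps it alive here
        tx.send(v).expect("at least one receiver is alive");
    }

    // the receiver missed values 0 and 1; it reports how far it lagged
    match rx.recv().await {
        Err(broadcast::error::RecvError::Lagged(n)) => {
            println!("receiver skipped {n} oldest values"); // prints: 2
        }
        other => println!("unexpected: {other:?}"),
    }
    // subsequent recv() calls would yield the retained values 2 and 3
}
```

If this reading is right, a slow dataflow would skip old writes rather than back-pressure the writer, which seems to be the intended trade-off of picking a broadcast channel here.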
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index f53fae5674..b4d95ba5f0 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -14,5 +14,275 @@
 //! for getting data from source and sending results to sink
 //! and communicating with other parts of the database
+use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
+
+use common_meta::key::table_info::TableInfoManager;
+use common_meta::key::table_name::TableNameManager;
+use hydroflow::scheduled::graph::Hydroflow;
+use query::QueryEngine;
+use serde::{Deserialize, Serialize};
+use smallvec::SmallVec;
+use snafu::OptionExt;
+use table::metadata::TableId;
+use tokio::sync::broadcast;
+use tokio::task::LocalSet;
+
+use crate::adapter::error::TableNotFoundSnafu;
+use crate::compute::{Context, DataflowState, ErrCollector};
+use crate::expr::GlobalId;
+use crate::plan::{Plan, TypedPlan};
+use crate::repr::{DiffRow, RelationType};
+use crate::transform::sql_to_flow_plan;
 pub(crate) mod error;
+
+use error::Error;
+
+// TODO(discord9): refactor common types for flow into a separate module
+pub type TaskId = u64;
+pub type TableName = Vec<String>;
+
+/// broadcast channel capacity, set to an arbitrary value
+pub const BROADCAST_CAP: usize = 1024;
+
+/// FlowNodeManager manages the state of all tasks in the flow node; all tasks should run on the same thread
+///
+/// For now, the timestamp used is simply the current system timestamp
+pub struct FlowNodeManager<'subgraph> {
+    pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
+    pub local_set: LocalSet,
+
+    // TODO(discord9): catalog/tableinfo manager for querying schemas and translating SQL to plans
+    query_engine: Arc<dyn QueryEngine>,
+    /// contains the table name <-> global id mapping and the table schemas
+    worker_context: FlowWorkerContext,
+}
+
+/// The table name <-> table id mapping, which should be queried from the table info manager
+struct TableNameIdMapping {
+    /// for querying the `TableId -> TableName` mapping
+    table_info_manager: TableInfoManager,
+    /// for querying the `TableName -> TableId` mapping
+    table_name_manager: TableNameManager,
+    // an in-memory cache, invalidated when necessary
+}
+
+impl TableNameIdMapping {
+    pub async fn get_table_id(&self, table_name: TableName) -> Result<TableId, Error> {
+        todo!()
+    }
+
+    pub async fn get_table_name(&self, table_id: TableId) -> Result<TableName, Error> {
+        todo!()
+    }
+}
+
+/// ActiveDataflowState is a wrapper around `Hydroflow` and `DataflowState`
+pub(crate) struct ActiveDataflowState<'subgraph> {
+    df: Hydroflow<'subgraph>,
+    state: DataflowState,
+    err_collector: ErrCollector,
+}
+
+impl Default for ActiveDataflowState<'_> {
+    fn default() -> Self {
+        ActiveDataflowState {
+            df: Hydroflow::new(),
+            state: DataflowState::default(),
+            err_collector: ErrCollector::default(),
+        }
+    }
+}
+
+impl<'subgraph> ActiveDataflowState<'subgraph> {
+    /// Create a new render context, assigned the given global id
+    pub fn new_ctx<'ctx>(&'ctx mut self, global_id: GlobalId) -> Context<'ctx, 'subgraph>
+    where
+        'subgraph: 'ctx,
+    {
+        Context {
+            id: global_id,
+            df: &mut self.df,
+            compute_state: &mut self.state,
+            err_collector: self.err_collector.clone(),
+            input_collection: Default::default(),
+            local_scope: Default::default(),
+        }
+    }
+}
+
+impl<'s> FlowNodeManager<'s> {
+    /// Returns the task id if a new task is created, otherwise returns None
+    ///
+    /// Steps to create a task:
+    /// 1. parse the query into a typed plan (and optionally parse the expire_when expr)
+    /// 2. render source/sink with the output table id and the input table ids
+    ///
+    /// TODO(discord9): use greptime-proto types to create the task instead
+    #[allow(clippy::too_many_arguments)]
+    pub async fn create_task(
+        &mut self,
+        task_id: TaskId,
+        sink_table_id: TableId,
+        source_table_ids: SmallVec<[TableId; 2]>,
+        create_if_not_exist: bool,
+        expire_when: Option<String>,
+        comment: Option<String>,
+        sql: String,
+        task_options: HashMap<String, String>,
+    ) -> Result<Option<TaskId>, Error> {
+        if create_if_not_exist {
+            // check if the task already exists
+            if self.task_states.contains_key(&task_id) {
+                return Ok(None);
+            }
+        }
+
+        // parse the SQL into a flow plan, then construct an active dataflow state from it
+        let flow_plan =
+            sql_to_flow_plan(&mut self.worker_context, &self.query_engine, &sql).await?;
+
+        // TODO(discord9): parse `expire_when`
+
+        let _sink_gid =
+            self.create_ctx_and_render(task_id, flow_plan, sink_table_id, source_table_ids)?;
+        Ok(Some(task_id))
+    }
+
+    /// Create a render context, render the plan, and connect source/sink to the rendered dataflow
+    ///
+    /// Returns the output table's assigned global id
+    fn create_ctx_and_render(
+        &mut self,
+        task_id: TaskId,
+        plan: TypedPlan,
+        sink_table_id: TableId,
+        source_table_ids: SmallVec<[TableId; 2]>,
+    ) -> Result<GlobalId, Error> {
+        let mut cur_task_state = ActiveDataflowState::<'s>::default();
+        // 1. render sources
+        let source_global_ids = source_table_ids
+            .iter()
+            .map(|id| self.worker_context.assign_global_id_to_table(*id))
+            .collect::<Vec<_>>();
+
+        let sink_global_id = self.worker_context.assign_global_id_to_table(sink_table_id);
+
+        {
+            let mut ctx = cur_task_state.new_ctx(sink_global_id);
+            // render the sources now that we have the context
+            for source in source_global_ids {
+                let source_sender = self.worker_context.get_source_by_global_id(&source)?;
+                let recv = source_sender.subscribe();
+                let bundle = ctx.render_source(recv)?;
+                ctx.insert_global(source, bundle);
+            }
+
+            let rendered_dataflow = ctx.render_plan(plan.plan)?;
+            let sink_sender = self.worker_context.get_sink_by_global_id(&sink_global_id)?;
+            ctx.render_sink(rendered_dataflow, sink_sender);
+        }
+
+        // `ctx` mutably borrows `cur_task_state`, so the block above is needed to end
+        // the borrow before `cur_task_state` is moved into `task_states`
+        self.task_states.insert(task_id, cur_task_state);
+        Ok(sink_global_id)
+    }
+}
+
+/// A context that holds the information of the dataflow
+#[derive(Default)]
+pub struct FlowWorkerContext {
+    /// broadcast sender for each source table; any incoming write request is sent to the
+    /// source table's corresponding sender
+    ///
+    /// Note that insert requests arrive with a table id, so the table id is used as the key
+    pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
+    /// broadcast channel pair for each sink table; there should be only one receiver,
+    /// which receives all the data from the sink table
+    ///
+    /// and sends it back to the client; since we are mocking the sink table as a client,
+    /// the table name is used as the key
+    pub sink_receiver:
+        BTreeMap<TableName, (broadcast::Sender<DiffRow>, broadcast::Receiver<DiffRow>)>,
+    /// `id` refers to any source table in the dataflow, and `name` is the name of the table,
+    /// which is a `Vec<String>` in substrait
+    pub id_to_name: HashMap<GlobalId, Vec<String>>,
+    /// see `id_to_name`
+    pub name_to_id: HashMap<Vec<String>, GlobalId>,
+    /// the schema of each table
+    pub schema: HashMap<GlobalId, RelationType>,
+}
+
+impl FlowWorkerContext {
+    pub fn add_source_sender(&mut self, table_id: TableId) {
+        self.source_sender
+            .insert(table_id, broadcast::channel(BROADCAST_CAP).0);
+    }
+
+    pub fn get_source_by_global_id(
+        &self,
+        id: &GlobalId,
+    ) -> Result<&broadcast::Sender<DiffRow>, Error> {
+        let table_id = self.get_table_name_id(id)?.1;
+        self.source_sender
+            .get(&table_id)
+            .with_context(|| TableNotFoundSnafu {
+                name: table_id.to_string(),
+            })
+    }
+
+    pub fn get_sink_by_global_id(
+        &self,
+        id: &GlobalId,
+    ) -> Result<broadcast::Sender<DiffRow>, Error> {
+        let table_name = self.get_table_name_id(id)?.0;
+        self.sink_receiver
+            .get(&table_name)
+            .map(|(s, _r)| s.clone())
+            .with_context(|| TableNotFoundSnafu {
+                name: table_name.join("."),
+            })
+    }
+}
+
+impl FlowWorkerContext {
+    /// Retrieves the GlobalId and schema of a previously registered table
+    ///
+    /// Returns an error if no table has been registered under the provided name
+    pub fn table(&self, name: &Vec<String>) -> Result<(GlobalId, RelationType), Error> {
+        let id = self
+            .name_to_id
+            .get(name)
+            .copied()
+            .with_context(|| TableNotFoundSnafu {
+                name: name.join("."),
+            })?;
+        let schema = self
+            .schema
+            .get(&id)
+            .cloned()
+            .with_context(|| TableNotFoundSnafu {
+                name: name.join("."),
+            })?;
+        Ok((id, schema))
+    }
+
+    pub fn table_from_table_id(&self, id: &GlobalId) -> Result<(GlobalId, RelationType), Error> {
+        todo!()
+    }
+
+    /// Assign a global id to a table; if one is already assigned, return the existing global id
+    ///
+    /// NOTE: this does not actually render the table into the collection referred to by the
+    /// GlobalId; it merely creates a mapping from table id to global id
+    pub fn assign_global_id_to_table(&self, table_id: TableId) -> GlobalId {
+        todo!()
+    }
+
+    /// Get the table name and table id by global id
+    pub fn get_table_name_id(&self, id: &GlobalId) -> Result<(TableName, TableId), Error> {
+        todo!()
+    }
+
+    /// Get a new global id
+    pub fn new_global_id(&self) -> GlobalId {
+        todo!()
+    }
+}
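Aside: `FlowWorkerContext` above wires sources and sinks through broadcast channels: a write for a source table goes into the matching entry in `source_sender`, every rendered dataflow that subscribed gets a copy, and results are published on the sink channel pair kept in `sink_receiver`. A standalone mock of that data path, with `String` rows standing in for `DiffRow` and a spawned task standing in for a rendered dataflow (names and types here are simplified stand-ins, not this PR's API):

```rust
use std::collections::BTreeMap;

use tokio::sync::broadcast;

type TableId = u32;
type Row = String; // stand-in for DiffRow

#[tokio::main]
async fn main() {
    // source side: one broadcast sender per source table, keyed by table id
    let mut source_sender: BTreeMap<TableId, broadcast::Sender<Row>> = BTreeMap::new();
    source_sender.insert(1, broadcast::channel(16).0);

    // rendering a dataflow subscribes to its source tables
    let mut input = source_sender[&1].subscribe();

    // sink side: keep both halves of the channel, as `sink_receiver` does
    let (sink_tx, mut sink_rx) = broadcast::channel::<Row>(16);

    // a trivial "dataflow": uppercase every row on its way from source to sink
    tokio::spawn(async move {
        while let Ok(row) = input.recv().await {
            let _ = sink_tx.send(row.to_uppercase());
        }
    });

    // an incoming write request for table 1
    source_sender[&1].send("hello".to_string()).unwrap();

    assert_eq!(sink_rx.recv().await.unwrap(), "HELLO");
}
```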
diff --git a/src/flow/src/compute.rs b/src/flow/src/compute.rs
index 294716edf0..8463039dcd 100644
--- a/src/flow/src/compute.rs
+++ b/src/flow/src/compute.rs
@@ -17,3 +17,7 @@
 mod render;
 mod state;
 mod types;
+
+pub(crate) use render::Context;
+pub(crate) use state::DataflowState;
+pub(crate) use types::ErrCollector;
diff --git a/src/flow/src/compute/render.rs b/src/flow/src/compute/render.rs
index d0fdc1fdb9..03b4c60413 100644
--- a/src/flow/src/compute/render.rs
+++ b/src/flow/src/compute/render.rs
@@ -45,6 +45,7 @@ use crate::utils::{ArrangeHandler, ArrangeReader, ArrangeWriter, Arrangement};
 mod map;
 mod reduce;
+mod src_sink;
 
 /// The Context for build a Operator with id of `GlobalId`
 pub struct Context<'referred, 'df> {
@@ -52,13 +53,15 @@ pub struct Context<'referred, 'df> {
     pub df: &'referred mut Hydroflow<'df>,
     pub compute_state: &'referred mut DataflowState,
     /// a list of all collections being used in the operator
+    ///
+    /// TODO(discord9): remove the extra clone by counting usages and dropping a collection after its last usage?
     pub input_collection: BTreeMap<GlobalId, CollectionBundle>,
     /// used by `Get`/`Let` Plan for getting/setting local variables
     ///
     /// TODO(discord9): consider if use Vec<(LocalId, CollectionBundle)> instead
-    local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
+    pub local_scope: Vec<BTreeMap<LocalId, CollectionBundle>>,
     // Collect all errors in this operator's evaluation
-    err_collector: ErrCollector,
+    pub err_collector: ErrCollector,
 }
 
 impl<'referred, 'df> Drop for Context<'referred, 'df> {
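Aside: the render.rs hunk above shows that `Context` implements `Drop`, which explains the lifetime note in `create_ctx_and_render`: a value whose type has a `Drop` impl holds its borrow until the end of its lexical scope rather than its last use, so `ctx` must be enclosed in a block that ends before `cur_task_state` is moved into `task_states`. A standalone sketch of the same pattern (stand-in types, not the flow crate's):

```rust
struct State {
    counter: u64,
}

// borrows the state mutably, like the flow `Context` borrows `DataflowState`
struct Ctx<'a> {
    state: &'a mut State,
}

// `Context` implements `Drop` (see render.rs), so its borrow lasts until the
// end of its scope instead of ending at its last use
impl Drop for Ctx<'_> {
    fn drop(&mut self) {}
}

fn main() {
    let mut state = State { counter: 0 };

    {
        let ctx = Ctx { state: &mut state };
        ctx.state.counter += 1;
    } // `ctx` is dropped here, releasing the mutable borrow

    // `state` can now be moved; without the inner block this line would not
    // compile, because `ctx` would still hold the borrow at the point of the move
    let moved: Vec<State> = vec![state];
    assert_eq!(moved[0].counter, 1);
}
```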
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index 435f933995..455b2808ac 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -93,7 +93,8 @@ impl FunctionExtensions {
 /// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
 /// then to a substrait plan, and finally to a flow plan.
-/// TODO: check if use empty `QueryContext` influence anything
+///
+/// TODO(discord9): check whether using an empty `QueryContext` influences anything
 pub async fn sql_to_flow_plan(
     ctx: &mut FlowWorkerContext,
     engine: &Arc<dyn QueryEngine>,
@@ -147,6 +148,7 @@ mod test {
             id_to_name: HashMap::from([(gid, name.clone())]),
             name_to_id: HashMap::from([(name.clone(), gid)]),
             schema: HashMap::from([(gid, schema)]),
+            ..Default::default()
         }
     }
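Aside: the `..Default::default()` added to the test fixture is Rust's struct update syntax: every field not named explicitly is taken from the default value, so the fixture keeps compiling as `FlowWorkerContext` gains fields such as `source_sender` and `sink_receiver`. A minimal illustration with a stand-in struct:

```rust
use std::collections::HashMap;

#[derive(Default, Debug)]
struct Ctx {
    name_to_id: HashMap<String, u64>,
    // fields added later are covered transparently by `..Default::default()`
    source_sender: Vec<u32>,
}

fn main() {
    let ctx = Ctx {
        name_to_id: HashMap::from([("t".to_string(), 0)]),
        ..Default::default() // source_sender (and any future field) defaults
    };
    assert!(ctx.source_sender.is_empty());
    println!("{ctx:?}");
}
```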