From 1067d3453d5d3c7abd9ae5ad41da07d1b3838086 Mon Sep 17 00:00:00 2001
From: discord9
Date: Fri, 26 Apr 2024 15:20:24 +0800
Subject: [PATCH] feat: dataflow adapter

---
 Cargo.lock                              |  34 ++-
 src/flow/Cargo.toml                     |   2 +
 src/flow/src/adapter.rs                 | 372 ++++++++++++++++++++----
 src/flow/src/adapter/error.rs           |  11 +-
 src/flow/src/compute/render/src_sink.rs |  32 +-
 src/flow/src/repr.rs                    |   3 +
 src/flow/src/transform.rs               |   7 +-
 7 files changed, 394 insertions(+), 67 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c3130c0bfe..bc35448e52 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2459,6 +2459,16 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "ctor"
+version = "0.1.26"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
+dependencies = [
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "darling"
 version = "0.14.4"
@@ -3521,6 +3531,7 @@ dependencies = [
  "enum_dispatch",
  "hydroflow",
  "itertools 0.10.5",
+ "minstant",
  "num-traits",
  "prost 0.12.4",
  "query",
@@ -3530,6 +3541,7 @@
  "session",
  "smallvec",
  "snafu",
+ "store-api",
  "strum 0.25.0",
  "substrait 0.7.2",
  "table",
@@ -5398,6 +5410,16 @@ dependencies = [
  "adler",
 ]
 
+[[package]]
+name = "minstant"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1fb9b5c752f145ac5046bccc3c4f62892e3c950c1d1eab80c5949cd68a2078db"
+dependencies = [
+ "ctor",
+ "web-time 1.1.0",
+]
+
 [[package]]
 name = "mio"
 version = "0.8.11"
@@ -11012,7 +11034,7 @@ dependencies = [
  "tracing-core",
  "tracing-log 0.2.0",
  "tracing-subscriber",
- "web-time",
+ "web-time 0.2.4",
 ]
 
 [[package]]
@@ -11770,6 +11792,16 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "web-time"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb"
+dependencies = [
+ "js-sys",
+ "wasm-bindgen",
+]
+
 [[package]]
 name = "webbrowser"
 version = "0.8.15"
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index ce0ac07ad5..8c9bc9f179 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -24,6 +24,7 @@ enum_dispatch = "0.3"
 common-meta.workspace = true
 hydroflow = { git = "https://github.com/GreptimeTeam/hydroflow.git", branch = "main" }
 itertools.workspace = true
+minstant = "0.1.7"
 num-traits = "0.2"
 prost.workspace = true
 query.workspace = true
@@ -32,6 +33,7 @@ servers.workspace = true
 session.workspace = true
 smallvec.workspace = true
 snafu.workspace = true
+store-api.workspace = true
 strum.workspace = true
 substrait.workspace = true
 table.workspace = true
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index b4d95ba5f0..b4c8f72539 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -14,25 +14,29 @@
 //! for getting data from source and sending results to sink
 //! and communicating with other parts of the database
 
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
 use std::sync::Arc;
 
 use common_meta::key::table_info::TableInfoManager;
 use common_meta::key::table_name::TableNameManager;
 use hydroflow::scheduled::graph::Hydroflow;
+use minstant::Anchor;
+use prost::bytes::buf;
 use query::QueryEngine;
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
-use snafu::OptionExt;
+use snafu::{OptionExt, ResultExt};
+use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tokio::sync::broadcast;
 use tokio::task::LocalSet;
 
-use crate::adapter::error::TableNotFoundSnafu;
+use crate::adapter::error::{EvalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu};
 use crate::compute::{Context, DataflowState, ErrCollector};
+use crate::expr::error::InternalSnafu;
 use crate::expr::GlobalId;
 use crate::plan::{Plan, TypedPlan};
-use crate::repr::{DiffRow, RelationType};
+use crate::repr::{self, DiffRow, RelationType, Row, BOARDCAST_CAP};
 use crate::transform::sql_to_flow_plan;
 
 pub(crate) mod error;
@@ -43,38 +47,45 @@ use error::Error;
 pub type TaskId = u64;
 pub type TableName = Vec<String>;
 
-/// broadcast channel capacity, set to a arbitrary value
-pub const BOARDCAST_CAP: usize = 1024;
-
 /// FlowNodeManager manages the state of all tasks in the flow node, which should be run on the same thread
 ///
 /// The choice of timestamp is just using current system timestamp for now
 pub struct FlowNodeManager<'subgraph> {
+    /// The state of all tasks in the flow node
+    /// This is also the only field that's not `Send` in the struct
     pub task_states: BTreeMap<TaskId, ActiveDataflowState<'subgraph>>,
-    pub local_set: LocalSet,
     // TODO: catalog/tableinfo manager for query schema and translate sql to plan
     query_engine: Arc<QueryEngine>,
+    srv_map: TableIdNameMapper,
     /// contains mapping from table name to global id, and table schema
     worker_context: FlowWorkerContext,
+    tick_manager: FlowTickManager,
+}
+
+#[test]
+fn check() {
+    fn is_send<T: Send>() {}
+    is_send::<FlowWorkerContext>();
 }
 
 /// mapping of table name <-> table id should be query from tableinfo manager
-struct TableNameIdMapping {
+pub struct TableIdNameMapper {
     /// for query `TableId -> TableName` mapping
     table_info_manager: TableInfoManager,
-    /// for query `TableName -> TableId` mapping
-    table_name_manager: TableNameManager,
-    // a in memory cache, will be invalid if necessary
 }
 
-impl TableNameIdMapping {
-    pub async fn get_table_id(&self, table_name: TableName) -> Result<TableId, Error> {
-        todo!()
-    }
-
-    pub async fn get_table_name(&self, table_id: TableId) -> Result<TableName, Error> {
-        todo!()
+impl TableIdNameMapper {
+    /// query metasrv for the table name of the given table id
+    pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
+        self.table_info_manager
+            .get(*table_id)
+            .await
+            .with_context(|_| TableNotFoundMetaSnafu {
+                msg: format!("TableId = {:?}, couldn't find table name", table_id),
+            })
+            .map(|name| name.unwrap().table_name())
+            .map(|name| vec![name.catalog_name, name.schema_name, name.table_name])
     }
 }
 
@@ -110,9 +121,106 @@ impl<'subgraph> ActiveDataflowState<'subgraph> {
             local_scope: Default::default(),
         }
     }
+
+    pub fn set_current_ts(&mut self, ts: repr::Timestamp) {
+        self.state.set_current_ts(ts);
+    }
+
+    /// Run all available subgraphs
+    ///
+    /// return true if any subgraph was actually executed
+    pub fn run_available(&mut self) -> bool {
+        self.state.run_available_with_schedule(&mut self.df)
+    }
+}
+
+pub enum DiffRequest {
+    Insert(Vec<Row>),
+    Delete(Vec<Row>),
+}
+
+/// iterate through the diff rows and form requests from consecutive diff rows with the same diff type
+pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
+    let mut reqs = Vec::new();
+    for (row, _t, diff) in rows {
+        let last = reqs.last_mut();
+        match (last, diff) {
+            (Some(DiffRequest::Insert(rows)), 1) => {
+                rows.push(row);
+            }
+            (Some(DiffRequest::Insert(_)) | None, -1) => reqs.push(DiffRequest::Delete(vec![row])),
+            (Some(DiffRequest::Delete(rows)), -1) => {
+                rows.push(row);
+            }
+            (Some(DiffRequest::Delete(_)) | None, 1) => reqs.push(DiffRequest::Insert(vec![row])),
+            _ => (),
+        }
+    }
+    reqs
 }
 
 impl<'s> FlowNodeManager<'s> {
+    /// run the dataflow's grpc service & execution in a `LocalSet`, blocking the current thread
+    ///
+    /// the idiomatic way to run the dataflow is to spawn a new thread, then create a flow node
+    /// manager there, and run the dataflow using this method
+    pub fn run_dataflow(self, rt: tokio::runtime::Runtime, local_set: LocalSet) {
+        local_set.block_on(&rt, async move {
+            // TODO(discord9): might place grpc service on another thread?
+            let zelf = self;
+            todo!("main loop");
+        });
+    }
+
+    /// Run all available subgraphs in the flow node
+    /// This will try to run all dataflows in this node
+    /// TODO(discord9): add flag for subgraphs that have input since last run
+    pub fn run_available(&mut self) {
+        let now = self.tick_manager.tick();
+        for (task_id, task_state) in self.task_states.iter_mut() {
+            task_state.set_current_ts(now);
+            task_state.run_available();
+
+            // if there is work done, check for new data in the sink
+            while task_state.run_available() {
+                let sink_table_name = self.worker_context.task_to_sink.get(task_id).unwrap();
+                let sink_buf = self
+                    .worker_context
+                    .sink_buffer
+                    .get_mut(sink_table_name)
+                    .unwrap();
+                let sink_recv = self
+                    .worker_context
+                    .sink_receiver
+                    .get_mut(sink_table_name)
+                    .unwrap();
+                // TODO(discord9): handle lagging error
+                while let Ok(row) = sink_recv.1.try_recv() {
+                    sink_buf.push_back(row);
+                }
+            }
+        }
+    }
+
+    /// Take everything in the sink buffers and construct write requests which should be sent to the frontend
+    pub fn take_sink_request_per_table(&mut self) -> Vec<(TableName, Vec<DiffRow>)> {
+        std::mem::take(&mut self.worker_context.sink_buffer)
+            .into_iter()
+            .map(|(name, buf)| (name, buf.into_iter().collect()))
+            .collect()
+    }
+
+    /// send a write request to the related source sender
+    pub async fn handle_write_request(
+        &mut self,
+        region_id: RegionId,
+        rows: Vec<DiffRow>,
+    ) -> Result<(), Error> {
+        let table_id = region_id.table_id();
+        self.worker_context.send(table_id, rows)
+    }
+
     /// Return task id if a new task is created, otherwise return None
     ///
     /// steps to create task:
@@ -125,7 +233,7 @@ impl<'s> FlowNodeManager<'s> {
         &mut self,
         task_id: TaskId,
         sink_table_id: TableId,
-        source_table_ids: SmallVec<[TableId; 2]>,
+        source_table_ids: &[TableId],
         create_if_not_exist: bool,
         expire_when: Option<String>,
         comment: Option<String>,
@@ -138,6 +246,15 @@ impl<'s> FlowNodeManager<'s> {
                 return Ok(None);
             }
         }
+        // assign global ids to the source and sink tables
+        for source in source_table_ids
+            .iter()
+            .chain(std::iter::once(&sink_table_id))
+        {
+            self.worker_context
+                .assign_global_id_to_table(&self.srv_map, *source)
+                .await;
+        }
 
         // construct a active dataflow state with it
         let flow_plan =
             sql_to_flow_plan(&mut self.worker_context, &self.query_engine, &sql).await?;
 
         // TODO(discord9): parse `expire_when`
 
-        let _sink_gid =
-            self.create_ctx_and_render(task_id, flow_plan, sink_table_id, source_table_ids)?;
+        self.create_ctx_and_render(task_id, flow_plan, sink_table_id, source_table_ids)?;
 
         Ok(Some(task_id))
     }
@@ -158,21 +274,30 @@ impl<'s> FlowNodeManager<'s> {
         task_id: TaskId,
         plan: TypedPlan,
         sink_table_id: TableId,
-        source_table_ids: SmallVec<[TableId; 2]>,
-    ) -> Result<GlobalId, Error> {
+        source_table_ids: &[TableId],
+    ) -> Result<(), Error> {
         let mut cur_task_state = ActiveDataflowState::<'s>::default();
 
-        // 1. render sources
-        let source_global_ids = source_table_ids
-            .iter()
-            .map(|id| self.worker_context.assign_global_id_to_table(*id))
-            .collect::<Vec<_>>();
-
-        let sink_global_id = self.worker_context.assign_global_id_to_table(sink_table_id);
         {
+            let sink_global_id = self
+                .worker_context
+                .table_repr
+                .get_by_table_id(&sink_table_id)
+                .with_context(|| TableNotFoundSnafu {
+                    name: sink_table_id.to_string(),
+                })?
+                .1;
             let mut ctx = cur_task_state.new_ctx(sink_global_id);
             // rendering source now that we have the context
-            for source in source_global_ids {
+            for source in source_table_ids {
+                let source = self
+                    .worker_context
+                    .table_repr
+                    .get_by_table_id(source)
+                    .with_context(|| TableNotFoundSnafu {
+                        name: source.to_string(),
+                    })?
+                    .1;
                 let source_sender = self.worker_context.get_source_by_global_id(&source)?;
                 let recv = source_sender.subscribe();
                 let bundle = ctx.render_source(recv)?;
@@ -184,15 +309,18 @@ impl<'s> FlowNodeManager<'s> {
             ctx.render_sink(rendered_dataflow, sink_sender);
         }
 
-        // what is wrong with lifetime? ctx is short live than cur_task_state
         self.task_states.insert(task_id, cur_task_state);
-        Ok(sink_global_id)
+        Ok(())
     }
 }
 
 /// A context that holds the information of the dataflow
 #[derive(Default)]
 pub struct FlowWorkerContext {
+    /// mapping from source table to tasks, useful for scheduling which tasks to run when a source table is updated
+    pub source_to_tasks: BTreeMap<TableId, BTreeSet<TaskId>>,
+    /// mapping from task to sink table, useful for sending data back to the client when a task is done running
+    pub task_to_sink: BTreeMap<TaskId, TableName>,
     /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
     ///
     /// Note that we are getting insert requests with table id, so we should use table id as the key
@@ -200,27 +328,89 @@ pub struct FlowWorkerContext {
     pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
     /// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
     ///
     /// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
+    /// note that there should only be one sink receiver, and we are using broadcast as an mpsc channel here
     pub sink_receiver: BTreeMap<TableName, (broadcast::Sender<DiffRow>, broadcast::Receiver<DiffRow>)>,
-    /// `id` refer to any source table in the dataflow, and `name` is the name of the table
-    /// which is a `Vec<String>` in substrait
-    pub id_to_name: HashMap<GlobalId, Vec<String>>,
-    /// see `id_to_name`
-    pub name_to_id: HashMap<Vec<String>, GlobalId>,
+    /// sink buffers for each sink table, used for sending data back to the frontend
+    pub sink_buffer: BTreeMap<TableName, VecDeque<DiffRow>>,
     /// the schema of the table
     pub schema: HashMap<GlobalId, RelationType>,
+    /// All the tables that have been registered in the worker
+    pub table_repr: TriMap,
 }
 
 impl FlowWorkerContext {
+    pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<(), Error> {
+        let sender = self
+            .source_sender
+            .get(&table_id)
+            .with_context(|| TableNotFoundSnafu {
+                name: table_id.to_string(),
+            })?;
+        for row in rows {
+            sender
+                .send(row)
+                .map_err(|err| {
+                    InternalSnafu {
+                        reason: format!(
+                            "Failed to send row to table_id = {:?}, error = {:?}",
+                            table_id, err
+                        ),
+                    }
+                    .build()
+                })
+                .with_context(|_| EvalSnafu)?;
+        }
+
+        Ok(())
+    }
+}
+
+impl FlowWorkerContext {
+    /// map source tables to the task, and the task to its sink table, in the worker context
+    ///
+    /// also add their corresponding broadcast sender/receiver
+    fn register_task_src_sink(
+        &mut self,
+        task_id: TaskId,
+        source_table_ids: &[TableId],
+        sink_table_id: TableId,
+    ) {
+        for source_table_id in source_table_ids {
+            self.add_source_sender(*source_table_id);
+            self.source_to_tasks
+                .entry(*source_table_id)
+                .or_default()
+                .insert(task_id);
+        }
+
+        let sink_table_name = self.table_repr.get_by_table_id(&sink_table_id).unwrap().0;
+        self.task_to_sink.insert(task_id, sink_table_name);
+    }
+
     pub fn add_source_sender(&mut self, table_id: TableId) {
         self.source_sender
-            .insert(table_id, broadcast::channel(BOARDCAST_CAP).0);
+            .entry(table_id)
+            .or_insert_with(|| broadcast::channel(BOARDCAST_CAP).0);
     }
+
+    pub fn add_sink_receiver(&mut self, table_name: TableName) {
+        self.sink_receiver
+            .entry(table_name)
+            .or_insert_with(|| broadcast::channel(BOARDCAST_CAP));
+    }
+
     pub fn get_source_by_global_id(
         &self,
         id: &GlobalId,
     ) -> Result<&broadcast::Sender<DiffRow>, Error> {
-        let table_id = self.get_table_name_id(id)?.1;
+        let table_id = self
+            .table_repr
+            .get_by_global_id(id)
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("{:?}", id),
+            })?
+            .1;
         self.source_sender
             .get(&table_id)
             .with_context(|| TableNotFoundSnafu {
@@ -232,7 +422,13 @@ impl FlowWorkerContext {
         &self,
         id: &GlobalId,
     ) -> Result<broadcast::Sender<DiffRow>, Error> {
-        let table_name = self.get_table_name_id(id)?.0;
+        let table_name = self
+            .table_repr
+            .get_by_global_id(id)
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("{:?}", id),
+            })?
+            .0;
         self.sink_receiver
             .get(&table_name)
             .map(|(s, r)| s.clone())
@@ -248,9 +444,9 @@ impl FlowWorkerContext {
     /// Returns an error if no table has been registered with the provided names
     pub fn table(&self, name: &Vec<String>) -> Result<(GlobalId, RelationType), Error> {
         let id = self
-            .name_to_id
-            .get(name)
-            .copied()
+            .table_repr
+            .get_by_name(name)
+            .map(|(_tid, gid)| gid)
             .with_context(|| TableNotFoundSnafu {
                 name: name.join("."),
            })?;
@@ -264,25 +460,87 @@ impl FlowWorkerContext {
         Ok((id, schema))
     }
 
-    pub fn table_from_table_id(&self, id: &GlobalId) -> Result<(GlobalId, RelationType), Error> {
-        todo!()
-    }
-
     /// Assign a global id to a table, if already assigned, return the existing global id
     ///
     /// NOTE: this will not actually render the table into collection refered as GlobalId
     /// merely creating a mapping from table id to global id
-    pub fn assign_global_id_to_table(&self, table_id: TableId) -> GlobalId {
-        todo!()
-    }
-
-    /// get table name by global id
-    pub fn get_table_name_id(&self, id: &GlobalId) -> Result<(TableName, TableId), Error> {
-        todo!()
+    pub async fn assign_global_id_to_table(
+        &mut self,
+        srv_map: &TableIdNameMapper,
+        table_id: TableId,
+    ) -> GlobalId {
+        if let Some((_name, gid)) = self.table_repr.get_by_table_id(&table_id) {
+            gid
+        } else {
+            let global_id = self.new_global_id();
+            let table_name = srv_map.get_table_name(&table_id).await.unwrap();
+            self.table_repr.insert(table_name, table_id, global_id);
+            global_id
+        }
     }
 
     /// Get a new global id
     pub fn new_global_id(&self) -> GlobalId {
-        todo!()
+        GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
+    }
+}
+
+/// A tri-directional map between table name, table id, and global id
+#[derive(Default)]
+pub struct TriMap {
+    name_to_global_id: HashMap<TableName, GlobalId>,
+    id_to_global_id: HashMap<TableId, GlobalId>,
+    global_id_to_name_id: BTreeMap<GlobalId, (TableName, TableId)>,
+}
+
+impl TriMap {
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    pub fn insert(&mut self, name: TableName, id: TableId, global_id: GlobalId) {
+        self.name_to_global_id.insert(name.clone(), global_id);
+        self.id_to_global_id.insert(id, global_id);
+        self.global_id_to_name_id.insert(global_id, (name, id));
+    }
+
+    pub fn get_by_name(&self, name: &TableName) -> Option<(TableId, GlobalId)> {
+        self.name_to_global_id.get(name).and_then(|global_id| {
+            self.global_id_to_name_id
+                .get(global_id)
+                .map(|(_name, id)| (*id, *global_id))
+        })
+    }
+
+    pub fn get_by_table_id(&self, id: &TableId) -> Option<(TableName, GlobalId)> {
+        self.id_to_global_id.get(id).and_then(|global_id| {
+            self.global_id_to_name_id
+                .get(global_id)
+                .map(|(name, _id)| (name.clone(), *global_id))
+        })
+    }
+
+    pub fn get_by_global_id(&self, global_id: &GlobalId) -> Option<(TableName, TableId)> {
+        self.global_id_to_name_id
+            .get(global_id)
+            .map(|(name, id)| (name.clone(), *id))
+    }
+}
+
+/// FlowTickManager is a manager for the flow's tick (current timestamp)
+pub struct FlowTickManager {
+    anchor: Anchor,
+}
+
+impl FlowTickManager {
+    pub fn new() -> Self {
+        FlowTickManager {
+            anchor: Anchor::new(),
+        }
+    }
+
+    /// Return the current timestamp in milliseconds
+    pub fn tick(&self) -> repr::Timestamp {
+        (minstant::Instant::now().as_unix_nanos(&self.anchor) / 1_000_000) as repr::Timestamp
+    }
+}
diff --git a/src/flow/src/adapter/error.rs b/src/flow/src/adapter/error.rs
index 001c176b9b..4ff79dda36 100644
--- a/src/flow/src/adapter/error.rs
+++ b/src/flow/src/adapter/error.rs
@@ -42,6 +42,13 @@ pub enum Error {
     #[snafu(display("Table not found: {name}"))]
     TableNotFound { name: String, location: Location },
 
+    #[snafu(display("Table not found: {msg}, meta error: {source}"))]
+    TableNotFoundMeta {
+        source: common_meta::error::Error,
+        msg: String,
+        location: Location,
+    },
+
     #[snafu(display("Table already exist: {name}"))]
     TableAlreadyExist { name: String, location: Location },
 
@@ -110,7 +117,9 @@ impl ErrorExt for Error {
                 StatusCode::Internal
             }
             &Self::TableAlreadyExist { .. } => StatusCode::TableAlreadyExists,
-            Self::TableNotFound { .. } => StatusCode::TableNotFound,
+            Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } => {
+                StatusCode::TableNotFound
+            }
             Self::InvalidQueryPlan { .. }
             | Self::InvalidQuerySubstrait { .. }
             | Self::InvalidQueryProst { .. }
diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index da3620b304..b04828dcc1 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -25,7 +25,7 @@ use crate::adapter::error::{Error, PlanSnafu};
 use crate::compute::render::Context;
 use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
 use crate::expr::GlobalId;
-use crate::repr::{DiffRow, Row};
+use crate::repr::{DiffRow, Row, BOARDCAST_CAP};
 
 #[allow(clippy::mutable_key_type)]
 impl<'referred, 'df> Context<'referred, 'df> {
@@ -84,14 +84,36 @@ impl<'referred, 'df> Context<'referred, 'df> {
             arranged: _,
         } = bundle;
         let mut buf = VecDeque::with_capacity(1000);
-        self.df
+
+        let schd = self.compute_state.get_scheduler();
+        let inner_schd = schd.clone();
+        let now = self.compute_state.current_time_ref();
+
+        let sink = self
+            .df
             .add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
                 let data = recv.take_inner();
                 buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
-                while let Some(row) = buf.pop_front() {
-                    // TODO(discord9): handling tokio broadcast error
-                    let _ = sender.send(row);
+                if sender.len() >= BOARDCAST_CAP {
+                    return;
+                } else {
+                    while let Some(row) = buf.pop_front() {
+                        // if the sender is full, stop sending
+                        if sender.len() >= BOARDCAST_CAP {
+                            break;
+                        }
+                        // TODO(discord9): handling tokio broadcast error
+                        let _ = sender.send(row);
+                    }
+                }
+
+                // if the buffer is not empty, schedule the next run at the next tick
+                // so the buffer can be drained as soon as possible
+                if !buf.is_empty() {
+                    inner_schd.schedule_at(*now.borrow() + 1);
                 }
             });
+
+        schd.set_cur_subgraph(sink);
     }
 }
diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs
index d21da39579..4b40b80522 100644
--- a/src/flow/src/repr.rs
+++ b/src/flow/src/repr.rs
@@ -51,6 +51,9 @@ pub type DiffRow = (Row, Timestamp, Diff);
 /// Row with key-value pair, timestamp and diff
 pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
 
+/// broadcast channel capacity, set to an arbitrary value
+pub const BOARDCAST_CAP: usize = 1024;
+
 /// Convert a value that is or can be converted to Datetime to internal timestamp
 ///
 /// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index 455b2808ac..8fca5f65ac 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -137,17 +137,18 @@ mod test {
     use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
 
     use super::*;
+    use crate::adapter::TriMap;
     use crate::repr::ColumnType;
 
     pub fn create_test_ctx() -> FlowWorkerContext {
         let gid = GlobalId::User(0);
         let name = vec!["numbers".to_string()];
         let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]);
-
+        let mut tri_map = TriMap::new();
+        tri_map.insert(name.clone(), 0, gid);
         FlowWorkerContext {
-            id_to_name: HashMap::from([(gid, name.clone())]),
-            name_to_id: HashMap::from([(name.clone(), gid)]),
             schema: HashMap::from([(gid, schema)]),
+            table_repr: tri_map,
            ..Default::default()
         }
     }
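
Note: `diff_row_to_request` above folds a stream of `(row, timestamp, diff)` triples into runs that share a diff sign, so one insert batch is cut whenever a delete arrives and vice versa. A minimal standalone sketch of the same folding logic (types inlined and `Row` reduced to `i64` purely for illustration; `Req` and `fold` are not names from the patch):

    enum Req {
        Insert(Vec<i64>),
        Delete(Vec<i64>),
    }

    /// Fold (row, diff) pairs into runs of same-signed requests.
    fn fold(rows: Vec<(i64, i64)>) -> Vec<Req> {
        let mut reqs: Vec<Req> = Vec::new();
        for (row, diff) in rows {
            match (reqs.last_mut(), diff) {
                // the current run has the same sign: extend it
                (Some(Req::Insert(v)), 1) => v.push(row),
                (Some(Req::Delete(v)), -1) => v.push(row),
                // sign changed, or no run started yet: open a new run
                (_, 1) => reqs.push(Req::Insert(vec![row])),
                (_, -1) => reqs.push(Req::Delete(vec![row])),
                // diffs other than +1/-1 are ignored
                _ => (),
            }
        }
        reqs
    }

For example `[(a, +1), (b, +1), (c, -1), (d, +1)]` folds into `[Insert([a, b]), Delete([c]), Insert([d])]`.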
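Note: `TriMap` keeps `global_id_to_name_id` as the single source of truth and treats the two hash maps as secondary indexes into it, which is why `get_by_name` and `get_by_table_id` round-trip through `global_id_to_name_id`. A short usage sketch against the API defined in the patch (the table id `1024` and the table name are made-up values; assumes `TableId` is the `u32` alias from `table::metadata`):

    use crate::adapter::{TableName, TriMap};
    use crate::expr::GlobalId;

    fn trimap_demo() {
        let mut repr = TriMap::new();
        let name: TableName = vec!["greptime".into(), "public".into(), "numbers".into()];
        repr.insert(name.clone(), 1024, GlobalId::User(0));

        // all three directions resolve to the same registration
        let (tid, gid) = repr.get_by_name(&name).unwrap(); // (1024, GlobalId::User(0))
        let (name2, _gid) = repr.get_by_table_id(&tid).unwrap(); // (name, GlobalId::User(0))
        let (name3, tid2) = repr.get_by_global_id(&gid).unwrap(); // (name, 1024)
        assert_eq!(tid, tid2);
        assert_eq!(name2, name3);
    }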
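Note: `run_dataflow` still ends in `todo!("main loop")`, but the building blocks for that loop (`run_available`, `take_sink_request_per_table`, `handle_write_request`) are all in place. One plausible shape for it, as a hedged sketch (`main_loop` and the 100 ms cadence are illustrative, not part of the patch; the loop must stay on the `LocalSet` thread since `FlowNodeManager` is not `Send`):

    async fn main_loop(mut manager: FlowNodeManager<'_>) {
        loop {
            // advance the logical clock and run every task's dataflow
            manager.run_available();
            // drain per-table sink buffers; a real loop would turn these rows
            // into write requests for the frontend instead of dropping them
            let _pending = manager.take_sink_request_per_table();
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }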