From e840bb469d39b5d5880de2d09f0b1b0a8ea59a00 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 9 May 2024 16:51:19 +0800 Subject: [PATCH] chore: refactor& remove TODOs --- src/flow/Cargo.toml | 4 - src/flow/bin/mem_footprint.rs | 15 - src/flow/src/adapter.rs | 643 ++++-------------- .../{standalone.rs => flownode_impl.rs} | 3 +- src/flow/src/adapter/node_context.rs | 332 +++++++++ src/flow/src/adapter/server.rs | 7 +- src/flow/src/adapter/table_source.rs | 176 +++++ src/flow/src/adapter/tests.rs | 3 +- src/flow/src/adapter/util.rs | 12 +- src/flow/src/adapter/worker.rs | 20 +- src/flow/src/compute/render/map.rs | 2 - src/flow/src/compute/types.rs | 6 + src/flow/src/expr/error.rs | 5 + src/flow/src/repr.rs | 2 +- src/flow/src/transform.rs | 14 +- src/flow/src/transform/aggr.rs | 8 +- src/flow/src/transform/expr.rs | 2 +- src/flow/src/transform/plan.rs | 6 +- tests-integration/src/standalone.rs | 2 +- 19 files changed, 700 insertions(+), 562 deletions(-) delete mode 100644 src/flow/bin/mem_footprint.rs rename src/flow/src/adapter/{standalone.rs => flownode_impl.rs} (98%) create mode 100644 src/flow/src/adapter/node_context.rs create mode 100644 src/flow/src/adapter/table_source.rs diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 8ec252ad35..ad4342c76f 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -4,10 +4,6 @@ version.workspace = true edition.workspace = true license.workspace = true -[[bin]] -name = "footprint" -path = "bin/mem_footprint.rs" - [lints] workspace = true diff --git a/src/flow/bin/mem_footprint.rs b/src/flow/bin/mem_footprint.rs deleted file mode 100644 index ca4081d307..0000000000 --- a/src/flow/bin/mem_footprint.rs +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -fn main() {} diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index bfa4922020..9b071db7ea 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -14,10 +14,12 @@ //! for getting data from source and sending results to sink //! 
and communicating with other parts of the database -use std::borrow::{Borrow, BorrowMut}; +#![warn(unused_imports)] + use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; use std::sync::Arc; +use api::v1::flow::flow_server::Flow; use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; use catalog::kvbackend::KvBackendCatalogManager; use catalog::memory::MemoryCatalogManager; @@ -29,7 +31,7 @@ use common_meta::key::table_name::{TableNameKey, TableNameManager}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_runtime::JoinHandle; -use common_telemetry::info; +use common_telemetry::{debug, info}; use datatypes::schema::ColumnSchema; use datatypes::value::Value; use greptime_proto::v1; @@ -50,7 +52,9 @@ use tokio::task::LocalSet; use crate::adapter::error::{ EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; +pub(crate) use crate::adapter::node_context::{FlownodeContext, IdToNameMap}; use crate::adapter::parse_expr::{parse_duration, parse_fixed}; +use crate::adapter::table_source::TableSource; use crate::adapter::util::column_schemas_to_proto; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::{Context, DataflowState, ErrCollector}; @@ -61,14 +65,17 @@ use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP}; use crate::transform::sql_to_flow_plan; pub(crate) mod error; +mod flownode_impl; mod parse_expr; mod server; -mod standalone; #[cfg(test)] mod tests; mod util; mod worker; +mod node_context; +mod table_source; + use error::Error; pub const PER_REQ_MAX_ROW_CNT: usize = 8192; @@ -180,35 +187,17 @@ pub struct FlownodeManager { /// The query engine that will be used to parse the query and convert it to a dataflow plan query_engine: Arc, /// Getting table name and table schema from table info manager - table_info_source: TableInfoSource, + table_info_source: TableSource, frontend_invoker: RwLock>>, /// contains mapping from table name to global id, and table schema - node_context: Mutex, + node_context: Mutex, + flow_err_collectors: RwLock>, tick_manager: FlowTickManager, node_id: Option, } +/// Building FlownodeManager impl FlownodeManager { - /// run in common_runtime background runtime - pub fn run_background(self: Arc) -> JoinHandle<()> { - info!("Starting flownode manager task"); - common_runtime::spawn_bg(async move { - self.run().await; - }) - } - /// Trigger dataflow running, and then send writeback request to the source sender - /// - /// note that this method didn't handle input mirror request, as this should be handled by grpc server - pub async fn run(&self) { - loop { - // TODO(discord9): only run when new inputs arrive or scheduled to - self.run_available().await; - // TODO(discord9): error handling - self.send_writeback_requests().await.unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - } - /// set frontend invoker pub async fn set_frontend_invoker( self: &Arc, @@ -223,11 +212,11 @@ impl FlownodeManager { query_engine: Arc, table_meta: TableMetadataManagerRef, ) -> Self { - let srv_map = TableInfoSource::new( + let srv_map = TableSource::new( table_meta.table_info_manager().clone(), table_meta.table_name_manager().clone(), ); - let node_context = FlowNodeContext::default(); + let node_context = FlownodeContext::default(); let tick_manager = FlowTickManager::new(); let worker_handles = Vec::new(); FlownodeManager { @@ -236,6 +225,7 @@ impl 
FlownodeManager { table_info_source: srv_map, frontend_invoker: RwLock::new(None), node_context: Mutex::new(node_context), + flow_err_collectors: Default::default(), tick_manager, node_id, } @@ -259,135 +249,13 @@ impl FlownodeManager { } } -/// Just check if NodeManager's other fields are `Send` so later we can refactor so A Flow Node Manager -/// can manage multiple flow worker(thread) then we can run multiple flow worker in a single flow node manager -#[test] -fn check_is_send() { - fn is_send() {} - is_send::(); -} - -/// mapping of table name <-> table id should be query from tableinfo manager -pub struct TableInfoSource { - /// for query `TableId -> TableName` mapping - table_info_manager: TableInfoManager, - table_name_manager: TableNameManager, -} - -impl TableInfoSource { - pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self { - TableInfoSource { - table_info_manager, - table_name_manager, - } - } - - pub async fn get_table_id_from_proto_name( - &self, - name: &greptime_proto::v1::TableName, - ) -> Result { - self.table_name_manager - .get(TableNameKey::new( - &name.catalog_name, - &name.schema_name, - &name.table_name, - )) - .await - .with_context(|_| TableNotFoundMetaSnafu { - msg: format!("Table name = {:?}, couldn't found table id", name), - }) - .map(|id| id.unwrap().table_id()) - } - - /// If the table havn't been created in database, the tableId returned would be null - pub async fn get_table_id_from_name(&self, name: &TableName) -> Result, Error> { - let ret = self - .table_name_manager - .get(TableNameKey::new(&name[0], &name[1], &name[2])) - .await - .with_context(|_| TableNotFoundMetaSnafu { - msg: format!("Table name = {:?}, couldn't found table id", name), - })? - .map(|id| id.table_id()); - Ok(ret) - } - - /// query metasrv about the table name and table id - pub async fn get_table_name(&self, table_id: &TableId) -> Result { - self.table_info_manager - .get(*table_id) - .await - .with_context(|_| TableNotFoundMetaSnafu { - msg: format!("TableId = {:?}, couldn't found table name", table_id), - }) - .map(|name| name.unwrap().table_name()) - .map(|name| [name.catalog_name, name.schema_name, name.table_name]) - } - /// query metasrv about the table name and table id - pub async fn get_table_info_value( - &self, - table_id: &TableId, - ) -> Result, Error> { - Ok(self - .table_info_manager - .get(*table_id) - .await - .with_context(|_| TableNotFoundMetaSnafu { - msg: format!("TableId = {:?}, couldn't found table name", table_id), - })? - .map(|v| v.into_inner())) - } - - pub async fn get_table_name_schema( - &self, - table_id: &TableId, - ) -> Result<(TableName, RelationType), Error> { - let table_info_value = self - .get_table_info_value(table_id) - .await? 
- .with_context(|| TableNotFoundSnafu { - name: format!("TableId = {:?}, Can't found table info", table_id), - })?; - - let table_name = table_info_value.table_name(); - let table_name = [ - table_name.catalog_name, - table_name.schema_name, - table_name.table_name, - ]; - - let raw_schema = table_info_value.table_info.meta.schema; - let column_types = raw_schema - .column_schemas - .into_iter() - .map(|col| ColumnType { - nullable: col.is_nullable(), - scalar_type: col.data_type, - }) - .collect_vec(); - - let key = table_info_value.table_info.meta.primary_key_indices; - let keys = vec![repr::Key::from(key)]; - - let time_index = raw_schema.timestamp_index; - Ok(( - table_name, - RelationType { - column_types, - keys, - time_index, - }, - )) - } -} - #[derive(Debug)] pub enum DiffRequest { Insert(Vec<(Row, repr::Timestamp)>), Delete(Vec<(Row, repr::Timestamp)>), } -/// iterate through the diff row and from from continuous diff row with same diff type +/// iterate through the diff row and form continuous diff row with same diff type pub fn diff_row_to_request(rows: Vec) -> Vec { let mut reqs = Vec::new(); for (row, ts, diff) in rows { @@ -409,30 +277,8 @@ pub fn diff_row_to_request(rows: Vec) -> Vec { reqs } +/// This impl block contains methods to send writeback requests to frontend impl FlownodeManager { - /// Run all available subgraph in the flow node - /// This will try to run all dataflow in this node - /// - /// However this is not blocking and can sometimes return while actual computation is still running in worker thread - /// TODO(discord9): add flag for subgraph that have input since last run - pub async fn run_available(&self) { - let now = self.tick_manager.tick(); - for worker in self.worker_handles.iter() { - worker.lock().await.run_available(now).await; - } - } - - /// send write request to related source sender - pub async fn handle_write_request( - &self, - region_id: RegionId, - rows: Vec, - ) -> Result<(), Error> { - let table_id = region_id.table_id(); - self.node_context.lock().await.send(table_id, rows)?; - Ok(()) - } - /// TODO(discord9): merge all same type of diff row into one requests /// /// Return the number of requests it made @@ -448,70 +294,70 @@ impl FlownodeManager { } let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - /*let table_id = self + // TODO(discord9): instead of auto build table from request schema, actually build table + // before `create flow` to be able to assign pk and ts etc. + let (primary_keys, schema) = if let Some(table_id) = self .table_info_source .get_table_id_from_name(&table_name) - .await?; - let table_info = self - .table_info_source - .get_table_info_value(&table_id) .await? - .unwrap(); - let primary_keys = table_info - .table_info - .meta - .primary_key_indices - .into_iter() - .map(|i| { - table_info.table_info.meta.schema.column_schemas[i] - .name - .clone() - }) - .collect_vec(); - let schema = table_info.table_info.meta.schema.column_schemas; - */ - let primary_keys = vec![]; - - let (schema_wout_ts, with_ts) = { - let node_ctx = self.node_context.lock().await; - let gid: GlobalId = node_ctx - .table_repr - .get_by_name(&table_name) - .map(|x| x.1) + { + let table_info = self + .table_info_source + .get_table_info_value(&table_id) + .await? .unwrap(); - let schema = node_ctx - .schema - .get(&gid) - .with_context(|| TableNotFoundSnafu { - name: format!("Table name = {:?}", table_name), - })? 
- .clone(); - - let ts_col = ColumnSchema::new( - "ts", - ConcreteDataType::timestamp_millisecond_datatype(), - true, - ) - .with_time_index(true); - - let wout_ts = schema - .column_types + let meta = table_info.table_info.meta; + let primary_keys = meta + .primary_key_indices .into_iter() - .enumerate() - .map(|(idx, typ)| { - ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable) - }) + .map(|i| meta.schema.column_schemas[i].name.clone()) .collect_vec(); - let mut with_ts = wout_ts.clone(); - with_ts.push(ts_col); - (wout_ts, with_ts) + let schema = meta.schema.column_schemas; + (primary_keys, schema) + } else { + // TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts + let primary_keys = vec![]; + + let with_ts = { + let node_ctx = self.node_context.lock().await; + let gid: GlobalId = node_ctx + .table_repr + .get_by_name(&table_name) + .map(|x| x.1) + .unwrap(); + let schema = node_ctx + .schema + .get(&gid) + .with_context(|| TableNotFoundSnafu { + name: format!("Table name = {:?}", table_name), + })? + .clone(); + + let ts_col = ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + true, + ) + .with_time_index(true); + + let wout_ts = schema + .column_types + .into_iter() + .enumerate() + .map(|(idx, typ)| { + ColumnSchema::new(format!("Col_{idx}"), typ.scalar_type, typ.nullable) + }) + .collect_vec(); + let mut with_ts = wout_ts.clone(); + with_ts.push(ts_col); + with_ts + }; + (primary_keys, with_ts) }; - let _proto_schema_wout_ts = column_schemas_to_proto(schema_wout_ts, &primary_keys)?; + let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; - let proto_schema_with_ts = column_schemas_to_proto(with_ts, &primary_keys)?; - - info!( + debug!( "Sending {} writeback requests to table {}", reqs.len(), table_name.join(".") @@ -533,7 +379,7 @@ impl FlownodeManager { let req = RowInsertRequest { table_name, rows: Some(v1::Rows { - schema: proto_schema_with_ts.clone(), + schema: proto_schema.clone(), rows: rows_proto, }), }; @@ -565,7 +411,7 @@ impl FlownodeManager { let req = RowDeleteRequest { table_name, rows: Some(v1::Rows { - schema: proto_schema_with_ts.clone(), + schema: proto_schema.clone(), rows: rows_proto, }), }; @@ -609,7 +455,57 @@ impl FlownodeManager { } output } +} +/// Flow Runtime related methods +impl FlownodeManager { + /// run in common_runtime background runtime + pub fn run_background(self: Arc) -> JoinHandle<()> { + info!("Starting flownode manager's background task"); + common_runtime::spawn_bg(async move { + self.run().await; + }) + } + + /// Trigger dataflow running, and then send writeback request to the source sender + /// + /// note that this method didn't handle input mirror request, as this should be handled by grpc server + pub async fn run(&self) { + loop { + // TODO(discord9): only run when new inputs arrive or scheduled to + self.run_available().await; + // TODO(discord9): error handling + self.send_writeback_requests().await.unwrap(); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + + /// Run all available subgraph in the flow node + /// This will try to run all dataflow in this node + /// + /// However this is not blocking and can sometimes return while actual computation is still running in worker thread + /// TODO(discord9): add flag for subgraph that have input since last run + pub async fn run_available(&self) { + let now = self.tick_manager.tick(); + for worker in self.worker_handles.iter() { + 
worker.lock().await.run_available(now).await; + } + } + + /// send write request to related source sender + pub async fn handle_write_request( + &self, + region_id: RegionId, + rows: Vec, + ) -> Result<(), Error> { + let table_id = region_id.table_id(); + self.node_context.lock().await.send(table_id, rows)?; + Ok(()) + } +} + +/// Create&Remove flow +impl FlownodeManager { /// remove a flow by it's id pub async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { for handle in self.worker_handles.iter() { @@ -662,14 +558,12 @@ impl FlownodeManager { node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone()); - // TODO(discord9): pass the actual `QueryContext` in here node_ctx.query_context = query_ctx.map(Arc::new); // construct a active dataflow state with it - let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?; + let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; info!("Flow Plan is {:?}", flow_plan); node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?; - // TODO(discord9): parse `expire_when` let expire_when = expire_when .and_then(|s| { if s.is_empty() || s.split_whitespace().join("").is_empty() { @@ -701,7 +595,11 @@ impl FlownodeManager { .iter() .map(|id| node_ctx.get_source_by_global_id(id).map(|s| s.subscribe())) .collect::, _>>()?; - + let err_collector = ErrCollector::default(); + self.flow_err_collectors + .write() + .await + .insert(flow_id, err_collector.clone()); let handle = &self.worker_handles[0].lock().await; handle .create_flow( @@ -713,6 +611,7 @@ impl FlownodeManager { source_senders, expire_when, create_if_not_exist, + err_collector, ) .await?; info!("Successfully create flow with id={}", flow_id); @@ -720,278 +619,10 @@ impl FlownodeManager { } } -/// A context that holds the information of the dataflow -#[derive(Default)] -pub struct FlowNodeContext { - /// mapping from source table to tasks, useful for schedule which task to run when a source table is updated - pub source_to_tasks: BTreeMap>, - /// mapping from task to sink table, useful for sending data back to the client when a task is done running - pub flow_to_sink: BTreeMap, - /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender - /// - /// Note that we are getting insert requests with table id, so we should use table id as the key - pub source_sender: BTreeMap>, - /// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table - /// - /// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key - /// note that the sink receiver should only have one, and we are using broadcast as mpsc channel here - pub sink_receiver: BTreeMap< - TableName, - ( - mpsc::UnboundedSender, - mpsc::UnboundedReceiver, - ), - >, - /// store source in buffer for each source table, in case broadcast channel is full - pub send_buffer: BTreeMap>, - /// the schema of the table, query from metasrv or infered from TypedPlan - pub schema: HashMap, - /// All the tables that have been registered in the worker - pub table_repr: TriMap, - pub query_context: Option>, -} - -impl FlowNodeContext { - // return number of rows it actuall send(including what's in the buffer) - pub fn send(&mut self, table_id: TableId, rows: Vec) -> Result { - let sender = self - .source_sender - .get(&table_id) - .with_context(|| TableNotFoundSnafu { - name: 
table_id.to_string(), - })?; - let send_buffer = self.send_buffer.entry(table_id).or_default(); - send_buffer.extend(rows); - let mut row_cnt = 0; - while let Some(row) = send_buffer.pop_front() { - if sender.len() >= BROADCAST_CAP { - break; - } - row_cnt += 1; - sender - .send(row) - .map_err(|err| { - InternalSnafu { - reason: format!( - "Failed to send row to table_id = {:?}, error = {:?}", - table_id, err - ), - } - .build() - }) - .with_context(|_| EvalSnafu)?; - } - - Ok(row_cnt) - } -} - -impl FlowNodeContext { - /// mapping source table to task, and sink table to task in worker context - /// - /// also add their corrseponding broadcast sender/receiver - fn register_task_src_sink( - &mut self, - task_id: FlowId, - source_table_ids: &[TableId], - sink_table_name: TableName, - ) { - for source_table_id in source_table_ids { - self.add_source_sender(*source_table_id); - self.source_to_tasks - .entry(*source_table_id) - .or_default() - .insert(task_id); - } - - self.add_sink_receiver(sink_table_name.clone()); - self.flow_to_sink.insert(task_id, sink_table_name); - } - - pub fn add_source_sender(&mut self, table_id: TableId) { - self.source_sender - .entry(table_id) - .or_insert_with(|| broadcast::channel(BROADCAST_CAP).0); - } - - pub fn add_sink_receiver(&mut self, table_name: TableName) { - self.sink_receiver - .entry(table_name) - .or_insert_with(mpsc::unbounded_channel::); - } - - pub fn get_source_by_global_id( - &self, - id: &GlobalId, - ) -> Result<&broadcast::Sender, Error> { - let table_id = self - .table_repr - .get_by_global_id(id) - .with_context(|| TableNotFoundSnafu { - name: format!("Global Id = {:?}", id), - })? - .1 - .with_context(|| TableNotFoundSnafu { - name: format!("Table Id = {:?}", id), - })?; - self.source_sender - .get(&table_id) - .with_context(|| TableNotFoundSnafu { - name: table_id.to_string(), - }) - } - - pub fn get_sink_by_global_id( - &self, - id: &GlobalId, - ) -> Result, Error> { - let table_name = self - .table_repr - .get_by_global_id(id) - .with_context(|| TableNotFoundSnafu { - name: format!("{:?}", id), - })? - .0 - .with_context(|| TableNotFoundSnafu { - name: format!("Global Id = {:?}", id), - })?; - self.sink_receiver - .get(&table_name) - .map(|(s, _r)| s.clone()) - .with_context(|| TableNotFoundSnafu { - name: table_name.join("."), - }) - } -} - -impl FlowNodeContext { - /// Retrieves a GlobalId and table schema representing a table previously registered by calling the [register_table] function. - /// - /// Returns an error if no table has been registered with the provided names - pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> { - let id = self - .table_repr - .get_by_name(name) - .map(|(_tid, gid)| gid) - .with_context(|| TableNotFoundSnafu { - name: name.join("."), - })?; - let schema = self - .schema - .get(&id) - .cloned() - .with_context(|| TableNotFoundSnafu { - name: name.join("."), - })?; - Ok((id, schema)) - } - - /// Assign a global id to a table, if already assigned, return the existing global id - /// - /// require at least one of `table_name` or `table_id` to be `Some` - /// - /// and will try to fetch the schema from table info manager(if table exist now) - /// - /// NOTE: this will not actually render the table into collection refered as GlobalId - /// merely creating a mapping from table id to global id - pub async fn assign_global_id_to_table( - &mut self, - srv_map: &TableInfoSource, - mut table_name: Option, - table_id: Option, - ) -> Result { - // if we can find by table name/id. 
not assign it
-        if let Some(gid) = table_name
-            .as_ref()
-            .and_then(|table_name| self.table_repr.get_by_name(table_name))
-            .map(|(_, gid)| gid)
-            .or_else(|| {
-                table_id
-                    .and_then(|id| self.table_repr.get_by_table_id(&id))
-                    .map(|(_, gid)| gid)
-            })
-        {
-            Ok(gid)
-        } else {
-            let global_id = self.new_global_id();
-
-            if let Some(table_id) = table_id {
-                let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
-                table_name = table_name.or(Some(known_table_name));
-                self.schema.insert(global_id, schema);
-            } // if we don't have table id, it means database havn't assign one yet or we don't need it
-
-            self.table_repr.insert(table_name, table_id, global_id);
-            Ok(global_id)
-        }
-    }
-
-    /// Assign a schema to a table
-    ///
-    /// TODO(discord9): error handling
-    pub fn assign_table_schema(
-        &mut self,
-        table_name: &TableName,
-        schema: RelationType,
-    ) -> Result<(), Error> {
-        let gid = self
-            .table_repr
-            .get_by_name(table_name)
-            .map(|(_, gid)| gid)
-            .unwrap();
-        self.schema.insert(gid, schema);
-        Ok(())
-    }
-
-    /// Get a new global id
-    pub fn new_global_id(&self) -> GlobalId {
-        GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
-    }
-}
-
-/// A tri-directional map that maps table name, table id, and global id
-#[derive(Default, Debug)]
-pub struct TriMap {
-    name_to_global_id: HashMap<TableName, GlobalId>,
-    id_to_global_id: HashMap<TableId, GlobalId>,
-    global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
-}
-
-impl TriMap {
-    pub fn new() -> Self {
-        Default::default()
-    }
-
-    pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
-        name.clone()
-            .and_then(|name| self.name_to_global_id.insert(name.clone(), global_id));
-        id.and_then(|id| self.id_to_global_id.insert(id, global_id));
-        self.global_id_to_name_id.insert(global_id, (name, id));
-    }
-
-    pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
-        self.name_to_global_id.get(name).map(|global_id| {
-            let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap();
-            (*id, *global_id)
-        })
-    }
-
-    pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
-        self.id_to_global_id.get(id).map(|global_id| {
-            let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap();
-            (name.clone(), *global_id)
-        })
-    }
-
-    pub fn get_by_global_id(
-        &self,
-        global_id: &GlobalId,
-    ) -> Option<(Option<TableName>, Option<TableId>)> {
-        self.global_id_to_name_id.get(global_id).cloned()
-    }
-}
-
-/// FlowTickManager is a manager for flow tick
+/// FlowTickManager is a manager for flow tick, which tracks flow execution progress
+///
+/// TODO(discord9): find a better way to do this, one that doesn't expose the flow tick
+/// to other flows, to avoid TSO coordination mess
 #[derive(Clone)]
 pub struct FlowTickManager {
     anchor: Anchor,
diff --git a/src/flow/src/adapter/standalone.rs b/src/flow/src/adapter/flownode_impl.rs
similarity index 98%
rename from src/flow/src/adapter/standalone.rs
rename to src/flow/src/adapter/flownode_impl.rs
index 773d3d0bc8..f5f32e4df0 100644
--- a/src/flow/src/adapter/standalone.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! impl `FlowNode` trait for FlowNodeManager so standalone can easily call them
+//!
impl `FlowNode` trait for FlowNodeManager so standalone can call them use api::v1::flow::{flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse}; use api::v1::region::InsertRequests; @@ -26,6 +26,7 @@ use crate::adapter::FlownodeManager; use crate::repr::{self, DiffRow}; fn to_meta_err(err: crate::adapter::error::Error) -> common_meta::error::Error { + // TODO(discord9): refactor this Err::<(), _>(BoxedError::new(err)) .with_context(|_| ExternalSnafu) .unwrap_err() diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs new file mode 100644 index 0000000000..46d710eca7 --- /dev/null +++ b/src/flow/src/adapter/node_context.rs @@ -0,0 +1,332 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Node context, prone to change with every incoming requests + +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::sync::Arc; + +use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; +use catalog::kvbackend::KvBackendCatalogManager; +use catalog::memory::MemoryCatalogManager; +use common_base::Plugins; +use common_error::ext::BoxedError; +use common_frontend::handler::FrontendInvoker; +use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; +use common_meta::key::table_name::{TableNameKey, TableNameManager}; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::KvBackendRef; +use common_runtime::JoinHandle; +use common_telemetry::info; +use datatypes::schema::ColumnSchema; +use datatypes::value::Value; +use greptime_proto::v1; +use hydroflow::scheduled::graph::Hydroflow; +use itertools::Itertools; +use minstant::Anchor; +use prost::bytes::buf; +use query::{QueryEngine, QueryEngineFactory}; +use serde::{Deserialize, Serialize}; +use session::context::QueryContext; +use smallvec::SmallVec; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{ConcreteDataType, RegionId}; +use table::metadata::TableId; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock}; +use tokio::task::LocalSet; + +use crate::adapter::error::{ + Error, EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, +}; +use crate::adapter::parse_expr::{parse_duration, parse_fixed}; +use crate::adapter::util::column_schemas_to_proto; +use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; +use crate::adapter::{FlowId, TableName, TableSource}; +use crate::compute::{Context, DataflowState, ErrCollector}; +use crate::expr::error::InternalSnafu; +use crate::expr::GlobalId; +use crate::plan::{Plan, TypedPlan}; +use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP}; +use crate::transform::sql_to_flow_plan; + +/// A context that holds the information of the dataflow +#[derive(Default)] +pub struct FlownodeContext { + /// mapping from source table to tasks, useful for schedule which task to run when a source table is updated + pub source_to_tasks: BTreeMap>, + /// 
mapping from task to sink table, useful for sending data back to the client when a task is done running
+    pub flow_to_sink: BTreeMap<FlowId, TableName>,
+    /// broadcast sender for source table, any incoming write request will be sent to the source table's corresponding sender
+    ///
+    /// Note that we are getting insert requests with table id, so we should use table id as the key
+    pub source_sender: BTreeMap<TableId, broadcast::Sender<DiffRow>>,
+    /// broadcast receiver for sink table, there should only be one receiver, and it will receive all the data from the sink table
+    ///
+    /// and send it back to the client, since we are mocking the sink table as a client, we should use table name as the key
+    /// note that there should only be one sink receiver; we are using broadcast as a mpsc channel here
+    pub sink_receiver: BTreeMap<
+        TableName,
+        (
+            mpsc::UnboundedSender<DiffRow>,
+            mpsc::UnboundedReceiver<DiffRow>,
+        ),
+    >,
+    /// store source rows in a buffer for each source table, in case the broadcast channel is full
+    pub send_buffer: BTreeMap<TableId, VecDeque<DiffRow>>,
+    /// the schema of the table, queried from metasrv or inferred from TypedPlan
+    pub schema: HashMap<GlobalId, RelationType>,
+    /// All the tables that have been registered in the worker
+    pub table_repr: IdToNameMap,
+    pub query_context: Option<Arc<QueryContext>>,
+}
+
+impl FlownodeContext {
+    // return the number of rows it actually sends (including what's in the buffer)
+    pub fn send(&mut self, table_id: TableId, rows: Vec<DiffRow>) -> Result<usize, Error> {
+        let sender = self
+            .source_sender
+            .get(&table_id)
+            .with_context(|| TableNotFoundSnafu {
+                name: table_id.to_string(),
+            })?;
+        let send_buffer = self.send_buffer.entry(table_id).or_default();
+        send_buffer.extend(rows);
+        let mut row_cnt = 0;
+        while let Some(row) = send_buffer.pop_front() {
+            if sender.len() >= BROADCAST_CAP {
+                break;
+            }
+            row_cnt += 1;
+            sender
+                .send(row)
+                .map_err(|err| {
+                    InternalSnafu {
+                        reason: format!(
+                            "Failed to send row to table_id = {:?}, error = {:?}",
+                            table_id, err
+                        ),
+                    }
+                    .build()
+                })
+                .with_context(|_| EvalSnafu)?;
+        }
+
+        Ok(row_cnt)
+    }
+}
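A note on the buffered `send` just above: rows first land in `send_buffer`, and are only drained into the broadcast channel while `sender.len()` is under `BROADCAST_CAP`. A standalone sketch of that pattern follows (row type reduced to `u64`; unlike the hunk above, the sketch pushes a popped row back when the channel is full rather than letting it drop):

```rust
use std::collections::VecDeque;

use tokio::sync::broadcast;

const BROADCAST_CAP: usize = 1024;

/// Drain `buffer` into `sender` until the channel is full; returns rows sent.
fn buffered_send(
    sender: &broadcast::Sender<u64>,
    buffer: &mut VecDeque<u64>,
    rows: Vec<u64>,
) -> usize {
    buffer.extend(rows);
    let mut row_cnt = 0;
    while let Some(row) = buffer.pop_front() {
        // `sender.len()` is how many values are still queued in the channel
        if sender.len() >= BROADCAST_CAP {
            buffer.push_front(row); // keep the row for the next drain
            break;
        }
        if sender.send(row).is_ok() {
            row_cnt += 1;
        }
    }
    row_cnt
}

fn main() {
    let (tx, _rx) = broadcast::channel(BROADCAST_CAP);
    let mut buf = VecDeque::new();
    assert_eq!(buffered_send(&tx, &mut buf, vec![1, 2, 3]), 3);
    assert!(buf.is_empty());
}
```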
+
+impl FlownodeContext {
+    /// map source table to task, and sink table to task, in the worker context
+    ///
+    /// also add their corresponding broadcast sender/receiver
+    pub fn register_task_src_sink(
+        &mut self,
+        task_id: FlowId,
+        source_table_ids: &[TableId],
+        sink_table_name: TableName,
+    ) {
+        for source_table_id in source_table_ids {
+            self.add_source_sender(*source_table_id);
+            self.source_to_tasks
+                .entry(*source_table_id)
+                .or_default()
+                .insert(task_id);
+        }
+
+        self.add_sink_receiver(sink_table_name.clone());
+        self.flow_to_sink.insert(task_id, sink_table_name);
+    }
+
+    pub fn add_source_sender(&mut self, table_id: TableId) {
+        self.source_sender
+            .entry(table_id)
+            .or_insert_with(|| broadcast::channel(BROADCAST_CAP).0);
+    }
+
+    pub fn add_sink_receiver(&mut self, table_name: TableName) {
+        self.sink_receiver
+            .entry(table_name)
+            .or_insert_with(mpsc::unbounded_channel::<DiffRow>);
+    }
+
+    pub fn get_source_by_global_id(
+        &self,
+        id: &GlobalId,
+    ) -> Result<&broadcast::Sender<DiffRow>, Error> {
+        let table_id = self
+            .table_repr
+            .get_by_global_id(id)
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("Global Id = {:?}", id),
+            })?
+            .1
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("Table Id = {:?}", id),
+            })?;
+        self.source_sender
+            .get(&table_id)
+            .with_context(|| TableNotFoundSnafu {
+                name: table_id.to_string(),
+            })
+    }
+
+    pub fn get_sink_by_global_id(
+        &self,
+        id: &GlobalId,
+    ) -> Result<mpsc::UnboundedSender<DiffRow>, Error> {
+        let table_name = self
+            .table_repr
+            .get_by_global_id(id)
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("{:?}", id),
+            })?
+            .0
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("Global Id = {:?}", id),
+            })?;
+        self.sink_receiver
+            .get(&table_name)
+            .map(|(s, _r)| s.clone())
+            .with_context(|| TableNotFoundSnafu {
+                name: table_name.join("."),
+            })
+    }
+}
+
+impl FlownodeContext {
+    /// Retrieves the GlobalId and table schema of a table previously registered with
+    /// [`FlownodeContext::assign_global_id_to_table`].
+    ///
+    /// Returns an error if no table has been registered with the provided names
+    pub fn table(&self, name: &TableName) -> Result<(GlobalId, RelationType), Error> {
+        let id = self
+            .table_repr
+            .get_by_name(name)
+            .map(|(_tid, gid)| gid)
+            .with_context(|| TableNotFoundSnafu {
+                name: name.join("."),
+            })?;
+        let schema = self
+            .schema
+            .get(&id)
+            .cloned()
+            .with_context(|| TableNotFoundSnafu {
+                name: name.join("."),
+            })?;
+        Ok((id, schema))
+    }
+
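The two chained `with_context` calls in `get_source_by_global_id` above (an outer `Option` from the map lookup, then an inner `Option` inside the entry) are the lookup pattern this module uses throughout. A runnable reduction with snafu, using a hypothetical `TableNotFound` error in place of the crate's error enum:

```rust
use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("Table not found: {name}"))]
struct TableNotFound {
    name: String,
}

/// Mirror of the double lookup above:
/// global id -> (name, table id) entry, then entry -> concrete table id.
fn resolve(entry: Option<(Option<String>, Option<u32>)>) -> Result<u32, TableNotFound> {
    entry
        .context(TableNotFoundSnafu { name: "no such global id" })? // outer Option
        .1
        .context(TableNotFoundSnafu { name: "entry has no table id" }) // inner Option
}

fn main() {
    assert!(resolve(None).is_err());
    assert_eq!(resolve(Some((None, Some(7)))).unwrap(), 7);
}
```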
+    /// Assign a global id to a table; if one is already assigned, return the existing global id
+    ///
+    /// require at least one of `table_name` or `table_id` to be `Some`
+    ///
+    /// and will try to fetch the schema from the table info manager (if the table exists now)
+    ///
+    /// NOTE: this will not actually render the table into the collection referred to by the GlobalId,
+    /// it merely creates a mapping from table id to global id
+    pub async fn assign_global_id_to_table(
+        &mut self,
+        srv_map: &TableSource,
+        mut table_name: Option<TableName>,
+        table_id: Option<TableId>,
+    ) -> Result<GlobalId, Error> {
+        // if we can find it by table name/id, do not assign a new global id
+        if let Some(gid) = table_name
+            .as_ref()
+            .and_then(|table_name| self.table_repr.get_by_name(table_name))
+            .map(|(_, gid)| gid)
+            .or_else(|| {
+                table_id
+                    .and_then(|id| self.table_repr.get_by_table_id(&id))
+                    .map(|(_, gid)| gid)
+            })
+        {
+            Ok(gid)
+        } else {
+            let global_id = self.new_global_id();
+
+            if let Some(table_id) = table_id {
+                let (known_table_name, schema) = srv_map.get_table_name_schema(&table_id).await?;
+                table_name = table_name.or(Some(known_table_name));
+                self.schema.insert(global_id, schema);
+            } // if we don't have a table id, the database hasn't assigned one yet or we don't need it
+
+            self.table_repr.insert(table_name, table_id, global_id);
+            Ok(global_id)
+        }
+    }
+
+    /// Assign a schema to a table
+    ///
+    /// TODO(discord9): error handling
+    pub fn assign_table_schema(
+        &mut self,
+        table_name: &TableName,
+        schema: RelationType,
+    ) -> Result<(), Error> {
+        let gid = self
+            .table_repr
+            .get_by_name(table_name)
+            .map(|(_, gid)| gid)
+            .unwrap();
+        self.schema.insert(gid, schema);
+        Ok(())
+    }
+
+    /// Get a new global id
+    pub fn new_global_id(&self) -> GlobalId {
+        GlobalId::User(self.table_repr.global_id_to_name_id.len() as u64)
+    }
+}
+
+/// A tri-directional map between table name, table id, and global id
+#[derive(Default, Debug)]
+pub struct IdToNameMap {
+    name_to_global_id: HashMap<TableName, GlobalId>,
+    id_to_global_id: HashMap<TableId, GlobalId>,
+    global_id_to_name_id: BTreeMap<GlobalId, (Option<TableName>, Option<TableId>)>,
+}
+
+impl IdToNameMap {
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    pub fn insert(&mut self, name: Option<TableName>, id: Option<TableId>, global_id: GlobalId) {
+        name.clone()
+            .and_then(|name| self.name_to_global_id.insert(name.clone(), global_id));
+        id.and_then(|id| self.id_to_global_id.insert(id, global_id));
+        self.global_id_to_name_id.insert(global_id, (name, id));
+    }
+
+    pub fn get_by_name(&self, name: &TableName) -> Option<(Option<TableId>, GlobalId)> {
+        self.name_to_global_id.get(name).map(|global_id| {
+            let (_name, id) = self.global_id_to_name_id.get(global_id).unwrap();
+            (*id, *global_id)
+        })
+    }
+
+    pub fn get_by_table_id(&self, id: &TableId) -> Option<(Option<TableName>, GlobalId)> {
+        self.id_to_global_id.get(id).map(|global_id| {
+            let (name, _id) = self.global_id_to_name_id.get(global_id).unwrap();
+            (name.clone(), *global_id)
+        })
+    }
+
+    pub fn get_by_global_id(
+        &self,
+        global_id: &GlobalId,
+    ) -> Option<(Option<TableName>, Option<TableId>)> {
+        self.global_id_to_name_id.get(global_id).cloned()
+    }
+}
diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs
index d8e2714a70..a6363057d1 100644
--- a/src/flow/src/adapter/server.rs
+++ b/src/flow/src/adapter/server.rs
@@ -37,6 +37,7 @@ use crate::adapter::{FlownodeManager, FlownodeManagerRef};
 use crate::repr::{self, DiffRow};
 
 pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
+/// wrapping the flow node manager to avoid the orphan rule with Arc<...>
 #[derive(Clone)]
 pub struct FlowService {
     pub manager: FlownodeManagerRef,
@@ -86,19 +87,19 @@ impl flow_server::Flow for FlowService {
     }
 }
 
-pub struct FlowNodeServer {
+pub struct FlownodeServer {
     pub shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
     pub flow_service: FlowService,
 }
 
-impl FlowNodeServer {
+impl FlownodeServer {
     pub fn create_flow_service(&self) -> flow_server::FlowServer<FlowService> {
         flow_server::FlowServer::new(self.flow_service.clone())
     }
 }
 
 #[async_trait::async_trait]
-impl servers::server::Server for FlowNodeServer {
+impl servers::server::Server for FlownodeServer {
     async fn shutdown(&self) -> Result<(), servers::error::Error> {
         let mut shutdown_tx = self.shutdown_tx.lock().await;
         if let Some(tx) =
shutdown_tx.take() { diff --git a/src/flow/src/adapter/table_source.rs b/src/flow/src/adapter/table_source.rs new file mode 100644 index 0000000000..f69daefed4 --- /dev/null +++ b/src/flow/src/adapter/table_source.rs @@ -0,0 +1,176 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! How to query table information from database + +use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque}; +use std::sync::Arc; + +use api::v1::{RowDeleteRequest, RowDeleteRequests, RowInsertRequest, RowInsertRequests}; +use catalog::kvbackend::KvBackendCatalogManager; +use catalog::memory::MemoryCatalogManager; +use common_base::Plugins; +use common_error::ext::BoxedError; +use common_frontend::handler::FrontendInvoker; +use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; +use common_meta::key::table_name::{TableNameKey, TableNameManager}; +use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::kv_backend::KvBackendRef; +use common_runtime::JoinHandle; +use common_telemetry::info; +use datatypes::schema::ColumnSchema; +use datatypes::value::Value; +use greptime_proto::v1; +use hydroflow::scheduled::graph::Hydroflow; +use itertools::Itertools; +use minstant::Anchor; +use prost::bytes::buf; +use query::{QueryEngine, QueryEngineFactory}; +use serde::{Deserialize, Serialize}; +use session::context::QueryContext; +use smallvec::SmallVec; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{ConcreteDataType, RegionId}; +use table::metadata::TableId; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock}; +use tokio::task::LocalSet; + +use crate::adapter::error::{ + Error, EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, +}; +pub(crate) use crate::adapter::node_context::{FlownodeContext, IdToNameMap}; +use crate::adapter::parse_expr::{parse_duration, parse_fixed}; +use crate::adapter::util::column_schemas_to_proto; +use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; +use crate::adapter::TableName; +use crate::compute::{Context, DataflowState, ErrCollector}; +use crate::expr::error::InternalSnafu; +use crate::expr::GlobalId; +use crate::plan::{Plan, TypedPlan}; +use crate::repr::{self, ColumnType, DiffRow, RelationType, Row, BROADCAST_CAP}; +use crate::transform::sql_to_flow_plan; + +/// mapping of table name <-> table id should be query from tableinfo manager +pub struct TableSource { + /// for query `TableId -> TableName` mapping + table_info_manager: TableInfoManager, + table_name_manager: TableNameManager, +} + +impl TableSource { + pub fn new(table_info_manager: TableInfoManager, table_name_manager: TableNameManager) -> Self { + TableSource { + table_info_manager, + table_name_manager, + } + } + + pub async fn get_table_id_from_proto_name( + &self, + name: &greptime_proto::v1::TableName, + ) -> Result { + self.table_name_manager + .get(TableNameKey::new( + &name.catalog_name, + &name.schema_name, + &name.table_name, + )) + .await + 
.with_context(|_| TableNotFoundMetaSnafu {
+                msg: format!("Table name = {:?}, couldn't find table id", name),
+            })
+            .map(|id| id.unwrap().table_id())
+    }
+
+    /// If the table hasn't been created in the database, the returned table id would be `None`
+    pub async fn get_table_id_from_name(&self, name: &TableName) -> Result<Option<TableId>, Error> {
+        let ret = self
+            .table_name_manager
+            .get(TableNameKey::new(&name[0], &name[1], &name[2]))
+            .await
+            .with_context(|_| TableNotFoundMetaSnafu {
+                msg: format!("Table name = {:?}, couldn't find table id", name),
+            })?
+            .map(|id| id.table_id());
+        Ok(ret)
+    }
+
+    /// query metasrv about the table name by table id
+    pub async fn get_table_name(&self, table_id: &TableId) -> Result<TableName, Error> {
+        self.table_info_manager
+            .get(*table_id)
+            .await
+            .with_context(|_| TableNotFoundMetaSnafu {
+                msg: format!("TableId = {:?}, couldn't find table name", table_id),
+            })
+            .map(|name| name.unwrap().table_name())
+            .map(|name| [name.catalog_name, name.schema_name, name.table_name])
+    }
+    /// query metasrv about the `TableInfoValue` by table id
+    pub async fn get_table_info_value(
+        &self,
+        table_id: &TableId,
+    ) -> Result<Option<TableInfoValue>, Error> {
+        Ok(self
+            .table_info_manager
+            .get(*table_id)
+            .await
+            .with_context(|_| TableNotFoundMetaSnafu {
+                msg: format!("TableId = {:?}, couldn't find table name", table_id),
+            })?
+            .map(|v| v.into_inner()))
+    }
+
+    pub async fn get_table_name_schema(
+        &self,
+        table_id: &TableId,
+    ) -> Result<(TableName, RelationType), Error> {
+        let table_info_value = self
+            .get_table_info_value(table_id)
+            .await?
+            .with_context(|| TableNotFoundSnafu {
+                name: format!("TableId = {:?}, can't find table info", table_id),
+            })?;
+
+        let table_name = table_info_value.table_name();
+        let table_name = [
+            table_name.catalog_name,
+            table_name.schema_name,
+            table_name.table_name,
+        ];
+
+        let raw_schema = table_info_value.table_info.meta.schema;
+        let column_types = raw_schema
+            .column_schemas
+            .into_iter()
+            .map(|col| ColumnType {
+                nullable: col.is_nullable(),
+                scalar_type: col.data_type,
+            })
+            .collect_vec();
+
+        let key = table_info_value.table_info.meta.primary_key_indices;
+        let keys = vec![repr::Key::from(key)];
+
+        let time_index = raw_schema.timestamp_index;
+        Ok((
+            table_name,
+            RelationType {
+                column_types,
+                keys,
+                time_index,
+            },
+        ))
+    }
+}
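`get_table_name_schema` above boils a `TableInfoValue` down to the flow engine's `RelationType`. The shape of that conversion, as a sketch with stand-in types for everything metasrv-related (not the crate's real definitions):

```rust
/// Stand-in for `repr::ColumnType`.
#[derive(Debug, PartialEq)]
struct ColumnType {
    nullable: bool,
    scalar_type: &'static str, // stands in for ConcreteDataType
}

/// Stand-in for `repr::RelationType`.
#[derive(Debug, PartialEq)]
struct RelationType {
    column_types: Vec<ColumnType>,
    keys: Vec<Vec<usize>>, // stands in for Vec<repr::Key>
    time_index: Option<usize>,
}

/// Mirrors the mapping above: column schemas become column types, the
/// primary key indices become one key, the timestamp index carries over.
fn to_relation_type(
    cols: &[(&'static str, bool)], // (data type, nullable)
    primary_key_indices: Vec<usize>,
    timestamp_index: Option<usize>,
) -> RelationType {
    RelationType {
        column_types: cols
            .iter()
            .copied()
            .map(|(ty, nullable)| ColumnType {
                nullable,
                scalar_type: ty,
            })
            .collect(),
        keys: vec![primary_key_indices],
        time_index: timestamp_index,
    }
}

fn main() {
    let rel = to_relation_type(&[("string", false), ("timestamp", false)], vec![0], Some(1));
    assert_eq!(rel.keys, vec![vec![0]]);
    assert_eq!(rel.time_index, Some(1));
}
```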
diff --git a/src/flow/src/adapter/tests.rs b/src/flow/src/adapter/tests.rs
index 263b45f917..01206fb46b 100644
--- a/src/flow/src/adapter/tests.rs
+++ b/src/flow/src/adapter/tests.rs
@@ -12,7 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Somewhere integration-ish(more like mock) test for adapter module
+//! Mock test for adapter module
+//! TODO(discord9): write mock test
 
 use common_meta::key::table_info::TableInfoKey;
 use common_meta::kv_backend::memory::MemoryKvBackend;
diff --git a/src/flow/src/adapter/util.rs b/src/flow/src/adapter/util.rs
index f50badd203..f1997c70e0 100644
--- a/src/flow/src/adapter/util.rs
+++ b/src/flow/src/adapter/util.rs
@@ -14,11 +14,14 @@
 
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::{ColumnDataType, ColumnDataTypeExtension, SemanticType};
+use common_error::ext::BoxedError;
 use datatypes::schema::{ColumnSchema, COMMENT_KEY};
+use itertools::Itertools;
+use snafu::ResultExt;
 
-use crate::adapter::error::Error;
+use crate::adapter::error::{Error, ExternalSnafu};
 
-/// TODO(discord9): error handling
+/// convert a list of `ColumnSchema` to its corresponding proto type
 pub fn column_schemas_to_proto(
     column_schemas: Vec<ColumnSchema>,
     primary_keys: &[String],
@@ -28,9 +31,10 @@ pub fn column_schemas_to_proto(
         .map(|c| {
             ColumnDataTypeWrapper::try_from(c.data_type.clone())
                 .map(|w| w.to_parts())
-                .unwrap()
+                .map_err(BoxedError::new)
+                .with_context(|_| ExternalSnafu)
         })
-        .collect::<Vec<_>>();
+        .try_collect()?;
 
     let ret = column_schemas
         .iter()
diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs
index 0fbd9e6b68..db881295d1 100644
--- a/src/flow/src/adapter/worker.rs
+++ b/src/flow/src/adapter/worker.rs
@@ -107,12 +107,6 @@ pub struct WorkerHandle {
     itc_client: Mutex<InterThreadCallClient>,
 }
 
-#[test]
-fn check_if_send_sync() {
-    fn check<T: Send + Sync>() {}
-    check::<WorkerHandle>();
-}
-
 impl WorkerHandle {
     /// create task, return task id
     ///
@@ -127,6 +121,7 @@ impl WorkerHandle {
         src_recvs: Vec<broadcast::Receiver<DiffRow>>,
         expire_when: Option<repr::Duration>,
         create_if_not_exist: bool,
+        err_collector: ErrCollector,
     ) -> Result<Option<FlowId>, Error> {
         let req = Request::Create {
             task_id,
@@ -137,6 +132,7 @@ impl WorkerHandle {
             src_recvs,
             expire_when,
             create_if_not_exist,
+            err_collector,
         };
 
         let ret = self.itc_client.lock().await.call_blocking(req).await?;
@@ -235,9 +231,10 @@ impl<'s> Worker<'s> {
         sink_sender: mpsc::UnboundedSender<DiffRow>,
         source_ids: &[GlobalId],
         src_recvs: Vec<broadcast::Receiver<DiffRow>>,
-        // TODO(discord9): set expire duration for all arrangment and compare to sys timestamp instead
+        // TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
        expire_when: Option<repr::Duration>,
         create_if_not_exist: bool,
+        err_collector: ErrCollector,
     ) -> Result<Option<FlowId>, Error> {
         let _ = expire_when;
         if create_if_not_exist {
             }
         }
 
-        let mut cur_task_state = ActiveDataflowState::<'s>::default();
+        let mut cur_task_state = ActiveDataflowState::<'s> {
+            err_collector,
+            ..Default::default()
+        };
 
         {
             let mut ctx = cur_task_state.new_ctx(sink_id);
@@ -305,6 +305,7 @@ impl<'s> Worker<'s> {
                 src_recvs,
                 expire_when,
                 create_if_not_exist,
+                err_collector,
             } => {
                 let task_create_result = self.create_flow(
                     task_id,
@@ -315,6 +316,7 @@ impl<'s> Worker<'s> {
                     src_recvs,
                     expire_when,
                     create_if_not_exist,
+                    err_collector,
                 );
                 Some((
                     req_id,
@@ -352,6 +354,7 @@ enum Request {
         src_recvs: Vec<broadcast::Receiver<DiffRow>>,
         expire_when: Option<repr::Duration>,
         create_if_not_exist: bool,
+        err_collector: ErrCollector,
     },
     Remove {
         task_id: FlowId,
@@ -494,6 +497,7 @@ mod test {
             vec![rx],
             None,
             true,
+            ErrCollector::default(),
         )
         .await
         .unwrap();
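Threading `err_collector` through `Request::Create` above pairs with the `get_all` drain added in types.rs just below: synchronous subgraph code pushes errors with `blocking_lock`, and the async manager drains them later. A minimal sketch of that split, with `EvalError` reduced to `String` and a shared `Arc` assumed inside the collector (the real field type isn't shown in these hunks):

```rust
use std::collections::VecDeque;
use std::sync::Arc;

use tokio::sync::Mutex;

/// `EvalError` reduced to `String`; the real collector stores the error enum.
#[derive(Clone, Default)]
struct ErrCollector {
    inner: Arc<Mutex<VecDeque<String>>>,
}

impl ErrCollector {
    /// called from synchronous dataflow code, hence `blocking_lock`
    fn push_err(&self, err: String) {
        self.inner.blocking_lock().push_back(err)
    }

    /// called from the async manager side to drain collected errors
    async fn get_all(&self) -> Vec<String> {
        self.inner.lock().await.drain(..).collect()
    }
}

fn main() {
    let collector = ErrCollector::default();
    let c = collector.clone();
    // worker thread: plain synchronous code pushing an error
    std::thread::spawn(move || c.push_err("eval failed".to_string()))
        .join()
        .unwrap();
    let rt = tokio::runtime::Runtime::new().unwrap();
    assert_eq!(rt.block_on(collector.get_all()), vec!["eval failed".to_string()]);
}
```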
diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs
index f94125c4a1..4c4b41953c 100644
--- a/src/flow/src/compute/render/map.rs
+++ b/src/flow/src/compute/render/map.rs
@@ -145,8 +145,6 @@ fn mfp_subgraph(
 /// The core of evaluating MFP operator, given a MFP and a input, evaluate the MFP operator,
 /// return the output updates **And** possibly any number of errors that occurred during the evaluation
-///
-/// TODO(discord9): deal with primary key overwrite issue
 fn eval_mfp_core(
     input: impl IntoIterator<Item = DiffRow>,
     mfp_plan: &MfpPlan,
diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs
index 42b102e072..fa8c7315cb 100644
--- a/src/flow/src/compute/types.rs
+++ b/src/flow/src/compute/types.rs
@@ -21,6 +21,7 @@ use hydroflow::scheduled::graph::Hydroflow;
 use hydroflow::scheduled::handoff::TeeingHandoff;
 use hydroflow::scheduled::port::RecvPort;
 use hydroflow::scheduled::SubgraphId;
+use itertools::Itertools;
 use tokio::sync::{Mutex, RwLock};
 
 use crate::compute::render::Context;
@@ -152,9 +153,14 @@ pub struct ErrCollector {
 }
 
 impl ErrCollector {
+    pub async fn get_all(&self) -> Vec<EvalError> {
+        self.inner.lock().await.drain(..).collect_vec()
+    }
+
     pub fn is_empty(&self) -> bool {
         self.inner.blocking_lock().is_empty()
     }
+
     pub fn push_err(&self, err: EvalError) {
         self.inner.blocking_lock().push_back(err)
     }
diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs
index bcdcfe36be..7c0310d6ff 100644
--- a/src/flow/src/expr/error.rs
+++ b/src/flow/src/expr/error.rs
@@ -23,6 +23,11 @@ use datatypes::data_type::ConcreteDataType;
 use serde::{Deserialize, Serialize};
 use snafu::{Location, Snafu};
 
+fn is_send_sync() {
+    fn check<T: Send + Sync>() {}
+    check::<EvalError>();
+}
+
 /// EvalError is about errors happen on columnar evaluation
 ///
 /// TODO(discord9): add detailed location of column/operator(instead of code) to errors tp help identify related column
diff --git a/src/flow/src/repr.rs b/src/flow/src/repr.rs
index 1269b1f076..85bdfa8e4a 100644
--- a/src/flow/src/repr.rs
+++ b/src/flow/src/repr.rs
@@ -51,7 +51,7 @@ pub type DiffRow = (Row, Timestamp, Diff);
 /// Row with key-value pair, timestamp and diff
 pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
 
-/// broadcast channel capacity, set to a arbitrary value
+/// broadcast channel capacity
 pub const BROADCAST_CAP: usize = 1024;
 
 /// Convert a value that is or can be converted to Datetime to internal timestamp
diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs
index c7a113f32a..bfdf50ce1b 100644
--- a/src/flow/src/transform.rs
+++ b/src/flow/src/transform.rs
@@ -29,7 +29,7 @@ use crate::adapter::error::{
     Error, InvalidQueryPlanSnafu, InvalidQueryProstSnafu, InvalidQuerySubstraitSnafu,
     NotImplementedSnafu, TableNotFoundSnafu,
 };
-use crate::adapter::FlowNodeContext;
+use crate::adapter::FlownodeContext;
 use crate::expr::GlobalId;
 use crate::plan::TypedPlan;
 use crate::repr::RelationType;
@@ -94,10 +94,8 @@ impl FunctionExtensions {
 
 /// To reuse existing code for parse sql, the sql is first parsed into a datafusion logical plan,
 /// then to a substrait plan, and finally to a flow plan.
-/// -/// TODO(discord9): check if use empty `QueryContext` influence anything pub async fn sql_to_flow_plan( - ctx: &mut FlowNodeContext, + ctx: &mut FlownodeContext, engine: &Arc, sql: &str, ) -> Result { @@ -138,10 +136,10 @@ mod test { use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use super::*; - use crate::adapter::TriMap; + use crate::adapter::IdToNameMap; use crate::repr::ColumnType; - pub fn create_test_ctx() -> FlowNodeContext { + pub fn create_test_ctx() -> FlownodeContext { let gid = GlobalId::User(0); let name = [ "greptime".to_string(), @@ -149,9 +147,9 @@ mod test { "numbers".to_string(), ]; let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); - let mut tri_map = TriMap::new(); + let mut tri_map = IdToNameMap::new(); tri_map.insert(Some(name.clone()), Some(0), gid); - FlowNodeContext { + FlownodeContext { schema: HashMap::from([(gid, schema)]), table_repr: tri_map, query_context: Some(Arc::new(QueryContext::with("greptime", "public"))), diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 3ff166b269..61fd7a20dd 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -54,11 +54,11 @@ use crate::expr::{ }; use crate::plan::{AccumulablePlan, AggrWithIndex, KeyValPlan, Plan, ReducePlan, TypedPlan}; use crate::repr::{self, ColumnType, RelationType}; -use crate::transform::{FlowNodeContext, FunctionExtensions}; +use crate::transform::{FlownodeContext, FunctionExtensions}; impl TypedExpr { fn from_substrait_agg_grouping( - ctx: &mut FlowNodeContext, + ctx: &mut FlownodeContext, groupings: &[Grouping], typ: &RelationType, extensions: &FunctionExtensions, @@ -84,7 +84,7 @@ impl TypedExpr { impl AggregateExpr { fn from_substrait_agg_measures( - ctx: &mut FlowNodeContext, + ctx: &mut FlownodeContext, measures: &[Measure], typ: &RelationType, extensions: &FunctionExtensions, @@ -218,7 +218,7 @@ impl KeyValPlan { impl TypedPlan { /// Convert AggregateRel into Flow's TypedPlan pub fn from_substrait_agg_rel( - ctx: &mut FlowNodeContext, + ctx: &mut FlownodeContext, agg: &proto::AggregateRel, extensions: &FunctionExtensions, ) -> Result { diff --git a/src/flow/src/transform/expr.rs b/src/flow/src/transform/expr.rs index 068f7159bb..5010684792 100644 --- a/src/flow/src/transform/expr.rs +++ b/src/flow/src/transform/expr.rs @@ -100,7 +100,7 @@ impl TypedExpr { }, ) .unzip(); - info!("Function: {:?}", f); + match arg_len { // because variadic function can also have 1 arguments, we need to check if it's a variadic function first 1 if VariadicFunc::from_str_and_types(fn_name, &arg_types).is_err() => { diff --git a/src/flow/src/transform/plan.rs b/src/flow/src/transform/plan.rs index 50d9363fca..6bc9ce1d83 100644 --- a/src/flow/src/transform/plan.rs +++ b/src/flow/src/transform/plan.rs @@ -23,12 +23,12 @@ use crate::adapter::error::{Error, InvalidQuerySnafu, NotImplementedSnafu, PlanS use crate::expr::{MapFilterProject, TypedExpr}; use crate::plan::{Plan, TypedPlan}; use crate::repr::{self, RelationType}; -use crate::transform::{FlowNodeContext, FunctionExtensions}; +use crate::transform::{FlownodeContext, FunctionExtensions}; impl TypedPlan { /// Convert Substrait Plan into Flow's TypedPlan pub fn from_substrait_plan( - ctx: &mut FlowNodeContext, + ctx: &mut FlownodeContext, plan: &SubPlan, ) -> Result { // Register function extension @@ -62,7 +62,7 @@ impl TypedPlan { /// Convert Substrait Rel into Flow's TypedPlan /// TODO: SELECT DISTINCT(does it get compile with something else?) 
pub fn from_substrait_rel( - ctx: &mut FlowNodeContext, + ctx: &mut FlownodeContext, rel: &Rel, extensions: &FunctionExtensions, ) -> Result { diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 4316166a8c..c66d42d855 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -210,7 +210,7 @@ impl GreptimeDbStandaloneBuilder { flownode .set_frontend_invoker(Box::new(instance.clone())) .await; - flownode.run_background().await.unwrap(); + let _node_handle = flownode.run_background(); procedure_manager.start().await.unwrap(); wal_options_allocator.start().await.unwrap();