From cfe28b6974b4269a240f62e8dacee8855b27e1cf Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 10 May 2024 14:45:51 +0800 Subject: [PATCH] chore: debug log --- src/flow/src/adapter.rs | 43 ++++++++++++++++++----- src/flow/src/compute/render/reduce.rs | 7 ++-- src/flow/src/compute/render/src_sink.rs | 10 ++++-- src/flow/src/expr/func.rs | 6 ++-- src/flow/src/plan.rs | 4 +-- src/flow/src/repr/relation.rs | 46 +++++++++++++++++++++++++ src/flow/src/transform.rs | 4 --- src/flow/src/transform/aggr.rs | 13 +++---- 8 files changed, 106 insertions(+), 27 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index b6c6e4baee..317620b68d 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -304,9 +304,7 @@ impl FlownodeManager { (primary_keys, schema) } else { // TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts - let primary_keys = vec![]; - - let with_ts = { + let (primary_keys, schema) = { let node_ctx = self.node_context.lock().await; let gid: GlobalId = node_ctx .table_repr @@ -320,7 +318,17 @@ impl FlownodeManager { name: format!("Table name = {:?}", table_name), })? .clone(); - + // TODO(discord9): use default key from schema + let primary_keys = schema + .keys + .first() + .map(|v| { + v.column_indices + .iter() + .map(|i| format!("Col_{i}")) + .collect_vec() + }) + .unwrap_or_default(); let ts_col = ColumnSchema::new( "ts", ConcreteDataType::timestamp_millisecond_datatype(), @@ -338,17 +346,18 @@ impl FlownodeManager { .collect_vec(); let mut with_ts = wout_ts.clone(); with_ts.push(ts_col); - with_ts + (primary_keys, with_ts) }; - (primary_keys, with_ts) + (primary_keys, schema) }; let proto_schema = column_schemas_to_proto(schema, &primary_keys)?; debug!( - "Sending {} writeback requests to table {}", + "Sending {} writeback requests to table {}, reqs={:?}", reqs.len(), - table_name.join(".") + table_name.join("."), + reqs ); for req in reqs { @@ -455,6 +464,20 @@ impl FlownodeManager { }) } + /// log all flow errors + pub async fn log_all_errors(&self) { + for (f_id, f_err) in self.flow_err_collectors.read().await.iter() { + let all_errors = f_err.get_all().await; + if !all_errors.is_empty() { + let all_errors = all_errors + .into_iter() + .map(|i| format!("{:?}", i)) + .join("\n"); + common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors); + } + } + } + /// Trigger dataflow running, and then send writeback request to the source sender /// /// note that this method didn't handle input mirror request, as this should be handled by grpc server @@ -464,6 +487,7 @@ impl FlownodeManager { self.run_available().await; // TODO(discord9): error handling self.send_writeback_requests().await.unwrap(); + self.log_all_errors().await; tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } @@ -487,6 +511,7 @@ impl FlownodeManager { rows: Vec, ) -> Result<(), Error> { let table_id = region_id.table_id(); + debug!("Send {} rows to table {}", rows.len(), table_id); self.node_context.lock().await.send(table_id, rows)?; Ok(()) } @@ -549,7 +574,7 @@ impl FlownodeManager { node_ctx.query_context = query_ctx.map(Arc::new); // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?; - info!("Flow Plan is {:?}", flow_plan); + debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan); node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?; let expire_when = expire_when diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 72b2487249..c8ebefb7fb 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -15,7 +15,6 @@ use std::collections::BTreeMap; use std::ops::Range; -use common_telemetry::info; use datatypes::data_type::ConcreteDataType; use datatypes::value::{ListValue, Value}; use hydroflow::scheduled::graph_ext::GraphExt; @@ -81,7 +80,11 @@ impl<'referred, 'df> Context<'referred, 'df> { out_send_port, move |_ctx, recv, send| { // mfp only need to passively receive updates from recvs - let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter()); + let data = recv + .take_inner() + .into_iter() + .flat_map(|v| v.into_iter()) + .collect_vec(); reduce_subgraph( &reduce_arrange, diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index e9cf718f7c..77f3e41053 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -16,7 +16,7 @@ use std::collections::{BTreeMap, VecDeque}; -use common_telemetry::info; +use common_telemetry::{debug, info}; use hydroflow::scheduled::graph_ext::GraphExt; use itertools::Itertools; use snafu::OptionExt; @@ -70,7 +70,13 @@ impl<'referred, 'df> Context<'referred, 'df> { } } let all = prev_avail.chain(to_send).collect_vec(); - + if !all.is_empty() || !to_arrange.is_empty() { + debug!( + "All send: {} rows, not yet send: {} rows", + all.len(), + to_arrange.len() + ); + } err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange)); send.give(all); // always schedule source to run at next tick diff --git a/src/flow/src/expr/func.rs b/src/flow/src/expr/func.rs index 518bb14ade..6c6ebc6349 100644 --- a/src/flow/src/expr/func.rs +++ b/src/flow/src/expr/func.rs @@ -17,6 +17,7 @@ use std::collections::HashMap; use std::sync::OnceLock; +use common_telemetry::debug; use common_time::DateTime; use datafusion_expr::Operator; use datafusion_substrait::logical_plan::consumer::name_to_op; @@ -206,8 +207,9 @@ impl UnaryFunc { from: arg_ty, to: to.clone(), } - })?; - Ok(res) + }); + debug!("Cast to type: {to:?}, result: {:?}", res); + res } } } diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs index 993666294b..2bbd45cb7e 100644 --- a/src/flow/src/plan.rs +++ b/src/flow/src/plan.rs @@ -74,6 +74,7 @@ impl TypedPlan { let mfp = MapFilterProject::new(input_arity) .map(exprs)? .project(input_arity..input_arity + output_arity)?; + let out_typ = self.typ.apply_mfp(&mfp, &expr_typs); // special case for mfp to compose when the plan is already mfp let plan = match self.plan { Plan::Mfp { @@ -88,8 +89,7 @@ impl TypedPlan { mfp, }, }; - let typ = RelationType::new(expr_typs); - Ok(TypedPlan { typ, plan }) + Ok(TypedPlan { typ: out_typ, plan }) } /// Add a new filter to the plan, will filter out the records that do not satisfy the filter diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs index 3714209cc9..38780398cd 100644 --- a/src/flow/src/repr/relation.rs +++ b/src/flow/src/repr/relation.rs @@ -12,11 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::{BTreeMap, HashMap}; + use datatypes::prelude::ConcreteDataType; +use itertools::Itertools; use serde::{Deserialize, Serialize}; use snafu::ensure; use crate::adapter::error::{InvalidQuerySnafu, Result}; +use crate::expr::MapFilterProject; /// a set of column indices that are "keys" for the collection. #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)] @@ -102,6 +106,48 @@ pub struct RelationType { } impl RelationType { + pub fn apply_mfp(&self, mfp: &MapFilterProject, expr_typs: &[ColumnType]) -> Self { + let all_types = self + .column_types + .iter() + .chain(expr_typs.iter()) + .cloned() + .collect_vec(); + let mfp_out_types = mfp + .projection + .iter() + .map(|i| all_types[*i].clone()) + .collect_vec(); + let old_to_new_col = BTreeMap::from_iter( + mfp.projection + .clone() + .into_iter() + .enumerate() + .map(|(new, old)| (old, new)), + ); + + // since it's just a mfp, we also try to preserve keys&time index information, if they survive mfp transform + let keys = self + .keys + .iter() + .filter_map(|key| { + key.column_indices + .iter() + .map(|old| old_to_new_col.get(old).cloned()) + .collect::>>() + .map(Key::from) + }) + .collect_vec(); + + let time_index = self + .time_index + .and_then(|old| old_to_new_col.get(&old).cloned()); + Self { + column_types: mfp_out_types, + keys, + time_index, + } + } /// Constructs a `RelationType` representing the relation with no columns and /// no keys. pub fn empty() -> Self { diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 5c85fa8f2b..bc24a0de69 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -120,10 +120,6 @@ pub async fn sql_to_flow_plan( Ok(flow_plan) } -fn rewrite_to_prop_ts(plan: TypedPlan) -> TypedPlan { - todo!() -} - #[cfg(test)] mod test { use std::sync::Arc; diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs index 61fd7a20dd..50ecb21ebd 100644 --- a/src/flow/src/transform/aggr.rs +++ b/src/flow/src/transform/aggr.rs @@ -228,7 +228,7 @@ impl TypedPlan { return not_impl_err!("Aggregate without an input is not supported"); }; - let group_expr = + let group_exprs = TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.typ, extensions)?; let mut aggr_exprs = @@ -236,14 +236,14 @@ impl TypedPlan { let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan( &mut aggr_exprs, - &group_expr, + &group_exprs, input.typ.column_types.len(), )?; let output_type = { let mut output_types = Vec::new(); // first append group_expr as key, then aggr_expr as value - for expr in &group_expr { + for expr in &group_exprs { output_types.push(expr.typ.clone()); } @@ -252,7 +252,8 @@ impl TypedPlan { aggr.func.signature().output.clone(), )); } - RelationType::new(output_types) + // TODO(discord9): try best to get time + RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec()) }; // copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs @@ -365,8 +366,8 @@ mod test { }; let expected = TypedPlan { typ: RelationType::new(vec![ - ColumnType::new(CDT::uint32_datatype(), true), - ColumnType::new(CDT::uint32_datatype(), false), + ColumnType::new(CDT::uint32_datatype(), true), // col sum(number) + ColumnType::new(CDT::uint32_datatype(), false), // col number ]), plan: Plan::Mfp { input: Box::new(Plan::Reduce {