chore: debug log

This commit is contained in:
discord9
2024-05-10 14:45:51 +08:00
parent 9ec6107988
commit cfe28b6974
8 changed files with 106 additions and 27 deletions

View File

@@ -304,9 +304,7 @@ impl FlownodeManager {
(primary_keys, schema)
} else {
// TODO(discord9): get ts column from `RelationType` once we are done rewriting flow plan to attach ts
let primary_keys = vec![];
let with_ts = {
let (primary_keys, schema) = {
let node_ctx = self.node_context.lock().await;
let gid: GlobalId = node_ctx
.table_repr
@@ -320,7 +318,17 @@ impl FlownodeManager {
name: format!("Table name = {:?}", table_name),
})?
.clone();
// TODO(discord9): use default key from schema
let primary_keys = schema
.keys
.first()
.map(|v| {
v.column_indices
.iter()
.map(|i| format!("Col_{i}"))
.collect_vec()
})
.unwrap_or_default();
let ts_col = ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
@@ -338,17 +346,18 @@ impl FlownodeManager {
.collect_vec();
let mut with_ts = wout_ts.clone();
with_ts.push(ts_col);
with_ts
(primary_keys, with_ts)
};
(primary_keys, with_ts)
(primary_keys, schema)
};
let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
debug!(
"Sending {} writeback requests to table {}",
"Sending {} writeback requests to table {}, reqs={:?}",
reqs.len(),
table_name.join(".")
table_name.join("."),
reqs
);
for req in reqs {
@@ -455,6 +464,20 @@ impl FlownodeManager {
})
}
/// log all flow errors
pub async fn log_all_errors(&self) {
for (f_id, f_err) in self.flow_err_collectors.read().await.iter() {
let all_errors = f_err.get_all().await;
if !all_errors.is_empty() {
let all_errors = all_errors
.into_iter()
.map(|i| format!("{:?}", i))
.join("\n");
common_telemetry::error!("Flow {} has following errors: {}", f_id, all_errors);
}
}
}
/// Trigger dataflow running, and then send writeback request to the source sender
///
/// Note that this method does not handle input mirror requests, as those should be handled by the gRPC server
@@ -464,6 +487,7 @@ impl FlownodeManager {
self.run_available().await;
// TODO(discord9): error handling
self.send_writeback_requests().await.unwrap();
self.log_all_errors().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}
@@ -487,6 +511,7 @@ impl FlownodeManager {
rows: Vec<DiffRow>,
) -> Result<(), Error> {
let table_id = region_id.table_id();
debug!("Send {} rows to table {}", rows.len(), table_id);
self.node_context.lock().await.send(table_id, rows)?;
Ok(())
}
@@ -549,7 +574,7 @@ impl FlownodeManager {
node_ctx.query_context = query_ctx.map(Arc::new);
// construct an active dataflow state with it
let flow_plan = sql_to_flow_plan(&mut node_ctx, &self.query_engine, &sql).await?;
info!("Flow Plan is {:?}", flow_plan);
debug!("Flow {:?}'s Plan is {:?}", flow_id, flow_plan);
node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;
let expire_when = expire_when

View File

@@ -15,7 +15,6 @@
use std::collections::BTreeMap;
use std::ops::Range;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::value::{ListValue, Value};
use hydroflow::scheduled::graph_ext::GraphExt;
@@ -81,7 +80,11 @@ impl<'referred, 'df> Context<'referred, 'df> {
out_send_port,
move |_ctx, recv, send| {
// mfp only need to passively receive updates from recvs
let data = recv.take_inner().into_iter().flat_map(|v| v.into_iter());
let data = recv
.take_inner()
.into_iter()
.flat_map(|v| v.into_iter())
.collect_vec();
reduce_subgraph(
&reduce_arrange,

View File

@@ -16,7 +16,7 @@
use std::collections::{BTreeMap, VecDeque};
use common_telemetry::info;
use common_telemetry::{debug, info};
use hydroflow::scheduled::graph_ext::GraphExt;
use itertools::Itertools;
use snafu::OptionExt;
@@ -70,7 +70,13 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}
let all = prev_avail.chain(to_send).collect_vec();
if !all.is_empty() || !to_arrange.is_empty() {
debug!(
"All send: {} rows, not yet send: {} rows",
all.len(),
to_arrange.len()
);
}
err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
send.give(all);
// always schedule source to run at next tick

View File

@@ -17,6 +17,7 @@
use std::collections::HashMap;
use std::sync::OnceLock;
use common_telemetry::debug;
use common_time::DateTime;
use datafusion_expr::Operator;
use datafusion_substrait::logical_plan::consumer::name_to_op;
@@ -206,8 +207,9 @@ impl UnaryFunc {
from: arg_ty,
to: to.clone(),
}
})?;
Ok(res)
});
debug!("Cast to type: {to:?}, result: {:?}", res);
res
}
}
}

View File

@@ -74,6 +74,7 @@ impl TypedPlan {
let mfp = MapFilterProject::new(input_arity)
.map(exprs)?
.project(input_arity..input_arity + output_arity)?;
let out_typ = self.typ.apply_mfp(&mfp, &expr_typs);
// special case for mfp to compose when the plan is already mfp
let plan = match self.plan {
Plan::Mfp {
@@ -88,8 +89,7 @@ impl TypedPlan {
mfp,
},
};
let typ = RelationType::new(expr_typs);
Ok(TypedPlan { typ, plan })
Ok(TypedPlan { typ: out_typ, plan })
}
/// Add a new filter to the plan, will filter out the records that do not satisfy the filter

View File

@@ -12,11 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use datatypes::prelude::ConcreteDataType;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::adapter::error::{InvalidQuerySnafu, Result};
use crate::expr::MapFilterProject;
/// a set of column indices that are "keys" for the collection.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
@@ -102,6 +106,48 @@ pub struct RelationType {
}
impl RelationType {
/// Compute the relation type that results from applying `mfp` to a relation of this type.
///
/// `expr_typs` are the types of the expressions the mfp maps; they are treated as
/// extra columns appended after this relation's own columns. Only `mfp.projection`
/// is consulted: each projected index selects one column from that combined list.
///
/// Keys and the time index are carried over on a best-effort basis:
/// * a key is kept (with remapped indices) only if every one of its columns
///   appears in the projection;
/// * the time index is kept only if its column appears in the projection.
///
/// # Panics
///
/// Panics if a projection index is outside the combined column list.
pub fn apply_mfp(&self, mfp: &MapFilterProject, expr_typs: &[ColumnType]) -> Self {
    // Input columns first, then the mapped expressions, matching the index space of the projection.
    let candidate_types: Vec<ColumnType> = self
        .column_types
        .iter()
        .chain(expr_typs)
        .cloned()
        .collect();

    let column_types = mfp
        .projection
        .iter()
        .map(|&old| candidate_types[old].clone())
        .collect_vec();

    // old column index -> new column index; if a column is projected more than once,
    // the last position wins.
    let mut new_index_of = BTreeMap::new();
    for (new, &old) in mfp.projection.iter().enumerate() {
        new_index_of.insert(old, new);
    }

    // Keep only the keys whose columns all survive the projection.
    let mut keys = Vec::new();
    for key in self.keys.iter() {
        let remapped: Option<Vec<_>> = key
            .column_indices
            .iter()
            .map(|old| new_index_of.get(old).copied())
            .collect();
        if let Some(indices) = remapped {
            keys.push(Key::from(indices));
        }
    }

    let time_index = match self.time_index {
        Some(old) => new_index_of.get(&old).copied(),
        None => None,
    };

    Self {
        column_types,
        keys,
        time_index,
    }
}
/// Constructs a `RelationType` representing the relation with no columns and
/// no keys.
pub fn empty() -> Self {

View File

@@ -120,10 +120,6 @@ pub async fn sql_to_flow_plan(
Ok(flow_plan)
}
/// Unimplemented placeholder: the body is `todo!()`, so calling this function panics.
///
/// NOTE(review): the name suggests it is meant to rewrite a plan so that a timestamp
/// column is propagated — confirm the intended behavior before implementing.
fn rewrite_to_prop_ts(plan: TypedPlan) -> TypedPlan {
todo!()
}
#[cfg(test)]
mod test {
use std::sync::Arc;

View File

@@ -228,7 +228,7 @@ impl TypedPlan {
return not_impl_err!("Aggregate without an input is not supported");
};
let group_expr =
let group_exprs =
TypedExpr::from_substrait_agg_grouping(ctx, &agg.groupings, &input.typ, extensions)?;
let mut aggr_exprs =
@@ -236,14 +236,14 @@ impl TypedPlan {
let key_val_plan = KeyValPlan::from_substrait_gen_key_val_plan(
&mut aggr_exprs,
&group_expr,
&group_exprs,
input.typ.column_types.len(),
)?;
let output_type = {
let mut output_types = Vec::new();
// first append group_expr as key, then aggr_expr as value
for expr in &group_expr {
for expr in &group_exprs {
output_types.push(expr.typ.clone());
}
@@ -252,7 +252,8 @@ impl TypedPlan {
aggr.func.signature().output.clone(),
));
}
RelationType::new(output_types)
// TODO(discord9): try best to get time
RelationType::new(output_types).with_key((0..group_exprs.len()).collect_vec())
};
// copy aggr_exprs to full_aggrs, and split them into simple_aggrs and distinct_aggrs
@@ -365,8 +366,8 @@ mod test {
};
let expected = TypedPlan {
typ: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), true),
ColumnType::new(CDT::uint32_datatype(), false),
ColumnType::new(CDT::uint32_datatype(), true), // col sum(number)
ColumnType::new(CDT::uint32_datatype(), false), // col number
]),
plan: Plan::Mfp {
input: Box::new(Plan::Reduce {