From 76aadb2223cf86bb50e4c3a33a9423029511fe10 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 9 May 2024 14:54:47 +0800 Subject: [PATCH] chore: cleanup debug log&fix after rebase --- src/flow/src/adapter.rs | 9 ++------- src/flow/src/compute/render/reduce.rs | 3 +-- src/flow/src/compute/render/src_sink.rs | 10 ++-------- src/flow/src/transform.rs | 2 +- src/frontend/src/instance.rs | 1 - 5 files changed, 6 insertions(+), 19 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 94ad7f8baa..20191080d7 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -441,14 +441,13 @@ impl FlownodeManager { if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) { return Ok(0); } - info!("Sending writeback requests, all reqs={:?}", all_reqs); let mut req_cnt = 0; for (table_name, reqs) in all_reqs { if reqs.is_empty() { continue; } let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); - let ctx = QueryContext::with(&catalog, &schema); + let ctx = Arc::new(QueryContext::with(&catalog, &schema)); /*let table_id = self .table_info_source .get_table_id_from_name(&table_name) @@ -663,7 +662,7 @@ impl FlownodeManager { node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone()); // TODO(discord9): pass the actual `QueryContext` in here - node_ctx.query_context = Some(QueryContext::with("greptime", "public")); + node_ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public"))); // construct a active dataflow state with it let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?; info!("Flow Plan is {:?}", flow_plan); @@ -781,10 +780,6 @@ impl FlowNodeContext { }) .with_context(|_| EvalSnafu)?; } - info!( - "FlowNodeContext send {} rows to table_id = {}", - row_cnt, table_id - ); Ok(row_cnt) } diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index 27287f0d77..72b2487249 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -379,9 +379,8 @@ fn reduce_accum_subgraph( let mut all_updates = Vec::with_capacity(key_to_vals.len()); let mut all_outputs = Vec::with_capacity(key_to_vals.len()); - let null_vals = (0..full_aggrs.len()).map(|_| Value::Null); // lock the arrange for write for the rest of function body - // so to prevent wide race condition since we are going to update the arrangement by write after read + // so to prevent wired race condition since we are going to update the arrangement by write after read // TODO(discord9): consider key-based lock let mut arrange = arrange.write(); for (key, value_diffs) in key_to_vals { diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs index 9cc2ff60a0..e9cf718f7c 100644 --- a/src/flow/src/compute/render/src_sink.rs +++ b/src/flow/src/compute/render/src_sink.rs @@ -60,10 +60,9 @@ impl<'referred, 'df> Context<'referred, 'df> { let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d)); let mut to_send = Vec::new(); let mut to_arrange = Vec::new(); - let mut recv_cnt = 0; + // TODO(discord9): handling tokio broadcast error while let Ok((r, t, d)) = src_recv.try_recv() { - recv_cnt += 1; if t <= now { to_send.push((r, t, d)); } else { @@ -71,12 +70,7 @@ impl<'referred, 'df> Context<'referred, 'df> { } } let all = prev_avail.chain(to_send).collect_vec(); - if recv_cnt != 0 { - info!( - "Flow receive {recv_cnt} rows, send as source {} rows at {now}", - all.len() - ); - } + err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange)); send.give(all); // always schedule source to run at next tick diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 4c2a9678d4..c7a113f32a 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -154,7 +154,7 @@ mod test { FlowNodeContext { schema: HashMap::from([(gid, schema)]), table_repr: tri_map, - query_context: Some(QueryContext::with("greptime", "public")), + query_context: Some(Arc::new(QueryContext::with("greptime", "public"))), ..Default::default() } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 4cd23e64f7..096c3de247 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -242,7 +242,6 @@ impl FrontendInvoker for Instance { requests: RowInsertRequests, ctx: QueryContextRef, ) -> common_frontend::error::Result { - info!("Received row inserts: {:?}", requests); self.inserter .handle_row_inserts(requests, ctx, &self.statement_executor) .await