chore: cleanup debug log&fix after rebase

This commit is contained in:
discord9
2024-05-09 14:54:47 +08:00
parent f360b2e812
commit 76aadb2223
5 changed files with 6 additions and 19 deletions

View File

@@ -441,14 +441,13 @@ impl FlownodeManager {
if all_reqs.is_empty() || all_reqs.iter().all(|v| v.1.is_empty()) {
return Ok(0);
}
info!("Sending writeback requests, all reqs={:?}", all_reqs);
let mut req_cnt = 0;
for (table_name, reqs) in all_reqs {
if reqs.is_empty() {
continue;
}
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
let ctx = QueryContext::with(&catalog, &schema);
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
/*let table_id = self
.table_info_source
.get_table_id_from_name(&table_name)
@@ -663,7 +662,7 @@ impl FlownodeManager {
node_ctx.register_task_src_sink(flow_id, source_table_ids, sink_table_name.clone());
// TODO(discord9): pass the actual `QueryContext` in here
node_ctx.query_context = Some(QueryContext::with("greptime", "public"));
node_ctx.query_context = Some(Arc::new(QueryContext::with("greptime", "public")));
// construct a active dataflow state with it
let flow_plan = sql_to_flow_plan(node_ctx.borrow_mut(), &self.query_engine, &sql).await?;
info!("Flow Plan is {:?}", flow_plan);
@@ -781,10 +780,6 @@ impl FlowNodeContext {
})
.with_context(|_| EvalSnafu)?;
}
info!(
"FlowNodeContext send {} rows to table_id = {}",
row_cnt, table_id
);
Ok(row_cnt)
}

View File

@@ -379,9 +379,8 @@ fn reduce_accum_subgraph(
let mut all_updates = Vec::with_capacity(key_to_vals.len());
let mut all_outputs = Vec::with_capacity(key_to_vals.len());
let null_vals = (0..full_aggrs.len()).map(|_| Value::Null);
// lock the arrange for write for the rest of function body
// so to prevent wide race condition since we are going to update the arrangement by write after read
// so to prevent wired race condition since we are going to update the arrangement by write after read
// TODO(discord9): consider key-based lock
let mut arrange = arrange.write();
for (key, value_diffs) in key_to_vals {

View File

@@ -60,10 +60,9 @@ impl<'referred, 'df> Context<'referred, 'df> {
let prev_avail = arr.into_iter().map(|((k, _), t, d)| (k, t, d));
let mut to_send = Vec::new();
let mut to_arrange = Vec::new();
let mut recv_cnt = 0;
// TODO(discord9): handling tokio broadcast error
while let Ok((r, t, d)) = src_recv.try_recv() {
recv_cnt += 1;
if t <= now {
to_send.push((r, t, d));
} else {
@@ -71,12 +70,7 @@ impl<'referred, 'df> Context<'referred, 'df> {
}
}
let all = prev_avail.chain(to_send).collect_vec();
if recv_cnt != 0 {
info!(
"Flow receive {recv_cnt} rows, send as source {} rows at {now}",
all.len()
);
}
err_collector.run(|| arrange_handler_inner.write().apply_updates(now, to_arrange));
send.give(all);
// always schedule source to run at next tick

View File

@@ -154,7 +154,7 @@ mod test {
FlowNodeContext {
schema: HashMap::from([(gid, schema)]),
table_repr: tri_map,
query_context: Some(QueryContext::with("greptime", "public")),
query_context: Some(Arc::new(QueryContext::with("greptime", "public"))),
..Default::default()
}
}

View File

@@ -242,7 +242,6 @@ impl FrontendInvoker for Instance {
requests: RowInsertRequests,
ctx: QueryContextRef,
) -> common_frontend::error::Result<Output> {
info!("Received row inserts: {:?}", requests);
self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor)
.await