fix(flow): deal with flow drop leftover (#5391)

* fix: deal with flow drop leftover

* chore: make it warn

* chore: apply suggestion.

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: review

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
discord9
2025-01-19 20:55:25 +08:00
committed by GitHub
parent d072801ad6
commit 87c21e2baa

View File

@@ -284,12 +284,29 @@ impl FlowWorkerManager {
let (catalog, schema) = (table_name[0].clone(), table_name[1].clone());
let ctx = Arc::new(QueryContext::with(&catalog, &schema));
let (is_ts_placeholder, proto_schema) = self
let (is_ts_placeholder, proto_schema) = match self
.try_fetch_existing_table(&table_name)
.await?
.context(UnexpectedSnafu {
reason: format!("Table not found: {}", table_name.join(".")),
})?;
}) {
Ok(r) => r,
Err(e) => {
if self
.table_info_source
.get_opt_table_id_from_name(&table_name)
.await?
.is_none()
{
// deal with both flow&sink table no longer exists
// but some output is still in output buf
common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join("."));
continue;
} else {
return Err(e);
}
}
};
let schema_len = proto_schema.len();
let total_rows = reqs.iter().map(|r| r.len()).sum::<usize>();