diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 373cc7e891..777dcbcdf8 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -284,12 +284,29 @@ impl FlowWorkerManager { let (catalog, schema) = (table_name[0].clone(), table_name[1].clone()); let ctx = Arc::new(QueryContext::with(&catalog, &schema)); - let (is_ts_placeholder, proto_schema) = self + let (is_ts_placeholder, proto_schema) = match self .try_fetch_existing_table(&table_name) .await? .context(UnexpectedSnafu { reason: format!("Table not found: {}", table_name.join(".")), - })?; + }) { + Ok(r) => r, + Err(e) => { + if self + .table_info_source + .get_opt_table_id_from_name(&table_name) + .await? + .is_none() + { + // deal with both flow&sink table no longer exists + // but some output is still in output buf + common_telemetry::warn!(e; "Table `{}` no longer exists, skip writeback", table_name.join(".")); + continue; + } else { + return Err(e); + } + } + }; let schema_len = proto_schema.len(); let total_rows = reqs.iter().map(|r| r.len()).sum::();