fix: flow handle reordered inserts (#5275)

* fix: reorder correct schema

* tests: reorder insert handled correctly

* chore: rm unused

* refactor: per review

* chore: more comment

* chore: per review
This commit is contained in:
discord9
2025-01-03 14:25:39 +08:00
committed by Yingwen
parent f6feac26f5
commit 5e3c5945c4
4 changed files with 304 additions and 14 deletions

View File

@@ -28,7 +28,6 @@ use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use super::util::from_proto_to_data_type;
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::error::InternalSnafu;
use crate::metrics::METRIC_FLOW_TASK_COUNT;
@@ -168,7 +167,7 @@ impl Flownode for FlowWorkerManager {
// TODO(discord9): reconsider time assignment mechanism
let now = self.tick_manager.tick();
let fetch_order = {
let (table_types, fetch_order) = {
let ctx = self.node_context.read().await;
// TODO(discord9): also check schema version so that altered table can be reported
@@ -177,6 +176,13 @@ impl Flownode for FlowWorkerManager {
.table_from_id(&table_id)
.await
.map_err(to_meta_err(snafu::location!()))?;
let table_types = table_schema
.typ()
.column_types
.clone()
.into_iter()
.map(|t| t.scalar_type)
.collect_vec();
let table_col_names = table_schema.names;
let table_col_names = table_col_names
.iter().enumerate()
@@ -196,16 +202,19 @@ impl Flownode for FlowWorkerManager {
);
let fetch_order: Vec<usize> = table_col_names
.iter()
.map(|names| {
name_to_col.get(names).copied().context(UnexpectedSnafu {
err_msg: format!("Column not found: {}", names),
})
.map(|col_name| {
name_to_col
.get(col_name)
.copied()
.with_context(|| UnexpectedSnafu {
err_msg: format!("Column not found: {}", col_name),
})
})
.try_collect()?;
if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
trace!("Reordering columns: {:?}", fetch_order)
}
fetch_order
(table_types, fetch_order)
};
let rows: Vec<DiffRow> = rows_proto
@@ -220,12 +229,8 @@ impl Flownode for FlowWorkerManager {
})
.map(|r| (r, now, 1))
.collect_vec();
let batch_datatypes = insert_schema
.iter()
.map(from_proto_to_data_type)
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(to_meta_err(snafu::location!()))?;
self.handle_write_request(region_id.into(), rows, &batch_datatypes)
self.handle_write_request(region_id.into(), rows, &table_types)
.await
.map_err(|err| {
common_telemetry::error!(err;"Failed to handle write request");

View File

@@ -128,7 +128,11 @@ impl AggregateExpr {
}
if args.len() != 1 {
return not_impl_err!("Aggregated function with multiple arguments is not supported");
let fn_name = extensions.get(&f.function_reference).cloned();
return not_impl_err!(
"Aggregated function (name={:?}) with multiple arguments is not supported",
fn_name
);
}
let arg = if let Some(first) = args.first() {