diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index fd858ca5dd..826b527fe8 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -28,7 +28,6 @@ use itertools::Itertools;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
 
-use super::util::from_proto_to_data_type;
 use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
 use crate::error::InternalSnafu;
 use crate::metrics::METRIC_FLOW_TASK_COUNT;
@@ -168,7 +167,7 @@ impl Flownode for FlowWorkerManager {
         // TODO(discord9): reconsider time assignment mechanism
         let now = self.tick_manager.tick();
 
-        let fetch_order = {
+        let (table_types, fetch_order) = {
             let ctx = self.node_context.read().await;
 
             // TODO(discord9): also check schema version so that altered table can be reported
@@ -177,6 +176,13 @@ impl Flownode for FlowWorkerManager {
                 .table_from_id(&table_id)
                 .await
                 .map_err(to_meta_err(snafu::location!()))?;
+            let table_types = table_schema
+                .typ()
+                .column_types
+                .clone()
+                .into_iter()
+                .map(|t| t.scalar_type)
+                .collect_vec();
             let table_col_names = table_schema.names;
             let table_col_names = table_col_names
                 .iter().enumerate()
@@ -196,16 +202,19 @@ impl Flownode for FlowWorkerManager {
             );
             let fetch_order: Vec<usize> = table_col_names
                 .iter()
-                .map(|names| {
-                    name_to_col.get(names).copied().context(UnexpectedSnafu {
-                        err_msg: format!("Column not found: {}", names),
-                    })
+                .map(|col_name| {
+                    name_to_col
+                        .get(col_name)
+                        .copied()
+                        .with_context(|| UnexpectedSnafu {
+                            err_msg: format!("Column not found: {}", col_name),
+                        })
                 })
                 .try_collect()?;
             if !fetch_order.iter().enumerate().all(|(i, &v)| i == v) {
                 trace!("Reordering columns: {:?}", fetch_order)
             }
-            fetch_order
+            (table_types, fetch_order)
         };
 
         let rows: Vec<DiffRow> = rows_proto
@@ -220,12 +229,8 @@ impl Flownode for FlowWorkerManager {
             })
             .map(|r| (r, now, 1))
             .collect_vec();
-        let batch_datatypes = insert_schema
-            .iter()
-            .map(from_proto_to_data_type)
-            .collect::<Result<Vec<_>, _>>()
-            .map_err(to_meta_err(snafu::location!()))?;
-        self.handle_write_request(region_id.into(), rows, &batch_datatypes)
+
+        self.handle_write_request(region_id.into(), rows, &table_types)
            .await
            .map_err(|err| {
                common_telemetry::error!(err;"Failed to handle write request");
diff --git a/src/flow/src/transform/aggr.rs b/src/flow/src/transform/aggr.rs
index b944e3b263..27e6798e8d 100644
--- a/src/flow/src/transform/aggr.rs
+++ b/src/flow/src/transform/aggr.rs
@@ -128,7 +128,11 @@ impl AggregateExpr {
         }
 
         if args.len() != 1 {
-            return not_impl_err!("Aggregated function with multiple arguments is not supported");
+            let fn_name = extensions.get(&f.function_reference).cloned();
+            return not_impl_err!(
+                "Aggregated function (name={:?}) with multiple arguments is not supported",
+                fn_name
+            );
         }
 
         let arg = if let Some(first) = args.first() {
diff --git a/tests/cases/standalone/common/flow/flow_insert.result b/tests/cases/standalone/common/flow/flow_insert.result
new file mode 100644
index 0000000000..b1797b2e1d
--- /dev/null
+++ b/tests/cases/standalone/common/flow/flow_insert.result
@@ -0,0 +1,185 @@
+-- test if reordered insert is correctly handled
+CREATE TABLE bytes_log (
+    byte INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    -- event time
+    TIME INDEX(ts)
+);
+
+Affected Rows: 0
+
+-- TODO(discord9): remove this after auto infer table's time index is impl
+CREATE TABLE approx_rate (
+    rate DOUBLE,
+    time_window TIMESTAMP,
+    update_at TIMESTAMP,
+    TIME INDEX(time_window)
+);
+
+Affected Rows: 0
+
+CREATE FLOW find_approx_rate SINK TO approx_rate AS
+SELECT
+    (max(byte) - min(byte)) / 30.0 as rate,
+    date_bin(INTERVAL '30 second', ts) as time_window
+from
+    bytes_log
+GROUP BY
+    time_window;
+
+Affected Rows: 0
+
+SHOW CREATE TABLE approx_rate;
+
++-------------+--------------------------------------------+
+| Table       | Create Table                               |
++-------------+--------------------------------------------+
+| approx_rate | CREATE TABLE IF NOT EXISTS "approx_rate" ( |
+|             |   "rate" DOUBLE NULL,                      |
+|             |   "time_window" TIMESTAMP(3) NOT NULL,     |
+|             |   "update_at" TIMESTAMP(3) NULL,           |
+|             |   TIME INDEX ("time_window")               |
+|             | )                                          |
+|             |                                            |
+|             | ENGINE=mito                                |
+|             |                                            |
++-------------+--------------------------------------------+
+
+-- reordered insert, also test if null is handled correctly
+INSERT INTO
+    bytes_log (ts, byte)
+VALUES
+    ('2023-01-01 00:00:01', NULL),
+    ('2023-01-01 00:00:29', 300);
+
+Affected Rows: 2
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
++--------------------------------------+
+| ADMIN FLUSH_FLOW('find_approx_rate') |
++--------------------------------------+
+| FLOW_FLUSHED |
++--------------------------------------+
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
++------+---------------------+
+| rate | time_window         |
++------+---------------------+
+| 0.0  | 2023-01-01T00:00:00 |
++------+---------------------+
+
+-- reordered insert, also test if null is handled correctly
+INSERT INTO
+    bytes_log (ts, byte)
+VALUES
+    ('2022-01-01 00:00:01', NULL),
+    ('2022-01-01 00:00:29', NULL);
+
+Affected Rows: 2
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
++--------------------------------------+
+| ADMIN FLUSH_FLOW('find_approx_rate') |
++--------------------------------------+
+| FLOW_FLUSHED |
++--------------------------------------+
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
++------+---------------------+
+| rate | time_window         |
++------+---------------------+
+|      | 2022-01-01T00:00:00 |
+| 0.0  | 2023-01-01T00:00:00 |
++------+---------------------+
+
+-- reordered insert
+INSERT INTO
+    bytes_log (ts, byte)
+VALUES
+    ('2025-01-01 00:00:01', 101),
+    ('2025-01-01 00:00:29', 300);
+
+Affected Rows: 2
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
++--------------------------------------+
+| ADMIN FLUSH_FLOW('find_approx_rate') |
++--------------------------------------+
+| FLOW_FLUSHED |
++--------------------------------------+
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
++-------------------+---------------------+
+| rate              | time_window         |
++-------------------+---------------------+
+|                   | 2022-01-01T00:00:00 |
+| 0.0               | 2023-01-01T00:00:00 |
+| 6.633333333333334 | 2025-01-01T00:00:00 |
++-------------------+---------------------+
+
+-- reordered insert
+INSERT INTO
+    bytes_log (ts, byte)
+VALUES
+    ('2025-01-01 00:00:32', 450),
+    ('2025-01-01 00:00:37', 500);
+
+Affected Rows: 2
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
++--------------------------------------+
+| ADMIN FLUSH_FLOW('find_approx_rate') |
++--------------------------------------+
+| FLOW_FLUSHED |
++--------------------------------------+
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
++--------------------+---------------------+
+| rate               | time_window         |
++--------------------+---------------------+
+|                    | 2022-01-01T00:00:00 |
+| 0.0                | 2023-01-01T00:00:00 |
+| 6.633333333333334  | 2025-01-01T00:00:00 |
+| 1.6666666666666667 | 2025-01-01T00:00:30 |
++--------------------+---------------------+
+
+DROP TABLE bytes_log;
+
+Affected Rows: 0
+
+DROP FLOW find_approx_rate;
+
+Affected Rows: 0
+
+DROP TABLE approx_rate;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/flow/flow_insert.sql b/tests/cases/standalone/common/flow/flow_insert.sql
new file mode 100644
index 0000000000..e44b8d558c
--- /dev/null
+++ b/tests/cases/standalone/common/flow/flow_insert.sql
@@ -0,0 +1,96 @@
+-- test if reordered insert is correctly handled
+CREATE TABLE bytes_log (
+    byte INT,
+    ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    -- event time
+    TIME INDEX(ts)
+);
+
+-- TODO(discord9): remove this after auto infer table's time index is impl
+CREATE TABLE approx_rate (
+    rate DOUBLE,
+    time_window TIMESTAMP,
+    update_at TIMESTAMP,
+    TIME INDEX(time_window)
+);
+
+CREATE FLOW find_approx_rate SINK TO approx_rate AS
+SELECT
+    (max(byte) - min(byte)) / 30.0 as rate,
+    date_bin(INTERVAL '30 second', ts) as time_window
+from
+    bytes_log
+GROUP BY
+    time_window;
+
+SHOW CREATE TABLE approx_rate;
+
+-- reordered insert, also test if null is handled correctly
+INSERT INTO
+    bytes_log (ts, byte)
+VALUES
+    ('2023-01-01 00:00:01', NULL),
+    ('2023-01-01 00:00:29', 300);
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
+-- reordered insert, also test if null is handled correctly
+INSERT INTO
+    bytes_log (ts, byte)
+VALUES
+    ('2022-01-01 00:00:01', NULL),
+    ('2022-01-01 00:00:29', NULL);
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
+-- reordered insert
+INSERT INTO
+    bytes_log (ts, byte)
+VALUES
+    ('2025-01-01 00:00:01', 101),
+    ('2025-01-01 00:00:29', 300);
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
+-- reordered insert
+INSERT INTO
+    bytes_log (ts, byte)
+VALUES
+    ('2025-01-01 00:00:32', 450),
+    ('2025-01-01 00:00:37', 500);
+
+-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
+ADMIN FLUSH_FLOW('find_approx_rate');
+
+SELECT
+    rate,
+    time_window
+FROM
+    approx_rate;
+
+DROP TABLE bytes_log;
+
+DROP FLOW find_approx_rate;
+
+DROP TABLE approx_rate;
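
Note (reviewer sketch, not part of the patch): the flownode change above derives the row datatypes from the sink table's own schema (table_types) and remaps each incoming row through fetch_order, so an INSERT that lists columns as (ts, byte) still lands in the table's (byte, ts) layout. The expected test values follow from (max(byte) - min(byte)) / 30.0 per 30-second window, e.g. (300 - 101) / 30.0 = 6.633... and (500 - 450) / 30.0 = 1.666.... The standalone Rust sketch below only illustrates that reordering idea; the function names fetch_order and reorder and the Option-based error handling are illustrative and are not the GreptimeDB API.

// Simplified sketch of column reordering on insert, assuming plain string column names.
use std::collections::HashMap;

// Build the index mapping from the table's column order to the insert's column order.
// Returns None if the insert is missing a table column (the real code reports an error instead).
fn fetch_order(table_cols: &[&str], insert_cols: &[&str]) -> Option<Vec<usize>> {
    let name_to_idx: HashMap<&str, usize> = insert_cols
        .iter()
        .enumerate()
        .map(|(i, name)| (*name, i))
        .collect();
    table_cols.iter().map(|c| name_to_idx.get(c).copied()).collect()
}

// Reorder one row of values into the table's column order.
fn reorder<T: Clone>(row: &[T], order: &[usize]) -> Vec<T> {
    order.iter().map(|&i| row[i].clone()).collect()
}

fn main() {
    // Table schema order is (byte, ts); the test INSERT uses (ts, byte).
    let order = fetch_order(&["byte", "ts"], &["ts", "byte"]).unwrap();
    assert_eq!(order, vec![1, 0]);
    let row = vec!["2023-01-01 00:00:29", "300"];
    assert_eq!(reorder(&row, &order), vec!["300", "2023-01-01 00:00:29"]);
}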