From 7caa88abc77e37d67607a3259ab8feb7dbe6ed21 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Fri, 3 Jan 2025 18:59:58 +0800 Subject: [PATCH] refactor: flow replace check&better error msg (#5277) * chore: better error msg * chore eof newline * refactor: move replace check to flow worker * chore: add ctx to insert flow failure * chore: Update src/flow/src/adapter/flownode_impl.rs * test: add order by for deterministic --------- Co-authored-by: Yingwen --- src/flow/src/adapter.rs | 44 +-------------- src/flow/src/adapter/flownode_impl.rs | 36 +++++++++--- src/flow/src/adapter/node_context.rs | 4 ++ src/flow/src/adapter/worker.rs | 26 +++++++-- src/flow/src/error.rs | 30 ++++++++-- src/flow/src/server.rs | 12 +++- .../alter/change_col_fulltext_options.result | 10 ++-- .../alter/change_col_fulltext_options.sql | 6 +- .../common/flow/flow_more_usecase.result | 56 +++++++++++++++++++ .../common/flow/flow_more_usecase.sql | 41 ++++++++++++++ .../common/flow/show_create_flow.result | 27 +++++++-- .../common/flow/show_create_flow.sql | 11 ++-- 12 files changed, 221 insertions(+), 82 deletions(-) create mode 100644 tests/cases/standalone/common/flow/flow_more_usecase.result create mode 100644 tests/cases/standalone/common/flow/flow_more_usecase.sql diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index a62799164d..085096fd95 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -50,10 +50,7 @@ use crate::adapter::util::relation_desc_to_column_schemas_with_fallback; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::ErrCollector; use crate::df_optimizer::sql_to_flow_plan; -use crate::error::{ - EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu, - UnexpectedSnafu, -}; +use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu}; use crate::expr::Batch; use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS}; use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE}; @@ -727,43 +724,6 @@ impl FlowWorkerManager { query_ctx, } = args; - let already_exist = { - let mut flag = false; - - // check if the task already exists - for handle in self.worker_handles.iter() { - if handle.lock().await.contains_flow(flow_id).await? { - flag = true; - break; - } - } - flag - }; - match (create_if_not_exists, or_replace, already_exist) { - // do replace - (_, true, true) => { - info!("Replacing flow with id={}", flow_id); - self.remove_flow(flow_id).await?; - } - (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?, - // do nothing if exists - (true, false, true) => { - info!("Flow with id={} already exists, do nothing", flow_id); - return Ok(None); - } - // create if not exists - (_, _, false) => (), - } - - if create_if_not_exists { - // check if the task already exists - for handle in self.worker_handles.iter() { - if handle.lock().await.contains_flow(flow_id).await? 
{ - return Ok(None); - } - } - } - let mut node_ctx = self.node_context.write().await; // assign global id to source and sink table for source in &source_table_ids { @@ -877,9 +837,11 @@ impl FlowWorkerManager { source_ids, src_recvs: source_receivers, expire_after, + or_replace, create_if_not_exists, err_collector, }; + handle.create_flow(create_request).await?; info!("Successfully create flow with id={}", flow_id); Ok(Some(flow_id)) diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 826b527fe8..f46f0d5440 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -25,11 +25,11 @@ use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu}; use common_meta::node_manager::Flownode; use common_telemetry::{debug, trace}; use itertools::Itertools; -use snafu::{OptionExt, ResultExt}; +use snafu::{IntoError, OptionExt, ResultExt}; use store_api::storage::RegionId; use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; -use crate::error::InternalSnafu; +use crate::error::{CreateFlowSnafu, InsertIntoFlowSnafu, InternalSnafu}; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -79,13 +79,15 @@ impl Flownode for FlowWorkerManager { or_replace, expire_after, comment: Some(comment), - sql, + sql: sql.clone(), flow_options, query_ctx, }; let ret = self .create_flow(args) .await + .map_err(BoxedError::new) + .with_context(|_| CreateFlowSnafu { sql: sql.clone() }) .map_err(to_meta_err(snafu::location!()))?; METRIC_FLOW_TASK_COUNT.inc(); Ok(FlowResponse { @@ -229,13 +231,29 @@ impl Flownode for FlowWorkerManager { }) .map(|r| (r, now, 1)) .collect_vec(); - - self.handle_write_request(region_id.into(), rows, &table_types) + if let Err(err) = self + .handle_write_request(region_id.into(), rows, &table_types) .await - .map_err(|err| { - common_telemetry::error!(err;"Failed to handle write request"); - to_meta_err(snafu::location!())(err) - })?; + { + let err = BoxedError::new(err); + let flow_ids = self + .node_context + .read() + .await + .get_flow_ids(table_id) + .into_iter() + .flatten() + .cloned() + .collect_vec(); + let err = InsertIntoFlowSnafu { + region_id, + flow_ids, + } + .into_error(err); + common_telemetry::error!(err; "Failed to handle write request"); + let err = to_meta_err(snafu::location!())(err); + return Err(err); + } } Ok(Default::default()) } diff --git a/src/flow/src/adapter/node_context.rs b/src/flow/src/adapter/node_context.rs index 02612b6f5a..1896534e86 100644 --- a/src/flow/src/adapter/node_context.rs +++ b/src/flow/src/adapter/node_context.rs @@ -71,6 +71,10 @@ impl FlownodeContext { query_context: Default::default(), } } + + pub fn get_flow_ids(&self, table_id: TableId) -> Option<&BTreeSet> { + self.source_to_tasks.get(&table_id) + } } /// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 4a6b0ba963..46b4c0669a 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -247,15 +247,25 @@ impl<'s> Worker<'s> { src_recvs: Vec>, // TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead expire_after: Option, + or_replace: bool, create_if_not_exists: bool, err_collector: ErrCollector, ) -> Result, Error> { - let already_exists = self.task_states.contains_key(&flow_id); - match (already_exists, create_if_not_exists) { - (true, true) => return Ok(None), - (true, 
false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?, - (false, _) => (), - }; + let already_exist = self.task_states.contains_key(&flow_id); + match (create_if_not_exists, or_replace, already_exist) { + // if replace, ignore that old flow exists + (_, true, true) => { + info!("Replacing flow with id={}", flow_id); + } + (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?, + // already exists, and not replace, return None + (true, false, true) => { + info!("Flow with id={} already exists, do nothing", flow_id); + return Ok(None); + } + // continue as normal + (_, _, false) => (), + } let mut cur_task_state = ActiveDataflowState::<'s> { err_collector, @@ -341,6 +351,7 @@ impl<'s> Worker<'s> { source_ids, src_recvs, expire_after, + or_replace, create_if_not_exists, err_collector, } => { @@ -352,6 +363,7 @@ impl<'s> Worker<'s> { &source_ids, src_recvs, expire_after, + or_replace, create_if_not_exists, err_collector, ); @@ -398,6 +410,7 @@ pub enum Request { source_ids: Vec, src_recvs: Vec>, expire_after: Option, + or_replace: bool, create_if_not_exists: bool, err_collector: ErrCollector, }, @@ -547,6 +560,7 @@ mod test { source_ids: src_ids, src_recvs: vec![rx], expire_after: None, + or_replace: false, create_if_not_exists: true, err_collector: ErrCollector::default(), }; diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 137e024307..0374e00506 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -32,6 +32,27 @@ use crate::expr::EvalError; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display( + "Failed to insert into flow: region_id={}, flow_ids={:?}", + region_id, + flow_ids + ))] + InsertIntoFlow { + region_id: u64, + flow_ids: Vec, + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Error encountered while creating flow: {sql}"))] + CreateFlow { + sql: String, + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("External error"))] External { source: BoxedError, @@ -207,16 +228,17 @@ pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Self::Eval { .. } | Self::JoinTask { .. } | Self::Datafusion { .. } => { - StatusCode::Internal - } + Self::Eval { .. } + | Self::JoinTask { .. } + | Self::Datafusion { .. } + | Self::InsertIntoFlow { .. } => StatusCode::Internal, Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists, Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } | Self::FlowNotFound { .. } | Self::ListFlows { .. } => StatusCode::TableNotFound, Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery, - Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery, + Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery, Self::Unexpected { .. } => StatusCode::Unexpected, Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. 
} => { StatusCode::Unsupported diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index d22ba22044..3ed8b7efa9 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -50,8 +50,8 @@ use tonic::{Request, Response, Status}; use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef}; use crate::error::{ - to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu, - ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, + to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, + ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, }; use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; @@ -392,7 +392,13 @@ impl FlownodeBuilder { .build(), ), }; - manager.create_flow(args).await?; + manager + .create_flow(args) + .await + .map_err(BoxedError::new) + .with_context(|_| CreateFlowSnafu { + sql: info.raw_sql().clone(), + })?; } Ok(cnt) diff --git a/tests/cases/standalone/common/alter/change_col_fulltext_options.result b/tests/cases/standalone/common/alter/change_col_fulltext_options.result index 46bd510462..fa0443293c 100644 --- a/tests/cases/standalone/common/alter/change_col_fulltext_options.result +++ b/tests/cases/standalone/common/alter/change_col_fulltext_options.result @@ -32,7 +32,7 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'), Affected Rows: 4 -SELECT * FROM test WHERE MATCHES(message, 'hello'); +SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message; +-------------+---------------------+ | message | time | @@ -46,7 +46,7 @@ ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', c Affected Rows: 0 -SELECT * FROM test WHERE MATCHES(message, 'hello'); +SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message; +-------------+---------------------+ | message | time | @@ -63,15 +63,15 @@ INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'), Affected Rows: 4 -SELECT * FROM test WHERE MATCHES(message, 'hello'); +SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message; +-------------+---------------------+ | message | time | +-------------+---------------------+ -| hello NiKo | 2020-01-03T00:00:00 | | NiKo hello | 2020-01-03T00:00:01 | -| hello hello | 2020-01-04T00:00:00 | | hello | 2020-01-01T00:00:00 | +| hello NiKo | 2020-01-03T00:00:00 | +| hello hello | 2020-01-04T00:00:00 | | hello world | 2020-01-02T00:00:00 | | world hello | 2020-01-02T00:00:01 | +-------------+---------------------+ diff --git a/tests/cases/standalone/common/alter/change_col_fulltext_options.sql b/tests/cases/standalone/common/alter/change_col_fulltext_options.sql index 88c8b3b180..6197ba1dd1 100644 --- a/tests/cases/standalone/common/alter/change_col_fulltext_options.sql +++ b/tests/cases/standalone/common/alter/change_col_fulltext_options.sql @@ -13,18 +13,18 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'), ('hello world', '2020-01-02 00:00:00'), ('world hello', '2020-01-02 00:00:01'); -SELECT * FROM test WHERE MATCHES(message, 'hello'); +SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message; ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', case_sensitive = 'true'); -SELECT * FROM test WHERE MATCHES(message, 'hello'); +SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message; INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'), ('NiKo hello', '2020-01-03 
00:00:01'), ('hello hello', '2020-01-04 00:00:00'), ('NiKo, NiKo', '2020-01-04 00:00:01'); -SELECT * FROM test WHERE MATCHES(message, 'hello'); +SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message; -- SQLNESS ARG restart=true SHOW CREATE TABLE test; diff --git a/tests/cases/standalone/common/flow/flow_more_usecase.result b/tests/cases/standalone/common/flow/flow_more_usecase.result new file mode 100644 index 0000000000..304589f6a5 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_more_usecase.result @@ -0,0 +1,56 @@ +CREATE TABLE `api_requests` ( + `timestamp` TIMESTAMP NOT NULL, + `request_id` STRING NOT NULL, + `upstream_id` STRING NOT NULL, + `application_id` STRING NULL, + `url` STRING NOT NULL, + `method` STRING NOT NULL, + `status_code` INTEGER NOT NULL, + `request_headers` JSON NULL, + `request_body` STRING NULL, + `response_headers` JSON NULL, + `response_body` STRING NULL, + `latency_ms` INTEGER NOT NULL, + `client_ip` STRING NULL, + `user_agent` STRING NULL, + TIME INDEX (`timestamp`) +) +WITH( + append_mode = 'true' +); + +Affected Rows: 0 + +CREATE TABLE api_request_volume_upstream_stats ( + `upstream_id` STRING NOT NULL, + `time_window` TIMESTAMP NOT NULL, + `request_count` BIGINT NOT NULL, + TIME INDEX (`time_window`) +); + +Affected Rows: 0 + +CREATE FLOW api_request_volume_by_upstream +SINK TO api_request_volume_upstream_stats +AS +SELECT + upstream_id, + date_bin(INTERVAL '1 hour', timestamp, '2024-01-01 00:00:00'::TimestampNanosecond) AS time_window, + COUNT(*) AS request_count +FROM api_requests +GROUP BY upstream_id, time_window; + +Affected Rows: 0 + +DROP FLOW api_request_volume_by_upstream; + +Affected Rows: 0 + +DROP TABLE api_request_volume_upstream_stats; + +Affected Rows: 0 + +DROP TABLE api_requests; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_more_usecase.sql b/tests/cases/standalone/common/flow/flow_more_usecase.sql new file mode 100644 index 0000000000..fb5dc12d54 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_more_usecase.sql @@ -0,0 +1,41 @@ +CREATE TABLE `api_requests` ( + `timestamp` TIMESTAMP NOT NULL, + `request_id` STRING NOT NULL, + `upstream_id` STRING NOT NULL, + `application_id` STRING NULL, + `url` STRING NOT NULL, + `method` STRING NOT NULL, + `status_code` INTEGER NOT NULL, + `request_headers` JSON NULL, + `request_body` STRING NULL, + `response_headers` JSON NULL, + `response_body` STRING NULL, + `latency_ms` INTEGER NOT NULL, + `client_ip` STRING NULL, + `user_agent` STRING NULL, + TIME INDEX (`timestamp`) +) +WITH( + append_mode = 'true' +); + +CREATE TABLE api_request_volume_upstream_stats ( + `upstream_id` STRING NOT NULL, + `time_window` TIMESTAMP NOT NULL, + `request_count` BIGINT NOT NULL, + TIME INDEX (`time_window`) +); + +CREATE FLOW api_request_volume_by_upstream +SINK TO api_request_volume_upstream_stats +AS +SELECT + upstream_id, + date_bin(INTERVAL '1 hour', timestamp, '2024-01-01 00:00:00'::TimestampNanosecond) AS time_window, + COUNT(*) AS request_count +FROM api_requests +GROUP BY upstream_id, time_window; + +DROP FLOW api_request_volume_by_upstream; +DROP TABLE api_request_volume_upstream_stats; +DROP TABLE api_requests; diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index 14e8012944..2a177bddee 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -365,33 +365,48 @@ SELECT number FROM 
out_num_cnt_show; | 16 | +--------+ --- should mismatch +-- should mismatch, hence the old flow remains CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15; Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type) --- should mismatch +-- should mismatch, hence the old flow remains CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15; Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type) -INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3); +SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; -Affected Rows: 3 ++---------------------------------------------------------------+ +| flow_definition | ++---------------------------------------------------------------+ +| SELECT number AS n1 FROM numbers_input_show WHERE number > 10 | ++---------------------------------------------------------------+ + +INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3); + +Affected Rows: 4 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('filter_numbers_show'); -Error: 1003(Internal), Internal error: 1003 ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('filter_numbers_show') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ --- sink table stays the same since the flow error out due to column mismatch +-- sink table shows new 11 since old flow remains SELECT number FROM out_num_cnt_show; +--------+ | number | +--------+ +| 11 | +| 15 | | 15 | | 16 | +| 18 | +--------+ DROP FLOW filter_numbers_show; diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index f445c4f254..0c4110c196 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -147,19 +147,20 @@ ADMIN FLUSH_FLOW('filter_numbers_show'); SELECT number FROM out_num_cnt_show; - --- should mismatch +-- should mismatch, hence the old flow remains CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15; --- should mismatch +-- should mismatch, hence the old flow remains CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15; -INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3); +SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; + +INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3); -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('filter_numbers_show'); --- sink table stays the same since the flow error out due to column mismatch +-- sink table shows new 11 since old flow remains SELECT number FROM out_num_cnt_show; DROP FLOW filter_numbers_show;
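
Note (not part of the patch): the core behavioral change above is the three-way check that Worker::create_flow now performs on (create_if_not_exists, or_replace, already_exist). The following standalone Rust sketch restates that decision table for readability; the names CreateDecision and decide are illustrative only and do not exist in the codebase, but the match arms mirror the logic in the patch.

    /// Illustrative enum for the four outcomes of the create-flow check.
    #[derive(Debug, PartialEq)]
    enum CreateDecision {
        /// `OR REPLACE` and the flow exists: replace it in place.
        Replace,
        /// Plain `CREATE FLOW` and the flow exists: report FlowAlreadyExist.
        AlreadyExistError,
        /// `IF NOT EXISTS` and the flow exists: do nothing.
        Noop,
        /// No flow with this id yet: create it regardless of the flags.
        Create,
    }

    fn decide(create_if_not_exists: bool, or_replace: bool, already_exist: bool) -> CreateDecision {
        match (create_if_not_exists, or_replace, already_exist) {
            // OR REPLACE wins whenever the flow already exists.
            (_, true, true) => CreateDecision::Replace,
            (false, false, true) => CreateDecision::AlreadyExistError,
            (true, false, true) => CreateDecision::Noop,
            // Flow does not exist yet.
            (_, _, false) => CreateDecision::Create,
        }
    }

    fn main() {
        // CREATE OR REPLACE FLOW on an existing flow replaces it.
        assert_eq!(decide(false, true, true), CreateDecision::Replace);
        // CREATE FLOW IF NOT EXISTS on an existing flow is a no-op.
        assert_eq!(decide(true, false, true), CreateDecision::Noop);
        // Plain CREATE FLOW on an existing flow is an error.
        assert_eq!(decide(false, false, true), CreateDecision::AlreadyExistError);
        // Any combination with no existing flow creates it.
        assert_eq!(decide(false, false, false), CreateDecision::Create);
    }

This is why the show_create_flow test expectations change: a failed CREATE OR REPLACE (column mismatch) no longer removes the old flow first, so the previously created flow keeps running and the sink table continues to receive rows.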