refactor: flow replace check & better error msg (#5277)

* chore: better error msg

* chore: eof newline

* refactor: move replace check to flow worker

* chore: add ctx to insert flow failure

* chore: Update src/flow/src/adapter/flownode_impl.rs

* test: add order by for deterministic results

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
Author: discord9
Date: 2025-01-03 18:59:58 +08:00
Committed by: Yingwen
Parent: eafb01dfff
Commit: 7caa88abc7
12 changed files with 221 additions and 82 deletions


@@ -50,10 +50,7 @@ use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
 use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
 use crate::compute::ErrCollector;
 use crate::df_optimizer::sql_to_flow_plan;
-use crate::error::{
-    EvalSnafu, ExternalSnafu, FlowAlreadyExistSnafu, InternalSnafu, InvalidQuerySnafu,
-    UnexpectedSnafu,
-};
+use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
 use crate::expr::Batch;
 use crate::metrics::{METRIC_FLOW_INSERT_ELAPSED, METRIC_FLOW_ROWS, METRIC_FLOW_RUN_INTERVAL_MS};
 use crate::repr::{self, DiffRow, RelationDesc, Row, BATCH_SIZE};
@@ -727,43 +724,6 @@ impl FlowWorkerManager {
             query_ctx,
         } = args;
 
-        let already_exist = {
-            let mut flag = false;
-            // check if the task already exists
-            for handle in self.worker_handles.iter() {
-                if handle.lock().await.contains_flow(flow_id).await? {
-                    flag = true;
-                    break;
-                }
-            }
-            flag
-        };
-
-        match (create_if_not_exists, or_replace, already_exist) {
-            // do replace
-            (_, true, true) => {
-                info!("Replacing flow with id={}", flow_id);
-                self.remove_flow(flow_id).await?;
-            }
-            (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
-            // do nothing if exists
-            (true, false, true) => {
-                info!("Flow with id={} already exists, do nothing", flow_id);
-                return Ok(None);
-            }
-            // create if not exists
-            (_, _, false) => (),
-        }
-
-        if create_if_not_exists {
-            // check if the task already exists
-            for handle in self.worker_handles.iter() {
-                if handle.lock().await.contains_flow(flow_id).await? {
-                    return Ok(None);
-                }
-            }
-        }
-
         let mut node_ctx = self.node_context.write().await;
         // assign global id to source and sink table
         for source in &source_table_ids {
@@ -877,9 +837,11 @@ impl FlowWorkerManager {
             source_ids,
             src_recvs: source_receivers,
             expire_after,
+            or_replace,
+            create_if_not_exists,
             err_collector,
         };
 
         handle.create_flow(create_request).await?;
         info!("Successfully create flow with id={}", flow_id);
         Ok(Some(flow_id))


@@ -25,11 +25,11 @@ use common_meta::error::{ExternalSnafu, Result, UnexpectedSnafu};
 use common_meta::node_manager::Flownode;
 use common_telemetry::{debug, trace};
 use itertools::Itertools;
-use snafu::{OptionExt, ResultExt};
+use snafu::{IntoError, OptionExt, ResultExt};
 use store_api::storage::RegionId;
 
 use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
-use crate::error::InternalSnafu;
+use crate::error::{CreateFlowSnafu, InsertIntoFlowSnafu, InternalSnafu};
 use crate::metrics::METRIC_FLOW_TASK_COUNT;
 use crate::repr::{self, DiffRow};
@@ -79,13 +79,15 @@ impl Flownode for FlowWorkerManager {
             or_replace,
             expire_after,
             comment: Some(comment),
-            sql,
+            sql: sql.clone(),
             flow_options,
             query_ctx,
         };
         let ret = self
             .create_flow(args)
             .await
+            .map_err(BoxedError::new)
+            .with_context(|_| CreateFlowSnafu { sql: sql.clone() })
             .map_err(to_meta_err(snafu::location!()))?;
         METRIC_FLOW_TASK_COUNT.inc();
         Ok(FlowResponse {
@@ -229,13 +231,29 @@
                 })
                 .map(|r| (r, now, 1))
                 .collect_vec();
-            self.handle_write_request(region_id.into(), rows, &table_types)
+            if let Err(err) = self
+                .handle_write_request(region_id.into(), rows, &table_types)
                 .await
-                .map_err(|err| {
-                    common_telemetry::error!(err;"Failed to handle write request");
-                    to_meta_err(snafu::location!())(err)
-                })?;
+            {
+                let err = BoxedError::new(err);
+                let flow_ids = self
+                    .node_context
+                    .read()
+                    .await
+                    .get_flow_ids(table_id)
+                    .into_iter()
+                    .flatten()
+                    .cloned()
+                    .collect_vec();
+                let err = InsertIntoFlowSnafu {
+                    region_id,
+                    flow_ids,
+                }
+                .into_error(err);
+                common_telemetry::error!(err; "Failed to handle write request");
+                let err = to_meta_err(snafu::location!())(err);
+                return Err(err);
+            }
         }
 
         Ok(Default::default())
     }
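
The new error path above is a common snafu pattern: catch the failure, box it, attach flow-level context (the region id plus every flow id reading from the source table, obtained through the new get_flow_ids lookup), and only then convert it into a meta error. A minimal, self-contained sketch of the same pattern, using simplified stand-in types rather than the actual GreptimeDB definitions:

use snafu::{IntoError, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("Failed to insert into flow: region_id={region_id}, flow_ids={flow_ids:?}"))]
struct InsertIntoFlowError {
    region_id: u64,
    flow_ids: Vec<u64>,
    source: std::io::Error, // stand-in for BoxedError
}

fn handle_write(_region_id: u64) -> Result<(), std::io::Error> {
    Err(std::io::Error::other("simulated write failure"))
}

fn main() {
    // Hypothetical reverse lookup: which flows consume this table.
    let flow_ids = vec![1, 2];
    if let Err(err) = handle_write(42) {
        // Wrap the low-level error with flow-level context through the
        // generated context selector, as the diff does with InsertIntoFlowSnafu.
        let err = InsertIntoFlowSnafu { region_id: 42u64, flow_ids }.into_error(err);
        println!("{err}"); // Failed to insert into flow: region_id=42, flow_ids=[1, 2]
        // The root cause remains reachable through the error chain.
        println!("caused by: {}", std::error::Error::source(&err).unwrap());
    }
}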


@@ -71,6 +71,10 @@ impl FlownodeContext {
             query_context: Default::default(),
         }
     }
+
+    pub fn get_flow_ids(&self, table_id: TableId) -> Option<&BTreeSet<FlowId>> {
+        self.source_to_tasks.get(&table_id)
+    }
 }
 
 /// a simple broadcast sender with backpressure, bounded capacity and blocking on send when send buf is full


@@ -247,15 +247,25 @@ impl<'s> Worker<'s> {
         src_recvs: Vec<broadcast::Receiver<Batch>>,
         // TODO(discord9): set expire duration for all arrangement and compare to sys timestamp instead
         expire_after: Option<repr::Duration>,
+        or_replace: bool,
         create_if_not_exists: bool,
         err_collector: ErrCollector,
     ) -> Result<Option<FlowId>, Error> {
-        let already_exists = self.task_states.contains_key(&flow_id);
-        match (already_exists, create_if_not_exists) {
-            (true, true) => return Ok(None),
-            (true, false) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
-            (false, _) => (),
-        };
+        let already_exist = self.task_states.contains_key(&flow_id);
+        match (create_if_not_exists, or_replace, already_exist) {
+            // if replace, ignore that old flow exists
+            (_, true, true) => {
+                info!("Replacing flow with id={}", flow_id);
+            }
+            (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
+            // already exists, and not replace, return None
+            (true, false, true) => {
+                info!("Flow with id={} already exists, do nothing", flow_id);
+                return Ok(None);
+            }
+            // continue as normal
+            (_, _, false) => (),
+        }
+
         let mut cur_task_state = ActiveDataflowState::<'s> {
             err_collector,
@@ -341,6 +351,7 @@ impl<'s> Worker<'s> {
                 source_ids,
                 src_recvs,
                 expire_after,
+                or_replace,
                 create_if_not_exists,
                 err_collector,
             } => {
@@ -352,6 +363,7 @@
                     &source_ids,
                     src_recvs,
                     expire_after,
+                    or_replace,
                     create_if_not_exists,
                     err_collector,
                 );
@@ -398,6 +410,7 @@ pub enum Request {
         source_ids: Vec<GlobalId>,
         src_recvs: Vec<broadcast::Receiver<Batch>>,
         expire_after: Option<repr::Duration>,
+        or_replace: bool,
         create_if_not_exists: bool,
         err_collector: ErrCollector,
     },
@@ -547,6 +560,7 @@ mod test {
             source_ids: src_ids,
             src_recvs: vec![rx],
             expire_after: None,
+            or_replace: false,
             create_if_not_exists: true,
             err_collector: ErrCollector::default(),
         };
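
The tri-state match above is the heart of the refactor: or_replace wins whenever the flow already exists, create_if_not_exists only matters when the flow exists and replace is off, and a missing flow always falls through to creation. Because the check and the insertion now both happen inside the worker that owns task_states, there is no longer a window between "scan all workers for the flow" and "create it", as there was with the old manager-side check. A simplified sketch of the same decision table over a plain map (a hypothetical worker, not the real API):

use std::collections::BTreeMap;

type FlowId = u64;

fn create_flow(
    task_states: &mut BTreeMap<FlowId, String>,
    flow_id: FlowId,
    plan: String,
    or_replace: bool,
    create_if_not_exists: bool,
) -> Result<Option<FlowId>, String> {
    let already_exist = task_states.contains_key(&flow_id);
    match (create_if_not_exists, or_replace, already_exist) {
        // OR REPLACE: fall through and overwrite the existing flow below
        (_, true, true) => println!("Replacing flow with id={flow_id}"),
        // plain CREATE over an existing flow is an error
        (false, false, true) => return Err(format!("flow {flow_id} already exists")),
        // CREATE IF NOT EXISTS over an existing flow is a no-op
        (true, false, true) => return Ok(None),
        // the flow does not exist yet: continue as normal
        (_, _, false) => (),
    }
    task_states.insert(flow_id, plan);
    Ok(Some(flow_id))
}

fn main() {
    let mut states = BTreeMap::new();
    assert_eq!(create_flow(&mut states, 1, "plan A".into(), false, false), Ok(Some(1)));
    // Same id with OR REPLACE: the old plan is overwritten in place.
    assert_eq!(create_flow(&mut states, 1, "plan B".into(), true, false), Ok(Some(1)));
    assert_eq!(states[&1], "plan B");
}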


@@ -32,6 +32,27 @@ use crate::expr::EvalError;
 #[snafu(visibility(pub))]
 #[stack_trace_debug]
 pub enum Error {
+    #[snafu(display(
+        "Failed to insert into flow: region_id={}, flow_ids={:?}",
+        region_id,
+        flow_ids
+    ))]
+    InsertIntoFlow {
+        region_id: u64,
+        flow_ids: Vec<u64>,
+        source: BoxedError,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Error encountered while creating flow: {sql}"))]
+    CreateFlow {
+        sql: String,
+        source: BoxedError,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("External error"))]
     External {
         source: BoxedError,
@@ -207,16 +228,17 @@ pub type Result<T> = std::result::Result<T, Error>;
 impl ErrorExt for Error {
     fn status_code(&self) -> StatusCode {
         match self {
-            Self::Eval { .. } | Self::JoinTask { .. } | Self::Datafusion { .. } => {
-                StatusCode::Internal
-            }
+            Self::Eval { .. }
+            | Self::JoinTask { .. }
+            | Self::Datafusion { .. }
+            | Self::InsertIntoFlow { .. } => StatusCode::Internal,
             Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
             Self::TableNotFound { .. }
             | Self::TableNotFoundMeta { .. }
             | Self::FlowNotFound { .. }
             | Self::ListFlows { .. } => StatusCode::TableNotFound,
             Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery,
-            Self::InvalidQuery { .. } => StatusCode::EngineExecuteQuery,
+            Self::InvalidQuery { .. } | Self::CreateFlow { .. } => StatusCode::EngineExecuteQuery,
             Self::Unexpected { .. } => StatusCode::Unexpected,
             Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } => {
                 StatusCode::Unsupported
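
Both new variants keep the underlying failure as a boxed source, so the top-level message leads with flow context while the chain preserves the root cause, and the status mapping routes them differently: a failed write into a flow surfaces as an internal error, while a rejected CREATE FLOW statement surfaces as a query-execution error. A small sketch of how the CreateFlow wrapping used at the call sites behaves, again with stand-in types rather than the real BoxedError and ErrorExt:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
#[snafu(display("Error encountered while creating flow: {sql}"))]
struct CreateFlowError {
    sql: String,
    source: std::io::Error, // stand-in for BoxedError
}

fn create_flow() -> Result<u64, std::io::Error> {
    Err(std::io::Error::other("column type mismatch"))
}

fn main() {
    // with_context builds the context lazily, only on the error path.
    let res = create_flow().with_context(|_| CreateFlowSnafu {
        sql: "CREATE FLOW f SINK TO out AS SELECT 1",
    });
    // Prints: Error encountered while creating flow: CREATE FLOW f SINK TO out AS SELECT 1
    println!("{}", res.unwrap_err());
}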


@@ -50,8 +50,8 @@ use tonic::{Request, Response, Status};
 use crate::adapter::{CreateFlowArgs, FlowWorkerManagerRef};
 use crate::error::{
-    to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, FlowNotFoundSnafu, ListFlowsSnafu,
-    ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
+    to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
+    ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
 };
 use crate::heartbeat::HeartbeatTask;
 use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
@@ -392,7 +392,13 @@ impl FlownodeBuilder {
                         .build(),
                 ),
             };
-            manager.create_flow(args).await?;
+            manager
+                .create_flow(args)
+                .await
+                .map_err(BoxedError::new)
+                .with_context(|_| CreateFlowSnafu {
+                    sql: info.raw_sql().clone(),
+                })?;
         }
 
         Ok(cnt)


@@ -32,7 +32,7 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'),
 
 Affected Rows: 4
 
-SELECT * FROM test WHERE MATCHES(message, 'hello');
+SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
 
 +-------------+---------------------+
 | message     | time                |
@@ -46,7 +46,7 @@ ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', case_sensitive = 'true');
 
 Affected Rows: 0
 
-SELECT * FROM test WHERE MATCHES(message, 'hello');
+SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
 
 +-------------+---------------------+
 | message     | time                |
@@ -63,15 +63,15 @@ INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'),
 
 Affected Rows: 4
 
-SELECT * FROM test WHERE MATCHES(message, 'hello');
+SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
 
 +-------------+---------------------+
 | message     | time                |
 +-------------+---------------------+
-| hello NiKo  | 2020-01-03T00:00:00 |
 | NiKo hello  | 2020-01-03T00:00:01 |
-| hello hello | 2020-01-04T00:00:00 |
 | hello       | 2020-01-01T00:00:00 |
+| hello NiKo  | 2020-01-03T00:00:00 |
+| hello hello | 2020-01-04T00:00:00 |
 | hello world | 2020-01-02T00:00:00 |
 | world hello | 2020-01-02T00:00:01 |
 +-------------+---------------------+


@@ -13,18 +13,18 @@ INSERT INTO test VALUES ('hello', '2020-01-01 00:00:00'),
                         ('hello world', '2020-01-02 00:00:00'),
                         ('world hello', '2020-01-02 00:00:01');
 
-SELECT * FROM test WHERE MATCHES(message, 'hello');
+SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
 
 ALTER TABLE test MODIFY COLUMN message SET FULLTEXT WITH(analyzer = 'Chinese', case_sensitive = 'true');
 
-SELECT * FROM test WHERE MATCHES(message, 'hello');
+SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
 
 INSERT INTO test VALUES ('hello NiKo', '2020-01-03 00:00:00'),
                         ('NiKo hello', '2020-01-03 00:00:01'),
                         ('hello hello', '2020-01-04 00:00:00'),
                         ('NiKo, NiKo', '2020-01-04 00:00:01');
 
-SELECT * FROM test WHERE MATCHES(message, 'hello');
+SELECT * FROM test WHERE MATCHES(message, 'hello') ORDER BY message;
 
 -- SQLNESS ARG restart=true
 SHOW CREATE TABLE test;


@@ -0,0 +1,56 @@
+CREATE TABLE `api_requests` (
+    `timestamp` TIMESTAMP NOT NULL,
+    `request_id` STRING NOT NULL,
+    `upstream_id` STRING NOT NULL,
+    `application_id` STRING NULL,
+    `url` STRING NOT NULL,
+    `method` STRING NOT NULL,
+    `status_code` INTEGER NOT NULL,
+    `request_headers` JSON NULL,
+    `request_body` STRING NULL,
+    `response_headers` JSON NULL,
+    `response_body` STRING NULL,
+    `latency_ms` INTEGER NOT NULL,
+    `client_ip` STRING NULL,
+    `user_agent` STRING NULL,
+    TIME INDEX (`timestamp`)
+)
+WITH(
+    append_mode = 'true'
+);
+
+Affected Rows: 0
+
+CREATE TABLE api_request_volume_upstream_stats (
+    `upstream_id` STRING NOT NULL,
+    `time_window` TIMESTAMP NOT NULL,
+    `request_count` BIGINT NOT NULL,
+    TIME INDEX (`time_window`)
+);
+
+Affected Rows: 0
+
+CREATE FLOW api_request_volume_by_upstream
+SINK TO api_request_volume_upstream_stats
+AS
+SELECT
+    upstream_id,
+    date_bin(INTERVAL '1 hour', timestamp, '2024-01-01 00:00:00'::TimestampNanosecond) AS time_window,
+    COUNT(*) AS request_count
+FROM api_requests
+GROUP BY upstream_id, time_window;
+
+Affected Rows: 0
+
+DROP FLOW api_request_volume_by_upstream;
+
+Affected Rows: 0
+
+DROP TABLE api_request_volume_upstream_stats;
+
+Affected Rows: 0
+
+DROP TABLE api_requests;
+
+Affected Rows: 0


@@ -0,0 +1,41 @@
+CREATE TABLE `api_requests` (
+    `timestamp` TIMESTAMP NOT NULL,
+    `request_id` STRING NOT NULL,
+    `upstream_id` STRING NOT NULL,
+    `application_id` STRING NULL,
+    `url` STRING NOT NULL,
+    `method` STRING NOT NULL,
+    `status_code` INTEGER NOT NULL,
+    `request_headers` JSON NULL,
+    `request_body` STRING NULL,
+    `response_headers` JSON NULL,
+    `response_body` STRING NULL,
+    `latency_ms` INTEGER NOT NULL,
+    `client_ip` STRING NULL,
+    `user_agent` STRING NULL,
+    TIME INDEX (`timestamp`)
+)
+WITH(
+    append_mode = 'true'
+);
+
+CREATE TABLE api_request_volume_upstream_stats (
+    `upstream_id` STRING NOT NULL,
+    `time_window` TIMESTAMP NOT NULL,
+    `request_count` BIGINT NOT NULL,
+    TIME INDEX (`time_window`)
+);
+
+CREATE FLOW api_request_volume_by_upstream
+SINK TO api_request_volume_upstream_stats
+AS
+SELECT
+    upstream_id,
+    date_bin(INTERVAL '1 hour', timestamp, '2024-01-01 00:00:00'::TimestampNanosecond) AS time_window,
+    COUNT(*) AS request_count
+FROM api_requests
+GROUP BY upstream_id, time_window;
+
+DROP FLOW api_request_volume_by_upstream;
+DROP TABLE api_request_volume_upstream_stats;
+DROP TABLE api_requests;


@@ -365,33 +365,48 @@ SELECT number FROM out_num_cnt_show;
 | 16     |
 +--------+
 
--- should mismatch
+-- should mismatch, hence the old flow remains
 CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
 
 Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
 
--- should mismatch
+-- should mismatch, hence the old flow remains
 CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
 
 Error: 3001(EngineExecuteQuery), Invalid query: Column 1(name is 'ts', flow inferred name is 'n2')'s data type mismatch, expect Timestamp(Millisecond(TimestampMillisecondType)) got Int32(Int32Type)
 
-INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
+SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
 
-Affected Rows: 3
++---------------------------------------------------------------+
+| flow_definition                                               |
++---------------------------------------------------------------+
+| SELECT number AS n1 FROM numbers_input_show WHERE number > 10 |
++---------------------------------------------------------------+
+
+INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3);
+
+Affected Rows: 4
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('filter_numbers_show');
 
-Error: 1003(Internal), Internal error: 1003
++-----------------------------------------+
+| ADMIN FLUSH_FLOW('filter_numbers_show') |
++-----------------------------------------+
+| FLOW_FLUSHED                            |
++-----------------------------------------+
 
--- sink table stays the same since the flow error out due to column mismatch
+-- sink table shows new 11 since old flow remains
 SELECT number FROM out_num_cnt_show;
 
 +--------+
 | number |
 +--------+
+| 11     |
+| 15     |
 | 15     |
 | 16     |
+| 18     |
 +--------+
 
 DROP FLOW filter_numbers_show;


@@ -147,19 +147,20 @@ ADMIN FLUSH_FLOW('filter_numbers_show');
 
 SELECT number FROM out_num_cnt_show;
 
--- should mismatch
+-- should mismatch, hence the old flow remains
 CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2 FROM numbers_input_show where number > 15;
 
--- should mismatch
+-- should mismatch, hence the old flow remains
 CREATE OR REPLACE FLOW filter_numbers_show SINK TO out_num_cnt_show AS SELECT number AS n1, number AS n2, number AS n3 FROM numbers_input_show where number > 15;
 
-INSERT INTO numbers_input_show VALUES (10, 6),(15, 7),(18, 3);
+SELECT flow_definition FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';
+
+INSERT INTO numbers_input_show VALUES (10, 6),(11, 8),(15, 7),(18, 3);
 
 -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
 ADMIN FLUSH_FLOW('filter_numbers_show');
 
--- sink table stays the same since the flow error out due to column mismatch
+-- sink table shows new 11 since old flow remains
 SELECT number FROM out_num_cnt_show;
 
 DROP FLOW filter_numbers_show;