diff --git a/src/client/src/database.rs b/src/client/src/database.rs index c9dc9b08e5..863b6b6419 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -36,8 +36,8 @@ use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_query::Output; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::RecordBatchStreamWrapper; -use common_telemetry::error; use common_telemetry::tracing_context::W3cTrace; +use common_telemetry::{error, warn}; use futures::future; use futures_util::{Stream, StreamExt, TryStreamExt}; use prost::Message; @@ -192,6 +192,36 @@ impl Database { from_grpc_response(response) } + /// Retry if connection fails, max_retries is the max number of retries, so the total wait time + /// is `max_retries * GRPC_CONN_TIMEOUT` + pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result { + let mut client = make_database_client(&self.client)?.inner; + let mut retries = 0; + let request = self.to_rpc_request(request); + loop { + let raw_response = client.handle(request.clone()).await; + match (raw_response, retries < max_retries) { + (Ok(resp), _) => return from_grpc_response(resp.into_inner()), + (Err(err), true) => { + // determine if the error is retryable + if is_grpc_retryable(&err) { + // retry + retries += 1; + warn!("Retrying {} times with error = {:?}", retries, err); + continue; + } + } + (Err(err), false) => { + error!( + "Failed to send request to grpc handle after {} retries, error = {:?}", + retries, err + ); + return Err(err.into()); + } + } + } + } + #[inline] fn to_rpc_request(&self, request: Request) -> GreptimeRequest { GreptimeRequest { @@ -368,6 +398,11 @@ impl Database { } } +/// by grpc standard, only `Unavailable` is retryable, see: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md#status-codes-and-their-use-in-grpc +pub fn is_grpc_retryable(err: &tonic::Status) -> bool { + matches!(err.code(), tonic::Code::Unavailable) +} + #[derive(Default, Debug, Clone)] struct FlightContext { auth_header: Option, diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index c49cbb97ef..6abc88b6cb 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -37,11 +37,12 @@ use tokio::sync::{Mutex, RwLock}; use crate::adapter::{CreateFlowArgs, StreamingEngine}; use crate::batching_mode::engine::BatchingEngine; +use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION}; use crate::engine::FlowEngine; use crate::error::{ CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu, - InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, SyncCheckTaskSnafu, - UnexpectedSnafu, + InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu, + SyncCheckTaskSnafu, UnexpectedSnafu, }; use crate::metrics::METRIC_FLOW_TASK_COUNT; use crate::repr::{self, DiffRow}; @@ -81,6 +82,11 @@ impl FlowDualEngine { } } + /// Determine if the engine is in distributed mode + pub fn is_distributed(&self) -> bool { + self.streaming_engine.node_id.is_some() + } + pub fn streaming_engine(&self) -> Arc { self.streaming_engine.clone() } @@ -89,6 +95,39 @@ impl FlowDualEngine { self.batching_engine.clone() } + /// In distributed mode, scan periodically(1s) until available frontend is found, or timeout, + /// in standalone mode, return immediately + /// notice here if any frontend appear in cluster info this function will return immediately + async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> { + if !self.is_distributed() { + return Ok(()); + } + let frontend_client = self.batching_engine().frontend_client.clone(); + let sleep_duration = std::time::Duration::from_millis(1_000); + let now = std::time::Instant::now(); + loop { + let frontend_list = frontend_client.scan_for_frontend().await?; + if !frontend_list.is_empty() { + let fe_list = frontend_list + .iter() + .map(|(_, info)| &info.peer.addr) + .collect::>(); + info!("Available frontend found: {:?}", fe_list); + return Ok(()); + } + let elapsed = now.elapsed(); + tokio::time::sleep(sleep_duration).await; + info!("Waiting for available frontend, elapsed={:?}", elapsed); + if elapsed >= timeout { + return NoAvailableFrontendSnafu { + timeout, + context: "No available frontend found in cluster info", + } + .fail(); + } + } + } + /// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required /// /// the need to sync is to make sure flush flow actually get called @@ -338,18 +377,36 @@ struct ConsistentCheckTask { impl ConsistentCheckTask { async fn start_check_task(engine: &Arc) -> Result { - // first do recover flows - engine.check_flow_consistent(true, false).await?; - - let inner = engine.clone(); + let engine = engine.clone(); let (tx, mut rx) = tokio::sync::mpsc::channel(1); let (trigger_tx, mut trigger_rx) = tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10); let handle = common_runtime::spawn_global(async move { + // first check if available frontend is found + if let Err(err) = engine + .wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT) + .await + { + warn!("No frontend is available yet:\n {err:?}"); + } + + // then do recover flows, if failed, always retry + let mut recover_retry = 0; + while let Err(err) = engine.check_flow_consistent(true, false).await { + recover_retry += 1; + error!( + "Failed to recover flows:\n {err:?}, retry {} in {}s", + recover_retry, + MIN_REFRESH_DURATION.as_secs() + ); + tokio::time::sleep(MIN_REFRESH_DURATION).await; + } + + // then do check flows, with configurable allow_create and allow_drop let (mut allow_create, mut allow_drop) = (false, false); let mut ret_signal: Option> = None; loop { - if let Err(err) = inner.check_flow_consistent(allow_create, allow_drop).await { + if let Err(err) = engine.check_flow_consistent(allow_create, allow_drop).await { error!(err; "Failed to check flow consistent"); } if let Some(done) = ret_signal.take() { @@ -534,7 +591,12 @@ impl FlowEngine for FlowDualEngine { match flow_type { Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await, Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await, - None => Ok(0), + None => { + warn!( + "Currently flow={flow_id} doesn't exist in flownode, ignore flush_flow request" + ); + Ok(0) + } } } diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs index 031c7aad4b..0e6f3bbb25 100644 --- a/src/flow/src/batching_mode.rs +++ b/src/flow/src/batching_mode.rs @@ -31,10 +31,19 @@ pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs( pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60); /// The minimum duration between two queries execution by batching mode task -const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0); +pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0); /// Grpc connection timeout const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5); /// Grpc max retry number const GRPC_MAX_RETRIES: u32 = 3; + +/// Flow wait for available frontend timeout, +/// if failed to find available frontend after FRONTEND_SCAN_TIMEOUT elapsed, return error +/// which should prevent flownode from starting +pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30); + +/// Frontend activity timeout +/// if frontend is down(not sending heartbeat) for more than FRONTEND_ACTIVITY_TIMEOUT, it will be removed from the list that flownode use to connect +pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60); diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 6c667d56d5..fbfea7715c 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -49,7 +49,8 @@ use crate::{CreateFlowArgs, Error, FlowId, TableName}; pub struct BatchingEngine { tasks: RwLock>, shutdown_txs: RwLock>>, - frontend_client: Arc, + /// frontend client for insert request + pub(crate) frontend_client: Arc, flow_metadata_manager: FlowMetadataManagerRef, table_meta: TableMetadataManagerRef, catalog_manager: CatalogManagerRef, diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 9f16ea07fa..1821369f06 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -15,6 +15,7 @@ //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user use std::sync::{Arc, Weak}; +use std::time::SystemTime; use api::v1::greptime_request::Request; use api::v1::CreateTableExpr; @@ -26,15 +27,17 @@ use common_meta::peer::Peer; use common_meta::rpc::store::RangeRequest; use common_query::Output; use common_telemetry::warn; +use itertools::Itertools; use meta_client::client::MetaClient; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use crate::batching_mode::{ - DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, GRPC_CONN_TIMEOUT, GRPC_MAX_RETRIES, + DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT, + GRPC_MAX_RETRIES, }; -use crate::error::{ExternalSnafu, InvalidRequestSnafu, UnexpectedSnafu}; +use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu}; use crate::Error; /// Just like [`GrpcQueryHandler`] but use BoxedError @@ -127,10 +130,24 @@ impl DatabaseWithPeer { fn new(database: Database, peer: Peer) -> Self { Self { database, peer } } + + /// Try sending a "SELECT 1" to the database + async fn try_select_one(&self) -> Result<(), Error> { + // notice here use `sql` for `SELECT 1` return 1 row + let _ = self + .database + .sql("SELECT 1") + .await + .with_context(|_| InvalidRequestSnafu { + context: format!("Failed to handle `SELECT 1` request at {:?}", self.peer), + })?; + Ok(()) + } } impl FrontendClient { - async fn scan_for_frontend(&self) -> Result, Error> { + /// scan for available frontend from metadata + pub(crate) async fn scan_for_frontend(&self) -> Result, Error> { let Self::Distributed { meta_client, .. } = self else { return Ok(vec![]); }; @@ -160,8 +177,8 @@ impl FrontendClient { Ok(res) } - /// Get the database with max `last_activity_ts` - async fn get_last_active_frontend( + /// Get the database with maximum `last_activity_ts`& is able to process query + async fn get_latest_active_frontend( &self, catalog: &str, schema: &str, @@ -177,22 +194,50 @@ impl FrontendClient { .fail(); }; - let frontends = self.scan_for_frontend().await?; - let mut peer = None; + let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT); + interval.tick().await; + for retry in 0..GRPC_MAX_RETRIES { + let frontends = self.scan_for_frontend().await?; + let now_in_ms = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as i64; - if let Some((_, val)) = frontends.iter().max_by_key(|(_, val)| val.last_activity_ts) { - peer = Some(val.peer.clone()); + // found node with maximum last_activity_ts + for (_, node_info) in frontends + .iter() + .sorted_by_key(|(_, node_info)| node_info.last_activity_ts) + .rev() + // filter out frontend that have been down for more than 1 min + .filter(|(_, node_info)| { + node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64 + > now_in_ms + }) + { + let addr = &node_info.peer.addr; + let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]); + let database = Database::new(catalog, schema, client); + let db = DatabaseWithPeer::new(database, node_info.peer.clone()); + match db.try_select_one().await { + Ok(_) => return Ok(db), + Err(e) => { + warn!( + "Failed to connect to frontend {} on retry={}: \n{e:?}", + addr, retry + ); + } + } + } + // no available frontend + // sleep and retry + interval.tick().await; } - let Some(peer) = peer else { - UnexpectedSnafu { - reason: format!("No frontend available: {:?}", frontends), - } - .fail()? - }; - let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![peer.addr.clone()]); - let database = Database::new(catalog, schema, client); - Ok(DatabaseWithPeer::new(database, peer)) + NoAvailableFrontendSnafu { + timeout: GRPC_CONN_TIMEOUT, + context: "No available frontend found that is able to process query", + } + .fail() } pub async fn create( @@ -222,38 +267,18 @@ impl FrontendClient { ) -> Result { match self { FrontendClient::Distributed { .. } => { - let db = self.get_last_active_frontend(catalog, schema).await?; + let db = self.get_latest_active_frontend(catalog, schema).await?; *peer_desc = Some(PeerDesc::Dist { peer: db.peer.clone(), }); - let mut retry = 0; - - loop { - let ret = db.database.handle(req.clone()).await.with_context(|_| { - InvalidRequestSnafu { - context: format!("Failed to handle request: {:?}", req), - } - }); - if let Err(err) = ret { - if retry < GRPC_MAX_RETRIES { - retry += 1; - warn!( - "Failed to send request to grpc handle at Peer={:?}, retry = {}, error = {:?}", - db.peer, retry, err - ); - continue; - } else { - common_telemetry::error!( - "Failed to send request to grpc handle at Peer={:?} after {} retries, error = {:?}", - db.peer, retry, err - ); - return Err(err); - } - } - return ret; - } + db.database + .handle_with_retry(req.clone(), GRPC_MAX_RETRIES) + .await + .with_context(|_| InvalidRequestSnafu { + context: format!("Failed to handle request at {:?}: {:?}", db.peer, req), + }) } FrontendClient::Standalone { database_client } => { let ctx = QueryContextBuilder::default() diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 904a0a8fa7..06c776f9b5 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -61,6 +61,16 @@ pub enum Error { location: Location, }, + #[snafu(display( + "No available frontend found after timeout: {timeout:?}, context: {context}" + ))] + NoAvailableFrontend { + timeout: std::time::Duration, + context: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("External error"))] External { source: BoxedError, @@ -296,7 +306,8 @@ impl ErrorExt for Error { Self::Eval { .. } | Self::JoinTask { .. } | Self::Datafusion { .. } - | Self::InsertIntoFlow { .. } => StatusCode::Internal, + | Self::InsertIntoFlow { .. } + | Self::NoAvailableFrontend { .. } => StatusCode::Internal, Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists, Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 46c0d93301..e82a20dd81 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -172,6 +172,8 @@ impl FlownodeServer { } /// Start the background task for streaming computation. + /// + /// Should be called only after heartbeat is establish, hence can get cluster info async fn start_workers(&self) -> Result<(), Error> { let manager_ref = self.inner.flow_service.dual_engine.clone(); let handle = manager_ref diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.result b/tests/cases/standalone/common/flow/flow_advance_ttl.result index 3f5a940977..6addda09bd 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.result +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.result @@ -31,6 +31,15 @@ FROM Affected Rows: 0 -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.sql b/tests/cases/standalone/common/flow/flow_advance_ttl.sql index 2691af2b0c..3b9b46b613 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.sql +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.sql @@ -23,6 +23,9 @@ FROM distinct_basic; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.result b/tests/cases/standalone/common/flow/flow_auto_sink_table.result index de8a44fad7..f1d229e6e8 100644 --- a/tests/cases/standalone/common/flow/flow_auto_sink_table.result +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.result @@ -44,6 +44,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ @@ -101,6 +110,16 @@ GROUP BY Affected Rows: 0 +-- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ @@ -118,6 +137,15 @@ SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s SHOW CREATE FLOW test_numbers_basic; +--------------------+---------------------------------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/flow/flow_auto_sink_table.sql b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql index ca76ba767e..61790272b8 100644 --- a/tests/cases/standalone/common/flow/flow_auto_sink_table.sql +++ b/tests/cases/standalone/common/flow/flow_auto_sink_table.sql @@ -20,6 +20,9 @@ SHOW CREATE TABLE out_num_cnt_basic; ADMIN FLUSH_FLOW('test_numbers_basic'); -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; SHOW CREATE FLOW test_numbers_basic; @@ -44,10 +47,16 @@ FROM numbers_input_basic GROUP BY ts; +-- SQLNESS ARG restart=true +SELECT 1; +-- SQLNESS SLEEP 3s SHOW CREATE TABLE out_num_cnt_basic; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s SHOW CREATE FLOW test_numbers_basic; SHOW CREATE TABLE out_num_cnt_basic; diff --git a/tests/cases/standalone/common/flow/flow_basic.result b/tests/cases/standalone/common/flow/flow_basic.result index 5b4e6b32ab..f222a9c331 100644 --- a/tests/cases/standalone/common/flow/flow_basic.result +++ b/tests/cases/standalone/common/flow/flow_basic.result @@ -62,6 +62,15 @@ SHOW CREATE TABLE out_num_cnt_basic; +-------------------+--------------------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES @@ -206,6 +215,15 @@ SHOW CREATE TABLE out_basic; +-----------+---------------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO input_basic VALUES @@ -306,6 +324,15 @@ ADMIN FLUSH_FLOW('test_distinct_basic'); +-----------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES @@ -1665,6 +1692,15 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); +----------------------------------------+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES diff --git a/tests/cases/standalone/common/flow/flow_basic.sql b/tests/cases/standalone/common/flow/flow_basic.sql index 32598927ab..92310f98c4 100644 --- a/tests/cases/standalone/common/flow/flow_basic.sql +++ b/tests/cases/standalone/common/flow/flow_basic.sql @@ -24,6 +24,9 @@ ADMIN FLUSH_FLOW('test_numbers_basic'); SHOW CREATE TABLE out_num_cnt_basic; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES @@ -91,6 +94,9 @@ FROM SHOW CREATE TABLE out_basic; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO input_basic VALUES @@ -130,6 +136,9 @@ SHOW CREATE TABLE out_distinct_basic; ADMIN FLUSH_FLOW('test_distinct_basic'); -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO distinct_basic VALUES @@ -788,6 +797,9 @@ SHOW CREATE TABLE out_num_cnt_basic; ADMIN FLUSH_FLOW('test_numbers_basic'); -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s INSERT INTO numbers_input_basic VALUES diff --git a/tests/cases/standalone/common/flow/flow_rebuild.result b/tests/cases/standalone/common/flow/flow_rebuild.result index 521bcdb5ae..70946a1739 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.result +++ b/tests/cases/standalone/common/flow/flow_rebuild.result @@ -730,10 +730,21 @@ SELECT key FROM api_stats; +-----+ -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 5s INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1); Affected Rows: 1 +-- wait more time so flownode have time to recover flows +-- SQLNESS SLEEP 5s -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('api_stats_flow'); diff --git a/tests/cases/standalone/common/flow/flow_rebuild.sql b/tests/cases/standalone/common/flow/flow_rebuild.sql index b2b6149761..170696ed26 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.sql +++ b/tests/cases/standalone/common/flow/flow_rebuild.sql @@ -399,8 +399,13 @@ ADMIN FLUSH_FLOW('api_stats_flow'); SELECT key FROM api_stats; -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 5s INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1); +-- wait more time so flownode have time to recover flows +-- SQLNESS SLEEP 5s -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('api_stats_flow'); diff --git a/tests/cases/standalone/common/flow/show_create_flow.result b/tests/cases/standalone/common/flow/show_create_flow.result index be58053824..2e28f09750 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.result +++ b/tests/cases/standalone/common/flow/show_create_flow.result @@ -263,6 +263,15 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM -- makesure after recover should be the same -- SQLNESS ARG restart=true +SELECT 1; + ++----------+ +| Int64(1) | ++----------+ +| 1 | ++----------+ + +-- SQLNESS SLEEP 3s SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show'; +---------------------+---------------+-------------------------------------------------------------+------------------------------------+ diff --git a/tests/cases/standalone/common/flow/show_create_flow.sql b/tests/cases/standalone/common/flow/show_create_flow.sql index 1c5eed8c4e..15a6b13936 100644 --- a/tests/cases/standalone/common/flow/show_create_flow.sql +++ b/tests/cases/standalone/common/flow/show_create_flow.sql @@ -108,6 +108,9 @@ SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORM -- makesure after recover should be the same -- SQLNESS ARG restart=true +SELECT 1; + +-- SQLNESS SLEEP 3s SELECT flow_name, table_catalog, flow_definition, source_table_names FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name='filter_numbers_show';