diff --git a/Cargo.lock b/Cargo.lock
index c127c0ab38..4728d300d6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4436,6 +4436,7 @@ dependencies = [
  "prometheus",
  "prost 0.13.5",
  "query",
+ "rand 0.9.0",
  "serde",
  "serde_json",
  "servers",
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index c416910440..a18a67d3b3 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -60,6 +60,7 @@ partition.workspace = true
 prometheus.workspace = true
 prost.workspace = true
 query.workspace = true
+rand.workspace = true
 serde.workspace = true
 servers.workspace = true
 session.workspace = true
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index 6abc88b6cb..054190a693 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -14,6 +14,7 @@
 //! impl `FlowNode` trait for FlowNodeManager so standalone can call them
 
 use std::collections::{HashMap, HashSet};
+use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 
 use api::v1::flow::{
@@ -40,9 +41,9 @@ use crate::batching_mode::engine::BatchingEngine;
 use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
 use crate::engine::FlowEngine;
 use crate::error::{
-    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
-    InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
-    SyncCheckTaskSnafu, UnexpectedSnafu,
+    CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
+    IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
+    NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
 };
 use crate::metrics::METRIC_FLOW_TASK_COUNT;
 use crate::repr::{self, DiffRow};
@@ -63,6 +64,7 @@ pub struct FlowDualEngine {
     flow_metadata_manager: Arc<FlowMetadataManager>,
     catalog_manager: Arc<dyn CatalogManager>,
     check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
+    done_recovering: AtomicBool,
 }
 
 impl FlowDualEngine {
@@ -79,9 +81,55 @@ impl FlowDualEngine {
             flow_metadata_manager,
             catalog_manager,
             check_task: Mutex::new(None),
+            done_recovering: AtomicBool::new(false),
         }
     }
 
+    /// Set `done_recovering` to true, indicating that the engine
+    /// is ready to handle requests
+    pub fn set_done_recovering(&self) {
+        info!("FlowDualEngine done recovering");
+        self.done_recovering
+            .store(true, std::sync::atomic::Ordering::Release);
+    }
+
+    /// Check whether `done_recovering` is true
+    pub fn is_recover_done(&self) -> bool {
+        self.done_recovering
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    /// Wait for recovery to finish; this should only block right after the flownode starts
+    async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
+        if self.is_recover_done() {
+            return Ok(());
+        }
+
+        warn!(
+            "FlowDualEngine is not done recovering, {} insert requests are waiting for recovery",
+            waiting_req_cnt
+        );
+        // wait up to 3 seconds, checking once per second
+        // TODO(discord9): make this configurable
+        let mut retry = 0;
+        let max_retry = 3;
+        while retry < max_retry && !self.is_recover_done() {
+            warn!(
+                "FlowDualEngine is not done recovering, retry {} in 1s",
+                retry
+            );
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+            retry += 1;
+        }
+        // recovery may complete during the final sleep, so check the flag, not the retry counter
+        if !self.is_recover_done() {
+            return FlowNotRecoveredSnafu.fail();
+        }
+        info!("FlowDualEngine is done recovering");
+        // TODO(discord9): also put to centralized logging for flow once it is implemented
+        Ok(())
+    }
+
     /// Determine if the engine is in distributed mode
     pub fn is_distributed(&self) -> bool {
         self.streaming_engine.node_id.is_some()
@@ -235,7 +283,7 @@ impl FlowDualEngine {
             to_be_created
         );
         let mut errors = vec![];
-        for flow_id in to_be_created {
+        for flow_id in to_be_created.clone() {
             let flow_id = *flow_id;
             let info = self
                 .flow_metadata_manager
@@ -294,6 +342,10 @@ impl FlowDualEngine {
                 errors.push((flow_id, err));
             }
         }
+        if errors.is_empty() {
+            info!("Successfully recovered flows: {:?}", to_be_created);
+        }
+
         for (flow_id, err) in errors {
             warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
         }
@@ -402,6 +454,8 @@ impl ConsistentCheckTask {
             tokio::time::sleep(MIN_REFRESH_DURATION).await;
         }
 
+        engine.set_done_recovering();
+
         // then do check flows, with configurable allow_create and allow_drop
         let (mut allow_create, mut allow_drop) = (false, false);
         let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
@@ -621,11 +675,14 @@ impl FlowEngine for FlowDualEngine {
         &self,
         request: api::v1::region::InsertRequests,
     ) -> Result<(), Error> {
+        self.wait_for_all_flow_recover(request.requests.len())
+            .await?;
         // TODO(discord9): make as little clone as possible
         let mut to_stream_engine = Vec::with_capacity(request.requests.len());
         let mut to_batch_engine = request.requests;
 
         {
+            // keep this read lock tightly scoped, or flow recovery can be starved while flow inserts are being handled
            let src_table2flow = self.src_table2flow.read().await;
            to_batch_engine.retain(|req| {
                let region_id = RegionId::from(req.region_id);
diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index fbfea7715c..3a376a35f9 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -330,7 +330,7 @@ impl BatchingEngine {
         let frontend = self.frontend_client.clone();
 
-        // check execute once first to detect any error early
-        task.check_execute(&engine, &frontend).await?;
+        // make sure the sink table exists before the task starts
+        task.check_or_create_sink_table(&engine, &frontend).await?;
 
         // TODO(discord9): use time wheel or what for better
         let handle = common_runtime::spawn_global(async move {
diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index 1821369f06..0503d2e5b3 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -27,8 +27,9 @@ use common_meta::peer::Peer;
 use common_meta::rpc::store::RangeRequest;
 use common_query::Output;
 use common_telemetry::warn;
-use itertools::Itertools;
 use meta_client::client::MetaClient;
+use rand::rng;
+use rand::seq::SliceRandom;
 use servers::query_handler::grpc::GrpcQueryHandler;
 use session::context::{QueryContextBuilder, QueryContextRef};
 use snafu::{OptionExt, ResultExt};
@@ -177,8 +178,9 @@ impl FrontendClient {
         Ok(res)
     }
 
-    /// Get the database with maximum `last_activity_ts`& is able to process query
-    async fn get_latest_active_frontend(
+    /// Get a random frontend whose `last_activity_ts` is recent enough
+    /// (within the last minute) and which is able to process queries
+    async fn get_random_active_frontend(
         &self,
         catalog: &str,
         schema: &str,
@@ -197,17 +199,17 @@
         let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
         interval.tick().await;
         for retry in 0..GRPC_MAX_RETRIES {
-            let frontends = self.scan_for_frontend().await?;
+            let mut frontends = self.scan_for_frontend().await?;
             let now_in_ms = SystemTime::now()
                 .duration_since(SystemTime::UNIX_EPOCH)
                 .unwrap()
                 .as_millis() as i64;
-            // found node with maximum last_activity_ts
+            // shuffle the frontends to avoid always picking the same one
+            frontends.shuffle(&mut rng());
+            // take the first shuffled frontend that is still active
             for (_, node_info) in frontends
                 .iter()
-                .sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
-                .rev()
                 // filter out frontend that have been down for more than 1 min
                 .filter(|(_, node_info)| {
                     node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
@@ -267,7 +269,7 @@
     ) -> Result<DatabaseWithPeer, Error> {
         match self {
             FrontendClient::Distributed { .. } => {
-                let db = self.get_latest_active_frontend(catalog, schema).await?;
+                let db = self.get_random_active_frontend(catalog, schema).await?;
 
                 *peer_desc = Some(PeerDesc::Dist {
                     peer: db.peer.clone(),
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index bb1f296c90..7bdcf90069 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -142,26 +142,12 @@ impl BatchingTask {
         Ok(())
     }
 
-    /// Test execute, for check syntax or such
-    pub async fn check_execute(
+    /// Create the sink table if it does not already exist
+    pub async fn check_or_create_sink_table(
         &self,
         engine: &QueryEngineRef,
         frontend_client: &Arc<FrontendClient>,
     ) -> Result<Option<(u32, Duration)>, Error> {
-        // use current time to test get a dirty time window, which should be safe
-        let start = SystemTime::now();
-        let ts = Timestamp::new_second(
-            start
-                .duration_since(UNIX_EPOCH)
-                .expect("Time went backwards")
-                .as_secs() as _,
-        );
-        self.state
-            .write()
-            .unwrap()
-            .dirty_time_windows
-            .add_lower_bounds(vec![ts].into_iter());
-
         if !self.is_table_exist(&self.config.sink_table_name).await? {
             let create_table = self.gen_create_table_expr(engine.clone()).await?;
             info!(
@@ -174,7 +160,8 @@ impl BatchingTask {
                 self.config.sink_table_name.join(".")
             );
         }
-        self.gen_exec_once(engine, frontend_client).await
+
+        Ok(None)
     }
 
     async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {
diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs
index 06c776f9b5..af4d384a65 100644
--- a/src/flow/src/error.rs
+++ b/src/flow/src/error.rs
@@ -46,6 +46,12 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Flow engine is still recovering"))]
+    FlowNotRecovered {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Error encountered while creating flow: {sql}"))]
     CreateFlow {
         sql: String,
@@ -307,7 +313,8 @@ impl ErrorExt for Error {
             | Self::JoinTask { .. }
             | Self::Datafusion { .. }
             | Self::InsertIntoFlow { .. }
-            | Self::NoAvailableFrontend { .. } => StatusCode::Internal,
+            | Self::NoAvailableFrontend { .. }
+            | Self::FlowNotRecovered { .. } => StatusCode::Internal,
             Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
             Self::TableNotFound { .. }
             | Self::TableNotFoundMeta { .. }
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index e82a20dd81..53710f5048 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -43,7 +43,7 @@ use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
 use servers::http::HttpServerBuilder;
 use servers::metrics_handler::MetricsHandler;
 use servers::server::{ServerHandler, ServerHandlers};
-use session::context::{QueryContextBuilder, QueryContextRef};
+use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
 use tokio::net::TcpListener;
 use tokio::sync::{broadcast, oneshot, Mutex};
@@ -54,18 +54,18 @@ use tonic::{Request, Response, Status};
 use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
 use crate::adapter::{create_worker, FlowStreamingEngineRef};
 use crate::batching_mode::engine::BatchingEngine;
-use crate::engine::FlowEngine;
 use crate::error::{
-    to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
-    ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
+    to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, ListFlowsSnafu, ParseAddrSnafu,
+    ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
 };
 use crate::heartbeat::HeartbeatTask;
 use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
 use crate::transform::register_function_to_query_engine;
 use crate::utils::{SizeReportSender, StateReportHandler};
-use crate::{CreateFlowArgs, Error, FlownodeOptions, FrontendClient, StreamingEngine};
+use crate::{Error, FlownodeOptions, FrontendClient, StreamingEngine};
 
 pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
+
 /// wrapping flow node manager to avoid orphan rule with Arc<...>
 #[derive(Clone)]
 pub struct FlowService {
@@ -397,109 +397,6 @@ impl FlownodeBuilder {
         Ok(instance)
     }
 
-    /// recover all flow tasks in this flownode in distributed mode(nodeid is Some())
-    ///
-    /// or recover all existing flow tasks if in standalone mode(nodeid is None)
-    ///
-    /// TODO(discord9): persistent flow tasks with internal state
-    async fn recover_flows(&self, manager: &FlowDualEngine) -> Result<usize, Error> {
-        let nodeid = self.opts.node_id;
-        let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
-            let to_be_recover = self
-                .flow_metadata_manager
-                .flownode_flow_manager()
-                .flows(nodeid)
-                .try_collect::<Vec<_>>()
-                .await
-                .context(ListFlowsSnafu { id: Some(nodeid) })?;
-            to_be_recover.into_iter().map(|(id, _)| id).collect()
-        } else {
-            let all_catalogs = self
-                .catalog_manager
-                .catalog_names()
-                .await
-                .map_err(BoxedError::new)
-                .context(ExternalSnafu)?;
-            let mut all_flow_ids = vec![];
-            for catalog in all_catalogs {
-                let flows = self
-                    .flow_metadata_manager
-                    .flow_name_manager()
-                    .flow_names(&catalog)
-                    .await
-                    .try_collect::<Vec<_>>()
-                    .await
-                    .map_err(BoxedError::new)
-                    .context(ExternalSnafu)?;
-
-                all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
-            }
-            all_flow_ids
-        };
-        let cnt = to_be_recovered.len();
-
-        // TODO(discord9): recover in parallel
-        info!("Recovering {} flows: {:?}", cnt, to_be_recovered);
-        for flow_id in to_be_recovered {
-            let info = self
-                .flow_metadata_manager
-                .flow_info_manager()
-                .get(flow_id)
-                .await
-                .map_err(BoxedError::new)
-                .context(ExternalSnafu)?
-                .context(FlowNotFoundSnafu { id: flow_id })?;
-
-            let sink_table_name = [
-                info.sink_table_name().catalog_name.clone(),
-                info.sink_table_name().schema_name.clone(),
-                info.sink_table_name().table_name.clone(),
-            ];
-
-            let args = CreateFlowArgs {
-                flow_id: flow_id as _,
-                sink_table_name,
-                source_table_ids: info.source_table_ids().to_vec(),
-                // because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
-                // but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
-                // (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
-                create_if_not_exists: true,
-                or_replace: true,
-                expire_after: info.expire_after(),
-                comment: Some(info.comment().clone()),
-                sql: info.raw_sql().clone(),
-                flow_options: info.options().clone(),
-                query_ctx: info
-                    .query_context()
-                    .clone()
-                    .map(|ctx| {
-                        ctx.try_into()
-                            .map_err(BoxedError::new)
-                            .context(ExternalSnafu)
-                    })
-                    .transpose()?
-                    // or use default QueryContext with catalog_name from info
-                    // to keep compatibility with old version
-                    .or_else(|| {
-                        Some(
-                            QueryContextBuilder::default()
-                                .current_catalog(info.catalog_name().to_string())
-                                .build(),
-                        )
-                    }),
-            };
-            manager
-                .create_flow(args)
-                .await
-                .map_err(BoxedError::new)
-                .with_context(|_| CreateFlowSnafu {
-                    sql: info.raw_sql().clone(),
-                })?;
-        }
-
-        Ok(cnt)
-    }
-
     /// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
     /// nor does it actually start running the worker.
     async fn build_manager(
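
For reviewers: the readiness gate added to `FlowDualEngine` reduces to an `AtomicBool` published with `Release` ordering and polled with `Acquire`, plus a bounded wait before rejecting inserts. Below is a minimal standalone sketch of that pattern; the `ReadyGate` name, error type, and timings are illustrative stand-ins, not part of this patch:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Illustrative stand-in for the `done_recovering` flag in `FlowDualEngine`.
struct ReadyGate {
    done: AtomicBool,
}

impl ReadyGate {
    fn new() -> Self {
        Self { done: AtomicBool::new(false) }
    }

    /// Publish readiness; `Release` pairs with the `Acquire` load in `wait_ready`.
    fn set_ready(&self) {
        self.done.store(true, Ordering::Release);
    }

    /// Poll up to `max_retry` times, sleeping 1s between checks, then give up.
    async fn wait_ready(&self, max_retry: usize) -> Result<(), &'static str> {
        for _ in 0..max_retry {
            if self.done.load(Ordering::Acquire) {
                return Ok(());
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        // re-check once more: readiness may have arrived during the last sleep
        if self.done.load(Ordering::Acquire) {
            Ok(())
        } else {
            Err("still recovering")
        }
    }
}

#[tokio::main]
async fn main() {
    let gate = Arc::new(ReadyGate::new());
    let publisher = gate.clone();
    tokio::spawn(async move {
        // simulate recovery finishing after 2s
        tokio::time::sleep(Duration::from_secs(2)).await;
        publisher.set_ready();
    });
    assert!(gate.wait_ready(3).await.is_ok());
}
```

The final re-check mirrors the `!self.is_recover_done()` condition in the patched `wait_for_all_flow_recover`, which avoids reporting failure when recovery completes during the last sleep.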
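The frontend-selection change swaps "pick the node with the maximum `last_activity_ts`" for "shuffle, then take any node seen within the activity timeout", trading strict recency for load spreading across frontends. A rough sketch of that selection logic using the same rand 0.9 API the patch imports; `pick_active`, the trimmed `NodeInfo`, and the millisecond constants are hypothetical:

```rust
use rand::rng;
use rand::seq::SliceRandom;

/// Hypothetical trimmed-down node info; the real type carries more fields.
#[derive(Debug)]
struct NodeInfo {
    last_activity_ts: i64, // milliseconds since the Unix epoch
}

/// Shuffle first, then take the first frontend whose last activity is within
/// `timeout_ms` of `now_ms`, so repeated calls spread load across active nodes.
fn pick_active(
    mut frontends: Vec<(u64, NodeInfo)>,
    now_ms: i64,
    timeout_ms: i64,
) -> Option<(u64, NodeInfo)> {
    frontends.shuffle(&mut rng());
    frontends
        .into_iter()
        .find(|(_, info)| info.last_activity_ts + timeout_ms > now_ms)
}

fn main() {
    let nodes = vec![
        (1, NodeInfo { last_activity_ts: 1_000 }),
        (2, NodeInfo { last_activity_ts: 90_000 }),
        (3, NodeInfo { last_activity_ts: 95_000 }),
    ];
    // with a 60s activity timeout at t = 100s, node 1 is filtered out;
    // node 2 or 3 is returned with equal probability
    println!("{:?}", pick_active(nodes, 100_000, 60_000));
}
```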