From 5fd5b91b29561c5c5409d4f80d273f855e1d593e Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Mon, 8 Jun 2026 16:15:19 +0800 Subject: [PATCH] chore: remove auth in flownode (#8244) * chore: remove auth in flownode Signed-off-by: shuiyisong * chore: update docs Signed-off-by: shuiyisong * chore: add flow startup check Signed-off-by: shuiyisong * fix: clippy Signed-off-by: shuiyisong --------- Signed-off-by: shuiyisong --- Cargo.lock | 3 +- config/config.md | 1 - config/flownode.example.toml | 5 - src/cmd/src/flownode.rs | 23 +- src/cmd/tests/load_config_test.rs | 1 - src/flow/Cargo.toml | 3 +- src/flow/src/adapter.rs | 2 - src/flow/src/adapter/flownode_impl.rs | 37 +-- src/flow/src/batching_mode/frontend_client.rs | 247 ++++++++++++++++-- src/flow/src/engine.rs | 36 --- src/flow/src/error.rs | 4 - src/flow/src/lib.rs | 2 - src/flow/src/server.rs | 103 ++++++-- 13 files changed, 350 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 646e793c64..85a2585282 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5288,10 +5288,10 @@ version = "1.1.0" dependencies = [ "api", "arrow 58.3.0", + "arrow-flight", "arrow-schema 58.3.0", "async-recursion", "async-trait", - "auth", "bytes", "cache", "catalog", @@ -5356,6 +5356,7 @@ dependencies = [ "substrait 1.1.0", "table", "tokio", + "tokio-stream", "tonic 0.14.2", ] diff --git a/config/config.md b/config/config.md index df06d2153c..6006a34eec 100644 --- a/config/config.md +++ b/config/config.md @@ -619,7 +619,6 @@ | Key | Type | Default | Descriptions | | --- | -----| ------- | ----------- | | `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. | -| `user_provider` | String | Unset | The user provider for authentication.
Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" | | `flow` | -- | -- | flow engine options. | | `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.
Not setting(or set to 0) this value will use the number of CPU cores divided by 2. | | `flow.batching_mode` | -- | -- | -- | diff --git a/config/flownode.example.toml b/config/flownode.example.toml index ff8a9e4a50..f8f7f1a779 100644 --- a/config/flownode.example.toml +++ b/config/flownode.example.toml @@ -2,11 +2,6 @@ ## @toml2docs:none-default node_id = 14 -## The user provider for authentication. -## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" -## @toml2docs:none-default -#+ user_provider = "static_user_provider:file:/path/to/users" - ## flow engine options. [flow] ## The number of flow worker in flownode. diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 889cf719a6..126e1a4069 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -39,7 +39,6 @@ use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; use common_version::{short_version, verbose_version}; use flow::{ FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker, - get_flow_auth_options, }; use meta_client::{MetaClientOptions, MetaClientType}; use plugins::flownode::context::GrpcConfigureContext; @@ -167,9 +166,6 @@ struct StartCommand { /// HTTP request timeout in seconds. #[clap(long)] http_timeout: Option, - /// User Provider cfg, for auth, currently only support static user provider - #[clap(long)] - user_provider: Option, } impl StartCommand { @@ -241,10 +237,6 @@ impl StartCommand { opts.http.timeout = Duration::from_secs(http_timeout); } - if let Some(user_provider) = &self.user_provider { - opts.user_provider = Some(user_provider.clone()); - } - ensure!( opts.node_id.is_some(), MissingConfigSnafu { @@ -367,10 +359,8 @@ impl StartCommand { ); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone())); - let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?; let frontend_client = FrontendClient::from_meta_client( meta_client.clone(), - flow_auth_header, opts.query.clone(), opts.flow.batching_mode.clone(), ) @@ -498,5 +488,18 @@ mod tests { assert!(!help.contains("--rpc-server-addr")); assert!(!help.contains("--rpc-addr")); assert!(!help.contains("--rpc-hostname")); + assert!(!help.contains("--user-provider")); + } + + #[test] + fn test_user_provider_cli_option_is_removed() { + let command = StartCommand::try_parse_from([ + "flownode", + "--node-id", + "14", + "--user-provider", + "static_user_provider:cmd:test=test", + ]); + assert!(command.is_err()); } } diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index cee29e4456..a8efec6244 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -250,7 +250,6 @@ fn test_load_flownode_example_config() { addr: "127.0.0.1:4000".to_string(), ..Default::default() }, - user_provider: None, memory: Default::default(), }, ..Default::default() diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index 9223ffc026..55f47901a2 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -13,7 +13,6 @@ arrow.workspace = true arrow-schema.workspace = true async-recursion = "1.0" async-trait.workspace = true -auth.workspace = true bytes.workspace = true cache.workspace = true catalog.workspace = true @@ -79,6 +78,7 @@ tokio.workspace = true tonic.workspace = true [dev-dependencies] +arrow-flight.workspace = true catalog = { workspace = true, features = ["testing"] } common-catalog.workspace = true pretty_assertions.workspace = true @@ -86,3 +86,4 @@ prost.workspace = true query.workspace = true session.workspace = true table.workspace = true +tokio-stream = { workspace = true, features = ["net"] } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 7dc4bc93d8..351d176db0 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -110,7 +110,6 @@ pub struct FlownodeOptions { pub logging: LoggingOptions, pub tracing: TracingOptions, pub query: QueryOptions, - pub user_provider: Option, pub memory: MemoryOptions, } @@ -131,7 +130,6 @@ impl Default for FlownodeOptions { allow_query_fallback: false, memory_pool_size: MemoryLimit::default(), }, - user_provider: None, memory: MemoryOptions::default(), } } diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index f4ca149f1a..7d451d9038 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -198,9 +198,9 @@ impl FlowDualEngine { } } - /// In distributed mode, scan periodically(1s) until available frontend is found, or timeout, - /// in standalone mode, return immediately - /// notice here if any frontend appear in cluster info this function will return immediately + /// In distributed mode, scan periodically(1s) until all advertised frontends + /// accept unauthenticated queries, or timeout. In standalone mode, return + /// immediately. async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> { if !self.is_distributed() { return Ok(()); @@ -215,8 +215,20 @@ impl FlowDualEngine { .iter() .map(|peer| &peer.addr) .collect::>(); - info!("Available frontend found: {:?}", fe_list); - return Ok(()); + let probe_failures = frontend_client + .check_all_frontends_without_auth(&frontend_list) + .await?; + if probe_failures.is_empty() { + info!( + "Available frontend found and unauthenticated probe succeeded: {:?}", + fe_list + ); + return Ok(()); + } + warn!( + "Unauthenticated frontend probe failed, will retry. frontends={:?}, failures={:?}", + fe_list, probe_failures + ); } let elapsed = now.elapsed(); tokio::time::sleep(sleep_duration).await; @@ -224,7 +236,7 @@ impl FlowDualEngine { if elapsed >= timeout { return NoAvailableFrontendSnafu { timeout, - context: "No available frontend found in cluster info", + context: "No frontend accepted unauthenticated flownode probe", } .fail(); } @@ -499,19 +511,14 @@ impl ConsistentCheckTask { .batching_engine() .batch_opts .experimental_frontend_scan_timeout; + engine + .wait_for_available_frontend(frontend_scan_timeout) + .await?; let (tx, mut rx) = tokio::sync::mpsc::channel(1); let (trigger_tx, mut trigger_rx) = tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10); let handle = common_runtime::spawn_global(async move { - // first check if available frontend is found - if let Err(err) = engine - .wait_for_available_frontend(frontend_scan_timeout) - .await - { - warn!("No frontend is available yet:\n {err:?}"); - } - - // then do recover flows, if failed, always retry + // Recover flows after the startup frontend probe succeeds. let mut recover_retry = 0; while let Err(err) = engine.check_flow_consistent(true, false).await { recover_retry += 1; diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index c6194d96b3..a4c5cd8ba1 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -20,13 +20,14 @@ use std::sync::{Arc, Mutex, Weak}; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{CreateTableExpr, QueryRequest}; -use client::{Client, Database, OutputWithMetrics}; +use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputWithMetrics}; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config}; use common_meta::peer::{Peer, PeerDiscovery}; use common_query::{Output, OutputData}; use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry}; use common_telemetry::warn; +use futures::stream::{FuturesUnordered, StreamExt}; use meta_client::client::MetaClient; use query::datafusion::QUERY_PARALLELISM_HINT; use query::metrics::terminal_recordbatch_metrics_from_plan; @@ -39,12 +40,12 @@ use session::hints::READ_PREFERENCE_HINT; use snafu::{OptionExt, ResultExt}; use tokio::sync::SetOnce; +use crate::Error; use crate::batching_mode::BatchingModeOptions; use crate::error::{ CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu, }; -use crate::{Error, FlowAuthHeader}; /// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`]. /// @@ -92,7 +93,6 @@ pub enum FrontendClient { Distributed { meta_client: Arc, chnl_mgr: ChannelManager, - auth: Option, query: QueryOptions, batch_opts: BatchingModeOptions, }, @@ -133,11 +133,10 @@ impl FrontendClient { pub fn from_meta_client( meta_client: Arc, - auth: Option, query: QueryOptions, batch_opts: BatchingModeOptions, ) -> Result { - common_telemetry::info!("Frontend client build with auth={:?}", auth); + common_telemetry::info!("Frontend client build without auth"); Ok(Self::Distributed { meta_client, chnl_mgr: { @@ -149,7 +148,6 @@ impl FrontendClient { .context(InvalidClientConfigSnafu)?; ChannelManager::with_config(cfg, tls_config) }, - auth, query, batch_opts, }) @@ -212,6 +210,63 @@ impl FrontendClient { .context(ExternalSnafu) } + /// Probes all discovered frontends without auth. + /// + /// Returns non-auth failures to allow callers to retry transient connectivity + /// errors. Authentication failures are returned immediately because they mean + /// a frontend advertised an auth-protected endpoint to flownodes. + pub(crate) async fn check_all_frontends_without_auth( + &self, + frontends: &[Peer], + ) -> Result, Error> { + let Self::Distributed { + chnl_mgr, + batch_opts, + .. + } = self + else { + return Ok(vec![]); + }; + + let probe_timeout = batch_opts.grpc_conn_timeout; + let mut probes = frontends + .iter() + .map(|peer| { + let addr = peer.addr.clone(); + let chnl_mgr = chnl_mgr.clone(); + + async move { + let client = Client::with_manager_and_urls(chnl_mgr, vec![addr.clone()]); + let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + + match tokio::time::timeout(probe_timeout, database.sql("SELECT 1")).await { + Ok(Ok(_)) => Ok(None), + Ok(Err(err)) if err.tonic_code() == Some(tonic::Code::Unauthenticated) => { + Err(err).context(InvalidRequestSnafu { + context: format!( + "Frontend {addr} rejected unauthenticated flownode probe; ensure frontend internal_grpc is advertised to metasrv" + ), + }) + } + Ok(Err(err)) => Ok(Some(format!("{addr}: {err}"))), + Err(_) => Ok(Some(format!( + "{addr}: health check timed out after {probe_timeout:?}" + ))), + } + } + }) + .collect::>(); + + let mut failures = Vec::new(); + while let Some(probe_result) = probes.next().await { + if let Some(failure) = probe_result? { + failures.push(failure); + } + } + + Ok(failures) + } + /// Get a frontend discovered by metasrv and verified with a query probe. async fn get_random_active_frontend( &self, @@ -221,7 +276,6 @@ impl FrontendClient { let Self::Distributed { meta_client: _, chnl_mgr, - auth, query: _, batch_opts, } = self @@ -242,13 +296,7 @@ impl FrontendClient { for peer in frontends { let addr = peer.addr.clone(); let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]); - let database = { - let mut db = Database::new(catalog, schema, client); - if let Some(auth) = auth { - db.set_auth(auth.auth().clone()); - } - db - }; + let database = Database::new(catalog, schema, client); let db = DatabaseWithPeer::new(database, peer); match db.try_select_one().await { Ok(_) => return Ok(db), @@ -591,6 +639,8 @@ mod tests { use std::task::{Context, Poll}; use std::time::Duration; + use arrow_flight::flight_service_server::FlightServiceServer; + use arrow_flight::{FlightData, Ticket}; use common_query::{Output, OutputData}; use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream}; @@ -598,7 +648,12 @@ mod tests { use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; use futures::StreamExt; + use servers::grpc::flight::{FlightCraft, FlightCraftWrapper, TonicStream}; + use tokio::net::TcpListener; + use tokio::task::JoinHandle; use tokio::time::timeout; + use tokio_stream::wrappers::TcpListenerStream; + use tonic::{Request as TonicRequest, Response as TonicResponse, Status}; use super::*; @@ -657,6 +712,16 @@ mod tests { #[derive(Debug)] struct SnapshotBindingHandler; + #[derive(Debug)] + struct RejectUnauthenticatedFlight; + + #[derive(Debug)] + struct SlowFlight; + + struct WaitForConcurrentFlight { + barrier: Arc, + } + #[async_trait::async_trait] impl GrpcQueryHandlerWithBoxedError for NoopHandler { async fn do_query( @@ -725,6 +790,54 @@ mod tests { } } + #[async_trait::async_trait] + impl FlightCraft for RejectUnauthenticatedFlight { + async fn do_get( + &self, + _request: TonicRequest, + ) -> std::result::Result>, Status> { + Err(Status::unauthenticated("auth failed")) + } + } + + #[async_trait::async_trait] + impl FlightCraft for SlowFlight { + async fn do_get( + &self, + _request: TonicRequest, + ) -> std::result::Result>, Status> { + tokio::time::sleep(Duration::from_secs(60)).await; + Err(Status::unavailable("slow response")) + } + } + + #[async_trait::async_trait] + impl FlightCraft for WaitForConcurrentFlight { + async fn do_get( + &self, + _request: TonicRequest, + ) -> std::result::Result>, Status> { + self.barrier.wait().await; + Err(Status::unavailable("probe started concurrently")) + } + } + + async fn start_flight_server(handler: T) -> (String, JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind test flight server"); + let addr = listener.local_addr().expect("local addr").to_string(); + let server = tokio::spawn(async move { + tonic::transport::Server::builder() + .add_service(FlightServiceServer::new(FlightCraftWrapper(handler))) + .serve_with_incoming(TcpListenerStream::new(listener)) + .await + .expect("serve test flight server"); + }); + + (addr, server) + } + #[tokio::test] async fn wait_initialized() { let (client, handler_mut) = @@ -759,7 +872,6 @@ mod tests { let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)); let client = FrontendClient::from_meta_client( meta_client, - None, QueryOptions::default(), BatchingModeOptions::default(), ) @@ -883,4 +995,109 @@ mod tests { assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq")); } + + #[tokio::test] + async fn test_check_all_frontends_without_auth_fails_fast_on_unauthenticated_frontend() { + let (addr, server) = start_flight_server(RejectUnauthenticatedFlight).await; + let client = FrontendClient::from_meta_client( + Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)), + QueryOptions::default(), + BatchingModeOptions::default(), + ) + .unwrap(); + + let err = client + .check_all_frontends_without_auth(&[Peer { + id: 1, + addr: addr.clone(), + }]) + .await + .unwrap_err(); + server.abort(); + + let Error::InvalidRequest { + context, source, .. + } = err + else { + panic!("expected InvalidRequest, got {err:?}"); + }; + assert!(context.contains(&addr)); + assert!(context.contains("rejected unauthenticated flownode probe")); + assert_eq!(source.tonic_code(), Some(tonic::Code::Unauthenticated)); + } + + #[tokio::test] + async fn test_check_all_frontends_without_auth_uses_grpc_connection_timeout() { + let (addr, server) = start_flight_server(SlowFlight).await; + let client = FrontendClient::from_meta_client( + Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)), + QueryOptions::default(), + BatchingModeOptions { + grpc_conn_timeout: Duration::from_millis(50), + ..Default::default() + }, + ) + .unwrap(); + + let failures = client + .check_all_frontends_without_auth(&[Peer { + id: 1, + addr: addr.clone(), + }]) + .await + .unwrap(); + server.abort(); + + assert_eq!(failures.len(), 1); + assert!(failures[0].contains(&addr)); + assert!(failures[0].contains("health check timed out")); + } + + #[tokio::test] + async fn test_check_all_frontends_without_auth_checks_frontends_concurrently() { + let barrier = Arc::new(tokio::sync::Barrier::new(2)); + let (addr1, server1) = start_flight_server(WaitForConcurrentFlight { + barrier: barrier.clone(), + }) + .await; + let (addr2, server2) = start_flight_server(WaitForConcurrentFlight { barrier }).await; + let client = FrontendClient::from_meta_client( + Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)), + QueryOptions::default(), + BatchingModeOptions { + grpc_conn_timeout: Duration::from_millis(500), + ..Default::default() + }, + ) + .unwrap(); + + let failures = timeout( + Duration::from_secs(2), + client.check_all_frontends_without_auth(&[ + Peer { + id: 1, + addr: addr1.clone(), + }, + Peer { + id: 2, + addr: addr2.clone(), + }, + ]), + ) + .await + .expect("concurrent probes should complete before per-peer timeouts") + .unwrap(); + server1.abort(); + server2.abort(); + + assert_eq!(failures.len(), 2); + assert!(failures.iter().any(|failure| failure.contains(&addr1))); + assert!(failures.iter().any(|failure| failure.contains(&addr2))); + assert!( + failures + .iter() + .all(|failure| !failure.contains("health check timed out")), + "sequential probes would time out before both requests reach the barrier: {failures:?}" + ); + } } diff --git a/src/flow/src/engine.rs b/src/flow/src/engine.rs index d7ac7cdb45..a360a1015d 100644 --- a/src/flow/src/engine.rs +++ b/src/flow/src/engine.rs @@ -26,42 +26,6 @@ use crate::Error; pub type FlowId = u64; pub type TableName = [String; 3]; -#[derive(Clone)] -pub struct FlowAuthHeader { - auth_schema: api::v1::auth_header::AuthScheme, -} - -impl std::fmt::Debug for FlowAuthHeader { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self.auth() { - api::v1::auth_header::AuthScheme::Basic(basic) => f - .debug_struct("Basic") - .field("username", &basic.username) - .field("password", &"") - .finish(), - api::v1::auth_header::AuthScheme::Token(_) => f - .debug_struct("Token") - .field("token", &"") - .finish(), - } - } -} - -impl FlowAuthHeader { - pub fn from_user_pwd(username: &str, pwd: &str) -> Self { - Self { - auth_schema: api::v1::auth_header::AuthScheme::Basic(api::v1::Basic { - username: username.to_string(), - password: pwd.to_string(), - }), - } - } - - pub fn auth(&self) -> &api::v1::auth_header::AuthScheme { - &self.auth_schema - } -} - /// The arguments to create a flow #[derive(Debug, Clone)] pub struct CreateFlowArgs { diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs index 46cfb7c1c3..825c1dc5ac 100644 --- a/src/flow/src/error.rs +++ b/src/flow/src/error.rs @@ -165,9 +165,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid auth config"))] - IllegalAuthConfig { source: auth::error::Error }, - #[snafu(display("Flow plan error: {reason}"))] Plan { reason: String, @@ -358,7 +355,6 @@ impl ErrorExt for Error { Self::InvalidQuery { .. } | Self::InvalidRequest { .. } | Self::ParseAddr { .. } - | Self::IllegalAuthConfig { .. } | Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments, Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(), diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index fe8a760a07..e55e7bab76 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -44,12 +44,10 @@ mod test_utils; pub use adapter::flownode_impl::FlowDualEngineRef; pub use adapter::{FlowConfig, FlowStreamingEngineRef, StreamingEngine}; pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError}; -pub use engine::FlowAuthHeader; pub(crate) use engine::{CreateFlowArgs, FlowId, TableName}; pub use error::{Error, Result}; pub use server::{ FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker, - get_flow_auth_options, }; pub use crate::adapter::FlownodeOptions; diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index f3e2f20b80..97a61af90b 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -57,14 +57,14 @@ use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef}; use crate::adapter::{FlowStreamingEngineRef, create_worker}; use crate::batching_mode::engine::BatchingEngine; use crate::error::{ - CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu, - ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err, + CacheRequiredSnafu, ExternalSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, + StartServerSnafu, UnexpectedSnafu, to_status_with_last_err, }; use crate::heartbeat::HeartbeatTask; use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS}; use crate::transform::register_function_to_query_engine; use crate::utils::{SizeReportSender, StateReportHandler}; -use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine}; +use crate::{Error, FlownodeOptions, FrontendClient, StreamingEngine}; pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER"; /// wrapping flow node manager to avoid orphan rule with Arc<...> @@ -193,6 +193,7 @@ impl FlownodeServer { async fn start_workers(&self) -> Result<(), Error> { let manager_ref = self.inner.flow_service.dual_engine.clone(); let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await; + let started_state_report_task = state_report_task_handler.is_none(); if state_report_task_handler.is_none() { *state_report_task_handler = manager_ref.clone().start_state_report_task().await; } @@ -206,15 +207,39 @@ impl FlownodeServer { .await .replace(handle); - self.inner + if let Err(err) = self + .inner .flow_service .dual_engine .start_flow_consistent_check_task() - .await?; + .await + { + self.rollback_started_workers(started_state_report_task) + .await; + return Err(err); + } Ok(()) } + async fn rollback_started_workers(&self, abort_state_report_task: bool) { + let tx = self.inner.worker_shutdown_tx.lock().await; + if tx.send(()).is_err() { + info!("Receiver dropped, the flow node server has already shutdown"); + } + drop(tx); + + if let Some(handle) = self.inner.streaming_task_handler.lock().await.take() { + handle.abort(); + } + + if abort_state_report_task + && let Some(handle) = self.inner.state_report_task_handler.lock().await.take() + { + handle.abort(); + } + } + /// Stop the background task for streaming computation. async fn stop_workers(&self) -> Result<(), Error> { let tx = self.inner.worker_shutdown_tx.lock().await; @@ -289,21 +314,6 @@ impl FlownodeInstance { } } -pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result, Error> { - if let Some(user_provider) = fn_opts.user_provider.as_ref() { - let static_provider = auth::static_user_provider_from_option(user_provider) - .context(IllegalAuthConfigSnafu)?; - - let (usr, pwd) = static_provider - .get_one_user_pwd() - .context(IllegalAuthConfigSnafu)?; - let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd); - return Ok(Some(auth_header)); - } - - Ok(None) -} - /// [`FlownodeInstance`] Builder pub struct FlownodeBuilder { opts: FlownodeOptions, @@ -708,11 +718,13 @@ mod tests { use std::sync::Arc; use std::time::Duration; + use api::v1::meta::Role; use catalog::memory::new_memory_catalog_manager; use common_base::Plugins; use common_meta::key::TableMetadataManager; use common_meta::key::flow::FlowMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; + use meta_client::client::MetaClient; use query::options::QueryOptions; use super::*; @@ -722,6 +734,22 @@ mod tests { use crate::utils::SizeReportSender; async fn new_test_flownode_server() -> (FlownodeServer, SizeReportSender) { + let (frontend_client, _handler) = + FrontendClient::from_empty_grpc_handler(QueryOptions::default()); + + new_test_flownode_server_with_frontend_client( + frontend_client, + BatchingModeOptions::default(), + None, + ) + .await + } + + async fn new_test_flownode_server_with_frontend_client( + frontend_client: FrontendClient, + batching_opts: BatchingModeOptions, + node_id: Option, + ) -> (FlownodeServer, SizeReportSender) { let kv_backend = Arc::new(MemoryKvBackend::new()); let table_meta = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_meta.init().await.unwrap(); @@ -730,19 +758,17 @@ mod tests { let query_engine = crate::test_utils::create_test_query_engine(); let streaming_engine = Arc::new(StreamingEngine::new( - None, + node_id, query_engine.clone(), table_meta.clone(), )); - let (frontend_client, _handler) = - FrontendClient::from_empty_grpc_handler(QueryOptions::default()); let batching_engine = Arc::new(BatchingEngine::new( Arc::new(frontend_client), query_engine, flow_meta.clone(), table_meta, catalog_manager.clone(), - BatchingModeOptions::default(), + batching_opts, )); let dual_engine = Arc::new(FlowDualEngine::new( streaming_engine, @@ -774,4 +800,33 @@ mod tests { server.stop_workers().await.unwrap(); } + + #[tokio::test] + async fn test_start_workers_rolls_back_on_check_task_start_failure() { + let batching_opts = BatchingModeOptions { + experimental_frontend_scan_timeout: Duration::from_millis(1), + ..Default::default() + }; + let frontend_client = FrontendClient::from_meta_client( + Arc::new(MetaClient::new(0, Role::Frontend)), + QueryOptions::default(), + batching_opts.clone(), + ) + .unwrap(); + let (server, _report_sender) = + new_test_flownode_server_with_frontend_client(frontend_client, batching_opts, Some(1)) + .await; + + server.start_workers().await.unwrap_err(); + + assert!(server.inner.streaming_task_handler.lock().await.is_none()); + assert!( + server + .inner + .state_report_task_handler + .lock() + .await + .is_none() + ); + } }