diff --git a/Cargo.lock b/Cargo.lock
index 646e793c64..85a2585282 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5288,10 +5288,10 @@ version = "1.1.0"
dependencies = [
"api",
"arrow 58.3.0",
+ "arrow-flight",
"arrow-schema 58.3.0",
"async-recursion",
"async-trait",
- "auth",
"bytes",
"cache",
"catalog",
@@ -5356,6 +5356,7 @@ dependencies = [
"substrait 1.1.0",
"table",
"tokio",
+ "tokio-stream",
"tonic 0.14.2",
]
diff --git a/config/config.md b/config/config.md
index df06d2153c..6006a34eec 100644
--- a/config/config.md
+++ b/config/config.md
@@ -619,7 +619,6 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
-| `user_provider` | String | Unset | The user provider for authentication.
Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.
Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `flow.batching_mode` | -- | -- | -- |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index ff8a9e4a50..f8f7f1a779 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -2,11 +2,6 @@
## @toml2docs:none-default
node_id = 14
-## The user provider for authentication.
-## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
-## @toml2docs:none-default
-#+ user_provider = "static_user_provider:file:/path/to/users"
-
## flow engine options.
[flow]
## The number of flow worker in flownode.
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index 889cf719a6..126e1a4069 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -39,7 +39,6 @@ use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_version::{short_version, verbose_version};
use flow::{
FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
- get_flow_auth_options,
};
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::flownode::context::GrpcConfigureContext;
@@ -167,9 +166,6 @@ struct StartCommand {
/// HTTP request timeout in seconds.
#[clap(long)]
http_timeout: Option,
- /// User Provider cfg, for auth, currently only support static user provider
- #[clap(long)]
- user_provider: Option,
}
impl StartCommand {
@@ -241,10 +237,6 @@ impl StartCommand {
opts.http.timeout = Duration::from_secs(http_timeout);
}
- if let Some(user_provider) = &self.user_provider {
- opts.user_provider = Some(user_provider.clone());
- }
-
ensure!(
opts.node_id.is_some(),
MissingConfigSnafu {
@@ -367,10 +359,8 @@ impl StartCommand {
);
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
- let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
let frontend_client = FrontendClient::from_meta_client(
meta_client.clone(),
- flow_auth_header,
opts.query.clone(),
opts.flow.batching_mode.clone(),
)
@@ -498,5 +488,18 @@ mod tests {
assert!(!help.contains("--rpc-server-addr"));
assert!(!help.contains("--rpc-addr"));
assert!(!help.contains("--rpc-hostname"));
+ assert!(!help.contains("--user-provider"));
+ }
+
+ #[test]
+ fn test_user_provider_cli_option_is_removed() {
+ let command = StartCommand::try_parse_from([
+ "flownode",
+ "--node-id",
+ "14",
+ "--user-provider",
+ "static_user_provider:cmd:test=test",
+ ]);
+ assert!(command.is_err());
}
}
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index cee29e4456..a8efec6244 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -250,7 +250,6 @@ fn test_load_flownode_example_config() {
addr: "127.0.0.1:4000".to_string(),
..Default::default()
},
- user_provider: None,
memory: Default::default(),
},
..Default::default()
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index 9223ffc026..55f47901a2 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -13,7 +13,6 @@ arrow.workspace = true
arrow-schema.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
-auth.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
@@ -79,6 +78,7 @@ tokio.workspace = true
tonic.workspace = true
[dev-dependencies]
+arrow-flight.workspace = true
catalog = { workspace = true, features = ["testing"] }
common-catalog.workspace = true
pretty_assertions.workspace = true
@@ -86,3 +86,4 @@ prost.workspace = true
query.workspace = true
session.workspace = true
table.workspace = true
+tokio-stream = { workspace = true, features = ["net"] }
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 7dc4bc93d8..351d176db0 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -110,7 +110,6 @@ pub struct FlownodeOptions {
pub logging: LoggingOptions,
pub tracing: TracingOptions,
pub query: QueryOptions,
- pub user_provider: Option,
pub memory: MemoryOptions,
}
@@ -131,7 +130,6 @@ impl Default for FlownodeOptions {
allow_query_fallback: false,
memory_pool_size: MemoryLimit::default(),
},
- user_provider: None,
memory: MemoryOptions::default(),
}
}
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index f4ca149f1a..7d451d9038 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -198,9 +198,9 @@ impl FlowDualEngine {
}
}
- /// In distributed mode, scan periodically(1s) until available frontend is found, or timeout,
- /// in standalone mode, return immediately
- /// notice here if any frontend appear in cluster info this function will return immediately
+ /// In distributed mode, scan periodically(1s) until all advertised frontends
+ /// accept unauthenticated queries, or timeout. In standalone mode, return
+ /// immediately.
async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
if !self.is_distributed() {
return Ok(());
@@ -215,8 +215,20 @@ impl FlowDualEngine {
.iter()
.map(|peer| &peer.addr)
.collect::>();
- info!("Available frontend found: {:?}", fe_list);
- return Ok(());
+ let probe_failures = frontend_client
+ .check_all_frontends_without_auth(&frontend_list)
+ .await?;
+ if probe_failures.is_empty() {
+ info!(
+ "Available frontend found and unauthenticated probe succeeded: {:?}",
+ fe_list
+ );
+ return Ok(());
+ }
+ warn!(
+ "Unauthenticated frontend probe failed, will retry. frontends={:?}, failures={:?}",
+ fe_list, probe_failures
+ );
}
let elapsed = now.elapsed();
tokio::time::sleep(sleep_duration).await;
@@ -224,7 +236,7 @@ impl FlowDualEngine {
if elapsed >= timeout {
return NoAvailableFrontendSnafu {
timeout,
- context: "No available frontend found in cluster info",
+ context: "No frontend accepted unauthenticated flownode probe",
}
.fail();
}
@@ -499,19 +511,14 @@ impl ConsistentCheckTask {
.batching_engine()
.batch_opts
.experimental_frontend_scan_timeout;
+ engine
+ .wait_for_available_frontend(frontend_scan_timeout)
+ .await?;
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
- // first check if available frontend is found
- if let Err(err) = engine
- .wait_for_available_frontend(frontend_scan_timeout)
- .await
- {
- warn!("No frontend is available yet:\n {err:?}");
- }
-
- // then do recover flows, if failed, always retry
+ // Recover flows after the startup frontend probe succeeds.
let mut recover_retry = 0;
while let Err(err) = engine.check_flow_consistent(true, false).await {
recover_retry += 1;
diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index c6194d96b3..a4c5cd8ba1 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -20,13 +20,14 @@ use std::sync::{Arc, Mutex, Weak};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{CreateTableExpr, QueryRequest};
-use client::{Client, Database, OutputWithMetrics};
+use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputWithMetrics};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
use common_meta::peer::{Peer, PeerDiscovery};
use common_query::{Output, OutputData};
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use common_telemetry::warn;
+use futures::stream::{FuturesUnordered, StreamExt};
use meta_client::client::MetaClient;
use query::datafusion::QUERY_PARALLELISM_HINT;
use query::metrics::terminal_recordbatch_metrics_from_plan;
@@ -39,12 +40,12 @@ use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use tokio::sync::SetOnce;
+use crate::Error;
use crate::batching_mode::BatchingModeOptions;
use crate::error::{
CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
NoAvailableFrontendSnafu, UnexpectedSnafu,
};
-use crate::{Error, FlowAuthHeader};
/// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`].
///
@@ -92,7 +93,6 @@ pub enum FrontendClient {
Distributed {
meta_client: Arc,
chnl_mgr: ChannelManager,
- auth: Option,
query: QueryOptions,
batch_opts: BatchingModeOptions,
},
@@ -133,11 +133,10 @@ impl FrontendClient {
pub fn from_meta_client(
meta_client: Arc,
- auth: Option,
query: QueryOptions,
batch_opts: BatchingModeOptions,
) -> Result {
- common_telemetry::info!("Frontend client build with auth={:?}", auth);
+ common_telemetry::info!("Frontend client build without auth");
Ok(Self::Distributed {
meta_client,
chnl_mgr: {
@@ -149,7 +148,6 @@ impl FrontendClient {
.context(InvalidClientConfigSnafu)?;
ChannelManager::with_config(cfg, tls_config)
},
- auth,
query,
batch_opts,
})
@@ -212,6 +210,63 @@ impl FrontendClient {
.context(ExternalSnafu)
}
+ /// Probes all discovered frontends without auth.
+ ///
+ /// Returns non-auth failures to allow callers to retry transient connectivity
+ /// errors. Authentication failures are returned immediately because they mean
+ /// a frontend advertised an auth-protected endpoint to flownodes.
+ pub(crate) async fn check_all_frontends_without_auth(
+ &self,
+ frontends: &[Peer],
+ ) -> Result, Error> {
+ let Self::Distributed {
+ chnl_mgr,
+ batch_opts,
+ ..
+ } = self
+ else {
+ return Ok(vec![]);
+ };
+
+ let probe_timeout = batch_opts.grpc_conn_timeout;
+ let mut probes = frontends
+ .iter()
+ .map(|peer| {
+ let addr = peer.addr.clone();
+ let chnl_mgr = chnl_mgr.clone();
+
+ async move {
+ let client = Client::with_manager_and_urls(chnl_mgr, vec![addr.clone()]);
+ let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
+
+ match tokio::time::timeout(probe_timeout, database.sql("SELECT 1")).await {
+ Ok(Ok(_)) => Ok(None),
+ Ok(Err(err)) if err.tonic_code() == Some(tonic::Code::Unauthenticated) => {
+ Err(err).context(InvalidRequestSnafu {
+ context: format!(
+ "Frontend {addr} rejected unauthenticated flownode probe; ensure frontend internal_grpc is advertised to metasrv"
+ ),
+ })
+ }
+ Ok(Err(err)) => Ok(Some(format!("{addr}: {err}"))),
+ Err(_) => Ok(Some(format!(
+ "{addr}: health check timed out after {probe_timeout:?}"
+ ))),
+ }
+ }
+ })
+ .collect::>();
+
+ let mut failures = Vec::new();
+ while let Some(probe_result) = probes.next().await {
+ if let Some(failure) = probe_result? {
+ failures.push(failure);
+ }
+ }
+
+ Ok(failures)
+ }
+
/// Get a frontend discovered by metasrv and verified with a query probe.
async fn get_random_active_frontend(
&self,
@@ -221,7 +276,6 @@ impl FrontendClient {
let Self::Distributed {
meta_client: _,
chnl_mgr,
- auth,
query: _,
batch_opts,
} = self
@@ -242,13 +296,7 @@ impl FrontendClient {
for peer in frontends {
let addr = peer.addr.clone();
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
- let database = {
- let mut db = Database::new(catalog, schema, client);
- if let Some(auth) = auth {
- db.set_auth(auth.auth().clone());
- }
- db
- };
+ let database = Database::new(catalog, schema, client);
let db = DatabaseWithPeer::new(database, peer);
match db.try_select_one().await {
Ok(_) => return Ok(db),
@@ -591,6 +639,8 @@ mod tests {
use std::task::{Context, Poll};
use std::time::Duration;
+ use arrow_flight::flight_service_server::FlightServiceServer;
+ use arrow_flight::{FlightData, Ticket};
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
@@ -598,7 +648,12 @@ mod tests {
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures::StreamExt;
+ use servers::grpc::flight::{FlightCraft, FlightCraftWrapper, TonicStream};
+ use tokio::net::TcpListener;
+ use tokio::task::JoinHandle;
use tokio::time::timeout;
+ use tokio_stream::wrappers::TcpListenerStream;
+ use tonic::{Request as TonicRequest, Response as TonicResponse, Status};
use super::*;
@@ -657,6 +712,16 @@ mod tests {
#[derive(Debug)]
struct SnapshotBindingHandler;
+ #[derive(Debug)]
+ struct RejectUnauthenticatedFlight;
+
+ #[derive(Debug)]
+ struct SlowFlight;
+
+ struct WaitForConcurrentFlight {
+ barrier: Arc,
+ }
+
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
async fn do_query(
@@ -725,6 +790,54 @@ mod tests {
}
}
+ #[async_trait::async_trait]
+ impl FlightCraft for RejectUnauthenticatedFlight {
+ async fn do_get(
+ &self,
+ _request: TonicRequest,
+ ) -> std::result::Result>, Status> {
+ Err(Status::unauthenticated("auth failed"))
+ }
+ }
+
+ #[async_trait::async_trait]
+ impl FlightCraft for SlowFlight {
+ async fn do_get(
+ &self,
+ _request: TonicRequest,
+ ) -> std::result::Result>, Status> {
+ tokio::time::sleep(Duration::from_secs(60)).await;
+ Err(Status::unavailable("slow response"))
+ }
+ }
+
+ #[async_trait::async_trait]
+ impl FlightCraft for WaitForConcurrentFlight {
+ async fn do_get(
+ &self,
+ _request: TonicRequest,
+ ) -> std::result::Result>, Status> {
+ self.barrier.wait().await;
+ Err(Status::unavailable("probe started concurrently"))
+ }
+ }
+
+ async fn start_flight_server(handler: T) -> (String, JoinHandle<()>) {
+ let listener = TcpListener::bind("127.0.0.1:0")
+ .await
+ .expect("bind test flight server");
+ let addr = listener.local_addr().expect("local addr").to_string();
+ let server = tokio::spawn(async move {
+ tonic::transport::Server::builder()
+ .add_service(FlightServiceServer::new(FlightCraftWrapper(handler)))
+ .serve_with_incoming(TcpListenerStream::new(listener))
+ .await
+ .expect("serve test flight server");
+ });
+
+ (addr, server)
+ }
+
#[tokio::test]
async fn wait_initialized() {
let (client, handler_mut) =
@@ -759,7 +872,6 @@ mod tests {
let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
let client = FrontendClient::from_meta_client(
meta_client,
- None,
QueryOptions::default(),
BatchingModeOptions::default(),
)
@@ -883,4 +995,109 @@ mod tests {
assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq"));
}
+
+ #[tokio::test]
+ async fn test_check_all_frontends_without_auth_fails_fast_on_unauthenticated_frontend() {
+ let (addr, server) = start_flight_server(RejectUnauthenticatedFlight).await;
+ let client = FrontendClient::from_meta_client(
+ Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
+ QueryOptions::default(),
+ BatchingModeOptions::default(),
+ )
+ .unwrap();
+
+ let err = client
+ .check_all_frontends_without_auth(&[Peer {
+ id: 1,
+ addr: addr.clone(),
+ }])
+ .await
+ .unwrap_err();
+ server.abort();
+
+ let Error::InvalidRequest {
+ context, source, ..
+ } = err
+ else {
+ panic!("expected InvalidRequest, got {err:?}");
+ };
+ assert!(context.contains(&addr));
+ assert!(context.contains("rejected unauthenticated flownode probe"));
+ assert_eq!(source.tonic_code(), Some(tonic::Code::Unauthenticated));
+ }
+
+ #[tokio::test]
+ async fn test_check_all_frontends_without_auth_uses_grpc_connection_timeout() {
+ let (addr, server) = start_flight_server(SlowFlight).await;
+ let client = FrontendClient::from_meta_client(
+ Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
+ QueryOptions::default(),
+ BatchingModeOptions {
+ grpc_conn_timeout: Duration::from_millis(50),
+ ..Default::default()
+ },
+ )
+ .unwrap();
+
+ let failures = client
+ .check_all_frontends_without_auth(&[Peer {
+ id: 1,
+ addr: addr.clone(),
+ }])
+ .await
+ .unwrap();
+ server.abort();
+
+ assert_eq!(failures.len(), 1);
+ assert!(failures[0].contains(&addr));
+ assert!(failures[0].contains("health check timed out"));
+ }
+
+ #[tokio::test]
+ async fn test_check_all_frontends_without_auth_checks_frontends_concurrently() {
+ let barrier = Arc::new(tokio::sync::Barrier::new(2));
+ let (addr1, server1) = start_flight_server(WaitForConcurrentFlight {
+ barrier: barrier.clone(),
+ })
+ .await;
+ let (addr2, server2) = start_flight_server(WaitForConcurrentFlight { barrier }).await;
+ let client = FrontendClient::from_meta_client(
+ Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
+ QueryOptions::default(),
+ BatchingModeOptions {
+ grpc_conn_timeout: Duration::from_millis(500),
+ ..Default::default()
+ },
+ )
+ .unwrap();
+
+ let failures = timeout(
+ Duration::from_secs(2),
+ client.check_all_frontends_without_auth(&[
+ Peer {
+ id: 1,
+ addr: addr1.clone(),
+ },
+ Peer {
+ id: 2,
+ addr: addr2.clone(),
+ },
+ ]),
+ )
+ .await
+ .expect("concurrent probes should complete before per-peer timeouts")
+ .unwrap();
+ server1.abort();
+ server2.abort();
+
+ assert_eq!(failures.len(), 2);
+ assert!(failures.iter().any(|failure| failure.contains(&addr1)));
+ assert!(failures.iter().any(|failure| failure.contains(&addr2)));
+ assert!(
+ failures
+ .iter()
+ .all(|failure| !failure.contains("health check timed out")),
+ "sequential probes would time out before both requests reach the barrier: {failures:?}"
+ );
+ }
}
diff --git a/src/flow/src/engine.rs b/src/flow/src/engine.rs
index d7ac7cdb45..a360a1015d 100644
--- a/src/flow/src/engine.rs
+++ b/src/flow/src/engine.rs
@@ -26,42 +26,6 @@ use crate::Error;
pub type FlowId = u64;
pub type TableName = [String; 3];
-#[derive(Clone)]
-pub struct FlowAuthHeader {
- auth_schema: api::v1::auth_header::AuthScheme,
-}
-
-impl std::fmt::Debug for FlowAuthHeader {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self.auth() {
- api::v1::auth_header::AuthScheme::Basic(basic) => f
- .debug_struct("Basic")
- .field("username", &basic.username)
- .field("password", &"")
- .finish(),
- api::v1::auth_header::AuthScheme::Token(_) => f
- .debug_struct("Token")
- .field("token", &"")
- .finish(),
- }
- }
-}
-
-impl FlowAuthHeader {
- pub fn from_user_pwd(username: &str, pwd: &str) -> Self {
- Self {
- auth_schema: api::v1::auth_header::AuthScheme::Basic(api::v1::Basic {
- username: username.to_string(),
- password: pwd.to_string(),
- }),
- }
- }
-
- pub fn auth(&self) -> &api::v1::auth_header::AuthScheme {
- &self.auth_schema
- }
-}
-
/// The arguments to create a flow
#[derive(Debug, Clone)]
pub struct CreateFlowArgs {
diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs
index 46cfb7c1c3..825c1dc5ac 100644
--- a/src/flow/src/error.rs
+++ b/src/flow/src/error.rs
@@ -165,9 +165,6 @@ pub enum Error {
location: Location,
},
- #[snafu(display("Invalid auth config"))]
- IllegalAuthConfig { source: auth::error::Error },
-
#[snafu(display("Flow plan error: {reason}"))]
Plan {
reason: String,
@@ -358,7 +355,6 @@ impl ErrorExt for Error {
Self::InvalidQuery { .. }
| Self::InvalidRequest { .. }
| Self::ParseAddr { .. }
- | Self::IllegalAuthConfig { .. }
| Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments,
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs
index fe8a760a07..e55e7bab76 100644
--- a/src/flow/src/lib.rs
+++ b/src/flow/src/lib.rs
@@ -44,12 +44,10 @@ mod test_utils;
pub use adapter::flownode_impl::FlowDualEngineRef;
pub use adapter::{FlowConfig, FlowStreamingEngineRef, StreamingEngine};
pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
-pub use engine::FlowAuthHeader;
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
pub use error::{Error, Result};
pub use server::{
FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker,
- get_flow_auth_options,
};
pub use crate::adapter::FlownodeOptions;
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index f3e2f20b80..97a61af90b 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -57,14 +57,14 @@ use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{FlowStreamingEngineRef, create_worker};
use crate::batching_mode::engine::BatchingEngine;
use crate::error::{
- CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu,
- ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
+ CacheRequiredSnafu, ExternalSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu,
+ StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
-use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
+use crate::{Error, FlownodeOptions, FrontendClient, StreamingEngine};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...>
@@ -193,6 +193,7 @@ impl FlownodeServer {
async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.dual_engine.clone();
let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await;
+ let started_state_report_task = state_report_task_handler.is_none();
if state_report_task_handler.is_none() {
*state_report_task_handler = manager_ref.clone().start_state_report_task().await;
}
@@ -206,15 +207,39 @@ impl FlownodeServer {
.await
.replace(handle);
- self.inner
+ if let Err(err) = self
+ .inner
.flow_service
.dual_engine
.start_flow_consistent_check_task()
- .await?;
+ .await
+ {
+ self.rollback_started_workers(started_state_report_task)
+ .await;
+ return Err(err);
+ }
Ok(())
}
+ async fn rollback_started_workers(&self, abort_state_report_task: bool) {
+ let tx = self.inner.worker_shutdown_tx.lock().await;
+ if tx.send(()).is_err() {
+ info!("Receiver dropped, the flow node server has already shutdown");
+ }
+ drop(tx);
+
+ if let Some(handle) = self.inner.streaming_task_handler.lock().await.take() {
+ handle.abort();
+ }
+
+ if abort_state_report_task
+ && let Some(handle) = self.inner.state_report_task_handler.lock().await.take()
+ {
+ handle.abort();
+ }
+ }
+
/// Stop the background task for streaming computation.
async fn stop_workers(&self) -> Result<(), Error> {
let tx = self.inner.worker_shutdown_tx.lock().await;
@@ -289,21 +314,6 @@ impl FlownodeInstance {
}
}
-pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result