chore: remove auth in flownode (#8244)

* chore: remove auth in flownode

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update docs

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add flow startup check

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: clippy

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-06-08 16:15:19 +08:00
committed by GitHub
parent 0881b7ba32
commit 5fd5b91b29
13 changed files with 350 additions and 117 deletions

3
Cargo.lock generated
View File

@@ -5288,10 +5288,10 @@ version = "1.1.0"
dependencies = [
"api",
"arrow 58.3.0",
"arrow-flight",
"arrow-schema 58.3.0",
"async-recursion",
"async-trait",
"auth",
"bytes",
"cache",
"catalog",
@@ -5356,6 +5356,7 @@ dependencies = [
"substrait 1.1.0",
"table",
"tokio",
"tokio-stream",
"tonic 0.14.2",
]

View File

@@ -619,7 +619,6 @@
| Key | Type | Default | Descriptions |
| --- | -----| ------- | ----------- |
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
| `user_provider` | String | Unset | The user provider for authentication.<br/>Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd" |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `flow.batching_mode` | -- | -- | -- |

View File

@@ -2,11 +2,6 @@
## @toml2docs:none-default
node_id = 14
## The user provider for authentication.
## Examples: "static_user_provider:file:/path/to/users", "static_user_provider:cmd:greptime_user=greptime_pwd"
## @toml2docs:none-default
#+ user_provider = "static_user_provider:file:/path/to/users"
## flow engine options.
[flow]
## The number of flow worker in flownode.

View File

@@ -39,7 +39,6 @@ use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_version::{short_version, verbose_version};
use flow::{
FlownodeBuilder, FlownodeInstance, FlownodeServiceBuilder, FrontendClient, FrontendInvoker,
get_flow_auth_options,
};
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::flownode::context::GrpcConfigureContext;
@@ -167,9 +166,6 @@ struct StartCommand {
/// HTTP request timeout in seconds.
#[clap(long)]
http_timeout: Option<u64>,
/// User Provider cfg, for auth, currently only support static user provider
#[clap(long)]
user_provider: Option<String>,
}
impl StartCommand {
@@ -241,10 +237,6 @@ impl StartCommand {
opts.http.timeout = Duration::from_secs(http_timeout);
}
if let Some(user_provider) = &self.user_provider {
opts.user_provider = Some(user_provider.clone());
}
ensure!(
opts.node_id.is_some(),
MissingConfigSnafu {
@@ -367,10 +359,8 @@ impl StartCommand {
);
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
let frontend_client = FrontendClient::from_meta_client(
meta_client.clone(),
flow_auth_header,
opts.query.clone(),
opts.flow.batching_mode.clone(),
)
@@ -498,5 +488,18 @@ mod tests {
assert!(!help.contains("--rpc-server-addr"));
assert!(!help.contains("--rpc-addr"));
assert!(!help.contains("--rpc-hostname"));
assert!(!help.contains("--user-provider"));
}
#[test]
fn test_user_provider_cli_option_is_removed() {
let command = StartCommand::try_parse_from([
"flownode",
"--node-id",
"14",
"--user-provider",
"static_user_provider:cmd:test=test",
]);
assert!(command.is_err());
}
}

View File

@@ -250,7 +250,6 @@ fn test_load_flownode_example_config() {
addr: "127.0.0.1:4000".to_string(),
..Default::default()
},
user_provider: None,
memory: Default::default(),
},
..Default::default()

View File

@@ -13,7 +13,6 @@ arrow.workspace = true
arrow-schema.workspace = true
async-recursion = "1.0"
async-trait.workspace = true
auth.workspace = true
bytes.workspace = true
cache.workspace = true
catalog.workspace = true
@@ -79,6 +78,7 @@ tokio.workspace = true
tonic.workspace = true
[dev-dependencies]
arrow-flight.workspace = true
catalog = { workspace = true, features = ["testing"] }
common-catalog.workspace = true
pretty_assertions.workspace = true
@@ -86,3 +86,4 @@ prost.workspace = true
query.workspace = true
session.workspace = true
table.workspace = true
tokio-stream = { workspace = true, features = ["net"] }

View File

@@ -110,7 +110,6 @@ pub struct FlownodeOptions {
pub logging: LoggingOptions,
pub tracing: TracingOptions,
pub query: QueryOptions,
pub user_provider: Option<String>,
pub memory: MemoryOptions,
}
@@ -131,7 +130,6 @@ impl Default for FlownodeOptions {
allow_query_fallback: false,
memory_pool_size: MemoryLimit::default(),
},
user_provider: None,
memory: MemoryOptions::default(),
}
}

View File

@@ -198,9 +198,9 @@ impl FlowDualEngine {
}
}
/// In distributed mode, scan periodically(1s) until available frontend is found, or timeout,
/// in standalone mode, return immediately
/// notice here if any frontend appear in cluster info this function will return immediately
/// In distributed mode, scan periodically(1s) until all advertised frontends
/// accept unauthenticated queries, or timeout. In standalone mode, return
/// immediately.
async fn wait_for_available_frontend(&self, timeout: std::time::Duration) -> Result<(), Error> {
if !self.is_distributed() {
return Ok(());
@@ -215,8 +215,20 @@ impl FlowDualEngine {
.iter()
.map(|peer| &peer.addr)
.collect::<Vec<_>>();
info!("Available frontend found: {:?}", fe_list);
return Ok(());
let probe_failures = frontend_client
.check_all_frontends_without_auth(&frontend_list)
.await?;
if probe_failures.is_empty() {
info!(
"Available frontend found and unauthenticated probe succeeded: {:?}",
fe_list
);
return Ok(());
}
warn!(
"Unauthenticated frontend probe failed, will retry. frontends={:?}, failures={:?}",
fe_list, probe_failures
);
}
let elapsed = now.elapsed();
tokio::time::sleep(sleep_duration).await;
@@ -224,7 +236,7 @@ impl FlowDualEngine {
if elapsed >= timeout {
return NoAvailableFrontendSnafu {
timeout,
context: "No available frontend found in cluster info",
context: "No frontend accepted unauthenticated flownode probe",
}
.fail();
}
@@ -499,19 +511,14 @@ impl ConsistentCheckTask {
.batching_engine()
.batch_opts
.experimental_frontend_scan_timeout;
engine
.wait_for_available_frontend(frontend_scan_timeout)
.await?;
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
// first check if available frontend is found
if let Err(err) = engine
.wait_for_available_frontend(frontend_scan_timeout)
.await
{
warn!("No frontend is available yet:\n {err:?}");
}
// then do recover flows, if failed, always retry
// Recover flows after the startup frontend probe succeeds.
let mut recover_retry = 0;
while let Err(err) = engine.check_flow_consistent(true, false).await {
recover_retry += 1;

View File

@@ -20,13 +20,14 @@ use std::sync::{Arc, Mutex, Weak};
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{CreateTableExpr, QueryRequest};
use client::{Client, Database, OutputWithMetrics};
use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputWithMetrics};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
use common_meta::peer::{Peer, PeerDiscovery};
use common_query::{Output, OutputData};
use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
use common_telemetry::warn;
use futures::stream::{FuturesUnordered, StreamExt};
use meta_client::client::MetaClient;
use query::datafusion::QUERY_PARALLELISM_HINT;
use query::metrics::terminal_recordbatch_metrics_from_plan;
@@ -39,12 +40,12 @@ use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use tokio::sync::SetOnce;
use crate::Error;
use crate::batching_mode::BatchingModeOptions;
use crate::error::{
CreateSinkTableSnafu, ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu,
NoAvailableFrontendSnafu, UnexpectedSnafu,
};
use crate::{Error, FlowAuthHeader};
/// Adapter trait for [`GrpcQueryHandler`] that boxes the underlying error into [`BoxedError`].
///
@@ -92,7 +93,6 @@ pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
query: QueryOptions,
batch_opts: BatchingModeOptions,
},
@@ -133,11 +133,10 @@ impl FrontendClient {
pub fn from_meta_client(
meta_client: Arc<MetaClient>,
auth: Option<FlowAuthHeader>,
query: QueryOptions,
batch_opts: BatchingModeOptions,
) -> Result<Self, Error> {
common_telemetry::info!("Frontend client build with auth={:?}", auth);
common_telemetry::info!("Frontend client build without auth");
Ok(Self::Distributed {
meta_client,
chnl_mgr: {
@@ -149,7 +148,6 @@ impl FrontendClient {
.context(InvalidClientConfigSnafu)?;
ChannelManager::with_config(cfg, tls_config)
},
auth,
query,
batch_opts,
})
@@ -212,6 +210,63 @@ impl FrontendClient {
.context(ExternalSnafu)
}
/// Probes all discovered frontends without auth.
///
/// Returns non-auth failures to allow callers to retry transient connectivity
/// errors. Authentication failures are returned immediately because they mean
/// a frontend advertised an auth-protected endpoint to flownodes.
pub(crate) async fn check_all_frontends_without_auth(
&self,
frontends: &[Peer],
) -> Result<Vec<String>, Error> {
let Self::Distributed {
chnl_mgr,
batch_opts,
..
} = self
else {
return Ok(vec![]);
};
let probe_timeout = batch_opts.grpc_conn_timeout;
let mut probes = frontends
.iter()
.map(|peer| {
let addr = peer.addr.clone();
let chnl_mgr = chnl_mgr.clone();
async move {
let client = Client::with_manager_and_urls(chnl_mgr, vec![addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
match tokio::time::timeout(probe_timeout, database.sql("SELECT 1")).await {
Ok(Ok(_)) => Ok(None),
Ok(Err(err)) if err.tonic_code() == Some(tonic::Code::Unauthenticated) => {
Err(err).context(InvalidRequestSnafu {
context: format!(
"Frontend {addr} rejected unauthenticated flownode probe; ensure frontend internal_grpc is advertised to metasrv"
),
})
}
Ok(Err(err)) => Ok(Some(format!("{addr}: {err}"))),
Err(_) => Ok(Some(format!(
"{addr}: health check timed out after {probe_timeout:?}"
))),
}
}
})
.collect::<FuturesUnordered<_>>();
let mut failures = Vec::new();
while let Some(probe_result) = probes.next().await {
if let Some(failure) = probe_result? {
failures.push(failure);
}
}
Ok(failures)
}
/// Get a frontend discovered by metasrv and verified with a query probe.
async fn get_random_active_frontend(
&self,
@@ -221,7 +276,6 @@ impl FrontendClient {
let Self::Distributed {
meta_client: _,
chnl_mgr,
auth,
query: _,
batch_opts,
} = self
@@ -242,13 +296,7 @@ impl FrontendClient {
for peer in frontends {
let addr = peer.addr.clone();
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
let database = {
let mut db = Database::new(catalog, schema, client);
if let Some(auth) = auth {
db.set_auth(auth.auth().clone());
}
db
};
let database = Database::new(catalog, schema, client);
let db = DatabaseWithPeer::new(database, peer);
match db.try_select_one().await {
Ok(_) => return Ok(db),
@@ -591,6 +639,8 @@ mod tests {
use std::task::{Context, Poll};
use std::time::Duration;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::{FlightData, Ticket};
use common_query::{Output, OutputData};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream};
@@ -598,7 +648,12 @@ mod tests {
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures::StreamExt;
use servers::grpc::flight::{FlightCraft, FlightCraftWrapper, TonicStream};
use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tokio::time::timeout;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::{Request as TonicRequest, Response as TonicResponse, Status};
use super::*;
@@ -657,6 +712,16 @@ mod tests {
#[derive(Debug)]
struct SnapshotBindingHandler;
#[derive(Debug)]
struct RejectUnauthenticatedFlight;
#[derive(Debug)]
struct SlowFlight;
struct WaitForConcurrentFlight {
barrier: Arc<tokio::sync::Barrier>,
}
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
async fn do_query(
@@ -725,6 +790,54 @@ mod tests {
}
}
#[async_trait::async_trait]
impl FlightCraft for RejectUnauthenticatedFlight {
async fn do_get(
&self,
_request: TonicRequest<Ticket>,
) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
Err(Status::unauthenticated("auth failed"))
}
}
#[async_trait::async_trait]
impl FlightCraft for SlowFlight {
async fn do_get(
&self,
_request: TonicRequest<Ticket>,
) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
tokio::time::sleep(Duration::from_secs(60)).await;
Err(Status::unavailable("slow response"))
}
}
#[async_trait::async_trait]
impl FlightCraft for WaitForConcurrentFlight {
async fn do_get(
&self,
_request: TonicRequest<Ticket>,
) -> std::result::Result<TonicResponse<TonicStream<FlightData>>, Status> {
self.barrier.wait().await;
Err(Status::unavailable("probe started concurrently"))
}
}
async fn start_flight_server<T: FlightCraft>(handler: T) -> (String, JoinHandle<()>) {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("bind test flight server");
let addr = listener.local_addr().expect("local addr").to_string();
let server = tokio::spawn(async move {
tonic::transport::Server::builder()
.add_service(FlightServiceServer::new(FlightCraftWrapper(handler)))
.serve_with_incoming(TcpListenerStream::new(listener))
.await
.expect("serve test flight server");
});
(addr, server)
}
#[tokio::test]
async fn wait_initialized() {
let (client, handler_mut) =
@@ -759,7 +872,6 @@ mod tests {
let meta_client = Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend));
let client = FrontendClient::from_meta_client(
meta_client,
None,
QueryOptions::default(),
BatchingModeOptions::default(),
)
@@ -883,4 +995,109 @@ mod tests {
assert!(format!("{err:?}").contains("Invalid value for flow.return_region_seq"));
}
#[tokio::test]
async fn test_check_all_frontends_without_auth_fails_fast_on_unauthenticated_frontend() {
let (addr, server) = start_flight_server(RejectUnauthenticatedFlight).await;
let client = FrontendClient::from_meta_client(
Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
QueryOptions::default(),
BatchingModeOptions::default(),
)
.unwrap();
let err = client
.check_all_frontends_without_auth(&[Peer {
id: 1,
addr: addr.clone(),
}])
.await
.unwrap_err();
server.abort();
let Error::InvalidRequest {
context, source, ..
} = err
else {
panic!("expected InvalidRequest, got {err:?}");
};
assert!(context.contains(&addr));
assert!(context.contains("rejected unauthenticated flownode probe"));
assert_eq!(source.tonic_code(), Some(tonic::Code::Unauthenticated));
}
#[tokio::test]
async fn test_check_all_frontends_without_auth_uses_grpc_connection_timeout() {
let (addr, server) = start_flight_server(SlowFlight).await;
let client = FrontendClient::from_meta_client(
Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
QueryOptions::default(),
BatchingModeOptions {
grpc_conn_timeout: Duration::from_millis(50),
..Default::default()
},
)
.unwrap();
let failures = client
.check_all_frontends_without_auth(&[Peer {
id: 1,
addr: addr.clone(),
}])
.await
.unwrap();
server.abort();
assert_eq!(failures.len(), 1);
assert!(failures[0].contains(&addr));
assert!(failures[0].contains("health check timed out"));
}
#[tokio::test]
async fn test_check_all_frontends_without_auth_checks_frontends_concurrently() {
let barrier = Arc::new(tokio::sync::Barrier::new(2));
let (addr1, server1) = start_flight_server(WaitForConcurrentFlight {
barrier: barrier.clone(),
})
.await;
let (addr2, server2) = start_flight_server(WaitForConcurrentFlight { barrier }).await;
let client = FrontendClient::from_meta_client(
Arc::new(MetaClient::new(0, api::v1::meta::Role::Frontend)),
QueryOptions::default(),
BatchingModeOptions {
grpc_conn_timeout: Duration::from_millis(500),
..Default::default()
},
)
.unwrap();
let failures = timeout(
Duration::from_secs(2),
client.check_all_frontends_without_auth(&[
Peer {
id: 1,
addr: addr1.clone(),
},
Peer {
id: 2,
addr: addr2.clone(),
},
]),
)
.await
.expect("concurrent probes should complete before per-peer timeouts")
.unwrap();
server1.abort();
server2.abort();
assert_eq!(failures.len(), 2);
assert!(failures.iter().any(|failure| failure.contains(&addr1)));
assert!(failures.iter().any(|failure| failure.contains(&addr2)));
assert!(
failures
.iter()
.all(|failure| !failure.contains("health check timed out")),
"sequential probes would time out before both requests reach the barrier: {failures:?}"
);
}
}

View File

@@ -26,42 +26,6 @@ use crate::Error;
pub type FlowId = u64;
pub type TableName = [String; 3];
#[derive(Clone)]
pub struct FlowAuthHeader {
auth_schema: api::v1::auth_header::AuthScheme,
}
impl std::fmt::Debug for FlowAuthHeader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.auth() {
api::v1::auth_header::AuthScheme::Basic(basic) => f
.debug_struct("Basic")
.field("username", &basic.username)
.field("password", &"<RETRACTED>")
.finish(),
api::v1::auth_header::AuthScheme::Token(_) => f
.debug_struct("Token")
.field("token", &"<RETRACTED>")
.finish(),
}
}
}
impl FlowAuthHeader {
pub fn from_user_pwd(username: &str, pwd: &str) -> Self {
Self {
auth_schema: api::v1::auth_header::AuthScheme::Basic(api::v1::Basic {
username: username.to_string(),
password: pwd.to_string(),
}),
}
}
pub fn auth(&self) -> &api::v1::auth_header::AuthScheme {
&self.auth_schema
}
}
/// The arguments to create a flow
#[derive(Debug, Clone)]
pub struct CreateFlowArgs {

View File

@@ -165,9 +165,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Invalid auth config"))]
IllegalAuthConfig { source: auth::error::Error },
#[snafu(display("Flow plan error: {reason}"))]
Plan {
reason: String,
@@ -358,7 +355,6 @@ impl ErrorExt for Error {
Self::InvalidQuery { .. }
| Self::InvalidRequest { .. }
| Self::ParseAddr { .. }
| Self::IllegalAuthConfig { .. }
| Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments,
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),

View File

@@ -44,12 +44,10 @@ mod test_utils;
pub use adapter::flownode_impl::FlowDualEngineRef;
pub use adapter::{FlowConfig, FlowStreamingEngineRef, StreamingEngine};
pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
pub use engine::FlowAuthHeader;
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
pub use error::{Error, Result};
pub use server::{
FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker,
get_flow_auth_options,
};
pub use crate::adapter::FlownodeOptions;

View File

@@ -57,14 +57,14 @@ use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{FlowStreamingEngineRef, create_worker};
use crate::batching_mode::engine::BatchingEngine;
use crate::error::{
CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu,
ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
CacheRequiredSnafu, ExternalSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu,
StartServerSnafu, UnexpectedSnafu, to_status_with_last_err,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
use crate::{Error, FlownodeOptions, FrontendClient, StreamingEngine};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...>
@@ -193,6 +193,7 @@ impl FlownodeServer {
async fn start_workers(&self) -> Result<(), Error> {
let manager_ref = self.inner.flow_service.dual_engine.clone();
let mut state_report_task_handler = self.inner.state_report_task_handler.lock().await;
let started_state_report_task = state_report_task_handler.is_none();
if state_report_task_handler.is_none() {
*state_report_task_handler = manager_ref.clone().start_state_report_task().await;
}
@@ -206,15 +207,39 @@ impl FlownodeServer {
.await
.replace(handle);
self.inner
if let Err(err) = self
.inner
.flow_service
.dual_engine
.start_flow_consistent_check_task()
.await?;
.await
{
self.rollback_started_workers(started_state_report_task)
.await;
return Err(err);
}
Ok(())
}
async fn rollback_started_workers(&self, abort_state_report_task: bool) {
let tx = self.inner.worker_shutdown_tx.lock().await;
if tx.send(()).is_err() {
info!("Receiver dropped, the flow node server has already shutdown");
}
drop(tx);
if let Some(handle) = self.inner.streaming_task_handler.lock().await.take() {
handle.abort();
}
if abort_state_report_task
&& let Some(handle) = self.inner.state_report_task_handler.lock().await.take()
{
handle.abort();
}
}
/// Stop the background task for streaming computation.
async fn stop_workers(&self) -> Result<(), Error> {
let tx = self.inner.worker_shutdown_tx.lock().await;
@@ -289,21 +314,6 @@ impl FlownodeInstance {
}
}
pub fn get_flow_auth_options(fn_opts: &FlownodeOptions) -> Result<Option<FlowAuthHeader>, Error> {
if let Some(user_provider) = fn_opts.user_provider.as_ref() {
let static_provider = auth::static_user_provider_from_option(user_provider)
.context(IllegalAuthConfigSnafu)?;
let (usr, pwd) = static_provider
.get_one_user_pwd()
.context(IllegalAuthConfigSnafu)?;
let auth_header = FlowAuthHeader::from_user_pwd(&usr, &pwd);
return Ok(Some(auth_header));
}
Ok(None)
}
/// [`FlownodeInstance`] Builder
pub struct FlownodeBuilder {
opts: FlownodeOptions,
@@ -708,11 +718,13 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use catalog::memory::new_memory_catalog_manager;
use common_base::Plugins;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use meta_client::client::MetaClient;
use query::options::QueryOptions;
use super::*;
@@ -722,6 +734,22 @@ mod tests {
use crate::utils::SizeReportSender;
async fn new_test_flownode_server() -> (FlownodeServer, SizeReportSender) {
let (frontend_client, _handler) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
new_test_flownode_server_with_frontend_client(
frontend_client,
BatchingModeOptions::default(),
None,
)
.await
}
async fn new_test_flownode_server_with_frontend_client(
frontend_client: FrontendClient,
batching_opts: BatchingModeOptions,
node_id: Option<u32>,
) -> (FlownodeServer, SizeReportSender) {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_meta = Arc::new(TableMetadataManager::new(kv_backend.clone()));
table_meta.init().await.unwrap();
@@ -730,19 +758,17 @@ mod tests {
let query_engine = crate::test_utils::create_test_query_engine();
let streaming_engine = Arc::new(StreamingEngine::new(
None,
node_id,
query_engine.clone(),
table_meta.clone(),
));
let (frontend_client, _handler) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
let batching_engine = Arc::new(BatchingEngine::new(
Arc::new(frontend_client),
query_engine,
flow_meta.clone(),
table_meta,
catalog_manager.clone(),
BatchingModeOptions::default(),
batching_opts,
));
let dual_engine = Arc::new(FlowDualEngine::new(
streaming_engine,
@@ -774,4 +800,33 @@ mod tests {
server.stop_workers().await.unwrap();
}
#[tokio::test]
async fn test_start_workers_rolls_back_on_check_task_start_failure() {
let batching_opts = BatchingModeOptions {
experimental_frontend_scan_timeout: Duration::from_millis(1),
..Default::default()
};
let frontend_client = FrontendClient::from_meta_client(
Arc::new(MetaClient::new(0, Role::Frontend)),
QueryOptions::default(),
batching_opts.clone(),
)
.unwrap();
let (server, _report_sender) =
new_test_flownode_server_with_frontend_client(frontend_client, batching_opts, Some(1))
.await;
server.start_workers().await.unwrap_err();
assert!(server.inner.streaming_task_handler.lock().await.is_none());
assert!(
server
.inner
.state_report_task_handler
.lock()
.await
.is_none()
);
}
}