feat: flownode grpc client to frontend tls option (#6750)

* feat: flownode grpc client to frontend tls option

Signed-off-by: discord9 <discord9@163.com>

* refactor: client tls option

Signed-off-by: discord9 <discord9@163.com>

* refactor: client_tls to frontend_tls

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-08-15 18:44:27 +08:00
committed by GitHub
parent 351826cd32
commit dfc29eb3b3
9 changed files with 106 additions and 43 deletions

View File

@@ -596,6 +596,11 @@
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
| `flow.batching_mode.frontend_tls` | -- | -- | -- |
| `flow.batching_mode.frontend_tls.enabled` | Bool | `false` | Whether to enable TLS for client. |
| `flow.batching_mode.frontend_tls.server_ca_cert_path` | String | Unset | Server Certificate file path. |
| `flow.batching_mode.frontend_tls.client_cert_path` | String | Unset | Client Certificate file path. |
| `flow.batching_mode.frontend_tls.client_key_path` | String | Unset | Client Private key file path. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:6800` | The address advertised to the metasrv,<br/>and used for connections from outside the host |

View File

@@ -32,6 +32,18 @@ node_id = 14
#+experimental_time_window_merge_threshold=3
## Read preference of the Frontend client.
#+read_preference="Leader"
[flow.batching_mode.frontend_tls]
## Whether to enable TLS for client.
#+enabled=false
## Server Certificate file path.
## @toml2docs:none-default
#+server_ca_cert_path=""
## Client Certificate file path.
## @toml2docs:none-default
#+client_cert_path=""
## Client Private key file path.
## @toml2docs:none-default
#+client_key_path=""
## The gRPC server options.
[grpc]

View File

@@ -376,7 +376,8 @@ impl StartCommand {
flow_auth_header,
opts.query.clone(),
opts.flow.batching_mode.clone(),
);
)
.context(StartFlownodeSnafu)?;
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
opts.clone(),

View File

@@ -21,6 +21,7 @@ use common_telemetry::info;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio_util::sync::CancellationToken;
use tonic::transport::{
@@ -97,6 +98,7 @@ impl ChannelManager {
}
}
/// Read tls cert and key files and create a ChannelManager with TLS config.
pub fn with_tls_config(config: ChannelConfig) -> Result<Self> {
let mut inner = Inner::with_config(config.clone());
@@ -105,20 +107,35 @@ impl ChannelManager {
msg: "no config input",
})?;
let server_root_ca_cert = std::fs::read_to_string(path_config.server_ca_cert_path)
.context(InvalidConfigFilePathSnafu)?;
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
let client_cert = std::fs::read_to_string(path_config.client_cert_path)
.context(InvalidConfigFilePathSnafu)?;
let client_key = std::fs::read_to_string(path_config.client_key_path)
.context(InvalidConfigFilePathSnafu)?;
let client_identity = Identity::from_pem(client_cert, client_key);
if !path_config.enabled {
// if TLS not enabled, just ignore other tls config
// and not set `client_tls_config` hence not use TLS
return Ok(Self {
inner: Arc::new(inner),
});
}
inner.client_tls_config = Some(
ClientTlsConfig::new()
.ca_certificate(server_root_ca_cert)
.identity(client_identity),
);
let mut tls_config = ClientTlsConfig::new();
if let Some(server_ca) = path_config.server_ca_cert_path {
let server_root_ca_cert =
std::fs::read_to_string(server_ca).context(InvalidConfigFilePathSnafu)?;
let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
tls_config = tls_config.ca_certificate(server_root_ca_cert);
}
if let (Some(client_cert_path), Some(client_key_path)) =
(&path_config.client_cert_path, &path_config.client_key_path)
{
let client_cert =
std::fs::read_to_string(client_cert_path).context(InvalidConfigFilePathSnafu)?;
let client_key =
std::fs::read_to_string(client_key_path).context(InvalidConfigFilePathSnafu)?;
let client_identity = Identity::from_pem(client_cert, client_key);
tls_config = tls_config.identity(client_identity);
}
inner.client_tls_config = Some(tls_config);
Ok(Self {
inner: Arc::new(inner),
@@ -270,11 +287,13 @@ impl ChannelManager {
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClientTlsOption {
pub server_ca_cert_path: String,
pub client_cert_path: String,
pub client_key_path: String,
/// Whether to enable TLS for client.
pub enabled: bool,
pub server_ca_cert_path: Option<String>,
pub client_cert_path: Option<String>,
pub client_key_path: Option<String>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -590,9 +609,10 @@ mod tests {
.tcp_keepalive(Duration::from_secs(2))
.tcp_nodelay(false)
.client_tls_config(ClientTlsOption {
server_ca_cert_path: "some_server_path".to_string(),
client_cert_path: "some_cert_path".to_string(),
client_key_path: "some_key_path".to_string(),
enabled: true,
server_ca_cert_path: Some("some_server_path".to_string()),
client_cert_path: Some("some_cert_path".to_string()),
client_key_path: Some("some_key_path".to_string()),
});
assert_eq!(
@@ -610,9 +630,10 @@ mod tests {
tcp_keepalive: Some(Duration::from_secs(2)),
tcp_nodelay: false,
client_tls: Some(ClientTlsOption {
server_ca_cert_path: "some_server_path".to_string(),
client_cert_path: "some_cert_path".to_string(),
client_key_path: "some_key_path".to_string(),
enabled: true,
server_ca_cert_path: Some("some_server_path".to_string()),
client_cert_path: Some("some_cert_path".to_string()),
client_key_path: Some("some_key_path".to_string()),
}),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,

View File

@@ -23,9 +23,10 @@ async fn test_mtls_config() {
// test wrong file
let config = ChannelConfig::new().client_tls_config(ClientTlsOption {
server_ca_cert_path: "tests/tls/wrong_ca.pem".to_string(),
client_cert_path: "tests/tls/wrong_client.pem".to_string(),
client_key_path: "tests/tls/wrong_client.key".to_string(),
enabled: true,
server_ca_cert_path: Some("tests/tls/wrong_ca.pem".to_string()),
client_cert_path: Some("tests/tls/wrong_client.pem".to_string()),
client_key_path: Some("tests/tls/wrong_client.key".to_string()),
});
let re = ChannelManager::with_tls_config(config);
@@ -33,9 +34,10 @@ async fn test_mtls_config() {
// test corrupted file content
let config = ChannelConfig::new().client_tls_config(ClientTlsOption {
server_ca_cert_path: "tests/tls/ca.pem".to_string(),
client_cert_path: "tests/tls/client.pem".to_string(),
client_key_path: "tests/tls/corrupted".to_string(),
enabled: true,
server_ca_cert_path: Some("tests/tls/ca.pem".to_string()),
client_cert_path: Some("tests/tls/client.pem".to_string()),
client_key_path: Some("tests/tls/corrupted".to_string()),
});
let re = ChannelManager::with_tls_config(config).unwrap();
@@ -44,9 +46,10 @@ async fn test_mtls_config() {
// success
let config = ChannelConfig::new().client_tls_config(ClientTlsOption {
server_ca_cert_path: "tests/tls/ca.pem".to_string(),
client_cert_path: "tests/tls/client.pem".to_string(),
client_key_path: "tests/tls/client.key".to_string(),
enabled: true,
server_ca_cert_path: Some("tests/tls/ca.pem".to_string()),
client_cert_path: Some("tests/tls/client.pem".to_string()),
client_key_path: Some("tests/tls/client.key".to_string()),
});
let re = ChannelManager::with_tls_config(config).unwrap();

View File

@@ -16,6 +16,7 @@
use std::time::Duration;
use common_grpc::channel_manager::ClientTlsOption;
use serde::{Deserialize, Serialize};
use session::ReadPreference;
@@ -57,6 +58,8 @@ pub struct BatchingModeOptions {
pub experimental_time_window_merge_threshold: usize,
/// Read preference of the Frontend client.
pub read_preference: ReadPreference,
/// TLS option for client connections to frontends.
pub frontend_tls: Option<ClientTlsOption>,
}
impl Default for BatchingModeOptions {
@@ -72,6 +75,7 @@ impl Default for BatchingModeOptions {
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
read_preference: Default::default(),
frontend_tls: None,
}
}
}

View File

@@ -40,7 +40,10 @@ use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::BatchingModeOptions;
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::error::{
ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu,
UnexpectedSnafu,
};
use crate::{Error, FlowAuthHeader};
/// Just like [`GrpcQueryHandler`] but use BoxedError
@@ -114,20 +117,25 @@ impl FrontendClient {
auth: Option<FlowAuthHeader>,
query: QueryOptions,
batch_opts: BatchingModeOptions,
) -> Self {
) -> Result<Self, Error> {
common_telemetry::info!("Frontend client build with auth={:?}", auth);
Self::Distributed {
Ok(Self::Distributed {
meta_client,
chnl_mgr: {
let cfg = ChannelConfig::new()
.connect_timeout(batch_opts.grpc_conn_timeout)
.timeout(batch_opts.query_timeout);
ChannelManager::with_config(cfg)
if let Some(tls) = &batch_opts.frontend_tls {
let cfg = cfg.client_tls_config(tls.clone());
ChannelManager::with_tls_config(cfg).context(InvalidClientConfigSnafu)?
} else {
ChannelManager::with_config(cfg)
}
},
auth,
query,
batch_opts,
}
})
}
pub fn from_grpc_handler(

View File

@@ -290,6 +290,13 @@ pub enum Error {
location: Location,
source: operator::error::Error,
},
#[snafu(display("Failed to create channel manager for gRPC client"))]
InvalidClientConfig {
#[snafu(implicit)]
location: Location,
source: common_grpc::error::Error,
},
}
/// the outer message is the full error stack, and inner message in header is the last error message that can be show directly to user
@@ -343,7 +350,8 @@ impl ErrorExt for Error {
Self::InvalidQuery { .. }
| Self::InvalidRequest { .. }
| Self::ParseAddr { .. }
| Self::IllegalAuthConfig { .. } => StatusCode::InvalidArguments,
| Self::IllegalAuthConfig { .. }
| Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments,
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),

View File

@@ -882,9 +882,10 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let mut client_tls = ClientTlsOption {
server_ca_cert_path: ca_path,
client_cert_path,
client_key_path,
enabled: true,
server_ca_cert_path: Some(ca_path),
client_cert_path: Some(client_cert_path),
client_key_path: Some(client_key_path),
};
{
let grpc_client =
@@ -897,7 +898,7 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
}
// test corrupted client key
{
client_tls.client_key_path = client_corrupted;
client_tls.client_key_path = Some(client_corrupted);
let grpc_client = Client::with_tls_and_urls(vec![addr], client_tls.clone()).unwrap();
let db = Database::new_with_dbname(
format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),