diff --git a/config/config.md b/config/config.md
index 9b612e902c..5b19d12ceb 100644
--- a/config/config.md
+++ b/config/config.md
@@ -596,6 +596,11 @@
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
+| `flow.batching_mode.frontend_tls` | -- | -- | -- |
+| `flow.batching_mode.frontend_tls.enabled` | Bool | `false` | Whether to enable TLS for client. |
+| `flow.batching_mode.frontend_tls.server_ca_cert_path` | String | Unset | Server Certificate file path. |
+| `flow.batching_mode.frontend_tls.client_cert_path` | String | Unset | Client Certificate file path. |
+| `flow.batching_mode.frontend_tls.client_key_path` | String | Unset | Client Private key file path. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:6800` | The address advertised to the metasrv,
and used for connections from outside the host |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index bf2a2365c8..d18fe45be0 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -32,6 +32,18 @@ node_id = 14
#+experimental_time_window_merge_threshold=3
## Read preference of the Frontend client.
#+read_preference="Leader"
+[flow.batching_mode.frontend_tls]
+## Whether to enable TLS for client.
+#+enabled=false
+## Server Certificate file path.
+## @toml2docs:none-default
+#+server_ca_cert_path=""
+## Client Certificate file path.
+## @toml2docs:none-default
+#+client_cert_path=""
+## Client Private key file path.
+## @toml2docs:none-default
+#+client_key_path=""
## The gRPC server options.
[grpc]
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index 55ce5aae0c..5488a2e175 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -376,7 +376,8 @@ impl StartCommand {
flow_auth_header,
opts.query.clone(),
opts.flow.batching_mode.clone(),
- );
+ )
+ .context(StartFlownodeSnafu)?;
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
opts.clone(),
diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs
index d7a21c763c..0ee2df822b 100644
--- a/src/common/grpc/src/channel_manager.rs
+++ b/src/common/grpc/src/channel_manager.rs
@@ -21,6 +21,7 @@ use common_telemetry::info;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use lazy_static::lazy_static;
+use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio_util::sync::CancellationToken;
use tonic::transport::{
@@ -97,6 +98,7 @@ impl ChannelManager {
}
}
+ /// Read tls cert and key files and create a ChannelManager with TLS config.
pub fn with_tls_config(config: ChannelConfig) -> Result {
let mut inner = Inner::with_config(config.clone());
@@ -105,20 +107,35 @@ impl ChannelManager {
msg: "no config input",
})?;
- let server_root_ca_cert = std::fs::read_to_string(path_config.server_ca_cert_path)
- .context(InvalidConfigFilePathSnafu)?;
- let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
- let client_cert = std::fs::read_to_string(path_config.client_cert_path)
- .context(InvalidConfigFilePathSnafu)?;
- let client_key = std::fs::read_to_string(path_config.client_key_path)
- .context(InvalidConfigFilePathSnafu)?;
- let client_identity = Identity::from_pem(client_cert, client_key);
+ if !path_config.enabled {
+ // if TLS not enabled, just ignore other tls config
+ // and not set `client_tls_config` hence not use TLS
+ return Ok(Self {
+ inner: Arc::new(inner),
+ });
+ }
- inner.client_tls_config = Some(
- ClientTlsConfig::new()
- .ca_certificate(server_root_ca_cert)
- .identity(client_identity),
- );
+ let mut tls_config = ClientTlsConfig::new();
+
+ if let Some(server_ca) = path_config.server_ca_cert_path {
+ let server_root_ca_cert =
+ std::fs::read_to_string(server_ca).context(InvalidConfigFilePathSnafu)?;
+ let server_root_ca_cert = Certificate::from_pem(server_root_ca_cert);
+ tls_config = tls_config.ca_certificate(server_root_ca_cert);
+ }
+
+ if let (Some(client_cert_path), Some(client_key_path)) =
+ (&path_config.client_cert_path, &path_config.client_key_path)
+ {
+ let client_cert =
+ std::fs::read_to_string(client_cert_path).context(InvalidConfigFilePathSnafu)?;
+ let client_key =
+ std::fs::read_to_string(client_key_path).context(InvalidConfigFilePathSnafu)?;
+ let client_identity = Identity::from_pem(client_cert, client_key);
+ tls_config = tls_config.identity(client_identity);
+ }
+
+ inner.client_tls_config = Some(tls_config);
Ok(Self {
inner: Arc::new(inner),
@@ -270,11 +287,13 @@ impl ChannelManager {
}
}
-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ClientTlsOption {
- pub server_ca_cert_path: String,
- pub client_cert_path: String,
- pub client_key_path: String,
+ /// Whether to enable TLS for client.
+ pub enabled: bool,
+ pub server_ca_cert_path: Option,
+ pub client_cert_path: Option,
+ pub client_key_path: Option,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -590,9 +609,10 @@ mod tests {
.tcp_keepalive(Duration::from_secs(2))
.tcp_nodelay(false)
.client_tls_config(ClientTlsOption {
- server_ca_cert_path: "some_server_path".to_string(),
- client_cert_path: "some_cert_path".to_string(),
- client_key_path: "some_key_path".to_string(),
+ enabled: true,
+ server_ca_cert_path: Some("some_server_path".to_string()),
+ client_cert_path: Some("some_cert_path".to_string()),
+ client_key_path: Some("some_key_path".to_string()),
});
assert_eq!(
@@ -610,9 +630,10 @@ mod tests {
tcp_keepalive: Some(Duration::from_secs(2)),
tcp_nodelay: false,
client_tls: Some(ClientTlsOption {
- server_ca_cert_path: "some_server_path".to_string(),
- client_cert_path: "some_cert_path".to_string(),
- client_key_path: "some_key_path".to_string(),
+ enabled: true,
+ server_ca_cert_path: Some("some_server_path".to_string()),
+ client_cert_path: Some("some_cert_path".to_string()),
+ client_key_path: Some("some_key_path".to_string()),
}),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
diff --git a/src/common/grpc/tests/mod.rs b/src/common/grpc/tests/mod.rs
index 40e5e9541f..d119f22836 100644
--- a/src/common/grpc/tests/mod.rs
+++ b/src/common/grpc/tests/mod.rs
@@ -23,9 +23,10 @@ async fn test_mtls_config() {
// test wrong file
let config = ChannelConfig::new().client_tls_config(ClientTlsOption {
- server_ca_cert_path: "tests/tls/wrong_ca.pem".to_string(),
- client_cert_path: "tests/tls/wrong_client.pem".to_string(),
- client_key_path: "tests/tls/wrong_client.key".to_string(),
+ enabled: true,
+ server_ca_cert_path: Some("tests/tls/wrong_ca.pem".to_string()),
+ client_cert_path: Some("tests/tls/wrong_client.pem".to_string()),
+ client_key_path: Some("tests/tls/wrong_client.key".to_string()),
});
let re = ChannelManager::with_tls_config(config);
@@ -33,9 +34,10 @@ async fn test_mtls_config() {
// test corrupted file content
let config = ChannelConfig::new().client_tls_config(ClientTlsOption {
- server_ca_cert_path: "tests/tls/ca.pem".to_string(),
- client_cert_path: "tests/tls/client.pem".to_string(),
- client_key_path: "tests/tls/corrupted".to_string(),
+ enabled: true,
+ server_ca_cert_path: Some("tests/tls/ca.pem".to_string()),
+ client_cert_path: Some("tests/tls/client.pem".to_string()),
+ client_key_path: Some("tests/tls/corrupted".to_string()),
});
let re = ChannelManager::with_tls_config(config).unwrap();
@@ -44,9 +46,10 @@ async fn test_mtls_config() {
// success
let config = ChannelConfig::new().client_tls_config(ClientTlsOption {
- server_ca_cert_path: "tests/tls/ca.pem".to_string(),
- client_cert_path: "tests/tls/client.pem".to_string(),
- client_key_path: "tests/tls/client.key".to_string(),
+ enabled: true,
+ server_ca_cert_path: Some("tests/tls/ca.pem".to_string()),
+ client_cert_path: Some("tests/tls/client.pem".to_string()),
+ client_key_path: Some("tests/tls/client.key".to_string()),
});
let re = ChannelManager::with_tls_config(config).unwrap();
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs
index 76702a2f73..7817cfb6d0 100644
--- a/src/flow/src/batching_mode.rs
+++ b/src/flow/src/batching_mode.rs
@@ -16,6 +16,7 @@
use std::time::Duration;
+use common_grpc::channel_manager::ClientTlsOption;
use serde::{Deserialize, Serialize};
use session::ReadPreference;
@@ -57,6 +58,8 @@ pub struct BatchingModeOptions {
pub experimental_time_window_merge_threshold: usize,
/// Read preference of the Frontend client.
pub read_preference: ReadPreference,
+ /// TLS option for client connections to frontends.
+ pub frontend_tls: Option,
}
impl Default for BatchingModeOptions {
@@ -72,6 +75,7 @@ impl Default for BatchingModeOptions {
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
read_preference: Default::default(),
+ frontend_tls: None,
}
}
}
diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index 1c920b1db5..a91cbe433e 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -40,7 +40,10 @@ use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::BatchingModeOptions;
-use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
+use crate::error::{
+ ExternalSnafu, InvalidClientConfigSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu,
+ UnexpectedSnafu,
+};
use crate::{Error, FlowAuthHeader};
/// Just like [`GrpcQueryHandler`] but use BoxedError
@@ -114,20 +117,25 @@ impl FrontendClient {
auth: Option,
query: QueryOptions,
batch_opts: BatchingModeOptions,
- ) -> Self {
+ ) -> Result {
common_telemetry::info!("Frontend client build with auth={:?}", auth);
- Self::Distributed {
+ Ok(Self::Distributed {
meta_client,
chnl_mgr: {
let cfg = ChannelConfig::new()
.connect_timeout(batch_opts.grpc_conn_timeout)
.timeout(batch_opts.query_timeout);
- ChannelManager::with_config(cfg)
+ if let Some(tls) = &batch_opts.frontend_tls {
+ let cfg = cfg.client_tls_config(tls.clone());
+ ChannelManager::with_tls_config(cfg).context(InvalidClientConfigSnafu)?
+ } else {
+ ChannelManager::with_config(cfg)
+ }
},
auth,
query,
batch_opts,
- }
+ })
}
pub fn from_grpc_handler(
diff --git a/src/flow/src/error.rs b/src/flow/src/error.rs
index 15b8a53ab4..ad1190eeeb 100644
--- a/src/flow/src/error.rs
+++ b/src/flow/src/error.rs
@@ -290,6 +290,13 @@ pub enum Error {
location: Location,
source: operator::error::Error,
},
+
+ #[snafu(display("Failed to create channel manager for gRPC client"))]
+ InvalidClientConfig {
+ #[snafu(implicit)]
+ location: Location,
+ source: common_grpc::error::Error,
+ },
}
/// the outer message is the full error stack, and inner message in header is the last error message that can be show directly to user
@@ -343,7 +350,8 @@ impl ErrorExt for Error {
Self::InvalidQuery { .. }
| Self::InvalidRequest { .. }
| Self::ParseAddr { .. }
- | Self::IllegalAuthConfig { .. } => StatusCode::InvalidArguments,
+ | Self::IllegalAuthConfig { .. }
+ | Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments,
Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(),
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 007e424b56..95cbeb70dd 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -882,9 +882,10 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let mut client_tls = ClientTlsOption {
- server_ca_cert_path: ca_path,
- client_cert_path,
- client_key_path,
+ enabled: true,
+ server_ca_cert_path: Some(ca_path),
+ client_cert_path: Some(client_cert_path),
+ client_key_path: Some(client_key_path),
};
{
let grpc_client =
@@ -897,7 +898,7 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
}
// test corrupted client key
{
- client_tls.client_key_path = client_corrupted;
+ client_tls.client_key_path = Some(client_corrupted);
let grpc_client = Client::with_tls_and_urls(vec![addr], client_tls.clone()).unwrap();
let db = Database::new_with_dbname(
format!("{}-{}", DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),