diff --git a/Cargo.lock b/Cargo.lock
index 68d36dee5c..9b5a7296e6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -11571,7 +11571,6 @@ dependencies = [
"headers",
"hostname 0.3.1",
"http 1.3.1",
- "http-body 1.0.1",
"humantime",
"humantime-serde",
"hyper 1.6.0",
diff --git a/config/config.md b/config/config.md
index ab3e1255ee..58c538d4ba 100644
--- a/config/config.md
+++ b/config/config.md
@@ -31,6 +31,7 @@
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
+| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.
The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -242,6 +243,7 @@
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.
If left empty or unset, the server will automatically use the IP address of the first network interface
on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:
- `none`: disable all compression
- `transport`: only enable gRPC transport compression (zstd)
- `arrow_ipc`: only enable Arrow IPC compression (lz4)
- `all`: enable all compression.
Default to `none` |
+| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.
The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index 933e82e431..b26d88323e 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -61,6 +61,11 @@ runtime_size = 8
## - `all`: enable all compression.
## Default to `none`
flight_compression = "arrow_ipc"
+## The maximum connection age for gRPC connection.
+## The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
+## Refer to https://grpc.io/docs/guides/keepalive/ for more details.
+## @toml2docs:none-default
+#+ max_connection_age = "10m"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index aef9cd0077..72df1a4184 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -56,6 +56,11 @@ prom_validation_mode = "strict"
bind_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
+## The maximum connection age for gRPC connection.
+## The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
+## Refer to https://grpc.io/docs/guides/keepalive/ for more details.
+## @toml2docs:none-default
+#+ max_connection_age = "10m"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index 827fd017a1..b92cf9631d 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -143,9 +143,11 @@ fn test_load_frontend_example_config() {
remote_write: Some(Default::default()),
..Default::default()
},
- grpc: GrpcOptions::default()
- .with_bind_addr("127.0.0.1:4001")
- .with_server_addr("127.0.0.1:4001"),
+ grpc: GrpcOptions {
+ bind_addr: "127.0.0.1:4001".to_string(),
+ server_addr: "127.0.0.1:4001".to_string(),
+ ..Default::default()
+ },
internal_grpc: Some(GrpcOptions::internal_default()),
http: HttpOptions {
cors_allowed_origins: vec!["https://example.com".to_string()],
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index cda2391c55..3f46203ba0 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -491,6 +491,7 @@ impl<'a> FlownodeServiceBuilder<'a> {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
+ max_connection_age: opts.grpc.max_connection_age,
};
let service = flownode_server.create_flow_service();
let runtime = common_runtime::global_runtime();
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index 6c19109ab2..344b5f386e 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -68,7 +68,7 @@ where
}
}
- pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result {
+ fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result {
let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
.with_tls_config(opts.tls.clone())
.context(error::InvalidTlsConfigSnafu)?;
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 89d05490ed..5203fb2a13 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -64,7 +64,6 @@ futures-util.workspace = true
headers = "0.4"
hostname = "0.3"
http.workspace = true
-http-body = "1"
humantime.workspace = true
humantime-serde.workspace = true
hyper = { workspace = true, features = ["full"] }
diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs
index 38ba40f056..2f759db2a0 100644
--- a/src/servers/src/grpc.rs
+++ b/src/servers/src/grpc.rs
@@ -23,6 +23,7 @@ pub mod prom_query_gateway;
pub mod region_server;
use std::net::SocketAddr;
+use std::time::Duration;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
use api::v1::{HealthCheckRequest, HealthCheckResponse};
@@ -71,6 +72,11 @@ pub struct GrpcOptions {
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
+ /// Maximum time that a channel may exist.
+ /// Useful when the server wants to control the reconnection of its clients.
+ /// Default to `None`, means infinite.
+ #[serde(with = "humantime_serde")]
+ pub max_connection_age: Option,
}
impl GrpcOptions {
@@ -111,6 +117,7 @@ impl GrpcOptions {
max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
max_send_message_size: self.max_send_message_size.as_bytes() as usize,
tls: self.tls.clone(),
+ max_connection_age: self.max_connection_age,
}
}
}
@@ -130,6 +137,7 @@ impl Default for GrpcOptions {
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
+ max_connection_age: None,
}
}
}
@@ -148,6 +156,7 @@ impl GrpcOptions {
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
+ max_connection_age: None,
}
}
@@ -207,6 +216,7 @@ pub struct GrpcServer {
>,
bind_addr: Option,
name: Option,
+ config: GrpcServerConfig,
}
/// Grpc Server configuration
@@ -217,6 +227,10 @@ pub struct GrpcServerConfig {
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
pub tls: TlsOption,
+ /// Maximum time that a channel may exist.
+ /// Useful when the server wants to control the reconnection of its clients.
+ /// Default to `None`, means infinite.
+ pub max_connection_age: Option,
}
impl Default for GrpcServerConfig {
@@ -225,6 +239,7 @@ impl Default for GrpcServerConfig {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
tls: TlsOption::default(),
+ max_connection_age: None,
}
}
}
@@ -333,6 +348,10 @@ impl Server for GrpcServer {
builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
}
+ if let Some(max_connection_age) = self.config.max_connection_age {
+ builder = builder.max_connection_age(max_connection_age);
+ }
+
let mut builder = builder
.add_routes(routes)
.add_service(self.create_healthcheck_service())
diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs
index 370b1f6481..75a0bb13c3 100644
--- a/src/servers/src/grpc/builder.rs
+++ b/src/servers/src/grpc/builder.rs
@@ -197,6 +197,7 @@ impl GrpcServerBuilder {
otel_arrow_service: Mutex::new(self.otel_arrow_service),
bind_addr: None,
name: self.name,
+ config: self.config,
}
}
}
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 3112ff866e..13e4cc3115 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -47,6 +47,7 @@ flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
+http.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
log-query = { workspace = true }
loki-proto.workspace = true
diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs
index 6a5efda46a..b5a130137c 100644
--- a/tests-integration/src/grpc.rs
+++ b/tests-integration/src/grpc.rs
@@ -13,6 +13,7 @@
// limitations under the License.
mod flight;
+mod network;
use api::v1::QueryRequest;
use api::v1::greptime_request::Request;
diff --git a/tests-integration/src/grpc/network.rs b/tests-integration/src/grpc/network.rs
new file mode 100644
index 0000000000..625597cc59
--- /dev/null
+++ b/tests-integration/src/grpc/network.rs
@@ -0,0 +1,196 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use futures_util::future::BoxFuture;
+use http::Uri;
+use hyper_util::rt::TokioIo;
+use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
+use tokio::net::TcpStream;
+use tokio::sync::mpsc;
+use tower::Service;
+
+struct NetworkTrafficMonitorableConnector {
+ interested_tx: mpsc::Sender,
+}
+
+impl Service for NetworkTrafficMonitorableConnector {
+ type Response = TokioIo;
+ type Error = String;
+ type Future = BoxFuture<'static, Result>;
+
+ fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, uri: Uri) -> Self::Future {
+ let frame_types = self.interested_tx.clone();
+
+ Box::pin(async move {
+ let addr = format!(
+ "{}:{}",
+ uri.host().unwrap_or("localhost"),
+ uri.port_u16().unwrap_or(4001),
+ );
+ let inner = TcpStream::connect(addr).await.map_err(|e| e.to_string())?;
+ Ok(TokioIo::new(CollectGrpcResponseFrameTypeStream {
+ inner,
+ frame_types,
+ }))
+ })
+ }
+}
+
+struct CollectGrpcResponseFrameTypeStream {
+ inner: TcpStream,
+ frame_types: mpsc::Sender,
+}
+
+impl AsyncRead for CollectGrpcResponseFrameTypeStream {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut ReadBuf<'_>,
+ ) -> Poll> {
+ let before_len = buf.filled().len();
+
+ let result = Pin::new(&mut self.inner).poll_read(cx, buf);
+ if let Poll::Ready(Ok(())) = &result {
+ let after_len = buf.filled().len();
+
+ let new_data = &buf.filled()[before_len..after_len];
+ if let Some(frame_type) = maybe_decode_frame_type(new_data)
+ && let Err(_) = self.frame_types.try_send(frame_type.to_string())
+ {
+ return Poll::Ready(Err(io::Error::other("interested party has gone")));
+ }
+ }
+ result
+ }
+}
+
+fn maybe_decode_frame_type(data: &[u8]) -> Option<&str> {
+ (data.len() >= 9).then(|| match data[3] {
+ 0x0 => "DATA",
+ 0x1 => "HEADERS",
+ 0x2 => "PRIORITY",
+ 0x3 => "RST_STREAM",
+ 0x4 => "SETTINGS",
+ 0x5 => "PUSH_PROMISE",
+ 0x6 => "PING",
+ 0x7 => "GOAWAY",
+ 0x8 => "WINDOW_UPDATE",
+ 0x9 => "CONTINUATION",
+ _ => "UNKNOWN",
+ })
+}
+
+impl AsyncWrite for CollectGrpcResponseFrameTypeStream {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll> {
+ Pin::new(&mut self.inner).poll_write(cx, buf)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use client::Client;
+ use common_grpc::channel_manager::ChannelManager;
+ use servers::grpc::GrpcServerConfig;
+ use servers::server::Server;
+
+ use super::*;
+ use crate::test_util::{StorageType, setup_grpc_server_with};
+
+ #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+ async fn test_grpc_max_connection_age() {
+ let config = GrpcServerConfig {
+ max_connection_age: Some(Duration::from_secs(1)),
+ ..Default::default()
+ };
+ let (_db, server) = setup_grpc_server_with(
+ StorageType::File,
+ "test_grpc_max_connection_age",
+ None,
+ Some(config),
+ )
+ .await;
+ let addr = server.bind_addr().unwrap().to_string();
+
+ let channel_manager = ChannelManager::new();
+ let client = Client::with_manager_and_urls(channel_manager.clone(), vec![&addr]);
+
+ let (tx, mut rx) = mpsc::channel(1024);
+ channel_manager
+ .reset_with_connector(
+ &addr,
+ NetworkTrafficMonitorableConnector { interested_tx: tx },
+ )
+ .unwrap();
+
+ let recv = tokio::spawn(async move {
+ let sleep = tokio::time::sleep(Duration::from_secs(3));
+ tokio::pin!(sleep);
+
+ let mut frame_types = vec![];
+ loop {
+ tokio::select! {
+ x = rx.recv() => {
+ if let Some(x) = x {
+ frame_types.push(x);
+ } else {
+ break;
+ }
+ }
+ _ = &mut sleep => {
+ break;
+ }
+ }
+ }
+ frame_types
+ });
+
+ // Drive the gRPC connection, has no special meaning for this keep-alive test.
+ let _ = client.health_check().await;
+
+ let frame_types = recv.await.unwrap();
+ // If "max_connection_age" has taken effects, server will return a "GOAWAY" message.
+ assert!(
+ frame_types.iter().any(|x| x == "GOAWAY"),
+ "{:?}",
+ frame_types
+ );
+
+ server.shutdown().await.unwrap();
+ }
+}
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 1ee3e4d09b..f359c45ff9 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -959,6 +959,7 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
max_recv_message_size: 1024,
max_send_message_size: 1024,
tls,
+ max_connection_age: None,
};
let (_db, fe_grpc_server) =
setup_grpc_server_with(store_type, "tls_create_table", None, Some(config)).await;
@@ -1000,6 +1001,7 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
max_recv_message_size: 1024,
max_send_message_size: 1024,
tls,
+ max_connection_age: None,
};
let runtime = Runtime::builder().build().unwrap();
let grpc_builder =