feat: add max_connection_age config to grpc server (#7031)

* feat: add `max_connection_age` config to grpc server

Signed-off-by: luofucong <luofc@foxmail.com>

* Apply suggestions from code review

Co-authored-by: Yingwen <realevenyag@gmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
LFC
2025-09-29 15:32:43 +08:00
committed by GitHub
parent c4a7cc0adb
commit aa05b3b993
14 changed files with 239 additions and 6 deletions

1
Cargo.lock generated
View File

@@ -11571,7 +11571,6 @@ dependencies = [
"headers",
"hostname 0.3.1",
"http 1.3.1",
"http-body 1.0.1",
"humantime",
"humantime-serde",
"hyper 1.6.0",

View File

@@ -31,6 +31,7 @@
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.<br/>The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.<br/>Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |
@@ -242,6 +243,7 @@
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.<br/>The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.<br/>Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | Unset | Certificate file path. |

View File

@@ -61,6 +61,11 @@ runtime_size = 8
## - `all`: enable all compression.
## Default to `none`
flight_compression = "arrow_ipc"
## The maximum connection age for gRPC connection.
## The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
## Refer to https://grpc.io/docs/guides/keepalive/ for more details.
## @toml2docs:none-default
#+ max_connection_age = "10m"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]

View File

@@ -56,6 +56,11 @@ prom_validation_mode = "strict"
bind_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## The maximum connection age for gRPC connection.
## The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
## Refer to https://grpc.io/docs/guides/keepalive/ for more details.
## @toml2docs:none-default
#+ max_connection_age = "10m"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]

View File

@@ -143,9 +143,11 @@ fn test_load_frontend_example_config() {
remote_write: Some(Default::default()),
..Default::default()
},
grpc: GrpcOptions::default()
.with_bind_addr("127.0.0.1:4001")
.with_server_addr("127.0.0.1:4001"),
grpc: GrpcOptions {
bind_addr: "127.0.0.1:4001".to_string(),
server_addr: "127.0.0.1:4001".to_string(),
..Default::default()
},
internal_grpc: Some(GrpcOptions::internal_default()),
http: HttpOptions {
cors_allowed_origins: vec!["https://example.com".to_string()],

View File

@@ -491,6 +491,7 @@ impl<'a> FlownodeServiceBuilder<'a> {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
max_connection_age: opts.grpc.max_connection_age,
};
let service = flownode_server.create_flow_service();
let runtime = common_runtime::global_runtime();

View File

@@ -68,7 +68,7 @@ where
}
}
pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
.with_tls_config(opts.tls.clone())
.context(error::InvalidTlsConfigSnafu)?;

View File

@@ -64,7 +64,6 @@ futures-util.workspace = true
headers = "0.4"
hostname = "0.3"
http.workspace = true
http-body = "1"
humantime.workspace = true
humantime-serde.workspace = true
hyper = { workspace = true, features = ["full"] }

View File

@@ -23,6 +23,7 @@ pub mod prom_query_gateway;
pub mod region_server;
use std::net::SocketAddr;
use std::time::Duration;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
use api::v1::{HealthCheckRequest, HealthCheckResponse};
@@ -71,6 +72,11 @@ pub struct GrpcOptions {
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
/// Maximum time that a channel may exist.
/// Useful when the server wants to control the reconnection of its clients.
/// Default to `None`, means infinite.
#[serde(with = "humantime_serde")]
pub max_connection_age: Option<Duration>,
}
impl GrpcOptions {
@@ -111,6 +117,7 @@ impl GrpcOptions {
max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
max_send_message_size: self.max_send_message_size.as_bytes() as usize,
tls: self.tls.clone(),
max_connection_age: self.max_connection_age,
}
}
}
@@ -130,6 +137,7 @@ impl Default for GrpcOptions {
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
max_connection_age: None,
}
}
}
@@ -148,6 +156,7 @@ impl GrpcOptions {
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
max_connection_age: None,
}
}
@@ -207,6 +216,7 @@ pub struct GrpcServer {
>,
bind_addr: Option<SocketAddr>,
name: Option<String>,
config: GrpcServerConfig,
}
/// Grpc Server configuration
@@ -217,6 +227,10 @@ pub struct GrpcServerConfig {
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
pub tls: TlsOption,
/// Maximum time that a channel may exist.
/// Useful when the server wants to control the reconnection of its clients.
/// Default to `None`, means infinite.
pub max_connection_age: Option<Duration>,
}
impl Default for GrpcServerConfig {
@@ -225,6 +239,7 @@ impl Default for GrpcServerConfig {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
tls: TlsOption::default(),
max_connection_age: None,
}
}
}
@@ -333,6 +348,10 @@ impl Server for GrpcServer {
builder = builder.tls_config(tls_config).context(StartGrpcSnafu)?;
}
if let Some(max_connection_age) = self.config.max_connection_age {
builder = builder.max_connection_age(max_connection_age);
}
let mut builder = builder
.add_routes(routes)
.add_service(self.create_healthcheck_service())

View File

@@ -197,6 +197,7 @@ impl GrpcServerBuilder {
otel_arrow_service: Mutex::new(self.otel_arrow_service),
bind_addr: None,
name: self.name,
config: self.config,
}
}
}

View File

@@ -47,6 +47,7 @@ flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
http.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }
log-query = { workspace = true }
loki-proto.workspace = true

View File

@@ -13,6 +13,7 @@
// limitations under the License.
mod flight;
mod network;
use api::v1::QueryRequest;
use api::v1::greptime_request::Request;

View File

@@ -0,0 +1,196 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::future::BoxFuture;
use http::Uri;
use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tower::Service;
struct NetworkTrafficMonitorableConnector {
interested_tx: mpsc::Sender<String>,
}
impl Service<Uri> for NetworkTrafficMonitorableConnector {
type Response = TokioIo<CollectGrpcResponseFrameTypeStream>;
type Error = String;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, uri: Uri) -> Self::Future {
let frame_types = self.interested_tx.clone();
Box::pin(async move {
let addr = format!(
"{}:{}",
uri.host().unwrap_or("localhost"),
uri.port_u16().unwrap_or(4001),
);
let inner = TcpStream::connect(addr).await.map_err(|e| e.to_string())?;
Ok(TokioIo::new(CollectGrpcResponseFrameTypeStream {
inner,
frame_types,
}))
})
}
}
struct CollectGrpcResponseFrameTypeStream {
inner: TcpStream,
frame_types: mpsc::Sender<String>,
}
impl AsyncRead for CollectGrpcResponseFrameTypeStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let before_len = buf.filled().len();
let result = Pin::new(&mut self.inner).poll_read(cx, buf);
if let Poll::Ready(Ok(())) = &result {
let after_len = buf.filled().len();
let new_data = &buf.filled()[before_len..after_len];
if let Some(frame_type) = maybe_decode_frame_type(new_data)
&& let Err(_) = self.frame_types.try_send(frame_type.to_string())
{
return Poll::Ready(Err(io::Error::other("interested party has gone")));
}
}
result
}
}
fn maybe_decode_frame_type(data: &[u8]) -> Option<&str> {
(data.len() >= 9).then(|| match data[3] {
0x0 => "DATA",
0x1 => "HEADERS",
0x2 => "PRIORITY",
0x3 => "RST_STREAM",
0x4 => "SETTINGS",
0x5 => "PUSH_PROMISE",
0x6 => "PING",
0x7 => "GOAWAY",
0x8 => "WINDOW_UPDATE",
0x9 => "CONTINUATION",
_ => "UNKNOWN",
})
}
impl AsyncWrite for CollectGrpcResponseFrameTypeStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use servers::grpc::GrpcServerConfig;
use servers::server::Server;
use super::*;
use crate::test_util::{StorageType, setup_grpc_server_with};
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_grpc_max_connection_age() {
let config = GrpcServerConfig {
max_connection_age: Some(Duration::from_secs(1)),
..Default::default()
};
let (_db, server) = setup_grpc_server_with(
StorageType::File,
"test_grpc_max_connection_age",
None,
Some(config),
)
.await;
let addr = server.bind_addr().unwrap().to_string();
let channel_manager = ChannelManager::new();
let client = Client::with_manager_and_urls(channel_manager.clone(), vec![&addr]);
let (tx, mut rx) = mpsc::channel(1024);
channel_manager
.reset_with_connector(
&addr,
NetworkTrafficMonitorableConnector { interested_tx: tx },
)
.unwrap();
let recv = tokio::spawn(async move {
let sleep = tokio::time::sleep(Duration::from_secs(3));
tokio::pin!(sleep);
let mut frame_types = vec![];
loop {
tokio::select! {
x = rx.recv() => {
if let Some(x) = x {
frame_types.push(x);
} else {
break;
}
}
_ = &mut sleep => {
break;
}
}
}
frame_types
});
// Drive the gRPC connection, has no special meaning for this keep-alive test.
let _ = client.health_check().await;
let frame_types = recv.await.unwrap();
// If "max_connection_age" has taken effects, server will return a "GOAWAY" message.
assert!(
frame_types.iter().any(|x| x == "GOAWAY"),
"{:?}",
frame_types
);
server.shutdown().await.unwrap();
}
}

View File

@@ -959,6 +959,7 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
max_recv_message_size: 1024,
max_send_message_size: 1024,
tls,
max_connection_age: None,
};
let (_db, fe_grpc_server) =
setup_grpc_server_with(store_type, "tls_create_table", None, Some(config)).await;
@@ -1000,6 +1001,7 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
max_recv_message_size: 1024,
max_send_message_size: 1024,
tls,
max_connection_age: None,
};
let runtime = Runtime::builder().build().unwrap();
let grpc_builder =