fix: improve network failure detection (#7367)

This commit is contained in:
Weny Xu
2025-12-09 19:47:45 +08:00
committed by GitHub
parent b0d2d26ad8
commit 4d33b9687a
9 changed files with 178 additions and 32 deletions

135
Cargo.lock generated
View File

@@ -4245,7 +4245,7 @@ checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc"
dependencies = [
"cfg-if",
"libc",
"socket2",
"socket2 0.5.7",
"windows-sys 0.48.0",
]
@@ -5592,7 +5592,7 @@ checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65"
dependencies = [
"cfg-if",
"libc",
"windows-link",
"windows-link 0.1.1",
]
[[package]]
@@ -5724,7 +5724,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.5.7",
"tokio",
"tower-service",
"tracing",
@@ -5830,7 +5830,7 @@ dependencies = [
"http-body 1.0.1",
"hyper 1.4.1",
"pin-project-lite",
"socket2",
"socket2 0.5.7",
"tokio",
"tower-service",
"tracing",
@@ -6870,9 +6870,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.171"
version = "0.2.178"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091"
[[package]]
name = "libflate"
@@ -6928,7 +6928,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
dependencies = [
"cfg-if",
"windows-targets 0.52.6",
"windows-targets 0.48.5",
]
[[package]]
@@ -7005,13 +7005,13 @@ checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"
[[package]]
name = "local-ip-address"
version = "0.6.3"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782"
checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8"
dependencies = [
"libc",
"neli",
"thiserror 1.0.64",
"thiserror 2.0.12",
"windows-sys 0.59.0",
]
@@ -7759,7 +7759,7 @@ dependencies = [
"percent-encoding",
"rustls",
"rustls-pemfile",
"socket2",
"socket2 0.5.7",
"twox-hash 2.1.0",
"url",
"webpki",
@@ -7825,7 +7825,7 @@ dependencies = [
"rustls-pemfile",
"serde",
"serde_json",
"socket2",
"socket2 0.5.7",
"thiserror 2.0.12",
"tokio",
"tokio-rustls",
@@ -10221,7 +10221,7 @@ dependencies = [
"quinn-udp",
"rustc-hash 2.0.0",
"rustls",
"socket2",
"socket2 0.5.7",
"thiserror 1.0.64",
"tokio",
"tracing",
@@ -10252,7 +10252,7 @@ checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b"
dependencies = [
"libc",
"once_cell",
"socket2",
"socket2 0.5.7",
"tracing",
"windows-sys 0.59.0",
]
@@ -10849,7 +10849,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.6.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820"
source = "git+https://github.com/WenyXu/rskafka.git?rev=40150bad95fddd39a772dc132ee9e92f8f91fe38#40150bad95fddd39a772dc132ee9e92f8f91fe38"
dependencies = [
"bytes",
"chrono",
@@ -10863,6 +10863,7 @@ dependencies = [
"rsasl",
"rustls",
"snap",
"socket2 0.6.1",
"thiserror 2.0.12",
"tokio",
"tokio-rustls",
@@ -11613,7 +11614,7 @@ dependencies = [
"simd-json",
"snafu 0.8.5",
"snap",
"socket2",
"socket2 0.5.7",
"sql",
"store-api",
"strum 0.27.1",
@@ -11934,6 +11935,16 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881"
dependencies = [
"libc",
"windows-sys 0.60.2",
]
[[package]]
name = "spade"
version = "2.12.1"
@@ -12331,7 +12342,7 @@ dependencies = [
"cfg-if",
"libc",
"psm",
"windows-sys 0.59.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -13323,7 +13334,7 @@ dependencies = [
"parking_lot 0.12.3",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.5.7",
"tokio-macros",
"tracing",
"windows-sys 0.52.0",
@@ -13395,7 +13406,7 @@ dependencies = [
"postgres-protocol",
"postgres-types",
"rand 0.8.5",
"socket2",
"socket2 0.5.7",
"tokio",
"tokio-util",
"whoami",
@@ -13571,7 +13582,7 @@ dependencies = [
"pin-project",
"prost 0.13.5",
"rustls-pemfile",
"socket2",
"socket2 0.5.7",
"tokio",
"tokio-rustls",
"tokio-stream",
@@ -14598,7 +14609,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.48.0",
]
[[package]]
@@ -14711,6 +14722,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-registry"
version = "0.2.0"
@@ -14777,6 +14794,15 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
dependencies = [
"windows-targets 0.53.5",
]
[[package]]
name = "windows-targets"
version = "0.48.5"
@@ -14801,13 +14827,30 @@ dependencies = [
"windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.6",
"windows_i686_gnullvm",
"windows_i686_gnullvm 0.52.6",
"windows_i686_msvc 0.52.6",
"windows_x86_64_gnu 0.52.6",
"windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6",
]
[[package]]
name = "windows-targets"
version = "0.53.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
dependencies = [
"windows-link 0.2.1",
"windows_aarch64_gnullvm 0.53.1",
"windows_aarch64_msvc 0.53.1",
"windows_i686_gnu 0.53.1",
"windows_i686_gnullvm 0.53.1",
"windows_i686_msvc 0.53.1",
"windows_x86_64_gnu 0.53.1",
"windows_x86_64_gnullvm 0.53.1",
"windows_x86_64_msvc 0.53.1",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
@@ -14820,6 +14863,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.5"
@@ -14832,6 +14881,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_aarch64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
[[package]]
name = "windows_i686_gnu"
version = "0.48.5"
@@ -14844,12 +14899,24 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
[[package]]
name = "windows_i686_msvc"
version = "0.48.5"
@@ -14862,6 +14929,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_i686_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.5"
@@ -14874,6 +14947,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
@@ -14886,6 +14965,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.5"
@@ -14898,6 +14983,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "windows_x86_64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]]
name = "winnow"
version = "0.5.40"

View File

@@ -188,7 +188,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream",
"multipart",
] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [
# Branch: feat/keepalive-port
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "40150bad95fddd39a772dc132ee9e92f8f91fe38", features = [
"transport-tls",
] }
rstest = "0.25"

View File

@@ -14,7 +14,8 @@
use common_telemetry::{debug, error, info};
use common_wal::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT,
DEFAULT_KEEP_ALIVE_CONFIG,
};
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -209,7 +210,9 @@ impl KafkaTopicCreator {
pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
// Builds an kafka controller client for creating topics.
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG);
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT))
.keepalive_config(DEFAULT_KEEP_ALIVE_CONFIG);
if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};

View File

@@ -16,7 +16,7 @@ use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use rskafka::client::{Credentials, SaslConfig};
use rskafka::client::{Credentials, KeepaliveConfig, SaslConfig};
use rskafka::BackoffConfig;
use rustls::{ClientConfig, RootCertStore};
use serde::{Deserialize, Serialize};
@@ -35,6 +35,16 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
deadline: Some(Duration::from_secs(3)),
};
/// The default keep-alive config for kafka client.
pub const DEFAULT_KEEP_ALIVE_CONFIG: KeepaliveConfig = KeepaliveConfig {
time: Some(Duration::from_secs(10)),
interval: Some(Duration::from_secs(7)),
retries: Some(3),
};
/// The default connect timeout for kafka client.
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Default interval for auto WAL pruning.
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO;
/// Default limit for concurrent auto pruning tasks.

View File

@@ -15,7 +15,9 @@
use std::collections::HashMap;
use std::sync::Arc;
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::common::{
DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, DEFAULT_KEEP_ALIVE_CONFIG,
};
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
@@ -77,7 +79,9 @@ impl ClientManager {
) -> Result<Self> {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG);
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT))
.keepalive_config(DEFAULT_KEEP_ALIVE_CONFIG);
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};

View File

@@ -14,6 +14,7 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
@@ -82,6 +83,11 @@ use crate::service::admin;
use crate::service::admin::admin_axum_router;
use crate::{error, Result};
/// The default keep-alive interval for gRPC.
const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
/// The default keep-alive timeout for gRPC.
const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
pub struct MetasrvInstance {
metasrv: Arc<Metasrv>,
@@ -271,7 +277,12 @@ macro_rules! add_compressed_service {
}
pub fn router(metasrv: Arc<Metasrv>) -> Router {
let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
let mut router = tonic::transport::Server::builder()
// for admin services
.accept_http1(true)
// For quick network failures detection.
.http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT));
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));

View File

@@ -268,6 +268,15 @@ impl Pushers {
async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
self.0.write().await.remove(pusher_id)
}
pub(crate) async fn clear(&self) -> Vec<String> {
let mut pushers = self.0.write().await;
let keys = pushers.keys().cloned().collect::<Vec<_>>();
if !keys.is_empty() {
pushers.clear();
}
keys
}
}
#[derive(Clone)]
@@ -304,10 +313,11 @@ impl HeartbeatHandlerGroup {
/// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
///
/// Returns the [`Pusher`] if it exists.
pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
pub async fn deregister_push(&self, pusher_id: PusherId) {
info!("Pusher unregister: {}", pusher_id);
self.pushers.remove(&pusher_id.string_key()).await
if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
}
}
/// Returns the [`Pushers`] of the group.
@@ -516,6 +526,14 @@ impl Mailbox for HeartbeatMailbox {
Ok(())
}
async fn reset(&self) {
let keys = self.pushers.clear().await;
if !keys.is_empty() {
info!("Reset mailbox, deregister pushers: {:?}", keys);
METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
}
}
}
/// The builder to build the group of heartbeat handlers.

View File

@@ -388,6 +388,7 @@ pub struct MetaStateHandler {
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
leadership_change_notifier: LeadershipChangeNotifier,
mailbox: MailboxRef,
state: StateRef,
}
@@ -411,6 +412,9 @@ impl MetaStateHandler {
pub async fn on_leader_stop(&self) {
self.state.write().unwrap().next_state(become_follower());
// Enforces the mailbox to clear all pushers.
// The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
self.mailbox.reset().await;
self.leadership_change_notifier
.notify_on_leader_stop()
.await;
@@ -528,6 +532,7 @@ impl Metasrv {
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
leadership_change_notifier,
mailbox: self.mailbox.clone(),
};
let _handle = common_runtime::spawn_global(async move {
loop {

View File

@@ -207,6 +207,9 @@ pub trait Mailbox: Send + Sync {
async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;
async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
/// Reset all pushers of the mailbox.
async fn reset(&self);
}
#[cfg(test)]