From cb0f1afb0126cbf9508c54bbcc013ddd1328a49d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 10 Dec 2025 17:48:36 +0800 Subject: [PATCH] fix: improve network failure detection (#7382) * fix(meta): add default etcd client options with keep-alive settings (#7363) * fix: improve network failure detection (#7367) * Update src/meta-srv/src/handler.rs Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --------- Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com> --- Cargo.lock | 2 +- Cargo.toml | 3 ++- .../meta/src/distributed_time_constants.rs | 13 ++++++++++ .../wal_options_allocator/topic_creator.rs | 6 +++-- src/common/wal/src/config/kafka/common.rs | 3 +++ src/log-store/src/kafka/client_manager.rs | 5 ++-- src/meta-srv/src/bootstrap.rs | 22 +++++++++++++--- src/meta-srv/src/election/etcd.rs | 16 ------------ src/meta-srv/src/handler.rs | 26 +++++++++++++++---- src/meta-srv/src/metasrv.rs | 5 ++++ src/meta-srv/src/service/mailbox.rs | 3 +++ src/meta-srv/src/utils/etcd.rs | 18 +++++++------ 12 files changed, 84 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 271eb444de..ca52b4c3ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10838,7 +10838,7 @@ dependencies = [ [[package]] name = "rskafka" version = "0.6.0" -source = "git+https://github.com/WenyXu/rskafka.git?rev=7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76#7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76" +source = "git+https://github.com/GreptimeTeam/rskafka.git?rev=f5688f83e7da591cda3f2674c2408b4c0ed4ed50#f5688f83e7da591cda3f2674c2408b4c0ed4ed50" dependencies = [ "bytes", "chrono", diff --git a/Cargo.toml b/Cargo.toml index fdc23da7ed..66a2b5dbb1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -200,7 +200,8 @@ reqwest = { version = "0.12", default-features = false, features = [ "stream", "multipart", ] } -rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76", features = [ +# Branch: feat/request-timeout +rskafka = { git = "https://github.com/GreptimeTeam/rskafka.git", rev = "f5688f83e7da591cda3f2674c2408b4c0ed4ed50", features = [ "transport-tls", ] } rstest = "0.25" diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index bd523cd901..fefb175860 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -14,6 +14,8 @@ use std::time::Duration; +use etcd_client::ConnectOptions; + /// Heartbeat interval time (is the basic unit of various time). pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000; @@ -52,6 +54,17 @@ pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration = pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1); +/// The default options for the etcd client. +pub fn default_etcd_client_options() -> ConnectOptions { + ConnectOptions::new() + .with_keep_alive_while_idle(true) + .with_keep_alive( + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1), + Duration::from_secs(10), + ) + .with_connect_timeout(Duration::from_secs(10)) +} + /// The default mailbox round-trip timeout. pub const MAILBOX_RTT_SECS: u64 = 1; diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs index 8ba5b9ec59..0c3caf215a 100644 --- a/src/common/meta/src/wal_options_allocator/topic_creator.rs +++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs @@ -14,7 +14,7 @@ use common_telemetry::{debug, error, info}; use common_wal::config::kafka::common::{ - DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig, + DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, KafkaConnectionConfig, KafkaTopicConfig, }; use rskafka::client::error::Error as RsKafkaError; use rskafka::client::error::ProtocolError::TopicAlreadyExists; @@ -205,11 +205,13 @@ impl KafkaTopicCreator { self.partition_client(topic).await.unwrap() } } + /// Builds a kafka [Client](rskafka::client::Client). pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result { // Builds an kafka controller client for creating topics. let mut builder = ClientBuilder::new(connection.broker_endpoints.clone()) - .backoff_config(DEFAULT_BACKOFF_CONFIG); + .backoff_config(DEFAULT_BACKOFF_CONFIG) + .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT)); if let Some(sasl) = &connection.sasl { builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index 1b9bcc77be..f58ba640c8 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -36,6 +36,9 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig { deadline: Some(Duration::from_secs(3)), }; +/// The default connect timeout for kafka client. +pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); + /// Default interval for auto WAL pruning. pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30); /// Default limit for concurrent auto pruning tasks. diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 8a19238356..49d363ad14 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use common_wal::config::kafka::DatanodeKafkaConfig; -use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG; +use common_wal::config::kafka::common::{DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT}; use dashmap::DashMap; use rskafka::client::ClientBuilder; use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling}; @@ -78,7 +78,8 @@ impl ClientManager { ) -> Result { // Sets backoff config for the top-level kafka client and all clients constructed by it. let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone()) - .backoff_config(DEFAULT_BACKOFF_CONFIG); + .backoff_config(DEFAULT_BACKOFF_CONFIG) + .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT)); if let Some(sasl) = &config.connection.sasl { builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 351853d2bf..211edbbd86 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -14,6 +14,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::heartbeat_server::HeartbeatServer; @@ -49,16 +50,21 @@ use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::{ BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef, }; -use crate::selector::SelectorType; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; use crate::selector::round_robin::RoundRobinSelector; use crate::selector::weight_compute::RegionNumsBasedWeightCompute; +use crate::selector::{Selector, SelectorType}; use crate::service::admin; use crate::service::admin::admin_axum_router; use crate::utils::etcd::create_etcd_client_with_tls; use crate::{Result, error}; +/// The default keep-alive interval for gRPC. +const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10); +/// The default keep-alive timeout for gRPC. +const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10); + pub struct MetasrvInstance { metasrv: Arc, @@ -245,7 +251,12 @@ macro_rules! add_compressed_service { } pub fn router(metasrv: Arc) -> Router { - let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services + let mut router = tonic::transport::Server::builder() + // for admin services + .accept_http1(true) + // For quick network failures detection. + .http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL)) + .http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT)); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone())); @@ -393,7 +404,12 @@ pub async fn metasrv_builder( info!("Using selector from plugins"); selector } else { - let selector = match opts.selector { + let selector: Arc< + dyn Selector< + Context = crate::metasrv::SelectorContext, + Output = Vec, + >, + > = match opts.selector { SelectorType::LoadBased => Arc::new(LoadBasedSelector::new( RegionNumsBasedWeightCompute, meta_peer_client.clone(), diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 7a6a02f490..883f723d74 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -63,22 +63,6 @@ pub struct EtcdElection { } impl EtcdElection { - pub async fn with_endpoints( - leader_value: E, - endpoints: S, - store_key_prefix: String, - ) -> Result - where - E: AsRef, - S: AsRef<[E]>, - { - let client = Client::connect(endpoints, None) - .await - .context(error::ConnectEtcdSnafu)?; - - Self::with_etcd_client(leader_value, client, store_key_prefix).await - } - pub async fn with_etcd_client( leader_value: E, client: Client, diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 8f7aba2f92..c581bd449f 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -275,6 +275,15 @@ impl Pushers { async fn remove(&self, pusher_id: &str) -> Option { self.0.write().await.remove(pusher_id) } + + pub(crate) async fn clear(&self) -> Vec { + let mut pushers = self.0.write().await; + let keys = pushers.keys().cloned().collect::>(); + if !keys.is_empty() { + pushers.clear(); + } + keys + } } #[derive(Clone)] @@ -309,12 +318,11 @@ impl HeartbeatHandlerGroup { } /// Deregisters the heartbeat response [`Pusher`] with the given key from the group. - /// - /// Returns the [`Pusher`] if it exists. - pub async fn deregister_push(&self, pusher_id: PusherId) -> Option { - METRIC_META_HEARTBEAT_CONNECTION_NUM.dec(); + pub async fn deregister_push(&self, pusher_id: PusherId) { info!("Pusher unregister: {}", pusher_id); - self.pushers.remove(&pusher_id.string_key()).await + if self.pushers.remove(&pusher_id.string_key()).await.is_some() { + METRIC_META_HEARTBEAT_CONNECTION_NUM.dec(); + } } /// Returns the [`Pushers`] of the group. @@ -519,6 +527,14 @@ impl Mailbox for HeartbeatMailbox { Ok(()) } + + async fn reset(&self) { + let keys = self.pushers.clear().await; + if !keys.is_empty() { + info!("Reset mailbox, deregister pushers: {:?}", keys); + METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64); + } + } } /// The builder to build the group of heartbeat handlers. diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index c454bc1ca5..99f392cfd8 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -452,6 +452,7 @@ pub struct MetaStateHandler { greptimedb_telemetry_task: Arc, leader_cached_kv_backend: Arc, leadership_change_notifier: LeadershipChangeNotifier, + mailbox: MailboxRef, state: StateRef, } @@ -475,6 +476,9 @@ impl MetaStateHandler { pub async fn on_leader_stop(&self) { self.state.write().unwrap().next_state(become_follower()); + // Enforces the mailbox to clear all pushers. + // The remaining heartbeat connections will be closed by the remote peer or keep-alive detection. + self.mailbox.reset().await; self.leadership_change_notifier .notify_on_leader_stop() .await; @@ -602,6 +606,7 @@ impl Metasrv { state: self.state.clone(), leader_cached_kv_backend: leader_cached_kv_backend.clone(), leadership_change_notifier, + mailbox: self.mailbox.clone(), }; let _handle = common_runtime::spawn_global(async move { loop { diff --git a/src/meta-srv/src/service/mailbox.rs b/src/meta-srv/src/service/mailbox.rs index f339e5c4da..bede162936 100644 --- a/src/meta-srv/src/service/mailbox.rs +++ b/src/meta-srv/src/service/mailbox.rs @@ -207,6 +207,9 @@ pub trait Mailbox: Send + Sync { async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>; async fn on_recv(&self, id: MessageId, maybe_msg: Result) -> Result<()>; + + /// Reset all pushers of the mailbox. + async fn reset(&self); } #[cfg(test)] diff --git a/src/meta-srv/src/utils/etcd.rs b/src/meta-srv/src/utils/etcd.rs index 15ac76fd75..508db7c148 100644 --- a/src/meta-srv/src/utils/etcd.rs +++ b/src/meta-srv/src/utils/etcd.rs @@ -12,8 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_meta::distributed_time_constants::default_etcd_client_options; use common_meta::kv_backend::etcd::create_etcd_tls_options; -use etcd_client::{Client, ConnectOptions}; +use etcd_client::Client; use servers::tls::{TlsMode, TlsOption}; use snafu::ResultExt; @@ -30,14 +31,15 @@ pub async fn create_etcd_client_with_tls( .filter(|x| !x.is_empty()) .collect::>(); - let connect_options = tls_config - .map(|c| create_etcd_tls_options(&convert_tls_option(c))) - .transpose() - .context(BuildTlsOptionsSnafu)? - .flatten() - .map(|tls_options| ConnectOptions::new().with_tls(tls_options)); + let mut connect_options = default_etcd_client_options(); + if let Some(tls_config) = tls_config + && let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config)) + .context(BuildTlsOptionsSnafu)? + { + connect_options = connect_options.with_tls(tls_options); + } - Client::connect(&etcd_endpoints, connect_options) + Client::connect(&etcd_endpoints, Some(connect_options)) .await .context(error::ConnectEtcdSnafu) }