fix: improve network failure detection (#7382)

* fix(meta): add default etcd client options with keep-alive settings (#7363)

* fix: improve network failure detection (#7367)

* Update src/meta-srv/src/handler.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

---------

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
Author: Weny Xu
Date: 2025-12-10 17:48:36 +08:00
Committed by: GitHub
Parent: a22d08f1b1
Commit: cb0f1afb01

12 changed files with 84 additions and 38 deletions

Cargo.lock (generated)

@@ -10838,7 +10838,7 @@ dependencies = [
 [[package]]
 name = "rskafka"
 version = "0.6.0"
-source = "git+https://github.com/WenyXu/rskafka.git?rev=7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76#7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76"
+source = "git+https://github.com/GreptimeTeam/rskafka.git?rev=f5688f83e7da591cda3f2674c2408b4c0ed4ed50#f5688f83e7da591cda3f2674c2408b4c0ed4ed50"
 dependencies = [
  "bytes",
  "chrono",


@@ -200,7 +200,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"stream", "stream",
"multipart", "multipart",
] } ] }
rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "7b0f31ed39db049b4ee2e5f1e95b5a30be9baf76", features = [ # Branch: feat/request-timeout
rskafka = { git = "https://github.com/GreptimeTeam/rskafka.git", rev = "f5688f83e7da591cda3f2674c2408b4c0ed4ed50", features = [
"transport-tls", "transport-tls",
] } ] }
rstest = "0.25" rstest = "0.25"


@@ -14,6 +14,8 @@
 use std::time::Duration;

+use etcd_client::ConnectOptions;
+
 /// Heartbeat interval time (is the basic unit of various time).
 pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;
@@ -52,6 +54,17 @@ pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration =
 pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration =
     Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);

+/// The default options for the etcd client.
+pub fn default_etcd_client_options() -> ConnectOptions {
+    ConnectOptions::new()
+        .with_keep_alive_while_idle(true)
+        .with_keep_alive(
+            Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1),
+            Duration::from_secs(10),
+        )
+        .with_connect_timeout(Duration::from_secs(10))
+}
+
 /// The default mailbox round-trip timeout.
 pub const MAILBOX_RTT_SECS: u64 = 1;
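For reference, these defaults are consumed wherever the meta server opens an etcd connection (see the `create_etcd_client_with_tls` hunk at the end of this diff). A minimal sketch of that usage, assuming the `etcd-client` crate's `Client::connect(endpoints, Option<ConnectOptions>)` API; the helper below is hypothetical and for illustration only:

use common_meta::distributed_time_constants::default_etcd_client_options;
use etcd_client::Client;

// Hypothetical helper; real call sites thread their own endpoint list in.
// Keep-alive pings run even while the connection is idle, and the connect
// attempt times out after 10s, so a dead etcd endpoint surfaces as an error
// instead of a hang.
async fn connect_etcd(endpoints: &[String]) -> Result<Client, etcd_client::Error> {
    Client::connect(endpoints, Some(default_etcd_client_options())).await
}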


@@ -14,7 +14,7 @@
 use common_telemetry::{debug, error, info};
 use common_wal::config::kafka::common::{
-    DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig,
+    DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, KafkaConnectionConfig, KafkaTopicConfig,
 };
 use rskafka::client::error::Error as RsKafkaError;
 use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -205,11 +205,13 @@ impl KafkaTopicCreator {
         self.partition_client(topic).await.unwrap()
     }
 }

 /// Builds a kafka [Client](rskafka::client::Client).
 pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Client> {
     // Builds an kafka controller client for creating topics.
     let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
-        .backoff_config(DEFAULT_BACKOFF_CONFIG);
+        .backoff_config(DEFAULT_BACKOFF_CONFIG)
+        .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT));
     if let Some(sasl) = &connection.sasl {
         builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
     };


@@ -36,6 +36,9 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
     deadline: Some(Duration::from_secs(3)),
 };

+/// The default connect timeout for kafka client.
+pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
+
 /// Default interval for auto WAL pruning.
 pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30);
 /// Default limit for concurrent auto pruning tasks.


@@ -16,7 +16,7 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use common_wal::config::kafka::DatanodeKafkaConfig;
-use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
+use common_wal::config::kafka::common::{DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT};
 use dashmap::DashMap;
 use rskafka::client::ClientBuilder;
 use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
@@ -78,7 +78,8 @@ impl ClientManager {
     ) -> Result<Self> {
         // Sets backoff config for the top-level kafka client and all clients constructed by it.
         let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
-            .backoff_config(DEFAULT_BACKOFF_CONFIG);
+            .backoff_config(DEFAULT_BACKOFF_CONFIG)
+            .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT));
         if let Some(sasl) = &config.connection.sasl {
             builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
         };


@@ -14,6 +14,7 @@
 use std::net::SocketAddr;
 use std::sync::Arc;
+use std::time::Duration;

 use api::v1::meta::cluster_server::ClusterServer;
 use api::v1::meta::heartbeat_server::HeartbeatServer;
@@ -49,16 +50,21 @@ use crate::metasrv::builder::MetasrvBuilder;
 use crate::metasrv::{
     BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
 };
-use crate::selector::SelectorType;
 use crate::selector::lease_based::LeaseBasedSelector;
 use crate::selector::load_based::LoadBasedSelector;
 use crate::selector::round_robin::RoundRobinSelector;
 use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
+use crate::selector::{Selector, SelectorType};
 use crate::service::admin;
 use crate::service::admin::admin_axum_router;
 use crate::utils::etcd::create_etcd_client_with_tls;
 use crate::{Result, error};

+/// The default keep-alive interval for gRPC.
+const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
+/// The default keep-alive timeout for gRPC.
+const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
+
 pub struct MetasrvInstance {
     metasrv: Arc<Metasrv>,
@@ -245,7 +251,12 @@ macro_rules! add_compressed_service {
 }

 pub fn router(metasrv: Arc<Metasrv>) -> Router {
-    let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services
+    let mut router = tonic::transport::Server::builder()
+        // for admin services
+        .accept_http1(true)
+        // For quick network failures detection.
+        .http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL))
+        .http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT));
     let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
     let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
     let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
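The server-side HTTP/2 keepalive above covers only one direction; the `HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS` / `HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS` constants visible as context in the constants hunk presumably configure the client side of the heartbeat channel. A rough tonic sketch of such a channel, illustrative only and not taken from this diff:

use std::time::Duration;
use tonic::transport::{Channel, Endpoint, Error};

// Illustrative values only; the real channel construction lives elsewhere in the codebase.
async fn heartbeat_channel(addr: &'static str) -> Result<Channel, Error> {
    Endpoint::from_static(addr)
        // Send HTTP/2 PING frames even when no RPC is in flight...
        .keep_alive_while_idle(true)
        .http2_keep_alive_interval(Duration::from_secs(10))
        // ...and tear the connection down if the peer stops answering.
        .keep_alive_timeout(Duration::from_secs(10))
        .connect_timeout(Duration::from_secs(10))
        .connect()
        .await
}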
@@ -393,7 +404,12 @@ pub async fn metasrv_builder(
info!("Using selector from plugins"); info!("Using selector from plugins");
selector selector
} else { } else {
let selector = match opts.selector { let selector: Arc<
dyn Selector<
Context = crate::metasrv::SelectorContext,
Output = Vec<common_meta::peer::Peer>,
>,
> = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector::new( SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
RegionNumsBasedWeightCompute, RegionNumsBasedWeightCompute,
meta_peer_client.clone(), meta_peer_client.clone(),


@@ -63,22 +63,6 @@ pub struct EtcdElection {
 }

 impl EtcdElection {
-    pub async fn with_endpoints<E, S>(
-        leader_value: E,
-        endpoints: S,
-        store_key_prefix: String,
-    ) -> Result<ElectionRef>
-    where
-        E: AsRef<str>,
-        S: AsRef<[E]>,
-    {
-        let client = Client::connect(endpoints, None)
-            .await
-            .context(error::ConnectEtcdSnafu)?;
-
-        Self::with_etcd_client(leader_value, client, store_key_prefix).await
-    }
-
     pub async fn with_etcd_client<E>(
         leader_value: E,
         client: Client,


@@ -275,6 +275,15 @@ impl Pushers {
     async fn remove(&self, pusher_id: &str) -> Option<Pusher> {
         self.0.write().await.remove(pusher_id)
     }
+
+    pub(crate) async fn clear(&self) -> Vec<String> {
+        let mut pushers = self.0.write().await;
+        let keys = pushers.keys().cloned().collect::<Vec<_>>();
+        if !keys.is_empty() {
+            pushers.clear();
+        }
+        keys
+    }
 }

 #[derive(Clone)]
@@ -309,12 +318,11 @@ impl HeartbeatHandlerGroup {
     }

     /// Deregisters the heartbeat response [`Pusher`] with the given key from the group.
-    ///
-    /// Returns the [`Pusher`] if it exists.
-    pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> {
-        METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
+    pub async fn deregister_push(&self, pusher_id: PusherId) {
         info!("Pusher unregister: {}", pusher_id);
-        self.pushers.remove(&pusher_id.string_key()).await
+        if self.pushers.remove(&pusher_id.string_key()).await.is_some() {
+            METRIC_META_HEARTBEAT_CONNECTION_NUM.dec();
+        }
     }

     /// Returns the [`Pushers`] of the group.
@@ -519,6 +527,14 @@ impl Mailbox for HeartbeatMailbox {
         Ok(())
     }
+
+    async fn reset(&self) {
+        let keys = self.pushers.clear().await;
+        if !keys.is_empty() {
+            info!("Reset mailbox, deregister pushers: {:?}", keys);
+            METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64);
+        }
+    }
 }

 /// The builder to build the group of heartbeat handlers.
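The metric bookkeeping in the two hunks above is deliberately guarded: the connection gauge is only decremented for pushers that were actually removed, so deregistering a pusher that a reset already cleared cannot drive the count negative. A toy model of the same pattern (not GreptimeDB code; the map and gauge below are stand-ins):

use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};

// Stand-ins: `()` for the real Pusher values, an AtomicI64 for the Prometheus gauge.
struct ToyPushers {
    inner: HashMap<String, ()>,
    connections: AtomicI64,
}

impl ToyPushers {
    fn deregister(&mut self, key: &str) {
        // Only decrement for connections that were actually present.
        if self.inner.remove(key).is_some() {
            self.connections.fetch_sub(1, Ordering::Relaxed);
        }
    }

    fn reset(&mut self) -> Vec<String> {
        let keys: Vec<String> = self.inner.keys().cloned().collect();
        if !keys.is_empty() {
            self.inner.clear();
            self.connections.fetch_sub(keys.len() as i64, Ordering::Relaxed);
        }
        keys
    }
}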


@@ -452,6 +452,7 @@ pub struct MetaStateHandler {
     greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
     leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
     leadership_change_notifier: LeadershipChangeNotifier,
+    mailbox: MailboxRef,
     state: StateRef,
 }
@@ -475,6 +476,9 @@ impl MetaStateHandler {
     pub async fn on_leader_stop(&self) {
         self.state.write().unwrap().next_state(become_follower());

+        // Enforces the mailbox to clear all pushers.
+        // The remaining heartbeat connections will be closed by the remote peer or keep-alive detection.
+        self.mailbox.reset().await;
         self.leadership_change_notifier
             .notify_on_leader_stop()
             .await;
@@ -602,6 +606,7 @@ impl Metasrv {
             state: self.state.clone(),
             leader_cached_kv_backend: leader_cached_kv_backend.clone(),
             leadership_change_notifier,
+            mailbox: self.mailbox.clone(),
         };

         let _handle = common_runtime::spawn_global(async move {
             loop {


@@ -207,6 +207,9 @@ pub trait Mailbox: Send + Sync {
     async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>;

     async fn on_recv(&self, id: MessageId, maybe_msg: Result<MailboxMessage>) -> Result<()>;
+
+    /// Reset all pushers of the mailbox.
+    async fn reset(&self);
 }

 #[cfg(test)]


@@ -12,8 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use common_meta::distributed_time_constants::default_etcd_client_options;
 use common_meta::kv_backend::etcd::create_etcd_tls_options;
-use etcd_client::{Client, ConnectOptions};
+use etcd_client::Client;
 use servers::tls::{TlsMode, TlsOption};
 use snafu::ResultExt;
@@ -30,14 +31,15 @@ pub async fn create_etcd_client_with_tls(
         .filter(|x| !x.is_empty())
         .collect::<Vec<_>>();

-    let connect_options = tls_config
-        .map(|c| create_etcd_tls_options(&convert_tls_option(c)))
-        .transpose()
-        .context(BuildTlsOptionsSnafu)?
-        .flatten()
-        .map(|tls_options| ConnectOptions::new().with_tls(tls_options));
+    let mut connect_options = default_etcd_client_options();
+    if let Some(tls_config) = tls_config
+        && let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config))
+            .context(BuildTlsOptionsSnafu)?
+    {
+        connect_options = connect_options.with_tls(tls_options);
+    }

-    Client::connect(&etcd_endpoints, connect_options)
+    Client::connect(&etcd_endpoints, Some(connect_options))
         .await
         .context(error::ConnectEtcdSnafu)
 }