diff --git a/Cargo.lock b/Cargo.lock index 32b9bed8e5..4a21ce09ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4245,7 +4245,7 @@ checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" dependencies = [ "cfg-if", "libc", - "socket2", + "socket2 0.5.7", "windows-sys 0.48.0", ] @@ -5592,7 +5592,7 @@ checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" dependencies = [ "cfg-if", "libc", - "windows-link", + "windows-link 0.1.1", ] [[package]] @@ -5724,7 +5724,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -5830,7 +5830,7 @@ dependencies = [ "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", - "socket2", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -6870,9 +6870,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.171" +version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" [[package]] name = "libflate" @@ -6928,7 +6928,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -7005,13 +7005,13 @@ checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5" [[package]] name = "local-ip-address" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3669cf5561f8d27e8fc84cc15e58350e70f557d4d65f70e3154e54cd2f8e1782" +checksum = "656b3b27f8893f7bbf9485148ff9a65f019e3f33bd5cdc87c83cab16b3fd9ec8" dependencies = [ "libc", "neli", - "thiserror 1.0.64", + "thiserror 2.0.12", "windows-sys 0.59.0", ] @@ -7759,7 +7759,7 @@ dependencies = [ "percent-encoding", "rustls", 
"rustls-pemfile", - "socket2", + "socket2 0.5.7", "twox-hash 2.1.0", "url", "webpki", @@ -7825,7 +7825,7 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", - "socket2", + "socket2 0.5.7", "thiserror 2.0.12", "tokio", "tokio-rustls", @@ -10221,7 +10221,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.0.0", "rustls", - "socket2", + "socket2 0.5.7", "thiserror 1.0.64", "tokio", "tracing", @@ -10252,7 +10252,7 @@ checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" dependencies = [ "libc", "once_cell", - "socket2", + "socket2 0.5.7", "tracing", "windows-sys 0.59.0", ] @@ -10849,7 +10849,7 @@ dependencies = [ [[package]] name = "rskafka" version = "0.6.0" -source = "git+https://github.com/influxdata/rskafka.git?rev=8dbd01ed809f5a791833a594e85b144e36e45820#8dbd01ed809f5a791833a594e85b144e36e45820" +source = "git+https://github.com/WenyXu/rskafka.git?rev=40150bad95fddd39a772dc132ee9e92f8f91fe38#40150bad95fddd39a772dc132ee9e92f8f91fe38" dependencies = [ "bytes", "chrono", @@ -10863,6 +10863,7 @@ dependencies = [ "rsasl", "rustls", "snap", + "socket2 0.6.1", "thiserror 2.0.12", "tokio", "tokio-rustls", @@ -11613,7 +11614,7 @@ dependencies = [ "simd-json", "snafu 0.8.5", "snap", - "socket2", + "socket2 0.5.7", "sql", "store-api", "strum 0.27.1", @@ -11934,6 +11935,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" +dependencies = [ + "libc", + "windows-sys 0.60.2", +] + [[package]] name = "spade" version = "2.12.1" @@ -12331,7 +12342,7 @@ dependencies = [ "cfg-if", "libc", "psm", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -13323,7 +13334,7 @@ dependencies = [ "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.7", "tokio-macros", "tracing", "windows-sys 0.52.0", @@ -13395,7 +13406,7 @@ 
dependencies = [ "postgres-protocol", "postgres-types", "rand 0.8.5", - "socket2", + "socket2 0.5.7", "tokio", "tokio-util", "whoami", @@ -13571,7 +13582,7 @@ dependencies = [ "pin-project", "prost 0.13.5", "rustls-pemfile", - "socket2", + "socket2 0.5.7", "tokio", "tokio-rustls", "tokio-stream", @@ -14598,7 +14609,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -14711,6 +14722,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-registry" version = "0.2.0" @@ -14777,6 +14794,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -14801,13 +14827,30 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm", + "windows_i686_gnullvm 0.52.6", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link 0.2.1", + "windows_aarch64_gnullvm 0.53.1", + 
"windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -14820,6 +14863,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -14832,6 +14881,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -14844,12 +14899,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -14862,6 +14929,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -14874,6 +14947,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -14886,6 +14965,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -14898,6 +14983,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "winnow" version = "0.5.40" diff --git a/Cargo.toml b/Cargo.toml index 
cdc1c1ee16..0065db9487 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,7 +188,8 @@ reqwest = { version = "0.12", default-features = false, features = [ "stream", "multipart", ] } -rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "8dbd01ed809f5a791833a594e85b144e36e45820", features = [ +# Branch: feat/keepalive-port +rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "40150bad95fddd39a772dc132ee9e92f8f91fe38", features = [ "transport-tls", ] } rstest = "0.25" diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs index f19b9476ee..d756c9770f 100644 --- a/src/common/meta/src/wal_options_allocator/topic_creator.rs +++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs @@ -14,7 +14,8 @@ use common_telemetry::{debug, error, info}; use common_wal::config::kafka::common::{ - KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG, + KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, + DEFAULT_KEEP_ALIVE_CONFIG, }; use rskafka::client::error::Error as RsKafkaError; use rskafka::client::error::ProtocolError::TopicAlreadyExists; @@ -209,7 +210,9 @@ impl KafkaTopicCreator { pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result { // Builds an kafka controller client for creating topics. 
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone()) - .backoff_config(DEFAULT_BACKOFF_CONFIG); + .backoff_config(DEFAULT_BACKOFF_CONFIG) + .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT)) + .keepalive_config(DEFAULT_KEEP_ALIVE_CONFIG); if let Some(sasl) = &connection.sasl { builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; diff --git a/src/common/wal/src/config/kafka/common.rs b/src/common/wal/src/config/kafka/common.rs index 41f9a379db..e898b9c50d 100644 --- a/src/common/wal/src/config/kafka/common.rs +++ b/src/common/wal/src/config/kafka/common.rs @@ -16,7 +16,7 @@ use std::io::Cursor; use std::sync::Arc; use std::time::Duration; -use rskafka::client::{Credentials, SaslConfig}; +use rskafka::client::{Credentials, KeepaliveConfig, SaslConfig}; use rskafka::BackoffConfig; use rustls::{ClientConfig, RootCertStore}; use serde::{Deserialize, Serialize}; @@ -35,6 +35,16 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig { deadline: Some(Duration::from_secs(3)), }; +/// The default keep-alive config for kafka client. +pub const DEFAULT_KEEP_ALIVE_CONFIG: KeepaliveConfig = KeepaliveConfig { + time: Some(Duration::from_secs(10)), + interval: Some(Duration::from_secs(7)), + retries: Some(3), +}; + +/// The default connect timeout for kafka client. +pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); + /// Default interval for auto WAL pruning. pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO; /// Default limit for concurrent auto pruning tasks. 
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index c4128b1648..0984f4d921 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -15,7 +15,9 @@ use std::collections::HashMap; use std::sync::Arc; -use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG; +use common_wal::config::kafka::common::{ + DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, DEFAULT_KEEP_ALIVE_CONFIG, +}; use common_wal::config::kafka::DatanodeKafkaConfig; use dashmap::DashMap; use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling}; @@ -77,7 +79,9 @@ impl ClientManager { ) -> Result { // Sets backoff config for the top-level kafka client and all clients constructed by it. let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone()) - .backoff_config(DEFAULT_BACKOFF_CONFIG); + .backoff_config(DEFAULT_BACKOFF_CONFIG) + .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT)) + .keepalive_config(DEFAULT_KEEP_ALIVE_CONFIG); if let Some(sasl) = &config.connection.sasl { builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index a6d4f7913e..d1e86e8732 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -14,6 +14,7 @@ use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::heartbeat_server::HeartbeatServer; @@ -82,6 +83,11 @@ use crate::service::admin; use crate::service::admin::admin_axum_router; use crate::{error, Result}; +/// The default keep-alive interval for gRPC. +const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10); +/// The default keep-alive timeout for gRPC. +const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10); + pub struct MetasrvInstance { metasrv: Arc, @@ -271,7 +277,12 @@ macro_rules! 
add_compressed_service { } pub fn router(metasrv: Arc<Metasrv>) -> Router { - let mut router = tonic::transport::Server::builder().accept_http1(true); // for admin services + let mut router = tonic::transport::Server::builder() + // for admin services + .accept_http1(true) + // For quick detection of network failures. + .http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL)) + .http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT)); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone())); diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 6ab5496c7c..fe8134e7b5 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -268,6 +268,15 @@ impl Pushers { async fn remove(&self, pusher_id: &str) -> Option<Pusher> { self.0.write().await.remove(pusher_id) } + + pub(crate) async fn clear(&self) -> Vec<String> { + let mut pushers = self.0.write().await; + let keys = pushers.keys().cloned().collect::<Vec<_>>(); + if !keys.is_empty() { + pushers.clear(); + } + keys + } } #[derive(Clone)] @@ -304,10 +313,11 @@ impl HeartbeatHandlerGroup { /// Deregisters the heartbeat response [`Pusher`] with the given key from the group. /// - /// Returns the [`Pusher`] if it exists. - pub async fn deregister_push(&self, pusher_id: PusherId) -> Option<Pusher> { - METRIC_META_HEARTBEAT_CONNECTION_NUM.dec(); + /// The heartbeat connection gauge is decremented only if a pusher was actually removed. + pub async fn deregister_push(&self, pusher_id: PusherId) { info!("Pusher unregister: {}", pusher_id); - self.pushers.remove(&pusher_id.string_key()).await + if self.pushers.remove(&pusher_id.string_key()).await.is_some() { + METRIC_META_HEARTBEAT_CONNECTION_NUM.dec(); + } } /// Returns the [`Pushers`] of the group. 
@@ -516,6 +526,14 @@ impl Mailbox for HeartbeatMailbox { Ok(()) } + + async fn reset(&self) { + let keys = self.pushers.clear().await; + if !keys.is_empty() { + info!("Reset mailbox, deregister pushers: {:?}", keys); + METRIC_META_HEARTBEAT_CONNECTION_NUM.sub(keys.len() as i64); + } + } } /// The builder to build the group of heartbeat handlers. diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index a62d875954..185b4ee8ba 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -388,6 +388,7 @@ pub struct MetaStateHandler { greptimedb_telemetry_task: Arc, leader_cached_kv_backend: Arc, leadership_change_notifier: LeadershipChangeNotifier, + mailbox: MailboxRef, state: StateRef, } @@ -411,6 +412,9 @@ impl MetaStateHandler { pub async fn on_leader_stop(&self) { self.state.write().unwrap().next_state(become_follower()); + // Forces the mailbox to clear all pushers. + // The remaining heartbeat connections will be closed by the remote peer or by keep-alive detection. + self.mailbox.reset().await; self.leadership_change_notifier .notify_on_leader_stop() .await; @@ -528,6 +532,7 @@ impl Metasrv { state: self.state.clone(), leader_cached_kv_backend: leader_cached_kv_backend.clone(), leadership_change_notifier, + mailbox: self.mailbox.clone(), }; let _handle = common_runtime::spawn_global(async move { loop { diff --git a/src/meta-srv/src/service/mailbox.rs b/src/meta-srv/src/service/mailbox.rs index f339e5c4da..bede162936 100644 --- a/src/meta-srv/src/service/mailbox.rs +++ b/src/meta-srv/src/service/mailbox.rs @@ -207,6 +207,9 @@ pub trait Mailbox: Send + Sync { async fn broadcast(&self, ch: &BroadcastChannel, msg: &MailboxMessage) -> Result<()>; async fn on_recv(&self, id: MessageId, maybe_msg: Result) -> Result<()>; + + /// Resets the mailbox by removing all of its pushers. + async fn reset(&self); } #[cfg(test)]