diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 2202eae30e..4f52380d65 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -25,7 +25,7 @@ heartbeat_timeout_millis = 500 # Operation timeout in milliseconds, 3000 by default. timeout_millis = 3000 # Connect server timeout in milliseconds, 5000 by default. -connect_timeout_millis = 5000 +connect_timeout_millis = 1000 # `TCP_NODELAY` option for accepted connections, true by default. tcp_nodelay = true diff --git a/config/frontend.example.toml b/config/frontend.example.toml index c87dc8292c..e87936551f 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -62,7 +62,7 @@ metasrv_addrs = ["127.0.0.1:3002"] timeout_millis = 3000 # DDL timeouts options. ddl_timeout_millis = 10000 -connect_timeout_millis = 5000 +connect_timeout_millis = 1000 tcp_nodelay = true # Log options, see `standalone.example.toml` diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index f52177e289..d1e372b677 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -30,7 +30,7 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10; -pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10; +pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 1; pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024; pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024; diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 5bf6fb7cbb..44b48a2236 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -18,6 +18,7 @@ use std::time::Duration; use api::v1::meta::{HeartbeatRequest, Peer, RegionStat, Role}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use 
common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, @@ -97,6 +98,7 @@ impl HeartbeatTask { handler_executor: HeartbeatResponseHandlerExecutorRef, mailbox: MailboxRef, mut notify: Option>, + quit_signal: Arc, ) -> Result { let client_id = meta_client.id(); @@ -123,7 +125,8 @@ impl HeartbeatTask { info!("Heartbeat task shutdown"); } } - info!("Heartbeat handling loop exit.") + quit_signal.notify_one(); + info!("Heartbeat handling loop exit."); }); Ok(tx) } @@ -167,12 +170,15 @@ impl HeartbeatTask { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16); let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx)); + let quit_signal = Arc::new(tokio::sync::Notify::new()); + let mut tx = Self::create_streams( &meta_client, running.clone(), handler_executor.clone(), mailbox.clone(), notify, + quit_signal.clone(), ) .await?; @@ -187,7 +193,6 @@ impl HeartbeatTask { common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); - loop { if !running.load(Ordering::Relaxed) { info!("shutdown heartbeat task"); @@ -228,6 +233,11 @@ impl HeartbeatTask { sleep.as_mut().reset(now + Duration::from_millis(interval)); Some(req) } + // If the heartbeat stream is broken, send a dummy heartbeat request to re-create the heartbeat stream. 
+ _ = quit_signal.notified() => { + let req = HeartbeatRequest::default(); + Some(req) + } }; if let Some(req) = req { debug!("Sending heartbeat request: {:?}", req); @@ -239,6 +249,7 @@ impl HeartbeatTask { handler_executor.clone(), mailbox.clone(), None, + quit_signal.clone(), ) .await { @@ -249,6 +260,13 @@ impl HeartbeatTask { sleep.as_mut().reset(Instant::now()); } Err(e) => { + // Retrying before META_LEASE_SECS expires is pointless: + // every attempt still reads the old meta leader address. + // Retry after META_KEEP_ALIVE_INTERVAL_SECS instead. + sleep.as_mut().reset( + Instant::now() + + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS), + ); error!(e;"Failed to reconnect to metasrv!"); } } @@ -317,13 +335,19 @@ pub async fn new_metasrv_client( .timeout(Duration::from_millis(meta_config.timeout_millis)) .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) .tcp_nodelay(meta_config.tcp_nodelay); - let channel_manager = ChannelManager::with_config(config); + let channel_manager = ChannelManager::with_config(config.clone()); + let heartbeat_channel_manager = ChannelManager::with_config( + config + .timeout(Duration::from_millis(meta_config.heartbeat_timeout_millis)) + .connect_timeout(Duration::from_millis(meta_config.heartbeat_timeout_millis)), + ); let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode) .enable_heartbeat() .enable_router() .enable_store() .channel_manager(channel_manager) + .heartbeat_channel_manager(heartbeat_channel_manager) .build(); meta_client .start(&meta_config.metasrv_addrs) diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index ad18718040..9b3d5cfcf2 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -45,7 +45,7 @@ impl Default for MetaClientOptions { timeout_millis: 3_000u64, heartbeat_timeout_millis: default_heartbeat_timeout_millis(), ddl_timeout_millis: default_ddl_timeout_millis(), - connect_timeout_millis: 5_000u64, + 
connect_timeout_millis: 1_000u64, tcp_nodelay: true, } }