diff --git a/config/datanode.example.toml b/config/datanode.example.toml index ddea4523fe..2202eae30e 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -13,13 +13,15 @@ rpc_runtime_size = 8 require_lease_before_startup = false [heartbeat] -# Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default. -interval_millis = 5000 +# Interval for sending heartbeat messages to the Metasrv in milliseconds, 3000 by default. +interval_millis = 3000 # Metasrv client options. [meta_client] # Metasrv address list. metasrv_addrs = ["127.0.0.1:3002"] +# Heartbeat timeout in milliseconds, 500 by default. +heartbeat_timeout_millis = 500 # Operation timeout in milliseconds, 3000 by default. timeout_millis = 3000 # Connect server timeout in milliseconds, 5000 by default. diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 8b6daef959..e2ff160eef 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -255,6 +255,7 @@ mod tests { connect_timeout_millis, tcp_nodelay, ddl_timeout_millis, + .. } = options.meta_client.unwrap(); assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr); diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index e271a0943a..a90629f14a 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -29,3 +29,9 @@ pub const REGION_LEASE_SECS: u64 = /// When creating table or region failover, a target node needs to be selected. /// If the node's lease has expired, the `Selector` will not select it. pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS; + +/// The lease seconds of metasrv leader. +pub const META_LEASE_SECS: u64 = 3; + +// In a lease, there are two opportunities for renewal. +pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2; diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 2b7cb04c5c..5bf6fb7cbb 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -245,6 +245,8 @@ impl HeartbeatTask { Ok(new_tx) => { info!("Reconnected to metasrv"); tx = new_tx; + // Triggers to send heartbeat immediately. + sleep.as_mut().reset(Instant::now()); } Err(e) => { error!(e;"Failed to reconnect to metasrv!"); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index ab6700af04..73e132353a 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -62,6 +62,7 @@ pub struct MetaClientBuilder { enable_ddl: bool, channel_manager: Option, ddl_channel_manager: Option, + heartbeat_channel_manager: Option, } impl MetaClientBuilder { @@ -122,6 +123,13 @@ impl MetaClientBuilder { } } + pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self { + Self { + heartbeat_channel_manager: Some(channel_manager), + ..self + } + } + pub fn build(self) -> MetaClient { let mut client = if let Some(mgr) = self.channel_manager { MetaClient::with_channel_manager(self.id, mgr) @@ -136,10 +144,11 @@ impl MetaClientBuilder { let mgr = client.channel_manager.clone(); if self.enable_heartbeat { + let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone()); client.heartbeat = Some(HeartbeatClient::new( self.id, self.role, - mgr.clone(), + mgr, DEFAULT_ASK_LEADER_MAX_RETRY, )); } diff --git a/src/meta-client/src/client/ask_leader.rs b/src/meta-client/src/client/ask_leader.rs index e79294bf1f..40c731f2cd 100644 --- a/src/meta-client/src/client/ask_leader.rs +++ b/src/meta-client/src/client/ask_leader.rs @@ -13,13 +13,16 @@ // limitations under the License. use std::sync::{Arc, RwLock}; +use std::time::Duration; use api::v1::meta::heartbeat_client::HeartbeatClient; use api::v1::meta::{AskLeaderRequest, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; +use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_telemetry::warn; use rand::seq::SliceRandom; use snafu::{OptionExt, ResultExt}; +use tokio::time::timeout; use tonic::transport::Channel; use crate::client::Id; @@ -73,29 +76,44 @@ impl AskLeader { }; peers.shuffle(&mut rand::thread_rng()); - let header = RequestHeader::new(self.id, self.role); - let mut leader = None; + let req = AskLeaderRequest { + header: Some(RequestHeader::new(self.id, self.role)), + }; + + let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len()); + for addr in &peers { - let req = AskLeaderRequest { - header: Some(header.clone()), - }; let mut client = self.create_asker(addr)?; - match client.ask_leader(req).await { - Ok(res) => { - let Some(endpoint) = res.into_inner().leader else { - warn!("No leader from: {addr}"); - continue; - }; - leader = Some(endpoint.addr); - break; + let tx_clone = tx.clone(); + let req = req.clone(); + let addr = addr.to_string(); + tokio::spawn(async move { + match client.ask_leader(req).await { + Ok(res) => { + if let Some(endpoint) = res.into_inner().leader { + let _ = tx_clone.send(endpoint.addr).await; + } else { + warn!("No leader from: {addr}"); + }; + } + Err(status) => { + warn!("Failed to ask leader from: {addr}, {status}"); + } } - Err(status) => { - warn!("Failed to ask leader from: {addr}, {status}"); - } - } + }); } - let leader = leader.context(error::NoLeaderSnafu)?; + let leader = timeout( + self.channel_manager + .config() + .timeout + .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)), + rx.recv(), + ) + .await + .context(error::AskLeaderTimeoutSnafu)? + .context(error::NoLeaderSnafu)?; + let mut leadership_group = self.leadership_group.write().unwrap(); leadership_group.leader = Some(leader.clone()); diff --git a/src/meta-client/src/error.rs b/src/meta-client/src/error.rs index f302a31360..8644e18948 100644 --- a/src/meta-client/src/error.rs +++ b/src/meta-client/src/error.rs @@ -33,6 +33,12 @@ pub enum Error { #[snafu(display("No leader, should ask leader first"))] NoLeader { location: Location }, + #[snafu(display("Ask leader timeout"))] + AskLeaderTimeout { + location: Location, + source: tokio::time::error::Elapsed, + }, + #[snafu(display("Failed to create gRPC channel"))] CreateChannel { location: Location, @@ -83,6 +89,7 @@ impl ErrorExt for Error { Error::IllegalGrpcClientState { .. } | Error::AskLeader { .. } | Error::NoLeader { .. } + | Error::AskLeaderTimeout { .. } | Error::NotStarted { .. } | Error::SendHeartbeat { .. } | Error::CreateHeartbeatStream { .. } diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 3489bccbf7..ad18718040 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -22,12 +22,18 @@ pub mod error; pub struct MetaClientOptions { pub metasrv_addrs: Vec, pub timeout_millis: u64, + #[serde(default = "default_heartbeat_timeout_millis")] + pub heartbeat_timeout_millis: u64, #[serde(default = "default_ddl_timeout_millis")] pub ddl_timeout_millis: u64, pub connect_timeout_millis: u64, pub tcp_nodelay: bool, } +fn default_heartbeat_timeout_millis() -> u64 { + 500u64 +} + fn default_ddl_timeout_millis() -> u64 { 10_000u64 } @@ -37,6 +43,7 @@ impl Default for MetaClientOptions { Self { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], timeout_millis: 3_000u64, + heartbeat_timeout_millis: default_heartbeat_timeout_millis(), ddl_timeout_millis: default_ddl_timeout_millis(), connect_timeout_millis: 5_000u64, tcp_nodelay: true, diff --git a/src/meta-srv/src/election.rs b/src/meta-srv/src/election.rs index 98d3e29649..5c2bc503e4 100644 --- a/src/meta-srv/src/election.rs +++ b/src/meta-srv/src/election.rs @@ -21,9 +21,6 @@ use tokio::sync::broadcast::Receiver; use crate::error::Result; -pub const LEASE_SECS: i64 = 5; -// In a lease, there are two opportunities for renewal. -pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2; pub const ELECTION_KEY: &str = "__meta_srv_election"; #[derive(Debug, Clone)] diff --git a/src/meta-srv/src/election/etcd.rs b/src/meta-srv/src/election/etcd.rs index 96db650853..30dcc5518d 100644 --- a/src/meta-srv/src/election/etcd.rs +++ b/src/meta-srv/src/election/etcd.rs @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS}; use common_telemetry::{error, info, warn}; use etcd_client::Client; use snafu::{OptionExt, ResultExt}; @@ -23,9 +24,7 @@ use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::Receiver; -use crate::election::{ - Election, LeaderChangeMessage, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS, -}; +use crate::election::{Election, LeaderChangeMessage, ELECTION_KEY}; use crate::error; use crate::error::Result; use crate::metasrv::{ElectionRef, LeaderValue}; @@ -114,7 +113,7 @@ impl Election for EtcdElection { let mut lease_client = self.client.lease_client(); let mut election_client = self.client.election_client(); let res = lease_client - .grant(LEASE_SECS, None) + .grant(META_LEASE_SECS as i64, None) .await .context(error::EtcdFailedSnafu)?; let lease_id = res.id(); @@ -140,7 +139,7 @@ impl Election for EtcdElection { .context(error::EtcdFailedSnafu)?; let mut keep_alive_interval = - tokio::time::interval(Duration::from_secs(KEEP_ALIVE_PERIOD_SECS)); + tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)); loop { let _ = keep_alive_interval.tick().await; keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;