Mirror of https://github.com/GreptimeTeam/greptimedb.git (synced 2026-01-08 14:22:58 +00:00)
fix: dn doesn't have chance to send a heartbeat to the new leader (#2471)
* refactor: set meta leader lease secs to 3s
* fix: correct default heartbeat interval
* refactor: ask meta leader in parallel
* feat: configure heartbeat client timeout to 500ms
* fix: trigger to send heartbeat immediately after fail
* fix: fix clippy
@@ -13,13 +13,15 @@ rpc_runtime_size = 8
 require_lease_before_startup = false
 
 [heartbeat]
-# Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
-interval_millis = 5000
+# Interval for sending heartbeat messages to the Metasrv in milliseconds, 3000 by default.
+interval_millis = 3000
 
 # Metasrv client options.
 [meta_client]
 # Metasrv address list.
 metasrv_addrs = ["127.0.0.1:3002"]
+# Heartbeat timeout in milliseconds, 500 by default.
+heartbeat_timeout_millis = 500
 # Operation timeout in milliseconds, 3000 by default.
 timeout_millis = 3000
 # Connect server timeout in milliseconds, 5000 by default.
@@ -255,6 +255,7 @@ mod tests {
             connect_timeout_millis,
             tcp_nodelay,
             ddl_timeout_millis,
+            ..
         } = options.meta_client.unwrap();
 
         assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
@@ -29,3 +29,9 @@ pub const REGION_LEASE_SECS: u64 =
 /// When creating table or region failover, a target node needs to be selected.
 /// If the node's lease has expired, the `Selector` will not select it.
 pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
+
+/// The lease seconds of metasrv leader.
+pub const META_LEASE_SECS: u64 = 3;
+
+// In a lease, there are two opportunities for renewal.
+pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;
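A note on the arithmetic above: `META_KEEP_ALIVE_INTERVAL_SECS` uses truncating integer division, so a 3-second leader lease is refreshed on a 1-second cadence, which is what the "two opportunities for renewal" comment relies on. A minimal sketch (the `main` wrapper is only for illustration):

```rust
// Same constants as in the hunk above.
pub const META_LEASE_SECS: u64 = 3;
pub const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2;

fn main() {
    // u64 division truncates: 3 / 2 == 1, so the keep-alive ticks every second.
    assert_eq!(META_KEEP_ALIVE_INTERVAL_SECS, 1);
    // Two renewal ticks fit comfortably inside a single 3 s lease.
    assert!(2 * META_KEEP_ALIVE_INTERVAL_SECS <= META_LEASE_SECS);
}
```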
@@ -245,6 +245,8 @@ impl HeartbeatTask {
                     Ok(new_tx) => {
                         info!("Reconnected to metasrv");
                         tx = new_tx;
+                        // Triggers to send heartbeat immediately.
+                        sleep.as_mut().reset(Instant::now());
                     }
                     Err(e) => {
                         error!(e;"Failed to reconnect to metasrv!");
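For readers unfamiliar with the `Sleep::reset` trick used above: the heartbeat timer is a pinned `tokio::time::Sleep`, and moving its deadline to `Instant::now()` makes the next wait complete immediately, so a failed send is retried right away instead of waiting out the rest of the interval. A standalone sketch of the pattern, with a shortened interval and a simulated failure (this is not the actual task code):

```rust
use tokio::time::{sleep_until, Duration, Instant};

#[tokio::main]
async fn main() {
    // Interval shortened for the demo; the datanode default is 3000 ms.
    let interval = Duration::from_millis(300);
    let sleep = sleep_until(Instant::now() + interval);
    tokio::pin!(sleep);

    for attempt in 0..3u32 {
        // Pretend the first send failed, e.g. the metasrv leader changed.
        let send_failed = attempt == 0;
        if send_failed {
            // Pull the deadline forward so the retry happens at once.
            sleep.as_mut().reset(Instant::now());
        }
        sleep.as_mut().await;
        println!("heartbeat attempt {attempt}");
        // Re-arm the timer for the next regular tick.
        sleep.as_mut().reset(Instant::now() + interval);
    }
}
```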
@@ -62,6 +62,7 @@ pub struct MetaClientBuilder {
     enable_ddl: bool,
     channel_manager: Option<ChannelManager>,
     ddl_channel_manager: Option<ChannelManager>,
+    heartbeat_channel_manager: Option<ChannelManager>,
 }
 
 impl MetaClientBuilder {
@@ -122,6 +123,13 @@ impl MetaClientBuilder {
         }
     }
 
+    pub fn heartbeat_channel_manager(self, channel_manager: ChannelManager) -> Self {
+        Self {
+            heartbeat_channel_manager: Some(channel_manager),
+            ..self
+        }
+    }
+
     pub fn build(self) -> MetaClient {
         let mut client = if let Some(mgr) = self.channel_manager {
             MetaClient::with_channel_manager(self.id, mgr)
@@ -136,10 +144,11 @@ impl MetaClientBuilder {
         let mgr = client.channel_manager.clone();
 
         if self.enable_heartbeat {
+            let mgr = self.heartbeat_channel_manager.unwrap_or(mgr.clone());
            client.heartbeat = Some(HeartbeatClient::new(
                 self.id,
                 self.role,
-                mgr.clone(),
+                mgr,
                 DEFAULT_ASK_LEADER_MAX_RETRY,
             ));
         }
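A rough sketch of how the new hook might be wired up on the caller side. Only `heartbeat_channel_manager(...)` and `build()` are confirmed by the diff above; the `ChannelConfig`/`ChannelManager::with_config` builder calls, the `MetaClientBuilder::new(cluster_id, member_id, role)` constructor, `enable_heartbeat()`, and the import paths are assumptions about the surrounding crates:

```rust
use std::time::Duration;

use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use meta_client::client::{MetaClient, MetaClientBuilder};

// Hypothetical wiring: one channel manager with the normal 3 s operation
// timeout, plus a dedicated one whose 500 ms timeout applies only to the
// heartbeat stream, so a dead leader connection fails fast.
fn build_meta_client(cluster_id: u64, member_id: u64) -> MetaClient {
    // Assumed ChannelConfig builder API from common_grpc.
    let base = ChannelConfig::new()
        .timeout(Duration::from_millis(3_000))
        .connect_timeout(Duration::from_millis(5_000))
        .tcp_nodelay(true);
    let heartbeat = base.clone().timeout(Duration::from_millis(500));

    // Assumed constructor/enable_* methods; heartbeat_channel_manager() is from the diff.
    MetaClientBuilder::new(cluster_id, member_id, Role::Datanode)
        .enable_heartbeat()
        .channel_manager(ChannelManager::with_config(base))
        .heartbeat_channel_manager(ChannelManager::with_config(heartbeat))
        .build()
}
```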
@@ -13,13 +13,16 @@
 // limitations under the License.
 
 use std::sync::{Arc, RwLock};
+use std::time::Duration;
 
 use api::v1::meta::heartbeat_client::HeartbeatClient;
 use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
 use common_grpc::channel_manager::ChannelManager;
+use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
 use common_telemetry::warn;
 use rand::seq::SliceRandom;
 use snafu::{OptionExt, ResultExt};
+use tokio::time::timeout;
 use tonic::transport::Channel;
 
 use crate::client::Id;
@@ -73,29 +76,44 @@ impl AskLeader {
         };
         peers.shuffle(&mut rand::thread_rng());
 
-        let header = RequestHeader::new(self.id, self.role);
-        let mut leader = None;
+        let req = AskLeaderRequest {
+            header: Some(RequestHeader::new(self.id, self.role)),
+        };
+
+        let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
+
         for addr in &peers {
-            let req = AskLeaderRequest {
-                header: Some(header.clone()),
-            };
             let mut client = self.create_asker(addr)?;
-            match client.ask_leader(req).await {
-                Ok(res) => {
-                    let Some(endpoint) = res.into_inner().leader else {
-                        warn!("No leader from: {addr}");
-                        continue;
-                    };
-                    leader = Some(endpoint.addr);
-                    break;
+            let tx_clone = tx.clone();
+            let req = req.clone();
+            let addr = addr.to_string();
+            tokio::spawn(async move {
+                match client.ask_leader(req).await {
+                    Ok(res) => {
+                        if let Some(endpoint) = res.into_inner().leader {
+                            let _ = tx_clone.send(endpoint.addr).await;
+                        } else {
+                            warn!("No leader from: {addr}");
+                        };
+                    }
+                    Err(status) => {
+                        warn!("Failed to ask leader from: {addr}, {status}");
+                    }
                 }
-                Err(status) => {
-                    warn!("Failed to ask leader from: {addr}, {status}");
-                }
-            }
+            });
         }
 
-        let leader = leader.context(error::NoLeaderSnafu)?;
+        let leader = timeout(
+            self.channel_manager
+                .config()
+                .timeout
+                .unwrap_or(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS)),
+            rx.recv(),
+        )
+        .await
+        .context(error::AskLeaderTimeoutSnafu)?
+        .context(error::NoLeaderSnafu)?;
 
         let mut leadership_group = self.leadership_group.write().unwrap();
         leadership_group.leader = Some(leader.clone());
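The shape of the change above — fan out one request per peer, funnel answers through an mpsc channel, and bound the whole round with a timeout — can be seen in isolation in the toy program below. The addresses and latencies are invented and a `sleep` stands in for `client.ask_leader(req).await`:

```rust
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::{sleep, timeout};

#[tokio::main]
async fn main() {
    let peers = vec![
        ("127.0.0.1:3002", Duration::from_millis(800)), // slow or dead old leader
        ("127.0.0.1:3003", Duration::from_millis(50)),  // answers quickly
        ("127.0.0.1:3004", Duration::from_millis(200)),
    ];

    let (tx, mut rx) = mpsc::channel(peers.len());

    for (addr, latency) in peers {
        let tx = tx.clone();
        tokio::spawn(async move {
            // Stand-in for asking this peer who the leader is.
            sleep(latency).await;
            let _ = tx.send(addr.to_string()).await;
        });
    }
    drop(tx); // only the spawned tasks hold senders now

    // The first answer wins; the timeout bounds the whole round instead of
    // paying the slowest peer's latency, as in the channel-manager timeout above.
    match timeout(Duration::from_millis(500), rx.recv()).await {
        Ok(Some(leader)) => println!("leader answered first: {leader}"),
        Ok(None) => println!("no peer produced an answer"),
        Err(_) => println!("ask-leader round timed out"),
    }
}
```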
@@ -33,6 +33,12 @@ pub enum Error {
     #[snafu(display("No leader, should ask leader first"))]
     NoLeader { location: Location },
 
+    #[snafu(display("Ask leader timeout"))]
+    AskLeaderTimeout {
+        location: Location,
+        source: tokio::time::error::Elapsed,
+    },
+
     #[snafu(display("Failed to create gRPC channel"))]
     CreateChannel {
         location: Location,
@@ -83,6 +89,7 @@ impl ErrorExt for Error {
             Error::IllegalGrpcClientState { .. }
             | Error::AskLeader { .. }
             | Error::NoLeader { .. }
+            | Error::AskLeaderTimeout { .. }
             | Error::NotStarted { .. }
             | Error::SendHeartbeat { .. }
             | Error::CreateHeartbeatStream { .. }
@@ -22,12 +22,18 @@ pub mod error;
 pub struct MetaClientOptions {
     pub metasrv_addrs: Vec<String>,
     pub timeout_millis: u64,
+    #[serde(default = "default_heartbeat_timeout_millis")]
+    pub heartbeat_timeout_millis: u64,
     #[serde(default = "default_ddl_timeout_millis")]
     pub ddl_timeout_millis: u64,
     pub connect_timeout_millis: u64,
     pub tcp_nodelay: bool,
 }
 
+fn default_heartbeat_timeout_millis() -> u64 {
+    500u64
+}
+
 fn default_ddl_timeout_millis() -> u64 {
     10_000u64
 }
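The `#[serde(default = "...")]` attribute is what keeps existing config files working: a `[meta_client]` section written before this change has no `heartbeat_timeout_millis` key, yet still deserializes with the 500 ms default. A minimal sketch using a cut-down stand-in for the struct above (requires the `serde` derive and `toml` crates):

```rust
use serde::Deserialize;

// Simplified stand-in for MetaClientOptions, only to show the default hook.
#[derive(Debug, Deserialize)]
struct MetaClientOptions {
    metasrv_addrs: Vec<String>,
    timeout_millis: u64,
    #[serde(default = "default_heartbeat_timeout_millis")]
    heartbeat_timeout_millis: u64,
}

fn default_heartbeat_timeout_millis() -> u64 {
    500
}

fn main() {
    // An "old" config section with no heartbeat timeout key.
    let old_config = r#"
        metasrv_addrs = ["127.0.0.1:3002"]
        timeout_millis = 3000
    "#;

    let opts: MetaClientOptions = toml::from_str(old_config).unwrap();
    assert_eq!(opts.heartbeat_timeout_millis, 500);
    println!("{opts:?}");
}
```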
@@ -37,6 +43,7 @@ impl Default for MetaClientOptions {
         Self {
             metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
             timeout_millis: 3_000u64,
+            heartbeat_timeout_millis: default_heartbeat_timeout_millis(),
             ddl_timeout_millis: default_ddl_timeout_millis(),
             connect_timeout_millis: 5_000u64,
             tcp_nodelay: true,
@@ -21,9 +21,6 @@ use tokio::sync::broadcast::Receiver;
 
 use crate::error::Result;
 
-pub const LEASE_SECS: i64 = 5;
-// In a lease, there are two opportunities for renewal.
-pub const KEEP_ALIVE_PERIOD_SECS: u64 = LEASE_SECS as u64 / 2;
 pub const ELECTION_KEY: &str = "__meta_srv_election";
 
 #[derive(Debug, Clone)]
@@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
+use common_meta::distributed_time_constants::{META_KEEP_ALIVE_INTERVAL_SECS, META_LEASE_SECS};
 use common_telemetry::{error, info, warn};
 use etcd_client::Client;
 use snafu::{OptionExt, ResultExt};
@@ -23,9 +24,7 @@ use tokio::sync::broadcast;
 use tokio::sync::broadcast::error::RecvError;
 use tokio::sync::broadcast::Receiver;
 
-use crate::election::{
-    Election, LeaderChangeMessage, ELECTION_KEY, KEEP_ALIVE_PERIOD_SECS, LEASE_SECS,
-};
+use crate::election::{Election, LeaderChangeMessage, ELECTION_KEY};
 use crate::error;
 use crate::error::Result;
 use crate::metasrv::{ElectionRef, LeaderValue};
@@ -114,7 +113,7 @@ impl Election for EtcdElection {
         let mut lease_client = self.client.lease_client();
         let mut election_client = self.client.election_client();
         let res = lease_client
-            .grant(LEASE_SECS, None)
+            .grant(META_LEASE_SECS as i64, None)
             .await
             .context(error::EtcdFailedSnafu)?;
         let lease_id = res.id();
@@ -140,7 +139,7 @@ impl Election for EtcdElection {
             .context(error::EtcdFailedSnafu)?;
 
             let mut keep_alive_interval =
-                tokio::time::interval(Duration::from_secs(KEEP_ALIVE_PERIOD_SECS));
+                tokio::time::interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
             loop {
                 let _ = keep_alive_interval.tick().await;
                 keeper.keep_alive().await.context(error::EtcdFailedSnafu)?;
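To make the cadence concrete: with `META_LEASE_SECS = 3` and a keep-alive tick every `META_KEEP_ALIVE_INTERVAL_SECS = 1` second, the leader's lease is refreshed well before it can lapse. A toy simulation of that timing only (no etcd involved; the real loop calls `keeper.keep_alive()` as shown above):

```rust
use std::time::Duration;

use tokio::time::{interval, Instant};

#[tokio::main]
async fn main() {
    const META_LEASE_SECS: u64 = 3;
    const META_KEEP_ALIVE_INTERVAL_SECS: u64 = META_LEASE_SECS / 2; // = 1

    let lease = Duration::from_secs(META_LEASE_SECS);
    let mut deadline = Instant::now() + lease;

    let mut keep_alive = interval(Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS));
    for _ in 0..4 {
        keep_alive.tick().await;
        // The lease must still be live whenever the renewal tick fires.
        assert!(Instant::now() < deadline, "leader lease must not lapse");
        // Stand-in for `keeper.keep_alive().await`: push the expiry forward.
        deadline = Instant::now() + lease;
        println!("lease renewed, next expiry in {lease:?}");
    }
}
```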