From 898e84898ce102a85a751e014630445cea8fff61 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Tue, 6 Jan 2026 17:43:36 +0800 Subject: [PATCH] feat!: make heartbeat config only in metasrv (#7510) * feat: make heartbeat config only in metasrv Signed-off-by: jeremyhi * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * feat: refine config doc Signed-off-by: jeremyhi * feat: make the heartbeat setup simple Signed-off-by: jeremyhi * chore: by comment Signed-off-by: jeremyhi * chore: revert config Signed-off-by: jeremyhi * feat: proto update Signed-off-by: jeremyhi * chore: fix sqlness wrong cfg Signed-off-by: jeremyhi --------- Signed-off-by: jeremyhi Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/cmd/src/datanode.rs | 11 ---- src/cmd/src/flownode.rs | 1 - src/cmd/tests/load_config_test.rs | 1 - src/datanode/src/alive_keeper.rs | 29 +++++++--- src/datanode/src/config.rs | 3 - src/datanode/src/heartbeat.rs | 48 +++++++++------- src/flow/src/adapter.rs | 3 - src/flow/src/heartbeat.rs | 30 +++++----- src/frontend/src/frontend.rs | 13 ++--- src/frontend/src/heartbeat.rs | 22 ++++---- src/meta-client/examples/meta_client.rs | 2 +- src/meta-client/src/client.rs | 10 ++-- src/meta-client/src/client/heartbeat.rs | 65 +++++++++++++++++++-- src/meta-srv/src/handler.rs | 18 ++++++ src/meta-srv/src/handler/test_utils.rs | 3 + src/meta-srv/src/metasrv.rs | 75 ++++++++++++++++++++++++- src/meta-srv/src/service/heartbeat.rs | 5 +- src/servers/src/heartbeat_options.rs | 52 ----------------- src/servers/src/lib.rs | 2 +- tests/conf/datanode-test.toml.template | 3 - tests/conf/metasrv-test.toml.template | 1 + 23 files changed, 252 insertions(+), 149 deletions(-) delete mode 100644 src/servers/src/heartbeat_options.rs diff --git a/Cargo.lock b/Cargo.lock index fc5ea07564..fc0e240a40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5467,7 +5467,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a2e5099d72a1cfa8ba41fa4296101eb5f874074a#a2e5099d72a1cfa8ba41fa4296101eb5f874074a" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fa0f5716556b3276317701fffa702002e7fce275#fa0f5716556b3276317701fffa702002e7fce275" dependencies = [ "prost 0.13.5", "prost-types 0.13.5", diff --git a/Cargo.toml b/Cargo.toml index 594002bedb..9ed26f3328 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [ fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a2e5099d72a1cfa8ba41fa4296101eb5f874074a" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fa0f5716556b3276317701fffa702002e7fce275" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 23ca644ffc..9557bba6d8 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -330,7 +330,6 @@ mod tests { use common_config::ENV_VAR_SEP; use common_test_util::temp_dir::create_named_temp_file; use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config}; - use servers::heartbeat_options::HeartbeatOptions; use super::*; use crate::options::GlobalOptions; @@ -374,9 +373,6 @@ mod tests { hostname = "127.0.0.1" runtime_size = 8 - [heartbeat] - interval = "300ms" - [meta_client] metasrv_addrs = ["127.0.0.1:3002"] timeout = "3s" @@ -434,13 +430,6 @@ mod tests { ); assert!(!raft_engine_config.sync_write); - let HeartbeatOptions { - interval: heart_beat_interval, - .. - } = options.heartbeat; - - assert_eq!(300, heart_beat_interval.as_millis()); - let MetaClientOptions { metasrv_addrs: metasrv_addr, timeout, diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 6cefdb0f79..3f8458cddf 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -358,7 +358,6 @@ impl StartCommand { let heartbeat_task = flow::heartbeat::HeartbeatTask::new( &opts, meta_client.clone(), - opts.heartbeat.clone(), Arc::new(executor), Arc::new(resource_stat), ); diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 79b42dbfc1..2300a2250e 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -228,7 +228,6 @@ fn test_load_flownode_example_config() { ..Default::default() }, tracing: Default::default(), - heartbeat: Default::default(), // flownode deliberately use a slower query parallelism // to avoid overwhelming the frontend with too many queries query: QueryOptions { diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs index fd6d7da8c4..57f4e00aa2 100644 --- a/src/datanode/src/alive_keeper.rs +++ b/src/datanode/src/alive_keeper.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use api::v1::meta::GrantedRegion; use async_trait::async_trait; @@ -50,7 +50,7 @@ use crate::region_server::RegionServer; pub struct RegionAliveKeeper { region_server: RegionServer, tasks: Arc>>>, - heartbeat_interval_millis: u64, + heartbeat_interval_millis: Arc, started: Arc, /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing @@ -67,18 +67,26 @@ impl RegionAliveKeeper { pub fn new( region_server: RegionServer, countdown_task_handler_ext: Option, - heartbeat_interval_millis: u64, + heartbeat_interval: Duration, ) -> Self { Self { region_server, tasks: Arc::new(Mutex::new(HashMap::new())), - heartbeat_interval_millis, + heartbeat_interval_millis: Arc::new(AtomicU64::new( + heartbeat_interval.as_millis() as u64 + )), started: Arc::new(AtomicBool::new(false)), epoch: Instant::now(), countdown_task_handler_ext, } } + /// Update the heartbeat interval with the value received from Metasrv. + pub fn update_heartbeat_interval(&self, heartbeat_interval_millis: u64) { + self.heartbeat_interval_millis + .store(heartbeat_interval_millis, Ordering::Relaxed); + } + async fn find_handle(&self, region_id: RegionId) -> Option> { self.tasks.lock().await.get(®ion_id).cloned() } @@ -108,7 +116,9 @@ impl RegionAliveKeeper { }; if should_start { - handle.start(self.heartbeat_interval_millis).await; + handle + .start(self.heartbeat_interval_millis.load(Ordering::Relaxed)) + .await; info!("Region alive countdown for region {region_id} is started!"); } else { info!( @@ -230,8 +240,9 @@ impl RegionAliveKeeper { } let tasks = self.tasks.lock().await; + let interval = self.heartbeat_interval_millis.load(Ordering::Relaxed); for task in tasks.values() { - task.start(self.heartbeat_interval_millis).await; + task.start(interval).await; } info!( @@ -505,7 +516,11 @@ mod test { let engine = Arc::new(engine); region_server.register_engine(engine.clone()); - let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100)); + let alive_keeper = Arc::new(RegionAliveKeeper::new( + region_server.clone(), + None, + Duration::from_millis(100), + )); let region_id = RegionId::new(1024, 1); let builder = CreateRequestBuilder::new(); diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 19b4647b8e..6f6815a869 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -29,7 +29,6 @@ pub(crate) use object_store::config::ObjectStoreConfig; use query::options::QueryOptions; use serde::{Deserialize, Serialize}; use servers::grpc::GrpcOptions; -use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; /// Storage engine config @@ -71,7 +70,6 @@ pub struct DatanodeOptions { pub init_regions_in_background: bool, pub init_regions_parallelism: usize, pub grpc: GrpcOptions, - pub heartbeat: HeartbeatOptions, pub http: HttpOptions, pub meta_client: Option, pub wal: DatanodeWalConfig, @@ -134,7 +132,6 @@ impl Default for DatanodeOptions { RegionEngineConfig::File(FileEngineConfig::default()), ], logging: LoggingOptions::default(), - heartbeat: HeartbeatOptions::datanode_default(), enable_telemetry: true, tracing: TracingOptions::default(), query: QueryOptions::default(), diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 2b07adf06e..ee538e1280 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -22,7 +22,7 @@ use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionR use common_base::Plugins; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::datanode::REGION_STATISTIC_KEY; -use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; +use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::suspend::SuspendHandler; @@ -35,6 +35,7 @@ use common_stat::ResourceStatRef; use common_telemetry::{debug, error, info, trace, warn}; use common_workload::DatanodeWorkloadType; use meta_client::MetaClientRef; +use meta_client::client::heartbeat::HeartbeatConfig; use meta_client::client::{HeartbeatSender, MetaClient}; use servers::addrs; use snafu::{OptionExt as _, ResultExt}; @@ -61,7 +62,6 @@ pub struct HeartbeatTask { running: Arc, meta_client: MetaClientRef, region_server: RegionServer, - interval: u64, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, region_alive_keeper: Arc, resource_stat: ResourceStatRef, @@ -87,7 +87,7 @@ impl HeartbeatTask { let region_alive_keeper = Arc::new(RegionAliveKeeper::new( region_server.clone(), countdown_task_handler_ext, - opts.heartbeat.interval.as_millis() as u64, + BASE_HEARTBEAT_INTERVAL, )); let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![ region_alive_keeper.clone(), @@ -109,7 +109,6 @@ impl HeartbeatTask { running: Arc::new(AtomicBool::new(false)), meta_client, region_server, - interval: opts.heartbeat.interval.as_millis() as u64, resp_handler_executor, region_alive_keeper, resource_stat, @@ -123,9 +122,9 @@ impl HeartbeatTask { mailbox: MailboxRef, mut notify: Option>, quit_signal: Arc, - ) -> Result { + ) -> Result<(HeartbeatSender, HeartbeatConfig)> { let client_id = meta_client.id(); - let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?; + let (tx, mut rx, config) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?; let mut last_received_lease = Instant::now(); @@ -175,7 +174,7 @@ impl HeartbeatTask { quit_signal.notify_one(); info!("Heartbeat handling loop exit."); }); - Ok(tx) + Ok((tx, config)) } async fn handle_response( @@ -204,13 +203,9 @@ impl HeartbeatTask { warn!("Heartbeat task started multiple times"); return Ok(()); } - let interval = self.interval; let node_id = self.node_id; let node_epoch = self.node_epoch; let addr = &self.peer_addr; - info!( - "Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}." - ); let meta_client = self.meta_client.clone(); let region_server_clone = self.region_server.clone(); @@ -222,7 +217,7 @@ impl HeartbeatTask { let quit_signal = Arc::new(Notify::new()); - let mut tx = Self::create_streams( + let (mut tx, config) = Self::create_streams( &meta_client, running.clone(), handler_executor.clone(), @@ -232,6 +227,17 @@ impl HeartbeatTask { ) .await?; + let interval = config.interval.as_millis() as u64; + let mut retry_interval = config.retry_interval; + + // Update RegionAliveKeeper with the interval from Metasrv + self.region_alive_keeper.update_heartbeat_interval(interval); + + info!( + "Starting heartbeat to Metasrv with config: {}. My node id is {}, address is {}.", + config, node_id, addr + ); + let self_peer = Some(Peer { id: node_id, addr: addr.clone(), @@ -244,6 +250,7 @@ impl HeartbeatTask { let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores(); let total_memory_bytes = self.resource_stat.get_total_memory_bytes(); let resource_stat = self.resource_stat.clone(); + let region_alive_keeper = self.region_alive_keeper.clone(); let gc_limiter = self .region_server .mito_engine() @@ -363,20 +370,23 @@ impl HeartbeatTask { ) .await { - Ok(new_tx) => { - info!("Reconnected to metasrv"); + Ok((new_tx, new_config)) => { + info!("Reconnected to metasrv, heartbeat config: {}", new_config); tx = new_tx; + // Update retry_interval from new config + retry_interval = new_config.retry_interval; + // Update region_alive_keeper's heartbeat interval + region_alive_keeper.update_heartbeat_interval( + new_config.interval.as_millis() as u64, + ); // Triggers to send heartbeat immediately. sleep.as_mut().reset(Instant::now()); } Err(e) => { // Before the META_LEASE_SECS expires, // any retries are meaningless, it always reads the old meta leader address. - // Triggers to retry after META_KEEP_ALIVE_INTERVAL_SECS. - sleep.as_mut().reset( - Instant::now() - + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS), - ); + // Triggers to retry after retry_interval from Metasrv config. + sleep.as_mut().reset(Instant::now() + retry_interval); error!(e; "Failed to reconnect to metasrv!"); } } diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index a8d2482faf..c6a1a2dcd8 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -39,7 +39,6 @@ use query::QueryEngine; use query::options::QueryOptions; use serde::{Deserialize, Serialize}; use servers::grpc::GrpcOptions; -use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; use session::context::QueryContext; use snafu::{OptionExt, ResultExt, ensure}; @@ -111,7 +110,6 @@ pub struct FlownodeOptions { pub meta_client: Option, pub logging: LoggingOptions, pub tracing: TracingOptions, - pub heartbeat: HeartbeatOptions, pub query: QueryOptions, pub user_provider: Option, pub memory: MemoryOptions, @@ -127,7 +125,6 @@ impl Default for FlownodeOptions { meta_client: None, logging: LoggingOptions::default(), tracing: TracingOptions::default(), - heartbeat: HeartbeatOptions::default(), // flownode's query option is set to 1 to throttle flow's query so // that it won't use too much cpu or memory query: QueryOptions { diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 89b37860c5..9026bcd4c3 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -30,7 +30,6 @@ use common_telemetry::{debug, error, info, warn}; use greptime_proto::v1::meta::NodeInfo; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use servers::addrs; -use servers::heartbeat_options::HeartbeatOptions; use snafu::ResultExt; use tokio::sync::mpsc; use tokio::time::Duration; @@ -64,8 +63,6 @@ pub struct HeartbeatTask { node_epoch: u64, peer_addr: String, meta_client: Arc, - report_interval: Duration, - retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, running: Arc, query_stat_size: Option, @@ -81,7 +78,6 @@ impl HeartbeatTask { pub fn new( opts: &FlownodeOptions, meta_client: Arc, - heartbeat_opts: HeartbeatOptions, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, resource_stat: ResourceStatRef, ) -> Self { @@ -90,8 +86,6 @@ impl HeartbeatTask { node_epoch: common_time::util::current_time_millis() as u64, peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)), meta_client, - report_interval: heartbeat_opts.interval, - retry_interval: heartbeat_opts.retry_interval, resp_handler_executor, running: Arc::new(AtomicBool::new(false)), query_stat_size: None, @@ -113,22 +107,26 @@ impl HeartbeatTask { } async fn create_streams(&self) -> Result<(), Error> { - info!("Start to establish the heartbeat connection to metasrv."); - let (req_sender, resp_stream) = self + info!("Establishing heartbeat connection to Metasrv..."); + + let (req_sender, resp_stream, config) = self .meta_client .heartbeat() .await .map_err(BoxedError::new) .context(ExternalSnafu)?; - info!("Flownode's heartbeat connection has been established with metasrv"); + info!( + "Heartbeat started for flownode {}, Metasrv config: {}", + self.node_id, config + ); let (outgoing_tx, outgoing_rx) = mpsc::channel(16); let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx)); - self.start_handle_resp_stream(resp_stream, mailbox); + self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval); - self.start_heartbeat_report(req_sender, outgoing_rx); + self.start_heartbeat_report(req_sender, outgoing_rx, config.interval); Ok(()) } @@ -217,8 +215,8 @@ impl HeartbeatTask { &self, req_sender: HeartbeatSender, mut outgoing_rx: mpsc::Receiver, + report_interval: Duration, ) { - let report_interval = self.report_interval; let node_epoch = self.node_epoch; let self_peer = Some(Peer { id: self.node_id, @@ -277,9 +275,13 @@ impl HeartbeatTask { }); } - fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) { + fn start_handle_resp_stream( + &self, + mut resp_stream: HeartbeatStream, + mailbox: MailboxRef, + retry_interval: Duration, + ) { let capture_self = self.clone(); - let retry_interval = self.retry_interval; let _handle = common_runtime::spawn_hb(async move { loop { diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 8d698d65b1..f42150582a 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -25,7 +25,6 @@ use meta_client::MetaClientOptions; use query::options::QueryOptions; use serde::{Deserialize, Serialize}; use servers::grpc::GrpcOptions; -use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; use servers::server::ServerHandlers; use snafu::ResultExt; @@ -45,7 +44,6 @@ pub struct FrontendOptions { pub node_id: Option, pub default_timezone: Option, pub default_column_prefix: Option, - pub heartbeat: HeartbeatOptions, /// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight). /// Set to 0 to disable the limit. Default: "0" (unlimited) pub max_in_flight_write_bytes: ReadableSize, @@ -82,7 +80,6 @@ impl Default for FrontendOptions { node_id: None, default_timezone: None, default_column_prefix: None, - heartbeat: HeartbeatOptions::frontend_default(), max_in_flight_write_bytes: ReadableSize(0), write_bytes_exhausted_policy: OnExhaustedPolicy::default(), http: HttpOptions::default(), @@ -406,10 +403,6 @@ mod tests { ..Default::default() }, meta_client: Some(meta_client_options.clone()), - heartbeat: HeartbeatOptions { - interval: Duration::from_secs(1), - ..Default::default() - }, ..Default::default() }; @@ -419,7 +412,11 @@ mod tests { let meta_client = create_meta_client(&meta_client_options, server.clone()).await; let frontend = create_frontend(&options, meta_client).await?; - let frontend_heartbeat_interval = options.heartbeat.interval; + use common_meta::distributed_time_constants::{ + BASE_HEARTBEAT_INTERVAL, frontend_heartbeat_interval, + }; + let frontend_heartbeat_interval = + frontend_heartbeat_interval(BASE_HEARTBEAT_INTERVAL) + Duration::from_secs(1); tokio::time::sleep(frontend_heartbeat_interval).await; // initial state: not suspend: assert!(!frontend.instance.is_suspended()); diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 64680abfd4..add4940214 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -42,8 +42,6 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT}; pub struct HeartbeatTask { peer_addr: String, meta_client: Arc, - report_interval: Duration, - retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, start_time_ms: u64, resource_stat: ResourceStatRef, @@ -66,8 +64,6 @@ impl HeartbeatTask { addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)) }, meta_client, - report_interval: opts.heartbeat.interval, - retry_interval: opts.heartbeat.retry_interval, resp_handler_executor, start_time_ms: common_time::util::current_time_millis() as u64, resource_stat, @@ -75,27 +71,31 @@ impl HeartbeatTask { } pub async fn start(&self) -> Result<()> { - let (req_sender, resp_stream) = self + let (req_sender, resp_stream, config) = self .meta_client .heartbeat() .await .context(error::CreateMetaHeartbeatStreamSnafu)?; - info!("A heartbeat connection has been established with metasrv"); + info!("Heartbeat started with Metasrv config: {}", config); let (outgoing_tx, outgoing_rx) = mpsc::channel(16); let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx)); - self.start_handle_resp_stream(resp_stream, mailbox); + self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval); - self.start_heartbeat_report(req_sender, outgoing_rx); + self.start_heartbeat_report(req_sender, outgoing_rx, config.interval); Ok(()) } - fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) { + fn start_handle_resp_stream( + &self, + mut resp_stream: HeartbeatStream, + mailbox: MailboxRef, + retry_interval: Duration, + ) { let capture_self = self.clone(); - let retry_interval = self.retry_interval; let _handle = common_runtime::spawn_hb(async move { loop { @@ -190,8 +190,8 @@ impl HeartbeatTask { &self, req_sender: HeartbeatSender, mut outgoing_rx: Receiver, + report_interval: Duration, ) { - let report_interval = self.report_interval; let start_time_ms = self.start_time_ms; let self_peer = Some(Peer { // The node id will be actually calculated from its address (by hashing the address diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index 175888f170..e1e11d9750 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -44,7 +44,7 @@ async fn run() { // required only when the heartbeat_client is enabled meta_client.ask_leader().await.unwrap(); - let (sender, mut receiver) = meta_client.heartbeat().await.unwrap(); + let (sender, mut receiver, _config) = meta_client.heartbeat().await.unwrap(); // send heartbeats let _handle = tokio::spawn(async move { diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index fff34d6d26..0d0bcecee5 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -13,7 +13,7 @@ // limitations under the License. mod ask_leader; -mod heartbeat; +pub mod heartbeat; mod load_balance; mod procedure; @@ -57,7 +57,7 @@ use common_meta::rpc::store::{ }; use common_telemetry::info; use futures::TryStreamExt; -use heartbeat::Client as HeartbeatClient; +use heartbeat::{Client as HeartbeatClient, HeartbeatConfig}; use procedure::Client as ProcedureClient; use snafu::{OptionExt, ResultExt}; use store::Client as StoreClient; @@ -594,7 +594,9 @@ impl MetaClient { /// The `datanode` needs to use the sender to continuously send heartbeat /// packets (some self-state data), and the receiver can receive a response /// from "metasrv" (which may contain some scheduling instructions). - pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> { + /// + /// Returns the heartbeat sender, stream, and configuration received from Metasrv. + pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> { self.heartbeat_client()?.heartbeat().await } @@ -873,7 +875,7 @@ mod tests { #[tokio::test] async fn test_heartbeat() { let tc = new_client("test_heartbeat").await; - let (sender, mut receiver) = tc.client.heartbeat().await.unwrap(); + let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap(); // send heartbeats let request_sent = Arc::new(AtomicUsize::new(0)); diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index be5a319fa8..8171289a26 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -12,14 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; use std::sync::Arc; +use std::time::Duration; use api::v1::meta::heartbeat_client::HeartbeatClient; use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; +use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL; use common_meta::util; -use common_telemetry::info; use common_telemetry::tracing_context::TracingContext; +use common_telemetry::{info, warn}; use snafu::{OptionExt, ResultExt, ensure}; use tokio::sync::{RwLock, mpsc}; use tokio_stream::wrappers::ReceiverStream; @@ -32,6 +35,52 @@ use crate::client::{Id, LeaderProviderRef}; use crate::error; use crate::error::{InvalidResponseHeaderSnafu, Result}; +/// Heartbeat configuration received from Metasrv during handshake. +#[derive(Debug, Clone, Copy)] +pub struct HeartbeatConfig { + pub interval: Duration, + pub retry_interval: Duration, +} + +impl Default for HeartbeatConfig { + fn default() -> Self { + Self { + interval: BASE_HEARTBEAT_INTERVAL, + retry_interval: BASE_HEARTBEAT_INTERVAL, + } + } +} + +impl fmt::Display for HeartbeatConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "interval={:?}, retry={:?}", + self.interval, self.retry_interval + ) + } +} + +impl HeartbeatConfig { + /// Extract configuration from HeartbeatResponse. + pub fn from_response(res: &HeartbeatResponse) -> Self { + if let Some(cfg) = &res.heartbeat_config { + // Metasrv provided complete configuration + Self { + interval: Duration::from_millis(cfg.heartbeat_interval_ms), + retry_interval: Duration::from_millis(cfg.retry_interval_ms), + } + } else { + let fallback = Self::default(); + warn!( + "Metasrv didn't provide heartbeat_config, using default: {}", + fallback + ); + fallback + } + } +} + pub struct HeartbeatSender { id: Id, role: Role, @@ -130,7 +179,9 @@ impl Client { inner.ask_leader().await } - pub async fn heartbeat(&mut self) -> Result<(HeartbeatSender, HeartbeatStream)> { + pub async fn heartbeat( + &mut self, + ) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> { let inner = self.inner.read().await; inner.ask_leader().await?; inner.heartbeat().await @@ -198,7 +249,7 @@ impl Inner { leader_provider.ask_leader().await } - async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> { + async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> { ensure!( self.is_started(), error::IllegalGrpcClientStateSnafu { @@ -245,14 +296,18 @@ impl Inner { .map_err(error::Error::from)? .context(error::CreateHeartbeatStreamSnafu)?; + // Extract heartbeat configuration from handshake response + let config = HeartbeatConfig::from_response(&res); + info!( - "Success to create heartbeat stream to server: {}, response: {:#?}", - leader_addr, res + "Handshake successful with Metasrv at {}, received config: {}", + leader_addr, config ); Ok(( HeartbeatSender::new(self.id, self.role, sender), HeartbeatStream::new(self.id, stream), + config, )) } diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 12fcfab26f..5c77425fac 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -348,6 +348,8 @@ impl HeartbeatHandlerGroup { err_msg: format!("invalid role: {:?}", req.header), })?; + let is_handshake = ctx.is_handshake; + for NameCachedHandler { name, handler } in self.handlers.iter() { if !handler.is_acceptable(role) { continue; @@ -363,10 +365,26 @@ impl HeartbeatHandlerGroup { } let header = std::mem::take(&mut acc.header); let mailbox_message = acc.take_mailbox_message(); + + // Populate heartbeat_config during handshake + let heartbeat_config = if is_handshake { + let config = ctx.heartbeat_options_for(role).into(); + + info!( + "Handshake with {:?} node, sending config: {:?}", + role, config + ); + + Some(config) + } else { + None + }; + let res = HeartbeatResponse { header, region_lease: acc.region_lease, mailbox_message, + heartbeat_config, }; Ok(res) } diff --git a/src/meta-srv/src/handler/test_utils.rs b/src/meta-srv/src/handler/test_utils.rs index 742aee4b23..7bfdfbee79 100644 --- a/src/meta-srv/src/handler/test_utils.rs +++ b/src/meta-srv/src/handler/test_utils.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; +use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; @@ -90,6 +91,8 @@ impl TestEnv { cache_invalidator: self.cache_invalidator.clone(), leader_region_registry: self.leader_region_registry.clone(), topic_stats_registry: self.topic_stats_registry.clone(), + heartbeat_interval: BASE_HEARTBEAT_INTERVAL, + is_handshake: false, } } } diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 11f4cc5513..19bcb3ee4a 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; +use api::v1::meta::{HeartbeatConfig, Role}; use clap::ValueEnum; use common_base::Plugins; use common_base::readable_size::ReadableSize; @@ -27,7 +28,9 @@ use common_event_recorder::EventRecorderOptions; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl_manager::DdlManagerRef; -use common_meta::distributed_time_constants::{self, default_distributed_time_constants}; +use common_meta::distributed_time_constants::{ + self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval, +}; use common_meta::key::TableMetadataManagerRef; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; @@ -121,6 +124,59 @@ impl Default for StatsPersistenceOptions { } } +/// Heartbeat configuration for a single node type. +#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] +#[serde(default)] +pub struct HeartbeatOptions { + /// Heartbeat interval. + #[serde(with = "humantime_serde")] + pub interval: Duration, + /// Retry interval when heartbeat connection fails. + #[serde(with = "humantime_serde")] + pub retry_interval: Duration, +} + +impl Default for HeartbeatOptions { + fn default() -> Self { + Self { + interval: BASE_HEARTBEAT_INTERVAL, + retry_interval: BASE_HEARTBEAT_INTERVAL, + } + } +} + +impl HeartbeatOptions { + pub fn datanode_from(base_interval: Duration) -> Self { + Self { + interval: base_interval, + retry_interval: base_interval, + } + } + + pub fn frontend_from(base_interval: Duration) -> Self { + Self { + interval: frontend_heartbeat_interval(base_interval), + retry_interval: base_interval, + } + } + + pub fn flownode_from(base_interval: Duration) -> Self { + Self { + interval: base_interval, + retry_interval: base_interval, + } + } +} + +impl From for HeartbeatConfig { + fn from(opts: HeartbeatOptions) -> Self { + Self { + heartbeat_interval_ms: opts.interval.as_millis() as u64, + retry_interval_ms: opts.retry_interval.as_millis() as u64, + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] #[serde(default)] pub struct BackendClientOptions { @@ -379,6 +435,8 @@ pub struct Context { pub cache_invalidator: CacheInvalidatorRef, pub leader_region_registry: LeaderRegionRegistryRef, pub topic_stats_registry: TopicStatsRegistryRef, + pub heartbeat_interval: Duration, + pub is_handshake: bool, } impl Context { @@ -386,6 +444,19 @@ impl Context { self.in_memory.reset(); self.leader_region_registry.reset(); } + + pub fn with_handshake(mut self, is_handshake: bool) -> Self { + self.is_handshake = is_handshake; + self + } + + pub fn heartbeat_options_for(&self, role: Role) -> HeartbeatOptions { + match role { + Role::Datanode => HeartbeatOptions::datanode_from(self.heartbeat_interval), + Role::Frontend => HeartbeatOptions::frontend_from(self.heartbeat_interval), + Role::Flownode => HeartbeatOptions::flownode_from(self.heartbeat_interval), + } + } } /// The value of the leader. It is used to store the leader's address. @@ -903,6 +974,8 @@ impl Metasrv { cache_invalidator, leader_region_registry, topic_stats_registry, + heartbeat_interval: self.options().heartbeat_interval, + is_handshake: false, } } } diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 20c9069428..e09073546a 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -66,7 +66,8 @@ impl heartbeat_server::Heartbeat for Metasrv { break; }; - if pusher_id.is_none() { + let is_handshake = pusher_id.is_none(); + if is_handshake { pusher_id = Some(register_pusher(&handler_group, header, tx.clone()).await); } @@ -77,7 +78,7 @@ impl heartbeat_server::Heartbeat for Metasrv { } let res = handler_group - .handle(req, ctx.clone()) + .handle(req, ctx.clone().with_handshake(is_handshake)) .await .inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", )) .map_err(|e| e.into()); diff --git a/src/servers/src/heartbeat_options.rs b/src/servers/src/heartbeat_options.rs deleted file mode 100644 index 9812625260..0000000000 --- a/src/servers/src/heartbeat_options.rs +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::time::Duration; - -use common_meta::distributed_time_constants; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(default)] -pub struct HeartbeatOptions { - #[serde(with = "humantime_serde")] - pub interval: Duration, - #[serde(with = "humantime_serde")] - pub retry_interval: Duration, -} - -impl HeartbeatOptions { - pub fn datanode_default() -> Self { - Default::default() - } - - pub fn frontend_default() -> Self { - Self { - // Frontend can send heartbeat with a longer interval. - interval: distributed_time_constants::frontend_heartbeat_interval( - distributed_time_constants::BASE_HEARTBEAT_INTERVAL, - ), - retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, - } - } -} - -impl Default for HeartbeatOptions { - fn default() -> Self { - Self { - interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, - retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, - } - } -} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 8f77b853f9..4214f7daed 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -30,7 +30,7 @@ pub mod configurator; pub(crate) mod elasticsearch; pub mod error; pub mod grpc; -pub mod heartbeat_options; + mod hint_headers; pub mod http; pub mod influxdb; diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template index b95472543e..4cb0423c72 100644 --- a/tests/conf/datanode-test.toml.template +++ b/tests/conf/datanode-test.toml.template @@ -36,6 +36,3 @@ tcp_nodelay = false [procedure] max_retry_times = 3 retry_delay = "500ms" - -[heartbeat] -interval = '1s' diff --git a/tests/conf/metasrv-test.toml.template b/tests/conf/metasrv-test.toml.template index e5cde7703d..64ceb686ac 100644 --- a/tests/conf/metasrv-test.toml.template +++ b/tests/conf/metasrv-test.toml.template @@ -1,4 +1,5 @@ flush_stats_factor = 1 +heartbeat_interval = "1s" {{ if use_etcd }} ## Store server address default to etcd store. store_addrs = [{store_addrs | unescaped}]