mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-19 14:30:43 +00:00
feat!: make heartbeat config only in metasrv (#7510)
* feat: make heartbeat config only in metasrv Signed-off-by: jeremyhi <fengjiachun@gmail.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * feat: refine config doc Signed-off-by: jeremyhi <fengjiachun@gmail.com> * feat: make the heartbeat setup simple Signed-off-by: jeremyhi <fengjiachun@gmail.com> * chore: by comment Signed-off-by: jeremyhi <fengjiachun@gmail.com> * chore: revert config Signed-off-by: jeremyhi <fengjiachun@gmail.com> * feat: proto update Signed-off-by: jeremyhi <fengjiachun@gmail.com> * chore: fix sqlness wrong cfg Signed-off-by: jeremyhi <fengjiachun@gmail.com> --------- Signed-off-by: jeremyhi <fengjiachun@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5467,7 +5467,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a2e5099d72a1cfa8ba41fa4296101eb5f874074a#a2e5099d72a1cfa8ba41fa4296101eb5f874074a"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fa0f5716556b3276317701fffa702002e7fce275#fa0f5716556b3276317701fffa702002e7fce275"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"prost-types 0.13.5",
|
||||
|
||||
@@ -151,7 +151,7 @@ etcd-client = { version = "0.16.1", features = [
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a2e5099d72a1cfa8ba41fa4296101eb5f874074a" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fa0f5716556b3276317701fffa702002e7fce275" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -330,7 +330,6 @@ mod tests {
|
||||
use common_config::ENV_VAR_SEP;
|
||||
use common_test_util::temp_dir::create_named_temp_file;
|
||||
use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
|
||||
use super::*;
|
||||
use crate::options::GlobalOptions;
|
||||
@@ -374,9 +373,6 @@ mod tests {
|
||||
hostname = "127.0.0.1"
|
||||
runtime_size = 8
|
||||
|
||||
[heartbeat]
|
||||
interval = "300ms"
|
||||
|
||||
[meta_client]
|
||||
metasrv_addrs = ["127.0.0.1:3002"]
|
||||
timeout = "3s"
|
||||
@@ -434,13 +430,6 @@ mod tests {
|
||||
);
|
||||
assert!(!raft_engine_config.sync_write);
|
||||
|
||||
let HeartbeatOptions {
|
||||
interval: heart_beat_interval,
|
||||
..
|
||||
} = options.heartbeat;
|
||||
|
||||
assert_eq!(300, heart_beat_interval.as_millis());
|
||||
|
||||
let MetaClientOptions {
|
||||
metasrv_addrs: metasrv_addr,
|
||||
timeout,
|
||||
|
||||
@@ -358,7 +358,6 @@ impl StartCommand {
|
||||
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(
|
||||
&opts,
|
||||
meta_client.clone(),
|
||||
opts.heartbeat.clone(),
|
||||
Arc::new(executor),
|
||||
Arc::new(resource_stat),
|
||||
);
|
||||
|
||||
@@ -228,7 +228,6 @@ fn test_load_flownode_example_config() {
|
||||
..Default::default()
|
||||
},
|
||||
tracing: Default::default(),
|
||||
heartbeat: Default::default(),
|
||||
// flownode deliberately uses a lower query parallelism
|
||||
// to avoid overwhelming the frontend with too many queries
|
||||
query: QueryOptions {
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
|
||||
use api::v1::meta::GrantedRegion;
|
||||
use async_trait::async_trait;
|
||||
@@ -50,7 +50,7 @@ use crate::region_server::RegionServer;
|
||||
pub struct RegionAliveKeeper {
|
||||
region_server: RegionServer,
|
||||
tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
|
||||
heartbeat_interval_millis: u64,
|
||||
heartbeat_interval_millis: Arc<AtomicU64>,
|
||||
started: Arc<AtomicBool>,
|
||||
|
||||
/// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
|
||||
@@ -67,18 +67,26 @@ impl RegionAliveKeeper {
|
||||
pub fn new(
|
||||
region_server: RegionServer,
|
||||
countdown_task_handler_ext: Option<CountdownTaskHandlerExtRef>,
|
||||
heartbeat_interval_millis: u64,
|
||||
heartbeat_interval: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
region_server,
|
||||
tasks: Arc::new(Mutex::new(HashMap::new())),
|
||||
heartbeat_interval_millis,
|
||||
heartbeat_interval_millis: Arc::new(AtomicU64::new(
|
||||
heartbeat_interval.as_millis() as u64
|
||||
)),
|
||||
started: Arc::new(AtomicBool::new(false)),
|
||||
epoch: Instant::now(),
|
||||
countdown_task_handler_ext,
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the heartbeat interval with the value received from Metasrv.
|
||||
pub fn update_heartbeat_interval(&self, heartbeat_interval_millis: u64) {
|
||||
self.heartbeat_interval_millis
|
||||
.store(heartbeat_interval_millis, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Returns the countdown task handle registered for `region_id`, if any.
///
/// The task map guard is a temporary of the lookup expression, so the lock
/// is released before the cloned `Arc` is handed back to the caller.
async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
    self.tasks.lock().await.get(&region_id).cloned()
}
|
||||
@@ -108,7 +116,9 @@ impl RegionAliveKeeper {
|
||||
};
|
||||
|
||||
if should_start {
|
||||
handle.start(self.heartbeat_interval_millis).await;
|
||||
handle
|
||||
.start(self.heartbeat_interval_millis.load(Ordering::Relaxed))
|
||||
.await;
|
||||
info!("Region alive countdown for region {region_id} is started!");
|
||||
} else {
|
||||
info!(
|
||||
@@ -230,8 +240,9 @@ impl RegionAliveKeeper {
|
||||
}
|
||||
|
||||
let tasks = self.tasks.lock().await;
|
||||
let interval = self.heartbeat_interval_millis.load(Ordering::Relaxed);
|
||||
for task in tasks.values() {
|
||||
task.start(self.heartbeat_interval_millis).await;
|
||||
task.start(interval).await;
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -505,7 +516,11 @@ mod test {
|
||||
let engine = Arc::new(engine);
|
||||
region_server.register_engine(engine.clone());
|
||||
|
||||
let alive_keeper = Arc::new(RegionAliveKeeper::new(region_server.clone(), None, 100));
|
||||
let alive_keeper = Arc::new(RegionAliveKeeper::new(
|
||||
region_server.clone(),
|
||||
None,
|
||||
Duration::from_millis(100),
|
||||
));
|
||||
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let builder = CreateRequestBuilder::new();
|
||||
|
||||
@@ -29,7 +29,6 @@ pub(crate) use object_store::config::ObjectStoreConfig;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
|
||||
/// Storage engine config
|
||||
@@ -71,7 +70,6 @@ pub struct DatanodeOptions {
|
||||
pub init_regions_in_background: bool,
|
||||
pub init_regions_parallelism: usize,
|
||||
pub grpc: GrpcOptions,
|
||||
pub heartbeat: HeartbeatOptions,
|
||||
pub http: HttpOptions,
|
||||
pub meta_client: Option<MetaClientOptions>,
|
||||
pub wal: DatanodeWalConfig,
|
||||
@@ -134,7 +132,6 @@ impl Default for DatanodeOptions {
|
||||
RegionEngineConfig::File(FileEngineConfig::default()),
|
||||
],
|
||||
logging: LoggingOptions::default(),
|
||||
heartbeat: HeartbeatOptions::datanode_default(),
|
||||
enable_telemetry: true,
|
||||
tracing: TracingOptions::default(),
|
||||
query: QueryOptions::default(),
|
||||
|
||||
@@ -22,7 +22,7 @@ use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionR
|
||||
use common_base::Plugins;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::datanode::REGION_STATISTIC_KEY;
|
||||
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
|
||||
use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
|
||||
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::heartbeat::handler::suspend::SuspendHandler;
|
||||
@@ -35,6 +35,7 @@ use common_stat::ResourceStatRef;
|
||||
use common_telemetry::{debug, error, info, trace, warn};
|
||||
use common_workload::DatanodeWorkloadType;
|
||||
use meta_client::MetaClientRef;
|
||||
use meta_client::client::heartbeat::HeartbeatConfig;
|
||||
use meta_client::client::{HeartbeatSender, MetaClient};
|
||||
use servers::addrs;
|
||||
use snafu::{OptionExt as _, ResultExt};
|
||||
@@ -61,7 +62,6 @@ pub struct HeartbeatTask {
|
||||
running: Arc<AtomicBool>,
|
||||
meta_client: MetaClientRef,
|
||||
region_server: RegionServer,
|
||||
interval: u64,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
region_alive_keeper: Arc<RegionAliveKeeper>,
|
||||
resource_stat: ResourceStatRef,
|
||||
@@ -87,7 +87,7 @@ impl HeartbeatTask {
|
||||
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
|
||||
region_server.clone(),
|
||||
countdown_task_handler_ext,
|
||||
opts.heartbeat.interval.as_millis() as u64,
|
||||
BASE_HEARTBEAT_INTERVAL,
|
||||
));
|
||||
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
|
||||
region_alive_keeper.clone(),
|
||||
@@ -109,7 +109,6 @@ impl HeartbeatTask {
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
meta_client,
|
||||
region_server,
|
||||
interval: opts.heartbeat.interval.as_millis() as u64,
|
||||
resp_handler_executor,
|
||||
region_alive_keeper,
|
||||
resource_stat,
|
||||
@@ -123,9 +122,9 @@ impl HeartbeatTask {
|
||||
mailbox: MailboxRef,
|
||||
mut notify: Option<Arc<Notify>>,
|
||||
quit_signal: Arc<Notify>,
|
||||
) -> Result<HeartbeatSender> {
|
||||
) -> Result<(HeartbeatSender, HeartbeatConfig)> {
|
||||
let client_id = meta_client.id();
|
||||
let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
|
||||
let (tx, mut rx, config) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
|
||||
|
||||
let mut last_received_lease = Instant::now();
|
||||
|
||||
@@ -175,7 +174,7 @@ impl HeartbeatTask {
|
||||
quit_signal.notify_one();
|
||||
info!("Heartbeat handling loop exit.");
|
||||
});
|
||||
Ok(tx)
|
||||
Ok((tx, config))
|
||||
}
|
||||
|
||||
async fn handle_response(
|
||||
@@ -204,13 +203,9 @@ impl HeartbeatTask {
|
||||
warn!("Heartbeat task started multiple times");
|
||||
return Ok(());
|
||||
}
|
||||
let interval = self.interval;
|
||||
let node_id = self.node_id;
|
||||
let node_epoch = self.node_epoch;
|
||||
let addr = &self.peer_addr;
|
||||
info!(
|
||||
"Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."
|
||||
);
|
||||
|
||||
let meta_client = self.meta_client.clone();
|
||||
let region_server_clone = self.region_server.clone();
|
||||
@@ -222,7 +217,7 @@ impl HeartbeatTask {
|
||||
|
||||
let quit_signal = Arc::new(Notify::new());
|
||||
|
||||
let mut tx = Self::create_streams(
|
||||
let (mut tx, config) = Self::create_streams(
|
||||
&meta_client,
|
||||
running.clone(),
|
||||
handler_executor.clone(),
|
||||
@@ -232,6 +227,17 @@ impl HeartbeatTask {
|
||||
)
|
||||
.await?;
|
||||
|
||||
let interval = config.interval.as_millis() as u64;
|
||||
let mut retry_interval = config.retry_interval;
|
||||
|
||||
// Update RegionAliveKeeper with the interval from Metasrv
|
||||
self.region_alive_keeper.update_heartbeat_interval(interval);
|
||||
|
||||
info!(
|
||||
"Starting heartbeat to Metasrv with config: {}. My node id is {}, address is {}.",
|
||||
config, node_id, addr
|
||||
);
|
||||
|
||||
let self_peer = Some(Peer {
|
||||
id: node_id,
|
||||
addr: addr.clone(),
|
||||
@@ -244,6 +250,7 @@ impl HeartbeatTask {
|
||||
let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
|
||||
let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
|
||||
let resource_stat = self.resource_stat.clone();
|
||||
let region_alive_keeper = self.region_alive_keeper.clone();
|
||||
let gc_limiter = self
|
||||
.region_server
|
||||
.mito_engine()
|
||||
@@ -363,20 +370,23 @@ impl HeartbeatTask {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_tx) => {
|
||||
info!("Reconnected to metasrv");
|
||||
Ok((new_tx, new_config)) => {
|
||||
info!("Reconnected to metasrv, heartbeat config: {}", new_config);
|
||||
tx = new_tx;
|
||||
// Update retry_interval from new config
|
||||
retry_interval = new_config.retry_interval;
|
||||
// Update region_alive_keeper's heartbeat interval
|
||||
region_alive_keeper.update_heartbeat_interval(
|
||||
new_config.interval.as_millis() as u64,
|
||||
);
|
||||
// Triggers to send heartbeat immediately.
|
||||
sleep.as_mut().reset(Instant::now());
|
||||
}
|
||||
Err(e) => {
|
||||
// Before the META_LEASE_SECS expires,
|
||||
// any retries are meaningless, it always reads the old meta leader address.
|
||||
// Triggers to retry after META_KEEP_ALIVE_INTERVAL_SECS.
|
||||
sleep.as_mut().reset(
|
||||
Instant::now()
|
||||
+ Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
|
||||
);
|
||||
// Triggers to retry after retry_interval from Metasrv config.
|
||||
sleep.as_mut().reset(Instant::now() + retry_interval);
|
||||
error!(e; "Failed to reconnect to metasrv!");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,6 @@ use query::QueryEngine;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use session::context::QueryContext;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
@@ -111,7 +110,6 @@ pub struct FlownodeOptions {
|
||||
pub meta_client: Option<MetaClientOptions>,
|
||||
pub logging: LoggingOptions,
|
||||
pub tracing: TracingOptions,
|
||||
pub heartbeat: HeartbeatOptions,
|
||||
pub query: QueryOptions,
|
||||
pub user_provider: Option<String>,
|
||||
pub memory: MemoryOptions,
|
||||
@@ -127,7 +125,6 @@ impl Default for FlownodeOptions {
|
||||
meta_client: None,
|
||||
logging: LoggingOptions::default(),
|
||||
tracing: TracingOptions::default(),
|
||||
heartbeat: HeartbeatOptions::default(),
|
||||
// flownode's query option is set to 1 to throttle flow's query so
|
||||
// that it won't use too much cpu or memory
|
||||
query: QueryOptions {
|
||||
|
||||
@@ -30,7 +30,6 @@ use common_telemetry::{debug, error, info, warn};
|
||||
use greptime_proto::v1::meta::NodeInfo;
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
use servers::addrs;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use snafu::ResultExt;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::Duration;
|
||||
@@ -64,8 +63,6 @@ pub struct HeartbeatTask {
|
||||
node_epoch: u64,
|
||||
peer_addr: String,
|
||||
meta_client: Arc<MetaClient>,
|
||||
report_interval: Duration,
|
||||
retry_interval: Duration,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
running: Arc<AtomicBool>,
|
||||
query_stat_size: Option<SizeReportSender>,
|
||||
@@ -81,7 +78,6 @@ impl HeartbeatTask {
|
||||
pub fn new(
|
||||
opts: &FlownodeOptions,
|
||||
meta_client: Arc<MetaClient>,
|
||||
heartbeat_opts: HeartbeatOptions,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
resource_stat: ResourceStatRef,
|
||||
) -> Self {
|
||||
@@ -90,8 +86,6 @@ impl HeartbeatTask {
|
||||
node_epoch: common_time::util::current_time_millis() as u64,
|
||||
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
|
||||
meta_client,
|
||||
report_interval: heartbeat_opts.interval,
|
||||
retry_interval: heartbeat_opts.retry_interval,
|
||||
resp_handler_executor,
|
||||
running: Arc::new(AtomicBool::new(false)),
|
||||
query_stat_size: None,
|
||||
@@ -113,22 +107,26 @@ impl HeartbeatTask {
|
||||
}
|
||||
|
||||
async fn create_streams(&self) -> Result<(), Error> {
|
||||
info!("Start to establish the heartbeat connection to metasrv.");
|
||||
let (req_sender, resp_stream) = self
|
||||
info!("Establishing heartbeat connection to Metasrv...");
|
||||
|
||||
let (req_sender, resp_stream, config) = self
|
||||
.meta_client
|
||||
.heartbeat()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?;
|
||||
|
||||
info!("Flownode's heartbeat connection has been established with metasrv");
|
||||
info!(
|
||||
"Heartbeat started for flownode {}, Metasrv config: {}",
|
||||
self.node_id, config
|
||||
);
|
||||
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
|
||||
|
||||
self.start_handle_resp_stream(resp_stream, mailbox);
|
||||
self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval);
|
||||
|
||||
self.start_heartbeat_report(req_sender, outgoing_rx);
|
||||
self.start_heartbeat_report(req_sender, outgoing_rx, config.interval);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -217,8 +215,8 @@ impl HeartbeatTask {
|
||||
&self,
|
||||
req_sender: HeartbeatSender,
|
||||
mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
|
||||
report_interval: Duration,
|
||||
) {
|
||||
let report_interval = self.report_interval;
|
||||
let node_epoch = self.node_epoch;
|
||||
let self_peer = Some(Peer {
|
||||
id: self.node_id,
|
||||
@@ -277,9 +275,13 @@ impl HeartbeatTask {
|
||||
});
|
||||
}
|
||||
|
||||
fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
|
||||
fn start_handle_resp_stream(
|
||||
&self,
|
||||
mut resp_stream: HeartbeatStream,
|
||||
mailbox: MailboxRef,
|
||||
retry_interval: Duration,
|
||||
) {
|
||||
let capture_self = self.clone();
|
||||
let retry_interval = self.retry_interval;
|
||||
|
||||
let _handle = common_runtime::spawn_hb(async move {
|
||||
loop {
|
||||
|
||||
@@ -25,7 +25,6 @@ use meta_client::MetaClientOptions;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use servers::server::ServerHandlers;
|
||||
use snafu::ResultExt;
|
||||
@@ -45,7 +44,6 @@ pub struct FrontendOptions {
|
||||
pub node_id: Option<String>,
|
||||
pub default_timezone: Option<String>,
|
||||
pub default_column_prefix: Option<String>,
|
||||
pub heartbeat: HeartbeatOptions,
|
||||
/// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
|
||||
/// Set to 0 to disable the limit. Default: "0" (unlimited)
|
||||
pub max_in_flight_write_bytes: ReadableSize,
|
||||
@@ -82,7 +80,6 @@ impl Default for FrontendOptions {
|
||||
node_id: None,
|
||||
default_timezone: None,
|
||||
default_column_prefix: None,
|
||||
heartbeat: HeartbeatOptions::frontend_default(),
|
||||
max_in_flight_write_bytes: ReadableSize(0),
|
||||
write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
|
||||
http: HttpOptions::default(),
|
||||
@@ -406,10 +403,6 @@ mod tests {
|
||||
..Default::default()
|
||||
},
|
||||
meta_client: Some(meta_client_options.clone()),
|
||||
heartbeat: HeartbeatOptions {
|
||||
interval: Duration::from_secs(1),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -419,7 +412,11 @@ mod tests {
|
||||
let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
|
||||
let frontend = create_frontend(&options, meta_client).await?;
|
||||
|
||||
let frontend_heartbeat_interval = options.heartbeat.interval;
|
||||
use common_meta::distributed_time_constants::{
|
||||
BASE_HEARTBEAT_INTERVAL, frontend_heartbeat_interval,
|
||||
};
|
||||
let frontend_heartbeat_interval =
|
||||
frontend_heartbeat_interval(BASE_HEARTBEAT_INTERVAL) + Duration::from_secs(1);
|
||||
tokio::time::sleep(frontend_heartbeat_interval).await;
|
||||
// initial state: not suspend:
|
||||
assert!(!frontend.instance.is_suspended());
|
||||
|
||||
@@ -42,8 +42,6 @@ use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
|
||||
pub struct HeartbeatTask {
|
||||
peer_addr: String,
|
||||
meta_client: Arc<MetaClient>,
|
||||
report_interval: Duration,
|
||||
retry_interval: Duration,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
start_time_ms: u64,
|
||||
resource_stat: ResourceStatRef,
|
||||
@@ -66,8 +64,6 @@ impl HeartbeatTask {
|
||||
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
|
||||
},
|
||||
meta_client,
|
||||
report_interval: opts.heartbeat.interval,
|
||||
retry_interval: opts.heartbeat.retry_interval,
|
||||
resp_handler_executor,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
resource_stat,
|
||||
@@ -75,27 +71,31 @@ impl HeartbeatTask {
|
||||
}
|
||||
|
||||
pub async fn start(&self) -> Result<()> {
|
||||
let (req_sender, resp_stream) = self
|
||||
let (req_sender, resp_stream, config) = self
|
||||
.meta_client
|
||||
.heartbeat()
|
||||
.await
|
||||
.context(error::CreateMetaHeartbeatStreamSnafu)?;
|
||||
|
||||
info!("A heartbeat connection has been established with metasrv");
|
||||
info!("Heartbeat started with Metasrv config: {}", config);
|
||||
|
||||
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
|
||||
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
|
||||
|
||||
self.start_handle_resp_stream(resp_stream, mailbox);
|
||||
self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval);
|
||||
|
||||
self.start_heartbeat_report(req_sender, outgoing_rx);
|
||||
self.start_heartbeat_report(req_sender, outgoing_rx, config.interval);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
|
||||
fn start_handle_resp_stream(
|
||||
&self,
|
||||
mut resp_stream: HeartbeatStream,
|
||||
mailbox: MailboxRef,
|
||||
retry_interval: Duration,
|
||||
) {
|
||||
let capture_self = self.clone();
|
||||
let retry_interval = self.retry_interval;
|
||||
|
||||
let _handle = common_runtime::spawn_hb(async move {
|
||||
loop {
|
||||
@@ -190,8 +190,8 @@ impl HeartbeatTask {
|
||||
&self,
|
||||
req_sender: HeartbeatSender,
|
||||
mut outgoing_rx: Receiver<OutgoingMessage>,
|
||||
report_interval: Duration,
|
||||
) {
|
||||
let report_interval = self.report_interval;
|
||||
let start_time_ms = self.start_time_ms;
|
||||
let self_peer = Some(Peer {
|
||||
// The node id will be actually calculated from its address (by hashing the address
|
||||
|
||||
@@ -44,7 +44,7 @@ async fn run() {
|
||||
// required only when the heartbeat_client is enabled
|
||||
meta_client.ask_leader().await.unwrap();
|
||||
|
||||
let (sender, mut receiver) = meta_client.heartbeat().await.unwrap();
|
||||
let (sender, mut receiver, _config) = meta_client.heartbeat().await.unwrap();
|
||||
|
||||
// send heartbeats
|
||||
let _handle = tokio::spawn(async move {
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
mod ask_leader;
|
||||
mod heartbeat;
|
||||
pub mod heartbeat;
|
||||
mod load_balance;
|
||||
mod procedure;
|
||||
|
||||
@@ -57,7 +57,7 @@ use common_meta::rpc::store::{
|
||||
};
|
||||
use common_telemetry::info;
|
||||
use futures::TryStreamExt;
|
||||
use heartbeat::Client as HeartbeatClient;
|
||||
use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
|
||||
use procedure::Client as ProcedureClient;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store::Client as StoreClient;
|
||||
@@ -594,7 +594,9 @@ impl MetaClient {
|
||||
/// The `datanode` needs to use the sender to continuously send heartbeat
|
||||
/// packets (some self-state data), and the receiver can receive a response
|
||||
/// from "metasrv" (which may contain some scheduling instructions).
|
||||
pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
|
||||
///
|
||||
/// Returns the heartbeat sender, stream, and configuration received from Metasrv.
|
||||
pub async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
|
||||
self.heartbeat_client()?.heartbeat().await
|
||||
}
|
||||
|
||||
@@ -873,7 +875,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_heartbeat() {
|
||||
let tc = new_client("test_heartbeat").await;
|
||||
let (sender, mut receiver) = tc.client.heartbeat().await.unwrap();
|
||||
let (sender, mut receiver, _config) = tc.client.heartbeat().await.unwrap();
|
||||
// send heartbeats
|
||||
|
||||
let request_sent = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
@@ -12,14 +12,17 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::heartbeat_client::HeartbeatClient;
|
||||
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
|
||||
use common_grpc::channel_manager::ChannelManager;
|
||||
use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
|
||||
use common_meta::util;
|
||||
use common_telemetry::info;
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use common_telemetry::{info, warn};
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
@@ -32,6 +35,52 @@ use crate::client::{Id, LeaderProviderRef};
|
||||
use crate::error;
|
||||
use crate::error::{InvalidResponseHeaderSnafu, Result};
|
||||
|
||||
/// Heartbeat configuration received from Metasrv during handshake.
#[derive(Debug, Clone, Copy)]
pub struct HeartbeatConfig {
    /// Interval between two heartbeats.
    pub interval: Duration,
    /// Delay before retrying after the heartbeat connection fails.
    pub retry_interval: Duration,
}
|
||||
|
||||
impl Default for HeartbeatConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval: BASE_HEARTBEAT_INTERVAL,
|
||||
retry_interval: BASE_HEARTBEAT_INTERVAL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for HeartbeatConfig {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"interval={:?}, retry={:?}",
|
||||
self.interval, self.retry_interval
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl HeartbeatConfig {
|
||||
/// Extract configuration from HeartbeatResponse.
|
||||
pub fn from_response(res: &HeartbeatResponse) -> Self {
|
||||
if let Some(cfg) = &res.heartbeat_config {
|
||||
// Metasrv provided complete configuration
|
||||
Self {
|
||||
interval: Duration::from_millis(cfg.heartbeat_interval_ms),
|
||||
retry_interval: Duration::from_millis(cfg.retry_interval_ms),
|
||||
}
|
||||
} else {
|
||||
let fallback = Self::default();
|
||||
warn!(
|
||||
"Metasrv didn't provide heartbeat_config, using default: {}",
|
||||
fallback
|
||||
);
|
||||
fallback
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct HeartbeatSender {
|
||||
id: Id,
|
||||
role: Role,
|
||||
@@ -130,7 +179,9 @@ impl Client {
|
||||
inner.ask_leader().await
|
||||
}
|
||||
|
||||
pub async fn heartbeat(&mut self) -> Result<(HeartbeatSender, HeartbeatStream)> {
|
||||
pub async fn heartbeat(
|
||||
&mut self,
|
||||
) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
|
||||
let inner = self.inner.read().await;
|
||||
inner.ask_leader().await?;
|
||||
inner.heartbeat().await
|
||||
@@ -198,7 +249,7 @@ impl Inner {
|
||||
leader_provider.ask_leader().await
|
||||
}
|
||||
|
||||
async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
|
||||
async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream, HeartbeatConfig)> {
|
||||
ensure!(
|
||||
self.is_started(),
|
||||
error::IllegalGrpcClientStateSnafu {
|
||||
@@ -245,14 +296,18 @@ impl Inner {
|
||||
.map_err(error::Error::from)?
|
||||
.context(error::CreateHeartbeatStreamSnafu)?;
|
||||
|
||||
// Extract heartbeat configuration from handshake response
|
||||
let config = HeartbeatConfig::from_response(&res);
|
||||
|
||||
info!(
|
||||
"Success to create heartbeat stream to server: {}, response: {:#?}",
|
||||
leader_addr, res
|
||||
"Handshake successful with Metasrv at {}, received config: {}",
|
||||
leader_addr, config
|
||||
);
|
||||
|
||||
Ok((
|
||||
HeartbeatSender::new(self.id, self.role, sender),
|
||||
HeartbeatStream::new(self.id, stream),
|
||||
config,
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
@@ -348,6 +348,8 @@ impl HeartbeatHandlerGroup {
|
||||
err_msg: format!("invalid role: {:?}", req.header),
|
||||
})?;
|
||||
|
||||
let is_handshake = ctx.is_handshake;
|
||||
|
||||
for NameCachedHandler { name, handler } in self.handlers.iter() {
|
||||
if !handler.is_acceptable(role) {
|
||||
continue;
|
||||
@@ -363,10 +365,26 @@ impl HeartbeatHandlerGroup {
|
||||
}
|
||||
let header = std::mem::take(&mut acc.header);
|
||||
let mailbox_message = acc.take_mailbox_message();
|
||||
|
||||
// Populate heartbeat_config during handshake
|
||||
let heartbeat_config = if is_handshake {
|
||||
let config = ctx.heartbeat_options_for(role).into();
|
||||
|
||||
info!(
|
||||
"Handshake with {:?} node, sending config: {:?}",
|
||||
role, config
|
||||
);
|
||||
|
||||
Some(config)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let res = HeartbeatResponse {
|
||||
header,
|
||||
region_lease: acc.region_lease,
|
||||
mailbox_message,
|
||||
heartbeat_config,
|
||||
};
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
|
||||
use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
|
||||
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
@@ -90,6 +91,8 @@ impl TestEnv {
|
||||
cache_invalidator: self.cache_invalidator.clone(),
|
||||
leader_region_registry: self.leader_region_registry.clone(),
|
||||
topic_stats_registry: self.topic_stats_registry.clone(),
|
||||
heartbeat_interval: BASE_HEARTBEAT_INTERVAL,
|
||||
is_handshake: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::{HeartbeatConfig, Role};
|
||||
use clap::ValueEnum;
|
||||
use common_base::Plugins;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
@@ -27,7 +28,9 @@ use common_event_recorder::EventRecorderOptions;
|
||||
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::ddl_manager::DdlManagerRef;
|
||||
use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
|
||||
use common_meta::distributed_time_constants::{
|
||||
self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval,
|
||||
};
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
|
||||
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
|
||||
@@ -121,6 +124,59 @@ impl Default for StatsPersistenceOptions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Heartbeat configuration for a single node type.
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct HeartbeatOptions {
|
||||
/// Heartbeat interval.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub interval: Duration,
|
||||
/// Retry interval when heartbeat connection fails.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub retry_interval: Duration,
|
||||
}
|
||||
|
||||
impl Default for HeartbeatOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval: BASE_HEARTBEAT_INTERVAL,
|
||||
retry_interval: BASE_HEARTBEAT_INTERVAL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl HeartbeatOptions {
|
||||
pub fn datanode_from(base_interval: Duration) -> Self {
|
||||
Self {
|
||||
interval: base_interval,
|
||||
retry_interval: base_interval,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn frontend_from(base_interval: Duration) -> Self {
|
||||
Self {
|
||||
interval: frontend_heartbeat_interval(base_interval),
|
||||
retry_interval: base_interval,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn flownode_from(base_interval: Duration) -> Self {
|
||||
Self {
|
||||
interval: base_interval,
|
||||
retry_interval: base_interval,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<HeartbeatOptions> for HeartbeatConfig {
|
||||
fn from(opts: HeartbeatOptions) -> Self {
|
||||
Self {
|
||||
heartbeat_interval_ms: opts.interval.as_millis() as u64,
|
||||
retry_interval_ms: opts.retry_interval.as_millis() as u64,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
|
||||
#[serde(default)]
|
||||
pub struct BackendClientOptions {
|
||||
@@ -379,6 +435,8 @@ pub struct Context {
|
||||
pub cache_invalidator: CacheInvalidatorRef,
|
||||
pub leader_region_registry: LeaderRegionRegistryRef,
|
||||
pub topic_stats_registry: TopicStatsRegistryRef,
|
||||
pub heartbeat_interval: Duration,
|
||||
pub is_handshake: bool,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
@@ -386,6 +444,19 @@ impl Context {
|
||||
self.in_memory.reset();
|
||||
self.leader_region_registry.reset();
|
||||
}
|
||||
|
||||
/// Consumes the context and returns it with `is_handshake` set to the given value.
pub fn with_handshake(self, is_handshake: bool) -> Self {
    let mut ctx = self;
    ctx.is_handshake = is_handshake;
    ctx
}
|
||||
|
||||
pub fn heartbeat_options_for(&self, role: Role) -> HeartbeatOptions {
|
||||
match role {
|
||||
Role::Datanode => HeartbeatOptions::datanode_from(self.heartbeat_interval),
|
||||
Role::Frontend => HeartbeatOptions::frontend_from(self.heartbeat_interval),
|
||||
Role::Flownode => HeartbeatOptions::flownode_from(self.heartbeat_interval),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The value of the leader. It is used to store the leader's address.
|
||||
@@ -903,6 +974,8 @@ impl Metasrv {
|
||||
cache_invalidator,
|
||||
leader_region_registry,
|
||||
topic_stats_registry,
|
||||
heartbeat_interval: self.options().heartbeat_interval,
|
||||
is_handshake: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,8 @@ impl heartbeat_server::Heartbeat for Metasrv {
|
||||
break;
|
||||
};
|
||||
|
||||
if pusher_id.is_none() {
|
||||
let is_handshake = pusher_id.is_none();
|
||||
if is_handshake {
|
||||
pusher_id =
|
||||
Some(register_pusher(&handler_group, header, tx.clone()).await);
|
||||
}
|
||||
@@ -77,7 +78,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
|
||||
}
|
||||
|
||||
let res = handler_group
|
||||
.handle(req, ctx.clone())
|
||||
.handle(req, ctx.clone().with_handshake(is_handshake))
|
||||
.await
|
||||
.inspect_err(|e| warn!(e; "Failed to handle heartbeat request, pusher: {pusher_id:?}", ))
|
||||
.map_err(|e| e.into());
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_meta::distributed_time_constants;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct HeartbeatOptions {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub interval: Duration,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub retry_interval: Duration,
|
||||
}
|
||||
|
||||
impl HeartbeatOptions {
|
||||
pub fn datanode_default() -> Self {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
pub fn frontend_default() -> Self {
|
||||
Self {
|
||||
// Frontend can send heartbeat with a longer interval.
|
||||
interval: distributed_time_constants::frontend_heartbeat_interval(
|
||||
distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
|
||||
),
|
||||
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for HeartbeatOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
|
||||
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -30,7 +30,7 @@ pub mod configurator;
|
||||
pub(crate) mod elasticsearch;
|
||||
pub mod error;
|
||||
pub mod grpc;
|
||||
pub mod heartbeat_options;
|
||||
|
||||
mod hint_headers;
|
||||
pub mod http;
|
||||
pub mod influxdb;
|
||||
|
||||
@@ -36,6 +36,3 @@ tcp_nodelay = false
|
||||
[procedure]
|
||||
max_retry_times = 3
|
||||
retry_delay = "500ms"
|
||||
|
||||
[heartbeat]
|
||||
interval = '1s'
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
flush_stats_factor = 1
|
||||
heartbeat_interval = "1s"
|
||||
{{ if use_etcd }}
|
||||
## Store server address default to etcd store.
|
||||
store_addrs = [{store_addrs | unescaped}]
|
||||
|
||||
Reference in New Issue
Block a user