mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-29 11:20:38 +00:00
feat: Make heartbeat intervals configurable in Frontend and Datanode (#1864)
* update frontend options and config * fix format
This commit is contained in:
@@ -10,6 +10,8 @@ rpc_addr = "127.0.0.1:3001"
|
||||
rpc_hostname = "127.0.0.1"
|
||||
# The number of gRPC server worker threads, 8 by default.
|
||||
rpc_runtime_size = 8
|
||||
# Interval for sending heartbeat messages to the Metasrv in milliseconds, 5000 by default.
|
||||
heartbeat_interval_millis = 5000
|
||||
|
||||
# Metasrv client options.
|
||||
[meta_client_options]
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
# Node running mode, see `standalone.example.toml`.
|
||||
mode = "distributed"
|
||||
# Interval for sending heartbeat task to the Metasrv in milliseconds, 5000 by default.
|
||||
heartbeat_interval_millis = 5000
|
||||
# Interval for retry sending heartbeat task in milliseconds, 5000 by default.
|
||||
retry_interval_millis = 5000
|
||||
|
||||
# HTTP server options, see `standalone.example.toml`.
|
||||
[http_options]
|
||||
|
||||
@@ -132,6 +132,7 @@ impl StandaloneOptions {
|
||||
prom_options: self.prom_options,
|
||||
meta_client_options: None,
|
||||
logging: self.logging,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -27,6 +27,8 @@ use crate::service_config::{
|
||||
#[serde(default)]
|
||||
pub struct FrontendOptions {
|
||||
pub mode: Mode,
|
||||
pub heartbeat_interval_millis: u64,
|
||||
pub retry_interval_millis: u64,
|
||||
pub http_options: Option<HttpOptions>,
|
||||
pub grpc_options: Option<GrpcOptions>,
|
||||
pub mysql_options: Option<MysqlOptions>,
|
||||
@@ -43,6 +45,8 @@ impl Default for FrontendOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
mode: Mode::Standalone,
|
||||
heartbeat_interval_millis: 5000,
|
||||
retry_interval_millis: 5000,
|
||||
http_options: Some(HttpOptions::default()),
|
||||
grpc_options: Some(GrpcOptions::default()),
|
||||
mysql_options: Some(MysqlOptions::default()),
|
||||
|
||||
@@ -43,14 +43,14 @@ pub struct HeartbeatTask {
|
||||
impl HeartbeatTask {
|
||||
pub fn new(
|
||||
meta_client: Arc<MetaClient>,
|
||||
report_interval: u64,
|
||||
retry_interval: u64,
|
||||
heartbeat_interval_millis: u64,
|
||||
retry_interval_millis: u64,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
) -> Self {
|
||||
HeartbeatTask {
|
||||
meta_client,
|
||||
report_interval,
|
||||
retry_interval,
|
||||
report_interval: heartbeat_interval_millis,
|
||||
retry_interval: retry_interval_millis,
|
||||
resp_handler_executor,
|
||||
}
|
||||
}
|
||||
@@ -92,7 +92,7 @@ impl HeartbeatTask {
|
||||
Err(e) => {
|
||||
error!(e; "Occur error while reading heartbeat response");
|
||||
capture_self
|
||||
.start_with_retry(Duration::from_secs(retry_interval))
|
||||
.start_with_retry(Duration::from_millis(retry_interval))
|
||||
.await;
|
||||
|
||||
break;
|
||||
@@ -136,7 +136,7 @@ impl HeartbeatTask {
|
||||
}
|
||||
}
|
||||
_ = &mut sleep => {
|
||||
sleep.as_mut().reset(Instant::now() + Duration::from_secs(report_interval));
|
||||
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
|
||||
Some(HeartbeatRequest::default())
|
||||
}
|
||||
};
|
||||
|
||||
@@ -136,13 +136,14 @@ impl Instance {
|
||||
|
||||
let datanode_clients = Arc::new(DatanodeClients::default());
|
||||
|
||||
Self::try_new_distributed_with(meta_client, datanode_clients, plugins).await
|
||||
Self::try_new_distributed_with(meta_client, datanode_clients, plugins, opts).await
|
||||
}
|
||||
|
||||
pub async fn try_new_distributed_with(
|
||||
meta_client: Arc<MetaClient>,
|
||||
datanode_clients: Arc<DatanodeClients>,
|
||||
plugins: Arc<Plugins>,
|
||||
opts: &FrontendOptions,
|
||||
) -> Result<Self> {
|
||||
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
|
||||
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
|
||||
@@ -195,8 +196,8 @@ impl Instance {
|
||||
|
||||
let heartbeat_task = Some(HeartbeatTask::new(
|
||||
meta_client,
|
||||
5,
|
||||
5,
|
||||
opts.heartbeat_interval_millis,
|
||||
opts.retry_interval_millis,
|
||||
Arc::new(handlers_executor),
|
||||
));
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ use common_test_util::temp_dir::create_temp_dir;
|
||||
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use datanode::heartbeat::HeartbeatTask;
|
||||
use datanode::instance::Instance as DatanodeInstance;
|
||||
use frontend::frontend::FrontendOptions;
|
||||
use frontend::instance::{FrontendInstance, Instance as FeInstance};
|
||||
use meta_client::client::MetaClientBuilder;
|
||||
use meta_srv::cluster::MetaPeerClientRef;
|
||||
@@ -221,11 +222,14 @@ impl GreptimeDbClusterBuilder {
|
||||
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
|
||||
let meta_client = Arc::new(meta_client);
|
||||
|
||||
let frontend_opts = FrontendOptions::default();
|
||||
|
||||
Arc::new(
|
||||
FeInstance::try_new_distributed_with(
|
||||
meta_client,
|
||||
datanode_clients,
|
||||
Arc::new(Plugins::default()),
|
||||
&frontend_opts,
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
|
||||
Reference in New Issue
Block a user