feat: make distributed time constants and client timeouts configurable (#7433)

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-12-19 10:23:20 +08:00
committed by GitHub
parent 2f4a15ec40
commit 4d66bd96b8
32 changed files with 257 additions and 130 deletions

View File

@@ -83,6 +83,8 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. | | `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | | `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.<br/>Set to `true` to automatically create topics for WAL.<br/>Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. | | `wal.num_topics` | Integer | `64` | Number of topics.<br/>**It's only used when the provider is `kafka`**. |
| `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. | | `wal.selector_type` | String | `round_robin` | Topic selector type.<br/>Available selector types:<br/>- `round_robin` (default)<br/>**It's only used when the provider is `kafka`**. |
@@ -352,6 +354,7 @@
| `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.<br/>This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.<br/>Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. | | `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.<br/>This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.<br/>Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** | | `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. | | `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.<br/>The frontend heartbeat interval is 6 times of the base heartbeat interval.<br/>The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.<br/>e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.<br/>If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
| `runtime` | -- | -- | The runtime options. | | `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
@@ -361,12 +364,18 @@
| `backend_tls.cert_path` | String | `""` | Path to client certificate file (for client authentication)<br/>Like "/path/to/client.crt" | | `backend_tls.cert_path` | String | `""` | Path to client certificate file (for client authentication)<br/>Like "/path/to/client.crt" |
| `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)<br/>Like "/path/to/client.key" | | `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)<br/>Like "/path/to/client.key" |
| `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)<br/>Required when using custom CAs or self-signed certificates<br/>Leave empty to use system root certificates only<br/>Like "/path/to/ca.crt" | | `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)<br/>Required when using custom CAs or self-signed certificates<br/>Leave empty to use system root certificates only<br/>Like "/path/to/ca.crt" |
| `backend_client` | -- | -- | The backend client options.<br/>Currently, only applicable when using etcd as the metadata store. |
| `backend_client.keep_alive_timeout` | String | `3s` | The keep alive timeout for backend client. |
| `backend_client.keep_alive_interval` | String | `10s` | The keep alive interval for backend client. |
| `backend_client.connect_timeout` | String | `3s` | The connect timeout for backend client. |
| `grpc` | -- | -- | The gRPC server options. | | `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. | | `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. | | `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.http2_keep_alive_interval` | String | `10s` | The server side HTTP/2 keep-alive interval |
| `grpc.http2_keep_alive_timeout` | String | `3s` | The server side HTTP/2 keep-alive timeout. |
| `http` | -- | -- | The HTTP server options. | | `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. | | `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
@@ -476,6 +485,8 @@
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. | | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. | | `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.<br/>**It's only used when the provider is `kafka`**. |
| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.timeout` | String | `3s` | The timeout for kafka client.<br/>**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. | | `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.<br/>Warning: Kafka has a default limit of 1MB per message in a topic.<br/>**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. | | `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.<br/>**It's only used when the provider is `kafka`**. |
| `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. | | `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.<br/>**It's only used when the provider is `kafka`**. |

View File

@@ -169,6 +169,14 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**. ## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"] broker_endpoints = ["127.0.0.1:9092"]
## The connect timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ connect_timeout = "3s"
## The timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ timeout = "3s"
## The max size of a single producer batch. ## The max size of a single producer batch.
## Warning: Kafka has a default limit of 1MB per message in a topic. ## Warning: Kafka has a default limit of 1MB per message in a topic.
## **It's only used when the provider is `kafka`**. ## **It's only used when the provider is `kafka`**.

View File

@@ -71,6 +71,13 @@ allow_region_failover_on_local_wal = false
## Max allowed idle time before removing node info from metasrv memory. ## Max allowed idle time before removing node info from metasrv memory.
node_max_idle_time = "24hours" node_max_idle_time = "24hours"
## Base heartbeat interval for calculating distributed time constants.
## The frontend heartbeat interval is 6 times of the base heartbeat interval.
## The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.
## e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.
## If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly.
#+ heartbeat_interval = "3s"
## Whether to enable greptimedb telemetry. Enabled by default. ## Whether to enable greptimedb telemetry. Enabled by default.
#+ enable_telemetry = true #+ enable_telemetry = true
@@ -109,6 +116,16 @@ key_path = ""
## Like "/path/to/ca.crt" ## Like "/path/to/ca.crt"
ca_cert_path = "" ca_cert_path = ""
## The backend client options.
## Currently, only applicable when using etcd as the metadata store.
#+ [backend_client]
## The keep alive timeout for backend client.
#+ keep_alive_timeout = "3s"
## The keep alive interval for backend client.
#+ keep_alive_interval = "10s"
## The connect timeout for backend client.
#+ connect_timeout = "3s"
## The gRPC server options. ## The gRPC server options.
[grpc] [grpc]
## The address to bind the gRPC server. ## The address to bind the gRPC server.
@@ -123,6 +140,10 @@ runtime_size = 8
max_recv_message_size = "512MB" max_recv_message_size = "512MB"
## The maximum send message size for gRPC server. ## The maximum send message size for gRPC server.
max_send_message_size = "512MB" max_send_message_size = "512MB"
## The server side HTTP/2 keep-alive interval
#+ http2_keep_alive_interval = "10s"
## The server side HTTP/2 keep-alive timeout.
#+ http2_keep_alive_timeout = "3s"
## The HTTP server options. ## The HTTP server options.
[http] [http]

View File

@@ -230,6 +230,14 @@ recovery_parallelism = 2
## **It's only used when the provider is `kafka`**. ## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"] broker_endpoints = ["127.0.0.1:9092"]
## The connect timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ connect_timeout = "3s"
## The timeout for kafka client.
## **It's only used when the provider is `kafka`**.
#+ timeout = "3s"
## Automatically create topics for WAL. ## Automatically create topics for WAL.
## Set to `true` to automatically create topics for WAL. ## Set to `true` to automatically create topics for WAL.
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` ## Otherwise, use topics named `topic_name_prefix_[0..num_topics)`

View File

@@ -19,7 +19,7 @@ use common_error::ext::BoxedError;
use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::KvBackendRef;
use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::etcd::EtcdStore;
use meta_srv::metasrv::BackendImpl; use meta_srv::metasrv::{BackendClientOptions, BackendImpl};
use meta_srv::utils::etcd::create_etcd_client_with_tls; use meta_srv::utils::etcd::create_etcd_client_with_tls;
use servers::tls::{TlsMode, TlsOption}; use servers::tls::{TlsMode, TlsOption};
@@ -112,7 +112,11 @@ impl StoreConfig {
let kvbackend = match self.backend { let kvbackend = match self.backend {
BackendImpl::EtcdStore => { BackendImpl::EtcdStore => {
let tls_config = self.tls_config(); let tls_config = self.tls_config();
let etcd_client = create_etcd_client_with_tls(store_addrs, tls_config.as_ref()) let etcd_client = create_etcd_client_with_tls(
store_addrs,
&BackendClientOptions::default(),
tls_config.as_ref(),
)
.await .await
.map_err(BoxedError::new)?; .map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops)) Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))

View File

@@ -20,6 +20,7 @@ use async_trait::async_trait;
use clap::Parser; use clap::Parser;
use common_base::Plugins; use common_base::Plugins;
use common_config::Configurable; use common_config::Configurable;
use common_meta::distributed_time_constants::init_distributed_time_constants;
use common_telemetry::info; use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_version::{short_version, verbose_version}; use common_version::{short_version, verbose_version};
@@ -327,6 +328,7 @@ impl StartCommand {
log_versions(verbose_version(), short_version(), APP_NAME); log_versions(verbose_version(), short_version(), APP_NAME);
maybe_activate_heap_profile(&opts.component.memory); maybe_activate_heap_profile(&opts.component.memory);
create_resource_limit_metrics(APP_NAME); create_resource_limit_metrics(APP_NAME);
init_distributed_time_constants(opts.component.heartbeat_interval);
info!("Metasrv start command: {:#?}", self); info!("Metasrv start command: {:#?}", self);

View File

@@ -12,27 +12,10 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::OnceLock;
use std::time::Duration; use std::time::Duration;
use etcd_client::ConnectOptions; pub const BASE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);
/// Heartbeat interval time (is the basic unit of various time).
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;
/// The frontend will also send heartbeats to Metasrv, sending an empty
/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds.
pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6;
/// The lease seconds of a region. It's set by 3 heartbeat intervals
/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second).
pub const REGION_LEASE_SECS: u64 =
Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1;
/// When creating table or region failover, a target node needs to be selected.
/// If the node's lease has expired, the `Selector` will not select it.
pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
/// The lease seconds of metasrv leader. /// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5; pub const META_LEASE_SECS: u64 = 5;
@@ -52,14 +35,6 @@ pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration = Duration::from_
/// The keep-alive timeout of the heartbeat channel. /// The keep-alive timeout of the heartbeat channel.
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = Duration::from_secs(5); pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = Duration::from_secs(5);
/// The default options for the etcd client.
pub fn default_etcd_client_options() -> ConnectOptions {
ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(Duration::from_secs(15), Duration::from_secs(5))
.with_connect_timeout(Duration::from_secs(10))
}
/// The default mailbox round-trip timeout. /// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1; pub const MAILBOX_RTT_SECS: u64 = 1;
@@ -68,3 +43,60 @@ pub const TOPIC_STATS_REPORT_INTERVAL_SECS: u64 = 15;
/// The retention seconds of topic stats. /// The retention seconds of topic stats.
pub const TOPIC_STATS_RETENTION_SECS: u64 = TOPIC_STATS_REPORT_INTERVAL_SECS * 100; pub const TOPIC_STATS_RETENTION_SECS: u64 = TOPIC_STATS_REPORT_INTERVAL_SECS * 100;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// The distributed time constants.
pub struct DistributedTimeConstants {
pub heartbeat_interval: Duration,
pub frontend_heartbeat_interval: Duration,
pub region_lease: Duration,
pub datanode_lease: Duration,
pub flownode_lease: Duration,
}
/// The frontend heartbeat interval is 6 times of the base heartbeat interval.
pub fn frontend_heartbeat_interval(base_heartbeat_interval: Duration) -> Duration {
base_heartbeat_interval * 6
}
impl DistributedTimeConstants {
/// Create a new DistributedTimeConstants from the heartbeat interval.
pub fn from_heartbeat_interval(heartbeat_interval: Duration) -> Self {
let region_lease = heartbeat_interval * 3 + Duration::from_secs(1);
let datanode_lease = region_lease;
let flownode_lease = datanode_lease;
Self {
heartbeat_interval,
frontend_heartbeat_interval: frontend_heartbeat_interval(heartbeat_interval),
region_lease,
datanode_lease,
flownode_lease,
}
}
}
impl Default for DistributedTimeConstants {
fn default() -> Self {
Self::from_heartbeat_interval(BASE_HEARTBEAT_INTERVAL)
}
}
static DEFAULT_DISTRIBUTED_TIME_CONSTANTS: OnceLock<DistributedTimeConstants> = OnceLock::new();
/// Get the default distributed time constants.
pub fn default_distributed_time_constants() -> &'static DistributedTimeConstants {
DEFAULT_DISTRIBUTED_TIME_CONSTANTS.get_or_init(Default::default)
}
/// Initialize the default distributed time constants.
pub fn init_distributed_time_constants(base_heartbeat_interval: Duration) {
let distributed_time_constants =
DistributedTimeConstants::from_heartbeat_interval(base_heartbeat_interval);
DEFAULT_DISTRIBUTED_TIME_CONSTANTS
.set(distributed_time_constants)
.expect("Failed to set default distributed time constants");
common_telemetry::info!(
"Initialized default distributed time constants: {:#?}",
distributed_time_constants
);
}

View File

@@ -14,7 +14,7 @@
use common_telemetry::{debug, error, info}; use common_telemetry::{debug, error, info};
use common_wal::config::kafka::common::{ use common_wal::config::kafka::common::{
DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig,
}; };
use rskafka::client::error::Error as RsKafkaError; use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists; use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -211,7 +211,8 @@ pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Cl
// Builds an kafka controller client for creating topics. // Builds an kafka controller client for creating topics.
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone()) let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG) .backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT)); .connect_timeout(Some(connection.connect_timeout))
.timeout(Some(connection.timeout));
if let Some(sasl) = &connection.sasl { if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
}; };

View File

@@ -206,6 +206,8 @@ mod tests {
client_cert_path: None, client_cert_path: None,
client_key_path: None, client_key_path: None,
}), }),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
}, },
kafka_topic: KafkaTopicConfig { kafka_topic: KafkaTopicConfig {
num_topics: 32, num_topics: 32,
@@ -239,6 +241,8 @@ mod tests {
client_cert_path: None, client_cert_path: None,
client_key_path: None, client_key_path: None,
}), }),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
}, },
max_batch_bytes: ReadableSize::mb(1), max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100), consumer_wait_timeout: Duration::from_millis(100),

View File

@@ -36,9 +36,6 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
deadline: Some(Duration::from_secs(3)), deadline: Some(Duration::from_secs(3)),
}; };
/// The default connect timeout for kafka client.
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Default interval for auto WAL pruning. /// Default interval for auto WAL pruning.
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30); pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::from_mins(30);
/// Default limit for concurrent auto pruning tasks. /// Default limit for concurrent auto pruning tasks.
@@ -167,6 +164,12 @@ pub struct KafkaConnectionConfig {
pub sasl: Option<KafkaClientSasl>, pub sasl: Option<KafkaClientSasl>,
/// Client TLS config /// Client TLS config
pub tls: Option<KafkaClientTls>, pub tls: Option<KafkaClientTls>,
/// The connect timeout for kafka client.
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
/// The timeout for kafka client.
#[serde(with = "humantime_serde")]
pub timeout: Duration,
} }
impl Default for KafkaConnectionConfig { impl Default for KafkaConnectionConfig {
@@ -175,6 +178,8 @@ impl Default for KafkaConnectionConfig {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()], broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
sasl: None, sasl: None,
tls: None, tls: None,
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
} }
} }
} }

View File

@@ -142,7 +142,6 @@ impl Frontend {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::mailbox_message::Payload; use api::v1::meta::mailbox_message::Payload;
@@ -157,7 +156,7 @@ mod tests {
use common_error::from_header_to_err_code_msg; use common_error::from_header_to_err_code_msg;
use common_error::status_code::StatusCode; use common_error::status_code::StatusCode;
use common_grpc::channel_manager::ChannelManager; use common_grpc::channel_manager::ChannelManager;
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::suspend::SuspendHandler; use common_meta::heartbeat::handler::suspend::SuspendHandler;
@@ -409,7 +408,9 @@ mod tests {
let meta_client = create_meta_client(&meta_client_options, server.clone()).await; let meta_client = create_meta_client(&meta_client_options, server.clone()).await;
let frontend = create_frontend(&options, meta_client).await?; let frontend = create_frontend(&options, meta_client).await?;
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; let frontend_heartbeat_interval =
default_distributed_time_constants().frontend_heartbeat_interval;
tokio::time::sleep(frontend_heartbeat_interval).await;
// initial state: not suspend: // initial state: not suspend:
assert!(!frontend.instance.is_suspended()); assert!(!frontend.instance.is_suspended());
verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await; verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;
@@ -426,7 +427,7 @@ mod tests {
// make heartbeat server returned "suspend" instruction, // make heartbeat server returned "suspend" instruction,
server.suspend.store(true, Ordering::Relaxed); server.suspend.store(true, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; tokio::time::sleep(frontend_heartbeat_interval).await;
// ... then the frontend is suspended: // ... then the frontend is suspended:
assert!(frontend.instance.is_suspended()); assert!(frontend.instance.is_suspended());
verify_suspend_state_by_http( verify_suspend_state_by_http(
@@ -442,7 +443,7 @@ mod tests {
// make heartbeat server NOT returned "suspend" instruction, // make heartbeat server NOT returned "suspend" instruction,
server.suspend.store(false, Ordering::Relaxed); server.suspend.store(false, Ordering::Relaxed);
tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; tokio::time::sleep(frontend_heartbeat_interval).await;
// ... then frontend's suspend state is cleared: // ... then frontend's suspend state is cleared:
assert!(!frontend.instance.is_suspended()); assert!(!frontend.instance.is_suspended());
verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await; verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await;

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use common_wal::config::kafka::DatanodeKafkaConfig; use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::kafka::common::{DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT}; use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use dashmap::DashMap; use dashmap::DashMap;
use rskafka::client::ClientBuilder; use rskafka::client::ClientBuilder;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling}; use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
@@ -79,7 +79,8 @@ impl ClientManager {
// Sets backoff config for the top-level kafka client and all clients constructed by it. // Sets backoff config for the top-level kafka client and all clients constructed by it.
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone()) let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG) .backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT)); .connect_timeout(Some(config.connection.connect_timeout))
.timeout(Some(config.connection.timeout));
if let Some(sasl) = &config.connection.sasl { if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
}; };

View File

@@ -14,7 +14,6 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::heartbeat_server::HeartbeatServer;
@@ -60,11 +59,6 @@ use crate::service::admin::admin_axum_router;
use crate::utils::etcd::create_etcd_client_with_tls; use crate::utils::etcd::create_etcd_client_with_tls;
use crate::{Result, error}; use crate::{Result, error};
/// The default keep-alive interval for gRPC.
const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
/// The default keep-alive timeout for gRPC.
const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
pub struct MetasrvInstance { pub struct MetasrvInstance {
metasrv: Arc<Metasrv>, metasrv: Arc<Metasrv>,
@@ -255,8 +249,8 @@ pub fn router(metasrv: Arc<Metasrv>) -> Router {
// for admin services // for admin services
.accept_http1(true) .accept_http1(true)
// For quick network failures detection. // For quick network failures detection.
.http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL)) .http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
.http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT)); .http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
@@ -273,8 +267,12 @@ pub async fn metasrv_builder(
(Some(kv_backend), _) => (kv_backend, None), (Some(kv_backend), _) => (kv_backend, None),
(None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None), (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
(None, BackendImpl::EtcdStore) => { (None, BackendImpl::EtcdStore) => {
let etcd_client = let etcd_client = create_etcd_client_with_tls(
create_etcd_client_with_tls(&opts.store_addrs, opts.backend_tls.as_ref()).await?; &opts.store_addrs,
&opts.backend_client,
opts.backend_tls.as_ref(),
)
.await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops); let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client( let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr, &opts.grpc.server_addr,

View File

@@ -16,13 +16,9 @@ pub mod lease;
pub mod node_info; pub mod node_info;
pub mod utils; pub mod utils;
use std::time::Duration;
use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::{ use common_meta::distributed_time_constants::default_distributed_time_constants;
DATANODE_LEASE_SECS, FLOWNODE_LEASE_SECS, FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
};
use common_meta::error::Result; use common_meta::error::Result;
use common_meta::peer::{Peer, PeerDiscovery, PeerResolver}; use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
use common_meta::{DatanodeId, FlownodeId}; use common_meta::{DatanodeId, FlownodeId};
@@ -38,7 +34,7 @@ impl PeerDiscovery for MetaPeerClient {
utils::alive_frontends( utils::alive_frontends(
&DefaultSystemTimer, &DefaultSystemTimer,
self, self,
Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS), default_distributed_time_constants().frontend_heartbeat_interval,
) )
.await .await
.map_err(BoxedError::new) .map_err(BoxedError::new)
@@ -52,7 +48,7 @@ impl PeerDiscovery for MetaPeerClient {
utils::alive_datanodes( utils::alive_datanodes(
&DefaultSystemTimer, &DefaultSystemTimer,
self, self,
Duration::from_secs(DATANODE_LEASE_SECS), default_distributed_time_constants().datanode_lease,
filter, filter,
) )
.await .await
@@ -67,7 +63,7 @@ impl PeerDiscovery for MetaPeerClient {
utils::alive_flownodes( utils::alive_flownodes(
&DefaultSystemTimer, &DefaultSystemTimer,
self, self,
Duration::from_secs(FLOWNODE_LEASE_SECS), default_distributed_time_constants().flownode_lease,
filter, filter,
) )
.await .await

View File

@@ -102,7 +102,7 @@ mod tests {
use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads}; use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads};
use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role}; use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::kv_backend::ResettableKvBackendRef; use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::peer::{Peer, PeerDiscovery}; use common_meta::peer::{Peer, PeerDiscovery};
use common_meta::rpc::store::PutRequest; use common_meta::rpc::store::PutRequest;
@@ -473,8 +473,10 @@ mod tests {
let client = create_meta_peer_client(); let client = create_meta_peer_client();
let in_memory = client.memory_backend(); let in_memory = client.memory_backend();
let frontend_heartbeat_interval =
default_distributed_time_constants().frontend_heartbeat_interval;
let last_activity_ts = let last_activity_ts =
current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000; current_time_millis() - frontend_heartbeat_interval.as_millis() as i64 - 1000;
let active_frontend_node = NodeInfo { let active_frontend_node = NodeInfo {
peer: Peer { peer: Peer {
id: 0, id: 0,

View File

@@ -15,7 +15,6 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::time::Duration; use std::time::Duration;
use common_meta::distributed_time_constants;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1000; const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1000;
@@ -79,9 +78,7 @@ impl Default for PhiAccrualFailureDetectorOptions {
Self { Self {
threshold: 8_f32, threshold: 8_f32,
min_std_deviation: Duration::from_millis(100), min_std_deviation: Duration::from_millis(100),
acceptable_heartbeat_pause: Duration::from_secs( acceptable_heartbeat_pause: Duration::from_secs(10),
distributed_time_constants::DATANODE_LEASE_SECS,
),
} }
} }
} }

View File

@@ -134,7 +134,7 @@ mod test {
use std::sync::Arc; use std::sync::Arc;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::distributed_time_constants; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::key::TableMetadataManager; use common_meta::key::TableMetadataManager;
use common_meta::key::table_route::TableRouteValue; use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info; use common_meta::key::test_utils::new_test_table_info;
@@ -236,7 +236,7 @@ mod test {
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let handler = RegionLeaseHandler::new( let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS, default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(), table_metadata_manager.clone(),
opening_region_keeper.clone(), opening_region_keeper.clone(),
None, None,
@@ -266,7 +266,7 @@ mod test {
assert_eq!( assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds, acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS default_distributed_time_constants().region_lease.as_secs()
); );
assert_region_lease( assert_region_lease(
@@ -300,7 +300,7 @@ mod test {
assert_eq!( assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds, acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS default_distributed_time_constants().region_lease.as_secs()
); );
assert_region_lease( assert_region_lease(
@@ -379,7 +379,7 @@ mod test {
}); });
let handler = RegionLeaseHandler::new( let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS, default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(), table_metadata_manager.clone(),
Default::default(), Default::default(),
None, None,
@@ -461,7 +461,7 @@ mod test {
..Default::default() ..Default::default()
}); });
let handler = RegionLeaseHandler::new( let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS, default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(), table_metadata_manager.clone(),
Default::default(), Default::default(),
None, None,

View File

@@ -27,7 +27,7 @@ use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef; use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants; use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
use common_meta::key::TableMetadataManagerRef; use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
@@ -121,6 +121,27 @@ impl Default for StatsPersistenceOptions {
} }
} }
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
#[serde(with = "humantime_serde")]
pub keep_alive_timeout: Duration,
#[serde(with = "humantime_serde")]
pub keep_alive_interval: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
}
impl Default for BackendClientOptions {
fn default() -> Self {
Self {
keep_alive_interval: Duration::from_secs(10),
keep_alive_timeout: Duration::from_secs(3),
connect_timeout: Duration::from_secs(3),
}
}
}
#[derive(Clone, PartialEq, Serialize, Deserialize)] #[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)] #[serde(default)]
pub struct MetasrvOptions { pub struct MetasrvOptions {
@@ -136,12 +157,22 @@ pub struct MetasrvOptions {
/// Only applicable when using PostgreSQL or MySQL as the metadata store /// Only applicable when using PostgreSQL or MySQL as the metadata store
#[serde(default)] #[serde(default)]
pub backend_tls: Option<TlsOption>, pub backend_tls: Option<TlsOption>,
/// The backend client options.
/// Currently, only applicable when using etcd as the metadata store.
#[serde(default)]
pub backend_client: BackendClientOptions,
/// The type of selector. /// The type of selector.
pub selector: SelectorType, pub selector: SelectorType,
/// Whether to use the memory store. /// Whether to use the memory store.
pub use_memory_store: bool, pub use_memory_store: bool,
/// Whether to enable region failover. /// Whether to enable region failover.
pub enable_region_failover: bool, pub enable_region_failover: bool,
/// The base heartbeat interval.
///
/// This value is used to calculate the distributed time constants for components.
/// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
#[serde(with = "humantime_serde")]
pub heartbeat_interval: Duration,
/// The delay before starting region failure detection. /// The delay before starting region failure detection.
/// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started. /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
/// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
@@ -240,7 +271,9 @@ impl fmt::Debug for MetasrvOptions {
.field("tracing", &self.tracing) .field("tracing", &self.tracing)
.field("backend", &self.backend) .field("backend", &self.backend)
.field("event_recorder", &self.event_recorder) .field("event_recorder", &self.event_recorder)
.field("stats_persistence", &self.stats_persistence); .field("stats_persistence", &self.stats_persistence)
.field("heartbeat_interval", &self.heartbeat_interval)
.field("backend_client", &self.backend_client);
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
debug_struct.field("meta_table_name", &self.meta_table_name); debug_struct.field("meta_table_name", &self.meta_table_name);
@@ -270,6 +303,7 @@ impl Default for MetasrvOptions {
selector: SelectorType::default(), selector: SelectorType::default(),
use_memory_store: false, use_memory_store: false,
enable_region_failover: false, enable_region_failover: false,
heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
region_failure_detector_initialization_delay: Duration::from_secs(10 * 60), region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
allow_region_failover_on_local_wal: false, allow_region_failover_on_local_wal: false,
grpc: GrpcOptions { grpc: GrpcOptions {
@@ -307,6 +341,7 @@ impl Default for MetasrvOptions {
event_recorder: EventRecorderOptions::default(), event_recorder: EventRecorderOptions::default(),
stats_persistence: StatsPersistenceOptions::default(), stats_persistence: StatsPersistenceOptions::default(),
gc: GcSchedulerOptions::default(), gc: GcSchedulerOptions::default(),
backend_client: BackendClientOptions::default(),
} }
} }
} }
@@ -747,7 +782,7 @@ impl Metasrv {
&DefaultSystemTimer, &DefaultSystemTimer,
self.meta_peer_client.as_ref(), self.meta_peer_client.as_ref(),
peer_id, peer_id,
Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS), default_distributed_time_constants().datanode_lease,
) )
.await .await
} }

View File

@@ -29,7 +29,7 @@ use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef, DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
}; };
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef}; use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::distributed_time_constants::{self}; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::key::TableMetadataManager; use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager; use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::flow::flow_state::FlowStateManager; use common_meta::key::flow::flow_state::FlowStateManager;
@@ -513,7 +513,7 @@ impl MetasrvBuilder {
Some(handler_group_builder) => handler_group_builder, Some(handler_group_builder) => handler_group_builder,
None => { None => {
let region_lease_handler = RegionLeaseHandler::new( let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS, default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(), table_metadata_manager.clone(),
memory_region_keeper.clone(), memory_region_keeper.clone(),
customized_region_lease_renewer, customized_region_lease_renewer,

View File

@@ -921,7 +921,7 @@ mod tests {
use std::assert_matches::assert_matches; use std::assert_matches::assert_matches;
use std::sync::Arc; use std::sync::Arc;
use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::Instruction; use common_meta::instruction::Instruction;
use common_meta::key::test_utils::new_test_table_info; use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::rpc::router::{Region, RegionRoute};
@@ -1192,8 +1192,10 @@ mod tests {
.run_once() .run_once()
.await; .await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path. // Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); assert!(timer.elapsed().as_secs() < region_lease / 2);
runner.suite.verify_table_metadata().await; runner.suite.verify_table_metadata().await;
} }
@@ -1539,8 +1541,9 @@ mod tests {
.run_once() .run_once()
.await; .await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path. // Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS); assert!(timer.elapsed().as_secs() < region_lease);
runner.suite.verify_table_metadata().await; runner.suite.verify_table_metadata().await;
} }
} }

View File

@@ -13,11 +13,10 @@
// limitations under the License. // limitations under the License.
use std::any::Any; use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage; use api::v1::meta::MailboxMessage;
use common_meta::RegionIdent; use common_meta::RegionIdent;
use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_procedure::{Context as ProcedureContext, Status}; use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn}; use common_telemetry::{info, warn};
@@ -30,9 +29,6 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State}; use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel; use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of closing a downgraded region.
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion; pub struct CloseDowngradedRegion;
@@ -112,7 +108,7 @@ impl CloseDowngradedRegion {
let ch = Channel::Datanode(downgrade_leader_datanode.id); let ch = Channel::Datanode(downgrade_leader_datanode.id);
let receiver = ctx let receiver = ctx
.mailbox .mailbox
.send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT) .send(&ch, msg, default_distributed_time_constants().region_lease)
.await?; .await?;
match receiver.await { match receiver.await {

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage; use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{ use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply, DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
}; };
@@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
let now = Instant::now(); let now = Instant::now();
// Ensures the `leader_region_lease_deadline` must exist after recovering. // Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS)); .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);
match self.downgrade_region_with_retry(ctx).await { match self.downgrade_region_with_retry(ctx).await {
Ok(_) => { Ok(_) => {
@@ -277,14 +277,14 @@ impl DowngradeLeaderRegion {
if let Some(last_connection_at) = last_connection_at { if let Some(last_connection_at) = last_connection_at {
let now = current_time_millis(); let now = current_time_millis();
let elapsed = now - last_connection_at; let elapsed = now - last_connection_at;
let region_lease = Duration::from_secs(REGION_LEASE_SECS); let region_lease = default_distributed_time_constants().region_lease;
// It's safe to update the region leader lease deadline here because: // It's safe to update the region leader lease deadline here because:
// 1. The old region leader has already been marked as downgraded in metadata, // 1. The old region leader has already been marked as downgraded in metadata,
// which means any attempts to renew its lease will be rejected. // which means any attempts to renew its lease will be rejected.
// 2. The pusher disconnect time record only gets removed when the datanode (from_peer) // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
// establishes a new heartbeat connection stream. // establishes a new heartbeat connection stream.
if elapsed >= (REGION_LEASE_SECS * 1000) as i64 { if elapsed >= (region_lease.as_secs() * 1000) as i64 {
ctx.volatile_ctx.reset_leader_region_lease_deadline(); ctx.volatile_ctx.reset_leader_region_lease_deadline();
info!( info!(
"Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}", "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}",
@@ -697,7 +697,8 @@ mod tests {
let procedure_ctx = new_procedure_context(); let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs(); let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2); let region_lease = default_distributed_time_constants().region_lease.as_secs();
assert!(elapsed < region_lease / 2);
assert_eq!( assert_eq!(
ctx.volatile_ctx ctx.volatile_ctx
.leader_region_last_entry_ids .leader_region_last_entry_ids

View File

@@ -14,11 +14,10 @@
use std::any::Any; use std::any::Any;
use std::ops::Div; use std::ops::Div;
use std::time::Duration;
use api::v1::meta::MailboxMessage; use api::v1::meta::MailboxMessage;
use common_meta::RegionIdent; use common_meta::RegionIdent;
use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo; use common_meta::key::datanode_table::RegionInfo;
use common_procedure::{Context as ProcedureContext, Status}; use common_procedure::{Context as ProcedureContext, Status};
@@ -33,9 +32,6 @@ use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State}; use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel; use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of opening a candidate region.
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion; pub struct OpenCandidateRegion;
@@ -157,7 +153,9 @@ impl OpenCandidateRegion {
.context(error::ExceededDeadlineSnafu { .context(error::ExceededDeadlineSnafu {
operation: "Open candidate region", operation: "Open candidate region",
})?; })?;
let operation_timeout = operation_timeout.div(2).max(OPEN_CANDIDATE_REGION_TIMEOUT); let operation_timeout = operation_timeout
.div(2)
.max(default_distributed_time_constants().region_lease);
let ch = Channel::Datanode(candidate.id); let ch = Channel::Datanode(candidate.id);
let now = Instant::now(); let now = Instant::now();
let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?; let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

View File

@@ -99,6 +99,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
error!("Client disconnected: broken pipe"); error!("Client disconnected: broken pipe");
break; break;
} }
error!(err; "Sending heartbeat response error");
if tx.send(Err(err)).await.is_err() { if tx.send(Err(err)).await.is_err() {
info!("ReceiverStream was dropped; shutting down"); info!("ReceiverStream was dropped; shutting down");

View File

@@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use common_meta::distributed_time_constants::default_etcd_client_options;
use common_meta::kv_backend::etcd::create_etcd_tls_options; use common_meta::kv_backend::etcd::create_etcd_tls_options;
use etcd_client::Client; use etcd_client::{Client, ConnectOptions};
use servers::tls::{TlsMode, TlsOption}; use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt; use snafu::ResultExt;
use crate::error::{self, BuildTlsOptionsSnafu, Result}; use crate::error::{self, BuildTlsOptionsSnafu, Result};
use crate::metasrv::BackendClientOptions;
/// Creates an etcd client with TLS configuration. /// Creates an etcd client with TLS configuration.
pub async fn create_etcd_client_with_tls( pub async fn create_etcd_client_with_tls(
store_addrs: &[String], store_addrs: &[String],
client_options: &BackendClientOptions,
tls_config: Option<&TlsOption>, tls_config: Option<&TlsOption>,
) -> Result<Client> { ) -> Result<Client> {
let etcd_endpoints = store_addrs let etcd_endpoints = store_addrs
@@ -31,7 +32,12 @@ pub async fn create_etcd_client_with_tls(
.filter(|x| !x.is_empty()) .filter(|x| !x.is_empty())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let mut connect_options = default_etcd_client_options(); let mut connect_options = ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(
client_options.keep_alive_interval,
client_options.keep_alive_timeout,
);
if let Some(tls_config) = tls_config if let Some(tls_config) = tls_config
&& let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config)) && let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config))
.context(BuildTlsOptionsSnafu)? .context(BuildTlsOptionsSnafu)?

View File

@@ -81,6 +81,12 @@ pub struct GrpcOptions {
/// Default to `None`, means infinite. /// Default to `None`, means infinite.
#[serde(with = "humantime_serde")] #[serde(with = "humantime_serde")]
pub max_connection_age: Option<Duration>, pub max_connection_age: Option<Duration>,
/// The HTTP/2 keep-alive interval.
#[serde(with = "humantime_serde")]
pub http2_keep_alive_interval: Duration,
/// The HTTP/2 keep-alive timeout.
#[serde(with = "humantime_serde")]
pub http2_keep_alive_timeout: Duration,
} }
impl GrpcOptions { impl GrpcOptions {
@@ -144,6 +150,8 @@ impl Default for GrpcOptions {
runtime_size: 8, runtime_size: 8,
tls: TlsOption::default(), tls: TlsOption::default(),
max_connection_age: None, max_connection_age: None,
http2_keep_alive_interval: Duration::from_secs(10),
http2_keep_alive_timeout: Duration::from_secs(3),
} }
} }
} }
@@ -164,6 +172,8 @@ impl GrpcOptions {
runtime_size: 8, runtime_size: 8,
tls: TlsOption::default(), tls: TlsOption::default(),
max_connection_age: None, max_connection_age: None,
http2_keep_alive_interval: Duration::from_secs(10),
http2_keep_alive_timeout: Duration::from_secs(3),
} }
} }

View File

@@ -34,12 +34,10 @@ impl HeartbeatOptions {
pub fn frontend_default() -> Self { pub fn frontend_default() -> Self {
Self { Self {
// Frontend can send heartbeat with a longer interval. // Frontend can send heartbeat with a longer interval.
interval: Duration::from_millis( interval: distributed_time_constants::frontend_heartbeat_interval(
distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS, distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
), ),
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
} }
} }
} }
@@ -47,10 +45,8 @@ impl HeartbeatOptions {
impl Default for HeartbeatOptions { impl Default for HeartbeatOptions {
fn default() -> Self { fn default() -> Self {
Self { Self {
interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS), interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
retry_interval: Duration::from_millis( retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
} }
} }
} }

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured}; use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info; use common_telemetry::info;
use libfuzzer_sys::fuzz_target; use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng}; use rand::{Rng, SeedableRng};
@@ -254,10 +254,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?; recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await; wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs( tokio::time::sleep(default_distributed_time_constants().region_lease).await;
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
// Validates value rows // Validates value rows
info!("Validates num of rows"); info!("Validates num of rows");

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured}; use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info; use common_telemetry::info;
use common_time::util::current_time_millis; use common_time::util::current_time_millis;
use futures::future::try_join_all; use futures::future::try_join_all;
@@ -322,10 +322,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?; recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?;
wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await; wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs( tokio::time::sleep(default_distributed_time_constants().region_lease).await;
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
// Validates value rows // Validates value rows
info!("Validates num of rows"); info!("Validates num of rows");
for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) { for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) {

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured}; use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info; use common_telemetry::info;
use libfuzzer_sys::fuzz_target; use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng}; use rand::{Rng, SeedableRng};
@@ -275,10 +275,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
wait_for_migration(ctx, migration, &procedure_id).await; wait_for_migration(ctx, migration, &procedure_id).await;
} }
tokio::time::sleep(Duration::from_secs( tokio::time::sleep(default_distributed_time_constants().region_lease).await;
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
Ok(()) Ok(())
} }

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use arbitrary::{Arbitrary, Unstructured}; use arbitrary::{Arbitrary, Unstructured};
use common_meta::distributed_time_constants; use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_telemetry::info; use common_telemetry::info;
use libfuzzer_sys::fuzz_target; use libfuzzer_sys::fuzz_target;
use rand::{Rng, SeedableRng}; use rand::{Rng, SeedableRng};
@@ -274,10 +274,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result<
.await; .await;
} }
tokio::time::sleep(Duration::from_secs( tokio::time::sleep(default_distributed_time_constants().region_lease).await;
distributed_time_constants::REGION_LEASE_SECS,
))
.await;
Ok(()) Ok(())
} }

View File

@@ -1397,6 +1397,8 @@ max_recv_message_size = "512MiB"
max_send_message_size = "512MiB" max_send_message_size = "512MiB"
flight_compression = "arrow_ipc" flight_compression = "arrow_ipc"
runtime_size = 8 runtime_size = 8
http2_keep_alive_interval = "10s"
http2_keep_alive_timeout = "3s"
[grpc.tls] [grpc.tls]
mode = "disable" mode = "disable"