diff --git a/config/config.md b/config/config.md index e61c48c43f..779c5a7ea8 100644 --- a/config/config.md +++ b/config/config.md @@ -83,6 +83,8 @@ | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | | `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | +| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.
**It's only used when the provider is `kafka`**. | +| `wal.timeout` | String | `3s` | The timeout for kafka client.
**It's only used when the provider is `kafka`**. | | `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | | `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default)
**It's only used when the provider is `kafka`**. | @@ -352,6 +354,7 @@ | `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.
This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. | | `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.
**This option is not recommended to be set to true, because it may lead to data loss during failover.** | | `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. | +| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.
The frontend heartbeat interval is 6 times the base heartbeat interval.
The flownode/datanode heartbeat interval is equal to the base heartbeat interval.
e.g., if the base heartbeat interval is 3s, the frontend heartbeat interval is 18s and the flownode/datanode heartbeat interval is 3s.
If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. | | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | @@ -361,12 +364,18 @@ | `backend_tls.cert_path` | String | `""` | Path to client certificate file (for client authentication)
Like "/path/to/client.crt" | | `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)
Like "/path/to/client.key" | | `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)
Required when using custom CAs or self-signed certificates
Leave empty to use system root certificates only
Like "/path/to/ca.crt" | +| `backend_client` | -- | -- | The backend client options.
Currently, only applicable when using etcd as the metadata store. | +| `backend_client.keep_alive_timeout` | String | `3s` | The keep alive timeout for backend client. | +| `backend_client.keep_alive_interval` | String | `10s` | The keep alive interval for backend client. | +| `backend_client.connect_timeout` | String | `3s` | The connect timeout for backend client. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. | | `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.
If left empty or unset, the server will automatically use the IP address of the first network interface
on the host, with the same port number as the one specified in `bind_addr`. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | +| `grpc.http2_keep_alive_interval` | String | `10s` | The server side HTTP/2 keep-alive interval | +| `grpc.http2_keep_alive_timeout` | String | `3s` | The server side HTTP/2 keep-alive timeout. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. | @@ -476,6 +485,8 @@ | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | | `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | +| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.
**It's only used when the provider is `kafka`**. | +| `wal.timeout` | String | `3s` | The timeout for kafka client.
**It's only used when the provider is `kafka`**. | | `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.
Warning: Kafka has a default limit of 1MB per message in a topic.
**It's only used when the provider is `kafka`**. | | `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.
**It's only used when the provider is `kafka`**. | | `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.
**It's only used when the provider is `kafka`**. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 0a1358a6e7..85e5415e16 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -169,6 +169,14 @@ recovery_parallelism = 2 ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] +## The connect timeout for kafka client. +## **It's only used when the provider is `kafka`**. +#+ connect_timeout = "3s" + +## The timeout for kafka client. +## **It's only used when the provider is `kafka`**. +#+ timeout = "3s" + ## The max size of a single producer batch. ## Warning: Kafka has a default limit of 1MB per message in a topic. ## **It's only used when the provider is `kafka`**. diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 7997383a52..1afffe186f 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -71,6 +71,13 @@ allow_region_failover_on_local_wal = false ## Max allowed idle time before removing node info from metasrv memory. node_max_idle_time = "24hours" +## Base heartbeat interval for calculating distributed time constants. +## The frontend heartbeat interval is 6 times of the base heartbeat interval. +## The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval. +## e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s. +## If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. +#+ heartbeat_interval = "3s" + ## Whether to enable greptimedb telemetry. Enabled by default. #+ enable_telemetry = true @@ -109,6 +116,16 @@ key_path = "" ## Like "/path/to/ca.crt" ca_cert_path = "" +## The backend client options. +## Currently, only applicable when using etcd as the metadata store. +#+ [backend_client] +## The keep alive timeout for backend client. 
+#+ keep_alive_timeout = "3s" +## The keep alive interval for backend client. +#+ keep_alive_interval = "10s" +## The connect timeout for backend client. +#+ connect_timeout = "3s" + ## The gRPC server options. [grpc] ## The address to bind the gRPC server. @@ -123,6 +140,10 @@ runtime_size = 8 max_recv_message_size = "512MB" ## The maximum send message size for gRPC server. max_send_message_size = "512MB" +## The server side HTTP/2 keep-alive interval +#+ http2_keep_alive_interval = "10s" +## The server side HTTP/2 keep-alive timeout. +#+ http2_keep_alive_timeout = "3s" ## The HTTP server options. [http] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index ec5b6301ff..6eb9a70f40 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -230,6 +230,14 @@ recovery_parallelism = 2 ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] +## The connect timeout for kafka client. +## **It's only used when the provider is `kafka`**. +#+ connect_timeout = "3s" + +## The timeout for kafka client. +## **It's only used when the provider is `kafka`**. +#+ timeout = "3s" + ## Automatically create topics for WAL. ## Set to `true` to automatically create topics for WAL. 
## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` diff --git a/src/cli/src/common/store.rs b/src/cli/src/common/store.rs index d6f1d89105..03415b93ee 100644 --- a/src/cli/src/common/store.rs +++ b/src/cli/src/common/store.rs @@ -19,7 +19,7 @@ use common_error::ext::BoxedError; use common_meta::kv_backend::KvBackendRef; use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; -use meta_srv::metasrv::BackendImpl; +use meta_srv::metasrv::{BackendClientOptions, BackendImpl}; use meta_srv::utils::etcd::create_etcd_client_with_tls; use servers::tls::{TlsMode, TlsOption}; @@ -112,9 +112,13 @@ impl StoreConfig { let kvbackend = match self.backend { BackendImpl::EtcdStore => { let tls_config = self.tls_config(); - let etcd_client = create_etcd_client_with_tls(store_addrs, tls_config.as_ref()) - .await - .map_err(BoxedError::new)?; + let etcd_client = create_etcd_client_with_tls( + store_addrs, + &BackendClientOptions::default(), + tls_config.as_ref(), + ) + .await + .map_err(BoxedError::new)?; Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops)) } #[cfg(feature = "pg_kvbackend")] diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index ee67267de3..204ee25c8b 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use clap::Parser; use common_base::Plugins; use common_config::Configurable; +use common_meta::distributed_time_constants::init_distributed_time_constants; use common_telemetry::info; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; use common_version::{short_version, verbose_version}; @@ -327,6 +328,7 @@ impl StartCommand { log_versions(verbose_version(), short_version(), APP_NAME); maybe_activate_heap_profile(&opts.component.memory); create_resource_limit_metrics(APP_NAME); + init_distributed_time_constants(opts.component.heartbeat_interval); info!("Metasrv start command: {:#?}", self); diff --git 
a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index 688e7a424a..bc94117cc0 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -12,27 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::OnceLock; use std::time::Duration; -use etcd_client::ConnectOptions; - -/// Heartbeat interval time (is the basic unit of various time). -pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000; - -/// The frontend will also send heartbeats to Metasrv, sending an empty -/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds. -pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6; - -/// The lease seconds of a region. It's set by 3 heartbeat intervals -/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second). -pub const REGION_LEASE_SECS: u64 = - Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1; - -/// When creating table or region failover, a target node needs to be selected. -/// If the node's lease has expired, the `Selector` will not select it. -pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS; - -pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS; +pub const BASE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3); /// The lease seconds of metasrv leader. pub const META_LEASE_SECS: u64 = 5; @@ -52,14 +35,6 @@ pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration = Duration::from_ /// The keep-alive timeout of the heartbeat channel. pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = Duration::from_secs(5); -/// The default options for the etcd client. 
-pub fn default_etcd_client_options() -> ConnectOptions { - ConnectOptions::new() - .with_keep_alive_while_idle(true) - .with_keep_alive(Duration::from_secs(15), Duration::from_secs(5)) - .with_connect_timeout(Duration::from_secs(10)) -} - /// The default mailbox round-trip timeout. pub const MAILBOX_RTT_SECS: u64 = 1; @@ -68,3 +43,60 @@ pub const TOPIC_STATS_REPORT_INTERVAL_SECS: u64 = 15; /// The retention seconds of topic stats. pub const TOPIC_STATS_RETENTION_SECS: u64 = TOPIC_STATS_REPORT_INTERVAL_SECS * 100; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +/// The distributed time constants. +pub struct DistributedTimeConstants { + pub heartbeat_interval: Duration, + pub frontend_heartbeat_interval: Duration, + pub region_lease: Duration, + pub datanode_lease: Duration, + pub flownode_lease: Duration, +} + +/// The frontend heartbeat interval is 6 times of the base heartbeat interval. +pub fn frontend_heartbeat_interval(base_heartbeat_interval: Duration) -> Duration { + base_heartbeat_interval * 6 +} + +impl DistributedTimeConstants { + /// Create a new DistributedTimeConstants from the heartbeat interval. + pub fn from_heartbeat_interval(heartbeat_interval: Duration) -> Self { + let region_lease = heartbeat_interval * 3 + Duration::from_secs(1); + let datanode_lease = region_lease; + let flownode_lease = datanode_lease; + Self { + heartbeat_interval, + frontend_heartbeat_interval: frontend_heartbeat_interval(heartbeat_interval), + region_lease, + datanode_lease, + flownode_lease, + } + } +} + +impl Default for DistributedTimeConstants { + fn default() -> Self { + Self::from_heartbeat_interval(BASE_HEARTBEAT_INTERVAL) + } +} + +static DEFAULT_DISTRIBUTED_TIME_CONSTANTS: OnceLock = OnceLock::new(); + +/// Get the default distributed time constants. +pub fn default_distributed_time_constants() -> &'static DistributedTimeConstants { + DEFAULT_DISTRIBUTED_TIME_CONSTANTS.get_or_init(Default::default) +} + +/// Initialize the default distributed time constants. 
+pub fn init_distributed_time_constants(base_heartbeat_interval: Duration) { + let distributed_time_constants = + DistributedTimeConstants::from_heartbeat_interval(base_heartbeat_interval); + DEFAULT_DISTRIBUTED_TIME_CONSTANTS + .set(distributed_time_constants) + .expect("Failed to set default distributed time constants"); + common_telemetry::info!( + "Initialized default distributed time constants: {:#?}", + distributed_time_constants + ); +} diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs index 0c3caf215a..b0ab863677 100644 --- a/src/common/meta/src/wal_options_allocator/topic_creator.rs +++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs @@ -14,7 +14,7 @@ use common_telemetry::{debug, error, info}; use common_wal::config::kafka::common::{ - DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, KafkaConnectionConfig, KafkaTopicConfig, + DEFAULT_BACKOFF_CONFIG, KafkaConnectionConfig, KafkaTopicConfig, }; use rskafka::client::error::Error as RsKafkaError; use rskafka::client::error::ProtocolError::TopicAlreadyExists; @@ -211,7 +211,8 @@ pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result, /// Client TLS config pub tls: Option, + /// The connect timeout for kafka client. + #[serde(with = "humantime_serde")] + pub connect_timeout: Duration, + /// The timeout for kafka client. 
+ #[serde(with = "humantime_serde")] + pub timeout: Duration, } impl Default for KafkaConnectionConfig { @@ -175,6 +178,8 @@ impl Default for KafkaConnectionConfig { broker_endpoints: vec![BROKER_ENDPOINT.to_string()], sasl: None, tls: None, + connect_timeout: Duration::from_secs(3), + timeout: Duration::from_secs(3), } } } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index ac617c8b08..b782454660 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -142,7 +142,6 @@ impl Frontend { #[cfg(test)] mod tests { use std::sync::atomic::{AtomicBool, Ordering}; - use std::time::Duration; use api::v1::meta::heartbeat_server::HeartbeatServer; use api::v1::meta::mailbox_message::Payload; @@ -157,7 +156,7 @@ mod tests { use common_error::from_header_to_err_code_msg; use common_error::status_code::StatusCode; use common_grpc::channel_manager::ChannelManager; - use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; + use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::suspend::SuspendHandler; @@ -409,7 +408,9 @@ mod tests { let meta_client = create_meta_client(&meta_client_options, server.clone()).await; let frontend = create_frontend(&options, meta_client).await?; - tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; + let frontend_heartbeat_interval = + default_distributed_time_constants().frontend_heartbeat_interval; + tokio::time::sleep(frontend_heartbeat_interval).await; // initial state: not suspend: assert!(!frontend.instance.is_suspended()); verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await; @@ -426,7 +427,7 @@ mod tests { // make 
heartbeat server returned "suspend" instruction, server.suspend.store(true, Ordering::Relaxed); - tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; + tokio::time::sleep(frontend_heartbeat_interval).await; // ... then the frontend is suspended: assert!(frontend.instance.is_suspended()); verify_suspend_state_by_http( @@ -442,7 +443,7 @@ mod tests { // make heartbeat server NOT returned "suspend" instruction, server.suspend.store(false, Ordering::Relaxed); - tokio::time::sleep(Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS)).await; + tokio::time::sleep(frontend_heartbeat_interval).await; // ... then frontend's suspend state is cleared: assert!(!frontend.instance.is_suspended()); verify_suspend_state_by_http(&frontend, Ok(r#"[{"records":{"schema":{"column_schemas":[{"name":"Int64(1)","data_type":"Int64"}]},"rows":[[1]],"total_rows":1}}]"#)).await; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 49d363ad14..e594c9148e 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use common_wal::config::kafka::DatanodeKafkaConfig; -use common_wal::config::kafka::common::{DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT}; +use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG; use dashmap::DashMap; use rskafka::client::ClientBuilder; use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling}; @@ -79,7 +79,8 @@ impl ClientManager { // Sets backoff config for the top-level kafka client and all clients constructed by it. 
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone()) .backoff_config(DEFAULT_BACKOFF_CONFIG) - .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT)); + .connect_timeout(Some(config.connection.connect_timeout)) + .timeout(Some(config.connection.timeout)); if let Some(sasl) = &config.connection.sasl { builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index e00b65380a..8c60623b47 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -14,7 +14,6 @@ use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::heartbeat_server::HeartbeatServer; @@ -60,11 +59,6 @@ use crate::service::admin::admin_axum_router; use crate::utils::etcd::create_etcd_client_with_tls; use crate::{Result, error}; -/// The default keep-alive interval for gRPC. -const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10); -/// The default keep-alive timeout for gRPC. -const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10); - pub struct MetasrvInstance { metasrv: Arc, @@ -255,8 +249,8 @@ pub fn router(metasrv: Arc) -> Router { // for admin services .accept_http1(true) // For quick network failures detection. 
- .http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL)) - .http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT)); + .http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval)) + .http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout)); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone())); @@ -273,8 +267,12 @@ pub async fn metasrv_builder( (Some(kv_backend), _) => (kv_backend, None), (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None), (None, BackendImpl::EtcdStore) => { - let etcd_client = - create_etcd_client_with_tls(&opts.store_addrs, opts.backend_tls.as_ref()).await?; + let etcd_client = create_etcd_client_with_tls( + &opts.store_addrs, + &opts.backend_client, + opts.backend_tls.as_ref(), + ) + .await?; let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops); let election = EtcdElection::with_etcd_client( &opts.grpc.server_addr, diff --git a/src/meta-srv/src/discovery.rs b/src/meta-srv/src/discovery.rs index 54532ec454..c65d401232 100644 --- a/src/meta-srv/src/discovery.rs +++ b/src/meta-srv/src/discovery.rs @@ -16,13 +16,9 @@ pub mod lease; pub mod node_info; pub mod utils; -use std::time::Duration; - use api::v1::meta::heartbeat_request::NodeWorkloads; use common_error::ext::BoxedError; -use common_meta::distributed_time_constants::{ - DATANODE_LEASE_SECS, FLOWNODE_LEASE_SECS, FRONTEND_HEARTBEAT_INTERVAL_MILLIS, -}; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::error::Result; use common_meta::peer::{Peer, PeerDiscovery, PeerResolver}; use common_meta::{DatanodeId, FlownodeId}; @@ -38,7 +34,7 @@ impl PeerDiscovery for MetaPeerClient { utils::alive_frontends( &DefaultSystemTimer, 
self, - Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS), + default_distributed_time_constants().frontend_heartbeat_interval, ) .await .map_err(BoxedError::new) @@ -52,7 +48,7 @@ impl PeerDiscovery for MetaPeerClient { utils::alive_datanodes( &DefaultSystemTimer, self, - Duration::from_secs(DATANODE_LEASE_SECS), + default_distributed_time_constants().datanode_lease, filter, ) .await @@ -67,7 +63,7 @@ impl PeerDiscovery for MetaPeerClient { utils::alive_flownodes( &DefaultSystemTimer, self, - Duration::from_secs(FLOWNODE_LEASE_SECS), + default_distributed_time_constants().flownode_lease, filter, ) .await diff --git a/src/meta-srv/src/discovery/lease.rs b/src/meta-srv/src/discovery/lease.rs index 7035e3bcad..b8ca8a0ebb 100644 --- a/src/meta-srv/src/discovery/lease.rs +++ b/src/meta-srv/src/discovery/lease.rs @@ -102,7 +102,7 @@ mod tests { use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads}; use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role}; - use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; + use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::kv_backend::ResettableKvBackendRef; use common_meta::peer::{Peer, PeerDiscovery}; use common_meta::rpc::store::PutRequest; @@ -473,8 +473,10 @@ mod tests { let client = create_meta_peer_client(); let in_memory = client.memory_backend(); + let frontend_heartbeat_interval = + default_distributed_time_constants().frontend_heartbeat_interval; let last_activity_ts = - current_time_millis() - FRONTEND_HEARTBEAT_INTERVAL_MILLIS as i64 - 1000; + current_time_millis() - frontend_heartbeat_interval.as_millis() as i64 - 1000; let active_frontend_node = NodeInfo { peer: Peer { id: 0, diff --git a/src/meta-srv/src/failure_detector.rs b/src/meta-srv/src/failure_detector.rs index 5284ba2995..682af32a64 100644 --- a/src/meta-srv/src/failure_detector.rs +++ 
b/src/meta-srv/src/failure_detector.rs @@ -15,7 +15,6 @@ use std::collections::VecDeque; use std::time::Duration; -use common_meta::distributed_time_constants; use serde::{Deserialize, Serialize}; const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1000; @@ -79,9 +78,7 @@ impl Default for PhiAccrualFailureDetectorOptions { Self { threshold: 8_f32, min_std_deviation: Duration::from_millis(100), - acceptable_heartbeat_pause: Duration::from_secs( - distributed_time_constants::DATANODE_LEASE_SECS, - ), + acceptable_heartbeat_pause: Duration::from_secs(10), } } } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index d0e9757742..be46a9db1b 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -134,7 +134,7 @@ mod test { use std::sync::Arc; use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; - use common_meta::distributed_time_constants; + use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::key::TableMetadataManager; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; @@ -236,7 +236,7 @@ mod test { let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); let handler = RegionLeaseHandler::new( - distributed_time_constants::REGION_LEASE_SECS, + default_distributed_time_constants().region_lease.as_secs(), table_metadata_manager.clone(), opening_region_keeper.clone(), None, @@ -266,7 +266,7 @@ mod test { assert_eq!( acc.region_lease.as_ref().unwrap().lease_seconds, - distributed_time_constants::REGION_LEASE_SECS + default_distributed_time_constants().region_lease.as_secs() ); assert_region_lease( @@ -300,7 +300,7 @@ mod test { assert_eq!( acc.region_lease.as_ref().unwrap().lease_seconds, - distributed_time_constants::REGION_LEASE_SECS + default_distributed_time_constants().region_lease.as_secs() ); assert_region_lease( @@ -379,7 
+379,7 @@ mod test { }); let handler = RegionLeaseHandler::new( - distributed_time_constants::REGION_LEASE_SECS, + default_distributed_time_constants().region_lease.as_secs(), table_metadata_manager.clone(), Default::default(), None, @@ -461,7 +461,7 @@ mod test { ..Default::default() }); let handler = RegionLeaseHandler::new( - distributed_time_constants::REGION_LEASE_SECS, + default_distributed_time_constants().region_lease.as_secs(), table_metadata_manager.clone(), Default::default(), None, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 99f392cfd8..34f3410101 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -27,7 +27,7 @@ use common_event_recorder::EventRecorderOptions; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl_manager::DdlManagerRef; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::{self, default_distributed_time_constants}; use common_meta::key::TableMetadataManagerRef; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; @@ -121,6 +121,27 @@ impl Default for StatsPersistenceOptions { } } +#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] +#[serde(default)] +pub struct BackendClientOptions { + #[serde(with = "humantime_serde")] + pub keep_alive_timeout: Duration, + #[serde(with = "humantime_serde")] + pub keep_alive_interval: Duration, + #[serde(with = "humantime_serde")] + pub connect_timeout: Duration, +} + +impl Default for BackendClientOptions { + fn default() -> Self { + Self { + keep_alive_interval: Duration::from_secs(10), + keep_alive_timeout: Duration::from_secs(3), + connect_timeout: Duration::from_secs(3), + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvOptions { @@ -136,12 +157,22 @@ pub struct 
MetasrvOptions { /// Only applicable when using PostgreSQL or MySQL as the metadata store #[serde(default)] pub backend_tls: Option, + /// The backend client options. + /// Currently, only applicable when using etcd as the metadata store. + #[serde(default)] + pub backend_client: BackendClientOptions, /// The type of selector. pub selector: SelectorType, /// Whether to use the memory store. pub use_memory_store: bool, /// Whether to enable region failover. pub enable_region_failover: bool, + /// The base heartbeat interval. + /// + /// This value is used to calculate the distributed time constants for components. + /// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`. + #[serde(with = "humantime_serde")] + pub heartbeat_interval: Duration, /// The delay before starting region failure detection. /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started. /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. 
@@ -240,7 +271,9 @@ impl fmt::Debug for MetasrvOptions { .field("tracing", &self.tracing) .field("backend", &self.backend) .field("event_recorder", &self.event_recorder) - .field("stats_persistence", &self.stats_persistence); + .field("stats_persistence", &self.stats_persistence) + .field("heartbeat_interval", &self.heartbeat_interval) + .field("backend_client", &self.backend_client); #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] debug_struct.field("meta_table_name", &self.meta_table_name); @@ -270,6 +303,7 @@ impl Default for MetasrvOptions { selector: SelectorType::default(), use_memory_store: false, enable_region_failover: false, + heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, region_failure_detector_initialization_delay: Duration::from_secs(10 * 60), allow_region_failover_on_local_wal: false, grpc: GrpcOptions { @@ -307,6 +341,7 @@ impl Default for MetasrvOptions { event_recorder: EventRecorderOptions::default(), stats_persistence: StatsPersistenceOptions::default(), gc: GcSchedulerOptions::default(), + backend_client: BackendClientOptions::default(), } } } @@ -747,7 +782,7 @@ impl Metasrv { &DefaultSystemTimer, self.meta_peer_client.as_ref(), peer_id, - Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS), + default_distributed_time_constants().datanode_lease, ) .await } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index cbefb79cfa..fc93886aed 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -29,7 +29,7 @@ use common_meta::ddl::{ DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef, }; use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef}; -use common_meta::distributed_time_constants::{self}; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::key::TableMetadataManager; use common_meta::key::flow::FlowMetadataManager; use 
common_meta::key::flow::flow_state::FlowStateManager; @@ -513,7 +513,7 @@ impl MetasrvBuilder { Some(handler_group_builder) => handler_group_builder, None => { let region_lease_handler = RegionLeaseHandler::new( - distributed_time_constants::REGION_LEASE_SECS, + default_distributed_time_constants().region_lease.as_secs(), table_metadata_manager.clone(), memory_region_keeper.clone(), customized_region_lease_renewer, diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 3613fd0894..6d27cbf4e7 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -921,7 +921,7 @@ mod tests { use std::assert_matches::assert_matches; use std::sync::Arc; - use common_meta::distributed_time_constants::REGION_LEASE_SECS; + use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::Instruction; use common_meta::key::test_utils::new_test_table_info; use common_meta::rpc::router::{Region, RegionRoute}; @@ -1192,8 +1192,10 @@ mod tests { .run_once() .await; + let region_lease = default_distributed_time_constants().region_lease.as_secs(); + // Ensure it didn't run into the slow path. - assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); + assert!(timer.elapsed().as_secs() < region_lease / 2); runner.suite.verify_table_metadata().await; } @@ -1539,8 +1541,9 @@ mod tests { .run_once() .await; + let region_lease = default_distributed_time_constants().region_lease.as_secs(); // Ensure it didn't run into the slow path. 
- assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS); + assert!(timer.elapsed().as_secs() < region_lease); runner.suite.verify_table_metadata().await; } } diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs index c20c7fede2..e6bb861288 100644 --- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -13,11 +13,10 @@ // limitations under the License. use std::any::Any; -use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::RegionIdent; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{info, warn}; @@ -30,9 +29,6 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; -/// Uses lease time of a region as the timeout of closing a downgraded region. 
-const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS); - #[derive(Debug, Serialize, Deserialize)] pub struct CloseDowngradedRegion; @@ -112,7 +108,7 @@ impl CloseDowngradedRegion { let ch = Channel::Datanode(downgrade_leader_datanode.id); let receiver = ctx .mailbox - .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT) + .send(&ch, msg, default_distributed_time_constants().region_lease) .await?; match receiver.await { diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index d10220098f..2a9cb2b187 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -17,7 +17,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::{ DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply, }; @@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion { let now = Instant::now(); // Ensures the `leader_region_lease_deadline` must exist after recovering. ctx.volatile_ctx - .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS)); + .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease); match self.downgrade_region_with_retry(ctx).await { Ok(_) => { @@ -277,14 +277,14 @@ impl DowngradeLeaderRegion { if let Some(last_connection_at) = last_connection_at { let now = current_time_millis(); let elapsed = now - last_connection_at; - let region_lease = Duration::from_secs(REGION_LEASE_SECS); + let region_lease = default_distributed_time_constants().region_lease; // It's safe to update the region leader lease deadline here because: // 1. 
The old region leader has already been marked as downgraded in metadata, // which means any attempts to renew its lease will be rejected. // 2. The pusher disconnect time record only gets removed when the datanode (from_peer) // establishes a new heartbeat connection stream. - if elapsed >= (REGION_LEASE_SECS * 1000) as i64 { + if elapsed >= (region_lease.as_secs() * 1000) as i64 { ctx.volatile_ctx.reset_leader_region_lease_deadline(); info!( "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}", @@ -697,7 +697,8 @@ mod tests { let procedure_ctx = new_procedure_context(); let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let elapsed = timer.elapsed().as_secs(); - assert!(elapsed < REGION_LEASE_SECS / 2); + let region_lease = default_distributed_time_constants().region_lease.as_secs(); + assert!(elapsed < region_lease / 2); assert_eq!( ctx.volatile_ctx .leader_region_last_entry_ids diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 67e1bfb857..8a0b625b97 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -14,11 +14,10 @@ use std::any::Any; use std::ops::Div; -use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::RegionIdent; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::datanode_table::RegionInfo; use common_procedure::{Context as ProcedureContext, Status}; @@ -33,9 +32,6 @@ use crate::procedure::region_migration::flush_leader_region::PreFlushRegion; use crate::procedure::region_migration::{Context, State}; use 
crate::service::mailbox::Channel; -/// Uses lease time of a region as the timeout of opening a candidate region. -const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS); - #[derive(Debug, Serialize, Deserialize)] pub struct OpenCandidateRegion; @@ -157,7 +153,9 @@ impl OpenCandidateRegion { .context(error::ExceededDeadlineSnafu { operation: "Open candidate region", })?; - let operation_timeout = operation_timeout.div(2).max(OPEN_CANDIDATE_REGION_TIMEOUT); + let operation_timeout = operation_timeout + .div(2) + .max(default_distributed_time_constants().region_lease); let ch = Channel::Datanode(candidate.id); let now = Instant::now(); let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?; diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index 046318def9..20c9069428 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -99,6 +99,7 @@ impl heartbeat_server::Heartbeat for Metasrv { error!("Client disconnected: broken pipe"); break; } + error!(err; "Sending heartbeat response error"); if tx.send(Err(err)).await.is_err() { info!("ReceiverStream was dropped; shutting down"); diff --git a/src/meta-srv/src/utils/etcd.rs b/src/meta-srv/src/utils/etcd.rs index 508db7c148..adfe12204d 100644 --- a/src/meta-srv/src/utils/etcd.rs +++ b/src/meta-srv/src/utils/etcd.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_meta::distributed_time_constants::default_etcd_client_options; use common_meta::kv_backend::etcd::create_etcd_tls_options; -use etcd_client::Client; +use etcd_client::{Client, ConnectOptions}; use servers::tls::{TlsMode, TlsOption}; use snafu::ResultExt; use crate::error::{self, BuildTlsOptionsSnafu, Result}; +use crate::metasrv::BackendClientOptions; /// Creates an etcd client with TLS configuration. 
pub async fn create_etcd_client_with_tls( store_addrs: &[String], + client_options: &BackendClientOptions, tls_config: Option<&TlsOption>, ) -> Result { let etcd_endpoints = store_addrs @@ -31,7 +32,12 @@ pub async fn create_etcd_client_with_tls( .filter(|x| !x.is_empty()) .collect::>(); - let mut connect_options = default_etcd_client_options(); + let mut connect_options = ConnectOptions::new() + .with_keep_alive_while_idle(true) + .with_keep_alive( + client_options.keep_alive_interval, + client_options.keep_alive_timeout, + ); if let Some(tls_config) = tls_config && let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config)) .context(BuildTlsOptionsSnafu)? diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 1c479a04de..ff27e80c69 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -81,6 +81,12 @@ pub struct GrpcOptions { /// Default to `None`, means infinite. #[serde(with = "humantime_serde")] pub max_connection_age: Option, + /// The HTTP/2 keep-alive interval. + #[serde(with = "humantime_serde")] + pub http2_keep_alive_interval: Duration, + /// The HTTP/2 keep-alive timeout. 
+ #[serde(with = "humantime_serde")] + pub http2_keep_alive_timeout: Duration, } impl GrpcOptions { @@ -144,6 +150,8 @@ impl Default for GrpcOptions { runtime_size: 8, tls: TlsOption::default(), max_connection_age: None, + http2_keep_alive_interval: Duration::from_secs(10), + http2_keep_alive_timeout: Duration::from_secs(3), } } } @@ -164,6 +172,8 @@ impl GrpcOptions { runtime_size: 8, tls: TlsOption::default(), max_connection_age: None, + http2_keep_alive_interval: Duration::from_secs(10), + http2_keep_alive_timeout: Duration::from_secs(3), } } diff --git a/src/servers/src/heartbeat_options.rs b/src/servers/src/heartbeat_options.rs index bb7d34e18c..9812625260 100644 --- a/src/servers/src/heartbeat_options.rs +++ b/src/servers/src/heartbeat_options.rs @@ -34,12 +34,10 @@ impl HeartbeatOptions { pub fn frontend_default() -> Self { Self { // Frontend can send heartbeat with a longer interval. - interval: Duration::from_millis( - distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS, - ), - retry_interval: Duration::from_millis( - distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + interval: distributed_time_constants::frontend_heartbeat_interval( + distributed_time_constants::BASE_HEARTBEAT_INTERVAL, ), + retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, } } } @@ -47,10 +45,8 @@ impl HeartbeatOptions { impl Default for HeartbeatOptions { fn default() -> Self { Self { - interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS), - retry_interval: Duration::from_millis( - distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, - ), + interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, + retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, } } } diff --git a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs index edfadf4f4f..8502103adb 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs +++ 
b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -254,10 +254,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?; wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await; - tokio::time::sleep(Duration::from_secs( - distributed_time_constants::REGION_LEASE_SECS, - )) - .await; + tokio::time::sleep(default_distributed_time_constants().region_lease).await; // Validates value rows info!("Validates num of rows"); diff --git a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs index d7c1aa831b..77d51dc09d 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_telemetry::info; use common_time::util::current_time_millis; use futures::future::try_join_all; @@ -322,10 +322,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?; wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await; - tokio::time::sleep(Duration::from_secs( - distributed_time_constants::REGION_LEASE_SECS, - )) - .await; + tokio::time::sleep(default_distributed_time_constants().region_lease).await; // Validates value rows 
info!("Validates num of rows"); for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) { diff --git a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs index dc2e47fcc7..d727eefa30 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -275,10 +275,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result< wait_for_migration(ctx, migration, &procedure_id).await; } - tokio::time::sleep(Duration::from_secs( - distributed_time_constants::REGION_LEASE_SECS, - )) - .await; + tokio::time::sleep(default_distributed_time_constants().region_lease).await; Ok(()) } diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs index 6629151006..c8ebbb54af 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -274,10 +274,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result< .await; } - tokio::time::sleep(Duration::from_secs( - distributed_time_constants::REGION_LEASE_SECS, - )) - .await; + 
tokio::time::sleep(default_distributed_time_constants().region_lease).await; Ok(()) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 902373ebb8..5c400776fd 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1397,6 +1397,8 @@ max_recv_message_size = "512MiB" max_send_message_size = "512MiB" flight_compression = "arrow_ipc" runtime_size = 8 +http2_keep_alive_interval = "10s" +http2_keep_alive_timeout = "3s" [grpc.tls] mode = "disable"