feat: make distributed time constants and client timeouts configurable (#7429)

* chore: update rskafka

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: make distributed time constants and client timeouts configurable

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: change etcd endpoints to array in the test scripts (#7419)

chore: change etcd endpoint

Signed-off-by: liyang <daviderli614@gmail.com>

* fix: fix tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: liyang <daviderli614@gmail.com>
Co-authored-by: liyang <daviderli614@gmail.com>
This commit is contained in:
Weny Xu
2025-12-17 19:40:16 +08:00
committed by GitHub
parent 4d33b9687a
commit c112cdf241
34 changed files with 273 additions and 250 deletions

View File

@@ -20,7 +20,7 @@ use common_meta::kv_backend::chroot::ChrootKvBackend;
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::kv_backend::KvBackendRef;
use meta_srv::bootstrap::create_etcd_client;
use meta_srv::metasrv::BackendImpl;
use meta_srv::metasrv::{BackendClientOptions, BackendImpl};
use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu};
@@ -67,9 +67,10 @@ impl StoreConfig {
} else {
let kvbackend = match self.backend {
BackendImpl::EtcdStore => {
let etcd_client = create_etcd_client(store_addrs)
.await
.map_err(BoxedError::new)?;
let etcd_client =
create_etcd_client(store_addrs, &BackendClientOptions::default())
.await
.map_err(BoxedError::new)?;
Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops))
}
#[cfg(feature = "pg_kvbackend")]

View File

@@ -20,6 +20,7 @@ use async_trait::async_trait;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::distributed_time_constants::init_distributed_time_constants;
use common_telemetry::info;
use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
use common_version::{short_version, verbose_version};
@@ -327,6 +328,7 @@ impl StartCommand {
log_versions(verbose_version(), short_version(), APP_NAME);
maybe_activate_heap_profile(&opts.component.memory);
create_resource_limit_metrics(APP_NAME);
init_distributed_time_constants(opts.component.heartbeat_interval);
info!("Metasrv start command: {:#?}", self);

View File

@@ -12,27 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::OnceLock;
use std::time::Duration;
use etcd_client::ConnectOptions;
/// Heartbeat interval time (is the basic unit of various time).
pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000;
/// The frontend will also send heartbeats to Metasrv, sending an empty
/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds.
pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6;
/// The lease seconds of a region. It's set by 3 heartbeat intervals
/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second).
pub const REGION_LEASE_SECS: u64 =
Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1;
/// When creating table or region failover, a target node needs to be selected.
/// If the node's lease has expired, the `Selector` will not select it.
pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS;
pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS;
pub const BASE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3);
/// The lease seconds of metasrv leader.
pub const META_LEASE_SECS: u64 = 5;
@@ -54,16 +37,62 @@ pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration =
pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration =
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1);
/// The default options for the etcd client.
pub fn default_etcd_client_options() -> ConnectOptions {
ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(
Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1),
Duration::from_secs(10),
)
.with_connect_timeout(Duration::from_secs(10))
}
/// The default mailbox round-trip timeout.
pub const MAILBOX_RTT_SECS: u64 = 1;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// The distributed time constants.
pub struct DistributedTimeConstants {
pub heartbeat_interval: Duration,
pub frontend_heartbeat_interval: Duration,
pub region_lease: Duration,
pub datanode_lease: Duration,
pub flownode_lease: Duration,
}
/// The frontend heartbeat interval is 6 times of the base heartbeat interval.
pub fn frontend_heartbeat_interval(base_heartbeat_interval: Duration) -> Duration {
base_heartbeat_interval * 6
}
impl DistributedTimeConstants {
/// Create a new DistributedTimeConstants from the heartbeat interval.
pub fn from_heartbeat_interval(heartbeat_interval: Duration) -> Self {
let region_lease = heartbeat_interval * 3 + Duration::from_secs(1);
let datanode_lease = region_lease;
let flownode_lease = datanode_lease;
Self {
heartbeat_interval,
frontend_heartbeat_interval: frontend_heartbeat_interval(heartbeat_interval),
region_lease,
datanode_lease,
flownode_lease,
}
}
}
impl Default for DistributedTimeConstants {
fn default() -> Self {
Self::from_heartbeat_interval(BASE_HEARTBEAT_INTERVAL)
}
}
static DEFAULT_DISTRIBUTED_TIME_CONSTANTS: OnceLock<DistributedTimeConstants> = OnceLock::new();
/// Get the default distributed time constants.
pub fn default_distributed_time_constants() -> &'static DistributedTimeConstants {
DEFAULT_DISTRIBUTED_TIME_CONSTANTS.get_or_init(Default::default)
}
/// Initialize the default distributed time constants.
pub fn init_distributed_time_constants(base_heartbeat_interval: Duration) {
let distributed_time_constants =
DistributedTimeConstants::from_heartbeat_interval(base_heartbeat_interval);
DEFAULT_DISTRIBUTED_TIME_CONSTANTS
.set(distributed_time_constants)
.expect("Failed to set default distributed time constants");
common_telemetry::info!(
"Initialized default distributed time constants: {:#?}",
distributed_time_constants
);
}

View File

@@ -14,8 +14,7 @@
use common_telemetry::{debug, error, info};
use common_wal::config::kafka::common::{
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT,
DEFAULT_KEEP_ALIVE_CONFIG,
KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG,
};
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
@@ -211,8 +210,8 @@ pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result<Cl
// Builds an kafka controller client for creating topics.
let mut builder = ClientBuilder::new(connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT))
.keepalive_config(DEFAULT_KEEP_ALIVE_CONFIG);
.connect_timeout(Some(connection.connect_timeout))
.timeout(Some(connection.timeout));
if let Some(sasl) = &connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};

View File

@@ -189,6 +189,8 @@ mod tests {
client_cert_path: None,
client_key_path: None,
}),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
},
kafka_topic: KafkaTopicConfig {
num_topics: 32,
@@ -221,6 +223,8 @@ mod tests {
client_cert_path: None,
client_key_path: None,
}),
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
},
max_batch_bytes: ReadableSize::mb(1),
consumer_wait_timeout: Duration::from_millis(100),

View File

@@ -16,7 +16,7 @@ use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;
use rskafka::client::{Credentials, KeepaliveConfig, SaslConfig};
use rskafka::client::{Credentials, SaslConfig};
use rskafka::BackoffConfig;
use rustls::{ClientConfig, RootCertStore};
use serde::{Deserialize, Serialize};
@@ -35,16 +35,6 @@ pub const DEFAULT_BACKOFF_CONFIG: BackoffConfig = BackoffConfig {
deadline: Some(Duration::from_secs(3)),
};
/// The default keep-alive config for kafka client.
pub const DEFAULT_KEEP_ALIVE_CONFIG: KeepaliveConfig = KeepaliveConfig {
time: Some(Duration::from_secs(10)),
interval: Some(Duration::from_secs(7)),
retries: Some(3),
};
/// The default connect timeout for kafka client.
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Default interval for auto WAL pruning.
pub const DEFAULT_AUTO_PRUNE_INTERVAL: Duration = Duration::ZERO;
/// Default limit for concurrent auto pruning tasks.
@@ -171,6 +161,12 @@ pub struct KafkaConnectionConfig {
pub sasl: Option<KafkaClientSasl>,
/// Client TLS config
pub tls: Option<KafkaClientTls>,
/// The connect timeout for kafka client.
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
/// The timeout for kafka client.
#[serde(with = "humantime_serde")]
pub timeout: Duration,
}
impl Default for KafkaConnectionConfig {
@@ -179,6 +175,8 @@ impl Default for KafkaConnectionConfig {
broker_endpoints: vec![BROKER_ENDPOINT.to_string()],
sasl: None,
tls: None,
connect_timeout: Duration::from_secs(3),
timeout: Duration::from_secs(3),
}
}
}

View File

@@ -15,9 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use common_wal::config::kafka::common::{
DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, DEFAULT_KEEP_ALIVE_CONFIG,
};
use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG;
use common_wal::config::kafka::DatanodeKafkaConfig;
use dashmap::DashMap;
use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling};
@@ -80,8 +78,8 @@ impl ClientManager {
// Sets backoff config for the top-level kafka client and all clients constructed by it.
let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone())
.backoff_config(DEFAULT_BACKOFF_CONFIG)
.connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT))
.keepalive_config(DEFAULT_KEEP_ALIVE_CONFIG);
.connect_timeout(Some(config.connection.connect_timeout))
.timeout(Some(config.connection.timeout));
if let Some(sasl) = &config.connection.sasl {
builder = builder.sasl_config(sasl.config.clone().into_sasl_config());
};

View File

@@ -14,7 +14,6 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
@@ -23,7 +22,6 @@ use api::v1::meta::store_server::StoreServer;
use common_base::Plugins;
use common_config::Configurable;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::default_etcd_client_options;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use common_meta::distributed_time_constants::META_LEASE_SECS;
use common_meta::kv_backend::chroot::ChrootKvBackend;
@@ -42,7 +40,7 @@ use common_telemetry::info;
#[cfg(feature = "pg_kvbackend")]
use deadpool_postgres::{Config, Runtime};
use either::Either;
use etcd_client::Client;
use etcd_client::{Client, ConnectOptions};
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
@@ -72,7 +70,9 @@ use crate::election::rds::postgres::PgElection;
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
use crate::metasrv::{
BackendClientOptions, BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
};
use crate::node_excluder::NodeExcluderRef;
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
@@ -83,11 +83,6 @@ use crate::service::admin;
use crate::service::admin::admin_axum_router;
use crate::{error, Result};
/// The default keep-alive interval for gRPC.
const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10);
/// The default keep-alive timeout for gRPC.
const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
pub struct MetasrvInstance {
metasrv: Arc<Metasrv>,
@@ -281,8 +276,8 @@ pub fn router(metasrv: Arc<Metasrv>) -> Router {
// for admin services
.accept_http1(true)
// For quick network failures detection.
.http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT));
.http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval))
.http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout));
let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone()));
let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone()));
@@ -299,7 +294,7 @@ pub async fn metasrv_builder(
(Some(kv_backend), _) => (kv_backend, None),
(None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None),
(None, BackendImpl::EtcdStore) => {
let etcd_client = create_etcd_client(&opts.store_addrs).await?;
let etcd_client = create_etcd_client(&opts.store_addrs, &opts.backend_client).await?;
let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops);
let election = EtcdElection::with_etcd_client(
&opts.grpc.server_addr,
@@ -447,13 +442,19 @@ pub async fn metasrv_builder(
.plugins(plugins))
}
pub async fn create_etcd_client(store_addrs: &[String]) -> Result<Client> {
pub async fn create_etcd_client(
store_addrs: &[String],
options: &BackendClientOptions,
) -> Result<Client> {
let etcd_endpoints = store_addrs
.iter()
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.collect::<Vec<_>>();
let options = default_etcd_client_options();
let options = ConnectOptions::new()
.with_keep_alive_while_idle(true)
.with_keep_alive(options.keep_alive_interval, options.keep_alive_timeout)
.with_connect_timeout(options.connect_timeout);
Client::connect(&etcd_endpoints, Some(options))
.await
.context(error::ConnectEtcdSnafu)

View File

@@ -15,7 +15,6 @@
use std::collections::VecDeque;
use std::time::Duration;
use common_meta::distributed_time_constants;
use serde::{Deserialize, Serialize};
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
@@ -83,9 +82,7 @@ impl Default for PhiAccrualFailureDetectorOptions {
Self {
threshold: 8_f32,
min_std_deviation: Duration::from_millis(100),
acceptable_heartbeat_pause: Duration::from_secs(
distributed_time_constants::DATANODE_LEASE_SECS,
),
acceptable_heartbeat_pause: Duration::from_secs(10),
first_heartbeat_estimate: Duration::from_millis(1000),
}
}

View File

@@ -124,7 +124,7 @@ mod test {
use std::sync::Arc;
use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::key::TableMetadataManager;
@@ -223,7 +223,7 @@ mod test {
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs() as u64,
table_metadata_manager.clone(),
opening_region_keeper.clone(),
None,
@@ -253,7 +253,7 @@ mod test {
assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
default_distributed_time_constants().region_lease.as_secs()
);
assert_region_lease(
@@ -287,7 +287,7 @@ mod test {
assert_eq!(
acc.region_lease.as_ref().unwrap().lease_seconds,
distributed_time_constants::REGION_LEASE_SECS
default_distributed_time_constants().region_lease.as_secs()
);
assert_region_lease(
@@ -366,7 +366,7 @@ mod test {
});
let handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(),
Default::default(),
None,

View File

@@ -21,7 +21,7 @@ use std::task::{Context, Poll};
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_error::ext::BoxedError;
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole};
use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef};
use common_meta::peer::{Peer, PeerLookupService};
use common_meta::rpc::store::RangeRequest;
@@ -312,7 +312,9 @@ impl PeerLookupService for MetaPeerLookupService {
lookup_frontends(
&self.meta_peer_client,
// TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval?
FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
default_distributed_time_constants()
.frontend_heartbeat_interval
.as_secs(),
)
.await
.map_err(BoxedError::new)

View File

@@ -27,7 +27,7 @@ use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
@@ -98,6 +98,27 @@ pub enum BackendImpl {
MysqlStore,
}
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
#[serde(with = "humantime_serde")]
pub keep_alive_timeout: Duration,
#[serde(with = "humantime_serde")]
pub keep_alive_interval: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
}
impl Default for BackendClientOptions {
fn default() -> Self {
Self {
keep_alive_interval: Duration::from_secs(10),
keep_alive_timeout: Duration::from_secs(3),
connect_timeout: Duration::from_secs(3),
}
}
}
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
@@ -113,12 +134,22 @@ pub struct MetasrvOptions {
/// Only applicable when using PostgreSQL or MySQL as the metadata store
#[serde(default)]
pub backend_tls: Option<TlsOption>,
/// The backend client options.
/// Currently, only applicable when using etcd as the metadata store.
#[serde(default)]
pub backend_client: BackendClientOptions,
/// The type of selector.
pub selector: SelectorType,
/// Whether to use the memory store.
pub use_memory_store: bool,
/// Whether to enable region failover.
pub enable_region_failover: bool,
/// The base heartbeat interval.
///
/// This value is used to calculate the distributed time constants for components.
/// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`.
#[serde(with = "humantime_serde")]
pub heartbeat_interval: Duration,
/// The delay before starting region failure detection.
/// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
/// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled.
@@ -211,7 +242,9 @@ impl fmt::Debug for MetasrvOptions {
.field("max_txn_ops", &self.max_txn_ops)
.field("flush_stats_factor", &self.flush_stats_factor)
.field("tracing", &self.tracing)
.field("backend", &self.backend);
.field("backend", &self.backend)
.field("heartbeat_interval", &self.heartbeat_interval)
.field("backend_client", &self.backend_client);
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
debug_struct.field("meta_table_name", &self.meta_table_name);
@@ -239,6 +272,7 @@ impl Default for MetasrvOptions {
selector: SelectorType::default(),
use_memory_store: false,
enable_region_failover: false,
heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
allow_region_failover_on_local_wal: false,
grpc: GrpcOptions {
@@ -273,6 +307,7 @@ impl Default for MetasrvOptions {
meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
node_max_idle_time: Duration::from_secs(24 * 60 * 60),
event_recorder: EventRecorderOptions::default(),
backend_client: BackendClientOptions::default(),
}
}
}
@@ -660,7 +695,9 @@ impl Metasrv {
lookup_datanode_peer(
peer_id,
&self.meta_peer_client,
distributed_time_constants::DATANODE_LEASE_SECS,
default_distributed_time_constants()
.datanode_lease
.as_secs(),
)
.await
}

View File

@@ -27,7 +27,7 @@ use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::key::flow::flow_state::FlowStateManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
@@ -220,8 +220,12 @@ impl MetasrvBuilder {
let selector_ctx = SelectorContext {
server_addr: options.grpc.server_addr.clone(),
datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
datanode_lease_secs: default_distributed_time_constants()
.datanode_lease
.as_secs(),
flownode_lease_secs: default_distributed_time_constants()
.flownode_lease
.as_secs(),
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
table_id: None,
@@ -438,7 +442,7 @@ impl MetasrvBuilder {
Some(handler_group_builder) => handler_group_builder,
None => {
let region_lease_handler = RegionLeaseHandler::new(
distributed_time_constants::REGION_LEASE_SECS,
default_distributed_time_constants().region_lease.as_secs(),
table_metadata_manager.clone(),
memory_region_keeper.clone(),
customized_region_lease_renewer,

View File

@@ -770,7 +770,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::Instruction;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};
@@ -1004,8 +1004,10 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
assert!(timer.elapsed().as_secs() < region_lease / 2);
runner.suite.verify_table_metadata().await;
}
@@ -1059,8 +1061,9 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);
assert!(timer.elapsed().as_secs() < region_lease / 2);
runner.suite.verify_table_metadata().await;
}
@@ -1380,8 +1383,9 @@ mod tests {
.run_once()
.await;
let region_lease = default_distributed_time_constants().region_lease.as_secs();
// Ensure it didn't run into the slow path.
assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
assert!(timer.elapsed().as_secs() < region_lease);
runner.suite.verify_table_metadata().await;
}
}

View File

@@ -13,10 +13,9 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
@@ -31,9 +30,6 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of closing a downgraded region.
const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)]
pub struct CloseDowngradedRegion;
@@ -111,7 +107,7 @@ impl CloseDowngradedRegion {
let ch = Channel::Datanode(downgrade_leader_datanode.id);
let receiver = ctx
.mailbox
.send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT)
.send(&ch, msg, default_distributed_time_constants().region_lease)
.await?;
match receiver.await {

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
@@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion {
let now = Instant::now();
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
.set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
.set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);
match self.downgrade_region_with_retry(ctx).await {
Ok(_) => {
@@ -250,14 +250,14 @@ impl DowngradeLeaderRegion {
if let Some(last_connection_at) = last_connection_at {
let now = current_time_millis();
let elapsed = now - last_connection_at;
let region_lease = Duration::from_secs(REGION_LEASE_SECS);
let region_lease = default_distributed_time_constants().region_lease;
// It's safe to update the region leader lease deadline here because:
// 1. The old region leader has already been marked as downgraded in metadata,
// which means any attempts to renew its lease will be rejected.
// 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
// establishes a new heartbeat connection stream.
if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
if elapsed >= (region_lease.as_secs() * 1000) as i64 {
ctx.volatile_ctx.reset_leader_region_lease_deadline();
info!(
"Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
@@ -663,7 +663,8 @@ mod tests {
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
let region_lease = default_distributed_time_constants().region_lease.as_secs();
assert!(elapsed < region_lease / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

View File

@@ -13,10 +13,9 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
@@ -32,9 +31,6 @@ use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
/// Uses lease time of a region as the timeout of opening a candidate region.
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
@@ -143,7 +139,7 @@ impl OpenCandidateRegion {
let now = Instant::now();
let receiver = ctx
.mailbox
.send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
.send(&ch, msg, default_distributed_time_constants().region_lease)
.await?;
match receiver.await {

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use rand::prelude::SliceRandom;
@@ -36,8 +36,10 @@ pub fn new_test_selector_context() -> SelectorContext {
SelectorContext {
server_addr: "127.0.0.1:3002".to_string(),
datanode_lease_secs: REGION_LEASE_SECS,
flownode_lease_secs: FLOWNODE_LEASE_SECS,
datanode_lease_secs: default_distributed_time_constants().region_lease.as_secs(),
flownode_lease_secs: default_distributed_time_constants()
.flownode_lease
.as_secs(),
kv_backend,
meta_peer_client,
table_id: None,

View File

@@ -98,6 +98,7 @@ impl heartbeat_server::Heartbeat for Metasrv {
break;
}
}
error!(err; "Sending heartbeat response error");
if tx.send(Err(err)).await.is_err() {
info!("ReceiverStream was dropped; shutting down");

View File

@@ -23,6 +23,7 @@ pub mod prom_query_gateway;
pub mod region_server;
use std::net::SocketAddr;
use std::time::Duration;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
use api::v1::{HealthCheckRequest, HealthCheckResponse};
@@ -72,6 +73,12 @@ pub struct GrpcOptions {
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
/// The HTTP/2 keep-alive interval.
#[serde(with = "humantime_serde")]
pub http2_keep_alive_interval: Duration,
/// The HTTP/2 keep-alive timeout.
#[serde(with = "humantime_serde")]
pub http2_keep_alive_timeout: Duration,
}
impl GrpcOptions {
@@ -129,6 +136,8 @@ impl Default for GrpcOptions {
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
http2_keep_alive_interval: Duration::from_secs(10),
http2_keep_alive_timeout: Duration::from_secs(3),
}
}
}

View File

@@ -34,12 +34,10 @@ impl HeartbeatOptions {
pub fn frontend_default() -> Self {
Self {
// Frontend can send heartbeat with a longer interval.
interval: Duration::from_millis(
distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
interval: distributed_time_constants::frontend_heartbeat_interval(
distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
),
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
}
}
}
@@ -47,10 +45,8 @@ impl HeartbeatOptions {
impl Default for HeartbeatOptions {
fn default() -> Self {
Self {
interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
}
}
}