diff --git a/.github/actions/setup-greptimedb-cluster/action.yml b/.github/actions/setup-greptimedb-cluster/action.yml index 8a19a192bf..759d16d80b 100644 --- a/.github/actions/setup-greptimedb-cluster/action.yml +++ b/.github/actions/setup-greptimedb-cluster/action.yml @@ -51,7 +51,7 @@ runs: run: | helm upgrade \ --install my-greptimedb \ - --set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \ + --set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \ --set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \ --set image.registry=${{ inputs.image-registry }} \ --set image.repository=${{ inputs.image-repository }} \ diff --git a/.github/scripts/deploy-greptimedb.sh b/.github/scripts/deploy-greptimedb.sh index bba7c83a07..642db9d074 100755 --- a/.github/scripts/deploy-greptimedb.sh +++ b/.github/scripts/deploy-greptimedb.sh @@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() { helm install "$cluster_name" greptime/greptimedb-cluster \ --set image.tag="$GREPTIMEDB_IMAGE_TAG" \ - --set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \ + --set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \ -n "$install_namespace" # Wait for greptimedb cluster to be ready. @@ -103,14 +103,13 @@ function deploy_greptimedb_cluster_with_s3_storage() { helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \ --set image.tag="$GREPTIMEDB_IMAGE_TAG" \ - --set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \ + --set meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379" \ --set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \ --set storage.s3.region="$AWS_REGION" \ --set storage.s3.root="$DATA_ROOT" \ --set storage.credentials.secretName=s3-credentials \ --set storage.credentials.accessKeyId="$AWS_ACCESS_KEY_ID" \ --set storage.credentials.secretAccessKey="$AWS_SECRET_ACCESS_KEY" - # Wait for greptimedb cluster to be ready. while true; do PHASE=$(kubectl -n "$install_namespace" get gtc "$cluster_name" -o jsonpath='{.status.clusterPhase}') diff --git a/Cargo.lock b/Cargo.lock index 4a21ce09ff..5d3acdf6d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4245,7 +4245,7 @@ checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" dependencies = [ "cfg-if", "libc", - "socket2 0.5.7", + "socket2", "windows-sys 0.48.0", ] @@ -5592,7 +5592,7 @@ checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" dependencies = [ "cfg-if", "libc", - "windows-link 0.1.1", + "windows-link", ] [[package]] @@ -5724,7 +5724,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.7", + "socket2", "tokio", "tower-service", "tracing", @@ -5830,7 +5830,7 @@ dependencies = [ "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", - "socket2 0.5.7", + "socket2", "tokio", "tower-service", "tracing", @@ -7759,7 +7759,7 @@ dependencies = [ "percent-encoding", "rustls", "rustls-pemfile", - "socket2 0.5.7", + "socket2", "twox-hash 2.1.0", "url", "webpki", @@ -7825,7 +7825,7 @@ dependencies = [ "rustls-pemfile", "serde", "serde_json", - "socket2 0.5.7", + "socket2", "thiserror 2.0.12", "tokio", "tokio-rustls", @@ -10221,7 +10221,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.0.0", "rustls", - "socket2 0.5.7", + "socket2", "thiserror 1.0.64", "tokio", "tracing", @@ -10252,7 +10252,7 @@ checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" dependencies = [ "libc", "once_cell", - "socket2 0.5.7", + "socket2", "tracing", "windows-sys 0.59.0", ] @@ -10849,7 +10849,7 @@ dependencies = [ [[package]] name = "rskafka" version = "0.6.0" -source = "git+https://github.com/WenyXu/rskafka.git?rev=40150bad95fddd39a772dc132ee9e92f8f91fe38#40150bad95fddd39a772dc132ee9e92f8f91fe38" +source = "git+https://github.com/WenyXu/rskafka.git?rev=9494304ae3947b07e660b5d08549ad4a39c84a26#9494304ae3947b07e660b5d08549ad4a39c84a26" dependencies = [ "bytes", "chrono", @@ -10863,7 +10863,6 @@ dependencies = [ "rsasl", "rustls", "snap", - "socket2 0.6.1", "thiserror 2.0.12", "tokio", "tokio-rustls", @@ -11614,7 +11613,7 @@ dependencies = [ "simd-json", "snafu 0.8.5", "snap", - "socket2 0.5.7", + "socket2", "sql", "store-api", "strum 0.27.1", @@ -11935,16 +11934,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "socket2" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17129e116933cf371d018bb80ae557e889637989d8638274fb25622827b03881" -dependencies = [ - "libc", - "windows-sys 0.60.2", -] - [[package]] name = "spade" version = "2.12.1" @@ -13334,7 +13323,7 @@ dependencies = [ "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.7", + "socket2", "tokio-macros", "tracing", "windows-sys 0.52.0", @@ -13406,7 +13395,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand 0.8.5", - "socket2 0.5.7", + "socket2", "tokio", "tokio-util", "whoami", @@ -13582,7 +13571,7 @@ dependencies = [ "pin-project", "prost 0.13.5", "rustls-pemfile", - "socket2 0.5.7", + "socket2", "tokio", "tokio-rustls", "tokio-stream", @@ -14722,12 +14711,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" -[[package]] -name = "windows-link" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" - [[package]] name = "windows-registry" version = "0.2.0" @@ -14794,15 +14777,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.60.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" -dependencies = [ - "windows-targets 0.53.5", -] - [[package]] name = "windows-targets" version = "0.48.5" @@ -14827,30 +14801,13 @@ dependencies = [ "windows_aarch64_gnullvm 0.52.6", "windows_aarch64_msvc 0.52.6", "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm 0.52.6", + "windows_i686_gnullvm", "windows_i686_msvc 0.52.6", "windows_x86_64_gnu 0.52.6", "windows_x86_64_gnullvm 0.52.6", "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows-targets" -version = "0.53.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" -dependencies = [ - "windows-link 0.2.1", - "windows_aarch64_gnullvm 0.53.1", - "windows_aarch64_msvc 0.53.1", - "windows_i686_gnu 0.53.1", - "windows_i686_gnullvm 0.53.1", - "windows_i686_msvc 0.53.1", - "windows_x86_64_gnu 0.53.1", - "windows_x86_64_gnullvm 0.53.1", - "windows_x86_64_msvc 0.53.1", -] - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -14863,12 +14820,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -14881,12 +14832,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_aarch64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" - [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -14899,24 +14844,12 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" -[[package]] -name = "windows_i686_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" - [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" - [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -14929,12 +14862,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_i686_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -14947,12 +14874,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -14965,12 +14886,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -14983,12 +14898,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "windows_x86_64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" - [[package]] name = "winnow" version = "0.5.40" diff --git a/Cargo.toml b/Cargo.toml index 0065db9487..7837c8be00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,8 +188,8 @@ reqwest = { version = "0.12", default-features = false, features = [ "stream", "multipart", ] } -# Branch: feat/keepalive-port -rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "40150bad95fddd39a772dc132ee9e92f8f91fe38", features = [ +# Branch: feat/request-timeout-port +rskafka = { git = "https://github.com/WenyXu/rskafka.git", rev = "9494304ae3947b07e660b5d08549ad4a39c84a26", features = [ "transport-tls", ] } rstest = "0.25" diff --git a/config/config.md b/config/config.md index 95d669fff0..87b3cd12b0 100644 --- a/config/config.md +++ b/config/config.md @@ -78,6 +78,8 @@ | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | | `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | +| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.
**It's only used when the provider is `kafka`**. | +| `wal.timeout` | String | `3s` | The timeout for kafka client.
**It's only used when the provider is `kafka`**. | | `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | | `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. | | `wal.selector_type` | String | `round_robin` | Topic selector type.
Available selector types:
- `round_robin` (default)
**It's only used when the provider is `kafka`**. | @@ -333,6 +335,7 @@ | `region_failure_detector_initialization_delay` | String | `10m` | The delay before starting region failure detection.
This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started.
Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. | | `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.
**This option is not recommended to be set to true, because it may lead to data loss during failover.** | | `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. | +| `heartbeat_interval` | String | `3s` | Base heartbeat interval for calculating distributed time constants.
The frontend heartbeat interval is 6 times of the base heartbeat interval.
The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval.
e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s.
If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. | | `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. | | `runtime` | -- | -- | The runtime options. | | `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. | @@ -343,12 +346,18 @@ | `backend_tls.key_path` | String | `""` | Path to client private key file (for client authentication)
Like "/path/to/client.key" | | `backend_tls.ca_cert_path` | String | `""` | Path to CA certificate file (for server certificate verification)
Required when using custom CAs or self-signed certificates
Leave empty to use system root certificates only
Like "/path/to/ca.crt" | | `backend_tls.watch` | Bool | `false` | Watch for certificate file changes and auto reload | +| `backend_client` | -- | -- | The backend client options.
Currently, only applicable when using etcd as the metadata store. | +| `backend_client.keep_alive_timeout` | String | `3s` | The keep alive timeout for backend client. | +| `backend_client.keep_alive_interval` | String | `10s` | The keep alive interval for backend client. | +| `backend_client.connect_timeout` | String | `3s` | The connect timeout for backend client. | | `grpc` | -- | -- | The gRPC server options. | | `grpc.bind_addr` | String | `127.0.0.1:3002` | The address to bind the gRPC server. | | `grpc.server_addr` | String | `127.0.0.1:3002` | The communication server address for the frontend and datanode to connect to metasrv.
If left empty or unset, the server will automatically use the IP address of the first network interface
on the host, with the same port number as the one specified in `bind_addr`. | | `grpc.runtime_size` | Integer | `8` | The number of server worker threads. | | `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. | | `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. | +| `grpc.http2_keep_alive_interval` | String | `10s` | The server side HTTP/2 keep-alive interval | +| `grpc.http2_keep_alive_timeout` | String | `3s` | The server side HTTP/2 keep-alive timeout. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | | `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. | @@ -459,6 +468,8 @@ | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | | `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | +| `wal.connect_timeout` | String | `3s` | The connect timeout for kafka client.
**It's only used when the provider is `kafka`**. | +| `wal.timeout` | String | `3s` | The timeout for kafka client.
**It's only used when the provider is `kafka`**. | | `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.
Warning: Kafka has a default limit of 1MB per message in a topic.
**It's only used when the provider is `kafka`**. | | `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.
**It's only used when the provider is `kafka`**. | | `wal.create_index` | Bool | `true` | Whether to enable WAL index creation.
**It's only used when the provider is `kafka`**. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 7e14deaa2e..14846c5b73 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -161,6 +161,14 @@ recovery_parallelism = 2 ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] +## The connect timeout for kafka client. +## **It's only used when the provider is `kafka`**. +#+ connect_timeout = "3s" + +## The timeout for kafka client. +## **It's only used when the provider is `kafka`**. +#+ timeout = "3s" + ## The max size of a single producer batch. ## Warning: Kafka has a default limit of 1MB per message in a topic. ## **It's only used when the provider is `kafka`**. diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 10f07c138d..ec9491b85b 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -55,6 +55,13 @@ allow_region_failover_on_local_wal = false ## Max allowed idle time before removing node info from metasrv memory. node_max_idle_time = "24hours" +## Base heartbeat interval for calculating distributed time constants. +## The frontend heartbeat interval is 6 times of the base heartbeat interval. +## The flownode/datanode heartbeat interval is 1 times of the base heartbeat interval. +## e.g., If the base heartbeat interval is 3s, the frontend heartbeat interval is 18s, the flownode/datanode heartbeat interval is 3s. +## If you change this value, you need to change the heartbeat interval of the flownode/frontend/datanode accordingly. +#+ heartbeat_interval = "3s" + ## Whether to enable greptimedb telemetry. Enabled by default. #+ enable_telemetry = true @@ -93,6 +100,16 @@ ca_cert_path = "" ## Watch for certificate file changes and auto reload watch = false +## The backend client options. +## Currently, only applicable when using etcd as the metadata store. +#+ [backend_client] +## The keep alive timeout for backend client. +#+ keep_alive_timeout = "3s" +## The keep alive interval for backend client. +#+ keep_alive_interval = "10s" +## The connect timeout for backend client. +#+ connect_timeout = "3s" + ## The gRPC server options. [grpc] ## The address to bind the gRPC server. @@ -107,6 +124,10 @@ runtime_size = 8 max_recv_message_size = "512MB" ## The maximum send message size for gRPC server. max_send_message_size = "512MB" +## The server side HTTP/2 keep-alive interval +#+ http2_keep_alive_interval = "10s" +## The server side HTTP/2 keep-alive timeout. +#+ http2_keep_alive_timeout = "3s" ## The HTTP server options. [http] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index a315ca75d2..00b92df790 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -209,6 +209,14 @@ recovery_parallelism = 2 ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] +## The connect timeout for kafka client. +## **It's only used when the provider is `kafka`**. +#+ connect_timeout = "3s" + +## The timeout for kafka client. +## **It's only used when the provider is `kafka`**. +#+ timeout = "3s" + ## Automatically create topics for WAL. ## Set to `true` to automatically create topics for WAL. ## Otherwise, use topics named `topic_name_prefix_[0..num_topics)` diff --git a/src/cli/src/metadata/common.rs b/src/cli/src/metadata/common.rs index 2455aa400c..598b53b8ff 100644 --- a/src/cli/src/metadata/common.rs +++ b/src/cli/src/metadata/common.rs @@ -20,7 +20,7 @@ use common_meta::kv_backend::chroot::ChrootKvBackend; use common_meta::kv_backend::etcd::EtcdStore; use common_meta::kv_backend::KvBackendRef; use meta_srv::bootstrap::create_etcd_client; -use meta_srv::metasrv::BackendImpl; +use meta_srv::metasrv::{BackendClientOptions, BackendImpl}; use crate::error::{EmptyStoreAddrsSnafu, UnsupportedMemoryBackendSnafu}; @@ -67,9 +67,10 @@ impl StoreConfig { } else { let kvbackend = match self.backend { BackendImpl::EtcdStore => { - let etcd_client = create_etcd_client(store_addrs) - .await - .map_err(BoxedError::new)?; + let etcd_client = + create_etcd_client(store_addrs, &BackendClientOptions::default()) + .await + .map_err(BoxedError::new)?; Ok(EtcdStore::with_etcd_client(etcd_client, max_txn_ops)) } #[cfg(feature = "pg_kvbackend")] diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 8e59559bf1..9c6c38f07c 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use clap::Parser; use common_base::Plugins; use common_config::Configurable; +use common_meta::distributed_time_constants::init_distributed_time_constants; use common_telemetry::info; use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR}; use common_version::{short_version, verbose_version}; @@ -327,6 +328,7 @@ impl StartCommand { log_versions(verbose_version(), short_version(), APP_NAME); maybe_activate_heap_profile(&opts.component.memory); create_resource_limit_metrics(APP_NAME); + init_distributed_time_constants(opts.component.heartbeat_interval); info!("Metasrv start command: {:#?}", self); diff --git a/src/common/meta/src/distributed_time_constants.rs b/src/common/meta/src/distributed_time_constants.rs index 4e6e0095ed..729f2dabde 100644 --- a/src/common/meta/src/distributed_time_constants.rs +++ b/src/common/meta/src/distributed_time_constants.rs @@ -12,27 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::OnceLock; use std::time::Duration; -use etcd_client::ConnectOptions; - -/// Heartbeat interval time (is the basic unit of various time). -pub const HEARTBEAT_INTERVAL_MILLIS: u64 = 3000; - -/// The frontend will also send heartbeats to Metasrv, sending an empty -/// heartbeat every HEARTBEAT_INTERVAL_MILLIS * 6 seconds. -pub const FRONTEND_HEARTBEAT_INTERVAL_MILLIS: u64 = HEARTBEAT_INTERVAL_MILLIS * 6; - -/// The lease seconds of a region. It's set by 3 heartbeat intervals -/// (HEARTBEAT_INTERVAL_MILLIS × 3), plus some extra buffer (1 second). -pub const REGION_LEASE_SECS: u64 = - Duration::from_millis(HEARTBEAT_INTERVAL_MILLIS * 3).as_secs() + 1; - -/// When creating table or region failover, a target node needs to be selected. -/// If the node's lease has expired, the `Selector` will not select it. -pub const DATANODE_LEASE_SECS: u64 = REGION_LEASE_SECS; - -pub const FLOWNODE_LEASE_SECS: u64 = DATANODE_LEASE_SECS; +pub const BASE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(3); /// The lease seconds of metasrv leader. pub const META_LEASE_SECS: u64 = 5; @@ -54,16 +37,62 @@ pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_INTERVAL_SECS: Duration = pub const HEARTBEAT_CHANNEL_KEEP_ALIVE_TIMEOUT_SECS: Duration = Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1); -/// The default options for the etcd client. -pub fn default_etcd_client_options() -> ConnectOptions { - ConnectOptions::new() - .with_keep_alive_while_idle(true) - .with_keep_alive( - Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS + 1), - Duration::from_secs(10), - ) - .with_connect_timeout(Duration::from_secs(10)) -} - /// The default mailbox round-trip timeout. pub const MAILBOX_RTT_SECS: u64 = 1; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +/// The distributed time constants. +pub struct DistributedTimeConstants { + pub heartbeat_interval: Duration, + pub frontend_heartbeat_interval: Duration, + pub region_lease: Duration, + pub datanode_lease: Duration, + pub flownode_lease: Duration, +} + +/// The frontend heartbeat interval is 6 times of the base heartbeat interval. +pub fn frontend_heartbeat_interval(base_heartbeat_interval: Duration) -> Duration { + base_heartbeat_interval * 6 +} + +impl DistributedTimeConstants { + /// Create a new DistributedTimeConstants from the heartbeat interval. + pub fn from_heartbeat_interval(heartbeat_interval: Duration) -> Self { + let region_lease = heartbeat_interval * 3 + Duration::from_secs(1); + let datanode_lease = region_lease; + let flownode_lease = datanode_lease; + Self { + heartbeat_interval, + frontend_heartbeat_interval: frontend_heartbeat_interval(heartbeat_interval), + region_lease, + datanode_lease, + flownode_lease, + } + } +} + +impl Default for DistributedTimeConstants { + fn default() -> Self { + Self::from_heartbeat_interval(BASE_HEARTBEAT_INTERVAL) + } +} + +static DEFAULT_DISTRIBUTED_TIME_CONSTANTS: OnceLock = OnceLock::new(); + +/// Get the default distributed time constants. +pub fn default_distributed_time_constants() -> &'static DistributedTimeConstants { + DEFAULT_DISTRIBUTED_TIME_CONSTANTS.get_or_init(Default::default) +} + +/// Initialize the default distributed time constants. +pub fn init_distributed_time_constants(base_heartbeat_interval: Duration) { + let distributed_time_constants = + DistributedTimeConstants::from_heartbeat_interval(base_heartbeat_interval); + DEFAULT_DISTRIBUTED_TIME_CONSTANTS + .set(distributed_time_constants) + .expect("Failed to set default distributed time constants"); + common_telemetry::info!( + "Initialized default distributed time constants: {:#?}", + distributed_time_constants + ); +} diff --git a/src/common/meta/src/wal_options_allocator/topic_creator.rs b/src/common/meta/src/wal_options_allocator/topic_creator.rs index d756c9770f..642011efad 100644 --- a/src/common/meta/src/wal_options_allocator/topic_creator.rs +++ b/src/common/meta/src/wal_options_allocator/topic_creator.rs @@ -14,8 +14,7 @@ use common_telemetry::{debug, error, info}; use common_wal::config::kafka::common::{ - KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, - DEFAULT_KEEP_ALIVE_CONFIG, + KafkaConnectionConfig, KafkaTopicConfig, DEFAULT_BACKOFF_CONFIG, }; use rskafka::client::error::Error as RsKafkaError; use rskafka::client::error::ProtocolError::TopicAlreadyExists; @@ -211,8 +210,8 @@ pub async fn build_kafka_client(connection: &KafkaConnectionConfig) -> Result, /// Client TLS config pub tls: Option, + /// The connect timeout for kafka client. + #[serde(with = "humantime_serde")] + pub connect_timeout: Duration, + /// The timeout for kafka client. + #[serde(with = "humantime_serde")] + pub timeout: Duration, } impl Default for KafkaConnectionConfig { @@ -179,6 +175,8 @@ impl Default for KafkaConnectionConfig { broker_endpoints: vec![BROKER_ENDPOINT.to_string()], sasl: None, tls: None, + connect_timeout: Duration::from_secs(3), + timeout: Duration::from_secs(3), } } } diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 0984f4d921..c7471bc401 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -15,9 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use common_wal::config::kafka::common::{ - DEFAULT_BACKOFF_CONFIG, DEFAULT_CONNECT_TIMEOUT, DEFAULT_KEEP_ALIVE_CONFIG, -}; +use common_wal::config::kafka::common::DEFAULT_BACKOFF_CONFIG; use common_wal::config::kafka::DatanodeKafkaConfig; use dashmap::DashMap; use rskafka::client::partition::{Compression, PartitionClient, UnknownTopicHandling}; @@ -80,8 +78,8 @@ impl ClientManager { // Sets backoff config for the top-level kafka client and all clients constructed by it. let mut builder = ClientBuilder::new(config.connection.broker_endpoints.clone()) .backoff_config(DEFAULT_BACKOFF_CONFIG) - .connect_timeout(Some(DEFAULT_CONNECT_TIMEOUT)) - .keepalive_config(DEFAULT_KEEP_ALIVE_CONFIG); + .connect_timeout(Some(config.connection.connect_timeout)) + .timeout(Some(config.connection.timeout)); if let Some(sasl) = &config.connection.sasl { builder = builder.sasl_config(sasl.config.clone().into_sasl_config()); }; diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index d1e86e8732..138afcce3c 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -14,7 +14,6 @@ use std::net::SocketAddr; use std::sync::Arc; -use std::time::Duration; use api::v1::meta::cluster_server::ClusterServer; use api::v1::meta::heartbeat_server::HeartbeatServer; @@ -23,7 +22,6 @@ use api::v1::meta::store_server::StoreServer; use common_base::Plugins; use common_config::Configurable; use common_error::ext::BoxedError; -use common_meta::distributed_time_constants::default_etcd_client_options; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use common_meta::distributed_time_constants::META_LEASE_SECS; use common_meta::kv_backend::chroot::ChrootKvBackend; @@ -42,7 +40,7 @@ use common_telemetry::info; #[cfg(feature = "pg_kvbackend")] use deadpool_postgres::{Config, Runtime}; use either::Either; -use etcd_client::Client; +use etcd_client::{Client, ConnectOptions}; use servers::configurator::ConfiguratorRef; use servers::export_metrics::ExportMetricsTask; use servers::http::{HttpServer, HttpServerBuilder}; @@ -72,7 +70,9 @@ use crate::election::rds::postgres::PgElection; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use crate::election::CANDIDATE_LEASE_SECS; use crate::metasrv::builder::MetasrvBuilder; -use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef}; +use crate::metasrv::{ + BackendClientOptions, BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef, +}; use crate::node_excluder::NodeExcluderRef; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; @@ -83,11 +83,6 @@ use crate::service::admin; use crate::service::admin::admin_axum_router; use crate::{error, Result}; -/// The default keep-alive interval for gRPC. -const DEFAULT_GRPC_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(10); -/// The default keep-alive timeout for gRPC. -const DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10); - pub struct MetasrvInstance { metasrv: Arc, @@ -281,8 +276,8 @@ pub fn router(metasrv: Arc) -> Router { // for admin services .accept_http1(true) // For quick network failures detection. - .http2_keepalive_interval(Some(DEFAULT_GRPC_KEEP_ALIVE_INTERVAL)) - .http2_keepalive_timeout(Some(DEFAULT_GRPC_KEEP_ALIVE_TIMEOUT)); + .http2_keepalive_interval(Some(metasrv.options().grpc.http2_keep_alive_interval)) + .http2_keepalive_timeout(Some(metasrv.options().grpc.http2_keep_alive_timeout)); let router = add_compressed_service!(router, HeartbeatServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, StoreServer::from_arc(metasrv.clone())); let router = add_compressed_service!(router, ClusterServer::from_arc(metasrv.clone())); @@ -299,7 +294,7 @@ pub async fn metasrv_builder( (Some(kv_backend), _) => (kv_backend, None), (None, BackendImpl::MemoryStore) => (Arc::new(MemoryKvBackend::new()) as _, None), (None, BackendImpl::EtcdStore) => { - let etcd_client = create_etcd_client(&opts.store_addrs).await?; + let etcd_client = create_etcd_client(&opts.store_addrs, &opts.backend_client).await?; let kv_backend = EtcdStore::with_etcd_client(etcd_client.clone(), opts.max_txn_ops); let election = EtcdElection::with_etcd_client( &opts.grpc.server_addr, @@ -447,13 +442,19 @@ pub async fn metasrv_builder( .plugins(plugins)) } -pub async fn create_etcd_client(store_addrs: &[String]) -> Result { +pub async fn create_etcd_client( + store_addrs: &[String], + options: &BackendClientOptions, +) -> Result { let etcd_endpoints = store_addrs .iter() .map(|x| x.trim()) .filter(|x| !x.is_empty()) .collect::>(); - let options = default_etcd_client_options(); + let options = ConnectOptions::new() + .with_keep_alive_while_idle(true) + .with_keep_alive(options.keep_alive_interval, options.keep_alive_timeout) + .with_connect_timeout(options.connect_timeout); Client::connect(&etcd_endpoints, Some(options)) .await .context(error::ConnectEtcdSnafu) diff --git a/src/meta-srv/src/failure_detector.rs b/src/meta-srv/src/failure_detector.rs index ac0df6c2dc..02d2436b25 100644 --- a/src/meta-srv/src/failure_detector.rs +++ b/src/meta-srv/src/failure_detector.rs @@ -15,7 +15,6 @@ use std::collections::VecDeque; use std::time::Duration; -use common_meta::distributed_time_constants; use serde::{Deserialize, Serialize}; /// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)" @@ -83,9 +82,7 @@ impl Default for PhiAccrualFailureDetectorOptions { Self { threshold: 8_f32, min_std_deviation: Duration::from_millis(100), - acceptable_heartbeat_pause: Duration::from_secs( - distributed_time_constants::DATANODE_LEASE_SECS, - ), + acceptable_heartbeat_pause: Duration::from_secs(10), first_heartbeat_estimate: Duration::from_millis(1000), } } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 311b101103..c3cfd36957 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -124,7 +124,7 @@ mod test { use std::sync::Arc; use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; - use common_meta::distributed_time_constants; + use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; use common_meta::key::TableMetadataManager; @@ -223,7 +223,7 @@ mod test { let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); let handler = RegionLeaseHandler::new( - distributed_time_constants::REGION_LEASE_SECS, + default_distributed_time_constants().region_lease.as_secs() as u64, table_metadata_manager.clone(), opening_region_keeper.clone(), None, @@ -253,7 +253,7 @@ mod test { assert_eq!( acc.region_lease.as_ref().unwrap().lease_seconds, - distributed_time_constants::REGION_LEASE_SECS + default_distributed_time_constants().region_lease.as_secs() ); assert_region_lease( @@ -287,7 +287,7 @@ mod test { assert_eq!( acc.region_lease.as_ref().unwrap().lease_seconds, - distributed_time_constants::REGION_LEASE_SECS + default_distributed_time_constants().region_lease.as_secs() ); assert_region_lease( @@ -366,7 +366,7 @@ mod test { }); let handler = RegionLeaseHandler::new( - distributed_time_constants::REGION_LEASE_SECS, + default_distributed_time_constants().region_lease.as_secs(), table_metadata_manager.clone(), Default::default(), None, diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index ec6244450c..34cf612af0 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -21,7 +21,7 @@ use std::task::{Context, Poll}; use api::v1::meta::heartbeat_request::NodeWorkloads; use common_error::ext::BoxedError; use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole}; -use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef}; use common_meta::peer::{Peer, PeerLookupService}; use common_meta::rpc::store::RangeRequest; @@ -312,7 +312,9 @@ impl PeerLookupService for MetaPeerLookupService { lookup_frontends( &self.meta_peer_client, // TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval? - FRONTEND_HEARTBEAT_INTERVAL_MILLIS, + default_distributed_time_constants() + .frontend_heartbeat_interval + .as_secs(), ) .await .map_err(BoxedError::new) diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 185b4ee8ba..595c26dfa8 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -27,7 +27,7 @@ use common_event_recorder::EventRecorderOptions; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl_manager::DdlManagerRef; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::{self, default_distributed_time_constants}; use common_meta::key::runtime_switch::RuntimeSwitchManagerRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; @@ -98,6 +98,27 @@ pub enum BackendImpl { MysqlStore, } +#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)] +#[serde(default)] +pub struct BackendClientOptions { + #[serde(with = "humantime_serde")] + pub keep_alive_timeout: Duration, + #[serde(with = "humantime_serde")] + pub keep_alive_interval: Duration, + #[serde(with = "humantime_serde")] + pub connect_timeout: Duration, +} + +impl Default for BackendClientOptions { + fn default() -> Self { + Self { + keep_alive_interval: Duration::from_secs(10), + keep_alive_timeout: Duration::from_secs(3), + connect_timeout: Duration::from_secs(3), + } + } +} + #[derive(Clone, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetasrvOptions { @@ -113,12 +134,22 @@ pub struct MetasrvOptions { /// Only applicable when using PostgreSQL or MySQL as the metadata store #[serde(default)] pub backend_tls: Option, + /// The backend client options. + /// Currently, only applicable when using etcd as the metadata store. + #[serde(default)] + pub backend_client: BackendClientOptions, /// The type of selector. pub selector: SelectorType, /// Whether to use the memory store. pub use_memory_store: bool, /// Whether to enable region failover. pub enable_region_failover: bool, + /// The base heartbeat interval. + /// + /// This value is used to calculate the distributed time constants for components. + /// e.g., the region lease time is `heartbeat_interval * 3 + Duration::from_secs(1)`. + #[serde(with = "humantime_serde")] + pub heartbeat_interval: Duration, /// The delay before starting region failure detection. /// This delay helps prevent Metasrv from triggering unnecessary region failovers before all Datanodes are fully started. /// Especially useful when the cluster is not deployed with GreptimeDB Operator and maintenance mode is not enabled. @@ -211,7 +242,9 @@ impl fmt::Debug for MetasrvOptions { .field("max_txn_ops", &self.max_txn_ops) .field("flush_stats_factor", &self.flush_stats_factor) .field("tracing", &self.tracing) - .field("backend", &self.backend); + .field("backend", &self.backend) + .field("heartbeat_interval", &self.heartbeat_interval) + .field("backend_client", &self.backend_client); #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] debug_struct.field("meta_table_name", &self.meta_table_name); @@ -239,6 +272,7 @@ impl Default for MetasrvOptions { selector: SelectorType::default(), use_memory_store: false, enable_region_failover: false, + heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, region_failure_detector_initialization_delay: Duration::from_secs(10 * 60), allow_region_failover_on_local_wal: false, grpc: GrpcOptions { @@ -273,6 +307,7 @@ impl Default for MetasrvOptions { meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID, node_max_idle_time: Duration::from_secs(24 * 60 * 60), event_recorder: EventRecorderOptions::default(), + backend_client: BackendClientOptions::default(), } } } @@ -660,7 +695,9 @@ impl Metasrv { lookup_datanode_peer( peer_id, &self.meta_peer_client, - distributed_time_constants::DATANODE_LEASE_SECS, + default_distributed_time_constants() + .datanode_lease + .as_secs(), ) .await } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 3da28f4dab..b59aa3c7dc 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -27,7 +27,7 @@ use common_meta::ddl::{ DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef, }; use common_meta::ddl_manager::DdlManager; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::key::flow::flow_state::FlowStateManager; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef}; @@ -220,8 +220,12 @@ impl MetasrvBuilder { let selector_ctx = SelectorContext { server_addr: options.grpc.server_addr.clone(), - datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS, - flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS, + datanode_lease_secs: default_distributed_time_constants() + .datanode_lease + .as_secs(), + flownode_lease_secs: default_distributed_time_constants() + .flownode_lease + .as_secs(), kv_backend: kv_backend.clone(), meta_peer_client: meta_peer_client.clone(), table_id: None, @@ -438,7 +442,7 @@ impl MetasrvBuilder { Some(handler_group_builder) => handler_group_builder, None => { let region_lease_handler = RegionLeaseHandler::new( - distributed_time_constants::REGION_LEASE_SECS, + default_distributed_time_constants().region_lease.as_secs(), table_metadata_manager.clone(), memory_region_keeper.clone(), customized_region_lease_renewer, diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index ac52bc8280..01d2f029b1 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -770,7 +770,7 @@ mod tests { use std::assert_matches::assert_matches; use std::sync::Arc; - use common_meta::distributed_time_constants::REGION_LEASE_SECS; + use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::Instruction; use common_meta::key::test_utils::new_test_table_info; use common_meta::rpc::router::{Region, RegionRoute}; @@ -1004,8 +1004,10 @@ mod tests { .run_once() .await; + let region_lease = default_distributed_time_constants().region_lease.as_secs(); + // Ensure it didn't run into the slow path. - assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); + assert!(timer.elapsed().as_secs() < region_lease / 2); runner.suite.verify_table_metadata().await; } @@ -1059,8 +1061,9 @@ mod tests { .run_once() .await; + let region_lease = default_distributed_time_constants().region_lease.as_secs(); // Ensure it didn't run into the slow path. - assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); + assert!(timer.elapsed().as_secs() < region_lease / 2); runner.suite.verify_table_metadata().await; } @@ -1380,8 +1383,9 @@ mod tests { .run_once() .await; + let region_lease = default_distributed_time_constants().region_lease.as_secs(); // Ensure it didn't run into the slow path. - assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS); + assert!(timer.elapsed().as_secs() < region_lease); runner.suite.verify_table_metadata().await; } } diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs index d049366552..bd788fe401 100644 --- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -13,10 +13,9 @@ // limitations under the License. use std::any::Any; -use std::time::Duration; use api::v1::meta::MailboxMessage; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::key::datanode_table::RegionInfo; use common_meta::RegionIdent; @@ -31,9 +30,6 @@ use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; -/// Uses lease time of a region as the timeout of closing a downgraded region. -const CLOSE_DOWNGRADED_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS); - #[derive(Debug, Serialize, Deserialize)] pub struct CloseDowngradedRegion; @@ -111,7 +107,7 @@ impl CloseDowngradedRegion { let ch = Channel::Datanode(downgrade_leader_datanode.id); let receiver = ctx .mailbox - .send(&ch, msg, CLOSE_DOWNGRADED_REGION_TIMEOUT) + .send(&ch, msg, default_distributed_time_constants().region_lease) .await?; match receiver.await { diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index eb8d73001f..6389e97512 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -17,7 +17,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::{ DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, }; @@ -64,7 +64,7 @@ impl State for DowngradeLeaderRegion { let now = Instant::now(); // Ensures the `leader_region_lease_deadline` must exist after recovering. ctx.volatile_ctx - .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS)); + .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease); match self.downgrade_region_with_retry(ctx).await { Ok(_) => { @@ -250,14 +250,14 @@ impl DowngradeLeaderRegion { if let Some(last_connection_at) = last_connection_at { let now = current_time_millis(); let elapsed = now - last_connection_at; - let region_lease = Duration::from_secs(REGION_LEASE_SECS); + let region_lease = default_distributed_time_constants().region_lease; // It's safe to update the region leader lease deadline here because: // 1. The old region leader has already been marked as downgraded in metadata, // which means any attempts to renew its lease will be rejected. // 2. The pusher disconnect time record only gets removed when the datanode (from_peer) // establishes a new heartbeat connection stream. - if elapsed >= (REGION_LEASE_SECS * 1000) as i64 { + if elapsed >= (region_lease.as_secs() * 1000) as i64 { ctx.volatile_ctx.reset_leader_region_lease_deadline(); info!( "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}", @@ -663,7 +663,8 @@ mod tests { let procedure_ctx = new_procedure_context(); let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let elapsed = timer.elapsed().as_secs(); - assert!(elapsed < REGION_LEASE_SECS / 2); + let region_lease = default_distributed_time_constants().region_lease.as_secs(); + assert!(elapsed < region_lease / 2); assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1)); assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none()); diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 7228108cb2..44388e5580 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -13,10 +13,9 @@ // limitations under the License. use std::any::Any; -use std::time::Duration; use api::v1::meta::MailboxMessage; -use common_meta::distributed_time_constants::REGION_LEASE_SECS; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::datanode_table::RegionInfo; use common_meta::RegionIdent; @@ -32,9 +31,6 @@ use crate::procedure::region_migration::flush_leader_region::PreFlushRegion; use crate::procedure::region_migration::{Context, State}; use crate::service::mailbox::Channel; -/// Uses lease time of a region as the timeout of opening a candidate region. -const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS); - #[derive(Debug, Serialize, Deserialize)] pub struct OpenCandidateRegion; @@ -143,7 +139,7 @@ impl OpenCandidateRegion { let now = Instant::now(); let receiver = ctx .mailbox - .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT) + .send(&ch, msg, default_distributed_time_constants().region_lease) .await?; match receiver.await { diff --git a/src/meta-srv/src/selector/test_utils.rs b/src/meta-srv/src/selector/test_utils.rs index 966b6733e0..45289ee37e 100644 --- a/src/meta-srv/src/selector/test_utils.rs +++ b/src/meta-srv/src/selector/test_utils.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS}; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::peer::Peer; use rand::prelude::SliceRandom; @@ -36,8 +36,10 @@ pub fn new_test_selector_context() -> SelectorContext { SelectorContext { server_addr: "127.0.0.1:3002".to_string(), - datanode_lease_secs: REGION_LEASE_SECS, - flownode_lease_secs: FLOWNODE_LEASE_SECS, + datanode_lease_secs: default_distributed_time_constants().region_lease.as_secs(), + flownode_lease_secs: default_distributed_time_constants() + .flownode_lease + .as_secs(), kv_backend, meta_peer_client, table_id: None, diff --git a/src/meta-srv/src/service/heartbeat.rs b/src/meta-srv/src/service/heartbeat.rs index d1a3a0e636..4545eb9518 100644 --- a/src/meta-srv/src/service/heartbeat.rs +++ b/src/meta-srv/src/service/heartbeat.rs @@ -98,6 +98,7 @@ impl heartbeat_server::Heartbeat for Metasrv { break; } } + error!(err; "Sending heartbeat response error"); if tx.send(Err(err)).await.is_err() { info!("ReceiverStream was dropped; shutting down"); diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 57ffb9542b..0c044b1e7d 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -23,6 +23,7 @@ pub mod prom_query_gateway; pub mod region_server; use std::net::SocketAddr; +use std::time::Duration; use api::v1::health_check_server::{HealthCheck, HealthCheckServer}; use api::v1::{HealthCheckRequest, HealthCheckResponse}; @@ -72,6 +73,12 @@ pub struct GrpcOptions { pub runtime_size: usize, #[serde(default = "Default::default")] pub tls: TlsOption, + /// The HTTP/2 keep-alive interval. + #[serde(with = "humantime_serde")] + pub http2_keep_alive_interval: Duration, + /// The HTTP/2 keep-alive timeout. + #[serde(with = "humantime_serde")] + pub http2_keep_alive_timeout: Duration, } impl GrpcOptions { @@ -129,6 +136,8 @@ impl Default for GrpcOptions { flight_compression: FlightCompression::ArrowIpc, runtime_size: 8, tls: TlsOption::default(), + http2_keep_alive_interval: Duration::from_secs(10), + http2_keep_alive_timeout: Duration::from_secs(3), } } } diff --git a/src/servers/src/heartbeat_options.rs b/src/servers/src/heartbeat_options.rs index bb7d34e18c..9812625260 100644 --- a/src/servers/src/heartbeat_options.rs +++ b/src/servers/src/heartbeat_options.rs @@ -34,12 +34,10 @@ impl HeartbeatOptions { pub fn frontend_default() -> Self { Self { // Frontend can send heartbeat with a longer interval. - interval: Duration::from_millis( - distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS, - ), - retry_interval: Duration::from_millis( - distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + interval: distributed_time_constants::frontend_heartbeat_interval( + distributed_time_constants::BASE_HEARTBEAT_INTERVAL, ), + retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, } } } @@ -47,10 +45,8 @@ impl HeartbeatOptions { impl Default for HeartbeatOptions { fn default() -> Self { Self { - interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS), - retry_interval: Duration::from_millis( - distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, - ), + interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, + retry_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL, } } } diff --git a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs index f6390cd8f0..c21698b7bc 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_metric_regions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -252,10 +252,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?; wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await; - tokio::time::sleep(Duration::from_secs( - distributed_time_constants::REGION_LEASE_SECS, - )) - .await; + tokio::time::sleep(default_distributed_time_constants().region_lease).await; // Validates value rows info!("Validates num of rows"); diff --git a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs index 9faf82ec0b..9b14f752fb 100644 --- a/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs +++ b/tests-fuzz/targets/failover/fuzz_failover_mito_regions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_telemetry::info; use common_time::util::current_time_millis; use futures::future::try_join_all; @@ -319,10 +319,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> { recover_pod_failure(ctx.kube.clone(), &ctx.namespace, &chaos_name).await?; wait_for_all_datanode_online(ctx.greptime.clone(), Duration::from_secs(60)).await; - tokio::time::sleep(Duration::from_secs( - distributed_time_constants::REGION_LEASE_SECS, - )) - .await; + tokio::time::sleep(default_distributed_time_constants().region_lease).await; // Validates value rows info!("Validates num of rows"); for (table_ctx, expected_rows) in table_ctxs.iter().zip(affected_rows) { diff --git a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs index 7f7257c078..f1aaa1401f 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_metric_regions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -271,10 +271,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result< wait_for_migration(ctx, migration, &procedure_id).await; } - tokio::time::sleep(Duration::from_secs( - distributed_time_constants::REGION_LEASE_SECS, - )) - .await; + tokio::time::sleep(default_distributed_time_constants().region_lease).await; Ok(()) } diff --git a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs index fe9fa3dc6e..2db51ee3ff 100644 --- a/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs +++ b/tests-fuzz/targets/migration/fuzz_migrate_mito_regions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::time::Duration; use arbitrary::{Arbitrary, Unstructured}; -use common_meta::distributed_time_constants; +use common_meta::distributed_time_constants::default_distributed_time_constants; use common_telemetry::info; use libfuzzer_sys::fuzz_target; use rand::{Rng, SeedableRng}; @@ -262,10 +262,7 @@ async fn migrate_regions(ctx: &FuzzContext, migrations: &[Migration]) -> Result< .await; } - tokio::time::sleep(Duration::from_secs( - distributed_time_constants::REGION_LEASE_SECS, - )) - .await; + tokio::time::sleep(default_distributed_time_constants().region_lease).await; Ok(()) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f52d0d20a7..1868f1e71d 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1188,6 +1188,8 @@ max_recv_message_size = "512MiB" max_send_message_size = "512MiB" flight_compression = "arrow_ipc" runtime_size = 8 +http2_keep_alive_interval = "10s" +http2_keep_alive_timeout = "3s" [grpc.tls] mode = "disable"