From f8599327458da5892909b00bd6b5f71125d50abe Mon Sep 17 00:00:00 2001 From: Yun Chen Date: Fri, 13 Oct 2023 16:28:29 +1300 Subject: [PATCH] fix: convert to ReadableSize & Durations (#2594) * fix: convert to ReadableSize & Durations * fix: change more grpc sender/recv message size to ReadableSize fix: format fix: cargo fmt fix: change cmd test to use durations fix: revert metaclient change fix: convert default fields in meta client options fix: human serde meta client durations * fix: remove milisecond postfix in heartbeat option * fix: humantime serde on heartbeat * fix: update config example * fix: update integration test config * fix: address pr comments * fix: fix pr comment on default annotation --- Cargo.lock | 1 + config/datanode.example.toml | 16 ++++----- config/frontend.example.toml | 14 ++++---- config/metasrv.example.toml | 4 +-- src/client/src/client.rs | 12 +++++-- src/cmd/src/datanode.rs | 33 ++++++++++++------- src/cmd/src/frontend.rs | 4 +-- src/cmd/src/options.rs | 4 +-- src/common/grpc/src/channel_manager.rs | 9 ++--- src/datanode/src/config.rs | 4 +-- src/datanode/src/heartbeat.rs | 12 +++---- src/datanode/src/server.rs | 4 +-- src/frontend/src/heartbeat.rs | 4 +-- src/frontend/src/instance.rs | 13 +++----- src/frontend/src/server.rs | 4 +-- src/frontend/src/service_config/grpc.rs | 5 +-- src/meta-client/Cargo.toml | 1 + src/meta-client/src/lib.rs | 44 +++++++++++++++++-------- src/servers/src/grpc.rs | 4 +-- src/servers/src/heartbeat_options.rs | 22 +++++++++---- tests-integration/tests/http.rs | 8 ++--- 21 files changed, 134 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8a9528140..2730af5907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5287,6 +5287,7 @@ dependencies = [ "datatypes", "etcd-client", "futures", + "humantime-serde", "meta-srv", "rand", "serde", diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 77c8dd7959..d4058f6e3f 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -13,19 +13,19 @@ rpc_runtime_size = 8 require_lease_before_startup = false [heartbeat] -# Interval for sending heartbeat messages to the Metasrv in milliseconds, 3000 by default. -interval_millis = 3000 +# Interval for sending heartbeat messages to the Metasrv, 3 seconds by default. +interval = "3s" # Metasrv client options. [meta_client] # Metasrv address list. metasrv_addrs = ["127.0.0.1:3002"] -# Heartbeat timeout in milliseconds, 500 by default. -heartbeat_timeout_millis = 500 -# Operation timeout in milliseconds, 3000 by default. -timeout_millis = 3000 -# Connect server timeout in milliseconds, 5000 by default. -connect_timeout_millis = 1000 +# Heartbeat timeout, 500 milliseconds by default. +heartbeat_timeout = "500ms" +# Operation timeout, 3 seconds by default. +timeout = "3s" +# Connect server timeout, 1 second by default. +connect_timeout = "1s" # `TCP_NODELAY` option for accepted connections, true by default. tcp_nodelay = true diff --git a/config/frontend.example.toml b/config/frontend.example.toml index e87936551f..566ed42f9e 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -2,10 +2,10 @@ mode = "distributed" [heartbeat] -# Interval for sending heartbeat task to the Metasrv in milliseconds, 5000 by default. -interval_millis = 5000 -# Interval for retry sending heartbeat task in milliseconds, 5000 by default. -retry_interval_millis = 5000 +# Interval for sending heartbeat task to the Metasrv, 5 seconds by default. +interval = "5s" +# Interval for retry sending heartbeat task, 5 seconds by default. +retry_interval = "5s" # HTTP server options, see `standalone.example.toml`. [http] @@ -59,10 +59,10 @@ enable = true # Metasrv client options, see `datanode.example.toml`. [meta_client] metasrv_addrs = ["127.0.0.1:3002"] -timeout_millis = 3000 +timeout = "3s" # DDL timeouts options. -ddl_timeout_millis = 10000 -connect_timeout_millis = 1000 +ddl_timeout = "10s" +connect_timeout = "1s" tcp_nodelay = true # Log options, see `standalone.example.toml` diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index deb5ec512d..6832573b05 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -32,6 +32,6 @@ retry_delay = "500ms" # [datanode] # # Datanode client options. # [datanode.client_options] -# timeout_millis = 10000 -# connect_timeout_millis = 10000 +# timeout = "10s" +# connect_timeout = "10s" # tcp_nodelay = true diff --git a/src/client/src/client.rs b/src/client/src/client.rs index c5457e5874..137565bdb6 100644 --- a/src/client/src/client.rs +++ b/src/client/src/client.rs @@ -139,11 +139,19 @@ impl Client { } fn max_grpc_recv_message_size(&self) -> usize { - self.inner.channel_manager.config().max_recv_message_size + self.inner + .channel_manager + .config() + .max_recv_message_size + .as_bytes() as usize } fn max_grpc_send_message_size(&self) -> usize { - self.inner.channel_manager.config().max_send_message_size + self.inner + .channel_manager + .config() + .max_send_message_size + .as_bytes() as usize } pub(crate) fn make_flight_client(&self) -> Result { diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 63c7a073b6..fbf4d2cb73 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -188,6 +188,7 @@ mod tests { use common_base::readable_size::ReadableSize; use common_test_util::temp_dir::create_named_temp_file; use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig}; + use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; use super::*; @@ -204,11 +205,14 @@ mod tests { rpc_hostname = "127.0.0.1" rpc_runtime_size = 8 + [heartbeat] + interval = "300ms" + [meta_client] metasrv_addrs = ["127.0.0.1:3002"] - timeout_millis = 3000 - connect_timeout_millis = 5000 - ddl_timeout_millis= 10000 + timeout = "3s" + connect_timeout = "5s" + ddl_timeout = "10s" tcp_nodelay = true [wal] @@ -257,19 +261,26 @@ mod tests { assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0); assert!(!options.wal.sync_write); + let HeartbeatOptions { + interval: heart_beat_interval, + .. + } = options.heartbeat; + + assert_eq!(300, heart_beat_interval.as_millis()); + let MetaClientOptions { metasrv_addrs: metasrv_addr, - timeout_millis, - connect_timeout_millis, + timeout, + connect_timeout, + ddl_timeout, tcp_nodelay, - ddl_timeout_millis, .. } = options.meta_client.unwrap(); assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr); - assert_eq!(5000, connect_timeout_millis); - assert_eq!(10000, ddl_timeout_millis); - assert_eq!(3000, timeout_millis); + assert_eq!(5000, connect_timeout.as_millis()); + assert_eq!(10000, ddl_timeout.as_millis()); + assert_eq!(3000, timeout.as_millis()); assert!(tcp_nodelay); assert_eq!("/tmp/greptimedb/", options.storage.data_home); assert!(matches!( @@ -363,8 +374,8 @@ mod tests { rpc_runtime_size = 8 [meta_client] - timeout_millis = 3000 - connect_timeout_millis = 5000 + timeout = "3s" + connect_timeout = "5s" tcp_nodelay = true [wal] diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index e6f5459046..c39ceb9ef5 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -353,8 +353,8 @@ mod tests { addr = "127.0.0.1:4000" [meta_client] - timeout_millis = 3000 - connect_timeout_millis = 5000 + timeout = "3s" + connect_timeout = "5s" tcp_nodelay = true [mysql] diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index a8e6d73e35..dbd260405d 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -144,8 +144,8 @@ mod tests { mysql_runtime_size = 2 [meta_client] - timeout_millis = 3000 - connect_timeout_millis = 5000 + timeout = "3s" + connect_timeout = "5s" tcp_nodelay = true [wal] diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index d1e372b677..98451f13ee 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +use common_base::readable_size::ReadableSize; use common_telemetry::info; use dashmap::mapref::entry::Entry; use dashmap::DashMap; @@ -31,8 +32,8 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10; pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 1; -pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024; -pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024; +pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: ReadableSize = ReadableSize::mb(512); +pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: ReadableSize = ReadableSize::mb(512); lazy_static! { static ref ID: AtomicU64 = AtomicU64::new(0); @@ -250,9 +251,9 @@ pub struct ChannelConfig { pub tcp_nodelay: bool, pub client_tls: Option, // Max gRPC receiving(decoding) message size - pub max_recv_message_size: usize, + pub max_recv_message_size: ReadableSize, // Max gRPC sending(encoding) message size - pub max_send_message_size: usize, + pub max_send_message_size: ReadableSize, } impl Default for ChannelConfig { diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 11b30aea07..5730ce9c85 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -333,9 +333,9 @@ pub struct DatanodeOptions { pub rpc_hostname: Option, pub rpc_runtime_size: usize, // Max gRPC receiving(decoding) message size - pub rpc_max_recv_message_size: usize, + pub rpc_max_recv_message_size: ReadableSize, // Max gRPC sending(encoding) message size - pub rpc_max_send_message_size: usize, + pub rpc_max_send_message_size: ReadableSize, pub heartbeat: HeartbeatOptions, pub http: HttpOptions, pub meta_client: Option, diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 44b48a2236..5fc9a31def 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -69,7 +69,7 @@ impl HeartbeatTask { ) -> Result { let region_alive_keeper = Arc::new(RegionAliveKeeper::new( region_server.clone(), - opts.heartbeat.interval_millis, + opts.heartbeat.interval.as_millis() as u64, )); let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), @@ -86,7 +86,7 @@ impl HeartbeatTask { running: Arc::new(AtomicBool::new(false)), meta_client: Arc::new(meta_client), region_server, - interval: opts.heartbeat.interval_millis, + interval: opts.heartbeat.interval.as_millis() as u64, resp_handler_executor, region_alive_keeper, }) @@ -332,14 +332,14 @@ pub async fn new_metasrv_client( let member_id = node_id; let config = ChannelConfig::new() - .timeout(Duration::from_millis(meta_config.timeout_millis)) - .connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis)) + .timeout(meta_config.timeout) + .connect_timeout(meta_config.connect_timeout) .tcp_nodelay(meta_config.tcp_nodelay); let channel_manager = ChannelManager::with_config(config.clone()); let heartbeat_channel_manager = ChannelManager::with_config( config - .timeout(Duration::from_millis(meta_config.heartbeat_timeout_millis)) - .connect_timeout(Duration::from_millis(meta_config.heartbeat_timeout_millis)), + .timeout(meta_config.timeout) + .connect_timeout(meta_config.connect_timeout), ); let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode) diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 1847dc4c99..ad198af813 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -40,8 +40,8 @@ impl Services { let region_server_handler = Some(Arc::new(region_server.clone()) as _); let runtime = region_server.runtime(); let grpc_config = GrpcServerConfig { - max_recv_message_size: opts.rpc_max_recv_message_size, - max_send_message_size: opts.rpc_max_send_message_size, + max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize, + max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize, }; Ok(Self { diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 4f96e5da70..ffe52eece8 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -49,8 +49,8 @@ impl HeartbeatTask { ) -> Self { HeartbeatTask { meta_client, - report_interval: heartbeat_opts.interval_millis, - retry_interval: heartbeat_opts.retry_interval_millis, + report_interval: heartbeat_opts.interval.as_millis() as u64, + retry_interval: heartbeat_opts.retry_interval.as_millis() as u64, resp_handler_executor, } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 8828bdd88a..b7dca70233 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -22,7 +22,6 @@ mod script; mod standalone; use std::collections::HashMap; use std::sync::Arc; -use std::time::Duration; use api::v1::meta::Role; use async_trait::async_trait; @@ -231,14 +230,12 @@ impl Instance { ); let channel_config = ChannelConfig::new() - .timeout(Duration::from_millis(meta_client_options.timeout_millis)) - .connect_timeout(Duration::from_millis( - meta_client_options.connect_timeout_millis, - )) + .timeout(meta_client_options.timeout) + .connect_timeout(meta_client_options.connect_timeout) .tcp_nodelay(meta_client_options.tcp_nodelay); - let ddl_channel_config = channel_config.clone().timeout(Duration::from_millis( - meta_client_options.ddl_timeout_millis, - )); + let ddl_channel_config = channel_config + .clone() + .timeout(meta_client_options.ddl_timeout); let channel_manager = ChannelManager::with_config(channel_config); let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config); diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 073bc0d53e..5105c1d14e 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -68,8 +68,8 @@ impl Services { ); let grpc_config = GrpcServerConfig { - max_recv_message_size: opts.max_recv_message_size, - max_send_message_size: opts.max_send_message_size, + max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize, + max_send_message_size: opts.max_send_message_size.as_bytes() as usize, }; let grpc_server = GrpcServer::new( Some(grpc_config), diff --git a/src/frontend/src/service_config/grpc.rs b/src/frontend/src/service_config/grpc.rs index 4a83b5b849..899a22d3eb 100644 --- a/src/frontend/src/service_config/grpc.rs +++ b/src/frontend/src/service_config/grpc.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_base::readable_size::ReadableSize; use common_grpc::channel_manager::{ DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }; @@ -22,9 +23,9 @@ pub struct GrpcOptions { pub addr: String, pub runtime_size: usize, // Max gRPC receiving(decoding) message size - pub max_recv_message_size: usize, + pub max_recv_message_size: ReadableSize, // Max gRPC sending(encoding) message size - pub max_send_message_size: usize, + pub max_send_message_size: ReadableSize, } impl Default for GrpcOptions { diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 7f8a9035d8..0a38e1bf69 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -14,6 +14,7 @@ common-macro = { workspace = true } common-meta = { workspace = true } common-telemetry = { workspace = true } etcd-client.workspace = true +humantime-serde.workspace = true rand.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index ec97735736..29be282bc5 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use serde::{Deserialize, Serialize}; pub mod client; @@ -21,31 +23,45 @@ pub mod error; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct MetaClientOptions { pub metasrv_addrs: Vec, - pub timeout_millis: u64, - #[serde(default = "default_heartbeat_timeout_millis")] - pub heartbeat_timeout_millis: u64, - #[serde(default = "default_ddl_timeout_millis")] - pub ddl_timeout_millis: u64, - pub connect_timeout_millis: u64, + #[serde(default = "default_timeout")] + #[serde(with = "humantime_serde")] + pub timeout: Duration, + #[serde(default = "default_heartbeat_timeout")] + #[serde(with = "humantime_serde")] + pub heartbeat_timeout: Duration, + #[serde(default = "default_ddl_timeout")] + #[serde(with = "humantime_serde")] + pub ddl_timeout: Duration, + #[serde(default = "default_connect_timeout")] + #[serde(with = "humantime_serde")] + pub connect_timeout: Duration, pub tcp_nodelay: bool, } -fn default_heartbeat_timeout_millis() -> u64 { - 500u64 +fn default_heartbeat_timeout() -> Duration { + Duration::from_millis(500u64) } -fn default_ddl_timeout_millis() -> u64 { - 10_000u64 +fn default_ddl_timeout() -> Duration { + Duration::from_millis(10_000u64) +} + +fn default_connect_timeout() -> Duration { + Duration::from_millis(1_000u64) +} + +fn default_timeout() -> Duration { + Duration::from_millis(3_000u64) } impl Default for MetaClientOptions { fn default() -> Self { Self { metasrv_addrs: vec!["127.0.0.1:3002".to_string()], - timeout_millis: 3_000u64, - heartbeat_timeout_millis: default_heartbeat_timeout_millis(), - ddl_timeout_millis: default_ddl_timeout_millis(), - connect_timeout_millis: 1_000u64, + timeout: default_timeout(), + heartbeat_timeout: default_heartbeat_timeout(), + ddl_timeout: default_ddl_timeout(), + connect_timeout: default_connect_timeout(), tcp_nodelay: true, } } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index baa0a73d89..b58f06e15a 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -95,8 +95,8 @@ pub struct GrpcServerConfig { impl Default for GrpcServerConfig { fn default() -> Self { Self { - max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, - max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize, } } } diff --git a/src/servers/src/heartbeat_options.rs b/src/servers/src/heartbeat_options.rs index 75a2875c96..bb7d34e18c 100644 --- a/src/servers/src/heartbeat_options.rs +++ b/src/servers/src/heartbeat_options.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Duration; + use common_meta::distributed_time_constants; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct HeartbeatOptions { - pub interval_millis: u64, - pub retry_interval_millis: u64, + #[serde(with = "humantime_serde")] + pub interval: Duration, + #[serde(with = "humantime_serde")] + pub retry_interval: Duration, } impl HeartbeatOptions { @@ -30,8 +34,12 @@ impl HeartbeatOptions { pub fn frontend_default() -> Self { Self { // Frontend can send heartbeat with a longer interval. - interval_millis: distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS, - retry_interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + interval: Duration::from_millis( + distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS, + ), + retry_interval: Duration::from_millis( + distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + ), } } } @@ -39,8 +47,10 @@ impl HeartbeatOptions { impl Default for HeartbeatOptions { fn default() -> Self { Self { - interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, - retry_interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS), + retry_interval: Duration::from_millis( + distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS, + ), } } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index ba4d9df2d6..7fbf7ea1d3 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -610,13 +610,13 @@ node_id = 0 require_lease_before_startup = true rpc_addr = "127.0.0.1:3001" rpc_runtime_size = 8 -rpc_max_recv_message_size = 536870912 -rpc_max_send_message_size = 536870912 +rpc_max_recv_message_size = "512MiB" +rpc_max_send_message_size = "512MiB" enable_telemetry = true [heartbeat] -interval_millis = 3000 -retry_interval_millis = 3000 +interval = "3s" +retry_interval = "3s" [http] addr = "127.0.0.1:4000"