fix: convert to ReadableSize & Durations (#2594)

* fix: convert to ReadableSize & Durations

* fix: change more grpc sender/recv message size to ReadableSize

fix: format

fix: cargo fmt

fix: change cmd test to use durations

fix: revert metaclient change

fix: convert default fields in meta client options

fix: human serde meta client durations

* fix: remove milisecond postfix in heartbeat option

* fix: humantime serde on heartbeat

* fix: update config example

* fix: update integration test config

* fix: address pr comments

* fix: fix pr comment on default annotation
This commit is contained in:
Yun Chen
2023-10-13 16:28:29 +13:00
committed by GitHub
parent 9a8fc08e6a
commit f859932745
21 changed files with 134 additions and 88 deletions

1
Cargo.lock generated
View File

@@ -5287,6 +5287,7 @@ dependencies = [
"datatypes",
"etcd-client",
"futures",
"humantime-serde",
"meta-srv",
"rand",
"serde",

View File

@@ -13,19 +13,19 @@ rpc_runtime_size = 8
require_lease_before_startup = false
[heartbeat]
# Interval for sending heartbeat messages to the Metasrv in milliseconds, 3000 by default.
interval_millis = 3000
# Interval for sending heartbeat messages to the Metasrv, 3 seconds by default.
interval = "3s"
# Metasrv client options.
[meta_client]
# Metasrv address list.
metasrv_addrs = ["127.0.0.1:3002"]
# Heartbeat timeout in milliseconds, 500 by default.
heartbeat_timeout_millis = 500
# Operation timeout in milliseconds, 3000 by default.
timeout_millis = 3000
# Connect server timeout in milliseconds, 5000 by default.
connect_timeout_millis = 1000
# Heartbeat timeout, 500 milliseconds by default.
heartbeat_timeout = "500ms"
# Operation timeout, 3 seconds by default.
timeout = "3s"
# Connect server timeout, 1 second by default.
connect_timeout = "1s"
# `TCP_NODELAY` option for accepted connections, true by default.
tcp_nodelay = true

View File

@@ -2,10 +2,10 @@
mode = "distributed"
[heartbeat]
# Interval for sending heartbeat task to the Metasrv in milliseconds, 5000 by default.
interval_millis = 5000
# Interval for retry sending heartbeat task in milliseconds, 5000 by default.
retry_interval_millis = 5000
# Interval for sending heartbeat task to the Metasrv, 5 seconds by default.
interval = "5s"
# Interval for retry sending heartbeat task, 5 seconds by default.
retry_interval = "5s"
# HTTP server options, see `standalone.example.toml`.
[http]
@@ -59,10 +59,10 @@ enable = true
# Metasrv client options, see `datanode.example.toml`.
[meta_client]
metasrv_addrs = ["127.0.0.1:3002"]
timeout_millis = 3000
timeout = "3s"
# DDL timeouts options.
ddl_timeout_millis = 10000
connect_timeout_millis = 1000
ddl_timeout = "10s"
connect_timeout = "1s"
tcp_nodelay = true
# Log options, see `standalone.example.toml`

View File

@@ -32,6 +32,6 @@ retry_delay = "500ms"
# [datanode]
# # Datanode client options.
# [datanode.client_options]
# timeout_millis = 10000
# connect_timeout_millis = 10000
# timeout = "10s"
# connect_timeout = "10s"
# tcp_nodelay = true

View File

@@ -139,11 +139,19 @@ impl Client {
}
fn max_grpc_recv_message_size(&self) -> usize {
self.inner.channel_manager.config().max_recv_message_size
self.inner
.channel_manager
.config()
.max_recv_message_size
.as_bytes() as usize
}
fn max_grpc_send_message_size(&self) -> usize {
self.inner.channel_manager.config().max_send_message_size
self.inner
.channel_manager
.config()
.max_send_message_size
.as_bytes() as usize
}
pub(crate) fn make_flight_client(&self) -> Result<FlightClient> {

View File

@@ -188,6 +188,7 @@ mod tests {
use common_base::readable_size::ReadableSize;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{CompactionConfig, FileConfig, ObjectStoreConfig, RegionManifestConfig};
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use super::*;
@@ -204,11 +205,14 @@ mod tests {
rpc_hostname = "127.0.0.1"
rpc_runtime_size = 8
[heartbeat]
interval = "300ms"
[meta_client]
metasrv_addrs = ["127.0.0.1:3002"]
timeout_millis = 3000
connect_timeout_millis = 5000
ddl_timeout_millis= 10000
timeout = "3s"
connect_timeout = "5s"
ddl_timeout = "10s"
tcp_nodelay = true
[wal]
@@ -257,19 +261,26 @@ mod tests {
assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0);
assert!(!options.wal.sync_write);
let HeartbeatOptions {
interval: heart_beat_interval,
..
} = options.heartbeat;
assert_eq!(300, heart_beat_interval.as_millis());
let MetaClientOptions {
metasrv_addrs: metasrv_addr,
timeout_millis,
connect_timeout_millis,
timeout,
connect_timeout,
ddl_timeout,
tcp_nodelay,
ddl_timeout_millis,
..
} = options.meta_client.unwrap();
assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
assert_eq!(5000, connect_timeout_millis);
assert_eq!(10000, ddl_timeout_millis);
assert_eq!(3000, timeout_millis);
assert_eq!(5000, connect_timeout.as_millis());
assert_eq!(10000, ddl_timeout.as_millis());
assert_eq!(3000, timeout.as_millis());
assert!(tcp_nodelay);
assert_eq!("/tmp/greptimedb/", options.storage.data_home);
assert!(matches!(
@@ -363,8 +374,8 @@ mod tests {
rpc_runtime_size = 8
[meta_client]
timeout_millis = 3000
connect_timeout_millis = 5000
timeout = "3s"
connect_timeout = "5s"
tcp_nodelay = true
[wal]

View File

@@ -353,8 +353,8 @@ mod tests {
addr = "127.0.0.1:4000"
[meta_client]
timeout_millis = 3000
connect_timeout_millis = 5000
timeout = "3s"
connect_timeout = "5s"
tcp_nodelay = true
[mysql]

View File

@@ -144,8 +144,8 @@ mod tests {
mysql_runtime_size = 2
[meta_client]
timeout_millis = 3000
connect_timeout_millis = 5000
timeout = "3s"
connect_timeout = "5s"
tcp_nodelay = true
[wal]

View File

@@ -16,6 +16,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
@@ -31,8 +32,8 @@ use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsCon
const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 1;
pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: usize = 512 * 1024 * 1024;
pub const DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE: ReadableSize = ReadableSize::mb(512);
pub const DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE: ReadableSize = ReadableSize::mb(512);
lazy_static! {
static ref ID: AtomicU64 = AtomicU64::new(0);
@@ -250,9 +251,9 @@ pub struct ChannelConfig {
pub tcp_nodelay: bool,
pub client_tls: Option<ClientTlsOption>,
// Max gRPC receiving(decoding) message size
pub max_recv_message_size: usize,
pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
pub max_send_message_size: ReadableSize,
}
impl Default for ChannelConfig {

View File

@@ -333,9 +333,9 @@ pub struct DatanodeOptions {
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
// Max gRPC receiving(decoding) message size
pub rpc_max_recv_message_size: usize,
pub rpc_max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub rpc_max_send_message_size: usize,
pub rpc_max_send_message_size: ReadableSize,
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub meta_client: Option<MetaClientOptions>,

View File

@@ -69,7 +69,7 @@ impl HeartbeatTask {
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
opts.heartbeat.interval_millis,
opts.heartbeat.interval.as_millis() as u64,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
@@ -86,7 +86,7 @@ impl HeartbeatTask {
running: Arc::new(AtomicBool::new(false)),
meta_client: Arc::new(meta_client),
region_server,
interval: opts.heartbeat.interval_millis,
interval: opts.heartbeat.interval.as_millis() as u64,
resp_handler_executor,
region_alive_keeper,
})
@@ -332,14 +332,14 @@ pub async fn new_metasrv_client(
let member_id = node_id;
let config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_config.timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.connect_timeout_millis))
.timeout(meta_config.timeout)
.connect_timeout(meta_config.connect_timeout)
.tcp_nodelay(meta_config.tcp_nodelay);
let channel_manager = ChannelManager::with_config(config.clone());
let heartbeat_channel_manager = ChannelManager::with_config(
config
.timeout(Duration::from_millis(meta_config.heartbeat_timeout_millis))
.connect_timeout(Duration::from_millis(meta_config.heartbeat_timeout_millis)),
.timeout(meta_config.timeout)
.connect_timeout(meta_config.connect_timeout),
);
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode)

View File

@@ -40,8 +40,8 @@ impl Services {
let region_server_handler = Some(Arc::new(region_server.clone()) as _);
let runtime = region_server.runtime();
let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size,
max_send_message_size: opts.rpc_max_send_message_size,
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};
Ok(Self {

View File

@@ -49,8 +49,8 @@ impl HeartbeatTask {
) -> Self {
HeartbeatTask {
meta_client,
report_interval: heartbeat_opts.interval_millis,
retry_interval: heartbeat_opts.retry_interval_millis,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
resp_handler_executor,
}
}

View File

@@ -22,7 +22,6 @@ mod script;
mod standalone;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use async_trait::async_trait;
@@ -231,14 +230,12 @@ impl Instance {
);
let channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(meta_client_options.timeout_millis))
.connect_timeout(Duration::from_millis(
meta_client_options.connect_timeout_millis,
))
.timeout(meta_client_options.timeout)
.connect_timeout(meta_client_options.connect_timeout)
.tcp_nodelay(meta_client_options.tcp_nodelay);
let ddl_channel_config = channel_config.clone().timeout(Duration::from_millis(
meta_client_options.ddl_timeout_millis,
));
let ddl_channel_config = channel_config
.clone()
.timeout(meta_client_options.ddl_timeout);
let channel_manager = ChannelManager::with_config(channel_config);
let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config);

View File

@@ -68,8 +68,8 @@ impl Services {
);
let grpc_config = GrpcServerConfig {
max_recv_message_size: opts.max_recv_message_size,
max_send_message_size: opts.max_send_message_size,
max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
};
let grpc_server = GrpcServer::new(
Some(grpc_config),

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::readable_size::ReadableSize;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
@@ -22,9 +23,9 @@ pub struct GrpcOptions {
pub addr: String,
pub runtime_size: usize,
// Max gRPC receiving(decoding) message size
pub max_recv_message_size: usize,
pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
pub max_send_message_size: ReadableSize,
}
impl Default for GrpcOptions {

View File

@@ -14,6 +14,7 @@ common-macro = { workspace = true }
common-meta = { workspace = true }
common-telemetry = { workspace = true }
etcd-client.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use serde::{Deserialize, Serialize};
pub mod client;
@@ -21,31 +23,45 @@ pub mod error;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct MetaClientOptions {
pub metasrv_addrs: Vec<String>,
pub timeout_millis: u64,
#[serde(default = "default_heartbeat_timeout_millis")]
pub heartbeat_timeout_millis: u64,
#[serde(default = "default_ddl_timeout_millis")]
pub ddl_timeout_millis: u64,
pub connect_timeout_millis: u64,
#[serde(default = "default_timeout")]
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(default = "default_heartbeat_timeout")]
#[serde(with = "humantime_serde")]
pub heartbeat_timeout: Duration,
#[serde(default = "default_ddl_timeout")]
#[serde(with = "humantime_serde")]
pub ddl_timeout: Duration,
#[serde(default = "default_connect_timeout")]
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
pub tcp_nodelay: bool,
}
fn default_heartbeat_timeout_millis() -> u64 {
500u64
fn default_heartbeat_timeout() -> Duration {
Duration::from_millis(500u64)
}
fn default_ddl_timeout_millis() -> u64 {
10_000u64
fn default_ddl_timeout() -> Duration {
Duration::from_millis(10_000u64)
}
fn default_connect_timeout() -> Duration {
Duration::from_millis(1_000u64)
}
fn default_timeout() -> Duration {
Duration::from_millis(3_000u64)
}
impl Default for MetaClientOptions {
fn default() -> Self {
Self {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout_millis: 3_000u64,
heartbeat_timeout_millis: default_heartbeat_timeout_millis(),
ddl_timeout_millis: default_ddl_timeout_millis(),
connect_timeout_millis: 1_000u64,
timeout: default_timeout(),
heartbeat_timeout: default_heartbeat_timeout(),
ddl_timeout: default_ddl_timeout(),
connect_timeout: default_connect_timeout(),
tcp_nodelay: true,
}
}

View File

@@ -95,8 +95,8 @@ pub struct GrpcServerConfig {
impl Default for GrpcServerConfig {
fn default() -> Self {
Self {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
}
}
}

View File

@@ -12,14 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_meta::distributed_time_constants;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct HeartbeatOptions {
pub interval_millis: u64,
pub retry_interval_millis: u64,
#[serde(with = "humantime_serde")]
pub interval: Duration,
#[serde(with = "humantime_serde")]
pub retry_interval: Duration,
}
impl HeartbeatOptions {
@@ -30,8 +34,12 @@ impl HeartbeatOptions {
pub fn frontend_default() -> Self {
Self {
// Frontend can send heartbeat with a longer interval.
interval_millis: distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
retry_interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
interval: Duration::from_millis(
distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
}
}
}
@@ -39,8 +47,10 @@ impl HeartbeatOptions {
impl Default for HeartbeatOptions {
fn default() -> Self {
Self {
interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
retry_interval_millis: distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
interval: Duration::from_millis(distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS),
retry_interval: Duration::from_millis(
distributed_time_constants::HEARTBEAT_INTERVAL_MILLIS,
),
}
}
}

View File

@@ -610,13 +610,13 @@ node_id = 0
require_lease_before_startup = true
rpc_addr = "127.0.0.1:3001"
rpc_runtime_size = 8
rpc_max_recv_message_size = 536870912
rpc_max_send_message_size = 536870912
rpc_max_recv_message_size = "512MiB"
rpc_max_send_message_size = "512MiB"
enable_telemetry = true
[heartbeat]
interval_millis = 3000
retry_interval_millis = 3000
interval = "3s"
retry_interval = "3s"
[http]
addr = "127.0.0.1:4000"