diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 6f76fd9387..874d13a244 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -25,3 +25,11 @@ use_memory_store = false max_retry_times = 3 # Initial retry delay of procedures, increases exponentially retry_delay = "500ms" + +# # Datanode options. +# [datanode] +# # Datanode client options. +# [datanode.client_options] +# timeout_millis = 10000 +# connect_timeout_millis = 10000 +# tcp_nodelay = true diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index 6e85908e83..eeda1510c8 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -21,8 +21,6 @@ use moka::future::{Cache, CacheBuilder}; use crate::Client; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); - pub struct DatanodeClients { channel_manager: ChannelManager, clients: Cache, @@ -30,17 +28,7 @@ pub struct DatanodeClients { impl Default for DatanodeClients { fn default() -> Self { - let config = ChannelConfig::new() - .timeout(DEFAULT_TIMEOUT) - .connect_timeout(DEFAULT_TIMEOUT); - - Self { - channel_manager: ChannelManager::with_config(config), - clients: CacheBuilder::new(1024) - .time_to_live(Duration::from_secs(30 * 60)) - .time_to_idle(Duration::from_secs(5 * 60)) - .build(), - } + Self::new(ChannelConfig::new()) } } @@ -53,6 +41,16 @@ impl Debug for DatanodeClients { } impl DatanodeClients { + pub fn new(config: ChannelConfig) -> Self { + Self { + channel_manager: ChannelManager::with_config(config), + clients: CacheBuilder::new(1024) + .time_to_live(Duration::from_secs(30 * 60)) + .time_to_idle(Duration::from_secs(5 * 60)) + .build(), + } + } + pub async fn get_client(&self, datanode: &Peer) -> Client { self.clients .get_with_by_ref(datanode, async move { diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index d6a17341a5..c563f5d078 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -29,8 +29,8 @@ use tower::make::MakeConnection; use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsConfigSnafu, Result}; const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; -const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 10; -const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10; +pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10; +pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10; lazy_static! { static ref ID: AtomicU64 = AtomicU64::new(0); @@ -252,8 +252,8 @@ pub struct ChannelConfig { impl Default for ChannelConfig { fn default() -> Self { Self { - timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)), - connect_timeout: Some(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)), + timeout: Some(Duration::from_secs(DEFAULT_GRPC_REQUEST_TIMEOUT_SECS)), + connect_timeout: Some(Duration::from_secs(DEFAULT_GRPC_CONNECT_TIMEOUT_SECS)), concurrency_limit: None, rate_limit: None, initial_stream_window_size: None, @@ -516,8 +516,8 @@ mod tests { let default_cfg = ChannelConfig::new(); assert_eq!( ChannelConfig { - timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)), - connect_timeout: Some(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)), + timeout: Some(Duration::from_secs(DEFAULT_GRPC_REQUEST_TIMEOUT_SECS)), + connect_timeout: Some(Duration::from_secs(DEFAULT_GRPC_CONNECT_TIMEOUT_SECS)), concurrency_limit: None, rate_limit: None, initial_stream_window_size: None, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index ff70c1922d..e81c1c632e 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use api::v1::meta::Peer; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_grpc::channel_manager; use common_meta::key::TableMetadataManagerRef; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; @@ -55,6 +56,7 @@ pub struct MetaSrvOptions { pub http_opts: HttpOptions, pub logging: LoggingOptions, pub procedure: ProcedureConfig, + pub datanode: DatanodeOptions, } impl Default for MetaSrvOptions { @@ -70,6 +72,7 @@ impl Default for MetaSrvOptions { http_opts: HttpOptions::default(), logging: LoggingOptions::default(), procedure: ProcedureConfig::default(), + datanode: DatanodeOptions::default(), } } } @@ -80,6 +83,30 @@ impl MetaSrvOptions { } } +// Options for datanode. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct DatanodeOptions { + client_options: DatanodeClientOptions, +} + +// Options for datanode client. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct DatanodeClientOptions { + pub timeout_millis: u64, + pub connect_timeout_millis: u64, + pub tcp_nodelay: bool, +} + +impl Default for DatanodeClientOptions { + fn default() -> Self { + Self { + timeout_millis: channel_manager::DEFAULT_GRPC_REQUEST_TIMEOUT_SECS * 1000, + connect_timeout_millis: channel_manager::DEFAULT_GRPC_CONNECT_TIMEOUT_SECS * 1000, + tcp_nodelay: true, + } + } +} + #[derive(Clone)] pub struct Context { pub server_addr: String, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index c7991780b5..7b857df16c 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -14,8 +14,10 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::time::Duration; use client::client_manager::DatanodeClients; +use common_grpc::channel_manager::ChannelConfig; use common_meta::key::TableMetadataManager; use common_procedure::local::{LocalManager, ManagerConfig}; @@ -178,11 +180,23 @@ impl MetaSrvBuilder { kv_store.clone(), ))); + let datanode_clients = datanode_clients.unwrap_or_else(|| { + let datanode_client_channel_config = ChannelConfig::new() + .timeout(Duration::from_millis( + options.datanode.client_options.timeout_millis, + )) + .connect_timeout(Duration::from_millis( + options.datanode.client_options.connect_timeout_millis, + )) + .tcp_nodelay(options.datanode.client_options.tcp_nodelay); + Arc::new(DatanodeClients::new(datanode_client_channel_config)) + }); + // TODO(weny): considers to modify the default config of procedure manager let ddl_manager = Arc::new(DdlManager::new( procedure_manager.clone(), kv_store.clone(), - datanode_clients.unwrap_or_else(|| Arc::new(DatanodeClients::default())), + datanode_clients, mailbox.clone(), options.server_addr.clone(), table_metadata_manager.clone(),