feat: expose metasrv datanode_client_options (#1965)

* feat: expose meta datanode_client_options

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-07-15 15:26:24 +09:00
committed by GitHub
parent 8f1241912c
commit d9751268aa
5 changed files with 67 additions and 20 deletions

View File

@@ -21,8 +21,6 @@ use moka::future::{Cache, CacheBuilder};
use crate::Client;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
pub struct DatanodeClients {
channel_manager: ChannelManager,
clients: Cache<Peer, Client>,
@@ -30,17 +28,7 @@ pub struct DatanodeClients {
impl Default for DatanodeClients {
fn default() -> Self {
let config = ChannelConfig::new()
.timeout(DEFAULT_TIMEOUT)
.connect_timeout(DEFAULT_TIMEOUT);
Self {
channel_manager: ChannelManager::with_config(config),
clients: CacheBuilder::new(1024)
.time_to_live(Duration::from_secs(30 * 60))
.time_to_idle(Duration::from_secs(5 * 60))
.build(),
}
Self::new(ChannelConfig::new())
}
}
@@ -53,6 +41,16 @@ impl Debug for DatanodeClients {
}
impl DatanodeClients {
pub fn new(config: ChannelConfig) -> Self {
Self {
channel_manager: ChannelManager::with_config(config),
clients: CacheBuilder::new(1024)
.time_to_live(Duration::from_secs(30 * 60))
.time_to_idle(Duration::from_secs(5 * 60))
.build(),
}
}
pub async fn get_client(&self, datanode: &Peer) -> Client {
self.clients
.get_with_by_ref(datanode, async move {

View File

@@ -29,8 +29,8 @@ use tower::make::MakeConnection;
use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsConfigSnafu, Result};
const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 10;
const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_GRPC_REQUEST_TIMEOUT_SECS: u64 = 10;
pub const DEFAULT_GRPC_CONNECT_TIMEOUT_SECS: u64 = 10;
lazy_static! {
static ref ID: AtomicU64 = AtomicU64::new(0);
@@ -252,8 +252,8 @@ pub struct ChannelConfig {
impl Default for ChannelConfig {
fn default() -> Self {
Self {
timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)),
connect_timeout: Some(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)),
timeout: Some(Duration::from_secs(DEFAULT_GRPC_REQUEST_TIMEOUT_SECS)),
connect_timeout: Some(Duration::from_secs(DEFAULT_GRPC_CONNECT_TIMEOUT_SECS)),
concurrency_limit: None,
rate_limit: None,
initial_stream_window_size: None,
@@ -516,8 +516,8 @@ mod tests {
let default_cfg = ChannelConfig::new();
assert_eq!(
ChannelConfig {
timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)),
connect_timeout: Some(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS)),
timeout: Some(Duration::from_secs(DEFAULT_GRPC_REQUEST_TIMEOUT_SECS)),
connect_timeout: Some(Duration::from_secs(DEFAULT_GRPC_CONNECT_TIMEOUT_SECS)),
concurrency_limit: None,
rate_limit: None,
initial_stream_window_size: None,

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use api::v1::meta::Peer;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::channel_manager;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
@@ -55,6 +56,7 @@ pub struct MetaSrvOptions {
pub http_opts: HttpOptions,
pub logging: LoggingOptions,
pub procedure: ProcedureConfig,
pub datanode: DatanodeOptions,
}
impl Default for MetaSrvOptions {
@@ -70,6 +72,7 @@ impl Default for MetaSrvOptions {
http_opts: HttpOptions::default(),
logging: LoggingOptions::default(),
procedure: ProcedureConfig::default(),
datanode: DatanodeOptions::default(),
}
}
}
@@ -80,6 +83,30 @@ impl MetaSrvOptions {
}
}
// Options for datanode.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct DatanodeOptions {
client_options: DatanodeClientOptions,
}
// Options for datanode client.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DatanodeClientOptions {
pub timeout_millis: u64,
pub connect_timeout_millis: u64,
pub tcp_nodelay: bool,
}
impl Default for DatanodeClientOptions {
fn default() -> Self {
Self {
timeout_millis: channel_manager::DEFAULT_GRPC_REQUEST_TIMEOUT_SECS * 1000,
connect_timeout_millis: channel_manager::DEFAULT_GRPC_CONNECT_TIMEOUT_SECS * 1000,
tcp_nodelay: true,
}
}
}
#[derive(Clone)]
pub struct Context {
pub server_addr: String,

View File

@@ -14,8 +14,10 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use client::client_manager::DatanodeClients;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::key::TableMetadataManager;
use common_procedure::local::{LocalManager, ManagerConfig};
@@ -178,11 +180,23 @@ impl MetaSrvBuilder {
kv_store.clone(),
)));
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
options.datanode.client_options.timeout_millis,
))
.connect_timeout(Duration::from_millis(
options.datanode.client_options.connect_timeout_millis,
))
.tcp_nodelay(options.datanode.client_options.tcp_nodelay);
Arc::new(DatanodeClients::new(datanode_client_channel_config))
});
// TODO(weny): considers to modify the default config of procedure manager
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
datanode_clients.unwrap_or_else(|| Arc::new(DatanodeClients::default())),
datanode_clients,
mailbox.clone(),
options.server_addr.clone(),
table_metadata_manager.clone(),