feat: remove timeout in the channel between frontend and datanode (#3962)

* style: change builder pattern

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: remove timeout

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused config

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update docs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-05-16 22:12:42 +08:00
committed by GitHub
parent f93b5b19f0
commit 4b030456f6
7 changed files with 51 additions and 71 deletions

1
Cargo.lock generated
View File

@@ -1613,6 +1613,7 @@ dependencies = [
"common-catalog",
"common-config",
"common-error",
"common-grpc",
"common-macro",
"common-meta",
"common-procedure",

View File

@@ -186,7 +186,6 @@
| `meta_client.metadata_cache_tti` | String | `5m` | -- |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |
| `datanode.client.timeout` | String | `10s` | -- |
| `datanode.client.connect_timeout` | String | `10s` | -- |
| `datanode.client.tcp_nodelay` | Bool | `true` | -- |
| `logging` | -- | -- | The logging options. |

View File

@@ -136,7 +136,6 @@ metadata_cache_tti = "5m"
[datanode]
## Datanode client options.
[datanode.client]
timeout = "10s"
connect_timeout = "10s"
tcp_nodelay = true

View File

@@ -28,6 +28,7 @@ common-base.workspace = true
common-catalog.workspace = true
common-config.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-procedure.workspace = true

View File

@@ -24,6 +24,7 @@ use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, Me
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
@@ -318,11 +319,19 @@ impl StartCommand {
Arc::new(executor),
);
// frontend to datanode need not timeout.
// Some queries are expected to take long time.
let channel_config = ChannelConfig {
timeout: None,
..Default::default()
};
let client = DatanodeClients::new(channel_config);
let mut instance = FrontendBuilder::new(
cached_meta_backend.clone(),
layered_cache_registry.clone(),
catalog_manager,
Arc::new(DatanodeClients::default()),
Arc::new(client),
meta_client,
)
.with_plugin(plugins.clone())

View File

@@ -291,88 +291,68 @@ impl ChannelConfig {
}
/// A timeout to each request.
pub fn timeout(self, timeout: Duration) -> Self {
Self {
timeout: Some(timeout),
..self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
/// A timeout to connecting to the uri.
///
/// Defaults to no timeout.
pub fn connect_timeout(self, timeout: Duration) -> Self {
Self {
connect_timeout: Some(timeout),
..self
}
pub fn connect_timeout(mut self, timeout: Duration) -> Self {
self.connect_timeout = Some(timeout);
self
}
/// A concurrency limit to each request.
pub fn concurrency_limit(self, limit: usize) -> Self {
Self {
concurrency_limit: Some(limit),
..self
}
pub fn concurrency_limit(mut self, limit: usize) -> Self {
self.concurrency_limit = Some(limit);
self
}
/// A rate limit to each request.
pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
Self {
rate_limit: Some((limit, duration)),
..self
}
pub fn rate_limit(mut self, limit: u64, duration: Duration) -> Self {
self.rate_limit = Some((limit, duration));
self
}
/// Sets the SETTINGS_INITIAL_WINDOW_SIZE option for HTTP2 stream-level flow control.
/// Default is 65,535
pub fn initial_stream_window_size(self, size: u32) -> Self {
Self {
initial_stream_window_size: Some(size),
..self
}
pub fn initial_stream_window_size(mut self, size: u32) -> Self {
self.initial_stream_window_size = Some(size);
self
}
/// Sets the max connection-level flow control for HTTP2
///
/// Default is 65,535
pub fn initial_connection_window_size(self, size: u32) -> Self {
Self {
initial_connection_window_size: Some(size),
..self
}
pub fn initial_connection_window_size(mut self, size: u32) -> Self {
self.initial_connection_window_size = Some(size);
self
}
/// Set http2 KEEP_ALIVE_INTERVAL. Uses hypers default otherwise.
pub fn http2_keep_alive_interval(self, duration: Duration) -> Self {
Self {
http2_keep_alive_interval: Some(duration),
..self
}
pub fn http2_keep_alive_interval(mut self, duration: Duration) -> Self {
self.http2_keep_alive_interval = Some(duration);
self
}
/// Set http2 KEEP_ALIVE_TIMEOUT. Uses hypers default otherwise.
pub fn http2_keep_alive_timeout(self, duration: Duration) -> Self {
Self {
http2_keep_alive_timeout: Some(duration),
..self
}
pub fn http2_keep_alive_timeout(mut self, duration: Duration) -> Self {
self.http2_keep_alive_timeout = Some(duration);
self
}
/// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses hypers default otherwise.
pub fn http2_keep_alive_while_idle(self, enabled: bool) -> Self {
Self {
http2_keep_alive_while_idle: Some(enabled),
..self
}
pub fn http2_keep_alive_while_idle(mut self, enabled: bool) -> Self {
self.http2_keep_alive_while_idle = Some(enabled);
self
}
/// Sets whether to use an adaptive flow control. Uses hypers default otherwise.
pub fn http2_adaptive_window(self, enabled: bool) -> Self {
Self {
http2_adaptive_window: Some(enabled),
..self
}
pub fn http2_adaptive_window(mut self, enabled: bool) -> Self {
self.http2_adaptive_window = Some(enabled);
self
}
/// Set whether TCP keepalive messages are enabled on accepted connections.
@@ -381,31 +361,25 @@ impl ChannelConfig {
/// will be the time to remain idle before sending TCP keepalive probes.
///
/// Default is no keepalive (None)
pub fn tcp_keepalive(self, duration: Duration) -> Self {
Self {
tcp_keepalive: Some(duration),
..self
}
pub fn tcp_keepalive(mut self, duration: Duration) -> Self {
self.tcp_keepalive = Some(duration);
self
}
/// Set the value of TCP_NODELAY option for accepted connections.
///
/// Enabled by default.
pub fn tcp_nodelay(self, enabled: bool) -> Self {
Self {
tcp_nodelay: enabled,
..self
}
pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
self.tcp_nodelay = enabled;
self
}
/// Set the value of tls client auth.
///
/// Disabled by default.
pub fn client_tls_config(self, client_tls_option: ClientTlsOption) -> Self {
Self {
client_tls: Some(client_tls_option),
..self
}
pub fn client_tls_config(mut self, client_tls_option: ClientTlsOption) -> Self {
self.client_tls = Some(client_tls_option);
self
}
}

View File

@@ -24,8 +24,6 @@ pub struct DatanodeOptions {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DatanodeClientOptions {
#[serde(with = "humantime_serde")]
pub timeout: Duration,
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
pub tcp_nodelay: bool,
@@ -34,7 +32,6 @@ pub struct DatanodeClientOptions {
impl Default for DatanodeClientOptions {
fn default() -> Self {
Self {
timeout: Duration::from_secs(channel_manager::DEFAULT_GRPC_REQUEST_TIMEOUT_SECS),
connect_timeout: Duration::from_secs(
channel_manager::DEFAULT_GRPC_CONNECT_TIMEOUT_SECS,
),