From a21b55fe0b3c2ee34b2adcf7930b2233a8a47ab4 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Fri, 9 Jun 2023 17:38:53 +0300 Subject: [PATCH] Use connect_timeout for broker::connect (#4452) Use `storage_broker::connect` everywhere. Add a default 5 seconds timeout for opening new connection. --- safekeeper/src/broker.rs | 5 +++-- storage_broker/src/lib.rs | 4 +++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 48c56ee58f..3282afc72d 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -8,7 +8,7 @@ use anyhow::Error; use anyhow::Result; use storage_broker::parse_proto_ttid; -use storage_broker::proto::broker_service_client::BrokerServiceClient; + use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey; use storage_broker::proto::SubscribeSafekeeperInfoRequest; use storage_broker::Request; @@ -45,7 +45,8 @@ pub fn thread_main(conf: SafeKeeperConf) { /// Push once in a while data about all active timelines to the broker. async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { - let mut client = BrokerServiceClient::connect(conf.broker_endpoint.clone()).await?; + let mut client = + storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?; let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); let outbound = async_stream::stream! { diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index 4bc561449d..3f6fa35cbe 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -32,6 +32,7 @@ pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051"; pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}"); pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms"; +pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000); // BrokerServiceClient charged with tonic provided Channel transport; helps to // avoid depending on tonic directly in user crates. @@ -58,7 +59,8 @@ where } tonic_endpoint = tonic_endpoint .http2_keep_alive_interval(keepalive_interval) - .keep_alive_while_idle(true); + .keep_alive_while_idle(true) + .connect_timeout(DEFAULT_CONNECT_TIMEOUT); // keep_alive_timeout is 20s by default on both client and server side let channel = tonic_endpoint.connect_lazy(); Ok(BrokerClientChannel::new(channel))