diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4affa199f8..2f2b652c2e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -428,7 +428,9 @@ fn start_pageserver( conf.broker_keepalive_interval, tls_config, )?; - anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new(service_client)) + anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new( + service_client, + )) }) .with_context(|| { format!( diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 3b15cf8d70..41e9d6a6cd 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -50,7 +50,8 @@ async fn push_loop( conf.broker_endpoint.clone(), conf.broker_keepalive_interval, make_tls_config(&conf), - )?; + )? + .into_raw_grpc_client(); let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC); let outbound = async_stream::stream! { @@ -97,7 +98,8 @@ async fn pull_loop( conf.broker_endpoint.clone(), conf.broker_keepalive_interval, make_tls_config(&conf), - )?; + )? + .into_raw_grpc_client(); // TODO: subscribe only to local timelines instead of all let request = SubscribeSafekeeperInfoRequest { @@ -153,7 +155,8 @@ async fn discover_loop( conf.broker_endpoint.clone(), conf.broker_keepalive_interval, make_tls_config(&conf), - )?; + )? + .into_raw_grpc_client(); let request = SubscribeByFilterRequest { types: vec![TypeSubscription { diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index e46133cad7..a6edea695d 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -40,11 +40,18 @@ pub struct TimelineUpdatesSubscriber { } /// Wrapper type to weed out all places in the codebase that interact directly with the gRPC generated code. -/// We want all to go through the facade structs above so we can implement brokerless mode in the future. pub struct BrokerClientChannel { client: proto::broker_service_client::BrokerServiceClient, } +impl BrokerClientChannel { + pub fn into_raw_grpc_client( + self, + ) -> proto::broker_service_client::BrokerServiceClient { + self.client + } +} + pub struct TimelineShardUpdate { pub is_discovery: bool, pub inner: proto::SafekeeperDiscoveryResponse,