From 35dbbbaf60d634914fd8b781f516a21cbdbd7856 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sun, 4 May 2025 16:50:23 +0200 Subject: [PATCH] move discovery request mechanism into that type as well Can't move the policy when we send discovery mechanism because that's tied to connection_manager loop state. --- pageserver/src/bin/pageserver.rs | 5 +- .../walreceiver/connection_manager.rs | 45 +++----------- storage_broker/src/lib.rs | 59 +++++++++++++++---- 3 files changed, 58 insertions(+), 51 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4c2572a577..4affa199f8 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -423,11 +423,12 @@ fn start_pageserver( .map(storage_broker::Certificate::from_pem), ); // Note: we do not attempt connecting here (but validate endpoints sanity). - storage_broker::connect( + let service_client = storage_broker::connect( conf.broker_endpoint.clone(), conf.broker_keepalive_interval, tls_config, - ) + )?; + anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new(service_client)) }) .with_context(|| { format!( diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index bc58736f6f..51f6d686f8 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -20,10 +20,7 @@ use chrono::{NaiveDateTime, Utc}; use futures::StreamExt; use pageserver_api::models::TimelineState; use postgres_connection::PgConnectionConfig; -use storage_broker::proto::{ - MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse, - TenantTimelineId as ProtoTenantTimelineId, TypedMessage, -}; +use storage_broker::proto::SafekeeperDiscoveryResponse; use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::{NodeId, TenantTimelineId}; @@ -77,11 +74,6 @@ pub(super) async fn 
connection_manager_loop_step( WALRECEIVER_ACTIVE_MANAGERS.dec(); } - let id: TenantTimelineId = TenantTimelineId { - tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id, - timeline_id: connection_manager_state.timeline.timeline_id, - }; - let mut timeline_state_updates = connection_manager_state .timeline .subscribe_for_state_updates(); @@ -97,11 +89,12 @@ pub(super) async fn connection_manager_loop_step( // Subscribe to the broker updates. Stream shares underlying TCP connection // with other streams on this client (other connection managers). When // object goes out of scope, stream finishes in drop() automatically. - let mut timeline_updates = Box::pin(broker_client.subscribe( + let (timeline_updates, mut discovery_requester) = broker_client.subscribe( connection_manager_state.timeline.tenant_shard_id, connection_manager_state.timeline.timeline_id, cancel, - )); + ); + let mut timeline_updates = Box::pin(timeline_updates); debug!("Subscribed for broker timeline updates"); loop { @@ -155,8 +148,9 @@ pub(super) async fn connection_manager_loop_step( } }, - // Got a new update from the broker - timeline_update = timeline_updates.next() => { + // Got a new update from the broker. + // The stream ends with None if and only if `cancel` is cancelled. 
+ Some(timeline_update) = timeline_updates.next() => { connection_manager_state.register_timeline_update(timeline_update) }, @@ -238,32 +232,11 @@ pub(super) async fn connection_manager_loop_step( tokio::time::sleep(next_discovery_ts - now).await; } - let tenant_timeline_id = Some(ProtoTenantTimelineId { - tenant_id: id.tenant_id.as_ref().to_owned(), - timeline_id: id.timeline_id.as_ref().to_owned(), - }); - let request = SafekeeperDiscoveryRequest { tenant_timeline_id }; - let msg = TypedMessage { - r#type: MessageType::SafekeeperDiscoveryRequest as i32, - safekeeper_timeline_info: None, - safekeeper_discovery_request: Some(request), - safekeeper_discovery_response: None, - }; + info!("No active connection and no candidates, sending discovery request to the broker"); + discovery_requester.request().await; last_discovery_ts = Some(std::time::Instant::now()); - info!("No active connection and no candidates, sending discovery request to the broker"); - // Cancellation safety: we want to send a message to the broker, but publish_one() - // function can get cancelled by the other select! arm. This is absolutely fine, because - // we just want to receive broker updates and discovery is not important if we already - // receive updates. - // - // It is possible that `last_discovery_ts` will be updated, but the message will not be sent. - // This is totally fine because of the reason above. 
- - // This is a fire-and-forget request, we don't care about the response - let _ = broker_client.publish_one(msg).await; - debug!("Discovery request sent to the broker"); None } => {} } diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index c746df9f74..e46133cad7 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -5,7 +5,7 @@ use proto::TenantTimelineId as ProtoTenantTimelineId; use tokio_util::sync::CancellationToken; use tonic::Status; use tonic::transport::Endpoint; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use utils::backoff::{ DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff, }; @@ -50,8 +50,9 @@ pub struct TimelineShardUpdate { pub inner: proto::SafekeeperDiscoveryResponse, } -pub enum SubscriberError { - Cancelled, +pub struct DiscoveryRequester { + id: ProtoTenantTimelineId, + client: proto::broker_service_client::BrokerServiceClient, } impl TimelineUpdatesSubscriber { @@ -65,8 +66,16 @@ impl TimelineUpdatesSubscriber { tenant_shard_id: TenantShardId, timeline_id: TimelineId, cancel: &CancellationToken, - ) -> impl Stream> { - async_stream::stream! { + ) -> (impl Stream, DiscoveryRequester) { + let id = ProtoTenantTimelineId { + tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(), + timeline_id: timeline_id.as_ref().to_owned(), + }; + let discovery_requester = DiscoveryRequester { + id: id.clone(), + client: self.client.clone(), + }; + let stream = async_stream::stream! { let mut attempt = 0; 'resubscribe: loop { exponential_backoff( @@ -91,16 +100,13 @@ impl TimelineUpdatesSubscriber { ], tenant_timeline_id: Some(FilterTenantTimelineId { enabled: true, - tenant_timeline_id: Some(ProtoTenantTimelineId { - tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(), - timeline_id: timeline_id.as_ref().to_owned(), - }), + tenant_timeline_id: Some(id.clone()), }), }; let res = tokio::select! 
{ r = self.client.subscribe_by_filter(request) => { r } - _ = cancel.cancelled() => { yield Err(SubscriberError::Cancelled); return; } + _ = cancel.cancelled() => { return; } }; let mut update_stream = match res { @@ -119,7 +125,7 @@ impl TimelineUpdatesSubscriber { loop { let broker_update = tokio::select!{ _ = cancel.cancelled() => { - yield Err(SubscriberError::Cancelled); return; + return; } update = update_stream.message() => { update } }; @@ -161,7 +167,7 @@ impl TimelineUpdatesSubscriber { } }; attempt = 0; // reset backoff iff we received a valid update - yield Ok(TimelineShardUpdate{is_discovery, inner: timeline_update }); + yield TimelineShardUpdate{is_discovery, inner: timeline_update }; }, Err(status) => { match status.code() { @@ -184,7 +190,34 @@ impl TimelineUpdatesSubscriber { } } } - } + }; + (stream, discovery_requester) + } +} + +impl DiscoveryRequester { + pub async fn request(&mut self) { + let request = proto::SafekeeperDiscoveryRequest { + tenant_timeline_id: Some(self.id.clone()), + }; + let msg = proto::TypedMessage { + r#type: proto::MessageType::SafekeeperDiscoveryRequest as i32, + safekeeper_timeline_info: None, + safekeeper_discovery_request: Some(request), + safekeeper_discovery_response: None, + }; + + // Cancellation safety: we want to send a message to the broker, but publish_one() + // function can get cancelled by the other select! arm. This is absolutely fine, because + // we just want to receive broker updates and discovery is not important if we already + // receive updates. + // + // It is possible that `last_discovery_ts` will be updated, but the message will not be sent. + // This is totally fine because of the reason above. + + // This is a fire-and-forget request, we don't care about the response + let _ = self.client.publish_one(msg).await; + debug!("Discovery request sent to the broker"); } }