move discovery request mechanism into that type as well

Can't move the policy when we send disovery mechanism because that's
tied to connection_manager loop state.
This commit is contained in:
Christian Schwarz
2025-05-04 16:50:23 +02:00
parent 6380c9674c
commit 35dbbbaf60
3 changed files with 58 additions and 51 deletions

View File

@@ -423,11 +423,12 @@ fn start_pageserver(
.map(storage_broker::Certificate::from_pem),
);
// Note: we do not attempt connecting here (but validate endpoints sanity).
storage_broker::connect(
let service_client = storage_broker::connect(
conf.broker_endpoint.clone(),
conf.broker_keepalive_interval,
tls_config,
)
)?;
anyhow::Ok(storage_broker::TimelineUpdatesSubscriber::new(service_client))
})
.with_context(|| {
format!(

View File

@@ -20,10 +20,7 @@ use chrono::{NaiveDateTime, Utc};
use futures::StreamExt;
use pageserver_api::models::TimelineState;
use postgres_connection::PgConnectionConfig;
use storage_broker::proto::{
MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
TenantTimelineId as ProtoTenantTimelineId, TypedMessage,
};
use storage_broker::proto::SafekeeperDiscoveryResponse;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::{NodeId, TenantTimelineId};
@@ -77,11 +74,6 @@ pub(super) async fn connection_manager_loop_step(
WALRECEIVER_ACTIVE_MANAGERS.dec();
}
let id: TenantTimelineId = TenantTimelineId {
tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
};
let mut timeline_state_updates = connection_manager_state
.timeline
.subscribe_for_state_updates();
@@ -97,11 +89,12 @@ pub(super) async fn connection_manager_loop_step(
// Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
let mut timeline_updates = Box::pin(broker_client.subscribe(
let (timeline_updates, mut discovery_requester) = broker_client.subscribe(
connection_manager_state.timeline.tenant_shard_id,
connection_manager_state.timeline.timeline_id,
cancel,
));
);
let mut timeline_updates = Box::pin(timeline_updates);
debug!("Subscribed for broker timeline updates");
loop {
@@ -155,8 +148,9 @@ pub(super) async fn connection_manager_loop_step(
}
},
// Got a new update from the broker
timeline_update = timeline_updates.next() => {
// Got a new update from the broker.
// The stream ends with None if and only if `cancel` is cancelled.
Some(timeline_update) = timeline_updates.next() => {
connection_manager_state.register_timeline_update(timeline_update)
},
@@ -238,32 +232,11 @@ pub(super) async fn connection_manager_loop_step(
tokio::time::sleep(next_discovery_ts - now).await;
}
let tenant_timeline_id = Some(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
});
let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
let msg = TypedMessage {
r#type: MessageType::SafekeeperDiscoveryRequest as i32,
safekeeper_timeline_info: None,
safekeeper_discovery_request: Some(request),
safekeeper_discovery_response: None,
};
info!("No active connection and no candidates, sending discovery request to the broker");
discovery_requester.request().await;
last_discovery_ts = Some(std::time::Instant::now());
info!("No active connection and no candidates, sending discovery request to the broker");
// Cancellation safety: we want to send a message to the broker, but publish_one()
// function can get cancelled by the other select! arm. This is absolutely fine, because
// we just want to receive broker updates and discovery is not important if we already
// receive updates.
//
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
// This is totally fine because of the reason above.
// This is a fire-and-forget request, we don't care about the response
let _ = broker_client.publish_one(msg).await;
debug!("Discovery request sent to the broker");
None
} => {}
}

View File

@@ -5,7 +5,7 @@ use proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio_util::sync::CancellationToken;
use tonic::Status;
use tonic::transport::Endpoint;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
};
@@ -50,8 +50,9 @@ pub struct TimelineShardUpdate {
pub inner: proto::SafekeeperDiscoveryResponse,
}
pub enum SubscriberError {
Cancelled,
pub struct DiscoveryRequester {
id: ProtoTenantTimelineId,
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
impl TimelineUpdatesSubscriber {
@@ -65,8 +66,16 @@ impl TimelineUpdatesSubscriber {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<TimelineShardUpdate, SubscriberError>> {
async_stream::stream! {
) -> (impl Stream<Item = TimelineShardUpdate>, DiscoveryRequester) {
let id = ProtoTenantTimelineId {
tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(),
timeline_id: timeline_id.as_ref().to_owned(),
};
let discovery_requester = DiscoveryRequester {
id: id.clone(),
client: self.client.clone(),
};
let stream = async_stream::stream! {
let mut attempt = 0;
'resubscribe: loop {
exponential_backoff(
@@ -91,16 +100,13 @@ impl TimelineUpdatesSubscriber {
],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(),
timeline_id: timeline_id.as_ref().to_owned(),
}),
tenant_timeline_id: Some(id.clone()),
}),
};
let res = tokio::select! {
r = self.client.subscribe_by_filter(request) => { r }
_ = cancel.cancelled() => { yield Err(SubscriberError::Cancelled); return; }
_ = cancel.cancelled() => { return; }
};
let mut update_stream = match res
{
@@ -119,7 +125,7 @@ impl TimelineUpdatesSubscriber {
loop {
let broker_update = tokio::select!{
_ = cancel.cancelled() => {
yield Err(SubscriberError::Cancelled); return;
return;
}
update = update_stream.message() => { update }
};
@@ -161,7 +167,7 @@ impl TimelineUpdatesSubscriber {
}
};
attempt = 0; // reset backoff iff we received a valid update
yield Ok(TimelineShardUpdate{is_discovery, inner: timeline_update });
yield TimelineShardUpdate{is_discovery, inner: timeline_update };
},
Err(status) => {
match status.code() {
@@ -184,7 +190,34 @@ impl TimelineUpdatesSubscriber {
}
}
}
}
};
(stream, discovery_requester)
}
}
impl DiscoveryRequester {
pub async fn request(&mut self) {
let request = proto::SafekeeperDiscoveryRequest {
tenant_timeline_id: Some(self.id.clone()),
};
let msg = proto::TypedMessage {
r#type: proto::MessageType::SafekeeperDiscoveryRequest as i32,
safekeeper_timeline_info: None,
safekeeper_discovery_request: Some(request),
safekeeper_discovery_response: None,
};
// Cancellation safety: we want to send a message to the broker, but publish_one()
// function can get cancelled by the other select! arm. This is absolutely fine, because
// we just want to receive broker updates and discovery is not important if we already
// receive updates.
//
// It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
// This is totally fine because of the reason above.
// This is a fire-and-forget request, we don't care about the response
let _ = self.client.publish_one(msg).await;
debug!("Discovery request sent to the broker");
}
}