move subscription code into new client struct

This commit is contained in:
Christian Schwarz
2025-05-04 16:22:46 +02:00
parent 1f53688189
commit 6380c9674c
9 changed files with 196 additions and 159 deletions

1
Cargo.lock generated
View File

@@ -6634,6 +6634,7 @@ dependencies = [
"rustls 0.23.18",
"tokio",
"tokio-rustls 0.26.0",
"tokio-util",
"tonic",
"tonic-build",
"tracing",

View File

@@ -100,7 +100,7 @@ pub struct State {
auth: Option<Arc<SwappableJwtAuth>>,
allowlist_routes: &'static [&'static str],
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
@@ -114,7 +114,7 @@ impl State {
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,

View File

@@ -48,7 +48,6 @@ use remote_timeline_client::{
download_tenant_manifest,
};
use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
use timeline::offload::{OffloadError, offload_timeline};
use timeline::{
@@ -152,7 +151,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
/// as the shared remote storage client and process initialization state.
#[derive(Clone)]
pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub broker_client: storage_broker::TimelineUpdatesSubscriber,
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
@@ -2122,7 +2121,7 @@ impl TenantShard {
async fn unoffload_timeline(
self: &Arc<Self>,
timeline_id: TimelineId,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: RequestContext,
) -> Result<Arc<Timeline>, TimelineArchivalError> {
info!("unoffloading timeline");
@@ -2257,7 +2256,7 @@ impl TenantShard {
self: &Arc<Self>,
timeline_id: TimelineId,
new_state: TimelineArchivalState,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: RequestContext,
) -> Result<(), TimelineArchivalError> {
info!("setting timeline archival config");
@@ -2586,7 +2585,7 @@ impl TenantShard {
pub(crate) async fn create_timeline(
self: &Arc<TenantShard>,
params: CreateTimelineParams,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> Result<Arc<Timeline>, CreateTimelineError> {
if !self.is_active() {
@@ -3321,7 +3320,7 @@ impl TenantShard {
/// to delay background jobs. Background jobs can be started right away when None is given.
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
@@ -3951,7 +3950,7 @@ where
enum ActivateTimelineArgs {
Yes {
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
},
No,
}

View File

@@ -61,7 +61,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp};
use rand::Rng;
use remote_storage::DownloadError;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::runtime::Handle;
use tokio::sync::mpsc::Sender;
use tokio::sync::{Notify, oneshot, watch};
@@ -2070,7 +2069,7 @@ impl Timeline {
pub(crate) fn activate(
self: &Arc<Self>,
parent: Arc<crate::tenant::TenantShard>,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
@@ -3112,7 +3111,7 @@ impl Timeline {
fn launch_wal_receiver(
self: &Arc<Self>,
ctx: &RequestContext,
broker_client: BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
) {
info!(
"launching WAL receiver for timeline {} of tenant {}",

View File

@@ -161,7 +161,7 @@ impl<'t> UninitializedTimeline<'t> {
tenant: Arc<TenantShard>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
self.write(|raw_timeline| async move {

View File

@@ -28,7 +28,6 @@ use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -69,7 +68,7 @@ impl WalReceiver {
pub fn start(
timeline: Arc<Timeline>,
conf: WalReceiverConf,
mut broker_client: BrokerClientChannel,
mut broker_client: storage_broker::TimelineUpdatesSubscriber,
ctx: &RequestContext,
) -> Self {
let tenant_shard_id = timeline.tenant_shard_id;

View File

@@ -17,19 +17,15 @@ use std::time::Duration;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use futures::StreamExt;
use pageserver_api::models::TimelineState;
use postgres_connection::PgConnectionConfig;
use storage_broker::proto::{
FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription,
TypedMessage,
MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
TenantTimelineId as ProtoTenantTimelineId, TypedMessage,
};
use storage_broker::{BrokerClientChannel, Code, Streaming};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
};
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::postgres_client::{
@@ -56,7 +52,7 @@ pub(crate) struct Cancelled;
///
/// Not cancellation-safe. Use `cancel` token to request cancellation.
pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
broker_client: &mut storage_broker::TimelineUpdatesSubscriber,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
cancel: &CancellationToken,
@@ -81,7 +77,7 @@ pub(super) async fn connection_manager_loop_step(
WALRECEIVER_ACTIVE_MANAGERS.dec();
}
let id = TenantTimelineId {
let id: TenantTimelineId = TenantTimelineId {
tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
};
@@ -101,7 +97,11 @@ pub(super) async fn connection_manager_loop_step(
// Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
let mut timeline_updates = Box::pin(broker_client.subscribe(
connection_manager_state.timeline.tenant_shard_id,
connection_manager_state.timeline.timeline_id,
cancel,
));
debug!("Subscribed for broker timeline updates");
loop {
@@ -156,28 +156,8 @@ pub(super) async fn connection_manager_loop_step(
},
// Got a new update from the broker
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
match broker_update {
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Err(status) => {
match status.code() {
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
// tonic's error handling doesn't provide a clear code for disconnections: we get
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
// => https://github.com/neondatabase/neon/issues/9562
info!("broker disconnected: {status}");
},
_ => {
warn!("broker subscription failed: {status}");
}
}
return Ok(());
}
Ok(None) => {
error!("broker subscription stream ended"); // can't happen
return Ok(());
}
}
timeline_update = timeline_updates.next() => {
connection_manager_state.register_timeline_update(timeline_update)
},
new_event = async {
@@ -298,63 +278,6 @@ pub(super) async fn connection_manager_loop_step(
}
}
/// Endlessly try to subscribe for broker updates for a given timeline.
async fn subscribe_for_timeline_updates(
broker_client: &mut BrokerClientChannel,
id: TenantTimelineId,
cancel: &CancellationToken,
) -> Result<Streaming<TypedMessage>, Cancelled> {
let mut attempt = 0;
loop {
exponential_backoff(
attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
cancel,
)
.await;
attempt += 1;
// subscribe to the specific timeline
let request = SubscribeByFilterRequest {
types: vec![
TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo as i32,
},
TypeSubscription {
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
},
],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
}),
}),
};
match {
tokio::select! {
r = broker_client.subscribe_by_filter(request) => { r }
_ = cancel.cancelled() => { return Err(Cancelled); }
}
} {
Ok(resp) => {
return Ok(resp.into_inner());
}
Err(e) => {
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
// entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
info!(
"Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}"
);
continue;
}
}
}
}
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
@@ -695,44 +618,14 @@ impl ConnectionManagerState {
}
/// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
let mut is_discovery = false;
let timeline_update = match typed_msg.r#type() {
MessageType::SafekeeperTimelineInfo => {
let info = match typed_msg.safekeeper_timeline_info {
Some(info) => info,
None => {
warn!("bad proto message from broker: no safekeeper_timeline_info");
return;
}
};
SafekeeperDiscoveryResponse {
safekeeper_id: info.safekeeper_id,
tenant_timeline_id: info.tenant_timeline_id,
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
standby_horizon: info.standby_horizon,
}
}
MessageType::SafekeeperDiscoveryResponse => {
is_discovery = true;
match typed_msg.safekeeper_discovery_response {
Some(response) => response,
None => {
warn!("bad proto message from broker: no safekeeper_discovery_response");
return;
}
}
}
_ => {
// unexpected message
return;
}
};
fn register_timeline_update(&mut self, timeline_update: storage_broker::TimelineShardUpdate) {
WALRECEIVER_BROKER_UPDATES.inc();
let storage_broker::TimelineShardUpdate {
is_discovery,
inner: timeline_update,
} = timeline_update;
trace!(
"safekeeper info update: standby_horizon(cutoff)={}",
timeline_update.standby_horizon

View File

@@ -27,6 +27,7 @@ parking_lot.workspace = true
prost.workspace = true
tonic.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-util.workspace = true
tokio-rustls.workspace = true
tracing.workspace = true
metrics.workspace = true

View File

@@ -1,10 +1,14 @@
use std::time::Duration;
use futures::Stream;
use proto::TenantTimelineId as ProtoTenantTimelineId;
use proto::broker_service_client::BrokerServiceClient;
use tokio_util::sync::CancellationToken;
use tonic::Status;
use tonic::codegen::StdError;
use tonic::transport::{Channel, Endpoint};
use tonic::transport::Endpoint;
use tracing::{error, info, warn};
use utils::backoff::{
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
// Code generated by protobuf.
@@ -22,6 +26,7 @@ pub mod metrics;
pub use hyper::Uri;
pub use tonic::transport::{Certificate, ClientTlsConfig};
pub use tonic::{Code, Request, Streaming};
use utils::shard::TenantShardId;
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
@@ -29,9 +34,159 @@ pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LIST
pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms";
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000);
// BrokerServiceClient charged with tonic provided Channel transport; helps to
// avoid depending on tonic directly in user crates.
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
#[derive(Clone)]
pub struct TimelineUpdatesSubscriber {
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
/// Wrapper type to weed out all places in the codebase that interact directly with the gRPC generated code.
/// We want all to go through the facade structs above so we can implement brokerless mode in the future.
pub struct BrokerClientChannel {
client: proto::broker_service_client::BrokerServiceClient<tonic::transport::Channel>,
}
pub struct TimelineShardUpdate {
pub is_discovery: bool,
pub inner: proto::SafekeeperDiscoveryResponse,
}
pub enum SubscriberError {
Cancelled,
}
impl TimelineUpdatesSubscriber {
pub fn new(service_client: BrokerClientChannel) -> Self {
Self {
client: service_client.client.clone(),
}
}
pub fn subscribe(
&mut self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<TimelineShardUpdate, SubscriberError>> {
async_stream::stream! {
let mut attempt = 0;
'resubscribe: loop {
exponential_backoff(
attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
cancel,
)
.await;
attempt += 1;
use proto::*;
// subscribe to the specific timeline
let request = SubscribeByFilterRequest {
types: vec![
TypeSubscription {
r#type: MessageType::SafekeeperTimelineInfo as i32,
},
TypeSubscription {
r#type: MessageType::SafekeeperDiscoveryResponse as i32,
},
],
tenant_timeline_id: Some(FilterTenantTimelineId {
enabled: true,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(),
timeline_id: timeline_id.as_ref().to_owned(),
}),
}),
};
let res = tokio::select! {
r = self.client.subscribe_by_filter(request) => { r }
_ = cancel.cancelled() => { yield Err(SubscriberError::Cancelled); return; }
};
let mut update_stream = match res
{
Ok(resp) => {
resp.into_inner()
}
Err(e) => {
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
// entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
info!(
attempt, "failed to subscribe: {e:#}"
);
continue 'resubscribe;
}
};
loop {
let broker_update = tokio::select!{
_ = cancel.cancelled() => {
yield Err(SubscriberError::Cancelled); return;
}
update = update_stream.message() => { update }
};
match broker_update {
Ok(Some(typed_msg)) => {
let mut is_discovery = false;
let timeline_update = match typed_msg.r#type() {
MessageType::SafekeeperTimelineInfo => {
let info = match typed_msg.safekeeper_timeline_info {
Some(info) => info,
None => {
warn!("bad proto message from broker: no safekeeper_timeline_info");
continue 'resubscribe;
}
};
SafekeeperDiscoveryResponse {
safekeeper_id: info.safekeeper_id,
tenant_timeline_id: info.tenant_timeline_id,
commit_lsn: info.commit_lsn,
safekeeper_connstr: info.safekeeper_connstr,
availability_zone: info.availability_zone,
standby_horizon: info.standby_horizon,
}
}
MessageType::SafekeeperDiscoveryResponse => {
is_discovery = true;
match typed_msg.safekeeper_discovery_response {
Some(response) => response,
None => {
warn!("bad proto message from broker: no safekeeper_discovery_response");
continue 'resubscribe;
}
}
}
_ => {
// unexpected message
warn!("unexpected message from broker: {typed_msg:?}");
continue 'resubscribe;
}
};
attempt = 0; // reset backoff iff we received a valid update
yield Ok(TimelineShardUpdate{is_discovery, inner: timeline_update });
},
Err(status) => {
match status.code() {
Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
// tonic's error handling doesn't provide a clear code for disconnections: we get
// "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
// => https://github.com/neondatabase/neon/issues/9562
info!("broker disconnected: {status}");
},
_ => {
warn!("broker subscription failed: {status}");
}
}
continue 'resubscribe;
}
Ok(None) => {
error!("broker subscription stream ended"); // can't happen
continue 'resubscribe;
}
}
}
}
}
}
}
// Create connection object configured to run TLS if schema starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
@@ -67,19 +222,9 @@ where
.connect_timeout(DEFAULT_CONNECT_TIMEOUT);
// keep_alive_timeout is 20s by default on both client and server side
let channel = tonic_endpoint.connect_lazy();
Ok(BrokerClientChannel::new(channel))
}
impl BrokerClientChannel {
/// Create a new client to the given endpoint, but don't actually connect until the first request.
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>
where
D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy();
Ok(Self::new(conn))
}
Ok(BrokerClientChannel {
client: proto::broker_service_client::BrokerServiceClient::new(channel),
})
}
// parse variable length bytes from protobuf