diff --git a/Cargo.lock b/Cargo.lock index 4c464c62b8..42ecc974fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6634,6 +6634,7 @@ dependencies = [ "rustls 0.23.18", "tokio", "tokio-rustls 0.26.0", + "tokio-util", "tonic", "tonic-build", "tracing", diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8b6500b020..ee8daa6a80 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -100,7 +100,7 @@ pub struct State { auth: Option>, allowlist_routes: &'static [&'static str], remote_storage: GenericRemoteStorage, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, secondary_controller: SecondaryController, @@ -114,7 +114,7 @@ impl State { tenant_manager: Arc, auth: Option>, remote_storage: GenericRemoteStorage, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, disk_usage_eviction_state: Arc, deletion_queue_client: DeletionQueueClient, secondary_controller: SecondaryController, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index e59db74479..201ca30eb4 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -48,7 +48,6 @@ use remote_timeline_client::{ download_tenant_manifest, }; use secondary::heatmap::{HeatMapTenant, HeatMapTimeline}; -use storage_broker::BrokerClientChannel; use timeline::compaction::{CompactionOutcome, GcCompactionQueue}; use timeline::offload::{OffloadError, offload_timeline}; use timeline::{ @@ -152,7 +151,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; /// as the shared remote storage client and process initialization state. #[derive(Clone)] pub struct TenantSharedResources { - pub broker_client: storage_broker::BrokerClientChannel, + pub broker_client: storage_broker::TimelineUpdatesSubscriber, pub remote_storage: GenericRemoteStorage, pub deletion_queue_client: DeletionQueueClient, pub l0_flush_global_state: L0FlushGlobalState, @@ -2122,7 +2121,7 @@ impl TenantShard { async fn unoffload_timeline( self: &Arc, timeline_id: TimelineId, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: RequestContext, ) -> Result, TimelineArchivalError> { info!("unoffloading timeline"); @@ -2257,7 +2256,7 @@ impl TenantShard { self: &Arc, timeline_id: TimelineId, new_state: TimelineArchivalState, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: RequestContext, ) -> Result<(), TimelineArchivalError> { info!("setting timeline archival config"); @@ -2586,7 +2585,7 @@ impl TenantShard { pub(crate) async fn create_timeline( self: &Arc, params: CreateTimelineParams, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: &RequestContext, ) -> Result, CreateTimelineError> { if !self.is_active() { @@ -3321,7 +3320,7 @@ impl TenantShard { /// to delay background jobs. Background jobs can be started right away when None is given. fn activate( self: &Arc, - broker_client: BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { @@ -3951,7 +3950,7 @@ where enum ActivateTimelineArgs { Yes { - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, }, No, } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index cfeab77598..8b56b8f6bb 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -61,7 +61,6 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, to_pg_timestamp}; use rand::Rng; use remote_storage::DownloadError; use serde_with::serde_as; -use storage_broker::BrokerClientChannel; use tokio::runtime::Handle; use tokio::sync::mpsc::Sender; use tokio::sync::{Notify, oneshot, watch}; @@ -2070,7 +2069,7 @@ impl Timeline { pub(crate) fn activate( self: &Arc, parent: Arc, - broker_client: BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, ) { @@ -3112,7 +3111,7 @@ impl Timeline { fn launch_wal_receiver( self: &Arc, ctx: &RequestContext, - broker_client: BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ) { info!( "launching WAL receiver for timeline {} of tenant {}", diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs index beebf35462..3da49a1007 100644 --- a/pageserver/src/tenant/timeline/uninit.rs +++ b/pageserver/src/tenant/timeline/uninit.rs @@ -161,7 +161,7 @@ impl<'t> UninitializedTimeline<'t> { tenant: Arc, copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), base_lsn: Lsn, - broker_client: storage_broker::BrokerClientChannel, + broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: &RequestContext, ) -> anyhow::Result> { self.write(|raw_timeline| async move { diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 4f80073cc3..645b4eaf2f 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -28,7 +28,6 @@ use std::num::NonZeroU64; use std::sync::Arc; use std::time::Duration; -use storage_broker::BrokerClientChannel; use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::*; @@ -69,7 +68,7 @@ impl WalReceiver { pub fn start( timeline: Arc, conf: WalReceiverConf, - mut broker_client: BrokerClientChannel, + mut broker_client: storage_broker::TimelineUpdatesSubscriber, ctx: &RequestContext, ) -> Self { let tenant_shard_id = timeline.tenant_shard_id; diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 3c3608d1bd..bc58736f6f 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -17,19 +17,15 @@ use std::time::Duration; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; +use futures::StreamExt; use pageserver_api::models::TimelineState; use postgres_connection::PgConnectionConfig; use storage_broker::proto::{ - FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse, - SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription, - TypedMessage, + MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse, + TenantTimelineId as ProtoTenantTimelineId, TypedMessage, }; -use storage_broker::{BrokerClientChannel, Code, Streaming}; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::backoff::{ - DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff, -}; use utils::id::{NodeId, TenantTimelineId}; use utils::lsn::Lsn; use utils::postgres_client::{ @@ -56,7 +52,7 @@ pub(crate) struct Cancelled; /// /// Not cancellation-safe. Use `cancel` token to request cancellation. pub(super) async fn connection_manager_loop_step( - broker_client: &mut BrokerClientChannel, + broker_client: &mut storage_broker::TimelineUpdatesSubscriber, connection_manager_state: &mut ConnectionManagerState, ctx: &RequestContext, cancel: &CancellationToken, @@ -81,7 +77,7 @@ pub(super) async fn connection_manager_loop_step( WALRECEIVER_ACTIVE_MANAGERS.dec(); } - let id = TenantTimelineId { + let id: TenantTimelineId = TenantTimelineId { tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id, timeline_id: connection_manager_state.timeline.timeline_id, }; @@ -101,7 +97,11 @@ pub(super) async fn connection_manager_loop_step( // Subscribe to the broker updates. Stream shares underlying TCP connection // with other streams on this client (other connection managers). When // object goes out of scope, stream finishes in drop() automatically. - let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?; + let mut timeline_updates = Box::pin(broker_client.subscribe( + connection_manager_state.timeline.tenant_shard_id, + connection_manager_state.timeline.timeline_id, + cancel, + )); debug!("Subscribed for broker timeline updates"); loop { @@ -156,28 +156,8 @@ pub(super) async fn connection_manager_loop_step( }, // Got a new update from the broker - broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => { - match broker_update { - Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update), - Err(status) => { - match status.code() { - Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => { - // tonic's error handling doesn't provide a clear code for disconnections: we get - // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe" - // => https://github.com/neondatabase/neon/issues/9562 - info!("broker disconnected: {status}"); - }, - _ => { - warn!("broker subscription failed: {status}"); - } - } - return Ok(()); - } - Ok(None) => { - error!("broker subscription stream ended"); // can't happen - return Ok(()); - } - } + timeline_update = timeline_updates.next() => { + connection_manager_state.register_timeline_update(timeline_update) }, new_event = async { @@ -298,63 +278,6 @@ pub(super) async fn connection_manager_loop_step( } } -/// Endlessly try to subscribe for broker updates for a given timeline. -async fn subscribe_for_timeline_updates( - broker_client: &mut BrokerClientChannel, - id: TenantTimelineId, - cancel: &CancellationToken, -) -> Result, Cancelled> { - let mut attempt = 0; - loop { - exponential_backoff( - attempt, - DEFAULT_BASE_BACKOFF_SECONDS, - DEFAULT_MAX_BACKOFF_SECONDS, - cancel, - ) - .await; - attempt += 1; - - // subscribe to the specific timeline - let request = SubscribeByFilterRequest { - types: vec![ - TypeSubscription { - r#type: MessageType::SafekeeperTimelineInfo as i32, - }, - TypeSubscription { - r#type: MessageType::SafekeeperDiscoveryResponse as i32, - }, - ], - tenant_timeline_id: Some(FilterTenantTimelineId { - enabled: true, - tenant_timeline_id: Some(ProtoTenantTimelineId { - tenant_id: id.tenant_id.as_ref().to_owned(), - timeline_id: id.timeline_id.as_ref().to_owned(), - }), - }), - }; - - match { - tokio::select! { - r = broker_client.subscribe_by_filter(request) => { r } - _ = cancel.cancelled() => { return Err(Cancelled); } - } - } { - Ok(resp) => { - return Ok(resp.into_inner()); - } - Err(e) => { - // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and - // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error. - info!( - "Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}" - ); - continue; - } - } - } -} - const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1; const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0; const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5; @@ -695,44 +618,14 @@ impl ConnectionManagerState { } /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key. - fn register_timeline_update(&mut self, typed_msg: TypedMessage) { - let mut is_discovery = false; - let timeline_update = match typed_msg.r#type() { - MessageType::SafekeeperTimelineInfo => { - let info = match typed_msg.safekeeper_timeline_info { - Some(info) => info, - None => { - warn!("bad proto message from broker: no safekeeper_timeline_info"); - return; - } - }; - SafekeeperDiscoveryResponse { - safekeeper_id: info.safekeeper_id, - tenant_timeline_id: info.tenant_timeline_id, - commit_lsn: info.commit_lsn, - safekeeper_connstr: info.safekeeper_connstr, - availability_zone: info.availability_zone, - standby_horizon: info.standby_horizon, - } - } - MessageType::SafekeeperDiscoveryResponse => { - is_discovery = true; - match typed_msg.safekeeper_discovery_response { - Some(response) => response, - None => { - warn!("bad proto message from broker: no safekeeper_discovery_response"); - return; - } - } - } - _ => { - // unexpected message - return; - } - }; - + fn register_timeline_update(&mut self, timeline_update: storage_broker::TimelineShardUpdate) { WALRECEIVER_BROKER_UPDATES.inc(); + let storage_broker::TimelineShardUpdate { + is_discovery, + inner: timeline_update, + } = timeline_update; + trace!( "safekeeper info update: standby_horizon(cutoff)={}", timeline_update.standby_horizon diff --git a/storage_broker/Cargo.toml b/storage_broker/Cargo.toml index 67b276c8fe..d6fcefbaf2 100644 --- a/storage_broker/Cargo.toml +++ b/storage_broker/Cargo.toml @@ -27,6 +27,7 @@ parking_lot.workspace = true prost.workspace = true tonic.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio-util.workspace = true tokio-rustls.workspace = true tracing.workspace = true metrics.workspace = true diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index 149656a191..c746df9f74 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -1,10 +1,14 @@ use std::time::Duration; +use futures::Stream; use proto::TenantTimelineId as ProtoTenantTimelineId; -use proto::broker_service_client::BrokerServiceClient; +use tokio_util::sync::CancellationToken; use tonic::Status; -use tonic::codegen::StdError; -use tonic::transport::{Channel, Endpoint}; +use tonic::transport::Endpoint; +use tracing::{error, info, warn}; +use utils::backoff::{ + DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff, +}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; // Code generated by protobuf. @@ -22,6 +26,7 @@ pub mod metrics; pub use hyper::Uri; pub use tonic::transport::{Certificate, ClientTlsConfig}; pub use tonic::{Code, Request, Streaming}; +use utils::shard::TenantShardId; pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051"; pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}"); @@ -29,9 +34,159 @@ pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LIST pub const DEFAULT_KEEPALIVE_INTERVAL: &str = "5000 ms"; pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000); -// BrokerServiceClient charged with tonic provided Channel transport; helps to -// avoid depending on tonic directly in user crates. -pub type BrokerClientChannel = BrokerServiceClient; +#[derive(Clone)] +pub struct TimelineUpdatesSubscriber { + client: proto::broker_service_client::BrokerServiceClient, +} + +/// Wrapper type to weed out all places in the codebase that interact directly with the gRPC generated code. +/// We want all to go through the facade structs above so we can implement brokerless mode in the future. +pub struct BrokerClientChannel { + client: proto::broker_service_client::BrokerServiceClient, +} + +pub struct TimelineShardUpdate { + pub is_discovery: bool, + pub inner: proto::SafekeeperDiscoveryResponse, +} + +pub enum SubscriberError { + Cancelled, +} + +impl TimelineUpdatesSubscriber { + pub fn new(service_client: BrokerClientChannel) -> Self { + Self { + client: service_client.client.clone(), + } + } + pub fn subscribe( + &mut self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + cancel: &CancellationToken, + ) -> impl Stream> { + async_stream::stream! { + let mut attempt = 0; + 'resubscribe: loop { + exponential_backoff( + attempt, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + cancel, + ) + .await; + attempt += 1; + + use proto::*; + // subscribe to the specific timeline + let request = SubscribeByFilterRequest { + types: vec![ + TypeSubscription { + r#type: MessageType::SafekeeperTimelineInfo as i32, + }, + TypeSubscription { + r#type: MessageType::SafekeeperDiscoveryResponse as i32, + }, + ], + tenant_timeline_id: Some(FilterTenantTimelineId { + enabled: true, + tenant_timeline_id: Some(ProtoTenantTimelineId { + tenant_id: tenant_shard_id.tenant_id.as_ref().to_owned(), + timeline_id: timeline_id.as_ref().to_owned(), + }), + }), + }; + + let res = tokio::select! { + r = self.client.subscribe_by_filter(request) => { r } + _ = cancel.cancelled() => { yield Err(SubscriberError::Cancelled); return; } + }; + let mut update_stream = match res + { + Ok(resp) => { + resp.into_inner() + } + Err(e) => { + // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and + // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error. + info!( + attempt, "failed to subscribe: {e:#}" + ); + continue 'resubscribe; + } + }; + loop { + let broker_update = tokio::select!{ + _ = cancel.cancelled() => { + yield Err(SubscriberError::Cancelled); return; + } + update = update_stream.message() => { update } + }; + match broker_update { + Ok(Some(typed_msg)) => { + let mut is_discovery = false; + let timeline_update = match typed_msg.r#type() { + MessageType::SafekeeperTimelineInfo => { + let info = match typed_msg.safekeeper_timeline_info { + Some(info) => info, + None => { + warn!("bad proto message from broker: no safekeeper_timeline_info"); + continue 'resubscribe; + } + }; + SafekeeperDiscoveryResponse { + safekeeper_id: info.safekeeper_id, + tenant_timeline_id: info.tenant_timeline_id, + commit_lsn: info.commit_lsn, + safekeeper_connstr: info.safekeeper_connstr, + availability_zone: info.availability_zone, + standby_horizon: info.standby_horizon, + } + } + MessageType::SafekeeperDiscoveryResponse => { + is_discovery = true; + match typed_msg.safekeeper_discovery_response { + Some(response) => response, + None => { + warn!("bad proto message from broker: no safekeeper_discovery_response"); + continue 'resubscribe; + } + } + } + _ => { + // unexpected message + warn!("unexpected message from broker: {typed_msg:?}"); + continue 'resubscribe; + } + }; + attempt = 0; // reset backoff iff we received a valid update + yield Ok(TimelineShardUpdate{is_discovery, inner: timeline_update }); + }, + Err(status) => { + match status.code() { + Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => { + // tonic's error handling doesn't provide a clear code for disconnections: we get + // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe" + // => https://github.com/neondatabase/neon/issues/9562 + info!("broker disconnected: {status}"); + }, + _ => { + warn!("broker subscription failed: {status}"); + } + } + continue 'resubscribe; + } + Ok(None) => { + error!("broker subscription stream ended"); // can't happen + continue 'resubscribe; + } + } + } + } + } + } +} // Create connection object configured to run TLS if schema starts with https:// // and plain text otherwise. Connection is lazy, only endpoint sanity is @@ -67,19 +222,9 @@ where .connect_timeout(DEFAULT_CONNECT_TIMEOUT); // keep_alive_timeout is 20s by default on both client and server side let channel = tonic_endpoint.connect_lazy(); - Ok(BrokerClientChannel::new(channel)) -} - -impl BrokerClientChannel { - /// Create a new client to the given endpoint, but don't actually connect until the first request. - pub async fn connect_lazy(dst: D) -> Result - where - D: std::convert::TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect_lazy(); - Ok(Self::new(conn)) - } + Ok(BrokerClientChannel { + client: proto::broker_service_client::BrokerServiceClient::new(channel), + }) } // parse variable length bytes from protobuf