diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 00c8ced68a..a845456181 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -159,11 +159,8 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { // TODO: we shouldn't need to await to find tenant and this could be moved outside of // loop, #3501. There are also additional "allowed_errors" in tests. - if first { - first = false; - if random_init_delay(period, &cancel).await.is_err() { - break; - } + if first && random_init_delay(period, &cancel).await.is_err() { + break; } let started_at = Instant::now(); @@ -183,7 +180,16 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { } }; - warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction); + if !first { + // The first iteration is typically much slower, because all tenants compete for the + // compaction sempahore to run, and because of concurrent startup work like initializing + // logical sizes. To avoid routinely spamming warnings, we suppress this log on first iteration. + warn_when_period_overrun( + started_at.elapsed(), + period, + BackgroundLoopKind::Compaction, + ); + } // Sleep if tokio::time::timeout(sleep_duration, cancel.cancelled()) @@ -192,6 +198,8 @@ async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { { break; } + + first = false; } } .await; @@ -223,11 +231,8 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { let period = tenant.get_gc_period(); - if first { - first = false; - if random_init_delay(period, &cancel).await.is_err() { - break; - } + if first && random_init_delay(period, &cancel).await.is_err() { + break; } let started_at = Instant::now(); @@ -251,7 +256,12 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { } }; - warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc); + if !first { + // The first iteration is typically much slower, because all tenants compete for the + // compaction sempahore to run, and because of concurrent startup work like initializing + // logical sizes. To avoid routinely spamming warnings, we suppress this log on first iteration. + warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Gc); + } // Sleep if tokio::time::timeout(sleep_duration, cancel.cancelled()) @@ -260,6 +270,8 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { { break; } + + first = false; } } .await; diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 36960c631c..e28c4a5f69 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -26,8 +26,7 @@ use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey; use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::SubscribeSafekeeperInfoRequest; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; -use storage_broker::BrokerClientChannel; -use storage_broker::Streaming; +use storage_broker::{BrokerClientChannel, Code, Streaming}; use tokio::select; use tracing::*; @@ -137,8 +136,17 @@ pub(super) async fn connection_manager_loop_step( broker_update = broker_subscription.message() => { match broker_update { Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update), - Err(e) => { - error!("broker subscription failed: {e}"); + Err(status) => { + match status.code() { + Code::Unknown if status.message().contains("stream closed because of a broken pipe") => { + // tonic's error handling doesn't provide a clear code for disconnections: we get + // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe" + info!("broker disconnected: {status}"); + }, + _ => { + warn!("broker subscription failed: {status}"); + } + } return ControlFlow::Continue(()); } Ok(None) => { diff --git a/storage_broker/src/lib.rs b/storage_broker/src/lib.rs index 3f6fa35cbe..aa5d0bad5f 100644 --- a/storage_broker/src/lib.rs +++ b/storage_broker/src/lib.rs @@ -4,7 +4,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use tonic::codegen::StdError; use tonic::transport::{ClientTlsConfig, Endpoint}; -use tonic::{transport::Channel, Code, Status}; +use tonic::{transport::Channel, Status}; use utils::id::{TenantId, TenantTimelineId, TimelineId}; use proto::{ @@ -23,6 +23,7 @@ pub mod proto { pub mod metrics; // Re-exports to avoid direct tonic dependency in user crates. +pub use tonic::Code; pub use tonic::Request; pub use tonic::Streaming;