From 77598f5d0ae3ad89debcc5d23143373332771632 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Mon, 5 Jun 2023 17:35:23 +0300 Subject: [PATCH] Better walreceiver logging (#4402) walreceiver logs are a bit hard to understand because of partial span usage, extra messages, ignored errors popping up as huge stacktraces. Fixes #3330 (by spans, also demote info -> debug). - arrange walreceivers spans into a hiearchy: - `wal_connection_manager{tenant_id, timeline_id}` -> `connection{node_id}` -> `poller` - unifies the error reporting inside `wal_receiver`: - All ok errors are now `walreceiver connection handling ended: {e:#}` - All unknown errors are still stacktraceful task_mgr reported errors with context `walreceiver connection handling failure` - Remove `connect` special casing, was: `DB connection stream finished` for ok errors - Remove `done replicating` special casing, was `Replication stream finished` for ok errors - lowered log levels for (non-exhaustive list): - `WAL receiver manager started, connecting to broker` (at startup) - `WAL receiver shutdown requested, shutting down` (at shutdown) - `Connection manager loop ended, shutting down` (at shutdown) - `sender is dropped while join handle is still alive` (at lucky shutdown, see #2885) - `timeline entered terminal state {:?}, stopping wal connection manager loop` (at shutdown) - `connected!` (at startup) - `Walreceiver db connection closed` (at disconnects?, was without span) - `Connection cancelled` (at shutdown, was without span) - `observed timeline state change, new state is {new_state:?}` (never after Timeline::activate was made infallible) - changed: - `Timeline dropped state updates sender, stopping wal connection manager loop` - was out of date; sender is not dropped but `Broken | Stopping` state transition - also made `debug!` - `Timeline dropped state updates sender before becoming active, stopping wal connection manager loop` - was out of date: sender is again not dropped but `Broken | Stopping` state transition - also made `debug!` - log fixes: - stop double reporting panics via JoinError --- pageserver/src/tenant/timeline/walreceiver.rs | 54 ++++---- .../walreceiver/connection_manager.rs | 48 ++++--- .../walreceiver/walreceiver_connection.rs | 121 ++++++++++-------- test_runner/fixtures/neon_fixtures.py | 9 +- 4 files changed, 122 insertions(+), 110 deletions(-) diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 7ebf3cf172..ccff735c3c 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -25,6 +25,7 @@ mod walreceiver_connection; use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME}; +use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::timeline::walreceiver::connection_manager::{ connection_manager_loop_step, ConnectionManagerState, }; @@ -85,7 +86,8 @@ impl WalReceiver { &format!("walreceiver for timeline {tenant_id}/{timeline_id}"), false, async move { - info!("WAL receiver manager started, connecting to broker"); + debug_assert_current_span_has_tenant_and_timeline_id(); + debug!("WAL receiver manager started, connecting to broker"); let mut connection_manager_state = ConnectionManagerState::new( timeline, conf, @@ -93,7 +95,7 @@ impl WalReceiver { loop { select! { _ = task_mgr::shutdown_watcher() => { - info!("WAL receiver shutdown requested, shutting down"); + trace!("WAL receiver shutdown requested, shutting down"); break; }, loop_step_result = connection_manager_loop_step( @@ -104,7 +106,7 @@ impl WalReceiver { ) => match loop_step_result { ControlFlow::Continue(()) => continue, ControlFlow::Break(()) => { - info!("Connection manager loop ended, shutting down"); + trace!("Connection manager loop ended, shutting down"); break; } }, @@ -115,7 +117,7 @@ impl WalReceiver { *loop_status.write().unwrap() = None; Ok(()) } - .instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id)) + .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_id, timeline_id = %timeline_id)) ); Self { @@ -198,29 +200,19 @@ impl TaskHandle { TaskEvent::End(match self.join_handle.as_mut() { Some(jh) => { if !jh.is_finished() { - // Barring any implementation errors in this module, we can - // only arrive here while the task that executes the future - // passed to `Self::spawn()` is still execution. Cf the comment - // in Self::spawn(). - // - // This was logging at warning level in earlier versions, presumably - // to leave some breadcrumbs in case we had an implementation - // error that would would make us get stuck in `jh.await`. - // - // There hasn't been such a bug so far. - // But in a busy system, e.g., during pageserver restart, - // we arrive here often enough that the warning-level logs - // became a distraction. - // So, tone them down to info-level. - // - // XXX: rewrite this module to eliminate the race condition. - info!("sender is dropped while join handle is still alive"); + // See: https://github.com/neondatabase/neon/issues/2885 + trace!("sender is dropped while join handle is still alive"); } - let res = jh - .await - .map_err(|e| anyhow::anyhow!("Failed to join task: {e}")) - .and_then(|x| x); + let res = match jh.await { + Ok(res) => res, + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { + // already logged + Ok(()) + } + Err(je) => Err(anyhow::Error::new(je).context("join walreceiver task")), + }; // For cancellation-safety, drop join_handle only after successful .await. self.join_handle = None; @@ -243,12 +235,12 @@ impl TaskHandle { match jh.await { Ok(Ok(())) => debug!("Shutdown success"), Ok(Err(e)) => error!("Shutdown task error: {e:?}"), - Err(join_error) => { - if join_error.is_cancelled() { - error!("Shutdown task was cancelled"); - } else { - error!("Shutdown task join error: {join_error}") - } + Err(je) if je.is_cancelled() => unreachable!("not used"), + Err(je) if je.is_panic() => { + // already logged + } + Err(je) => { + error!("Shutdown task join error: {je}") } } } diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 6b65e1fd42..e235fab425 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -18,7 +18,7 @@ use crate::metrics::{ WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES, }; use crate::task_mgr::TaskKind; -use crate::tenant::Timeline; +use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline}; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; use pageserver_api::models::TimelineState; @@ -55,8 +55,11 @@ pub(super) async fn connection_manager_loop_step( .await { Ok(()) => {} - Err(_) => { - info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop"); + Err(new_state) => { + debug!( + ?new_state, + "state changed, stopping wal connection manager loop" + ); return ControlFlow::Break(()); } } @@ -79,7 +82,7 @@ pub(super) async fn connection_manager_loop_step( // with other streams on this client (other connection managers). When // object goes out of scope, stream finishes in drop() automatically. let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await; - info!("Subscribed for broker timeline updates"); + debug!("Subscribed for broker timeline updates"); loop { let time_until_next_retry = connection_manager_state.time_until_next_retry(); @@ -151,12 +154,12 @@ pub(super) async fn connection_manager_loop_step( // we're already active as walreceiver, no need to reactivate TimelineState::Active => continue, TimelineState::Broken | TimelineState::Stopping => { - info!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop"); + debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop"); return ControlFlow::Break(()); } TimelineState::Loading => { warn!("timeline transitioned back to Loading state, that should not happen"); - return ControlFlow::Continue(new_state); + return ControlFlow::Continue(()); } } } @@ -164,12 +167,11 @@ pub(super) async fn connection_manager_loop_step( } } } => match new_event { - ControlFlow::Continue(new_state) => { - info!("observed timeline state change, new state is {new_state:?}"); + ControlFlow::Continue(()) => { return ControlFlow::Continue(()); } ControlFlow::Break(()) => { - info!("Timeline dropped state updates sender, stopping wal connection manager loop"); + debug!("Timeline is no longer active, stopping wal connection manager loop"); return ControlFlow::Break(()); } }, @@ -390,7 +392,6 @@ impl ConnectionManagerState { self.drop_old_connection(true).await; - let id = self.id; let node_id = new_sk.safekeeper_id; let connect_timeout = self.conf.wal_connect_timeout; let timeline = Arc::clone(&self.timeline); @@ -398,9 +399,13 @@ impl ConnectionManagerState { TaskKind::WalReceiverConnectionHandler, DownloadBehavior::Download, ); + + let span = info_span!("connection", %node_id); let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| { async move { - super::walreceiver_connection::handle_walreceiver_connection( + debug_assert_current_span_has_tenant_and_timeline_id(); + + let res = super::walreceiver_connection::handle_walreceiver_connection( timeline, new_sk.wal_source_connconf, events_sender, @@ -409,12 +414,23 @@ impl ConnectionManagerState { ctx, node_id, ) - .await - .context("walreceiver connection handling failure") + .await; + + match res { + Ok(()) => Ok(()), + Err(e) => { + use super::walreceiver_connection::ExpectedError; + if e.is_expected() { + info!("walreceiver connection handling ended: {e:#}"); + Ok(()) + } else { + // give out an error to have task_mgr give it a really verbose logging + Err(e).context("walreceiver connection handling failure") + } + } + } } - .instrument( - info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, %node_id), - ) + .instrument(span) }); let now = Utc::now().naive_utc(); diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 1cbed3416c..41f6c63d40 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -21,16 +21,16 @@ use postgres_types::PgLsn; use tokio::{select, sync::watch, time}; use tokio_postgres::{replication::ReplicationStream, Client}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn, Instrument}; use super::TaskStateUpdate; -use crate::metrics::LIVE_CONNECTIONS_COUNT; -use crate::{context::RequestContext, metrics::WALRECEIVER_STARTED_CONNECTIONS}; use crate::{ + context::RequestContext, + metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS}, task_mgr, task_mgr::TaskKind, task_mgr::WALRECEIVER_RUNTIME, - tenant::{Timeline, WalReceiverInfo}, + tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo}, walingest::WalIngest, walrecord::DecodedWALRecord, }; @@ -81,13 +81,8 @@ pub(super) async fn handle_walreceiver_connection( config.application_name("pageserver"); config.replication_mode(tokio_postgres::config::ReplicationMode::Physical); match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await { - Ok(Ok(client_and_conn)) => client_and_conn, - Ok(Err(conn_err)) => { - let expected_error = ignore_expected_errors(conn_err)?; - info!("DB connection stream finished: {expected_error}"); - return Ok(()); - } - Err(_) => { + Ok(client_and_conn) => client_and_conn?, + Err(_elapsed) => { // Timing out to connect to a safekeeper node could happen long time, due to // many reasons that pageserver cannot control. // Do not produce an error, but make it visible, that timeouts happen by logging the `event. @@ -97,7 +92,7 @@ pub(super) async fn handle_walreceiver_connection( } }; - info!("connected!"); + debug!("connected!"); let mut connection_status = WalConnectionStatus { is_connected: true, has_processed_wal: false, @@ -127,20 +122,24 @@ pub(super) async fn handle_walreceiver_connection( "walreceiver connection", false, async move { + debug_assert_current_span_has_tenant_and_timeline_id(); + select! { connection_result = connection => match connection_result { - Ok(()) => info!("Walreceiver db connection closed"), + Ok(()) => debug!("Walreceiver db connection closed"), Err(connection_error) => { - if let Err(e) = ignore_expected_errors(connection_error) { - warn!("Connection aborted: {e:#}") + if connection_error.is_expected() { + // silence + } else { + warn!("Connection aborted: {connection_error:#}") } } }, - // Future: replace connection_cancellation with connection_ctx cancellation - _ = connection_cancellation.cancelled() => info!("Connection cancelled"), + _ = connection_cancellation.cancelled() => debug!("Connection cancelled"), } Ok(()) - }, + } + .instrument(tracing::info_span!("poller")), ); // Immediately increment the gauge, then create a job to decrement it on task exit. @@ -203,20 +202,13 @@ pub(super) async fn handle_walreceiver_connection( while let Some(replication_message) = { select! { _ = cancellation.cancelled() => { - info!("walreceiver interrupted"); + debug!("walreceiver interrupted"); None } replication_message = physical_stream.next() => replication_message, } } { - let replication_message = match replication_message { - Ok(message) => message, - Err(replication_error) => { - let expected_error = ignore_expected_errors(replication_error)?; - info!("Replication stream finished: {expected_error}"); - return Ok(()); - } - }; + let replication_message = replication_message?; let now = Utc::now().naive_utc(); let last_rec_lsn_before_msg = last_rec_lsn; @@ -261,8 +253,6 @@ pub(super) async fn handle_walreceiver_connection( let mut decoded = DecodedWALRecord::default(); let mut modification = timeline.begin_modification(endlsn); while let Some((lsn, recdata)) = waldecoder.poll_decode()? { - // let _enter = info_span!("processing record", lsn = %lsn).entered(); - // It is important to deal with the aligned records as lsn in getPage@LSN is // aligned and can be several bytes bigger. Without this alignment we are // at risk of hitting a deadlock. @@ -421,31 +411,50 @@ async fn identify_system(client: &mut Client) -> anyhow::Result } } -/// We don't want to report connectivity problems as real errors towards connection manager because -/// 1. they happen frequently enough to make server logs hard to read and -/// 2. the connection manager can retry other safekeeper. -/// -/// If this function returns `Ok(pg_error)`, it's such an error. -/// The caller should log it at info level and then report to connection manager that we're done handling this connection. -/// Connection manager will then handle reconnections. -/// -/// If this function returns an `Err()`, the caller can bubble it up using `?`. -/// The connection manager will log the error at ERROR level. -fn ignore_expected_errors(pg_error: postgres::Error) -> anyhow::Result { - if pg_error.is_closed() - || pg_error - .source() - .and_then(|source| source.downcast_ref::()) - .map(is_expected_io_error) - .unwrap_or(false) - { - return Ok(pg_error); - } else if let Some(db_error) = pg_error.as_db_error() { - if db_error.code() == &SqlState::SUCCESSFUL_COMPLETION - && db_error.message().contains("ending streaming") - { - return Ok(pg_error); - } - } - Err(pg_error).context("connection error") +/// Trait for avoid reporting walreceiver specific expected or "normal" or "ok" errors. +pub(super) trait ExpectedError { + /// Test if this error is an ok error. + /// + /// We don't want to report connectivity problems as real errors towards connection manager because + /// 1. they happen frequently enough to make server logs hard to read and + /// 2. the connection manager can retry other safekeeper. + /// + /// If this function returns `true`, it's such an error. + /// The caller should log it at info level and then report to connection manager that we're done handling this connection. + /// Connection manager will then handle reconnections. + /// + /// If this function returns an `false` the error should be propagated and the connection manager + /// will log the error at ERROR level. + fn is_expected(&self) -> bool; +} + +impl ExpectedError for postgres::Error { + fn is_expected(&self) -> bool { + self.is_closed() + || self + .source() + .and_then(|source| source.downcast_ref::()) + .map(is_expected_io_error) + .unwrap_or(false) + || self + .as_db_error() + .filter(|db_error| { + db_error.code() == &SqlState::SUCCESSFUL_COMPLETION + && db_error.message().contains("ending streaming") + }) + .is_some() + } +} + +impl ExpectedError for anyhow::Error { + fn is_expected(&self) -> bool { + let head = self.downcast_ref::(); + + let tail = self + .chain() + .filter_map(|e| e.downcast_ref::()); + + // check if self or any of the chained/sourced errors are expected + head.into_iter().chain(tail).any(|e| e.is_expected()) + } } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 5017c8dcd3..e23ed12878 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1585,13 +1585,10 @@ class NeonPageserver(PgProtocol): ".*serving compute connection task.*exited with error: Postgres connection error.*", ".*serving compute connection task.*exited with error: Connection reset by peer.*", ".*serving compute connection task.*exited with error: Postgres query error.*", - ".*Connection aborted: connection error: error communicating with the server: Broken pipe.*", - ".*Connection aborted: connection error: error communicating with the server: Transport endpoint is not connected.*", - ".*Connection aborted: connection error: error communicating with the server: Connection reset by peer.*", # FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected - ".*Connection aborted: connection error: unexpected message from server*", + ".*Connection aborted: unexpected message from server*", ".*kill_and_wait_impl.*: wait successful.*", - ".*Replication stream finished: db error:.*ending streaming to Some*", + ".*: db error:.*ending streaming to Some.*", ".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down ".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down # safekeeper connection can fail with this, in the window between timeline creation @@ -1608,8 +1605,6 @@ class NeonPageserver(PgProtocol): ".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*", ".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*", ".*Removing intermediate uninit mark file.*", - # FIXME: known race condition in TaskHandle: https://github.com/neondatabase/neon/issues/2885 - ".*sender is dropped while join handle is still alive.*", # Tenant::delete_timeline() can cause any of the four following errors. # FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946 ".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed