From 4892a5c5b790250c05882365ce16b7eb072ce0f9 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 8 Aug 2023 10:35:24 +0100
Subject: [PATCH] pageserver: avoid logging the "ERROR" part of DbErrors that are successes (#4902)

## Problem

The pageserver<->safekeeper protocol uses error messages to indicate end of
stream. pageserver already logs these at INFO level, but the inner error
message includes the word "ERROR", which interferes with log searching.

Example:
```
walreceiver connection handling ended: db error: ERROR: ending streaming to Some("pageserver") at 0/4031CA8
```

The inner DbError has a severity of ERROR, so DbError's Display implementation
includes that ERROR even though we are actually logging the error at INFO
level.

## Summary of changes

Introduce an explicit WalReceiverError type and, in its From<> impl for
postgres errors, apply the existing ExpectedError logic for expected errors
plus a new condition for successes. (A simplified, standalone sketch of this
classification follows the patch below.)

The new output looks like:
```
walreceiver connection handling ended: Successful completion: ending streaming to Some("pageserver") at 0/154E9C0, receiver is caughtup and there is no computes
```
---
 .../walreceiver/connection_manager.rs     |  25 ++--
 .../walreceiver/walreceiver_connection.rs | 118 +++++++++---------
 safekeeper/src/send_wal.rs                |   3 +
 test_runner/fixtures/neon_fixtures.py     |   1 -
 4 files changed, 80 insertions(+), 67 deletions(-)

diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index 2642746c79..e96aa41da0 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -38,7 +38,10 @@ use utils::{
     lsn::Lsn,
 };
 
-use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
+use super::{
+    walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
+    TaskEvent, TaskHandle,
+};
 
 /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
 /// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
@@ -419,13 +422,19 @@ impl ConnectionManagerState {
                 match res {
                     Ok(()) => Ok(()),
                     Err(e) => {
-                        use super::walreceiver_connection::ExpectedError;
-                        if e.is_expected() {
-                            info!("walreceiver connection handling ended: {e:#}");
-                            Ok(())
-                        } else {
-                            // give out an error to have task_mgr give it a really verbose logging
-                            Err(e).context("walreceiver connection handling failure")
+                        match e {
+                            WalReceiverError::SuccessfulCompletion(msg) => {
+                                info!("walreceiver connection handling ended with success: {msg}");
+                                Ok(())
+                            }
+                            WalReceiverError::ExpectedSafekeeperError(e) => {
+                                info!("walreceiver connection handling ended: {e}");
+                                Ok(())
+                            }
+                            WalReceiverError::Other(e) => {
+                                // give out an error to have task_mgr give it a really verbose logging
+                                Err(e).context("walreceiver connection handling failure")
+                            }
                         }
                     }
                 }
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index 817a30247e..7d1e9b4a39 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -8,14 +8,14 @@ use std::{
     time::{Duration, SystemTime},
 };
 
-use anyhow::{bail, ensure, Context};
+use anyhow::{anyhow, Context};
 use bytes::BytesMut;
 use chrono::{NaiveDateTime, Utc};
 use fail::fail_point;
 use futures::StreamExt;
 use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
-use postgres_ffi::v14::xlog_utils::normalize_lsn;
 use postgres_ffi::WAL_SEGMENT_SIZE;
+use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError};
 use postgres_protocol::message::backend::ReplicationMessage;
 use postgres_types::PgLsn;
 use tokio::{select, sync::watch, time};
@@ -60,6 +60,50 @@ pub(super) struct WalConnectionStatus {
     pub node: NodeId,
 }
 
+pub(super) enum WalReceiverError {
+    /// An error of a type that does not indicate an issue, e.g. a connection closing
+    ExpectedSafekeeperError(postgres::Error),
+    /// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries
+    /// the message part of the original postgres error
+    SuccessfulCompletion(String),
+    /// Generic error
+    Other(anyhow::Error),
+}
+
+impl From<tokio_postgres::Error> for WalReceiverError {
+    fn from(err: tokio_postgres::Error) -> Self {
+        if let Some(dberror) = err.as_db_error().filter(|db_error| {
+            db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
+                && db_error.message().contains("ending streaming")
+        }) {
+            // Strip the outer DbError, which carries a misleading "error" severity
+            Self::SuccessfulCompletion(dberror.message().to_string())
+        } else if err.is_closed()
+            || err
+                .source()
+                .and_then(|source| source.downcast_ref::<std::io::Error>())
+                .map(is_expected_io_error)
+                .unwrap_or(false)
+        {
+            Self::ExpectedSafekeeperError(err)
+        } else {
+            Self::Other(anyhow::Error::new(err))
+        }
+    }
+}
+
+impl From<anyhow::Error> for WalReceiverError {
+    fn from(err: anyhow::Error) -> Self {
+        Self::Other(err)
+    }
+}
+
+impl From<WalDecodeError> for WalReceiverError {
+    fn from(err: WalDecodeError) -> Self {
+        Self::Other(anyhow::Error::new(err))
+    }
+}
+
 /// Open a connection to the given safekeeper and receive WAL, sending back progress
 /// messages as we go.
 pub(super) async fn handle_walreceiver_connection(
@@ -70,7 +114,7 @@ pub(super) async fn handle_walreceiver_connection(
     connect_timeout: Duration,
     ctx: RequestContext,
     node: NodeId,
-) -> anyhow::Result<()> {
+) -> Result<(), WalReceiverError> {
     debug_assert_current_span_has_tenant_and_timeline_id();
 
     WALRECEIVER_STARTED_CONNECTIONS.inc();
@@ -130,11 +174,15 @@ pub(super) async fn handle_walreceiver_connection(
             connection_result = connection => match connection_result {
                 Ok(()) => debug!("Walreceiver db connection closed"),
                 Err(connection_error) => {
-                    if connection_error.is_expected() {
-                        // silence, because most likely we've already exited the outer call
-                        // with a similar error.
-                    } else {
-                        warn!("Connection aborted: {connection_error:#}")
+                    match WalReceiverError::from(connection_error) {
+                        WalReceiverError::ExpectedSafekeeperError(_) => {
+                            // silence, because most likely we've already exited the outer call
+                            // with a similar error.
+                        },
+                        WalReceiverError::SuccessfulCompletion(_) => {}
+                        WalReceiverError::Other(err) => {
+                            warn!("Connection aborted: {err:#}")
+                        }
                     }
                 }
             },
@@ -180,7 +228,7 @@ pub(super) async fn handle_walreceiver_connection(
     let mut startpoint = last_rec_lsn;
 
     if startpoint == Lsn(0) {
-        bail!("No previous WAL position");
+        return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
     }
 
     // There might be some padding after the last full record, skip it.
@@ -262,7 +310,9 @@ pub(super) async fn handle_walreceiver_connection(
                 // It is important to deal with the aligned records as lsn in getPage@LSN is
                 // aligned and can be several bytes bigger. Without this alignment we are
                 // at risk of hitting a deadlock.
-                ensure!(lsn.is_aligned());
+                if !lsn.is_aligned() {
+                    return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
+                }
 
                 walingest
                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
@@ -419,51 +469,3 @@ async fn identify_system(client: &mut Client) -> anyhow::Result
         Err(IdentifyError.into())
     }
 }
-
-/// Trait for avoid reporting walreceiver specific expected or "normal" or "ok" errors.
-pub(super) trait ExpectedError {
-    /// Test if this error is an ok error.
-    ///
-    /// We don't want to report connectivity problems as real errors towards connection manager because
-    /// 1. they happen frequently enough to make server logs hard to read and
-    /// 2. the connection manager can retry other safekeeper.
-    ///
-    /// If this function returns `true`, it's such an error.
-    /// The caller should log it at info level and then report to connection manager that we're done handling this connection.
-    /// Connection manager will then handle reconnections.
-    ///
-    /// If this function returns an `false` the error should be propagated and the connection manager
-    /// will log the error at ERROR level.
-    fn is_expected(&self) -> bool;
-}
-
-impl ExpectedError for postgres::Error {
-    fn is_expected(&self) -> bool {
-        self.is_closed()
-            || self
-                .source()
-                .and_then(|source| source.downcast_ref::<std::io::Error>())
-                .map(is_expected_io_error)
-                .unwrap_or(false)
-            || self
-                .as_db_error()
-                .filter(|db_error| {
-                    db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
-                        && db_error.message().contains("ending streaming")
-                })
-                .is_some()
-    }
-}
-
-impl ExpectedError for anyhow::Error {
-    fn is_expected(&self) -> bool {
-        let head = self.downcast_ref::<postgres::Error>();
-
-        let tail = self
-            .chain()
-            .filter_map(|e| e.downcast_ref::<postgres::Error>());
-
-        // check if self or any of the chained/sourced errors are expected
-        head.into_iter().chain(tail).any(|e| e.is_expected())
-    }
-}
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 92a7bb703a..a0d4bb35d7 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -568,6 +568,9 @@ impl WalSender<'_, IO> {
         {
             if self.tli.should_walsender_stop(remote_consistent_lsn).await {
                 // Terminate if there is nothing more to send.
+                // Note that "ending streaming" part of the string is used by
+                // pageserver to identify WalReceiverError::SuccessfulCompletion,
+                // do not change this string without updating pageserver.
                 return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
                     "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
                     self.appname, self.start_pos,
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index cdda8c414e..7ba979745b 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -1486,7 +1486,6 @@ class NeonPageserver(PgProtocol):
             # FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
             ".*Connection aborted: unexpected message from server*",
             ".*kill_and_wait_impl.*: wait successful.*",
-            ".*: db error:.*ending streaming to Some.*",
             ".*query handler for 'pagestream.*failed: Broken pipe.*",  # pageserver notices compute shut down
             ".*query handler for 'pagestream.*failed: Connection reset by peer.*",  # pageserver notices compute shut down
             # safekeeper connection can fail with this, in the window between timeline creation
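
For illustration only, and not part of the patch itself: the sketch below mirrors the three-way classification that the new `From<tokio_postgres::Error>` impl performs and shows how the connection manager picks a log level per `WalReceiverError` variant. `FakeDbError`, `classify`, and the `main` driver are hypothetical stand-ins so the example compiles with the standard library alone; the real implementation inspects `err.as_db_error()`, `SqlState::SUCCESSFUL_COMPLETION`, `err.is_closed()`, and `is_expected_io_error` as shown in the diff above.

```
// Standalone sketch; `FakeDbError` and `classify` stand in for
// `tokio_postgres::Error` and the `From` impl introduced by this patch.
struct FakeDbError {
    /// Stand-in for `db_error.code() == &SqlState::SUCCESSFUL_COMPLETION`.
    is_successful_completion: bool,
    message: String,
}

enum WalReceiverError {
    /// Expected connectivity problems (connection closed, expected io errors).
    ExpectedSafekeeperError(String),
    /// The safekeeper signalled a clean end of streaming via an "error" message.
    SuccessfulCompletion(String),
    /// Anything else: a real failure.
    Other(String),
}

fn classify(err: FakeDbError, connection_closed: bool) -> WalReceiverError {
    if err.is_successful_completion && err.message.contains("ending streaming") {
        // Strip the outer DbError, which carries a misleading "error" severity.
        WalReceiverError::SuccessfulCompletion(err.message)
    } else if connection_closed {
        WalReceiverError::ExpectedSafekeeperError(err.message)
    } else {
        WalReceiverError::Other(err.message)
    }
}

fn main() {
    let err = FakeDbError {
        is_successful_completion: true,
        message: "ending streaming to Some(\"pageserver\") at 0/154E9C0".to_string(),
    };
    // The connection manager maps each variant onto a log level.
    match classify(err, false) {
        WalReceiverError::SuccessfulCompletion(msg) => {
            println!("INFO  walreceiver connection handling ended with success: {msg}");
        }
        WalReceiverError::ExpectedSafekeeperError(msg) => {
            println!("INFO  walreceiver connection handling ended: {msg}");
        }
        WalReceiverError::Other(msg) => {
            println!("ERROR walreceiver connection handling failure: {msg}");
        }
    }
}
```

Keeping SuccessfulCompletion as its own variant, rather than folding it into the expected-error case, is what lets the caller log the clean end-of-streaming message without the misleading ERROR severity.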