mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 12:30:38 +00:00
Better walreceiver logging (#4402)
walreceiver logs are a bit hard to understand because of partial span usage, extra messages, ignored errors popping up as huge stacktraces. Fixes #3330 (by spans, also demote info -> debug). - arrange walreceivers spans into a hiearchy: - `wal_connection_manager{tenant_id, timeline_id}` -> `connection{node_id}` -> `poller` - unifies the error reporting inside `wal_receiver`: - All ok errors are now `walreceiver connection handling ended: {e:#}` - All unknown errors are still stacktraceful task_mgr reported errors with context `walreceiver connection handling failure` - Remove `connect` special casing, was: `DB connection stream finished` for ok errors - Remove `done replicating` special casing, was `Replication stream finished` for ok errors - lowered log levels for (non-exhaustive list): - `WAL receiver manager started, connecting to broker` (at startup) - `WAL receiver shutdown requested, shutting down` (at shutdown) - `Connection manager loop ended, shutting down` (at shutdown) - `sender is dropped while join handle is still alive` (at lucky shutdown, see #2885) - `timeline entered terminal state {:?}, stopping wal connection manager loop` (at shutdown) - `connected!` (at startup) - `Walreceiver db connection closed` (at disconnects?, was without span) - `Connection cancelled` (at shutdown, was without span) - `observed timeline state change, new state is {new_state:?}` (never after Timeline::activate was made infallible) - changed: - `Timeline dropped state updates sender, stopping wal connection manager loop` - was out of date; sender is not dropped but `Broken | Stopping` state transition - also made `debug!` - `Timeline dropped state updates sender before becoming active, stopping wal connection manager loop` - was out of date: sender is again not dropped but `Broken | Stopping` state transition - also made `debug!` - log fixes: - stop double reporting panics via JoinError
This commit is contained in:
@@ -25,6 +25,7 @@ mod walreceiver_connection;
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
|
||||
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
|
||||
use crate::tenant::timeline::walreceiver::connection_manager::{
|
||||
connection_manager_loop_step, ConnectionManagerState,
|
||||
};
|
||||
@@ -85,7 +86,8 @@ impl WalReceiver {
|
||||
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
|
||||
false,
|
||||
async move {
|
||||
info!("WAL receiver manager started, connecting to broker");
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
debug!("WAL receiver manager started, connecting to broker");
|
||||
let mut connection_manager_state = ConnectionManagerState::new(
|
||||
timeline,
|
||||
conf,
|
||||
@@ -93,7 +95,7 @@ impl WalReceiver {
|
||||
loop {
|
||||
select! {
|
||||
_ = task_mgr::shutdown_watcher() => {
|
||||
info!("WAL receiver shutdown requested, shutting down");
|
||||
trace!("WAL receiver shutdown requested, shutting down");
|
||||
break;
|
||||
},
|
||||
loop_step_result = connection_manager_loop_step(
|
||||
@@ -104,7 +106,7 @@ impl WalReceiver {
|
||||
) => match loop_step_result {
|
||||
ControlFlow::Continue(()) => continue,
|
||||
ControlFlow::Break(()) => {
|
||||
info!("Connection manager loop ended, shutting down");
|
||||
trace!("Connection manager loop ended, shutting down");
|
||||
break;
|
||||
}
|
||||
},
|
||||
@@ -115,7 +117,7 @@ impl WalReceiver {
|
||||
*loop_status.write().unwrap() = None;
|
||||
Ok(())
|
||||
}
|
||||
.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
|
||||
.instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_id, timeline_id = %timeline_id))
|
||||
);
|
||||
|
||||
Self {
|
||||
@@ -198,29 +200,19 @@ impl<E: Clone> TaskHandle<E> {
|
||||
TaskEvent::End(match self.join_handle.as_mut() {
|
||||
Some(jh) => {
|
||||
if !jh.is_finished() {
|
||||
// Barring any implementation errors in this module, we can
|
||||
// only arrive here while the task that executes the future
|
||||
// passed to `Self::spawn()` is still execution. Cf the comment
|
||||
// in Self::spawn().
|
||||
//
|
||||
// This was logging at warning level in earlier versions, presumably
|
||||
// to leave some breadcrumbs in case we had an implementation
|
||||
// error that would would make us get stuck in `jh.await`.
|
||||
//
|
||||
// There hasn't been such a bug so far.
|
||||
// But in a busy system, e.g., during pageserver restart,
|
||||
// we arrive here often enough that the warning-level logs
|
||||
// became a distraction.
|
||||
// So, tone them down to info-level.
|
||||
//
|
||||
// XXX: rewrite this module to eliminate the race condition.
|
||||
info!("sender is dropped while join handle is still alive");
|
||||
// See: https://github.com/neondatabase/neon/issues/2885
|
||||
trace!("sender is dropped while join handle is still alive");
|
||||
}
|
||||
|
||||
let res = jh
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Failed to join task: {e}"))
|
||||
.and_then(|x| x);
|
||||
let res = match jh.await {
|
||||
Ok(res) => res,
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used"),
|
||||
Err(je) if je.is_panic() => {
|
||||
// already logged
|
||||
Ok(())
|
||||
}
|
||||
Err(je) => Err(anyhow::Error::new(je).context("join walreceiver task")),
|
||||
};
|
||||
|
||||
// For cancellation-safety, drop join_handle only after successful .await.
|
||||
self.join_handle = None;
|
||||
@@ -243,12 +235,12 @@ impl<E: Clone> TaskHandle<E> {
|
||||
match jh.await {
|
||||
Ok(Ok(())) => debug!("Shutdown success"),
|
||||
Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
|
||||
Err(join_error) => {
|
||||
if join_error.is_cancelled() {
|
||||
error!("Shutdown task was cancelled");
|
||||
} else {
|
||||
error!("Shutdown task join error: {join_error}")
|
||||
}
|
||||
Err(je) if je.is_cancelled() => unreachable!("not used"),
|
||||
Err(je) if je.is_panic() => {
|
||||
// already logged
|
||||
}
|
||||
Err(je) => {
|
||||
error!("Shutdown task join error: {je}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ use crate::metrics::{
|
||||
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
|
||||
};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Timeline;
|
||||
use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
|
||||
use anyhow::Context;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use pageserver_api::models::TimelineState;
|
||||
@@ -55,8 +55,11 @@ pub(super) async fn connection_manager_loop_step(
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
info!("Timeline dropped state updates sender before becoming active, stopping wal connection manager loop");
|
||||
Err(new_state) => {
|
||||
debug!(
|
||||
?new_state,
|
||||
"state changed, stopping wal connection manager loop"
|
||||
);
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
}
|
||||
@@ -79,7 +82,7 @@ pub(super) async fn connection_manager_loop_step(
|
||||
// with other streams on this client (other connection managers). When
|
||||
// object goes out of scope, stream finishes in drop() automatically.
|
||||
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
|
||||
info!("Subscribed for broker timeline updates");
|
||||
debug!("Subscribed for broker timeline updates");
|
||||
|
||||
loop {
|
||||
let time_until_next_retry = connection_manager_state.time_until_next_retry();
|
||||
@@ -151,12 +154,12 @@ pub(super) async fn connection_manager_loop_step(
|
||||
// we're already active as walreceiver, no need to reactivate
|
||||
TimelineState::Active => continue,
|
||||
TimelineState::Broken | TimelineState::Stopping => {
|
||||
info!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
|
||||
debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
TimelineState::Loading => {
|
||||
warn!("timeline transitioned back to Loading state, that should not happen");
|
||||
return ControlFlow::Continue(new_state);
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -164,12 +167,11 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
}
|
||||
} => match new_event {
|
||||
ControlFlow::Continue(new_state) => {
|
||||
info!("observed timeline state change, new state is {new_state:?}");
|
||||
ControlFlow::Continue(()) => {
|
||||
return ControlFlow::Continue(());
|
||||
}
|
||||
ControlFlow::Break(()) => {
|
||||
info!("Timeline dropped state updates sender, stopping wal connection manager loop");
|
||||
debug!("Timeline is no longer active, stopping wal connection manager loop");
|
||||
return ControlFlow::Break(());
|
||||
}
|
||||
},
|
||||
@@ -390,7 +392,6 @@ impl ConnectionManagerState {
|
||||
|
||||
self.drop_old_connection(true).await;
|
||||
|
||||
let id = self.id;
|
||||
let node_id = new_sk.safekeeper_id;
|
||||
let connect_timeout = self.conf.wal_connect_timeout;
|
||||
let timeline = Arc::clone(&self.timeline);
|
||||
@@ -398,9 +399,13 @@ impl ConnectionManagerState {
|
||||
TaskKind::WalReceiverConnectionHandler,
|
||||
DownloadBehavior::Download,
|
||||
);
|
||||
|
||||
let span = info_span!("connection", %node_id);
|
||||
let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
|
||||
async move {
|
||||
super::walreceiver_connection::handle_walreceiver_connection(
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let res = super::walreceiver_connection::handle_walreceiver_connection(
|
||||
timeline,
|
||||
new_sk.wal_source_connconf,
|
||||
events_sender,
|
||||
@@ -409,12 +414,23 @@ impl ConnectionManagerState {
|
||||
ctx,
|
||||
node_id,
|
||||
)
|
||||
.await
|
||||
.context("walreceiver connection handling failure")
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(()) => Ok(()),
|
||||
Err(e) => {
|
||||
use super::walreceiver_connection::ExpectedError;
|
||||
if e.is_expected() {
|
||||
info!("walreceiver connection handling ended: {e:#}");
|
||||
Ok(())
|
||||
} else {
|
||||
// give out an error to have task_mgr give it a really verbose logging
|
||||
Err(e).context("walreceiver connection handling failure")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(
|
||||
info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, %node_id),
|
||||
)
|
||||
.instrument(span)
|
||||
});
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
|
||||
@@ -21,16 +21,16 @@ use postgres_types::PgLsn;
|
||||
use tokio::{select, sync::watch, time};
|
||||
use tokio_postgres::{replication::ReplicationStream, Client};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
use tracing::{debug, error, info, trace, warn, Instrument};
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
use crate::metrics::LIVE_CONNECTIONS_COUNT;
|
||||
use crate::{context::RequestContext, metrics::WALRECEIVER_STARTED_CONNECTIONS};
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS},
|
||||
task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::WALRECEIVER_RUNTIME,
|
||||
tenant::{Timeline, WalReceiverInfo},
|
||||
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
|
||||
walingest::WalIngest,
|
||||
walrecord::DecodedWALRecord,
|
||||
};
|
||||
@@ -81,13 +81,8 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
config.application_name("pageserver");
|
||||
config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
|
||||
match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await {
|
||||
Ok(Ok(client_and_conn)) => client_and_conn,
|
||||
Ok(Err(conn_err)) => {
|
||||
let expected_error = ignore_expected_errors(conn_err)?;
|
||||
info!("DB connection stream finished: {expected_error}");
|
||||
return Ok(());
|
||||
}
|
||||
Err(_) => {
|
||||
Ok(client_and_conn) => client_and_conn?,
|
||||
Err(_elapsed) => {
|
||||
// Timing out to connect to a safekeeper node could happen long time, due to
|
||||
// many reasons that pageserver cannot control.
|
||||
// Do not produce an error, but make it visible, that timeouts happen by logging the `event.
|
||||
@@ -97,7 +92,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
};
|
||||
|
||||
info!("connected!");
|
||||
debug!("connected!");
|
||||
let mut connection_status = WalConnectionStatus {
|
||||
is_connected: true,
|
||||
has_processed_wal: false,
|
||||
@@ -127,20 +122,24 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
"walreceiver connection",
|
||||
false,
|
||||
async move {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
select! {
|
||||
connection_result = connection => match connection_result {
|
||||
Ok(()) => info!("Walreceiver db connection closed"),
|
||||
Ok(()) => debug!("Walreceiver db connection closed"),
|
||||
Err(connection_error) => {
|
||||
if let Err(e) = ignore_expected_errors(connection_error) {
|
||||
warn!("Connection aborted: {e:#}")
|
||||
if connection_error.is_expected() {
|
||||
// silence
|
||||
} else {
|
||||
warn!("Connection aborted: {connection_error:#}")
|
||||
}
|
||||
}
|
||||
},
|
||||
// Future: replace connection_cancellation with connection_ctx cancellation
|
||||
_ = connection_cancellation.cancelled() => info!("Connection cancelled"),
|
||||
_ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
}
|
||||
.instrument(tracing::info_span!("poller")),
|
||||
);
|
||||
|
||||
// Immediately increment the gauge, then create a job to decrement it on task exit.
|
||||
@@ -203,20 +202,13 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
while let Some(replication_message) = {
|
||||
select! {
|
||||
_ = cancellation.cancelled() => {
|
||||
info!("walreceiver interrupted");
|
||||
debug!("walreceiver interrupted");
|
||||
None
|
||||
}
|
||||
replication_message = physical_stream.next() => replication_message,
|
||||
}
|
||||
} {
|
||||
let replication_message = match replication_message {
|
||||
Ok(message) => message,
|
||||
Err(replication_error) => {
|
||||
let expected_error = ignore_expected_errors(replication_error)?;
|
||||
info!("Replication stream finished: {expected_error}");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
let replication_message = replication_message?;
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
let last_rec_lsn_before_msg = last_rec_lsn;
|
||||
@@ -261,8 +253,6 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
let mut modification = timeline.begin_modification(endlsn);
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hitting a deadlock.
|
||||
@@ -421,31 +411,50 @@ async fn identify_system(client: &mut Client) -> anyhow::Result<IdentifySystem>
|
||||
}
|
||||
}
|
||||
|
||||
/// We don't want to report connectivity problems as real errors towards connection manager because
|
||||
/// 1. they happen frequently enough to make server logs hard to read and
|
||||
/// 2. the connection manager can retry other safekeeper.
|
||||
///
|
||||
/// If this function returns `Ok(pg_error)`, it's such an error.
|
||||
/// The caller should log it at info level and then report to connection manager that we're done handling this connection.
|
||||
/// Connection manager will then handle reconnections.
|
||||
///
|
||||
/// If this function returns an `Err()`, the caller can bubble it up using `?`.
|
||||
/// The connection manager will log the error at ERROR level.
|
||||
fn ignore_expected_errors(pg_error: postgres::Error) -> anyhow::Result<postgres::Error> {
|
||||
if pg_error.is_closed()
|
||||
|| pg_error
|
||||
.source()
|
||||
.and_then(|source| source.downcast_ref::<std::io::Error>())
|
||||
.map(is_expected_io_error)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Ok(pg_error);
|
||||
} else if let Some(db_error) = pg_error.as_db_error() {
|
||||
if db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
|
||||
&& db_error.message().contains("ending streaming")
|
||||
{
|
||||
return Ok(pg_error);
|
||||
}
|
||||
}
|
||||
Err(pg_error).context("connection error")
|
||||
/// Trait for avoid reporting walreceiver specific expected or "normal" or "ok" errors.
|
||||
pub(super) trait ExpectedError {
|
||||
/// Test if this error is an ok error.
|
||||
///
|
||||
/// We don't want to report connectivity problems as real errors towards connection manager because
|
||||
/// 1. they happen frequently enough to make server logs hard to read and
|
||||
/// 2. the connection manager can retry other safekeeper.
|
||||
///
|
||||
/// If this function returns `true`, it's such an error.
|
||||
/// The caller should log it at info level and then report to connection manager that we're done handling this connection.
|
||||
/// Connection manager will then handle reconnections.
|
||||
///
|
||||
/// If this function returns an `false` the error should be propagated and the connection manager
|
||||
/// will log the error at ERROR level.
|
||||
fn is_expected(&self) -> bool;
|
||||
}
|
||||
|
||||
impl ExpectedError for postgres::Error {
|
||||
fn is_expected(&self) -> bool {
|
||||
self.is_closed()
|
||||
|| self
|
||||
.source()
|
||||
.and_then(|source| source.downcast_ref::<std::io::Error>())
|
||||
.map(is_expected_io_error)
|
||||
.unwrap_or(false)
|
||||
|| self
|
||||
.as_db_error()
|
||||
.filter(|db_error| {
|
||||
db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
|
||||
&& db_error.message().contains("ending streaming")
|
||||
})
|
||||
.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
impl ExpectedError for anyhow::Error {
|
||||
fn is_expected(&self) -> bool {
|
||||
let head = self.downcast_ref::<postgres::Error>();
|
||||
|
||||
let tail = self
|
||||
.chain()
|
||||
.filter_map(|e| e.downcast_ref::<postgres::Error>());
|
||||
|
||||
// check if self or any of the chained/sourced errors are expected
|
||||
head.into_iter().chain(tail).any(|e| e.is_expected())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1585,13 +1585,10 @@ class NeonPageserver(PgProtocol):
|
||||
".*serving compute connection task.*exited with error: Postgres connection error.*",
|
||||
".*serving compute connection task.*exited with error: Connection reset by peer.*",
|
||||
".*serving compute connection task.*exited with error: Postgres query error.*",
|
||||
".*Connection aborted: connection error: error communicating with the server: Broken pipe.*",
|
||||
".*Connection aborted: connection error: error communicating with the server: Transport endpoint is not connected.*",
|
||||
".*Connection aborted: connection error: error communicating with the server: Connection reset by peer.*",
|
||||
# FIXME: replication patch for tokio_postgres regards any but CopyDone/CopyData message in CopyBoth stream as unexpected
|
||||
".*Connection aborted: connection error: unexpected message from server*",
|
||||
".*Connection aborted: unexpected message from server*",
|
||||
".*kill_and_wait_impl.*: wait successful.*",
|
||||
".*Replication stream finished: db error:.*ending streaming to Some*",
|
||||
".*: db error:.*ending streaming to Some.*",
|
||||
".*query handler for 'pagestream.*failed: Broken pipe.*", # pageserver notices compute shut down
|
||||
".*query handler for 'pagestream.*failed: Connection reset by peer.*", # pageserver notices compute shut down
|
||||
# safekeeper connection can fail with this, in the window between timeline creation
|
||||
@@ -1608,8 +1605,6 @@ class NeonPageserver(PgProtocol):
|
||||
".*manual_gc.*is_shutdown_requested\\(\\) called in an unexpected task or thread.*",
|
||||
".*tenant_list: timeline is not found in remote index while it is present in the tenants registry.*",
|
||||
".*Removing intermediate uninit mark file.*",
|
||||
# FIXME: known race condition in TaskHandle: https://github.com/neondatabase/neon/issues/2885
|
||||
".*sender is dropped while join handle is still alive.*",
|
||||
# Tenant::delete_timeline() can cause any of the four following errors.
|
||||
# FIXME: we shouldn't be considering it an error: https://github.com/neondatabase/neon/issues/2946
|
||||
".*could not flush frozen layer.*queue is in state Stopped", # when schedule layer upload fails because queued got closed before compaction got killed
|
||||
|
||||
Reference in New Issue
Block a user