From 586e6e55f8281afa49d1504c236eb12564f361e5 Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Wed, 3 May 2023 16:25:19 +0300
Subject: [PATCH] Print WalReceiver context on WAL waiting timeout (#4090)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes https://github.com/neondatabase/neon/issues/2106

Before:

```
Extracting base backup to create postgres instance: path=/Users/someonetoignore/work/neon/neon_main/test_output/test_pageserver_lsn_wait_error_safekeeper_stop/repo/endpoints/ep-2/pgdata port=15017
stderr: command failed: page server 'basebackup' command failed

Caused by:
    0: db error: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50
    1: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50

Stack backtrace:
```

After:

```
Extracting base backup to create postgres instance: path=/Users/someonetoignore/work/neon/neon/test_output/test_pageserver_lsn_wait_error_safekeeper_stop/repo/endpoints/ep-2/pgdata port=15011
stderr: command failed: page server 'basebackup' command failed

Caused by:
    0: db error: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50, WalReceiver status (update 2023-04-26 14:20:39): streaming WAL from node 12346, commit|streaming Lsn: 0/A2C3F58|0/A2C3F58, safekeeper candidates (id|update_time|commit_lsn): [(12348|14:20:40|0/A2C3F58), (12346|14:20:40|0/A2C3F58), (12347|14:20:40|0/A2C3F58)]
    1: ERROR: Timed out while waiting for WAL record at LSN 0/FFFFFFFF to arrive, last_record_lsn 0/A2C3F58 disk consistent LSN=0/16B5A50, WalReceiver status (update 2023-04-26 14:20:39): streaming WAL from node 12346, commit|streaming Lsn: 0/A2C3F58|0/A2C3F58, safekeeper candidates (id|update_time|commit_lsn): [(12348|14:20:40|0/A2C3F58), (12346|14:20:40|0/A2C3F58), (12347|14:20:40|0/A2C3F58)]

Stack backtrace:
```

As the issue requests, this PR adds the context to the logs only, but I think we should also expose the context via the HTTP management API in a similar way; that should be simple with the new API, but is better done in a separate PR.
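For readers skimming the diff below: the core of the change is in `Timeline::wait_lsn`, where a timed-out wait now fetches the walreceiver status and appends it to the error context. Here is a minimal, self-contained sketch of that pattern (the `WalReceiverStatus` stand-in, the `wait_for_lsn` wrapper, and the `std::io` timeout error are simplifications for illustration; the real code lives in `pageserver/src/tenant/timeline.rs` and uses `self.walreceiver.status().await` together with `ConnectionManagerStatus::to_human_readable_string`):

```rust
use anyhow::Context;

/// Hypothetical stand-in for the real `ConnectionManagerStatus`; only the shape matters here.
struct WalReceiverStatus(String);

impl WalReceiverStatus {
    fn to_human_readable_string(&self) -> String {
        self.0.clone()
    }
}

/// Sketch of the error path: when the LSN wait times out, append the walreceiver
/// status (or a "Not active" note) to the error context before bubbling it up.
fn wait_for_lsn(lsn: u64, walreceiver_status: Option<WalReceiverStatus>) -> anyhow::Result<()> {
    // Stand-in for `last_record_lsn.wait_for_timeout(lsn, wait_lsn_timeout).await` failing.
    let seqwait_result: Result<(), std::io::Error> =
        Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "seqwait timeout"));

    seqwait_result.with_context(|| {
        format!(
            "Timed out while waiting for WAL record at LSN {lsn:X} to arrive, {}",
            walreceiver_status
                .map(|status| status.to_human_readable_string())
                .unwrap_or_else(|| "WalReceiver status: Not active".to_string()),
        )
    })
}

fn main() {
    let status = Some(WalReceiverStatus(
        "WalReceiver status: streaming WAL from node 12346".to_string(),
    ));
    if let Err(e) = wait_for_lsn(0xFFFF_FFFF, status) {
        // Prints the timeout error together with the attached walreceiver context.
        println!("{e:#}");
    }
}
```

The real change also drops the wait-time histogram timer (`drop(_timer)`) before fetching the status, which keeps the extra async status lookup out of the recorded `wait_lsn` time.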
Co-authored-by: Kirill Bulatov --- pageserver/src/tenant/timeline.rs | 28 +++-- pageserver/src/tenant/timeline/walreceiver.rs | 26 ++-- .../walreceiver/connection_manager.rs | 96 ++++++++++++++- .../walreceiver/walreceiver_connection.rs | 8 +- test_runner/regress/test_wal_receiver.py | 115 ++++++++++++++++++ 5 files changed, 252 insertions(+), 21 deletions(-) create mode 100644 test_runner/regress/test_wal_receiver.py diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8768841d87..bc55c2091c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -588,15 +588,25 @@ impl Timeline { let _timer = self.metrics.wait_lsn_time_histo.start_timer(); - self.last_record_lsn.wait_for_timeout(lsn, self.conf.wait_lsn_timeout).await - .with_context(|| - format!( - "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}", - lsn, self.get_last_record_lsn(), self.get_disk_consistent_lsn() - ) - )?; - - Ok(()) + match self + .last_record_lsn + .wait_for_timeout(lsn, self.conf.wait_lsn_timeout) + .await + { + Ok(()) => Ok(()), + seqwait_error => { + drop(_timer); + let walreceiver_status = self.walreceiver.status().await; + seqwait_error.with_context(|| format!( + "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, {}", + lsn, + self.get_last_record_lsn(), + self.get_disk_consistent_lsn(), + walreceiver_status.map(|status| status.to_human_readable_string()) + .unwrap_or_else(|| "WalReceiver status: Not active".to_string()), + )) + } + } } /// Check that it is valid to request operations with that lsn. diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 00f446af38..91f7208194 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -38,12 +38,14 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use storage_broker::BrokerClientChannel; use tokio::select; -use tokio::sync::watch; +use tokio::sync::{watch, RwLock}; use tokio_util::sync::CancellationToken; use tracing::*; use utils::id::TenantTimelineId; +use self::connection_manager::ConnectionManagerStatus; + use super::Timeline; #[derive(Clone)] @@ -63,6 +65,7 @@ pub struct WalReceiver { timeline_ref: Weak, conf: WalReceiverConf, started: AtomicBool, + manager_status: Arc>>, } impl WalReceiver { @@ -76,6 +79,7 @@ impl WalReceiver { timeline_ref, conf, started: AtomicBool::new(false), + manager_status: Arc::new(RwLock::new(None)), } } @@ -96,8 +100,8 @@ impl WalReceiver { let timeline_id = timeline.timeline_id; let walreceiver_ctx = ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error); - let wal_receiver_conf = self.conf.clone(); + let loop_status = Arc::clone(&self.manager_status); task_mgr::spawn( WALRECEIVER_RUNTIME.handle(), TaskKind::WalReceiverManager, @@ -115,24 +119,28 @@ impl WalReceiver { select! 
{ _ = task_mgr::shutdown_watcher() => { info!("WAL receiver shutdown requested, shutting down"); - connection_manager_state.shutdown().await; - return Ok(()); + break; }, loop_step_result = connection_manager_loop_step( &mut broker_client, &mut connection_manager_state, &walreceiver_ctx, + &loop_status, ) => match loop_step_result { ControlFlow::Continue(()) => continue, ControlFlow::Break(()) => { info!("Connection manager loop ended, shutting down"); - connection_manager_state.shutdown().await; - return Ok(()); + break; } }, } } - }.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id)) + + connection_manager_state.shutdown().await; + *loop_status.write().await = None; + Ok(()) + } + .instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id)) ); self.started.store(true, atomic::Ordering::Release); @@ -149,6 +157,10 @@ impl WalReceiver { .await; self.started.store(false, atomic::Ordering::Release); } + + pub(super) async fn status(&self) -> Option { + self.manager_status.read().await.clone() + } } /// A handle of an asynchronous task. diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 731c5c4644..17c66238f2 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -24,6 +24,7 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use storage_broker::BrokerClientChannel; use storage_broker::Streaming; +use tokio::sync::RwLock; use tokio::{select, sync::watch}; use tracing::*; @@ -43,6 +44,7 @@ pub(super) async fn connection_manager_loop_step( broker_client: &mut BrokerClientChannel, connection_manager_state: &mut ConnectionManagerState, ctx: &RequestContext, + manager_status: &RwLock>, ) -> ControlFlow<(), ()> { let mut timeline_state_updates = connection_manager_state .timeline @@ -180,6 +182,7 @@ pub(super) async fn connection_manager_loop_step( .change_connection(new_candidate, ctx) .await } + *manager_status.write().await = Some(connection_manager_state.manager_status()); } } @@ -267,6 +270,78 @@ pub(super) struct ConnectionManagerState { wal_stream_candidates: HashMap, } +/// An information about connection manager's current connection and connection candidates. +#[derive(Debug, Clone)] +pub struct ConnectionManagerStatus { + existing_connection: Option, + wal_stream_candidates: HashMap, +} + +impl ConnectionManagerStatus { + /// Generates a string, describing current connection status in a form, suitable for logging. 
+ pub fn to_human_readable_string(&self) -> String { + let mut resulting_string = "WalReceiver status".to_string(); + match &self.existing_connection { + Some(connection) => { + if connection.has_processed_wal { + resulting_string.push_str(&format!( + " (update {}): streaming WAL from node {}, ", + connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"), + connection.node, + )); + + match (connection.streaming_lsn, connection.commit_lsn) { + (None, None) => resulting_string.push_str("no streaming data"), + (None, Some(commit_lsn)) => { + resulting_string.push_str(&format!("commit Lsn: {commit_lsn}")) + } + (Some(streaming_lsn), None) => { + resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}")) + } + (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str( + &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"), + ), + } + } else if connection.is_connected { + resulting_string.push_str(&format!( + " (update {}): connecting to node {}", + connection + .latest_connection_update + .format("%Y-%m-%d %H:%M:%S"), + connection.node, + )); + } else { + resulting_string.push_str(&format!( + " (update {}): initializing node {} connection", + connection + .latest_connection_update + .format("%Y-%m-%d %H:%M:%S"), + connection.node, + )); + } + } + None => resulting_string.push_str(": disconnected"), + } + + resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): ["); + let mut candidates = self.wal_stream_candidates.iter().peekable(); + while let Some((node_id, candidate_info)) = candidates.next() { + resulting_string.push_str(&format!( + "({}|{}|{})", + node_id, + candidate_info.latest_update.format("%H:%M:%S"), + Lsn(candidate_info.timeline.commit_lsn) + )); + if candidates.peek().is_some() { + resulting_string.push_str(", "); + } + } + resulting_string.push(']'); + + resulting_string + } +} + /// Current connection data. #[derive(Debug)] struct WalConnection { @@ -293,14 +368,14 @@ struct NewCommittedWAL { discovered_at: NaiveDateTime, } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] struct RetryInfo { next_retry_at: Option, retry_duration_seconds: f64, } /// Data about the timeline to connect to, received from the broker. -#[derive(Debug)] +#[derive(Debug, Clone)] struct BrokerSkTimeline { timeline: SafekeeperTimelineInfo, /// Time at which the data was fetched from the broker last time, to track the stale data. 
@@ -328,6 +403,7 @@ impl ConnectionManagerState { self.drop_old_connection(true).await; let id = self.id; + let node_id = new_sk.safekeeper_id; let connect_timeout = self.conf.wal_connect_timeout; let timeline = Arc::clone(&self.timeline); let ctx = ctx.detached_child( @@ -343,12 +419,13 @@ impl ConnectionManagerState { cancellation, connect_timeout, ctx, + node_id, ) .await .context("walreceiver connection handling failure") } .instrument( - info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, node_id = %new_sk.safekeeper_id), + info_span!("walreceiver_connection", tenant_id = %id.tenant_id, timeline_id = %id.timeline_id, %node_id), ) }); @@ -364,6 +441,7 @@ impl ConnectionManagerState { latest_wal_update: now, streaming_lsn: None, commit_lsn: None, + node: node_id, }, connection_task: connection_handle, discovered_new_wal: None, @@ -725,6 +803,13 @@ impl ConnectionManagerState { wal_connection.connection_task.shutdown().await; } } + + fn manager_status(&self) -> ConnectionManagerStatus { + ConnectionManagerStatus { + existing_connection: self.wal_connection.as_ref().map(|conn| conn.status), + wal_stream_candidates: self.wal_stream_candidates.clone(), + } + } } #[derive(Debug)] @@ -867,6 +952,7 @@ mod tests { latest_wal_update: now, commit_lsn: Some(Lsn(current_lsn)), streaming_lsn: Some(Lsn(current_lsn)), + node: NodeId(1), }; state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap(); @@ -1035,6 +1121,7 @@ mod tests { latest_wal_update: now, commit_lsn: Some(current_lsn), streaming_lsn: Some(current_lsn), + node: connected_sk_id, }; state.wal_connection = Some(WalConnection { @@ -1101,6 +1188,7 @@ mod tests { latest_wal_update: time_over_threshold, commit_lsn: Some(current_lsn), streaming_lsn: Some(current_lsn), + node: NodeId(1), }; state.wal_connection = Some(WalConnection { @@ -1164,6 +1252,7 @@ mod tests { latest_wal_update: time_over_threshold, commit_lsn: Some(current_lsn), streaming_lsn: Some(current_lsn), + node: NodeId(1), }; state.wal_connection = Some(WalConnection { @@ -1261,6 +1350,7 @@ mod tests { latest_wal_update: now, commit_lsn: Some(current_lsn), streaming_lsn: Some(current_lsn), + node: connected_sk_id, }; state.wal_connection = Some(WalConnection { diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index d80c7c5673..801641a534 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -37,8 +37,8 @@ use crate::{ use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; -use utils::lsn::Lsn; use utils::pageserver_feedback::PageserverFeedback; +use utils::{id::NodeId, lsn::Lsn}; /// Status of the connection. #[derive(Debug, Clone, Copy)] @@ -56,6 +56,8 @@ pub(super) struct WalConnectionStatus { pub streaming_lsn: Option, /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet. pub commit_lsn: Option, + /// The node it is connected to + pub node: NodeId, } /// Open a connection to the given safekeeper and receive WAL, sending back progress @@ -67,6 +69,7 @@ pub(super) async fn handle_walreceiver_connection( cancellation: CancellationToken, connect_timeout: Duration, ctx: RequestContext, + node: NodeId, ) -> anyhow::Result<()> { // Connect to the database in replication mode. 
info!("connecting to {wal_source_connconf:?}"); @@ -100,6 +103,7 @@ pub(super) async fn handle_walreceiver_connection( latest_wal_update: Utc::now().naive_utc(), streaming_lsn: None, commit_lsn: None, + node, }; if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) { warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}"); @@ -122,7 +126,7 @@ pub(super) async fn handle_walreceiver_connection( false, async move { select! { - connection_result = connection => match connection_result{ + connection_result = connection => match connection_result { Ok(()) => info!("Walreceiver db connection closed"), Err(connection_error) => { if let Err(e) = ignore_expected_errors(connection_error) { diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py new file mode 100644 index 0000000000..8e4e154be1 --- /dev/null +++ b/test_runner/regress/test_wal_receiver.py @@ -0,0 +1,115 @@ +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder +from fixtures.types import Lsn, TenantId + + +# Checks that pageserver's walreceiver state is printed in the logs during WAL wait timeout. +# Ensures that walreceiver does not run without any data inserted and only starts after the insertion. +def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): + # Trigger WAL wait timeout faster + neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" + env = neon_env_builder.init_start() + env.pageserver.http_client() + + tenant_id, timeline_id = env.neon_cli.create_tenant() + expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive" + env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*") + + try: + trigger_wait_lsn_timeout(env, tenant_id) + except Exception as e: + exception_string = str(e) + assert expected_timeout_error in exception_string, "Should time out during waiting for WAL" + assert ( + "WalReceiver status: Not active" in exception_string + ), "Walreceiver should not be active before any data writes" + + insert_test_elements(env, tenant_id, start=0, count=1_000) + try: + trigger_wait_lsn_timeout(env, tenant_id) + except Exception as e: + exception_string = str(e) + assert expected_timeout_error in exception_string, "Should time out during waiting for WAL" + assert ( + "WalReceiver status: Not active" not in exception_string + ), "Should not be inactive anymore after INSERTs are made" + assert "WalReceiver status" in exception_string, "But still should have some other status" + + +# Checks that all active safekeepers are shown in pageserver's walreceiver state printed on WAL wait timeout. +# Kills one of the safekeepers and ensures that only the active ones are printed in the state. 
+def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder): + # Trigger WAL wait timeout faster + neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" + # Have notable SK ids to ensure we check logs for their presence, not some other random numbers + neon_env_builder.safekeepers_id_start = 12345 + neon_env_builder.num_safekeepers = 3 + env = neon_env_builder.init_start() + env.pageserver.http_client() + + tenant_id, timeline_id = env.neon_cli.create_tenant() + + elements_to_insert = 1_000_000 + expected_timeout_error = f"Timed out while waiting for WAL record at LSN {future_lsn} to arrive" + env.pageserver.allowed_errors.append(f".*{expected_timeout_error}.*") + + insert_test_elements(env, tenant_id, start=0, count=elements_to_insert) + + try: + trigger_wait_lsn_timeout(env, tenant_id) + except Exception as e: + exception_string = str(e) + assert expected_timeout_error in exception_string, "Should time out during waiting for WAL" + + for safekeeper in env.safekeepers: + assert ( + str(safekeeper.id) in exception_string + ), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after WAL wait timeout" + + stopped_safekeeper = env.safekeepers[-1] + stopped_safekeeper_id = stopped_safekeeper.id + log.info(f"Stopping safekeeper {stopped_safekeeper.id}") + stopped_safekeeper.stop() + + # Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats. + insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert) + + try: + trigger_wait_lsn_timeout(env, tenant_id) + except Exception as e: + exception_string = str(e) + assert expected_timeout_error in exception_string, "Should time out during waiting for WAL" + + for safekeeper in env.safekeepers: + if safekeeper.id == stopped_safekeeper_id: + assert ( + str(safekeeper.id) not in exception_string + ), f"Should not have stopped safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout" + else: + assert ( + str(safekeeper.id) in exception_string + ), f"Should have safekeeper {safekeeper.id} printed in walreceiver state after 2nd WAL wait timeout" + + +def insert_test_elements(env: NeonEnv, tenant_id: TenantId, start: int, count: int): + first_element_id = start + last_element_id = first_element_id + count + with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint: + with endpoint.cursor() as cur: + cur.execute("CREATE TABLE IF NOT EXISTS t(key serial primary key, value text)") + cur.execute( + f"INSERT INTO t SELECT i, CONCAT('payload_', i) FROM generate_series({first_element_id},{last_element_id}) as i" + ) + + +future_lsn = Lsn("0/FFFFFFFF") + + +def trigger_wait_lsn_timeout(env: NeonEnv, tenant_id: TenantId): + with env.endpoints.create_start( + "main", + tenant_id=tenant_id, + lsn=future_lsn, + ) as endpoint: + with endpoint.cursor() as cur: + cur.execute("SELECT 1")
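As noted in the description, a natural follow-up is to expose the same walreceiver context over the pageserver's HTTP management API instead of only embedding it in the timeout error. Purely as an illustration of what such an endpoint could return, here is a self-contained sketch (assuming `serde`, `serde_json`, and `anyhow`; the `WalReceiverStatusResponse` shape, field names, and sample values are hypothetical and not part of this patch):

```rust
use serde::Serialize;

/// Hypothetical JSON shape for a follow-up walreceiver status endpoint;
/// none of these names or fields are defined by this patch.
#[derive(Serialize)]
struct SafekeeperCandidate {
    node_id: u64,
    latest_update: String,
    commit_lsn: String,
}

#[derive(Serialize)]
struct WalReceiverStatusResponse {
    is_connected: bool,
    node_id: Option<u64>,
    last_update: Option<String>,
    commit_lsn: Option<String>,
    streaming_lsn: Option<String>,
    safekeeper_candidates: Vec<SafekeeperCandidate>,
}

fn main() -> anyhow::Result<()> {
    // The same data that `to_human_readable_string` renders for the log line could be
    // returned as structured JSON from a management API handler.
    let response = WalReceiverStatusResponse {
        is_connected: true,
        node_id: Some(12346),
        last_update: Some("2023-04-26 14:20:39".to_string()),
        commit_lsn: Some("0/A2C3F58".to_string()),
        streaming_lsn: Some("0/A2C3F58".to_string()),
        safekeeper_candidates: vec![SafekeeperCandidate {
            node_id: 12348,
            latest_update: "14:20:40".to_string(),
            commit_lsn: "0/A2C3F58".to_string(),
        }],
    };
    println!("{}", serde_json::to_string_pretty(&response)?);
    Ok(())
}
```

Returning structured JSON like this would let clients and tests assert on individual fields (connected node, commit/streaming LSNs, candidate list) rather than parsing the human-readable string that this patch adds to the error message.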