From 96f65fad682f9c0a51e67d99ce4d7ed07cb66a20 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Mon, 6 Mar 2023 10:10:58 +0200 Subject: [PATCH 1/3] Handle crash of walredo process and retry applying wal records (#3739) ## Describe your changes Restart walredo process an d retry applying walredo records i case of abnormal walredo process termination ## Issue ticket number and link See #1700 ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. --- pageserver/src/walredo.rs | 117 ++++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 56 deletions(-) diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 21c6ede27e..72865ad74d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -256,52 +256,53 @@ impl PostgresRedoManager { pg_version: u32, ) -> Result { let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?; - + const MAX_RETRY_ATTEMPTS: u32 = 1; let start_time = Instant::now(); + let mut n_attempts = 0u32; + loop { + let mut proc = self.stdin.lock().unwrap(); + let lock_time = Instant::now(); - let mut proc = self.stdin.lock().unwrap(); - let lock_time = Instant::now(); + // launch the WAL redo process on first use + if proc.is_none() { + self.launch(&mut proc, pg_version)?; + } + WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); - // launch the WAL redo process on first use - if proc.is_none() { - self.launch(&mut proc, pg_version)?; - } - WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); + // Relational WAL records are applied using wal-redo-postgres + let buf_tag = BufferTag { rel, blknum }; + let result = self + .apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout) + .map_err(WalRedoError::IoError); - // Relational WAL records are applied using wal-redo-postgres - let buf_tag = BufferTag { rel, blknum }; - let result = self - .apply_wal_records(proc, buf_tag, base_img, records, wal_redo_timeout) - .map_err(WalRedoError::IoError); + let end_time = Instant::now(); + let duration = end_time.duration_since(lock_time); - let end_time = Instant::now(); - let duration = end_time.duration_since(lock_time); + let len = records.len(); + let nbytes = records.iter().fold(0, |acumulator, record| { + acumulator + + match &record.1 { + NeonWalRecord::Postgres { rec, .. } => rec.len(), + _ => unreachable!("Only PostgreSQL records are accepted in this batch"), + } + }); - let len = records.len(); - let nbytes = records.iter().fold(0, |acumulator, record| { - acumulator - + match &record.1 { - NeonWalRecord::Postgres { rec, .. } => rec.len(), - _ => unreachable!("Only PostgreSQL records are accepted in this batch"), - } - }); + WAL_REDO_TIME.observe(duration.as_secs_f64()); + WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64); + WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64); - WAL_REDO_TIME.observe(duration.as_secs_f64()); - WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64); - WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64); + debug!( + "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}", + len, + nbytes, + duration.as_micros(), + lsn + ); - debug!( - "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}", - len, - nbytes, - duration.as_micros(), - lsn - ); - - // If something went wrong, don't try to reuse the process. Kill it, and - // next request will launch a new one. - if result.is_err() { - error!( + // If something went wrong, don't try to reuse the process. Kill it, and + // next request will launch a new one. + if result.is_err() { + error!( "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}", records.len(), records.first().map(|p| p.0).unwrap_or(Lsn(0)), @@ -310,24 +311,28 @@ impl PostgresRedoManager { base_img_lsn, lsn ); - // self.stdin only holds stdin & stderr as_raw_fd(). - // Dropping it as part of take() doesn't close them. - // The owning objects (ChildStdout and ChildStderr) are stored in - // self.stdout and self.stderr, respsectively. - // We intentionally keep them open here to avoid a race between - // currently running `apply_wal_records()` and a `launch()` call - // after we return here. - // The currently running `apply_wal_records()` must not read from - // the newly launched process. - // By keeping self.stdout and self.stderr open here, `launch()` will - // get other file descriptors for the new child's stdout and stderr, - // and hence the current `apply_wal_records()` calls will observe - // `output.stdout.as_raw_fd() != stdout_fd` . - if let Some(proc) = self.stdin.lock().unwrap().take() { - proc.child.kill_and_wait(); + // self.stdin only holds stdin & stderr as_raw_fd(). + // Dropping it as part of take() doesn't close them. + // The owning objects (ChildStdout and ChildStderr) are stored in + // self.stdout and self.stderr, respsectively. + // We intentionally keep them open here to avoid a race between + // currently running `apply_wal_records()` and a `launch()` call + // after we return here. + // The currently running `apply_wal_records()` must not read from + // the newly launched process. + // By keeping self.stdout and self.stderr open here, `launch()` will + // get other file descriptors for the new child's stdout and stderr, + // and hence the current `apply_wal_records()` calls will observe + // `output.stdout.as_raw_fd() != stdout_fd` . + if let Some(proc) = self.stdin.lock().unwrap().take() { + proc.child.kill_and_wait(); + } + } + n_attempts += 1; + if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() { + return result; } } - result } /// @@ -771,7 +776,7 @@ impl PostgresRedoManager { &self, mut input: MutexGuard>, tag: BufferTag, - base_img: Option, + base_img: &Option, records: &[(Lsn, NeonWalRecord)], wal_redo_timeout: Duration, ) -> Result { @@ -787,7 +792,7 @@ impl PostgresRedoManager { let mut writebuf: Vec = Vec::with_capacity((BLCKSZ as usize) * 3); build_begin_redo_for_block_msg(tag, &mut writebuf); if let Some(img) = base_img { - build_push_page_msg(tag, &img, &mut writebuf); + build_push_page_msg(tag, img, &mut writebuf); } for (lsn, rec) in records.iter() { if let NeonWalRecord::Postgres { From 7b9057ad0115c721fcdcd2585062576e993fb5a6 Mon Sep 17 00:00:00 2001 From: Shany Pozin Date: Mon, 6 Mar 2023 18:52:59 +0200 Subject: [PATCH 2/3] Add timeout to download copy (#3675) ## Describe your changes Adding a timeout handling for the remote download of layers of 120 seconds for each operation Note that these downloads are being retried for N times ## Issue ticket number and link Fixes: #3672 ## Checklist before requesting a review - [x] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. --------- Co-authored-by: Joonas Koivunen --- .../tenant/remote_timeline_client/download.rs | 26 +++++++++++++------ pageserver/src/tenant/timeline.rs | 2 +- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 2e79698087..ea8d9858c3 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -6,11 +6,13 @@ use std::collections::HashSet; use std::future::Future; use std::path::Path; +use std::time::Duration; use anyhow::{anyhow, Context}; use tokio::fs; use tokio::io::AsyncWriteExt; -use tracing::{error, info, warn}; + +use tracing::{info, warn}; use crate::config::PageServerConf; use crate::tenant::storage_layer::LayerFileName; @@ -26,6 +28,8 @@ async fn fsync_path(path: impl AsRef) -> Result<(), std::io::Er fs::File::open(path).await?.sync_all().await } +static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120); + /// /// If 'metadata' is given, we will validate that the downloaded file's size matches that /// in the metadata. (In the future, we might do more cross-checks, like CRC validation) @@ -64,22 +68,28 @@ pub async fn download_layer_file<'a>( // TODO: this doesn't use the cached fd for some reason? let mut destination_file = fs::File::create(&temp_file_path).await.with_context(|| { format!( - "Failed to create a destination file for layer '{}'", + "create a destination file for layer '{}'", temp_file_path.display() ) }) .map_err(DownloadError::Other)?; let mut download = storage.download(&remote_path).await.with_context(|| { format!( - "Failed to open a download stream for layer with remote storage path '{remote_path:?}'" + "open a download stream for layer with remote storage path '{remote_path:?}'" ) }) .map_err(DownloadError::Other)?; - let bytes_amount = tokio::io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| { - format!("Failed to download layer with remote storage path '{remote_path:?}' into file {temp_file_path:?}") - }) - .map_err(DownloadError::Other)?; + + let bytes_amount = tokio::time::timeout(MAX_DOWNLOAD_DURATION, tokio::io::copy(&mut download.download_stream, &mut destination_file)) + .await + .map_err(|e| DownloadError::Other(anyhow::anyhow!("Timed out {:?}", e)))? + .with_context(|| { + format!("Failed to download layer with remote storage path '{remote_path:?}' into file {temp_file_path:?}") + }) + .map_err(DownloadError::Other)?; + Ok((destination_file, bytes_amount)) + }, &format!("download {remote_path:?}"), ).await?; @@ -300,7 +310,7 @@ where } Err(DownloadError::Other(ref err)) => { // Operation failed FAILED_DOWNLOAD_RETRIES times. Time to give up. - error!("{description} still failed after {attempts} retries, giving up: {err:?}"); + warn!("{description} still failed after {attempts} retries, giving up: {err:?}"); return result; } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8b24fd6ecd..c304791ee2 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3819,7 +3819,7 @@ impl Timeline { remote_layer.ongoing_download.close(); } else { // Keep semaphore open. We'll drop the permit at the end of the function. - info!("on-demand download failed: {:?}", result.as_ref().unwrap_err()); + error!("on-demand download failed: {:?}", result.as_ref().unwrap_err()); } // Don't treat it as an error if the task that triggered the download From ca85646df46737339296fae5b286c38a8f097fa9 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 6 Mar 2023 14:20:58 +0400 Subject: [PATCH 3/3] Max peer_horizon_lsn before adopting it. Before this patch, persistent peer_horizon_lsn was always sent to walproposer, making it initially calculate it equal to max of persistent values and in turn pulling back the in memory value. Send instead in memory value and take max when safekeeper sets it. closes https://github.com/neondatabase/neon/issues/3752 --- safekeeper/src/safekeeper.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index c37411d667..7df347427e 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -191,7 +191,8 @@ pub struct SafeKeeperState { /// Minimal LSN which may be needed for recovery of some safekeeper (end_lsn /// of last record streamed to everyone). Persisting it helps skipping /// recovery in walproposer, generally we compute it from peers. In - /// walproposer proto called 'truncate_lsn'. + /// walproposer proto called 'truncate_lsn'. Updates are currently drived + /// only by walproposer. pub peer_horizon_lsn: Lsn, /// LSN of the oldest known checkpoint made by pageserver and successfully /// pushed to s3. We don't remove WAL beyond it. Persisted only for @@ -682,7 +683,7 @@ where term: self.state.acceptor_state.term, vote_given: false as u64, flush_lsn: self.flush_lsn(), - truncate_lsn: self.state.peer_horizon_lsn, + truncate_lsn: self.inmem.peer_horizon_lsn, term_history: self.get_term_history(), timeline_start_lsn: self.state.timeline_start_lsn, }; @@ -878,7 +879,13 @@ where if msg.h.commit_lsn != Lsn(0) { self.update_commit_lsn(msg.h.commit_lsn)?; } - self.inmem.peer_horizon_lsn = msg.h.truncate_lsn; + // Value calculated by walproposer can always lag: + // - safekeepers can forget inmem value and send to proposer lower + // persisted one on restart; + // - if we make safekeepers always send persistent value, + // any compute restart would pull it down. + // Thus, take max before adopting. + self.inmem.peer_horizon_lsn = max(self.inmem.peer_horizon_lsn, msg.h.truncate_lsn); // Update truncate and commit LSN in control file. // To avoid negative impact on performance of extra fsync, do it only