From db8ff9d64b8e69c4c5b2dcce1f43570c88716dca Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Wed, 4 Oct 2023 11:24:30 +0300
Subject: [PATCH] testing: record walredo failures to test reports (#5451)

We have rare walredo failures with pg16. Let's introduce recording of the
failing walredo input in `#[cfg(feature = "testing")]`. There is also
additional logging (the value reconstruction path logging usually shown for
keys that are not found), kept behind `#[cfg(feature = "testing")]`.

Cc: #5404.
---
 compute_tools/src/monitor.rs      |  7 ++-
 pageserver/src/tenant/timeline.rs | 35 ++++++++++---
 pageserver/src/walredo.rs         | 87 +++++++++++++++++++++++++------
 test_runner/fixtures/utils.py     |  5 +-
 4 files changed, 108 insertions(+), 26 deletions(-)

diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs
index 1085a27902..f974d6023d 100644
--- a/compute_tools/src/monitor.rs
+++ b/compute_tools/src/monitor.rs
@@ -1,5 +1,5 @@
 use std::sync::Arc;
-use std::{thread, time};
+use std::{thread, time::Duration};
 
 use chrono::{DateTime, Utc};
 use postgres::{Client, NoTls};
@@ -7,7 +7,7 @@ use tracing::{debug, info};
 
 use crate::compute::ComputeNode;
 
-const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds
+const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
 
 // Spin in a loop and figure out the last activity time in the Postgres.
 // Then update it in the shared state. This function never errors out.
@@ -17,13 +17,12 @@ fn watch_compute_activity(compute: &ComputeNode) {
     let connstr = compute.connstr.as_str();
     // Define `client` outside of the loop to reuse existing connection if it's active.
     let mut client = Client::connect(connstr, NoTls);
-    let timeout = time::Duration::from_millis(MONITOR_CHECK_INTERVAL);
 
     info!("watching Postgres activity at {}", connstr);
 
     loop {
         // Should be outside of the write lock to allow others to read while we sleep.
-        thread::sleep(timeout);
+        thread::sleep(MONITOR_CHECK_INTERVAL);
 
         match &mut client {
             Ok(cli) => {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 9b62ba1c50..f8566f1a51 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -496,13 +496,36 @@ impl Timeline {
         };
 
         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
-        self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
+        let path = self
+            .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
             .await?;
         timer.stop_and_record();
 
-        RECONSTRUCT_TIME
+        let res = RECONSTRUCT_TIME
             .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
-            .await
+            .await;
+
+        if cfg!(feature = "testing") && res.is_err() {
+            // it can only be a walredo issue
+            use std::fmt::Write;
+
+            let mut msg = String::new();
+
+            path.into_iter().for_each(|(res, cont_lsn, layer)| {
+                writeln!(
+                    msg,
+                    "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
+                    layer(),
+                )
+                .expect("string grows")
+            });
+
+            // this is to rule out or provide evidence that we could in some cases read a duplicate
+            // walrecord
+            tracing::info!("walredo failed, path:\n{msg}");
+        }
+
+        res
     }
 
     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
@@ -2224,7 +2247,7 @@ impl Timeline {
         request_lsn: Lsn,
         reconstruct_state: &mut ValueReconstructState,
         ctx: &RequestContext,
-    ) -> Result<(), PageReconstructError> {
+    ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
         // Start from the current timeline.
         let mut timeline_owned;
         let mut timeline = self;
@@ -2255,12 +2278,12 @@ impl Timeline {
             // The function should have updated 'state'
             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
             match result {
-                ValueReconstructResult::Complete => return Ok(()),
+                ValueReconstructResult::Complete => return Ok(traversal_path),
                 ValueReconstructResult::Continue => {
                     // If we reached an earlier cached page image, we're done.
                     if cont_lsn == cached_lsn + 1 {
                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
-                        return Ok(());
+                        return Ok(traversal_path);
                     }
                     if prev_lsn <= cont_lsn {
                         // Didn't make any progress in last iteration. Error out to avoid
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index bc250166ce..c266b616e2 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -38,6 +38,9 @@ use tracing::*;
 use utils::crashsafe::path_with_suffix_extension;
 use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
 
+#[cfg(feature = "testing")]
+use std::sync::atomic::{AtomicUsize, Ordering};
+
 use crate::metrics::{
     WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
     WAL_REDO_WAIT_TIME,
@@ -113,6 +116,9 @@ struct ProcessOutput {
 pub struct PostgresRedoManager {
     tenant_id: TenantId,
     conf: &'static PageServerConf,
+    /// Counter to separate same sized walredo inputs failing at the same millisecond.
+    #[cfg(feature = "testing")]
+    dump_sequence: AtomicUsize,
 
     stdout: Mutex<Option<ProcessOutput>>,
     stdin: Mutex<Option<ProcessInput>>,
@@ -224,6 +230,8 @@ impl PostgresRedoManager {
         PostgresRedoManager {
             tenant_id,
             conf,
+            #[cfg(feature = "testing")]
+            dump_sequence: AtomicUsize::default(),
             stdin: Mutex::new(None),
             stdout: Mutex::new(None),
             stderr: Mutex::new(None),
@@ -290,25 +298,25 @@ impl PostgresRedoManager {
         WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
 
         debug!(
-                "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
-                len,
-                nbytes,
-                duration.as_micros(),
-                lsn
-            );
+            "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
+            len,
+            nbytes,
+            duration.as_micros(),
+            lsn
+        );
 
         // If something went wrong, don't try to reuse the process. Kill it, and
         // next request will launch a new one.
         if result.is_err() {
             error!(
-                    "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}",
-                    records.len(),
-                    records.first().map(|p| p.0).unwrap_or(Lsn(0)),
-                    records.last().map(|p| p.0).unwrap_or(Lsn(0)),
-                    nbytes,
-                    base_img_lsn,
-                    lsn
-                );
+                "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}",
+                records.len(),
+                records.first().map(|p| p.0).unwrap_or(Lsn(0)),
+                records.last().map(|p| p.0).unwrap_or(Lsn(0)),
+                nbytes,
+                base_img_lsn,
+                lsn
+            );
 
             // self.stdin only holds stdin & stderr as_raw_fd().
             // Dropping it as part of take() doesn't close them.
             // The owning objects (ChildStdout and ChildStderr) are stored in
@@ -742,7 +750,7 @@ impl PostgresRedoManager {
     #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
    fn apply_wal_records(
         &self,
-        mut input: MutexGuard<Option<ProcessInput>>,
+        input: MutexGuard<Option<ProcessInput>>,
         tag: BufferTag,
         base_img: &Option<Bytes>,
         records: &[(Lsn, NeonWalRecord)],
@@ -779,6 +787,23 @@ impl PostgresRedoManager {
         build_get_page_msg(tag, &mut writebuf);
         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
 
+        let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
+
+        if res.is_err() {
+            // not all of these can be caused by this particular input, however such failures are
+            // so rare in tests that we capture all of them.
+            self.record_and_log(&writebuf);
+        }
+
+        res
+    }
+
+    fn apply_wal_records0(
+        &self,
+        writebuf: &[u8],
+        mut input: MutexGuard<Option<ProcessInput>>,
+        wal_redo_timeout: Duration,
+    ) -> Result {
         let proc = input.as_mut().unwrap();
         let mut nwrite = 0usize;
         let stdout_fd = proc.stdout_fd;
@@ -984,6 +1009,38 @@ impl PostgresRedoManager {
         }
         Ok(res)
     }
+
+    #[cfg(feature = "testing")]
+    fn record_and_log(&self, writebuf: &[u8]) {
+        let millis = std::time::SystemTime::now()
+            .duration_since(std::time::SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_millis();
+
+        let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
+
+        // these files will be collected into the allure report
+        let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
+
+        let path = self.conf.tenant_path(&self.tenant_id).join(&filename);
+
+        let res = std::fs::OpenOptions::new()
+            .write(true)
+            .create_new(true)
+            .read(true)
+            .open(path)
+            .and_then(|mut f| f.write_all(writebuf));
+
+        // trip up allowed_errors
+        if let Err(e) = res {
+            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
+        } else {
+            tracing::error!(filename, "erroring walredo input saved");
+        }
+    }
+
+    #[cfg(not(feature = "testing"))]
+    fn record_and_log(&self, _: &[u8]) {}
 }
 
 /// Wrapper type around `std::process::Child` which guarantees that the child
diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py
index 46ab446f99..fbf3b5a6f2 100644
--- a/test_runner/fixtures/utils.py
+++ b/test_runner/fixtures/utils.py
@@ -222,7 +222,7 @@ def get_scale_for_db(size_mb: int) -> int:
 
 
 ATTACHMENT_NAME_REGEX: re.Pattern = re.compile(  # type: ignore[type-arg]
-    r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html)"
+    r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html|walredo)"
 )
 
 
@@ -250,6 +250,9 @@ def allure_attach_from_dir(dir: Path):
             elif source.endswith(".html"):
                 attachment_type = "text/html"
                 extension = "html"
+            elif source.endswith(".walredo"):
+                attachment_type = "application/octet-stream"
+                extension = "walredo"
             else:
                 attachment_type = "text/plain"
                 extension = attachment.suffix.removeprefix(".")
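
As a quick sanity check of how the two halves of the patch fit together: `record_and_log` names its dumps `walredo-{millis}-{length}-{seq}.walredo`, and the extended `ATTACHMENT_NAME_REGEX` matches that scheme, so the files are picked up by `allure_attach_from_dir` and attached to the test report. A minimal sketch of that check (the timestamp, length and sequence values below are made up for illustration):

    import re

    # Same pattern as in test_runner/fixtures/utils.py after this patch.
    ATTACHMENT_NAME_REGEX = re.compile(
        r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html|walredo)"
    )

    # Hypothetical dump name in the walredo-{millis}-{length}-{seq}.walredo scheme
    # used by PostgresRedoManager::record_and_log.
    name = "walredo-1696413870000-8276-0.walredo"
    assert ATTACHMENT_NAME_REGEX.fullmatch(name) is not None
    print(f"{name} would be collected into the test report")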