testing: record walredo failures to test reports (#5451)
We have rare walredo failures with pg16. Let's introduce recording of the failing walredo input under `#[cfg(feature = "testing")]`. There is also additional logging (the value reconstruction path logging usually shown for not-found keys), likewise kept behind `#[cfg(feature = "testing")]`. Cc: #5404.
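The patch uses both gating forms mentioned above: `#[cfg(feature = "testing")]` on items, which compiles them out of non-testing builds, and `cfg!(feature = "testing")` in expressions, which is just a compile-time boolean. A minimal sketch of how the two combine (illustration only, not pageserver code):

```rust
// Present only when built with `--features testing`.
#[cfg(feature = "testing")]
fn record_failure(input: &[u8]) {
    eprintln!("would persist {} bytes of failing walredo input", input.len());
}

// No-op twin so call sites compile identically without the feature.
#[cfg(not(feature = "testing"))]
fn record_failure(_: &[u8]) {}

fn main() {
    let res: Result<(), ()> = Err(());
    // cfg!() evaluates to a constant bool, so this branch disappears
    // entirely in builds without the feature.
    if cfg!(feature = "testing") && res.is_err() {
        record_failure(b"walredo input");
    }
}
```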
@@ -1,5 +1,5 @@
 use std::sync::Arc;
-use std::{thread, time};
+use std::{thread, time::Duration};
 
 use chrono::{DateTime, Utc};
 use postgres::{Client, NoTls};

@@ -7,7 +7,7 @@ use tracing::{debug, info};
 
 use crate::compute::ComputeNode;
 
-const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds
+const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
 
 // Spin in a loop and figure out the last activity time in the Postgres.
 // Then update it in the shared state. This function never errors out.

@@ -17,13 +17,12 @@ fn watch_compute_activity(compute: &ComputeNode) {
     let connstr = compute.connstr.as_str();
     // Define `client` outside of the loop to reuse existing connection if it's active.
     let mut client = Client::connect(connstr, NoTls);
-    let timeout = time::Duration::from_millis(MONITOR_CHECK_INTERVAL);
 
     info!("watching Postgres activity at {}", connstr);
 
     loop {
         // Should be outside of the write lock to allow others to read while we sleep.
-        thread::sleep(timeout);
+        thread::sleep(MONITOR_CHECK_INTERVAL);
 
         match &mut client {
             Ok(cli) => {
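The compute monitor change above is a small type cleanup: the poll interval becomes a typed `Duration` constant, so the call site no longer converts a bare `u64` of milliseconds. A standalone sketch of the resulting shape (the real loop polls Postgres activity rather than printing):

```rust
use std::{thread, time::Duration};

// The unit travels with the value, so callers can pass it to sleep() directly.
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);

fn main() {
    for i in 0..3 {
        thread::sleep(MONITOR_CHECK_INTERVAL);
        println!("activity check #{i}");
    }
}
```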
@@ -496,13 +496,36 @@ impl Timeline {
         };
 
         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
-        self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
+        let path = self
+            .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
             .await?;
         timer.stop_and_record();
 
-        RECONSTRUCT_TIME
+        let res = RECONSTRUCT_TIME
             .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
-            .await
+            .await;
+
+        if cfg!(feature = "testing") && res.is_err() {
+            // it can only be walredo issue
+            use std::fmt::Write;
+
+            let mut msg = String::new();
+
+            path.into_iter().for_each(|(res, cont_lsn, layer)| {
+                writeln!(
+                    msg,
+                    "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
+                    layer(),
+                )
+                .expect("string grows")
+            });
+
+            // this is to rule out or provide evidence that we could in some cases read a duplicate
+            // walrecord
+            tracing::info!("walredo failed, path:\n{msg}");
+        }
+
+        res
     }
 
     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.

@@ -2224,7 +2247,7 @@ impl Timeline {
         request_lsn: Lsn,
         reconstruct_state: &mut ValueReconstructState,
         ctx: &RequestContext,
-    ) -> Result<(), PageReconstructError> {
+    ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
         // Start from the current timeline.
         let mut timeline_owned;
         let mut timeline = self;

@@ -2255,12 +2278,12 @@ impl Timeline {
             // The function should have updated 'state'
             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
             match result {
-                ValueReconstructResult::Complete => return Ok(()),
+                ValueReconstructResult::Complete => return Ok(traversal_path),
                 ValueReconstructResult::Continue => {
                     // If we reached an earlier cached page image, we're done.
                     if cont_lsn == cached_lsn + 1 {
                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
-                        return Ok(());
+                        return Ok(traversal_path);
                     }
                     if prev_lsn <= cont_lsn {
                         // Didn't make any progress in last iteration. Error out to avoid
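To see what the new failure logging produces, here is a self-contained sketch of the formatting loop. The `(result, cont_lsn, layer)` tuple and the `ValueReconstructResult` enum below are simplified stand-ins for the pageserver's `TraversalPathItem`, which is not shown in this diff; only the `writeln!` formatting mirrors the patch:

```rust
use std::fmt::Write;

#[derive(Debug)]
enum ValueReconstructResult {
    Complete,
    Continue,
}

// Stand-in for TraversalPathItem: the last element lazily renders a layer name.
type Item = (ValueReconstructResult, u64, Box<dyn Fn() -> String>);

fn main() {
    let mut path: Vec<Item> = Vec::new();
    path.push((
        ValueReconstructResult::Continue,
        42,
        Box::new(|| "delta layer 0000002A-00000042".to_string()),
    ));
    path.push((
        ValueReconstructResult::Complete,
        7,
        Box::new(|| "image layer 00000007".to_string()),
    ));

    let mut msg = String::new();
    path.into_iter().for_each(|(res, cont_lsn, layer)| {
        writeln!(
            msg,
            "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
            layer(),
        )
        .expect("string grows")
    });
    println!("walredo failed, path:\n{msg}");
}
```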
@@ -38,6 +38,9 @@ use tracing::*;
 use utils::crashsafe::path_with_suffix_extension;
 use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
 
+#[cfg(feature = "testing")]
+use std::sync::atomic::{AtomicUsize, Ordering};
+
 use crate::metrics::{
     WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
     WAL_REDO_WAIT_TIME,

@@ -113,6 +116,9 @@ struct ProcessOutput {
 pub struct PostgresRedoManager {
     tenant_id: TenantId,
     conf: &'static PageServerConf,
+    /// Counter to separate same sized walredo inputs failing at the same millisecond.
+    #[cfg(feature = "testing")]
+    dump_sequence: AtomicUsize,
 
     stdout: Mutex<Option<ProcessOutput>>,
     stdin: Mutex<Option<ProcessInput>>,

@@ -224,6 +230,8 @@ impl PostgresRedoManager {
         PostgresRedoManager {
             tenant_id,
             conf,
+            #[cfg(feature = "testing")]
+            dump_sequence: AtomicUsize::default(),
             stdin: Mutex::new(None),
             stdout: Mutex::new(None),
             stderr: Mutex::new(None),
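The struct change shows the companion pattern for data: the `dump_sequence` field exists only in testing builds, and the constructor initializes it under the same gate, which works because `#[cfg]` applies to individual struct fields and field initializers alike. A tiny sketch of that shape (hypothetical struct, not the real manager):

```rust
use std::sync::atomic::AtomicUsize;

struct Manager {
    name: String,
    // Field is absent entirely unless built with `--features testing`.
    #[cfg(feature = "testing")]
    dump_sequence: AtomicUsize,
}

impl Manager {
    fn new(name: String) -> Self {
        Manager {
            name,
            // The initializer is gated the same way, so both builds compile.
            #[cfg(feature = "testing")]
            dump_sequence: AtomicUsize::default(),
        }
    }
}

fn main() {
    let m = Manager::new("walredo".to_string());
    println!("created manager {}", m.name);
}
```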
@@ -290,25 +298,25 @@ impl PostgresRedoManager {
         WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
 
         debug!(
             "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
             len,
             nbytes,
             duration.as_micros(),
             lsn
         );
 
         // If something went wrong, don't try to reuse the process. Kill it, and
         // next request will launch a new one.
         if result.is_err() {
             error!(
                 "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}",
                 records.len(),
                 records.first().map(|p| p.0).unwrap_or(Lsn(0)),
                 records.last().map(|p| p.0).unwrap_or(Lsn(0)),
                 nbytes,
                 base_img_lsn,
                 lsn
             );
             // self.stdin only holds stdin & stderr as_raw_fd().
             // Dropping it as part of take() doesn't close them.
             // The owning objects (ChildStdout and ChildStderr) are stored in

@@ -742,7 +750,7 @@ impl PostgresRedoManager {
     #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
     fn apply_wal_records(
         &self,
-        mut input: MutexGuard<Option<ProcessInput>>,
+        input: MutexGuard<Option<ProcessInput>>,
         tag: BufferTag,
         base_img: &Option<Bytes>,
         records: &[(Lsn, NeonWalRecord)],
@@ -779,6 +787,23 @@ impl PostgresRedoManager {
         build_get_page_msg(tag, &mut writebuf);
         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
 
+        let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
+
+        if res.is_err() {
+            // not all of these can be caused by this particular input, however these are so rare
+            // in tests so capture all.
+            self.record_and_log(&writebuf);
+        }
+
+        res
+    }
+
+    fn apply_wal_records0(
+        &self,
+        writebuf: &[u8],
+        mut input: MutexGuard<Option<ProcessInput>>,
+        wal_redo_timeout: Duration,
+    ) -> Result<Bytes, std::io::Error> {
         let proc = input.as_mut().unwrap();
         let mut nwrite = 0usize;
         let stdout_fd = proc.stdout_fd;
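The split of `apply_wal_records` into a thin wrapper plus `apply_wal_records0` is what lets a failure be captured exactly once, at the point where the full request buffer is still in hand. A simplified sketch of that wrapper shape, with stand-in names and a fake inner error:

```rust
use std::io::{Error, ErrorKind};

fn apply(writebuf: &[u8]) -> Result<Vec<u8>, Error> {
    let res = apply0(writebuf);
    if res.is_err() {
        // Only failing inputs are recorded; successes leave no trace.
        record_and_log(writebuf);
    }
    res
}

fn apply0(_writebuf: &[u8]) -> Result<Vec<u8>, Error> {
    // Placeholder for the real pipe I/O with the walredo process.
    Err(Error::new(ErrorKind::TimedOut, "no walredo process in this sketch"))
}

fn record_and_log(writebuf: &[u8]) {
    eprintln!("would dump {} bytes of failing walredo input", writebuf.len());
}

fn main() {
    if apply(b"walredo request buffer").is_err() {
        eprintln!("apply failed, input was recorded");
    }
}
```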
@@ -984,6 +1009,38 @@ impl PostgresRedoManager {
         }
         Ok(res)
     }
+
+    #[cfg(feature = "testing")]
+    fn record_and_log(&self, writebuf: &[u8]) {
+        let millis = std::time::SystemTime::now()
+            .duration_since(std::time::SystemTime::UNIX_EPOCH)
+            .unwrap()
+            .as_millis();
+
+        let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
+
+        // these files will be collected to an allure report
+        let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
+
+        let path = self.conf.tenant_path(&self.tenant_id).join(&filename);
+
+        let res = std::fs::OpenOptions::new()
+            .write(true)
+            .create_new(true)
+            .read(true)
+            .open(path)
+            .and_then(|mut f| f.write_all(writebuf));
+
+        // trip up allowed_errors
+        if let Err(e) = res {
+            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
+        } else {
+            tracing::error!(filename, "erroring walredo input saved");
+        }
+    }
+
+    #[cfg(not(feature = "testing"))]
+    fn record_and_log(&self, _: &[u8]) {}
 }
 
 /// Wrapper type around `std::process::Child` which guarantees that the child
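The dump name in `record_and_log` combines a millisecond timestamp, the payload length, and the `dump_sequence` counter, and `create_new(true)` makes the write fail instead of silently overwriting if a name ever repeats. A standalone sketch of the naming and write path, using a static counter and a temp directory in place of the manager's field and tenant path:

```rust
use std::io::Write;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for the dump_sequence field on the redo manager.
static DUMP_SEQUENCE: AtomicUsize = AtomicUsize::new(0);

fn dump_failing_input(dir: &std::path::Path, writebuf: &[u8]) -> std::io::Result<()> {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis();
    // fetch_add yields a distinct suffix even when two failures share the
    // same millisecond and the same payload length.
    let seq = DUMP_SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());

    // create_new(true) refuses to clobber an existing dump.
    let mut file = std::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(dir.join(&filename))?;
    file.write_all(writebuf)?;
    eprintln!("wrote {filename}");
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir();
    dump_failing_input(&dir, b"failing walredo input")?;
    dump_failing_input(&dir, b"failing walredo input")
}
```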
@@ -222,7 +222,7 @@ def get_scale_for_db(size_mb: int) -> int:
 
 
 ATTACHMENT_NAME_REGEX: re.Pattern = re.compile(  # type: ignore[type-arg]
-    r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html)"
+    r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html|walredo)"
 )
 
 

@@ -250,6 +250,9 @@ def allure_attach_from_dir(dir: Path):
         elif source.endswith(".html"):
             attachment_type = "text/html"
             extension = "html"
+        elif source.endswith(".walredo"):
+            attachment_type = "application/octet-stream"
+            extension = "walredo"
         else:
             attachment_type = "text/plain"
             extension = attachment.suffix.removeprefix(".")
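On the test-harness side, the only requirement for the dumps to show up in the report is that their names match `ATTACHMENT_NAME_REGEX`, which now includes the `walredo` extension. A quick standalone check of the pattern (reproduced here for illustration; the real constant lives in the test fixtures):

```python
import re

# Same pattern as the updated fixture.
ATTACHMENT_NAME_REGEX = re.compile(
    r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html|walredo)"
)

for name in [
    "walredo-1695999999999-8192-0.walredo",  # new: picked up for the report
    "pageserver.log",                        # already attached before
    "random.partial",                        # still ignored
]:
    print(name, bool(ATTACHMENT_NAME_REGEX.fullmatch(name)))
```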