Compare commits

...

4 Commits

Author SHA1 Message Date
Christian Schwarz
e0b760ad82 walredo: log a few more breadcrumbs on redo failure
1. ability to tell whether a replay failure happened in the first or
   later iterations of the request_redo for loop.
2. n_requests, i.e, the  total number of request_redo calls completed
   since the current walredo process start

Possibly these will give us a bit more context about when this is
happening.

This commit is quite ugly and should be reverted or logging
infrastructure in walredo.rs should be generally switched to spans.
2023-10-13 11:53:13 +02:00
Christian Schwarz
0d55fd8b72 log n_attempts not as field but inside formatted string
our tracing_subscriber::fmt puts the fields after the formatted string.
We're now loging a full backtrace again => this patch avoids
scrolling down.
2023-10-13 11:50:26 +02:00
Christian Schwarz
082c891afc eliminate WalRedoError type to make {:?} print the innermost backtace
https://github.com/neondatabase/neon/pull/5541#issuecomment-1761134031
2023-10-13 10:49:14 +02:00
Christian Schwarz
5a8af46504 walredo: apply_batch_postgres: get a backtrace whenever it encounters an error
For 2 weeks we've seen rare, spurious, not-reproducible page reconstruction
failures with PG16 in prod.

One of the commits we deployed this week was

Commit

    commit fc467941f9
    Author: Joonas Koivunen <joonas@neon.tech>
    Date:   Wed Oct 4 16:19:19 2023 +0300

        walredo: log retryed error (#546)

With the logs from that commit, we learned that some read() or write()
system call that walredo does fails with `EAGAIN`, aka
`Resource temporarily unavailable (os error 11)`.

But we have no idea where exactly in the code we get back that error.

So, use anyhow instead of fake std::io::Error's as an easy way to get
a backtrace when the error happens, and change the logging to print
that backtrace (i.e., use `{:?}` instead of
`utils::error::report_compact_sources(e)`).

If we ever switch back to `report_compact_sources`, we should make sure
we have some other way to uniquely identify the places where we return
an error in the error message.
2023-10-12 18:16:29 +02:00
5 changed files with 130 additions and 98 deletions

View File

@@ -11,10 +11,7 @@ use std::sync::{Arc, Barrier};
use bytes::{Buf, Bytes};
use pageserver::{
config::PageServerConf,
repository::Key,
walrecord::NeonWalRecord,
walredo::{PostgresRedoManager, WalRedoError},
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
};
use utils::{id::TenantId, lsn::Lsn};
@@ -152,7 +149,7 @@ impl Drop for JoinOnDrop {
}
}
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError>
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()>
where
I: IntoIterator<Item = Request>,
{
@@ -160,7 +157,7 @@ where
input.into_iter().try_for_each(|req| {
let page = req.execute(manager)?;
assert_eq!(page.remaining(), 8192);
Ok::<_, WalRedoError>(())
anyhow::Ok(())
})
}
@@ -473,7 +470,7 @@ struct Request {
}
impl Request {
fn execute(self, manager: &PostgresRedoManager) -> Result<Bytes, WalRedoError> {
fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
use pageserver::walredo::WalRedoManager;
let Request {

View File

@@ -136,9 +136,7 @@ impl From<PageReconstructError> for ApiError {
PageReconstructError::AncestorStopping(_) => {
ApiError::ResourceUnavailable(format!("{pre}").into())
}
PageReconstructError::WalRedo(pre) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
}
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
}
}
}

View File

@@ -3448,11 +3448,8 @@ pub mod harness {
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::{
config::PageServerConf,
repository::Key,
tenant::Tenant,
walrecord::NeonWalRecord,
walredo::{WalRedoError, WalRedoManager},
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
walredo::WalRedoManager,
};
use super::*;
@@ -3625,7 +3622,7 @@ pub mod harness {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> Result<Bytes, WalRedoError> {
) -> anyhow::Result<Bytes> {
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,

View File

@@ -370,7 +370,7 @@ pub enum PageReconstructError {
/// An error happened replaying WAL records
#[error(transparent)]
WalRedo(#[from] crate::walredo::WalRedoError),
WalRedo(anyhow::Error),
}
impl std::fmt::Debug for PageReconstructError {
@@ -4327,7 +4327,7 @@ impl Timeline {
let img = match self
.walredo_mgr
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.context("Failed to reconstruct a page image:")
.context("Failed to reconstruct a page image")
{
Ok(img) => img,
Err(e) => return Err(PageReconstructError::from(e)),

View File

@@ -18,6 +18,7 @@
//! any WAL records, so that even if an attacker hijacks the Postgres
//! process, he cannot escape out of it.
//!
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
@@ -89,7 +90,7 @@ pub trait WalRedoManager: Send + Sync {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> Result<Bytes, WalRedoError>;
) -> anyhow::Result<Bytes>;
}
struct ProcessInput {
@@ -100,6 +101,14 @@ struct ProcessInput {
n_requests: usize,
}
enum ApplyWalRecordsError {
WithRequestNo {
error: anyhow::Error,
request_no: usize,
},
NoRequestNo(anyhow::Error),
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
@@ -140,20 +149,6 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
}
}
/// An error happened in WAL redo
#[derive(Debug, thiserror::Error)]
pub enum WalRedoError {
#[error(transparent)]
IoError(#[from] std::io::Error),
#[error("cannot perform WAL redo now")]
InvalidState,
#[error("cannot perform WAL redo for this request")]
InvalidRequest,
#[error("cannot perform WAL redo for this record")]
InvalidRecord,
}
///
/// Public interface of WAL redo manager
///
@@ -171,10 +166,9 @@ impl WalRedoManager for PostgresRedoManager {
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
) -> anyhow::Result<Bytes> {
if records.is_empty() {
error!("invalid WAL redo request with no records");
return Err(WalRedoError::InvalidRequest);
anyhow::bail!("invalid WAL redo request with no records");
}
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
@@ -186,7 +180,13 @@ impl WalRedoManager for PostgresRedoManager {
if rec_neon != batch_neon {
let result = if batch_neon {
self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
self.apply_batch_neon(
key,
lsn,
img,
&records[batch_start..i],
(batch_start, records.len()),
)
} else {
self.apply_batch_postgres(
key,
@@ -196,6 +196,7 @@ impl WalRedoManager for PostgresRedoManager {
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
(batch_start, records.len()),
)
};
img = Some(result?);
@@ -206,7 +207,13 @@ impl WalRedoManager for PostgresRedoManager {
}
// last batch
if batch_neon {
self.apply_batch_neon(key, lsn, img, &records[batch_start..])
self.apply_batch_neon(
key,
lsn,
img,
&records[batch_start..],
(batch_start, records.len()),
)
} else {
self.apply_batch_postgres(
key,
@@ -216,6 +223,7 @@ impl WalRedoManager for PostgresRedoManager {
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
(batch_start, records.len()),
)
}
}
@@ -260,8 +268,9 @@ impl PostgresRedoManager {
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
diag: impl std::fmt::Debug,
) -> anyhow::Result<Bytes> {
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let start_time = Instant::now();
let mut n_attempts = 0u32;
@@ -271,15 +280,15 @@ impl PostgresRedoManager {
// launch the WAL redo process on first use
if proc.is_none() {
self.launch(&mut proc, pg_version)?;
self.launch(&mut proc, pg_version)
.context("launch process")?;
}
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
let result = self
.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
.map_err(WalRedoError::IoError);
let result =
self.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout);
let end_time = Instant::now();
let duration = end_time.duration_since(lock_time);
@@ -298,26 +307,35 @@ impl PostgresRedoManager {
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
debug!(
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {} diag={:?}",
len,
nbytes,
duration.as_micros(),
lsn
lsn,
diag,
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
if let Err(e) = result.as_ref() {
let (e, request_no) = match e {
ApplyWalRecordsError::WithRequestNo { error, request_no } => {
(error, Some(*request_no))
}
ApplyWalRecordsError::NoRequestNo(e) => (e, None),
};
error!(
n_attempts,
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}: {}",
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={} request_no={:?} {:?}: {:?}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
base_img_lsn,
lsn,
utils::error::report_compact_sources(e),
n_attempts,
request_no,
diag,
e,
);
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
@@ -336,11 +354,14 @@ impl PostgresRedoManager {
proc.child.kill_and_wait();
}
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
info!(n_attempts, ?diag, "retried walredo succeeded");
}
n_attempts += 1;
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
return result;
return result.map_err(|e| match e {
ApplyWalRecordsError::WithRequestNo { error, .. } => error,
ApplyWalRecordsError::NoRequestNo(e) => e,
});
}
}
}
@@ -354,7 +375,8 @@ impl PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
) -> Result<Bytes, WalRedoError> {
diag: impl std::fmt::Debug,
) -> anyhow::Result<Bytes> {
let start_time = Instant::now();
let mut page = BytesMut::new();
@@ -363,8 +385,7 @@ impl PostgresRedoManager {
page.extend_from_slice(&fpi[..]);
} else {
// All the current WAL record types that we can handle require a base image.
error!("invalid neon WAL redo request with no base image");
return Err(WalRedoError::InvalidRequest);
anyhow::bail!("invalid neon WAL redo request with no base image");
}
// Apply all the WAL records in the batch
@@ -377,10 +398,11 @@ impl PostgresRedoManager {
WAL_REDO_TIME.observe(duration.as_secs_f64());
debug!(
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {} diag={:?}",
records.len(),
duration.as_micros(),
lsn
lsn,
diag,
);
Ok(page.freeze())
@@ -392,14 +414,13 @@ impl PostgresRedoManager {
page: &mut BytesMut,
_record_lsn: Lsn,
record: &NeonWalRecord,
) -> Result<(), WalRedoError> {
) -> anyhow::Result<()> {
match record {
NeonWalRecord::Postgres {
will_init: _,
rec: _,
} => {
error!("tried to pass postgres wal record to neon WAL redo");
return Err(WalRedoError::InvalidRequest);
anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
}
NeonWalRecord::ClearVisibilityMapFlags {
new_heap_blkno,
@@ -407,7 +428,7 @@ impl PostgresRedoManager {
flags,
} => {
// sanity check that this is modifying the correct relation
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
assert!(
rel.forknum == VISIBILITYMAP_FORKNUM,
"ClearVisibilityMapFlags record on unexpected rel {}",
@@ -445,7 +466,7 @@ impl PostgresRedoManager {
// same effects as the corresponding Postgres WAL redo function.
NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
key_to_slru_block(key).context("invalid record")?;
assert_eq!(
slru_kind,
SlruKind::Clog,
@@ -495,7 +516,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::ClogSetAborted { xids } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
key_to_slru_block(key).context("invalid record")?;
assert_eq!(
slru_kind,
SlruKind::Clog,
@@ -526,7 +547,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
key_to_slru_block(key).context("invalid record")?;
assert_eq!(
slru_kind,
SlruKind::MultiXactOffsets,
@@ -559,7 +580,7 @@ impl PostgresRedoManager {
}
NeonWalRecord::MultixactMembersCreate { moff, members } => {
let (slru_kind, segno, blknum) =
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
key_to_slru_block(key).context("invalid record")?;
assert_eq!(
slru_kind,
SlruKind::MultiXactMembers,
@@ -759,7 +780,7 @@ impl PostgresRedoManager {
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> Result<Bytes, std::io::Error> {
) -> Result<Bytes, ApplyWalRecordsError> {
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -782,10 +803,9 @@ impl PostgresRedoManager {
{
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
return Err(Error::new(
ErrorKind::Other,
"tried to pass neon wal record to postgres WAL redo",
));
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"tried to pass neon wal record to postgres WAL redo"
)));
}
}
build_get_page_msg(tag, &mut writebuf);
@@ -807,7 +827,7 @@ impl PostgresRedoManager {
writebuf: &[u8],
mut input: MutexGuard<Option<ProcessInput>>,
wal_redo_timeout: Duration,
) -> Result<Bytes, std::io::Error> {
) -> Result<Bytes, ApplyWalRecordsError> {
let proc = input.as_mut().unwrap();
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
@@ -828,10 +848,13 @@ impl PostgresRedoManager {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
}
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo timed out"
)));
}
// If we have some messages in stderr, forward them to the log.
@@ -840,7 +863,9 @@ impl PostgresRedoManager {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr.read(&mut errbuf)?;
let len = stderr
.read(&mut errbuf)
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
@@ -855,22 +880,23 @@ impl PostgresRedoManager {
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo process closed its stderr unexpectedly"
)));
}
// If 'stdin' is writeable, do write.
let in_revents = pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
nwrite += proc
.stdin
.write(&writebuf[nwrite..])
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
} else if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdin unexpectedly",
));
return Err(ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(
"WAL redo process closed its stdin unexpectedly"
)));
}
}
let request_no = proc.n_requests;
@@ -901,10 +927,10 @@ impl PostgresRedoManager {
//
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
// That's where we kill the child process.
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
});
}
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
@@ -920,10 +946,14 @@ impl PostgresRedoManager {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
}
.map_err(|e| ApplyWalRecordsError::NoRequestNo(anyhow::anyhow!(e)))?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo timed out"),
});
}
// If we have some messages in stderr, forward them to the log.
@@ -932,7 +962,12 @@ impl PostgresRedoManager {
let mut errbuf: [u8; 16384] = [0; 16384];
let mut stderr_guard = self.stderr.lock().unwrap();
let stderr = stderr_guard.as_mut().unwrap();
let len = stderr.read(&mut errbuf)?;
let len = stderr.read(&mut errbuf).map_err(|e| {
ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!(e),
}
})?;
// The message might not be split correctly into lines here. But this is
// good enough, the important thing is to get the message to the log.
@@ -947,21 +982,26 @@ impl PostgresRedoManager {
continue;
}
} else if err_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stderr unexpectedly",
));
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stderr unexpectedly"),
});
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = pollfds[2].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
nresult += output.stdout.read(&mut resultbuf[nresult..]).map_err(|e| {
ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!(e),
}
})?;
} else if out_revents.contains(PollFlags::POLLHUP) {
return Err(Error::new(
ErrorKind::BrokenPipe,
"WAL redo process closed its stdout unexpectedly",
));
return Err(ApplyWalRecordsError::WithRequestNo {
request_no,
error: anyhow::anyhow!("WAL redo process closed its stdout unexpectedly"),
});
}
}
output